springboot整合rabbitMQ

springboot整合rabbitMQ

起男 1,420 2022-01-03

springboot整合rabbitMQ

声明

javaBean方式

创建交换机

	@Bean
    Exchange demoExchange(){
        return ExchangeBuilder
                .directExchange("交换机名称")
                .build();
    }

创建队列

    @Bean
    Queue demoQueue(){
        return QueueBuilder
                .durable("队列名称")
                .build();
    }

绑定交换机和队列

    @Bean
    Binding queueBinging(){
        return BindingBuilder
                .bind(demoQueue())//队列
                .to(demoExchange())//交换机
                .with("路由key")
                .noargs();
    }

注解方式

@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "队列名",declare = "是否持久化"),
                exchange = @Exchange(value = "交换机名",type = "类型",declare = "是否持久化",ignoreDeclarationExceptions = "声明异常则忽略"),
                key = "路由key"
        )
)

生产者

	@Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        //交换机名称、路由key、消息
        rabbitTemplate.convertAndSend(EXCHANGE_NAME,ROUTING_KEY,"mag");
    }

消费者

@Component
@Slf4j
public class DemoConsumer {

    @RabbitListener(queues = "监听的队列名称")
    public void getQueue1(Message message){
        log.info("接收queue消息:{}",new String(message.getBody()));
    }
}

消息接收确认

配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #设置手动消息确认

消费者

    @SneakyThrows
    @RabbitListener(queues = "监听的队列")
    public void getQueue2(Message message,/*@Header Long amqp_deliveryTag*/ Channel channel){
        log.info("接收queue2消息:{}",new String(message.getBody()));
        //手动消息确认,参数:表示消息投递序号、是否批量确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }

其它函数:

basicNack表示失败确认,参数:

  • 消息投递序号
  • 是否批量确认
  • 值为 true 消息将重新入队列

basicReject拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。参数:

  • 消息投递序号
  • 值为 true 消息将重新入队列

发布确认

可以获取消息是否发送到交换机

配置文件

spring:
  rabbitmq:
    publisher-confirm-type: correlated

修改RabbitTemplate

@Component
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 消息确认,消息只要被rabbitmq接收到就会触发confirmCallback回调
     *
     * @param correlationData 对象内部只有一个id属性,用来表示当前消息的唯一性
     * @param ack 消息投递到broker的状态,true表示成功
     * @param s 投递失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack){
            log.info("{}消息投递成功",correlationData.getId());
        }else {
            log.info("{}消息投递失败:{}",correlationData.getId(),s);
        }
    }
}

生产者

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("交换机名","路由key","消息",correlationData);

消息返回

当消息没有被发送到队列时返回给生产者

配置文件

spring:
  rabbitmq:
    publisher-returns: true

修改RabbitTemplate

@Component
@Slf4j
public class RabbitConfig implements RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息退回:消息体:{},响应code:{},响应内容:{},交换机:{},路由key:{}",
                returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText(),
                returnedMessage.getExchange(),returnedMessage.getRoutingKey());
    }
}

延迟队列

rabbitmq的延迟队列是基于死信队列和消息过期实现的

配置文件

    //普通交换机
    @Bean
    Exchange commonExchange(){
        return ExchangeBuilder
                .directExchange(COMMON_EXCHANGE)
                .build();
    }
	//死信交换机
    @Bean
    Exchange deadExchange(){
        return ExchangeBuilder
                .directExchange(DEAD_EXCHANGE)
                .build();
    }
	//普通队列
    @Bean
    Queue commonQueue(){
        //设置参数
        Map<String,Object> arguments = new HashMap<>();
        //死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置路由key
        arguments.put("x-dead-letter-routing-key",DEAD_BINDING);
        //过期时间
        arguments.put("x-message-ttl",10000);

        return QueueBuilder
                .durable(COMMON_QUEUE)
                .withArguments(arguments)//设置参数
                .build();
    }
	//死信队列
    @Bean
    Queue deadQueue(){
        return QueueBuilder
                .durable(DEAD_QUEUE)
                .build();
    }
	//普通队列和交换机绑定
    @Bean
    Binding commonBinding(){
        return BindingBuilder
                .bind(commonQueue())
                .to(commonExchange())
                .with(COMMON_BINDING)
                .noargs();
    }
	//死信队列和交换机绑定
    @Bean
    Binding deadBinding(){
        return BindingBuilder
                .bind(deadQueue())
                .to(deadExchange())
                .with(DEAD_BINDING)
                .noargs();
    }
  • 生产者:正常向普通交换机发送消息即可
  • 消费者:监听死信队列(当普通队列消息过期后,消息会发给死信交换机,然后到死信队列)

事务

配置文件

spring:
  rabbitmq:
    publisher-confirm-type: none #需要关闭消息的发布确认

修改RabbitTemplate

@Component
@Slf4j
public class RabbitConfig{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setChannelTransacted(true);//开启事务
    }
}

配置类

    /**
     * 启用rabbitmq事务
     * @param factory
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory factory){
        return new RabbitTransactionManager(factory);
    }

生产者

    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")//使用事务
    public void send(){
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange_name","roting_key","mag",correlationData);
        //int i = 1/0;
    }

生产者发送消息时,如果发送消息前后发生异常,消息将不会发出

RPC模式

远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束

同步阻塞

在一条消息接受到回复前不能发送其他消息

生产者,使用convertSendAndReceive方法

Object resp=rabbitTemplate.convertSendAndReceive("exchange_name","roting_key","mag");

消费者,方法的返回值就是给生产者的响应

    @RabbitListener(queues = "queue_name")
	public String getMag(String mag){
        log.info("获取队列消息:{}",mag);
        return mag+"abc";
    }

异步

可以不用等待回复,继续发送别的消息

配置类

    /**
     * 配置AsyncRabbitTemplate
     * SpringBoot 没有默认的AsyncRabbitTemplate注入
     * @param rabbitTemplate
     * @return
     */
    @Bean
    AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
        return new AsyncRabbitTemplate(rabbitTemplate);
    }

生产者

AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("exchange_name","roting_key","mag");

        future.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                log.info("接收响应:{}",result);
            }

            @Override
            public void onFailure(Throwable ex) {

            }
        });