rocketmq-使用

起男 351 2023-11-22

rocketmq-使用

导入依赖

<!--需要和rocketmq版本相同-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.0</version>
        </dependency>

普通消息

同步发送

同步发送是指,producer发出一条消息后,会在收到mq返回的ack之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低

        //创建一个producer,参数为生产者组名称
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        //指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //发送失败时重试次数,默认2
        producer.setRetryTimesWhenSendFailed(3);
        //设置发送超时时限,默认3s
        producer.setSendMsgTimeout(5000);
        //开启生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            byte[] body = ("hi," + i).getBytes();
            //生产消息
            Message message = new Message("someTopic", "someTag", body);
            message.setKeys("key-"+i);//为消息指定key
            //发送消息
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        //关闭producer
        producer.shutdown();

消息发送的状态:

public enum SendStatus {
 SEND_OK, //发送成功
 FLUSH_DISK_TIMEOUT,//刷盘超时(当broker设置的刷盘策略为同步刷盘时,才能出现此异常)
 FLUSH_SLAVE_TIMEOUT,//slave同步超时(当broker集群设置的master-slave的复制方式为同步复制时,才能出现此异常)
 SLAVE_NOT_AVAILABLE,//没有可用的slave(当broker集群设置为master-slave的复制方式为同步复制时,才能出现此异常)
}

异步发送

异步发送是指,producer发出消息后无需等待mq返回ack,直接发送下一条消息。该方式的消息可靠性得到了保证,发送效率也可以

        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        //异步发送失败后不进行重试
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //指定新创建的topic的queue数量为2,默认4
        producer.setDefaultTopicQueueNums(2);

        producer.start();

        for (int i = 0; i < 10; i++) {
            byte[] body = ("hi," + i).getBytes();
            Message message = new Message("myTopicA","myTag",body);
            //异步发送,并指定回调
            producer.send(message, new SendCallback() {
                //当producer接收到mq发送来的ack后就会执行该方法
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }
        //因为是异步发送,所以需要等一会再关闭
        TimeUnit.SECONDS.sleep(30);
        producer.shutdown();

单向发送

单向发送是指,producer仅负责发送消息,不等待、不处理mq的ack。该发送方式时mq也不返回ack。该方式的消息发送效率最高,但消息可靠性较差

        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            byte[] body = ("hi," + i).getBytes();
            Message message = new Message("single", "someTag", body);
            //单向发送
            producer.sendOneway(message);
        }

        producer.shutdown();

消费消息

        //定义一个pull消费者
        //DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
        //定义一个push消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        //指定nameServer
        consumer.setNamesrvAddr("localhost:9876");
        //指定从第一条消息开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //指定消费的topic和tag
        consumer.subscribe("someTopic","*");
        //指定采用广播模式消费,默认CLUSTERING集群模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //注册一个消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 一旦broker中有了其订阅的消息就会触发该方法的执行
             * @param msgs 消息
             * @param context
             * @return 当前consumer消费的状态
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //消费消息
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                //返回状态消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //开启消费者
        consumer.start();

顺序消息

顺序消息是指,严格按照消息的发送顺序进行消费的消息(fifo)

默认情况下,生产者会把消息以round robin轮询的方式发送到不同的queue分区队列;而消费消息时会从多个queue上拉取消息,这种情况下发送和消费不能保证顺序。但如果将消息仅发送到同一个queue中,消费时也只从这个queue上拉取消息,就严格保证了消息的顺序性

根据有序范围的不同,顺序消息可以分为:

  • 分区有序:有多个queue参与,仅可保证该queue上的消息顺序
  • 全局有序:发送和消费参与的queue只有一个时,保证的有序是整个topic的

创建topic时指定queue的数量:

  1. 在代码中创建producer时,指定其创建queue的数量
  2. rocketmq控制台中手动指定queue的数量
  3. 使用mqadmin命令手动创建topic时指定queue的数量

如何实现queue选择器:

  • 在定义producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的
  • 在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论是什么,都是唯一的
  • 一般的选择算法是,让选择key(或hash值)与该topic的queue的数量取模,其结果为queueId
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        //如果是全局有序,则需要设置为1
        //producer.setDefaultTopicQueueNums(1);
        producer.start();

        for (int i = 0; i < 10; i++) {
            Integer orderId = i;
            byte[] body = ("hi,"+i).getBytes();
            Message message = new Message("topicA","tagA",body);
            //将orderid设置为key
            message.setKeys(orderId.toString());
            //发送消息 MessageQueueSelector选择器
            producer.send(message, new MessageQueueSelector() {
                /**
                 * 具体的选择算法在该方法中定义
                 * @param mqs queue列表
                 * @param msg 当前消息
                 * @param arg send()的第三个参数 (这里就是send方法中指定的orderId)
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //使用消息key作为选择key
                    /*String keys = msg.getKeys();
                    Integer id = Integer.valueOf(keys);*/
                    //使用arg作为选择key
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            },orderId);
        }
        producer.shutdown();

延时消息

当消息写入到broker后,在指定时长后才被消费处理的消息,称为延时消息

延时等级

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在rocketmq服务端的MessageStoreConfig类中的messageDelayLevel变量中

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

即,若指定延时等级为3,则表示延迟时长为10s

如果需要自定义延时等级,可以通过在broker加载的配置文件中新增配置messageDelayLevel=xxx(配置文件在rocketmq安装目录下的config目录中)

原理

producer将消息发送到broker后,broker会首先先将消息写入到commitlog文件,然后需要将其分发到相应的consumerqueue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有,则需要经历一个复杂的过程:

  1. 修改消息的topic为SCHEDULE_TOPIC_XXXX

  2. 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)

    延迟等级delayLevel与queueId的对应关系为:queueId = delayLevel - 1(queueId从0开始,delayLevel从1开始)

    创建queueId时,并不是一次性将所有等级的目录全部创建,而是用到哪个创建哪个

  3. 修改消息索引单元内容。索引单元中的message tag hashcode部分原本存放的是消息的tag的hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原topic后再次被写入到commitlog中的时间。投递时间=消息存储时间+延迟等级时间。消息存储时间指的是消息被发送到broker时的时间戳

  4. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

  5. broker内部有一个延迟消息服务类,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到目标topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延时的普通消息。然后再次将消息投递到目标topic中

    ScheuleMessageService在broker启动时,会创建一个定时器Timer,用于执行相应的定时任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消费与投递。每个TimerTask都会检测相应queue队列的第一条消息是否到期

实例

生产者:

        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            byte[] body = ("hi,"+i).getBytes();
            Message message = new Message("topicB", "someTag", body);
            //指定消息延迟等级3(10秒)
            message.setDelayTimeLevel(3);
            SendResult result = producer.send(message);
            //输出消息被发送的时间
            System.out.println(new SimpleDateFormat("mm:ss")
                    .format(new Date())+","+result);
        }
        producer.shutdown();

消费者:

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("topicB","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new SimpleDateFormat("mm:ss")
                            .format(new Date())+","+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

事务消息

rocketmq提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式处理模式

半事务消息

暂不能投递的消息,发送方已经成功的将消息发送到broker,但是broker未收到最终确认指令,此时该消息被标记成暂不能投递状态,即不能被消费者看到。处于该种状态下的消息即半事务消息

本地事务状态

producer回调操作执行的结果为本地事务状态,其会发送给tc,而tc会再发送给tm。tm会根据tc发送来的本地事务状态来决定全局事务状态

消息回查

消息回查,即重新查询本地事务的执行状态(消息回查,不是重新执行回调操作。如:回调操作是执行预扣款,回查则是查看预扣款的执行结果)

引发回查的原因:

  1. 回调操作返回UNKNWON
  2. tc没有收到tm的最终全局事务确认

XA模式

角色:

  • tc:事务协调者,维护全局和分支事务的状态,驱动全局事务提交和回滚(rocketmq中的broker充当tc)
  • tm:事务管理器,定义全局事务的范围,开始全局事务,提交或回滚全局事务。他是全局事务的发起者(producer充当tm)
  • rm:资源管理器,管理分支事务的资源,与tc进行通信注册分支事务并报告分支事务的状态,驱动分支事务的提交和回滚(producer和broker均是rm)

流程:

  1. tm向tc发起指令,开启一个全局事务
  2. 根据业务要求,各个rm会逐个向tc注册分支事务,然后tc会逐个向rm发出预执行指令
  3. 各个rm在接收到指令后会在本地事务预执行
  4. rm将预执行结果发送给tc(可能是成功,也可能是失败)
  5. tc在接收到各个rm的结果后,会进行汇总并上报给tm,tm根据总汇结果向tc发送确认指令
    1. 若所有结果都成功,则向tc发送Global Commit指令
    2. 只要有结果是失败响应,则向tc发送Global Rollback指令
  6. tc在接收到指令后再次向rm发送确认指令

事务消息方案并不是一个典型的xa模式。因为xa模式中的分支事务是异步的,而事务消息方案中是同步的

注意:

  1. 事务消息不支持延时消息
  2. 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交)

实例

生产者:

        TransactionMQProducer producer = new TransactionMQProducer("tpg");
        producer.setNamesrvAddr("localhost:9876");
        //定义一个线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        //为生产者指定一个线程池
        producer.setExecutorService(executor);
        //为生产者添加事务监听器
        producer.setTransactionListener(new ICBCTransactionListener());

        producer.start();
        String[] tags = {"taga","tagb","tagc"};
        for (int i = 0; i < 3; i++) {
            byte[] body = ("hi,"+i).getBytes();
            Message message = new Message("TTopic", tags[i], body);
            //发送事务消息(第二个参数用于指定在执行本地事务时要使用的自定义业务参数)
            TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
            System.out.println("发送结果为:"+sendResult.getSendStatus());
        }

监听器:

public class ICBCTransactionListener implements TransactionListener {
    /**
     * 回调操作
     * 消息预提交成功就会触发该方法的执行,用于完成本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("预提交消息成功:"+msg);
        if ("taga".equalsIgnoreCase(msg.getTags())){
            return LocalTransactionState.COMMIT_MESSAGE;//成功,可以被消费者消费
        }else if ("tagb".equalsIgnoreCase(msg.getTags())){
            return LocalTransactionState.ROLLBACK_MESSAGE;//失败,不可被消费者消费
        }else if ("tagc".equalsIgnoreCase(msg.getTags())){
            return LocalTransactionState.UNKNOW;//未知,需要执行消息回查
        }
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 消息回查
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("执行消息回查"+msg.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;//回查成功,可以被消费者消费
    }
}

批量消息

批量发送

批量发送限制

  • 批量发送的消息必须具有相同的topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息

批量发送大小

默认情况下,一批发送的消息总大小不能超过4mb字节。如果超出该值则:

  1. 将消息进行拆分
  2. 在producer端和broker端修改属性maxMessageSize

注意:生产者通过send()方法发送Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:topic、消息body、消息日志(20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、要发送的queueid等。最终写入到broker中的消息单元中的数据都是来自于这些属性

实例

生产者:

        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        //指定要发送的消息的最大大小,默认是4m(仅修改此属性是不行的,同时还需要修改broker加载的配置文件)
        producer.setMaxMessageSize(4*1024*1024);
        producer.start();
        //定义要发送的消息集合
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            byte[] body = ("hi,"+i).getBytes();
            Message msg = new Message("someTopic","someTag",body);
            messages.add(msg);
        }
        //消息列表分割器,将消息分割为多个小列表
        MessageListSplitter splitter = new MessageListSplitter(messages);
        while (splitter.hasNext()){
            List<Message> list = splitter.next();
            producer.send(list);
        }
        producer.shutdown();

分割器:

public class MessageListSplitter implements Iterator<List<Message>> {
    //极限值
    private final int SIZE_LIMIT = 4 * 1024 * 1024;
    //所有消息
    private final List<Message> messages;
    //批量发送消息的小集合起始索引
    private int currIndex;

    public MessageListSplitter(List<Message> messages){
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        //判断当前开始遍历的消息索引要小于消息总数
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        //当前要发送的这一批消息列表的大小
        int totalSize = 0;
        for (; nextIndex<messages.size(); nextIndex++){
            Message message = messages.get(nextIndex);
            //topic 和 body的长度
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                //属性的长度
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            //log的长度
            tmpSize += 20;
            //单个数据大于极限
            if (tmpSize > SIZE_LIMIT){
                if (nextIndex - currIndex == 0){
                    nextIndex ++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT){
                break;
            }else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = this.messages.subList(currIndex, nextIndex);
        //下次的开始索引
        currIndex = nextIndex;
        return subList;
    }
}

批量消费

consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过32。因为默认情况下每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定

consumeMessageBatchMaxSize设置的越大,consumer每次拉取的时间就越长

pullBatchSize:设置的越大,consumer的并发消费能力就越低(每一批只会用一个线程消费)

实例

消费者:

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("someTopic","*");
        //指定每次可以消费10条消息,默认1
        consumer.setConsumeMessageBatchMaxSize(10);
        //指定每次可以从broker拉取40条消息,默认32
        consumer.setPullBatchSize(40);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//消费成功
                //return ConsumeConcurrentlyStatus.RECONSUME_LATER;//消费异常
            }
        });
        consumer.start();

消息过滤

消费者在进行消息订阅时,除了可以指定要订阅的topic外,还可以指定条件进行过滤

tag过滤

通过consumer的subscribe()方法指定要订阅的tag。如果订阅多个tag的消息,tag间使用||符号连接

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅mytopic中 tag为 taga和tagc的消息
        consumer.subscribe("mytopic","taga || tagb");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

sql过滤

sql过滤是一种通过表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过sql过滤,可以实现对消息的复杂过滤。不过,只有使用push模式的消费者才能使用sql过滤

支持的常量类型:

  • 数值:比如:123,3.14
  • 字符:必须用单引号括起来,比如:‘abc’
  • 布尔:true或false
  • null:特殊常量,表示空

支持的运算符:

  • 数值比较:>,>=,<,<=,=,between
  • 字符比较:=,<>,in
  • 逻辑运算:and,or,not
  • null判断:is null 或 is not null

注意:

The broker does not support consumer to filter message by SQL92

默认情况下broker没有开启消息的sql过滤功能,需要在broker加载的配置文件中添加属性开启该功能

enablePropertyFilter = true

实例

生产者:

        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            byte[] body = ("hi,"+i).getBytes();
            Message message = new Message("mytopic","mytag",body);
            //埋入用户属性age
            message.putUserProperty("age",i+"");
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        producer.shutdown();

消费者:

        //需要push模式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //从mytopic的消息中过滤出 age 在0-6之间的消息
        consumer.subscribe("mytopic", MessageSelector.bySql("age between 0 and 6"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

消息发送重试

producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制

注意:

  • 生产者在发送消息时,若采用同步或异步方式发送,发送失败会重试,但oneway消息发送方式是没有重试机制的
  • 只有普通消息具有发送重试机制,顺序消息是没有的
  • 消息重投机制可以保证消息尽可能发送成功、不丢失,但可能会造成消息重复。消息重复在rocketmq中是无法避免的

消息重试由三种策略:

  • 同步发送失败:

    对于普通消息,消息发送默认采用round-robin策略来选择要发送到的队列。如果发送失败,默认重试2次。但在重试时是不会选择上次发送失败的broker,而是其他broker。若只有一个broker,就只能发送给这个broker,但会尽量发送到其他队列中

    broker还具有失败隔离功能,使producer尽量选择未发生过异常的broker作为目标

    如果超过重试次数,则抛出异常,由producer去保证消息不丢失

    代码中通过producer.setRetryTimesWhenSendFailed(3);设置重试次数

  • 异步发送失败

    异步发送失败时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢失

    代码中通过producer.setRetryTimesWhenSendAsyncFailed(0);设置重试次数

  • 消息刷盘失败

    消息刷盘超时(master或slave)或slave不可用(返回状态非SEND_OK)时,默认是不会将消息尝试发送到其他的broker的。不过,对于重要消息可以通过在broker的配置文件中设置retryAnotherBrokerWhenNotStoreOK=true来开启

消息消费重试

顺序消息的消费重试

对于顺序消息,当consumer消费消息失败后,为了保证消息的顺序性,其会自动不断的进行消息重试,直到消费成功。重试期间会出现消息消费被阻塞的情况

通过代码consumer.setSuspendCurrentQueueTimeMillis(1000);设置重试消费间隔,默认1000毫秒,取值范围为10到30000毫秒

注意:顺序消息没有发送失败重试,但具有消费重试

无序消息的消费重试

对于无序消息(普通消息、延时消息、事务消息),当consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息

消费重试次数与间隔

对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的时间间隔是不同的,会逐渐变长(有序间隔固定)。若消息经过最多重试次数后仍然失败,则消息进入死信队列

通过代码consumer.setMaxReconsumeTimes(16);设置重试次数(默认16)。若重试次数大于16,重试间的时间间隔均为2小时

对于消费者组,若有一个消费者设置了消费重试次数,则整个组会应用该设置。如果有多个设置了该值,后面的会覆盖前面的设置

重试队列

对于需要重试的消息,并不是consumer在等待了指定的时间后再次去原来的地方进行消费,而是将这些需要重试消费的消息放入到了一个特殊的topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列

当出现需要重试消费的消息时,broker会为每个消费组都设置一个topic名称为%RETRY%consumerGroup@consumerGroup的重试队列

broker对于重试消息的处理是通过延时消息实现的。先将消息保存到SCHEDULE_TOPIC_XXXX延时队列中,延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup重试队列中

消费重试时间间隔和延时消息的延时等级十分相似,除了没有延时等级的前两个等级外(直接使用第三个开始),其他是相同的

消费重试配置方式

集群模式下,消息消费失败后若希望消费重试,则需要在监听器接口实现中明确进行如下方式之一的配置:

  • 返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
  • 返回null
  • 抛出异常

消息不重试配置方式

集群模式下,消息消费失败后若不希望重试,则在捕获异常后返回与成功相同的结果,即:

  • 返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS

死信队列

当一条消息消费失败,消息队列会自动进行重试;当达到最大重试次数后,则将其发送到一个特殊队列中。这个队列就是死信队列

特征

  • 死信队列中的消息不会再被消费者正常消费(死信队列对消费者是不可见的)
  • 死信存储有效期和正常消息相同,均为3天,3天后会被自动删除(commitlog文件的过期时间)
  • 死信队列就是一个特殊的topic,名为%DLQ%consumerGroup@consumerGroup
  • 如果一个消费者组未产生死信消息,则不会为其创建相应的死信队列

死信消息的处理

当一条消息进入死信队列,就意味着系统中某个地方出现了问题。因此对于死信消息,通常需要开发人员进行特殊处理,解决代码中的bug。再将原来的死信消息再次进行投递消费