rocketmq-笔记

rocketmq-笔记

起男 474 2023-09-11

rocketmq-笔记

基本概念

消息(message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题

主题(topic)

topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是rocketmq进行消息订阅的单位

一个生产者可以同时发送多种topic的消息;而一个消费者只能对某种特定的topic感兴趣,即只可以订阅和消费一种topic消息

标签(tag)

为消息设置的标签,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效的保持代码的清晰度和连贯性,并优化rocketMQ提供的查询系统。消费者可以根据tag实现对不同子主题的不同消费逻辑,实现更好的扩展性

队列(queue)

存储消息的物理实体。一个topic中可以包含多个queue,每个queue中存放的就是该topic的消息。一个topic的queue也被称为一个topic中的消息分区

类似于kafka中的分区partition

消息标识(messageId/key)

rocketmq中每个消息拥有唯一的messageId,且可以携带具有业务标识的key,以方便对消息的查询。不过需要注意的是,messageId有两个:在生产者send()消息时会自动生成一个messageId(msgId),当消息到达broker后,broker也会自动生成一个messageId(offsetMsgId)。msgId、offsetMsgId与key都称为消息标识

  • msgId:由producer端生成,其生成规则为:

    producerIp+进程pid+MessageClientIDSetter类的classLoader的hashCode+当前时间+AutomicInteger自增计数器

  • offsetMsgId:由broker端生成,其生成规则为:

    brokerIp+物理分区的offset(queue中的偏移量)

  • key:由用户指定的业务相关的唯一标识

系统架构

producer

消息生产者,负责生产消息。producer通过mq的负载均衡模块选择相应的broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟

rocketmq中的消息生产者都是以生产者组(producer group)的形式出现的。生产者组是同一类生产者的集合,一个生产者组可以同时发送多个topic的消息

consumer

消息消费者,负责消费消息。一个消费者会从broker服务器中获取到消息,并对消息进行相关业务处理

rockermq中的消费者都是以消费者组(consumer group)的形式出现的。消费者组是同一类消费者的集合,这类consumer消费的是同一个topic的消息。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易

消费者组中的consumer的数量应该小于等于订阅的topic中的queue的数量。如果超出queue数量,则多出的consumer不能消费消息

一个topic的消息可以被多个消费者组同时消费

nameServer

NameServer是一个broker与topic路由的注册中心,支持broker的动态注册与发现

主要功能为:

  • broker管理:接受broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检测broker是否存活
  • 路由信息管理:每个NameServer中都保存着broker集群的整个路由信息和用于客户端查询的队列信息。producer和consumer通过NameServer可以获取整个比如broker集群的路由信息,从而进行消息的投递和消费

rocketmq 1.0和2.0版本中,依赖的仍是zookeeper。从3.0开始去掉了zookeeper的依赖,使用了自己的NameServer

broker

broker充当着消息中转角色,负责存储消息、转发消息。broker在rocketmq系统中负责接收并存储生产者发送过来的消息,同时为消费者的拉取请求作准备。broker同时也存储着消息相关的元数据,包括消费者组消费进度偏移量offset、主题、队列等

NameServer

路由注册

NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点间是无差异的,各节点间相互不进行信息通信。那各节点中的数据是如何进行数据同步的呢?在broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在每个NameServer内部维护者一个broker列表,用来动态存储broker的信息

这种NameServer的无状态方式

  • 优点:集群搭建简单
  • 缺点:对于broker,必须明确指出所有NameServer地址,否则未指出的将不会注册,也因为如此,NameServer并不能随便扩容。如果不重新配置broker,则新增的NameServer对broker来说是不可见的

broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个broker的最新存活时间

路由剔除

由于broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除

NameServer中有一个定时任务,每隔10秒就会扫描一次Broker列表,查看每一个broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定broker失败,然后将其从broker列表中剔除

路由发现

rocketmq的路由发现采用的是pull模型。当topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取topic最新的路由。默认客户端每30秒会拉取一次最新的路由

push模型:推送模型。实时性好,但需要维护一个长连接,消耗资源较大

pull模型:拉取模型。存在的问题是,实时性差

long polling模型:长轮询模型。其是对push和pull模型的整合,充分利用了这两种模型的优势

客户端NameServer选择策略

客户端(producer和consumer)在配置时必须要写上NameServer集群的地址,那么客户端到底连接的是哪个NameServer节点呢?客户端首先会生成一个随机数,然后再与NameServer节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用轮询策略,逐个尝试着去连接其它节点

工作流程

  1. 启动NameServer,NameServer启动后开始监听端口,等待broker、producer、consumer连接
  2. 启动broker时,broker会与所有NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳
  3. 发送消息前,可以先创建topic,创建topic时需要指定该topic要存储在哪些broker上,当然,在创建topic时也会将topic与broker的关系写入到NameServer中。不过,这步是可选的,也可以在发送消息时自动创建topic
  4. producer发送消息,启动时先根NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送的topic的queue与broker的地址(ip+port)的映射关系。然后根据算法策略从中选择一个queue,与队列所在的broker建立长连接从而向broker发送消息。当然,在获取到路由信息后,producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一次路由信息
  5. consumer跟producer类似,跟其中一台NameServer建立长连接,获取其所订阅topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的queue,然后直接跟broker建立长连接,开始消费其中的消息。consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于producer的是,consumer还会向broker发送心跳,以确保broker的存活状态

手动topic的创建模式:

  • 集群模式:该模式下创建的topic在集群中,所有broker中的queue数量是相同的
  • broker模式:该模式下创建的topic在集群中,每个broker的queue数量可以不同

自动创建topic时,默认采用的是broker模式,会为每个broker默认创建4个queue

读写队列

从物理上来讲读写队列是同一个队列。所以,不存在读写队列数据同步的问题。读写队列是逻辑上进行区分的概念。一般情况下,读写队列数量是相同的

如果创建topic时的写队列数量为8,读队列为4,此时系统会创建8个queue,分别为0-7。producer会将消息写入到这8个队列,但consumer只会消费0-3这4个queue中的消息,4-7中的消息是不会被消费到的

如果创建topic时的写队列数量为4,读队列为8,此时系统会创建8个queue,分别为0-7。producer会将消息写入到0-3这4个队列,但consumer只会消费0-7这8个queue中的消息,但是4-7中是没有消息的。此时假设consumer group中包含两个consumer,consumer1消费0-3,consumer2消费4-7。consumer2是没有消息可消费的

当读写队列数量不同时,总是有问题的,那么为什么这样设计?这样设计是为了方便topic的缩容。例如,原来topic中有16个queue,如何能使其queue缩容为8,还不会丢失消息?可以动态修改写队列数量为8,读队列数量不变。此时新的消息只能写入前8个队列,而消费的却是16个队列中的数据。当发现后8个队列中的消息消费完毕后,就可以再将读队列数量动态设置为8.整个缩容过程没有丢失任何消息

perm

用于设置当前创建topic的操作权限:

  • 2:只写
  • 4:只读
  • 6:读写

复制策略

复制策略是broker的master与slave间的数据同步方式。分为同步复制和异步复制:

  • 同步复制:消息写入master后,master会等待slave同步数据成功后才向producer返回成功ack
  • 异步复制:消息写入master后,master立即向producer返回成功ack,无需等待slave同步成功

刷盘策略

刷盘策略指的是broker中消息的落盘方式,即消息发送到broker内存后消息持久化到磁盘的方式。分为同步刷盘和异步刷盘:

  • 同步刷盘:当消息持久化到broker的磁盘后才算是消息写入成功
  • 异步刷盘:当消息写入到broker的内存后即表示消息写入成功,无需等待消息持久化到磁盘

消息写入到broker的内存,一般是写入到了pageCache

对于异步刷盘策略,消息会写入到pageCache后立即返回成功ack。但并不会立即做落盘操作,而是当pageCache到达一定量时会自动进行落盘

broker集群模式

单master

只有一个broker。这种方式也只能是在测试时使用,生成环境下不能使用,因为存在单点问题

多master

broker集群仅由多个master构成,不存在slave。同一topic的各个queue会平均分布在各个master节点上

  • 优点:配置简单,单个master宕机或重启维护对应用无影响,在磁盘配置为raid10时,即使机器宕机不可恢复情况下,由于raid10磁盘非常可靠,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高

    如果没有配置raid磁盘阵列,一旦出现master宕机,则会发送大量消息丢失的情况

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复前不可订阅(不可消费)消息实时性会受到影响

多master多slave(异步复制)

broker集群由多个master构成,每个master又配置了多个slave(在配置了raid磁盘阵列的情况下,一个master一般配置一个slave即可)。master与slave的关系是主备关系,即master负责处理消息的读写请求,而slave仅负责消息的备份和master宕机后的角色切换

异步复制策略,即消息写入master成功后,立即返回ack,无需等待slave同步成功。该模式下当master宕机后,可能会存在少量消息丢失的问题

多master多slave(同步双写)

该模式是多个master多slave的同步复制实现。所谓同步双写,指的是消息写入master成功后,master会等待slave同步数据成功后才返回ack,即master与slave都要写入成功才会返回ack,也即双写

该模式与异步复制模式比,优点是消息更安全,不存在丢失消息的情况。但单个消息的rt略高,从而导致性能略低(大约10%)

消息的生产

producer可以将消息写入到某broker中的某queue中,其过程如下:

  1. producer发送消息之前,会先向NameServer发出获取topic路由信息的请求
  2. NameServer返回该topic的路由表和broker列表
  3. producer根据代码中指定的queue选择策略,从queue列表中选出一个queue,用于后续存储消息
  4. producer对消息做一些特殊处理,例如,消息本身超过4m,则会进行压缩
  5. producer向选择的queue所在的broker发出rpc请求,将消息发送到queue

路由表:实际上是一个map,key为topic名称,value为一个QueueData实例列表

QueueData:一个topic在一个broker中的所有queue

broker列表:也是一个map。key为brokerName,value为BrokerData

BrokerData:一组brokerName相同的小集群(master-slave)。包含brokerName和一个map。map的key为brokerId,value为该broker的地址(brokerId为0为master,非0为slave)

queue选择算法

对于无序消息,其queue选择算法,也称为消息投递算法,常见的有两种:

  • 轮询算法:默认算法,该算法保证了每个queue可以均匀的获取到消息
  • 最小投递延迟算法:该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的queue。如果延迟相同,则采用轮询算法投递

消息的存储

rocketmq中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下store目录中

  • abort:该文件在broker启动后会自动创建,正常关闭broker,该文件会自动消失。若在没有启动broker的情况下,发现这个文件是存在的,则说明之前broker的关闭是非正常的
  • checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
  • commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的
  • consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中
  • index:其中存放着消息索引文件indexFile
  • config:存放着broker运行期间的一些配置数据
  • lock:运行期间使用到的全局资源锁

commitlog

commitlog目录中存放着很多的mappedFile文件,当前broker中的所有消息都是落盘到这些mappedFile文件中的。mappedFile文件最大为1g,文件名由20位十进制数构成,表示当前文件的第一条消息的起始位偏移量

注意:一个broker中仅包含一个commitlog目录,所有的mappedFile文件都是存放在该目录中的。即无论当前broker中存放着多少topic的消息,这些消息都是被顺序写入到了mappedFile文件中的。也就是说,这些消息在broker中存放时没有按照topic进行分类存放

消息单元

mappedFile文件内容由一个个的消息单元构成。每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在queue中存储的偏移量QueueOffset等近20余项相关属性

consumequeue

为了提高效率,会为每个topic在/store/consumequeue中创建一个目录,目录名称为topic名称,在该目录下,会再为每个queue创建一个目录,目录名为queueId。每个目录中存放着若干consumequeue文件,consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息

consumequeue文件名也是由20位数字构成,表示当前文件的第一个索引条目的起始位偏移量。与mappedFile文件名不同的是,其后续文件名是固定的。因为consumequeue文件大小是固定不变的

索引条目

每个consumequeue文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性:

  1. 消息在mappedFile文件中的偏移量CommitLog Offset,8字节
  2. 消息长度,4字节
  3. 消息Tag的hashcode值,8字节

这三个属性占20个字节,所以每个文件的大小固定是30w*20字节

对文件的读写

消息写入:

  1. broker根据生产者发送过来的queueId,获取到该消息对应的索引条目要在consumequeue中的写入偏移量,即QueueOffset
  2. 将queueId、queueOffset等数据,与消息一起封装为消息单元
  3. 将消息单元写入到commitlog。同时,形成消息索引条目
  4. 将消息索引条目分发到相应的consumequeue

消息拉取:

  1. consumer获取到其要消费消息所在queue的消费offset(消费进度),计算出其要消费消息的消息offset(消费offset+1)
  2. consumer向broker发送拉取请求,其中会包含其要拉取消息的queue、消息offset及消息tag
  3. broker计算在该consumequeue中的queueOffset(queueOffset=消息offset*20字节)
  4. 从该queueOffset处开始向后查找第一个指定tag的索引目录
  5. 解析该索引目录的前8个字节,即可定位到该消息在commitlog中的commitlog offset
  6. 从对应commitlog offset中读取消息单元,并发送给consumer

性能优化

  • rocketmq对文件的读写操作是通过mmap零拷贝进行的,将对文件的操作转化为直接对内存地址进行操作,从而极大的提高了文件的读写效率
  • consumequeue中的数据是顺序存放的,还引入了PageCache预读取机制,使得对consumequeue文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能

PageCache机制:

  • 写操作:os会先将数据写入到PageCache中,随后会以异步的方式由pdflush内核线程将cache中的数据刷盘到物理磁盘
  • 读数据:先从PageCache中读取数据,若没有命中,则os再从物理磁盘上加载数据到PageCache,同时也会对相邻数据块中的数据进行预读取

rockermq中可能会影响性能的是对commitlog的读取。因为对commitlog来说,读取消息时会产生大量的随机访问

indexFile

rocketmq提供了根据key进行消息查询的功能。该查询是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索引数据是包含了key的消息被发送到了broker时写入的,如果消息中没有包含key,则不会写入

indexFile的创建时机:

  • 当一条条带key的消息发送过来后,发现没有indexFile,此时会创建
  • 当一个indexFile中挂载的index索引单元数量超出2000w个时,会创建新的indexFile(根据indexHeader中的indexCount)

indexFile结构

每个broker中会包含一组indexFile,每个indexFile都是以一个文件创建的时间戳命名的。每个indexFile文件由三部分构成:indexHeader、slots槽位、indexes索引单元。每个indexFile文件中包含500w个slot槽。而每个slot槽又可能会挂载很多的index索引单元

一个indexFile的最大大小为:40+500w*4+2000w*20个字节

indexFile的文件名为当前被创建的时间戳,根据业务key进行查询时,查询条件除了key之外,还需要额外指定一个要查询的时间戳,表示要查询不大于该时间戳的最新消息,这个时间戳文件名可以简化查询,提高效率

indexHeader

indexHeader固定40个字节

  • beginTimestamp:该indexFile中第一条消息的存储时间
  • endTimestamp:该indexFile中最后一条消息存储时间
  • beginPhyoffset:该indexFile中第一条消息在commitlog中的偏移量commitlog offset
  • endPhyoffset:该indexFile中最后一条消息在commitlog中的偏移量commitlog offset
  • hashSlorCount:已经填充有index的slot数量
  • indexCount:该indexFile中包含的索引个数

slots槽位

key的hash值%500w的结果即为slot槽位,然后将该slot值修改为该index索引单元的indexNo,根据这个indexNo可以计算出该index单元在indexFile中的位置。不过,该取模结果的重复率是很高的,为了解决这个问题,在每个index索引单元中添加了preIndexNo,用于指定该slot中当前index索引单元的前一个index索引单元。而slot中始终存放的是其下最新的index索引单元的indexNo,这样的话,只要找到了slot就可以找到最新的index索引单元,而通过这个index索引单元就可以找到其之前的所有index索引单元

indexNo是一个在indexFile中的流水号,从0开始递增

index索引单元

index索引单元默认20个字节,包含4个属性

  • keyHash:消息中指定的业务key的hash值
  • phyOffset:当前key对应的消息在commitlog中的偏移量commitlog offset
  • timeDiff:当前key对应消息的存储时间与当前indexFile创建时间的时间差
  • preIndexNo:当前slot下当前index索引单元的前一个索引单元的indexNo

indexNo在index索引单元中是没有体现的,其是通过indexes中依次数出来的

查询流程

定位公式:

  • 计算指定消息key的slot槽位序号:key的hash%500w
  • 计算槽位为n的slot在indexFile中的起始位置:40+(n-1)*4
  • 计算indexNo为m的index在indexFile中的位置:40+500w*4+(m-1)*20
  1. 输入业务key与要查询的时间,开始查询
  2. 根据传入的时间找到相应的indexFile(文件名<=指定查询时间 的最大的文件)
  3. 计算出传入时间与找到的indexFile文件名间的差值diff
  4. 计算出业务key的hash值
  5. 计算出slot槽位序号(key的hash%500w)
  6. 根据slot槽位序号计算出该slot在indexFile中的位置(40+(n-1)*4)
  7. 找到slot后读取slot值,即当前slot中最新的index索引单元的indexNo
  8. 根据indexNo计算出该index单元在indexFile的位置(40+500w*4+(m-1)*20)
  9. 之前计算的时间差diff减去当前index单元中的timeDiff
    • 如果>=0:读取该index单元的phyOffset,然后就可以定位到commitlog中的消息了
    • 如果<0:读取该index单元的preIndexNo,作为要查找的下一个index索引单元的indexNo(越向前,index单元中的timeDiff越小)

消息的消费

消费者从broker中获取消息的方式有两种:pull拉取和push推送。消费者组对消息的消费模式又分为两种:集群消费clustering和广播消费broadcasting

获取消息类型

  • 拉取模式:consumer主动从broker中拉取消息,主动权由consumer控制。这种方式实时性较弱

  • 推送模式:broker收到数据后会主动推送给consumer。该模式实时性较高

    该消费类型是典型的发布-订阅模式,即consumer向其关联的queue注册监听器,一旦有新消息就会触发回调,回调方式是consumer去queue中拉取消息。而这需要基于consumer和broker间的长连接。长连接是需要消耗系统资源的

对比:

  • pull:需要应用去实现对关联queue的遍历,实时性差;但便于应用控制消息的拉取
  • push:封装了对关联queue的遍历,实时性强,但会占用较多系统资源

组消费模式

  • 广播模式:相同consumer group中的每个consumer实例都接收同一个topic的全量消息。即每条消息都会被发送到consumer group中的每个consumer
  • 集群模式:相同consumer group中的每个consumer实例平均分摊同一个topic的消息。即每条消息只会被发送到consumer group中的某个consumer

消息进度保存

  • 广播模式:消费进度保存在consumer端。因为广播模式下consumer group中每个consumer都会消费所有消息,但他们的消费进度是不同的。所以consumer各自保存各自的消费进度
  • 集群模式:消费进度保存在broker中。consumer group中的所有consumer共同消费同一个topic中的消息,同一条消息只会被消费一次。消费进度参与到了消费的负载均衡中,故消费进度是需要共享的(config/consumerOffset.json中)

rebalance机制

rebalance即再均衡,是指将一个topic下的多个queue在同一个consumer group中的多个consumer间进行重新分配的过程

集群模式才会有rebalance机制

rebalance机制是为了提升消息的并行消费能力。例如:一个topic下5个队列,在只有一个消费者的情况下,这个消费者要负责这五个队列的消息。如果此时增加一个消费者,那么就可以给其中一个消费者分配两个,另一个分配三个,从而提升并行消费能力

限制

由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列数量时,多余的消费者将分配不到任何队列

危害

  • 消费暂停:在触发rebalance时consumer要暂停部分队列的消费,要等rebalance结束时才能继续消费

  • 消费重复:consumer在消费新分配给自己的队列时,必须接着之前consumer提交的消费进度的offset继续消费。然而默认情况下,offset是异步提交的,可能会导致提交的offset和实际的offset不符,导致重复消费

    • 同步提交:提交offset后,需要等待broker返回ack后才能获取下一批消息

    • 异步提交:提交offset后,不需要等待broker的ack就可以直接获取下一批消息

  • 消费突刺:由于rebalance可能导致重复消费,如果需要重复消费的消息过多,或者因为rebalance暂停时间过长从而导致积压了部分消息。那么有可能会导致rebalance结束后瞬间需要消费很多消息

原因

导致rebalance产生的原因:

  • 消费者所订阅的topic的queue数量发生变化
  • 消费者组中消费者数量发生变化

过程

  1. 在broker中维护着多个map集合,这些集合中动态存放着当前topic中的queue信息、consumer group中的consumer实例信息。一旦发现消费者所订阅的queue数量发生变化,或消费者组中消费者的数量发生变化,立即向consumer group中的每个实例发出rebalance通知
  2. consumer实例接收到通知后会采用queue分配算法自己获取到相应的queue,即由consumer实例自主进行rebalance

在kafka中,一旦发现出现了rebalance条件,broker会调用group coordinator来完成rebalance。group coordinator是broker中的一个进程。

group coordinator会在consumer group中选出一个group leader。由这个leader根据自己本身情况完成partition分区的再分配。这个再分配结果会上报给coordinator,并由coordinator同步给group中的所有consumer实例

kafka中的的rebalance是由consumer leader完成的。而rockermq是由每个consumer自身完成的

queue分配算法

  • 平均分配策略:该算法是要根据avg = QueueCount / ConsumerCount的结果进行分配的。如果能够整除,则按顺序将avg个queue逐个分配给Consumer;如果不能整除,则将多余的queue按照consumer顺序逐个分配
  • 环形分配策略:根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配
  • 一致性hash策略:将consumer的hash值作为node节点存放到hash环上,然后将queue的hash值也放到hash环上,通过顺时针的方向,距离queue最近的那个consumer就是该queue要分配的consumer
  • 同机房策略:根据queue的部署机房位置和consumer的位置,过滤出当前consumer相同机房的queue。然后按照平均分配策略或环形平均策略对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形分配策略对所有queue进行分配

至少一次原则

rocketm有一个原则:每条消息必须要被成功消费一次

consumer在消费完消息后会向其消费进度记录器提交其消费消息的offset,offset被成功记录到记录器中,那么这条消息就被成功消费了

消费进度记录器:

  • 广播模式:consumer本身就是消费进度记录器
  • 集群模式:broker是消费进度记录器

offset管理

消费进度offset是用来记录每个queue在不同消费者组的消费进度。根据消费进度记录器的不同,可以分为两种模式:本地模式和远程模式

本地模式

当消费者模式为广播模式时,offset使用本地模式存储。因为每条消息都会被所有消费者消费,每个消费者管理自己的消费进度,各个消费者直接不存在消费进度的交集

consumer在广播模式下offset相关数据以json的形式持久化到consumer本地磁盘文件中,默认文件路径为当前用户目录下的.rocketmq_offsets/${clinetId}/${group}/Offsets.json。其中${clientId}为当前消费者id,默认为ip@DEFAULT;${group}为消费者组名

远程模式

当消费模式为集群模式时,offset使用远程模式管理。因为所有consumer实例对消息采用的是均衡消费,所有consumer共享queue的消费进度

consumer在集群模式下offset相关数据以json的形式持久化到broker磁盘中,文件路径为当前用户主目录下的store/config/consumerOffset.json

broker启动时会加载这个文件,并写入到一个双层map。外层map的key为topic@group,value为内层map。内层map的key为queueId,value为offset。当发生rebalance时,新的consumer会从该map中获取到相应的数据来继续消费

使用远程模式不光是因为共享消费进度,也是为了可以方便进行rebalance

用途

消费者要消费的第一条消息的起始位置是用户通过consumer.setConsumeFromWhere()方法指定的

在consumer启动后,要消费的第一条消息的起始位置常用的三种(ConsumeFromWhere枚举):

  • CONSUME_FROM_LAST_OFFSET:从queue的的当前最后一条消息开始消费
  • CONSUME_FROM_FLRST_OFFSET:从queue的第一条消息开始消费
  • CONSUME_FROM_TIMESTAMP:从指定的具体时间戳位置的消息开始消费。时间戳是通过另外一个语句指定的cosumer.setConsumerTimestamp(“yyyyMMddHHmmss”)

当消费完一批消息后,consumer会提交其消费进度offset给broker,broker在收到消费进度后会将其更新到哪个双层map及consumerOffset.json文件中,然后向该consumer进行ack,而ack内容包含:当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次消费的起始offset(nextBeginOffset)

重试队列

当rocketmq对消息的消费出现异常时,会将发生异常的消息的offset提交到broker中的重试队列。系统在发生消息消费异常时会为当前topic@group创建一个重试队列,该队列以%RETRY%开头,到达重试时间后进行消费重试

同步提交和异步提交

集群模式下,consumer消费完消息后会向broker提交消费进度offset,其提交方式分为:

  • 同步提交:消费者在消费完一批消息后会向broker提交这些消息的offset,然后等待broker的成功响应。若在等待超时之前收到了成功响应,则继续读取下一批消息进行消费(从ack中获取nextBeginOffset)。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待的过程中,消费者是阻塞的。其严重影响了消费者的吞吐量
  • 异步提交:消费者在消费完一批消息后向broker提交offset,但无需等待broker的成功响应,可以继续读取并消费下一批消息。这种方式增加了消费者的吞吐量。但需要注意,broker在收到提交的offset后,还是会向消费者进行响应的。在没有收到ack时,consumer会直接从broker中获取nextBeginOffset

消费幂等

当出现消费者对某条消息重复消费时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是幂等的

幂等解决方案的设计中涉及到两项要素:幂等令牌,与唯一性处理。只要充分利用好这两项要素,就可以设计好的幂等解决方案

  • 幂等令牌:是生产者和消费者两者间的既定协议,通常指具备唯一业务标识的字符串
  • 唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行成功多次

解决方案

  1. 首先通过缓存去重。在缓存中如果已经存在了幂等令牌,则说明本次操作是重复性操作;若缓存没有命中,则进入下一步
  2. 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作;若不存在,则进入下一步
  3. 唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引写入到db
  • 1中的缓存过期是有可能直接进入2中的,1可以帮2减轻压力
  • rocketmq中不要用MessageID作为幂等令牌,因为会重复
  • rocketmq本身可以保证消息不丢失,但不能保证消息不重复

消息堆积和消费延迟

消息处理流程中,如果consumer的消费速度跟不上producer的发送速度,mq中的未处理消息会越来越多,这部分消息就称为“堆积消息”。消息堆积进而会造成“消费延迟”

consumer使用长轮询pull模式消费消息时,分为两个阶段:

  1. 拉取消息:consumer通过长轮询pull模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。对于拉取式消费,在内网环境下会有很高的吞吐量,所以这一阶段不会成为消息堆积的瓶颈
  2. 消费消息:consumer将本地缓存的消息提交到消费线程中,使用业务消费逻辑对消费进行处理,处理完毕后获取到一个结果。这是真正的消息消费过程。此时consumer的消费能力就完全依赖于消息的消费耗时和消费并发度了。如果因为业务逻辑复杂,导致单条消息的耗时过长,则整体的吞吐量肯定不高,此时会导致consumer本地缓冲队列达到上限,停止从服务拉取消息

消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时和消费并发度决定(消费耗时的优先级高于消费并发度)

普通消息、延时消息、事务消息:消费并发度=单节点线程数*节点数量

顺序消息:

  • 全局顺序消息:这类topic只有一个queue,consumer group中只有一个consumer能消费,并发度为1
  • 分区顺序消息:这类topic有多个queue,只能保证单个queue的顺序消费。消息并发度=topic的queue分区数量

消息的清理

消息被消费过后不会被清理掉

消息是被顺序存储在commitlog文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以commitlog文件为单位进行清理的。否则会降低清理效率,并且实现逻辑复杂

commitlog文件存在一个过期时间,默认为72小时,即三天。除了用户手动清理外,在以下情况也会被自动清理(无论文件中的消息是否被消费过):

  • 文件过期,且到达清理时间点(默认凌晨4点)后,自动清理过期文件
  • 文件过期,且磁盘空间占用率已达到过期清理警戒线(默认75%)后,无论是否达到清理时间点,都会自动清理过期文件
  • 磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的规则清理文件,无论是否过期。默认从最老的文件开始清理
  • 磁盘占用率达到危险警戒线(默认90%)后,broker将拒绝消息写入

官方建议rocketmq服务的Linux文件系统采用ext4。因为对于文件删除操作,ext4比ext3性能更好