kafka-消费者

kafka-消费者

起男 451 2022-03-23

kafka-消费者

消费模式

消费者采用pull模式,主动从broker中拉取数据

没有采用push(推)模式是因为由broker决定消费速率,很难适应所以消费者的速率

pull模式的不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据

消费者组

消费者组,由多个消费者组成。形成一个消费者组的条件,是消费者的groupid相同

  • 消费者组内每个消费者负责不同分区的数据,一个分区只能由一个组内的一个消费者消费
  • 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

如果消费者组内的消费者的数量超过了主题分区的数量,将会有一部分消费者闲置

初始化

coordinator:辅助实现消费者组的初始化和分区的分配

coordinator节点选择:hashcode(groupid)%50(__consumer_offsets的分区数量)

例如:hashcode(groupid)=1,1%50=1,那么__consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset

  1. 消费者组中的每个消费者向coordinator发送joinGroup请求
  2. 由cordinator选择一个消费者作为leader
  3. cordinator吧要消费的topic的情况发送给leader
  4. 由leader负责指定消费方案
  5. 制定完计划后,把计划发送给coordinator
  6. coordinator把消费方案下发给各个消费者

每个消费者都会和coordinator保持心跳(默认3秒),一旦超时(默认45秒)该消费者会被移除,并触发再平衡;如果消费者处理消息时间过长(默认5分钟),也会触发再平衡

抓取数据参数

  • Fetch.min.bytes:每批次最小抓取大小,默认1字节
  • Fetch.max.bytes:每批次最大抓取大小:默认50m
  • fetch.max.wait.ms:一批数据最小值未达到的超时时间,默认500ms

工作流程

  1. 消费者组创建网络连接客户端
  2. 消费者发送消费请求给网络连接客户端
  3. 网络连接客户端向broker请求数据
  4. 响应的数据被保存在一个队列中
  5. 消费者从队列中抓取数据(默认一次最多500条)
  6. 然后由消费者对数据进行处理(反序列化、拦截器)

分区分配

kafka由四种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分配的分配策略。默认是Range+CooperativeSticky。kefka可以同时使用多个分配分配策略

Range

范围分配,是对每个topic而言的,首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母进行排序。然后通过分区数/消费者数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费一个分区

容易产生数据倾斜

如果只是针对1个topic而言,range分区影响不大。但是如果有n多topic,那么针对每个topic,前面几个消费者都将多消费1个分区,topic越多,前几个消费者的压力越大

RoundRobin

和Range不同是针对所有topic而言

RoundRobin轮询分区策略,是吧所有的分区和所有的消费者都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配分区给各个消费者

Sticky

粘性分区,可以理解为分配的结果带有“粘性”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销

粘性分区是kafka0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

offset

0.9版本之前,消费者默认将offset保存在zookeeper中

0.9版本之后,消费者默认将offset保存在kafka的一个内置的topic中(__consumer_offsets)

__consumer_offsets主题采用key和value的方式存储数据,key是group.id+topic+分区号,value就算当前offset的值。每隔一段时间kafka内部会对这个topic进行compact(压缩),也就是每个key只保留最新数据

自动提交

相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒

手动提交

手动提交的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高offset提交;不同点是,同步提交会阻塞当前线程,一直到提交成功,并且会自动失败重试;而异步提交则没有失败重试机制,故有可能提交失败

指定offset消费

auto.offset.reset=earliest|latest|none,默认latest

当kafka中没有偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如数据被删除),应该这么办:

  • earliest:自动将偏移量重置为最早偏移量,–from-beginning
  • latest:自动将偏移量重置为最新(最后)偏移量
  • none:如果未找到消费者组的先前偏移量,则抛出异常
  • 从指定偏移量消费

数据积压

  • 如果是kafka消费能力不足,则可以考虑增加topic的分区数,并且同时提升消费者组的消费者数量
  • 如果是下游数据处理不及时,就提高每批次拉取的数据量