kafka-消费者
消费模式
消费者采用pull模式,主动从broker中拉取数据
没有采用push(推)模式是因为由broker决定消费速率,很难适应所以消费者的速率
pull模式的不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据
消费者组
消费者组,由多个消费者组成。形成一个消费者组的条件,是消费者的groupid相同
- 消费者组内每个消费者负责不同分区的数据,一个分区只能由一个组内的一个消费者消费
- 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
如果消费者组内的消费者的数量超过了主题分区的数量,将会有一部分消费者闲置
初始化
coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择:hashcode(groupid)%50(__consumer_offsets的分区数量)
例如:hashcode(groupid)=1,1%50=1,那么__consumer_offsets主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset
- 消费者组中的每个消费者向coordinator发送joinGroup请求
- 由cordinator选择一个消费者作为leader
- cordinator吧要消费的topic的情况发送给leader
- 由leader负责指定消费方案
- 制定完计划后,把计划发送给coordinator
- coordinator把消费方案下发给各个消费者
每个消费者都会和coordinator保持心跳(默认3秒),一旦超时(默认45秒)该消费者会被移除,并触发再平衡;如果消费者处理消息时间过长(默认5分钟),也会触发再平衡
抓取数据参数
- Fetch.min.bytes:每批次最小抓取大小,默认1字节
- Fetch.max.bytes:每批次最大抓取大小:默认50m
- fetch.max.wait.ms:一批数据最小值未达到的超时时间,默认500ms
工作流程
- 消费者组创建网络连接客户端
- 消费者发送消费请求给网络连接客户端
- 网络连接客户端向broker请求数据
- 响应的数据被保存在一个队列中
- 消费者从队列中抓取数据(默认一次最多500条)
- 然后由消费者对数据进行处理(反序列化、拦截器)
分区分配
kafka由四种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky
可以通过配置参数partition.assignment.strategy
,修改分配的分配策略。默认是Range+CooperativeSticky。kefka可以同时使用多个分配分配策略
Range
范围分配,是对每个topic而言的,首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母进行排序。然后通过分区数/消费者数
来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费一个分区
容易产生数据倾斜
如果只是针对1个topic而言,range分区影响不大。但是如果有n多topic,那么针对每个topic,前面几个消费者都将多消费1个分区,topic越多,前几个消费者的压力越大
RoundRobin
和Range不同是针对所有topic而言
RoundRobin轮询分区策略,是吧所有的分区和所有的消费者都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配分区给各个消费者
Sticky
粘性分区,可以理解为分配的结果带有“粘性”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销
粘性分区是kafka0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化
offset
0.9版本之前,消费者默认将offset保存在zookeeper中
0.9版本之后,消费者默认将offset保存在kafka的一个内置的topic中(__consumer_offsets)
__consumer_offsets主题采用key和value的方式存储数据,key是group.id+topic+分区号
,value就算当前offset的值
。每隔一段时间kafka内部会对这个topic进行compact(压缩),也就是每个key只保留最新数据
自动提交
相关参数:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认5秒
手动提交
手动提交的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高offset提交;不同点是,同步提交会阻塞当前线程,一直到提交成功,并且会自动失败重试;而异步提交则没有失败重试机制,故有可能提交失败
指定offset消费
auto.offset.reset=earliest|latest|none,默认latest
当kafka中没有偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如数据被删除),应该这么办:
- earliest:自动将偏移量重置为最早偏移量,–from-beginning
- latest:自动将偏移量重置为最新(最后)偏移量
- none:如果未找到消费者组的先前偏移量,则抛出异常
- 从指定偏移量消费
数据积压
- 如果是kafka消费能力不足,则可以考虑增加topic的分区数,并且同时提升消费者组的消费者数量
- 如果是下游数据处理不及时,就提高每批次拉取的数据量