kafka-笔记
kafka是一个分布式流平台。一个分布式的流平台应该包含3个关键的能力
- 发布和订阅数据流,类似于消息队列或者是企业消息传递系统
- 以容错持久化方式存储数据流
- 处理数据流
目录结构
目录 | 说明 |
---|---|
bin | 执行脚本。例如:启动kafka服务器、创建topic、生产者、消费者程序等 |
config | 配置文件 |
libs | 运行所需要的jar包 |
logs | 日志文件,如果kafka出现一些问题,需要到该目录下查看异常信息 |
site-docs | 网站帮助文件 |
测试
创建topic
消息存放在topic中
kafka-topics.bat --create --bootstrap-server localhost:9092 --topic test
- --create:创建topic
- --bootstrap-server:指定服务地址
- --topic:topic名称
- --zookeeper:指定zookeeper
- --partitions:设置分区数
- --replication-factor:设置副本数
查看topic
kafka-topics.bat --bootstrap-server localhost:9092 --list
- --list:查看所有主题
- --describe:查看主题详情描述
修改topic
kafka-topics.bat --bootstrap-server localhost:9092 --alter --partitions 3
- --alter:修改(分区只能增加不能减少)
生产消息到kafka
使用kafka内置的测试程序
kafka-console-producer.bat --broker-list localhost:9092 --topic test
- --broker-list(--bootstrap-server):指定服务
- --topic:指定topic
从kafka消费消息
使用kafka内置的测试程序
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
- --bootstrap-server:指定服务
- --topic:指定topic
- --from-beginning:从头拉取所有数据(默认不会消费历史数据)
- --group:设置消费者组名称(如果不设置会自动生成一个随机值)
生产消息基准测试
kafka-producer-perf-test.bat --topic test --num-records 50000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=localhost:9092 acks=1
- --topic:指定topic
- --num-records:总共指定生产数据量(默认5000w)
- --throughput:指定吞吐量(-1不指定)
- --record-size:record数据大小(字节)
- --producer-props :生产特性
- bootstrap.servers:指定kafka集群地址
- acks=1:ack模式
消费消息基准测试
Kafka-consumer-perf-test.bat --broker-list localhost:9092 --topic test --fetch-size 1000 --messages 500000
- --broker-list:指定kafka集群地址
- --topic:指定topic名称
- --fetch-size:每次拉取的数据大小
- --messages:总共要消费的消息个数
java API
导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
生产者编写
//创建连接kafka的properties配置
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092");//fafka服务地址
properties.put("acks","all");//生产者生产消息到kafka中,kafka会以怎样的策略返回
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//使用字符串方式发送
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");//同上
//创建一个生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
//构建一条消息
ProducerRecord producerRecord = new ProducerRecord("test", null, "v1");
//发送消息到指定的topic
Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
//等待响应
future.get();
//关闭生产者
kafkaProducer.close();
消费者
//消费者配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("group.id","testGroup");//消费者组(使用组将若干消费者组织到一起,共同消费topic的数据)
properties.setProperty("enable.auto.commit","true");//自动提交offset
properties.setProperty("auto.commit.interval.ms","1000");//自动提交offset时间间隔
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//反序列号方式
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//创建消费者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer(properties);
//订阅要消费的topic
kafkaConsumer.subscribe(Arrays.asList("test"));
//不断拉取数据
while (true){
//每次拉取一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
//主题
String topic = consumerRecord.topic();
//处于分区中的哪个位置
long offset = consumerRecord.offset();
//key/value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println(topic+":"+offset+":"+key+":"+value);
}
}
生产者异步方式生产消息
如果我们想获取生产者消息是否成功,或者成功生产消息到kafka中后,执行一些其它操作。此时可以很方便的使用带有回调函数的方法来发送消息
Future future = kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//判断发送消息是否成功
if (e == null){
//发送成功
//主题
String topic = recordMetadata.topic();
//分区id
int partition = recordMetadata.partition();
//偏移量
long offset = recordMetadata.offset();
System.out.println(topic+":"+partition+":"+offset);
}else {
//发送失败
System.out.println("消息发送异常");
System.out.println(e.getMessage());
}
}
});
- RecordMetadata:消息元数据
- Exception:封装了出现的异常,如果为空表示发送成功
重要概念
broker
- kafka服务器进程。生产者消费者都要连接broker
- 一个kafka集群通常由多个broker组成,这样才能实现负载均衡、以及容错
- broker是无状态的,它们通过zookeeper来维护集群状态
- 一个kafka的broker每秒可以处理数十万次读写,每个broker都可以处理tb级消息而不影响性能
zookeeper
- zk是用来管理协调broker,并且存储了kafka的元数据(例如:有多少topic、partition、consumer)
- zk服务主要用于通知生产者和消费者kafka集群中有新的broker加入、或者kafka集群中出现故障的broker
producer
- 生产者负责把数据推送给broker的topic
consumer
- 消费者负责从broker的topic中拉取数据,并进行处理
consumer group
- 消费者组是kafka提供的可扩展且具有容错性的消费者机制
- 一个消费者组可以包含多个消费者
- 一个消费者组有一个唯一的id(group id)
- 组内的消费者一起消费主题内的所有分区数据
partitions
- 在kafka集群中,主题被分为多个分区
- kafka集群的分布式就是由分区实现的,一个topic中的消息可以分布在topic中的不同分区中
replicas
- 副本可以确保某个服务出现故障时,确保数据依然可用
- 实现分区的容错
- 在kafka中,一般副本的个数>1
topic
- 一个kafka可用包含多个topic,一个topic可用包含多个分区
- 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
- kafka中的主题需要要有标识符,而且是唯一的,kafka中可用有任意数量的主题,没有数量上的限制
- 在主题中的消息是有结构的,一般一个主题包含一类消息
- 一旦生产者发送消息到主题中,这些消息就不能被更新
offset
- 偏移量纪录着下一条要发送给消费者的消息的序号
- 默认kafka将offset存储在zookeeper
- 在一个分区中,消息以有顺序的方式存储着,每个在分区的消费者都有一个递增的id。这个就是偏移量offset
- 偏移量在分区中才是有意义的。在分区之间offset是没有任何意义的
幂等性
生产者生产消息到kafka,如果直接发送消息,kafka会将消息保存到分区中,并返回一个ack给生产者,表示当前操作是否成功,如果ack响应的过程失败了,此时生产者会进行重试,继续发送没有发送成功的消息,kafka又会保存一条一模一样的数据
为了解决生产者重复发布消息的问题,kafka引入了prodicer id(pid)和sequence number(序列号)的概念
- pid:每个生产者在初始化时,都会分配一个唯一的pid,这个pid对用户来说,是透明的
- sequence number:针对每个生产者(对应pid)发送到指定主题分区的消息都对应一个从0开始递增的序列号
发送消息时,会将pid和序列号一起发送,kafka会将pid和序列号保存,如果再次发送消息,kafka会检查pid和序列号判断是否需要保存(判断依据:发送过来的序列号不能小于分区中的序列号)
分区和副本机制
生产者分区写入策略
生产者写入消息到topic,kafka将依据不同的策略将数据分配到不同的分区中
轮询
- 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到每个分区
- 如果在生产消息时,key为null,则使用轮询算法分配分区
随机
- 每次都随机的将消息分配到各个分区
- 早期版本,默认的分区策略是随机
按key分配
- 用key的hashcode取模分区的数量
- 有可能会出现数据倾斜,例如:某个key包含了大量的数据,因为key值一样,所有的数据都分配到了一个分区中,造成该分区的消息数量远大于其它的分区
自定义分区
-
实现Partitioner接口
-
在kafka生产者配置中,配置使用自定义分区器的类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomStrategy.class.getName());
消费者组rebalance机制
rebalance称为再均衡,是kafka中确保消费者组下所有的消费者如何达成一致,分配订阅的topic的每个分区的机制
触发时机
- 消费者组中消费者的个数发生变化
- 订阅的topic个数发生了变化
- 订阅的topic分区数量发生了变化
不良影响
- 再分配过程中会对消费者组产生非常严重的影响,过程中的所有消费者都将停止工作,直到再分配完成
消费者分区分配策略
保证每个消费者尽量能均衡的消费分区的数据,不能出现某个消费者消费分区的数据量特别多,某个消费者消费的分区特别少
Range范围分配策略
范围分配策略是kafka默认的分配策略,它可以保证每个消费者消费的分区数量是均衡的
范围分配策略是针对每个topic的
RoundRobin轮询分配策略
轮询策略是将消费者组内所有消费者以及消费者所订阅的所有topic的分区按照字典序排序,然后通过轮询的方式逐个将分区依次分配给每个消费者
Stricky粘性分配策略
从kafka0.11.x开始,引入此分配策略,主要目的:
- 分区分配尽可能均匀
- 在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同
没有发生rebalance时,粘性分配策略和轮询分配策略类似
副本机制
副本的目的就是冗余备份,当某个broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的broker上的副本是可用的
生产者的ACKs参数
对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍
properties.put("acks","all");//生产者生产消息到kafka中,kafka会以怎样的策略返回
- all 或 -1:等待所有副本已经将数据同步后,才会发送下一条数据,性能最慢
- 0:不等待broker确认,直接发送下一条数据,性能最高,但可能会存在数据丢失的情况
- 1:等待leader副本确认接收后,才会发送下一条数据,性能中等
分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区中leader去读写消息,follower做的事情就是同步数据
高级api和低级api
- 高级api就是直接让kafka帮助管理、处理分配数据
- offset存储在zk中
- 由kafka的rebalance来控制消费者的分区
- 开发起来比较简单,无需开发者关注底层细节
- 无法做到细粒度的控制
- 低级api由编写的程序自己控制逻辑
- 自己来管理offset,可用将offset存储在zk、MySQL、redis、hbase、flink的状态存储
- 指定消费者拉取某个分区的数据
- 可用做到细粒度的控制
- 原有的kafka的策略会失效,需要我们自己来实现消费机制
分区的leader与follower
在kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或多个follower,在创建topic时,kafka会将每个分区的leader均匀的分配在每个broker上。我们正常使用kafka是感觉不到leader、follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader,所以可以这样说:
- kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
- 如果leader出现故障,其它follower会被重新选举为leader
- follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中
ar、isr、osr
在实际环境中,leader有可能会出现一些故障,所以kafka一定会选举出新的leader。kafka中把follower按照不同状态分为三类ar、isr、osr
- ar:一个topic下所有的副本
- isr:正在同步的副本
- osr:不再同步的副本
ar = isr + osr
正常情况下,所有的follower副本都应该与leader副本保持同步,即ar = isr,osr集合为空
controller
- kafka启动时,会在所有的broker中选择一个controller
- leader和follower是针对partition,而controller是针对broker的
- 创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的
- kafka分区leader的选举,也是由controller决定的
controller选举
- 在kafka集群启动的时候,每个broker都会尝试去zookeeper上注册成为controller
- 但只有一个竞争成功,其他的broker会注册该节点的监视器
- 一旦该临时节点状态发生变化,就可以进行相应的处理
- controller是高可用的,一旦某个broker崩溃,其他broker会重新注册成为controller
leader选举
- 所有分区的leader选举都由controller决定
- controller会将leader的改变直接通过rpc的方式通知需为此做出响应的broker
- controller读取到当前分区的isr,只要有一个副本还幸存,就选择其作为leader
- 如果该分区所有replica都已宕机,则新的leader为-1
为什么不用zk选举:
kafka集群如果业务很多的情况下,会有很多分区,假设某个broker宕机,就会出现很多的分区都需要选举leader。如果使用zk选举leader,会给zk带来巨大的压力
生产者写入数据流程
- 从zk中获取对应分区leader位置
- 发送消息给leader
- leader将消息写入本地日志文件(顺序写)
- follower拉取leader消息并写入本地日志文件,然后向leader发送ack
- leader收到所有isr的ack后,返回生产者ack
消费流程
- 每个消费者都可以根据分配策略(默认范围分配),获得要消费的分区
- 获取到consumer对应的offset(默认从zk中获取上一次消费的offset)
- 找到该分区的leader,拉取数据
- 消费者提交offset(自动提交:每隔多少秒提交一次,手动提交:放到事务中提交)
模式
- kafka采用
拉取模型
,由消费者自己纪录消费状态,每个消费者互相独立的顺序拉取每个分区的消息 - 消费者可以按照任意的顺序消费消息。比如消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费
数据存储形式
- 一个topic由多个分区组成
- 一个分区(partition)由多个段(segment)组成
- 一个段(segment)由多个文件(log、index、timeindex)组成
存储日志
消息是保存在以:主题-分区id
的文件夹中的
文件夹中有以下文件:
文件名 | 说明 |
---|---|
.index | 索引文件,根据offset查找数据就是通过该索引(稀疏索引) |
.log | 日志数据文件 |
.timeindex | 时间索引 |
leader-epoch-checkpoint | 持久化每个partition leader对应的leo(log end offset,日志文件中下一条待写入消息的offset) |
- 每个文件的文件名为起始偏移量,因为每个分区的起始偏移量是0,所以,分区的日志文件都是以00000000000000000000.log开始
- 默认的每个日志默认最大为1g
- 为了简化根据offset查找消息,kafka日志文件名设计为开始的偏移量
写入消息
- 新的消息总数写入到最后一个日志文件中(接着leo向后写)
- 该文件如果达到指定大小(默认1g)时,将会滚动到一个新的文件中
读取消息
- 消费者的offset是一个针对partition的全局offset
- 可以根据这个offset找到segment段
- 接着要将全局的offset换成segment的局部offset
- 根据局部的offset,就可以从稀疏索引(.index)找到对应的数据位置
- 开始顺序读取
为了提高查询效率,每个文件都会维护对应的范围内存,查找的时候就使用简单的二分查找
删除消息
- 在kafka中,消息是会被定期清理的。一次删除一个segment段的日志文件
- kafka的日志管理器,会根据kafka的配置,来决定哪些文件可以被删除
消息不丢失
broker消息不丢失
生产者通过分区的leader写入数据后,所有在isr中follower都会从leader中复制数据,这样,可以确保即使leader崩溃了,其他的follower的数据仍然是可用的
生产者消息不丢失
- 生产者连接leader写入数据时,可用通过ack机制来确保数据已经成功写入
- -1:所有节点都受到数据
- 1:leader收到数据
- 0:不关心数据是否丢失
- 生产者可用采用同步和异步两种方式发送数据
- 同步:发送数据后等待返回结果
- 异步:提供一个回调函数
消费者消息不丢失
在消费消息的时候,只要每个消费者纪录好offset,就能保证消息不丢失
数据清理
kafka的消息存储在磁盘中,为了控制磁盘占用空间,kafka需要不断的对过去的一些消息进行清理工作。kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理
在kafka中提供两种日志清理方式
- 日志删除(log deletion):按照指定的策略直接删除不符合条件的日志
- 日志压缩(log compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本
配置:
配置项 | 配置值 | 说明 |
---|---|---|
log.cleaner.enable | true(默认) | 开启自动清理日志功能 |
log.cleanup.policy | delete(默认) | 删除日志 |
log.cleanup.policy | compaction | 压缩日志 |
log.cleanup.policy | delete、compact | 同时支持删除、压缩 |
日志删除
日志删除是以段(segment日志)为单位来进行定期删除的
定时日志删除任务
kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合条件的日志分段文件,这个周期可以通过broker参数log.retention.check.interval.ms
来配置,默认值为300,000,即5分钟
当前日志分段保留策略有三种:
- 时间
- 日志大小
- 日志起始偏移量
删除流程
- 从日志文件对象中所维护日志分段的跳跃表中移除删除的日志分段,以保证没有线程对这些日志分段进行操作
- 将日志分段文件加上“.deleted”的后缀(包括对应的索引文件)
- kafka的后台定时任务会定期删除这些“.deleted”后缀的文件,这个任务的延迟执行时间可以通过
file.delete.delay.ms
参数来设置,默认60000,即1分钟