kafka-分区器
分区的好处
- 便于合理使用存储资源,每个分区在一个broker上存储,可以把海量的数据按照分区切割成多块数据存储在多台broker上,合理控制分区的任务,可以实现负载均衡的效果
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位消费数据
默认分区器
默认使用DefaultPartitioner完成分区策略
- 指明分区时,直接将指明的值作为分区值
- 没有指明分区但有key的情况下,将key的hash值与topic的分区数进行取模,得到最终的分区
- 既没有指明分区又没有设置key的情况下,将采用粘性分区器,会随机选择一个分区,并尽可能一直使用该分区,待该分区的缓冲队列已满或者已完成,kafka再随机一个分区进行使用(和上一次不同)
在ProducerRecord类构造中可以对分区和key进行指定
自定义分区器
-
实现Partitioner接口
public class MyPartitioner implements Partitioner { /** * 核心 * @param topic 主题 * @param key * @param keyBytes 序列号之后的key * @param value * @param valueBytes 序列号之后的value * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取数据 String msg = value.toString(); if (msg.contains("dqn")){ //发送到0号分区 return 0; } return 1; } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
-
在配置参数中指定分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());