kafka-分区器

kafka-分区器

起男 527 2022-03-02

kafka-分区器

分区的好处

  • 便于合理使用存储资源,每个分区在一个broker上存储,可以把海量的数据按照分区切割成多块数据存储在多台broker上,合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位消费数据

默认分区器

默认使用DefaultPartitioner完成分区策略

  • 指明分区时,直接将指明的值作为分区值
  • 没有指明分区但有key的情况下,将key的hash值与topic的分区数进行取模,得到最终的分区
  • 既没有指明分区又没有设置key的情况下,将采用粘性分区器,会随机选择一个分区,并尽可能一直使用该分区,待该分区的缓冲队列已满或者已完成,kafka再随机一个分区进行使用(和上一次不同)

在ProducerRecord类构造中可以对分区和key进行指定

自定义分区器

  1. 实现Partitioner接口

    public class MyPartitioner implements Partitioner {
    
        /**
         * 核心
         * @param topic 主题
         * @param key
         * @param keyBytes 序列号之后的key
         * @param value
         * @param valueBytes 序列号之后的value
         * @param cluster
         * @return
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            //获取数据
            String msg = value.toString();
            if (msg.contains("dqn")){
                //发送到0号分区
                return 0;
            }
            return 1;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
    
  2. 在配置参数中指定分区器

    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());