kafka-数据不重复

kafka-数据不重复

起男 485 2022-03-03

kafka-数据不重复

在kafka0.11版本后,引入了:幂等性和事务

幂等性就算指不论向broker发送多少次数据,broker都只会持久化一条,保证不会重复

数据的传递语义

  • 至少一次:ack级别设置为-1 + 分区副本数大于等于2 + isr里应答的最小副本数大于等于2,保证数据不丢失,但是可能会重复
  • 最多一次:ack级别设置为0,保证数据不重复,但是可能会丢失
  • 精确一次:幂等性 + 至少一次,要求数据既不能重复也不会丢失

重复数据判断标准

具有<pid、partition、seqnumber>相同主键的消息提交时,broker只会持久化一条

  • pid:生产者id号,kafka每次重启都会分配一个新的
  • partition:分区号
  • sequence number:单调自增

幂等性只能保证单分区单会话内不重复

开启幂等性

设置参数enable.idempotence为true,默认true

事务

开启事务,必须开启幂等性

生产者在使用事务前,必须自定义一个唯一的transactional.id。这样即使生产者挂了,它重启后也能继续处理未完成的事务

使用

        //配置
        Properties properties = new Properties();
        //连接
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        //指定对应的key和value的序列号类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"t_id_01");
        //创建生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
        //初始化事务
        kafkaProducer.initTransactions();
        //开始事务
        kafkaProducer.beginTransaction();
        try {
            //发送数据
            kafkaProducer.send(new ProducerRecord<>("test","hello"));
            //提交事务
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            //回滚事务
            kafkaProducer.abortTransaction();
        }finally {
            //关闭资源
            kafkaProducer.close();
        }