kafka-数据不重复
在kafka0.11版本后,引入了:幂等性和事务
幂等性就算指不论向broker发送多少次数据,broker都只会持久化一条,保证不会重复
数据的传递语义
- 至少一次:ack级别设置为-1 + 分区副本数大于等于2 + isr里应答的最小副本数大于等于2,保证数据不丢失,但是可能会重复
- 最多一次:ack级别设置为0,保证数据不重复,但是可能会丢失
- 精确一次:幂等性 + 至少一次,要求数据既不能重复也不会丢失
重复数据判断标准
具有<pid、partition、seqnumber>相同主键的消息提交时,broker只会持久化一条
- pid:生产者id号,kafka每次重启都会分配一个新的
- partition:分区号
- sequence number:单调自增
幂等性只能保证单分区单会话内不重复
开启幂等性
设置参数enable.idempotence为true,默认true
事务
开启事务,必须开启幂等性
生产者在使用事务前,必须自定义一个唯一的transactional.id。这样即使生产者挂了,它重启后也能继续处理未完成的事务
使用
//配置
Properties properties = new Properties();
//连接
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
//指定对应的key和value的序列号类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"t_id_01");
//创建生产者对象
KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties);
//初始化事务
kafkaProducer.initTransactions();
//开始事务
kafkaProducer.beginTransaction();
try {
//发送数据
kafkaProducer.send(new ProducerRecord<>("test","hello"));
//提交事务
kafkaProducer.commitTransaction();
}catch (Exception e){
//回滚事务
kafkaProducer.abortTransaction();
}finally {
//关闭资源
kafkaProducer.close();
}