kafka-api
依赖
<dependency>
<!-- Kafka Java client library; the producer and consumer classes used in the snippets below come from it. -->
<!-- NOTE(review): no <version> is declared here; presumably it is inherited from a parent POM / dependencyManagement (BOM). Confirm, otherwise add an explicit <version>. -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
生产者
// Producer configuration.
Properties properties = new Properties();
// Broker address used for the initial connection.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Serializer classes for the record key and value.
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Optional: plug in a custom partitioner.
//properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

// Create the producer with explicit type arguments (no raw type).
// try-with-resources guarantees close() runs even if send() throws.
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
    // Plain asynchronous send, result ignored:
    //kafkaProducer.send(new ProducerRecord<>("test", "hello"));
    // Synchronous send: get() blocks on the returned Future:
    //kafkaProducer.send(new ProducerRecord<>("test", "hello")).get();
    // Asynchronous send with a completion callback.
    kafkaProducer.send(new ProducerRecord<>("test", "hello"), (recordMetadata, e) -> {
        if (e != null) {
            // Report the failure instead of silently dropping it.
            System.err.println("send failed: " + e);
            return;
        }
        System.out.println("主题:" + recordMetadata.topic());
        System.out.println("分区:" + recordMetadata.partition());
    });
}
事务
使用事务必须开启幂等(配置 transactional.id 后,enable.idempotence 会自动开启,不能显式设为 false)
// Producer configuration.
Properties properties = new Properties();
// Broker address used for the initial connection.
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Serializer classes for the record key and value.
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Transactional id (required when using transactions).
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "t_id_01");

// try-with-resources closes the producer on every path, including failures in
// initTransactions()/beginTransaction(). The inner try is needed because catch
// clauses of a try-with-resources run only AFTER the resource has been closed,
// which would be too late to call abortTransaction().
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
    // Initialize transactions before the first beginTransaction().
    kafkaProducer.initTransactions();
    try {
        // Begin the transaction.
        kafkaProducer.beginTransaction();
        // Send data inside the transaction.
        kafkaProducer.send(new ProducerRecord<>("test", "hello"));
        // Commit the transaction.
        kafkaProducer.commitTransaction();
    } catch (org.apache.kafka.common.errors.ProducerFencedException
            | org.apache.kafka.common.errors.OutOfOrderSequenceException
            | org.apache.kafka.common.errors.AuthorizationException e) {
        // Fatal errors: the transaction cannot be aborted from this producer;
        // the only valid action is to close it (done by try-with-resources).
        System.err.println("fatal transaction error, closing producer: " + e);
    } catch (org.apache.kafka.common.KafkaException e) {
        // Any other Kafka error: roll the transaction back.
        kafkaProducer.abortTransaction();
        System.err.println("transaction aborted: " + e);
    }
}
消费者
// Consumer configuration.
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // broker address
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // key deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value deserializer
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1"); // consumer group
// properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // partition assignment strategy
// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto-commit offsets
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // auto-commit interval (ms)

// Create the consumer with explicit type arguments (no raw type).
// The poll loop below only exits via an exception, so try-with-resources is
// what guarantees the consumer is closed.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    // Alternative: subscribe to whole topics.
    /*List<String> topicList = new ArrayList<>();
    topicList.add("test");
    consumer.subscribe(topicList);*/

    // Manually assign a specific partition of the topic.
    List<TopicPartition> topicPartitionList = new ArrayList<>();
    topicPartitionList.add(new TopicPartition("test", 0));
    consumer.assign(topicPartitionList);

    /*// Current partition assignment.
    Set<TopicPartition> assignment = consumer.assignment();
    // Poll until the partition assignment is in place.
    while (assignment.isEmpty()) {
        consumer.poll(Duration.ofSeconds(1));
        assignment = consumer.assignment();
    }
    // Look up the offsets that correspond to a timestamp.
    Map<TopicPartition, Long> map = new HashMap<>();
    for (TopicPartition topicPartition : assignment) {
        map.put(topicPartition, System.currentTimeMillis() - 24L * 3600 * 1000); // offset as of one day ago
    }
    Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(map);
    // Start consuming from the resolved offsets.
    for (TopicPartition topicPartition : assignment) {
        OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
        //consumer.seek(topicPartition, 100);
        // offsetsForTimes maps a partition to null when it has no record at or after the timestamp.
        if (offsetAndTimestamp != null) {
            consumer.seek(topicPartition, offsetAndTimestamp.offset());
        }
    }*/

    // Consume records.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
        // Manual offset commit (synchronous):
        //consumer.commitSync();
        // Manual offset commit (asynchronous):
        //consumer.commitAsync();
    }
}