kafka-api


起男 2022-03-24


Dependency

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
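The snippet above omits the version, which only works when a parent POM (for example spring-boot-dependencies) manages it. If nothing manages the version, it has to be declared explicitly; the version below is only an illustrative sketch, not taken from the original post:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- example version only; pick the one matching your broker/client requirements -->
            <version>3.1.0</version>
        </dependency>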

Producer

        // Configuration
        Properties properties = new Properties();
        // Broker connection
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializer types for the key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Custom partitioner (optional)
        //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Send data (asynchronous, fire-and-forget)
        //kafkaProducer.send(new ProducerRecord<>("test", "hello"));
        // Send data (synchronous: block until the broker acknowledges)
        //kafkaProducer.send(new ProducerRecord<>("test", "hello")).get();
        // Send data (asynchronous, with a callback)
        kafkaProducer.send(new ProducerRecord<>("test", "hello"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic: " + recordMetadata.topic());
                    System.out.println("partition: " + recordMetadata.partition());
                }
            }
        });
        // Close the producer (flushes any buffered records)
        kafkaProducer.close();
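The commented-out PARTITIONER_CLASS_CONFIG line refers to a MyPartitioner class that is not shown in the post. A minimal sketch of such a partitioner is below; the routing rule is purely illustrative and assumes the topic has at least two partitions:

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import java.util.Map;

    public class MyPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            // Illustrative rule: values containing "hello" go to partition 0, everything else to partition 1
            return value != null && value.toString().contains("hello") ? 0 : 1;
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }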

Transactions

Transactions require idempotence (enable.idempotence) to be enabled.

        // Configuration
        Properties properties = new Properties();
        // Broker connection
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializer types for the key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Transactional id (required for transactions)
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "t_id_01");
        // Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // Initialize transactions
        kafkaProducer.initTransactions();
        // Begin the transaction
        kafkaProducer.beginTransaction();
        try {
            // Send data
            kafkaProducer.send(new ProducerRecord<>("test", "hello"));
            // Commit the transaction
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            // Abort (roll back) the transaction
            kafkaProducer.abortTransaction();
        } finally {
            // Close the producer
            kafkaProducer.close();
        }
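On kafka-clients 3.0+ idempotence is enabled by default; on older clients it can be turned on explicitly when building the properties above, e.g.:

        // Required for transactions; defaults to true on kafka-clients 3.0+
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);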

Consumer

        // Configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // broker connection
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // deserializers
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g1"); // consumer group
//        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // partition assignment strategy
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit
//        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // auto-commit interval (ms)

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to topics
        /*List<String> topicList = new ArrayList<>();
        topicList.add("test");
        consumer.subscribe(topicList);*/
        // Or assign specific partitions of a topic
        List<TopicPartition> topicPartitionList = new ArrayList<>();
        topicPartitionList.add(new TopicPartition("test", 0));
        consumer.assign(topicPartitionList);
        /*// Current partition assignment
        Set<TopicPartition> assignment = consumer.assignment();
        // Wait until the assignment is complete
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofSeconds(1));
            assignment = consumer.assignment();
        }
        // Look up the offset for a given timestamp (here: one day ago) per partition
        Map<TopicPartition, Long> map = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            map.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(map);
        // Seek to the looked-up offsets (the lookup result may be null if no record exists at or after the timestamp)
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            //consumer.seek(topicPartition, 100);
            consumer.seek(topicPartition, offsetAndTimestamp.offset());
        }*/

        // Consume data
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
            // Manually commit offsets (synchronous)
            //consumer.commitSync();
            // Manually commit offsets (asynchronous)
            //consumer.commitAsync();
        }
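The loop above never exits and the consumer is never closed. A common shutdown pattern (a sketch, not from the original post, reusing the consumer from the block above) is to have another thread call consumer.wakeup() and catch the resulting WakeupException:

        // Sketch: some other thread calls consumer.wakeup() on shutdown,
        // which makes a blocked poll() throw WakeupException so the loop can exit.
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        } catch (WakeupException e) {
            // expected during shutdown, nothing to do
        } finally {
            // leaves the group cleanly and commits offsets when auto-commit is enabled
            consumer.close();
        }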