Unless you have a specific requirement, or the Kafka cluster genuinely has no consumer group for your client, don't subscribe with assign; use subscribe on the topic instead. I was burned by the producer team here: at first they told me there was no consumer group, so I had no choice but to use assign to subscribe to specific partitions, and only later did they tell me a consumer group did exist.
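The practical difference comes down to who picks the partitions and whether a group.id is configured: subscribe() requires a group.id and lets the broker assign partitions within the group, while assign() picks partitions yourself and can work without one. A minimal sketch of just the configuration side (hypothetical broker address and group name, plain java.util.Properties, no Kafka dependency):

```java
import java.util.Properties;

public class SubscribeVsAssign {

    // Builds base consumer config; groupId may be null for assign()-style usage.
    static Properties baseProps(String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "broker1:8093"); // hypothetical address
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        if (groupId != null) {
            // Required for subscribe(); optional for assign(), but needed to commit offsets.
            p.put("group.id", groupId);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties withGroup = baseProps("group_id"); // for subscribe(), or assign() + commitSync()
        Properties noGroup = baseProps(null);          // assign() without offset commits
        System.out.println(withGroup.containsKey("group.id")); // true
        System.out.println(noGroup.containsKey("group.id"));   // false
    }
}
```

Note that even with assign(), committing offsets via commitSync() still needs a group.id, which is exactly the trap described above.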
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.*;

@Component
public class KafkaConsumerAssign implements CommandLineRunner {

    @Value("${ss.pubTopic:topic}")
    private String pubTopic;

    @Value("${ss.kafkaAddress:xx.xx.xxx.xx:8093,xx.xxx.xxx.xx:8093,xx.xxx.xxx.xx:8093}")
    private String kafkaAddress;

    public void consumeWithManualCommit() {
        Properties properties = new Properties();
        // Key and value deserializers
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("enable.auto.commit", false); // commit offsets manually
        properties.put("bootstrap.servers", kafkaAddress); // Kafka broker addresses
        // Consumer group: if there is no group you can use any value, but if a group
        // exists and you omit this, committing offsets later will fail.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        // properties.put("max.poll.records", 50);        // max records per poll
        // properties.put("session.timeout.ms", "50000"); // consumer session timeout
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';");
        properties.put("security.protocol", "SASL_SSL");   // security protocol
        properties.put("sasl.mechanism", "SCRAM-SHA-256"); // SASL mechanism
        // Truststore file location and password
        properties.put("ssl.truststore.location", "D:/xxx/xx/xxx/xxxxxx.jks");
        properties.put("ssl.truststore.password", "aaaaaa");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Use partitionsFor to fetch all partitions of the topic
        List<TopicPartition> topicPartitions = new ArrayList<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(pubTopic);
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        // Subscribe via assign
        consumer.assign(topicPartitions);
        operationKafkaMessage(consumer);
    }

    // Start this Kafka client automatically when the application starts.
    // Call the method on this Spring-managed bean; creating a fresh instance
    // with `new` would bypass @Value injection and lose the configured values.
    @Override
    public void run(String... args) {
        consumeWithManualCommit();
    }

    private void operationKafkaMessage(KafkaConsumer<String, String> consumer) {
        while (true) {
            // Poll the broker every 100 ms; the consumer actively requests data
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // Iterate over the partitions that returned records
            for (TopicPartition partition : records.partitions()) {
                // All records fetched for this partition
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // Offset of the last record just processed
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // After processing each partition's records, commit its offset
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
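The sasl.jaas.config value above is a single string that must end with a semicolon; omitting it is a common cause of authentication failures. A small helper (hypothetical class and method names, illustrative credentials) to assemble it:

```java
public class JaasConfigBuilder {

    // Builds the sasl.jaas.config value for SCRAM authentication.
    // The trailing semicolon is mandatory - Kafka rejects the string without it.
    static String scramJaas(String username, String password) {
        return String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required username='%s' password='%s';",
                username, password);
    }

    public static void main(String[] args) {
        System.out.println(scramJaas("username", "password"));
    }
}
```

Keeping the string construction in one place also makes it easier to load the credentials from configuration instead of hard-coding them.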
Source: https://www.toymoban.com/news/detail-572148.html