一.版本兼容的问题
因为某个功能需要对接的kafka是一个上古版本0.10.0.0,公司项目又是springcloud项目,导致版本兼容性的问题很头大
1.kafka的版本号
下载的windows版kafka如:kafka_2.10-0.10.0.0
2.10标识编译kafka集群的scala版本号,kafka的服务端编码语言为scala
0.10.0.0标识kafka真正的版本号
kafka的版本号从1.0开始由四位版本号改为了三位,既类似0.9.0.0–>1.0.0。
2.java对接kafka一般有以下的方式
-
spring-cloud-stream/spring-cloud-stream-binder-kafka
各个版本的官方文档:spring-could-stream
scs中也引入了 spring kafka,kafka client也有对应关系在官网中可以看到 - kafka-clients
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
第二这种会引入两个依赖jar,不使用 scala api可以用第一种
kafka-clients-0.10.2.0.jar
kafka_2.11-0.10.2.0.jar
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
-
spring-kafka
官方文档:spring-kafka
spring kafka的版本和spring-boot-starter-parent要匹配
spring-kafka中引入了kafka-client的版本对照关系如下
此处有个坑就是他强制要求springboot的版本和spring-kafka对应
//https://blog.csdn.net/lzx1991610/article/details/100777040
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二.实现订阅和发布消息代码
Kafka消费者通过groupId消费指定topic的,
以groupId区分不同的消费者,即不同的groupId消费相同的topic,对于topic而言,就是不同的消费者,
同时,消费者需要记录消费到的offset,以便下次启动时定位到具体的位置,消费消息。
这里,配置的offset策略为:latest,即每次重启消费者时,从最新的offset开始消费(上次记录的offset之后的一个,如果上次消费没有记录,则从当前offset之后开始消费)。
offset的重置这样理解: 当前topic写入数据有4条,offset从0到3,
如果,offset重设为earliest,则每次重启消费者,offset都会从0开始消费数据;
如果,offset重设为latest,则,每次消费从上次消费的offset下一个开始消费,如果上次消费的offset为3,则,重启后,
从4开始消费数据。 原文链接:https://blog.csdn.net/Xin_101/article/details/126154171
参考博客: https://www.jianshu.com/p/1f9e18e926f6
public class KafkaUtil {
final static String url = "localhost:9092";
public static void receiveBPMessage(){
Properties props = new Properties();
//183.240.87.230:9092为消息服务器开放的TCP端口
props.put("bootstrap.servers", KafkaUtil.url);
//0为消费者所在的用户组,同一个组对于消息的消费只能有一次,不同组可以共同消费同一条消息
props.put("group.id", "0");
//指定了消费者是否自动提交偏移量,默认值是 true,自动提交
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
//server.keystore.jks证书所在路径,以及密码。由消息服务器颁发。
// props.put("ssl.keystore.location","/root/securityCA/server.keystore.jks");
// props.put("ssl.keystore.password", "123456");
// props.put("security.protocol","SSL");
// props.put("ssl.truststore.type", "JKS");
// props.put("ssl.keystore.type", "JKS");
//client.truststore.jks证书所在路径,以及密码。由消息服务器颁发。
// props.put("ssl.truststore.location","/root/securityCA/client.truststore.jks");
// props.put("ssl.truststore.password", "123456");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//建立consumer连接
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//订阅主题
consumer.subscribe(Collections.singletonList("test"));
//消息轮询是消费者的核心,通过轮询向服务器请求数据
try {
while (true) {
//消费消息
ConsumerRecords<String, String> records = consumer.poll(500);
// for (ConsumerRecord<String, String> record : records) {
// // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
// System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
// record.topic(), record.partition(), record.offset(),record.key(), record.value()));
// }
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords){
//对消息做简单地打印操作
System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),record.key(), record.value()));
}
long lastOffset=partitionRecords.get(partitionRecords.size() - 1).offset();
//提交消息消费的offset
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
// 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
consumer.close();
}
}
public static void sendBPMessage(JSONObject object){
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaUtil.url);
//server.keystore.jks证书所在路径。由消息服务器颁发。
// producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/root/securityCA/server.keystore.jks");
// //server.keystore.jks证书的密码。由消息服务器提供。
// producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"medstarMessageServer");
// //client.truststore.jks证书所在路径。由消息服务器颁发。
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/root/securityCA/client.truststore.jks");
// //client.truststore.jks证书的密码。由消息服务器提供。
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"medstarMessageServer");
// producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
// producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//根据配置文件创建生产者连接
KafkaProducer producer = new KafkaProducer(producerProps);
//发送消息,该实例中,为循环发送test数据100次,可以根据实际情况,遍历列表中的数据,拼接成规定的消息格式进行发送,一般,同一个机构的消息发送通道是固定的,通道会由消息服务器产生并分配给对应机构
for (int i = 0; i < 10; i++) {
//新建ProducerRecord类型的数据,第一个参数为发送的通道,第二个参数为发送消息的内容
ProducerRecord<String,String> r = new ProducerRecord<String,String>("test","key-"+i,"中文-"+i);
producer.send(r);
System.err.println("发送消息");
}
//关闭消息服务器连接,可以在消息全部发送完毕的时候关闭连接
producer.close();
}
}
三.安装windows版kafka进行测试
参考博客: https://blog.csdn.net/marquis0/article/details/126525221
命令参考文章来源:https://www.toymoban.com/news/detail-729583.html
//启动内置zk
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
//启动kafka服务
.\bin\windows\kafka-server-start.bat .\config\server.properties
//创建一个名称为test的topic 类似于数据库的表
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic test
//创建一个生产者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
//创建一个消费者
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning --zookeeper localhost:2181
不同版本的kafka命令会不一样 以下参考
旧版本
##创建topic
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xxoo
#查看topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
# topic 描述
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxoo
# producer(控制台向topic生产数据)
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
>this is a message
>this is another message
##consumer(控制台消费topic的数据2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
this is a message
this is another message
## 查看某一个topic对应的消息数量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
## 新版本的消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上,老版本存在zookeeper上
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
./kafka-consumer-groups.sh --bootstrap-server kafka01.qq.cn:9092,kafka02.qq.cn:9092,kafka03.qq.cn:9092 --list
##删除消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --delete --group py-test
##查看消费组的的列表
./kafka-consumer-groups.sh --list --bootstrap-server 192.168.100.11:9092
或者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 xxoo --list
## 查看特定消费组的情况
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --group py-test --describe
-- 旧版本Kafka命令行参数(kafka_scala2.11-2.0.0 为例)
# 查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
## topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xxoo
## 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xxoo
# topic 查看信息
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xx
# 分区扩展
# /usr/local/kafka/bin/kafka-topics.sh --alter --topic xx --zookeeper localhost:2181 --partitions 24
## consumer(控制台消费topic的数据2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
# 指定消费组消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --group xx-group
### 生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
## 查看某一个topic对应的消息数
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
## delete topic
./kafka-topics --delete --zookeeper localhost:2181 --topic javadaemon
# 查看消费组列表
./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
./kafka-consumer-groups.sh --list --bootstrap-server kafka01.car.cn:9092
# 查看指定消费组以及连接的ip地址
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --describe --group vmsOperationLogGroup|grep vms-road_fee
## 查看指定消费组的堆积情况
./kafka-consumer-groups.sh --bootstrap-server kafka01.car.cn:9092 --describe --group knight_group
## 查看指定分区的信息
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper IP:2181 --topic test
清理openapi-AccessLog-Rest指定保留2天
# /usr/local/kafka/bin/kafka-configs.sh --zookeeper IP:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=172800000
测试在生产者命令窗口发布消息,发现消费者命令窗口打印显示,并且项目main方法调用执行消费者后,也会收到消息
测试使用java接口发布消息,kafka客户端也能接受到消息
文章来源地址https://www.toymoban.com/news/detail-729583.html
到了这里,关于java使用kafka-clients集成0.10.0.0版本kafka(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!