java使用kafka-clients集成0.10.0.0版本kafka(一)

这篇具有很好参考价值的文章主要介绍了java使用kafka-clients集成0.10.0.0版本kafka(一)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.版本兼容的问题

因为某个功能需要对接的kafka是一个上古版本0.10.0.0,公司项目又是springcloud项目,导致版本兼容性的问题很头大

1.kafka的版本号

kafka-clients,kafka,java,kafka,开发语言
下载的windows版kafka如:kafka_2.10-0.10.0.0
2.10标识编译kafka集群的scala版本号,kafka的服务端编码语言为scala
0.10.0.0标识kafka真正的版本号
kafka的版本号从1.0开始由四位版本号改为了三位,既类似0.9.0.0–>1.0.0。

2.java对接kafka一般有以下的方式

  • spring-cloud-stream/spring-cloud-stream-binder-kafka
    各个版本的官方文档:spring-could-stream
    scs中也引入了 spring kafka,kafka client也有对应关系在官网中可以看到
  • kafka-clients
  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
  </dependency>

第二这种会引入两个依赖jar,不使用 scala api可以用第一种
kafka-clients-0.10.2.0.jar
kafka_2.11-0.10.2.0.jar

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.0</version>
        </dependency>

  • spring-kafka
    官方文档:spring-kafka
    spring kafka的版本和spring-boot-starter-parent要匹配
    spring-kafka中引入了kafka-client的版本对照关系如下
    kafka-clients,kafka,java,kafka,开发语言
    kafka-clients,kafka,java,kafka,开发语言
    此处有个坑就是他强制要求springboot的版本和spring-kafka对应
//https://blog.csdn.net/lzx1991610/article/details/100777040
 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
 </dependency>

二.实现订阅和发布消息代码

Kafka消费者通过groupId消费指定topic的,
以groupId区分不同的消费者,即不同的groupId消费相同的topic,对于topic而言,就是不同的消费者,
同时,消费者需要记录消费到的offset,以便下次启动时定位到具体的位置,消费消息。
这里,配置的offset策略为:latest,即每次重启消费者时,从最新的offset开始消费(上次记录的offset之后的一个,如果上次消费没有记录,则从当前offset之后开始消费)。
offset的重置这样理解: 当前topic写入数据有4条,offset从0到3,
如果,offset重设为earliest,则每次重启消费者,offset都会从0开始消费数据;
如果,offset重设为latest,则,每次消费从上次消费的offset下一个开始消费,如果上次消费的offset为3,则,重启后,
从4开始消费数据。 原文链接:https://blog.csdn.net/Xin_101/article/details/126154171

参考博客: https://www.jianshu.com/p/1f9e18e926f6

public class KafkaUtil {

    final static String  url = "localhost:9092";

    public static void receiveBPMessage(){
        Properties props = new Properties();
        //183.240.87.230:9092为消息服务器开放的TCP端口
        props.put("bootstrap.servers", KafkaUtil.url);
        //0为消费者所在的用户组,同一个组对于消息的消费只能有一次,不同组可以共同消费同一条消息
        props.put("group.id", "0");
        //指定了消费者是否自动提交偏移量,默认值是 true,自动提交
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        //server.keystore.jks证书所在路径,以及密码。由消息服务器颁发。
//        props.put("ssl.keystore.location","/root/securityCA/server.keystore.jks");
//        props.put("ssl.keystore.password", "123456");
//        props.put("security.protocol","SSL");
//        props.put("ssl.truststore.type", "JKS");
//        props.put("ssl.keystore.type", "JKS");
        //client.truststore.jks证书所在路径,以及密码。由消息服务器颁发。
//        props.put("ssl.truststore.location","/root/securityCA/client.truststore.jks");
//        props.put("ssl.truststore.password", "123456");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //建立consumer连接
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //订阅主题
        consumer.subscribe(Collections.singletonList("test"));
        //消息轮询是消费者的核心,通过轮询向服务器请求数据
        try {
            while (true) {
                //消费消息
                ConsumerRecords<String, String> records = consumer.poll(500);
//                for (ConsumerRecord<String, String> record : records) {
//                    // 每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
//                    System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
//                            record.topic(), record.partition(), record.offset(),record.key(), record.value()));
//                }
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords){
                        //对消息做简单地打印操作
                        System.out.println(String.format("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
                                record.topic(), record.partition(), record.offset(),record.key(), record.value()));
                    }
                    long lastOffset=partitionRecords.get(partitionRecords.size() - 1).offset();
                    //提交消息消费的offset
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            // 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
            consumer.close();
        }

    }

    public static void sendBPMessage(JSONObject object){
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KafkaUtil.url);
        //server.keystore.jks证书所在路径。由消息服务器颁发。
//        producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,"/root/securityCA/server.keystore.jks");
//        //server.keystore.jks证书的密码。由消息服务器提供。
//        producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,"medstarMessageServer");
//        //client.truststore.jks证书所在路径。由消息服务器颁发。
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,"/root/securityCA/client.truststore.jks");
//        //client.truststore.jks证书的密码。由消息服务器提供。
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,"medstarMessageServer");
//        producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
//        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //根据配置文件创建生产者连接
        KafkaProducer producer = new KafkaProducer(producerProps);
        //发送消息,该实例中,为循环发送test数据100次,可以根据实际情况,遍历列表中的数据,拼接成规定的消息格式进行发送,一般,同一个机构的消息发送通道是固定的,通道会由消息服务器产生并分配给对应机构
        for (int i = 0; i < 10; i++) {
            //新建ProducerRecord类型的数据,第一个参数为发送的通道,第二个参数为发送消息的内容
            ProducerRecord<String,String> r = new ProducerRecord<String,String>("test","key-"+i,"中文-"+i);
            producer.send(r);
            System.err.println("发送消息");
        }
        //关闭消息服务器连接,可以在消息全部发送完毕的时候关闭连接
        producer.close();
    }


}

三.安装windows版kafka进行测试

参考博客: https://blog.csdn.net/marquis0/article/details/126525221
命令参考

//启动内置zk
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
//启动kafka服务
.\bin\windows\kafka-server-start.bat .\config\server.properties
//创建一个名称为test的topic 类似于数据库的表  
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 -replication-factor 1 --partitions 1 --topic test
//创建一个生产者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
//创建一个消费者
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic test --from-beginning --zookeeper localhost:2181   
不同版本的kafka命令会不一样 以下参考
旧版本
##创建topic
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xxoo
 
#查看topic
./kafka-topics.sh --list --bootstrap-server localhost:9092
 
# topic 描述
./kafka-topics.sh --describe --zookeeper localhost:2181  --topic xxoo
 
# producer(控制台向topic生产数据)
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
>this is a message
>this is another message
 
##consumer(控制台消费topic的数据2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
this is a message
this is another message
 
## 查看某一个topic对应的消息数量
./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
 
 
 
 
 
 
## 新版本的消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上,老版本存在zookeeper上
 
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
./kafka-consumer-groups.sh --bootstrap-server  kafka01.qq.cn:9092,kafka02.qq.cn:9092,kafka03.qq.cn:9092 --list
 
 
##删除消费组
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --delete --group py-test
 
##查看消费组的的列表
./kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.100.11:9092
或者
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 xxoo --list
 
## 查看特定消费组的情况
./kafka-consumer-groups.sh --bootstrap-server 192.168.100.11:9092 --group py-test --describe
-- 旧版本Kafka命令行参数(kafka_scala2.11-2.0.0 为例)
# 查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
 
## topic描述
./kafka-topics.sh --describe --zookeeper localhost:2181  --topic xxoo
 
## 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xxoo
 
# topic 查看信息
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xx
 
# 分区扩展
# /usr/local/kafka/bin/kafka-topics.sh --alter  --topic xx --zookeeper localhost:2181 --partitions 24
 
## consumer(控制台消费topic的数据2)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --from-beginning
 
# 指定消费组消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxoo --group xx-group
 
### 生产数据
./kafka-console-producer.sh --broker-list localhost:9092 --topic xxoo
 
## 查看某一个topic对应的消息数
./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic xxoo --time -1
 
## delete topic
./kafka-topics --delete --zookeeper localhost:2181 --topic javadaemon 
 
 
# 查看消费组列表
./kafka-consumer-groups.sh --list  --bootstrap-server localhost:9092
./kafka-consumer-groups.sh --list  --bootstrap-server kafka01.car.cn:9092
 
# 查看指定消费组以及连接的ip地址
./kafka-consumer-groups.sh --bootstrap-server 192.168.0.2:9092 --describe --group  vmsOperationLogGroup|grep vms-road_fee
 
 
##  查看指定消费组的堆积情况
./kafka-consumer-groups.sh  --bootstrap-server kafka01.car.cn:9092 --describe --group knight_group
 
## 查看指定分区的信息
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper IP:2181  --topic test
 
清理openapi-AccessLog-Rest指定保留2天
# /usr/local/kafka/bin/kafka-configs.sh --zookeeper IP:2181 --entity-type topics --entity-name test --alter --add-config  retention.ms=172800000

测试在生产者命令窗口发布消息,发现消费者命令窗口打印显示,并且项目main方法调用执行消费者后,也会收到消息
测试使用java接口发布消息,kafka客户端也能接受到消息
kafka-clients,kafka,java,kafka,开发语言
kafka-clients,kafka,java,kafka,开发语言文章来源地址https://www.toymoban.com/news/detail-729583.html

到了这里,关于java使用kafka-clients集成0.10.0.0版本kafka(一)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(49)
  • SpringBoot集成Elasticsearch8.x(6)|(新版本Java API Client使用)

    章节 章节 第一章链接: SpringBoot集成Elasticsearch7.x(1)|(增删改查功能实现) 第二章链接: SpringBoot集成Elasticsearch7.x(2)|(复杂查询) 第三章链接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指标聚合查询) 第四章链接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合

    2024年02月08日
    浏览(49)
  • # SpringBoot集成Elasticsearch8.5.x(5)|( 新版本Java API Client使用)

    章节 章节 第一章链接: SpringBoot集成Elasticsearch7.x(1)|(增删改查功能实现) 第二章链接: SpringBoot集成Elasticsearch7.x(2)|(复杂查询) 第三章链接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指标聚合查询) 第四章链接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合

    2023年04月13日
    浏览(48)
  • SpringBoot集成Elasticsearch8.x(7)|(新版本Java API Client使用完整示例)

    章节 第一章链接: SpringBoot集成Elasticsearch7.x(1)|(增删改查功能实现) 第二章链接: SpringBoot集成Elasticsearch7.x(2)|(复杂查询) 第三章链接: SpringBoot集成Elasticsearch7.x(3)|(aggregations之指标聚合查询) 第四章链接: SpringBoot集成Elasticsearch7.x(4)|(aggregations之分桶聚合查询)

    2024年02月16日
    浏览(56)
  • 通过Java client访问Kafka

    1. Install Kafka 1) download kafka binary from https://kafka.apache.org/downloads 2) extract binary 2. Start Kafka 1) start zookeeper in daemon mode 2) start kafka server in daemon mode 3. Test Kafka 1) create a topic 2) producer events 3) consumer events 4. Access Kafka from Java client 1) download kafka client binary from https://jar-download.com/artifacts

    2024年02月05日
    浏览(41)
  • SpringBoot集成Kafka版本不兼容导致出现错误

    1、系统报错 2、排查与解决 出错原因:springboot集成spring-kafka的时候需要注意两者之间的版本对应关系,因为版本不兼容导致出现错误 解决:kafka-clients : 是springboot集成的spring-kafka,spring-kafka中引入了kafka-client的版本 参考:https://spring.io/projects/spring-kafka 参考:https://stackover

    2024年02月14日
    浏览(31)
  • Java与es8实战之二:Springboot集成es8的Java Client

    配置springboot的application.yml 配置es的自签证书 执行如下命令将es容器中的crt文件复制到本地 docker cp 容器名称:/usr/share/elasticsearch/config/certs/http_ca.crt . 将crt文件放至springboot项目的resource路径下

    2024年02月12日
    浏览(40)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(57)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(41)
  • 【Java】IDE集成开发环境工具IntelliJ安装和使用

    欢迎来到《小5讲堂》 大家好,我是全栈小5。 这是《Java》序列文章,每篇文章将以博主理解的角度展开讲解, 特别是针对知识点的概念进行叙说,大部分文章将会对这些概念进行实际例子验证,以此达到加深对知识点的理解和掌握。 温馨提示:博主能力有限,理解水平有限

    2024年01月18日
    浏览(71)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包