【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡

这篇具有很好参考价值的文章主要介绍了【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一,代码及配置项介绍

kafka版本为3.6,部署在3台linux上。

maven依赖如下:

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.6.0</version>
        </dependency>

生产者、消费者和topic代码如下:

    String topic = "items-01";
    
    @Test
    public void producer() throws ExecutionException, InterruptedException {
        Properties p = new Properties();
        p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
        while(true){
            for (int i = 0; i < 3; i++) {
                for (int j = 0; j <3; j++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item"+j,"val" + i);
                    Future<RecordMetadata> send = producer
                            .send(record);

                    RecordMetadata rm = send.get();
                    int partition = rm.partition();
                    long offset = rm.offset();
                    System.out.println("key: "+ record.key()+" val: "+record.value()+" partition: "+partition + " offset: "+offset);

                }
            }
        }
    }

    @Test
    public void consumer(){
    	        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        //KAKFA IS MQ  IS STORAGE
        /**
         *         "What to do when there is no initial offset in Kafka or if the current offset
         *         does not exist any more on the server
         *         (e.g. because that data has been deleted):
         *         <ul>
         *             <li>earliest: automatically reset the offset to the earliest offset
         *             <li>latest: automatically reset the offset to the latest offset</li>
         *             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>
         *         </ul>";
         */
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//第一次启动,米有offset

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");

                Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String,String> record = iterator.next();
                    int partition = record.partition();
                    long offset = record.offset();
                    System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);
                }

                }
            }
    }

这里先简单解释一下,kafka的topic只是一个逻辑上的概念,实际上的物理存储是依赖分布在broker中的分区partition来完成的。kafka依赖的zk中有一个__consumer_offsets[1]话题,存储了所有consumer和group消费的进度,包括当前消费到的进度current-offset、kafka写入磁盘的日志中记录的消息的末尾log-end-offset

kafka根据消息的key进行哈希取模的结果来将消息分配到不同的partition,partition才是consumer拉取的对象。每次consumer拉取,都是从一个partition中拉取。(这一点,大家可以自己去验证一下)

kafka offset异步+同步提交,kafka,kafka,java,linq

下面代码,是描述的当consumer第一次启动时,在kafka中还没有消费的记录,此时current-offset为"-"时,consumer应如何拉取数据的行为。有3个值可选,latest、earliest、none。

当设置如下配置为latest,没有current-offset时,只拉取consumer启动后的新消息。
earliest,没有current-offset时,从头开始拉取消息消费。
node,没有current-offset时,抛异常。

它们的共同点就是current-offset有值时,自然都会按照current-offset拉取消息。

properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

下面代码,true表示设置的异步自动提交,false为手动提交。

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

下面代码,设置的是自动提交时,要过多少秒去异步自动提交。

properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");

下面代码,是设置kafka批量拉取多少数据,默认的应该是最大500,小于500。 kafka可以批量的拉取数据,这样可以节省网卡资源。

properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");

二、异步自动提交

部分设置项如下:

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10"); 

开启生产者,生产出一些消息,可以看到之前拉取完数据的group又有了新的数据。
kafka offset异步+同步提交,kafka,kafka,java,linq
开启消费者,可以看到消息被消费掉。
kafka offset异步+同步提交,kafka,kafka,java,linq

因为提交是异步的,我们需要需要为了业务代码留出处理时间。所以需要设置异步提交时间。

假设在间隔时间(AUTO_COMMIT_INTERVAL_MS_CONFIG,自动提交间隔毫秒数配置)内,还没有提交的时候,消费过数据假设数据,consumer挂了。那么consumer再次启动时,从kafka拉取数据,就会因为还没有提交offset,而重新拉取消费过的数据,导致重复消费。

假设现在已经过了延间隔时间,提交成功了,但是业务还没有完成,并且在提交后失败了。那么这个消费失败的消息也不会被重新消费了,导致丢失消息。

为了解决上述的问题,可以使用手动同步提交。

三、手动同步提交

假设我们现在是按照批量拉取,下面介绍2种提交粒度的demo,粒度由小到大,分别是按条提交,按partition提交 && 按批次提交。

3.1 按条提交


    public void test1(){

        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        //KAKFA IS MQ  IS STORAGE
        /**
         *         "What to do when there is no initial offset in Kafka or if the current offset
         *         does not exist any more on the server
         *         (e.g. because that data has been deleted):
         *         <ul>
         *             <li>earliest: automatically reset the offset to the earliest offset
         *             <li>latest: automatically reset the offset to the latest offset</li>
         *             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>
         *         </ul>";
         */
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//第一次启动,米有offset

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");
                
                Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String,String> next = iterator.next();
                    int p = next.partition();
                    long offset = next.offset();
                    String key = next.key();
                    String value = next.value();
                    System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);

                    TopicPartition sp = new TopicPartition(topic,p);
                    OffsetAndMetadata om = new OffsetAndMetadata(offset);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(sp,om);
                    consumer.commitSync(map);
                    
                }
            }
        }
    }

3.2 按partition提交 && 按批次提交

由于消费者每一次拉取都是从一个partition中拉取,所以其实按partition拉取和按批次拉取,是一回事。整体成功

    @Test
    public void test2(){
        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");

                Set<TopicPartition> partitions = records.partitions();
                for (TopicPartition partition : partitions){
                    List<ConsumerRecord<String,String>> pRecords = records.records(partition);

                    Iterator<ConsumerRecord<String,String>> pIterator = pRecords.iterator();
                    while (pIterator.hasNext()){
                        ConsumerRecord<String,String> next = pIterator.next();
                        int p = next.partition();
                        long offset = next.offset();
                        String key = next.key();
                        String value = next.value();
                        System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);
                    }
                    //按partition提交
                    long offset = pRecords.get(pRecords.size() - 1).offset();
                    OffsetAndMetadata om = new OffsetAndMetadata(offset);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(partition,om);
                    consumer.commitSync(map);
                }
                //按批次提交
//                consumer.commitSync();
            }
        }
    }

四,动态负载均衡

我们知道,对于一个topic,一个group中,为了保证消息的顺序性,默认只能有一个consumer来消费。假设我们有3台消费者,那么此时,另外2台消费者就会闲着不干活。有没有可能能够既保证消费消息的顺序性,又能够提升性能呢?

答案就是kafka的动态负载均衡

前面提到了,producer会根据消息的key的哈希取模的结果来把消息分配到某个partition,也就是说同一个key的消息,只存在于一个partition中。而且消费者拉取消息,一个批次,只从一个partition中拉取消息

假设我们现在有一个topic,有2个partition。那么我们可不可以在组内3台消费者中,挑2台出来,各自对应这个topic的2个partition,这样消费者和partition一一对应。既能保证消息的顺序性,又能够提升性能。这就是kafka的动态负载均衡。

代码如下:

    //动态负载均衡
    @Test
    public void test3(){
        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //kafka 的consumer会动态负载均衡
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            //Revoked,取消的回调函数
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("---onPartitionsRevoked:");
                Iterator<TopicPartition> iter = partitions.iterator();
                while(iter.hasNext()){
                    System.out.println(iter.next().partition());
                }
                System.out.println();
            }

            //Assigned 指定的回调函数
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("---onPartitionsAssigned:");
                Iterator<TopicPartition> iter = partitions.iterator();

                while(iter.hasNext()){
                    System.out.println(iter.next().partition());
                }
                System.out.println();
            }
        });

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");

                Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String,String> next = iterator.next();
                    int p = next.partition();
                    long offset = next.offset();
                    String key = next.key();
                    String value = next.value();
                    System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);
                    TopicPartition sp = new TopicPartition(topic,p);
                    OffsetAndMetadata om = new OffsetAndMetadata(offset);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(sp,om);
                    consumer.commitSync(map);
                }
            }
        }
    }

上述代码在订阅时,加了一个ConsumerRebalanceListener监听器,实现了2个回调函数onPartitionsRevoked和onPartitionsAssigned,分别是取消组内消费者负载均衡时触发的回调函数,和指定组内消费者加入负载均衡时触发的回调函数。

在使用动态负载均衡时,需要注意的是,在提交时不要批量提交,否则会报错如下,暂时还没有研究问题原因,有了结果会回来更新的。

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

先打开一个消费者A,触发了回调函数onPartitionsAssigned,可以看到partition0 和 partition1都被分配到了A上。kafka offset异步+同步提交,kafka,kafka,java,linq

此时打开生产者,可以看到partition0和1的消息都发送到了A上。
kafka offset异步+同步提交,kafka,kafka,java,linq

我们再打开一个同一个组内的消费者B。
可以看到A取消了partition0和1的分配,被指定了partition0。消费者B则被指定了partition1.kafka offset异步+同步提交,kafka,kafka,java,linqkafka offset异步+同步提交,kafka,kafka,java,linq

再次打开生产者去生产消息,这次A只消费partition 0的消息,B只消费partition1的消息。
kafka offset异步+同步提交,kafka,kafka,java,linq
kafka offset异步+同步提交,kafka,kafka,java,linq

如果我们再启动组内第3台消费者,那么组内消费者会再次负载均衡。由于这个topic只有2个partition,所以即使启动3台组内的消费者,也最多只有2个消费者被分配给某个partition,剩余1个消费者不参与负载均衡
kafka offset异步+同步提交,kafka,kafka,java,linq
kafka offset异步+同步提交,kafka,kafka,java,linq
kafka offset异步+同步提交,kafka,kafka,java,linq

参考文章:
[1],【kafka】记一次kafka基于linux的原生命令的使用文章来源地址https://www.toymoban.com/news/detail-804516.html

到了这里,关于【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Kafka客户端(kafka-clients)的Java API操作Kafka的Topic

    记录 :460 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(52)
  • 服务器异步客户端

    internal class MessageManagerT:SingletonMessageManagerT {     Dictionaryint, ActionT MsgDic = new Dictionaryint, ActionT();     public void OnAddListen(int id,ActionT action)     {         if(MsgDic.ContainsKey(id))         {             MsgDic[id] += action;         }         else         {             MsgDic.Add(id, ac

    2024年04月09日
    浏览(42)
  • 【C++ Boost】一个最基本的异步boost async tcp 服务/客户端代码的深刻解析,一文解决所有接口的用法以及一些容易出错的点

    1.官网链接 https://www.boost.org/doc/libs/1_80_0/doc/html/boost_asio.html 本文代码是以官方实例代码做的一些优化 2.平台选择      Boost 最令人惊艳的地方有两个:一是支持跨平台,即windows和linux下的接口代码都是一样的;二是支持异步操作,即可以让read和write操作不阻塞。      因此

    2024年02月06日
    浏览(49)
  • 基于C#的ModbusTcp客户端Demo

            今天跟大家分享一个基于C#的ModbusTcp客户端的创建,本人小白一枚,第一次发表博客,有诸多不足之处,还请谅解,也希望大佬可以指点,如何可以做得更好。 先展示一下成品效果吧。         Demo看起来就跟上图一样,这里ui使用了sunnyui的一些控件,以及运用

    2024年02月11日
    浏览(40)
  • java代码构建简单http服务器和客户端

    初识http a、超文本传输 、应用层的面向对象的协议,概念介绍网上资源一大堆,关键是基于TCP/IP通信协议来传递数据。 b、一开始接触web项目,都是先接触的servlet,tomcat服务器默认实现的一套http规范,提供了基础服务和组件环境,直接拿到请求、构建正文、响应客户端 然而

    2024年02月10日
    浏览(37)
  • QT5.14 实现ModbusTCP客户端 Demo

    本文在QT5.14平台,基于QModbusClientTcp类,实现了客户端对单个寄存器的读写,用ModbusSlave做服务器做测试。 1.界面 (1)更改读按钮的名称为bt_Read (2)更改写按钮的名称为bt_Write 2.修改pro文件的第三行 greaterThan(QT_MAJOR_VERSION, 4): QT += widgets  serialbus   3.修改mainWindow.h #ifndef MAINWINDOW_H

    2024年01月22日
    浏览(45)
  • Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置

    公司Kafka一直没做安全验证,由于是诱捕程序故需要面向外网连接,需要增加Kafka连接验证,保证Kafka不被非法连接,故开始研究Kafka安全验证 使用Kafka版本为2.4.0版本,主要参考官方文档 官网对2.4版本安全验证介绍以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    浏览(45)
  • RPC教程 2.支持并发与异步的客户端

    对  net/rpc  而言,一个函数需要能够被远程调用,它必须满足一定的条件,否则其会被忽略。 这些条件是: 方法的类型是可输出的 (the method’s type is exported) 方法本身也是可输出的 (the method is exported) 方法必须由两个参数,必须是输出类型或者是内建类型 (the method has tw

    2024年01月24日
    浏览(45)
  • .NetCore gRpc 客户端与服务端的单工通信Demo

    方式一 使用vs 2022(也可以是其他版本)创建一个grpc的服务,如下这样 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uipEG9Xu-1687172462785)(C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-20230619183828284.png)] 简单方便,创建项目后的目录结构如下图

    2024年02月09日
    浏览(42)
  • Python - 【socket】 客户端client重连处理简单示例Demo(一)

    在Python中,使用socket进行网络通信时,如果连接断开,可以通过以下步骤实现重连处理 这个函数使用一个while循环,不断地尝试建立 socket 连接,如果出现 socket.error 异常,则打印异常信息并等待5秒钟重试。当连接成功时,函数会返回一个连接套接字。 在主程序中,可以使用

    2024年02月14日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包