Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

这篇具有很好参考价值的文章主要介绍了Kafka-Java四:Spring配置Kafka消费者提交Offset的策略。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

  1. 自动提交Offset:
    1. 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。
    2. 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了
  2. 手动提交Offset
    1. 消费者在消费消息时/后,再提交offset,在消费者中实现
    2. 手动提交Offset分为:手动同步提交(commitSync)、手动异步提交(commitAsync)
  3. 什么是Offset
    1. 参考文章:Linux:【Kafka三】组件介绍

二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: kafka消费者消费消息,自动提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerAutoSubmitOffset {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        // 一、设置参数
        // 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
        // 配置消息 键值的序列化规则
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 配置消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 设置消费者offset的提交方式
        // 自动提交:默认配置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // 自动提交offset的时间间隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        // 二、创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        // 三、消费者订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 四、拉取消息,开始消费
        while (true){
            // 从kafka集群中拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时
            // 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }
        }
    }
}

上述代码中的如下代码是自动提交策略的相关设置 

        // 设置消费者offset的提交方式
        // 自动提交:默认配置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // 自动提交offset的时间间隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

三、手动提交策略

3.1、手动同步提交策略

        手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: kafka消费者消费消息,手动同步提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); 
 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 关键代码:关闭自动提交
        // 手动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
       
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }

            // 关键代码:commitSync():同步提交方法
            // 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。
            // 一般使用同步提交,在同步提交后不再做其他逻辑处理
            consumer.commitSync();

            // do anything
        }
    }
}

3.2、手动异步提交策略

        异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。文章来源地址https://www.toymoban.com/news/detail-715700.html

package com.demo.lxb.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @Description: kafka消费者消费消息,手动异步提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset2 {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 关键代码:关闭自动提交
        // 手动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
       
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
         
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }
            // 关键代码:commitAsync() 异步提交
            // new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e != null){
                        // 可将提交失败的消息记录到日志
                        System.out.println("记录提交offset失败的消息到日志");
                        System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));
                        System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));
                    }
                }
            });

            // 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。
            //do anything

        }
    }
}

到了这里,关于Kafka-Java四:Spring配置Kafka消费者提交Offset的策略的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 关于kafka消费者超时配置

    在Kafka中,消费者超时配置是指消费者在等待服务器响应时的超时时间。如果消费者在超时时间内未收到服务器的响应,它将重新发起请求或执行其他逻辑。 以下是关于Kafka消费者超时配置的一些常见选项: session.timeout.ms :该配置定义了消费者与Kafka集群之间的会话超时时间

    2024年02月16日
    浏览(44)
  • 基于spring mockito 编写kafka消费者的单元测试

    注: message.json为消息体内容的json文件 重点关注@MockBean和Mockito的使用 readData为从文件中读取json对象的方法

    2024年02月04日
    浏览(58)
  • 浅析Spring-kafka源码——消费者模型的实现

    SpringBoot项目中的消费端实现而言,Spring-kafka没有用原生的ConsumerConnector,,而是借助原生client的拉取消息功能做了自己的消费模型的实现,提供了@KafkaListener注解这种方式实现消费。 开发中在使用Spring-kafka时,一般也都是通过使用@KafkaListener注解的方法来实现消息监听和消费。

    2024年02月09日
    浏览(55)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(45)
  • Kafka消费者常用超时时间配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超时时间(session超时时间)增加成25秒(之前项目设置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息减少为20(之前是默认值500) spring.kafka.consumer.max-poll-records=20 //消息消费超时时间增加为10分钟 spring.kafka.p

    2024年02月03日
    浏览(93)
  • Kafka系列——详解创建Kafka消费者及相关配置

    参考自kafka系列文章——消费者创建与配置 在读取消息之前,需要先创建一个 KafkaConsumer 对象。 创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似——把想要传给消费者的属性放在 Properties 对象里,后面深入讨论所有属性。这里我们只需要使用 3 个必要的属性: bootstrap.

    2024年02月09日
    浏览(44)
  • Spring Boot中使用Kafka时遇到“构建Kafka消费者失败“的问题

    在使用Spring Boot开发应用程序时,集成Apache Kafka作为消息队列是一种常见的做法。然而,有时候在配置和使用Kafka时可能会遇到一些问题。本文将探讨在Spring Boot应用程序中使用Kafka时可能遇到的\\\"构建Kafka消费者失败\\\"错误,并提供解决方案。 错误描述: 当尝试构建Kafka消费者时

    2024年01月17日
    浏览(48)
  • kafka生产者和消费者配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月12日
    浏览(49)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(44)
  • 【项目实战】Java 开发 Kafka 消费者

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包