Kafka中的enable-auto-commit和auto-commit-interval配置

这篇具有很好参考价值的文章主要介绍了Kafka中的enable-auto-commit和auto-commit-interval配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

当前kafka的版本为2.8.11,Spring Boot的版本为2.7.6,在pom.xml中引入下述依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>

提前说明:当前Kafka的使用是与Spring Boot做了整合,不是使用原生的Kafka,因此Kafka的某些功能Spring Boot是做了二次封装,使其更加符合于实际情况。 

1、Kafka客户端自动提交offset

Windosw环境下面使用下述两个命令重装Zookeeper和Kafka:

docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest
docker run  -d --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.15:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.15:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ="Asia/Shanghai" wurstmeister/kafka:latest

已经提前规划好了当前要测试的消费者组为ONE,其消费的主题为topic0,使用下述命令来查看消费者组ONE的消费情况:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ONE

由于还没有创建该消费者组,所以在执行上述命令时会报错:

Error: Consumer group 'ONE' does not exist.

auto-commit-interval,Kafka,kafka

在yml配置文件进行如下配置:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
      auto-commit-interval: 6000ms
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

在项目中创建一个生产者用于往主题topic0中投递消息,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    // 自定义的主题名称
    public static final String TOPIC_NAME="topic0";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg")String msg) {
        log.info("准备发送消息为:{}",msg);
        // 1.发送消息
        ListenableFuture<SendResult<String,String>> future=kafkaTemplate.send(TOPIC_NAME,msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 2.发送失败的处理
                log.error("生产者 发送消息失败:"+throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, String> stringObjectSendResult) {
                // 3.发送成功的处理
                log.info("生产者 发送消息成功:"+stringObjectSendResult.toString());
            }
        });
        return "接口调用成功";
    }
}

接着再在项目中创建一个消费者用于消费主题topic0中的消息,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者组One消费了消息:Topic:" + topic + ",Record:" + record + ",Message:" + msg);
        }
    }
}

启动整个项目,这时控制台中会打印下述信息:

ConsumerConfig values:
auto.commit.interval.ms = 6000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
client.id = consumer-ONE-1
enable.auto.commit = true
group.id = ONE
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

这个时候,我们使用下述命令来查看消费者组ONE的消费情况:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ONE

auto-commit-interval,Kafka,kafka

可以看到初始时,消费者组ONE对于主题topic0的消息消费偏移量为0

调用 /kafka/send?msg=1 接口往主题topic0中生产1条消息,可以看到在控制台中该消息已经被消费了,如下所示:

消费者组One消费了消息:Topic:topic0,Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1702558156663, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1),Message:1

再次使用上述命令来查看消费者组ONE的消费情况,可以看到消费者组ONE对于主题topic0的消息消费偏移量为1,如下图所示:

auto-commit-interval,Kafka,kafka

2、设置自动提交时间间隔 

将yml文件中auto-commit-interval属性的值修改为60000ms,如下所示:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
      auto-commit-interval: 60000ms
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

重启整个项目,然后调用 /kafka/send?msg=2 接口往主题topic0中生产1条消息。可以看到在控制台中日志打印消费者组ONE消费了消息以后,接着我们立马使用上述命令查看消费者组ONE的消费情况,我们会发现消费者组ONECURRENT-OFFSET(已经提交的消费位移偏移量)属性值并不会立刻更新。

auto-commit-interval,Kafka,kafka

大概在一分钟左右以后,我们才能看到该属性值发生了改变,如下图所示:

auto-commit-interval,Kafka,kafka

3、Spring Boot自动提交offset

在yml文件中关于enable-auto-commit和auto-commit-interval的配置全部移除,重启整个项目,这时控制台中会打印下述信息:

ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
client.id = consumer-ONE-1
enable.auto.commit = false
group.id = ONE
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

调用/kafka/send?msg=3接口往主题topic0中生产1条消息,当看到在控制台中日志打印消费者组ONE消费了消息以后,接着使用上述命令查看消费者组ONE的消费情况,我们会发现消费者组ONECURRENT-OFFSET属性值在几秒以内(已经提交的消费位移偏移量)就更新了。

auto-commit-interval,Kafka,kafka

虽然在项目启动过程中看到消费者配置信息中的enable.auto.commitfalse,即没有使用自动提交偏移量,但是Spring在当一个消息被某一个消费者消费了以后,它会自动帮我们进行人工提交,提交已消费消息的偏移量。

如果enable.auto.commit的设置为false,但是项目中最终真的没有进行人工提交offset,那么这个可能是一个隐藏的bug,会导致每次重启服务时,之前已经消费过的消息会被重复进行消费。

Spring的开发者早已料到这一情况,因此做了自动提交offset的封装处理,从而防止新手忘记手动提交offset,从另外一个方面来看这个处理也减轻了开发的复杂度。

4、Spring Boot中实现手动提交偏移量

为了实现在Spring Boot中手动提交偏移量,首先配置文件修改为如下所示:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: manual

上述yml文件中需要重点关注的是 spring.kafka.consumer.enable-auto-commit 和 spring.kafka.listener.ack-mode 这两个配置。

重启整个项目,然后调用 /kafka/send?msg=4 接口往主题topic0中生产1条消息,使用上述命令查看消费者组ONE的消费情况,我们会发现消费者组ONECURRENT-OFFSET(已经提交的消费位移偏移量)属性值不会更新,会一直是旧的偏移量。

再次重启项目以后后,在消费者连接到Kafka服务端以后,会出现前面已经消费过的消息会被重复消费。出现这个问题的原因是在消息被消费了以后,kafka客户端没有自动提交其消息消费的偏移量,由于yml文件中的配置Spring这个时候也没有自动帮我们提交。

那么这个时候就需要在消息被消费以后在代码中手动提交偏移量,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment acknowledgment) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("消费者组One消费了消息:Topic:" + topic + ",Record:" + record + ",Message:" + msg);
            // 手动提交offset(偏移量)
            acknowledgment.acknowledge();
        }
    }
}

重点在于 acknowledgment.acknowledge(); 这行代码,有了它我们就可以在代码中实现手动提交偏移量。重启服务后,当消息者再次消费了消息以后,再次查看消费者组ONE的消费情况,可以看到CURRENT-OFFSET(已经提交的消费位移偏移量)属性值已经更新为最新值。文章来源地址https://www.toymoban.com/news/detail-804880.html

5、总结

  • enable-auto-commit的值为true时,是采用kafka的默认提交模式,表示消费者组对于某个主题消息的消费偏移量将在后台交由Kafka客户端定期进行提交。只有设置了enable-auto-commit为true时,auto-commit-interval才会生效,它表示消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
  • Kafka与Spring Boot整合以后,非特殊情况下没必要设置enable-auto-commit和auto-commit-interval这两个属性,自动提交偏移交给Spring管理就行。
  • Spring Boot中开启kafka的手动提交消费偏移量,这个操作不是很建议,一旦你忘记在业务代码手动提交偏移量,那么将是一个新的故事。
  • 原生的kafka使用,细节点很多,其使用细节和上面这些案例可能有点不一样,需要特别注意。

到了这里,关于Kafka中的enable-auto-commit和auto-commit-interval配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka系列之:Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access 000 ( K a f k

    2024年02月16日
    浏览(31)
  • kafka消费者程序日志报错Offset commit failed问题研究

    生产环境偶尔会遇到kafka消费者程序日志报错的问题 截取主要日志如下: kafka客户端版本为2.2.0 结合日志去阅读代码,只能大概定位到,是客户端程序向server发送commit offset请求的时候,server返回的错误信息是:The request timed out 看到 request timed out,第一时间很可能会误以为是

    2024年02月07日
    浏览(43)
  • kafka消费报错, org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since

    问题: 在有大量消息需要消费时,消费端出现报错:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which t

    2024年03月23日
    浏览(34)
  • 【C++那些事儿】内联函数,auto,以及C++中的空指针nullptr

    君兮_的个人主页 即使走的再远,也勿忘启程时的初心 C/C++ 游戏开发 Hello,米娜桑们,这里是君兮_,我之前看过一套书叫做《明朝那些事儿》,把本来枯燥的历史讲的生动有趣。而C++作为一门接近底层的语言,无疑是抽象且难度颇深的。我希望能努力把抽象繁多的知识讲的生

    2024年02月08日
    浏览(36)
  • 在git使用时不小心commit了大文件,如何删除commit中的大文件记录

    问题背景:由于许多人在使用git命令的时候,习惯于使用 git add . 添加所有更改的命令,这个习惯会导致在进行git 进行push的时候,由于无意间提交commit缓存的大文件,阻止正常的push 从而很难解决,本文为了解决这问题,有以下小经验(文章广泛参考了其他帖子) 如果你也因

    2024年02月16日
    浏览(33)
  • 关于Photoshop中的【Stable-Diffusion WEBUI】插件:Auto.Photoshop.SD.plugin

    本篇主要提到Photoshop中的Stable-Diffusion WEBUI插件,相对WEBUI并不算好用,但省得来回切换。 更多不断丰富的内容参考:🔗 《继续Stable-Diffusion WEBUI方方面面研究(内容索引)》 参考:https://github.com/AbdullahAlfaraj/Auto-Photoshop-StableDiffusion-Plugin 不能从WEBUI中直接安装,请选择从url安装吧

    2024年02月07日
    浏览(39)
  • vuex中的this.$store.commit和dispatch()使用

    在vue的项目里常常会遇到 父子组件间 需要进行 数据传递 的情况,我们可以用熟悉的props 或者 $emit 等方式进行父子组件通信,但是,在项目稍微大一点的情况中,面对众多的不相关的平行组件,并且很多数据需要多个组件循环使用,这个时候在这些组件间传递数据,使用上

    2024年02月07日
    浏览(23)
  • kafka启动报错Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073

    安装kafka启动过程中报错 Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error=\\\'Cannot allocate memory\\\' (errno=12) java jvm 内存占用大于 物理机现在剩余的内存导致启动失败 我们可以通过kill 一部分程序解决或者修改 程序jvm的内存参数,这里我们进入

    2024年02月15日
    浏览(40)
  • kafka消费者报错Offset commit ......it is likely that the consumer was kicked out of the group的解决

    2022年10月份接到一个小功能,对接kafka将数据写到数据库,开始的需求就是无脑批量insert,随着时间的推移,业务需求有变更,kafka的生产消息频次越来越高,到今年7月份为止就每秒会有几十条甚至上百条,然后消费消息的代码就报错: Caused by: org.apache.kafka.clients.consumer.Com

    2024年02月07日
    浏览(42)
  • ant-vue1.78版a-auto-complete表单自动搜索返回列表中的关键字标红

    通常在做标红的场景,都是后端返回html结构,前端直接渲染实现,但是如果需要前端处理的话,实现也是很简单的,接下来我直接上应用场景吧 应用场景就是通过去调接口,返回的列表前端去标红,接下来我们看代码 接下来就是我们接口调用后的处理逻

    2024年02月10日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包