Kafka 批量消费

这篇具有很好参考价值的文章主要介绍了Kafka 批量消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

业务背景
项目有个需求需要统计IM聊天相关数据,原设计思想是在聊天产生时通过消息队列进行数据记录,利用rocketMQ实现。上线后发现由于内部IM活跃用户量级较大,MQ生产者生产消息过多,消费者实时消费会造成服务器CPU和硬盘读写压力,在改不了硬件配置的情况下,笔者通过了解到kafka批量消费的实现可解决这个问题,记录下该方案。

环境

kafka、Springboot、JDK8

依赖

使用的是Springboot v2.1.5.RELEASE版本,pom依赖如下:

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

配置文件

生产者配置

核心配置是:

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=10000

单条消费和提交有时候会影响性能,spring-kafka提供了批量拉取数据和手动提交的策略

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 集群地址
spring.kafka.bootstrap-servers=192.168.2.135:9092
# 重试次数
spring.kafka.producer.retries=3
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks=all
# 批量处理的最大大小 单位 byte
spring.kafka.producer.batch-size=4096
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.buffer-memory=33554432
# 客户端ID
spring.kafka.producer.client-id=im-kafka
# Key 序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Value 序列化类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消息压缩:none、lz4、gzip、snappy,默认为 none。
spring.kafka.producer.compression-type=gzip
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=1000
# KafkaProducer.send()partitionsFor() 方法的最长阻塞时间 单位 ms
spring.kafka.producer.properties.max.block.ms=6000

消费者配置

核心配置是:

kafka:
    listener:
      # 手动
      ack-mode: manual_immediate
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: batch
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 批量消费最大数量
      max-poll-records: 100

在配置文件中关闭自动提交,开启手动提交和批量消费就可以批量消费了,但是最后需要手动提交offset文章来源地址https://www.toymoban.com/news/detail-507441.html

  kafka:
    listener:
      # 手动
      ack-mode: manual_immediate
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: batch
      # 集群地址
    bootstrap-servers: 192.168.2.135:9092
      # 消费者配置
    consumer:
      # 默认消费者组
      group-id: imStatisticsConsumerGroup
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 自动提交的频率 单位 ms
      auto-commit-interval: 1000
      # 批量消费最大数量
      max-poll-records: 100
      # Key 反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
            ms: 120000
        request:
          timeout:
            # 请求超时
            ms: 120000

生产者端代码

    public void sendToImStatistics(List<ImChatStatistics> statistics) {
        kafkaTemplate.send(KAFKA_IM_CHAT_STATISTICS, JsonUtils.toString(statistics));
    }

消费者端代码

    @KafkaListener(topics = {"imChatStatistics"}, groupId = "{imStatisticsConsumerGroup}")
    public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
        try {
            if (CollectionUtils.isEmpty(consumerRecords)) {
                return;
            }
            LogUtils.info("KafkaImStatisticsListener 处理推送消息[data大小: {}]", consumerRecords.size());
            List<ImChatStatistics> totalList = new ArrayList<>();
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                List<ImChatStatistics> list = JSON.parseArray(consumerRecord.value(), ImChatStatistics.class);
                list.stream().forEach(item -> {
                    item.setWeek(DateUtils.getWeek(item.getDate()));
                });
                totalList.addAll(list);
            }
            imChatStatisticsMapper.batchInsertOrUpdate(totalList);
            // 手动提交offset
            acknowledgment.acknowledge();
        } catch (Exception e) {
            LogUtils.error("ImChartConsumer 消息消费失败 :" + e.getMessage(), e);
        }
    }

到了这里,关于Kafka 批量消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 轻松通关Flink第24讲:Flink 消费 Kafka 数据业务开发

    在上一课时中我们提过在实时计算的场景下,绝大多数的数据源都是消息系统,而 Kafka 从众多的消息中间件中脱颖而出,主要是因为 高吞吐 、 低延迟 的特点;同时也讲了 Flink 作为生产者像 Kafka 写入数据的方式和代码实现。这一课时我们将从以下几个方面介绍 Flink 消费

    2024年02月08日
    浏览(30)
  • Spring-Kafka如何实现批量消费消息并且不丢失数据

    先给答案: 某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同

    2024年02月13日
    浏览(36)
  • 【项目实战】Java 开发 Kafka 消费者

    👉 博主介绍 : 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人 Java知识图谱点击链接: 体系化学习Java(Java面试专题) 💕💕 感兴趣的同学可以收藏关注下 , 不然下次找不到哟

    2024年02月16日
    浏览(33)
  • Java Maven 构建项目里面有个聚合的概念

    Java 项目里面有个聚合的概念,它没有.net里面解决方案(solution)的能力,可以统一的编译项目下的所有包,或设置统一的打包路径,使用maven编译后的产物也不会像.net那样编译到当前项目的bin文件夹下面,而是统一的生成到配置好的 settings.xml 文件的 localRepository 配置节下的路

    2024年02月14日
    浏览(25)
  • 金融业务对服务器的需求是什么

    金融平台的好坏主要从检索迅速快、可靠性高、实时性强、存储量大、保密性好、稳定性好等来进行分析判断。所以金融行业相对于其他行业来说在选用服务器上需要考虑的问题也会更多一些。为了保障用户的体验感,以及平台的稳定快速运行,金融行业平台应该如何选用服

    2023年04月11日
    浏览(34)
  • 云计算的服务模型:如何满足不同业务需求

    云计算是一种基于互联网的计算资源分配和共享方式,它可以让用户在需要时轻松获取计算资源,并根据需求支付相应的费用。云计算的服务模型是云计算的核心组成部分,它定义了不同类型的云计算服务,以及如何为用户提供这些服务。在这篇文章中,我们将深入探讨云计

    2024年04月09日
    浏览(39)
  • 安全策略与业务需求不匹配:安全规则与业务流程的优先级不一致

    随着网络攻击手段层出不穷、黑客技术的日益升级和网络安全法规的日益严格化,企业在保障信息安全的同时也面临着越来越大的压力和挑战。其中一个突出的问题是**安全策略与业务需求的不匹配问题**。这主要表现为安全规则的制定与企业日常的业务流程存在很大的差异和

    2024年01月21日
    浏览(35)
  • 体育电子商务:大数据揭示消费者需求

    体育电子商务(Sports E-commerce)是一种将体育与电子商务结合起来的业务模式,涉及到在线购买体育赛事门票、体育服装、体育用品等产品和服务的过程。随着互联网的普及和人们对体育的热情不断增加,体育电子商务已经成为一个具有巨大市场潜力的行业。然而,在这个竞争激

    2024年02月19日
    浏览(29)
  • 从Z世代消费需求洞察文旅新业态

    Z世代,泛指1995—2009年间出生的一批人。在伴随着全球生育指数下降以及老人口老龄化的趋势下,Z世代人群已成为新生代主力军,人数高达19亿人,占全球总人口的25%,而作为人口大国的中国则为Z世代贡献了大约2.65亿人,占2021年总人口的18.4%。 目前,最年轻的Z世代人群(

    2023年04月08日
    浏览(25)
  • 业务需求紧急,IT部门缺失,企业如何应对"影子IT"危机?

           在当今数字化时代,业务部门的需求通常非常紧急,但IT部门的排期却跟不上,导致业务部门焦头烂额。IT部门面临着诸多需求,无法在规定时间范围内满足每一个需求,因此未授权的应用程序安全监管也变得愈发困难。        影子IT的频发,使得公司无法完全消除

    2024年02月14日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包