springboot 集成 kafka批量消费数据

这篇具有很好参考价值的文章主要介绍了springboot 集成 kafka批量消费数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

新建一个maven工程,添加kafka依赖

	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>

yaml配置文件文章来源地址https://www.toymoban.com/news/detail-539399.html

spring:
	kafka:
	    bootstrap-servers: ${local_host_ip}:9092
	    producer: # producer 生产者
	      retries: 0 # 重试次数
	      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
	      batch-size: 100000 # 批量处理的最大大小 单位 byte
	      buffer-memory: 33554432 # 生产端缓冲区大小
	      key-serializer: org.apache.kafka.common.serialization.StringSerializer
	      value-serializer: org.apache.kafka.common.serialization.StringSerializer
	    consumer: # consumer消费者
	      group-id: javagroup # 默认的消费组ID
	      enable-auto-commit: false # 是否自动提交offset
	      #auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)
	      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
	      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
	      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
	      auto-offset-reset: latest
	      max-poll-records: 1000   #批量消费每次最多消费多少条消息
	      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	    listener:
	      enable-auto-commit: false
	      ack-mode: manual_immediate
	      max-poll-records: 1000
	      # 消费者监听的topic不存在时,项目会报错,设置为false
	      missing-topics-fatal: false
	      type: batch 
	/**
	 * 手动提交批量消费
	 *
	 * @param list (list 的大小和 batch-size(单位) 有关)
	 * @param consumer
	 */
	@KafkaListener(topics = "topic_test", groupId = "topic_test_group")
	public void recieveRealTimNta(List<Message<String>> list, Consumer consumer) {
		try {
			//业务模块
			consumer.commitAsync();
		} catch (Exception e) {
			log.error("安全事件-告警数据解析 -nta  消费失败");
		} finally {
			consumer.commitSync();
		}
	}
	

到了这里,关于springboot 集成 kafka批量消费数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring-Kafka如何实现批量消费消息并且不丢失数据

    先给答案: 某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同

    2024年02月13日
    浏览(35)
  • spring boot集成mybatis-plus——Mybatis Plus 批量 Insert_新增数据(图文讲解)

     更新时间 2023-01-10 16:02:58 大家好,我是小哈。 本小节中,我们将学习如何通过 Mybatis Plus 实现 MySQL 批量插入数据。 先抛出一个问题:假设老板给你下了个任务,向数据库中添加 100 万条数据,并且不能耗时太久! 通常来说,我们向 MySQL 中新增一条记录,SQL 语句类似如下:

    2024年02月04日
    浏览(35)
  • Spring Boot+Kafka实战生产级Kafka消费组

    作者:禅与计算机程序设计艺术 Kafka是一个开源分布式消息系统,最初由LinkedIn开发,之后成为Apache项目的一部分。Kafka主要用于大数据实时流处理,具有低延迟、高吞吐量等特点。本文将会从基本概念、术语说明、原理及应用场景三个方面对Kafka进行详细介绍。 Kafka作为一个

    2024年02月10日
    浏览(26)
  • springboot集成kafka消费手动启动停止

    在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 1.通过定时任务自动触发,通过@Scheduled,在某个时间点暂停kafka某个监听的消费,也可以在某个时间点

    2024年02月06日
    浏览(32)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月08日
    浏览(31)
  • 【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

    本文主要有以下内容: 简单消息的发送 顺序消息的发送 RocketMQTemplate的API介绍 环境搭建: RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示: 在 Spring boot 项目中引入 RocketMQ 依赖: 在application.yml增加相关配置: 在 Spring Boot 中使用RocketM

    2024年02月14日
    浏览(33)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(33)
  • spring boot学习第八篇:kafka监听消费

    为了实现监听器功能 pom.xml文件内容如下:  application.yml文件内容如下: logback.xml文件内容如下: BackendApplication.java文件内容如下: 然后添加了kafkaConsumerListenerExample.java文件 发到服务器上,启动hmblogs报错,截图如下: Caused by: java.lang.TypeNotPresentException: Type org.springframework.k

    2024年01月19日
    浏览(43)
  • SpringBoot3集成Kafka优雅实现信息消费发送

           首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。        这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据

    2024年02月02日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包