Kafka 整合 SpringBoot

这篇具有很好参考价值的文章主要介绍了Kafka 整合 SpringBoot。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、生产端

1. 添加依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件 application.properties

## Spring 整合 kafka的服务地址ip列表
spring.kafka.bootstrap-servers=192.168.31.101:9092
## kafka producer 发送消息失败时的一个重试的次数
spring.kafka.producer.retries=0
## 批量发送数据的配置 
spring.kafka.producer.batch-size=16384
## 设置kafka 生产者内存缓存区的大小(32M)
spring.kafka.producer.buffer-memory=33554432
## kafka消息的序列化配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。

## 	这个是kafka生产端最重要的选项
spring.kafka.producer.acks=1

3. 代码实现发送消息

@Component
public class KafkaProducerService {

	
	@Autowired
	private KafkaTemplate<String, Object> kafkaTemplate;
	
	public void sendMessage(String topic, Object object) {
		
		ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
		
		future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
			@Override
			public void onSuccess(SendResult<String, Object> result) {
				log.info("发送消息成功: " + result.toString());
			}

			@Override
			public void onFailure(Throwable throwable) {
				log.error("发送消息失败: " + throwable.getMessage());
			}
		});
		
	}
}

二、消费端

1. 添加依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件 applicatio.properties

# kafka服务的ip地址列表
spring.kafka.bootstrap-servers=192.168.31.101:9092

## consumer 消息的签收机制:手工签收
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
spring.kafka.consumer.auto-offset-reset=earliest
## 序列化配置
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.listener.concurrency=5

 3. 代码实现消费消息

@Component
public class KafkaConsumerService {

	@KafkaListener(groupId = "group02", topics = "topic02")
	public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
		log.info("消费端接收消息: {}", record.value());
		//	收工签收机制
		acknowledgment.acknowledge();
	}
}

三、测试

首先在 kafka 节点上创建 topic:

打开kafka节点服务器的终端,输入以下命令:

/usr/local/kafka-3.2.1/bin/kafka-topics.sh --bootstrap-server 192.168.31.101:9092 --create --topic topic02 --partitions 2 --replication-factor 1

一些常用命令:

# 创建 topic
./kafka-topics.sh --bootstrap-server 192.168.31.101:9092 --create --topic topic02 --partitions 1 --replication-factor 1
 
# 查看 kafka 中topic列表
./kafka-topics.sh --bootstrap-server 192.168.31.101:9092 --list
 
# 查看消费者组group02订阅的topic的消费进度
./kafka-consumer-groups.sh --bootstrap-server 192.168.31.101:9092 --describe --group group02

编写测试代码:ApplicationTests.java文章来源地址https://www.toymoban.com/news/detail-726805.html

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

	@Autowired
	private KafkaProducerService kafkaProducerService;
	
	@Test
	public void send() throws InterruptedException {
		
		String topic = "topic02";
		for(int i=0; i < 1000; i ++) {
			kafkaProducerService.sendMessage(topic, "hello kafka" + i);
			Thread.sleep(5);
		}
		
		Thread.sleep(Integer.MAX_VALUE);
		
	}	
	
}

到了这里,关于Kafka 整合 SpringBoot的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合kafka消费模式AckMode以及手动消费

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月08日
    浏览(43)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(46)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(45)
  • java阻塞队列/kafka/spring整合kafka

    queue增加删除元素 增加元素 add方法在添加元素的时候,若超出了度列的长度会直接抛出异常: put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素 offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false 删除元素 pol

    2024年02月12日
    浏览(46)
  • 从零到Kafka:万字带你体验Spring Boot整合消息驱动的奇妙之旅

    主页传送门:📀 传送 Spring boot : | 基于Spring的开源框架,用于简化新Spring应用的初始搭建以及开发过程 特性: | 快速开发、轻量级、无代码生成和独立运行等特性 优势: | 简化配置,提供自动配置,减少开发时间 应用场景: | 适用于微服务架构、云原生应用等场景 环境

    2024年02月05日
    浏览(43)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(54)
  • Spring boot使用Kafka Java反序列化漏洞 CVE-2023-34040

    背景:公司项目扫描到 Spring-Kafka上使用通配符模式匹配进行的安全绕过漏洞 CVE-2023-20873 中等风险 | 2023年8月23日 | CVE-2023-34040 在Spring for Apache Kafka 3.0.9及更早版本以及2.9.10及更早版本中,存在可能的反序列化攻击向量,但只有在应用了不常见的配置时才会出现。攻击者必须在

    2024年02月07日
    浏览(51)
  • 什么是kafka,如何学习kafka,整合SpringBoot

    目录 一、什么是Kafka,如何学习 二、如何整合SpringBoot 三、Kafka的优势   Kafka是一种分布式的消息队列系统,它可以用于处理大量实时数据流 。学习Kafka需要掌握如何安装、配置和运行Kafka集群,以及如何使用Kafka API编写生产者和消费者代码来读写数据。此外,还需要了解Ka

    2024年02月10日
    浏览(40)
  • Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置

    公司Kafka一直没做安全验证,由于是诱捕程序故需要面向外网连接,需要增加Kafka连接验证,保证Kafka不被非法连接,故开始研究Kafka安全验证 使用Kafka版本为2.4.0版本,主要参考官方文档 官网对2.4版本安全验证介绍以及使用方式地址: https://kafka.apache.org/24/documentation.html#secu

    2024年02月01日
    浏览(66)
  • Spring整合kafka

    只用spring-kafka依赖就行  注入KafkaTemplate模板 消息发送  监听消息消费 测试发送 spring-kafka和kafka-clients结合使用(推荐)  消费者组件 生产者组件 生产消息和消费消息  注: 这里记录一下生产发生的问题 关于max.poll.interval.ms配置的问题,根据自己的业务配置poll拉去间隔等待

    2024年02月02日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包