Spring Boot 整合 Kafka

这篇具有很好参考价值的文章主要介绍了Spring Boot 整合 Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka 环境搭建

kafka 安装、配置、启动、测试说明:

1. 安装:直接官网下载安装包,解压到指定位置即可(kafka 依赖的 Zookeeper 在文件中已包含)
下载地址:https://kafka.apache.org/downloads
示例版本:kafka_2.13-2.8.0.tgz
下载后可本地解压安装,解压位置自选,如 D:\Java 下
解压命令:tar -zxvf kafka_2.13-2.8.0.tgz
PS:可在 idea 命令行窗口或 git 提供的命令窗口中进行命令操作
使用 git 提供的命令窗口:空白文件夹中右键——》Git Bash Here 即可打开

2. 添加地址配置
在 D:\Java\kafka_2.13-2.8.0\config\server.properties 中搜索添加以下两行配置:
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
说明:以上配置默认是注释掉的,可搜索找到,根据需求进行自定义地址配置

重要说明:以下命令操作默认都是在 D:\Java\kafka_2.13-2.8.0\ 即 kafaka 根目录下进行!

3. 使用配置文件方式后台启动/关闭 Zookeeper 服务
启动:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
关闭【自选】:bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties

4. 使用配置文件方式后台启动/关闭 kafka 服务
启动:bin/kafka-server-start.sh -daemon config/server.properties
关闭【自选】:bin/kafka-server-stop.sh -daemon config/server.properties 

5. 服务测试

5.1 创建主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

5.2 查看主题(可能需要查一会儿)
bin/kafka-topics.sh --list --zookeeper localhost:2181

说明:发送消息和监听消息需要打开两个窗口进行测试!

5.3 发送消息(kafka 根目录下新建窗口)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
输入以上命令回车后,可继续输入内容测试消息发送

5.4 监听消息(kafka 根目录下新建窗口)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Hello-Kafka --from-beginning
输入以上命令后,可观察消息接收情况,并且可在消息发送窗口继续发送消息测试此监听窗口的接收情况,正常接收,则服务环境搭建成功。

Spring Boot 整合 Kafka

环境:自行创建 Spring Boot 项目,添加测试依赖,并启动 Zookeeper 和 kafka 服务。

注意:Zookeeper 默认好像占用 8080 端口,自己注意端口占用问题。文章来源地址https://www.toymoban.com/news/detail-410707.html

1. 添加依赖

<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 添加配置

# kafka 配置
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      # listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

3. 创建消息生产者

@Component
public class KafkaProducer {

    private Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public static final String TOPIC_TEST = "Hello-Kafka";

    public static final String TOPIC_GROUP = "test-consumer-group";

    public void send(Object obj) {
        String obj2String = JSON.toJSONString(obj);
        logger.info("准备发送消息为:{}", obj2String);

        // 发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                logger.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                logger.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

}

4. 创建消息消费者

@Component
public class KafkaConsumer {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) { // 包含非空值,则执行
            Object msg = message.get();
            logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge(); // 确认成功消费一个消息
        }
    }

}

5. 消息发送测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    private Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);

    @Resource
    private KafkaProducer kafkaProducer; // 注意使用自己创建的,看清楚!

    /*
      测试之前需要开启 Kafka 服务
      启动 Zookeeper:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
      启动 Kafka:bin/kafka-server-start.sh -daemon config/server.properties

      测试结果数据:

      准备发送消息为:"你好,我是Lottery 001"
      Hello-Kafka - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=Hello-Kafka, partition=null,
      headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=你好,我是Lottery 001, timestamp=null),
      recordMetadata=Hello-Kafka-0@47]

      topic_test 消费了: Topic:Hello-Kafka,Message:你好,我是Lottery 001
     */
    @Test
    public void test_send() throws InterruptedException {
        // 循环发送消息
        while (true) {
            kafkaProducer.send("你好,我是Lottery 001");
            Thread.sleep(3500);
        }
    }

}

到了这里,关于Spring Boot 整合 Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot 整合kafka消费模式AckMode以及手动消费

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月08日
    浏览(42)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(43)
  • Spring Boot 整合kafka消费模式AckMode以及手动消费 依赖管理

    在pom.xml文件中导入依赖 需要自己配置AckMode时候的配置 kafka支持的消费模式,设置在 AbstractMessageListenerContainer.AckMode 的枚举中,下面就介绍下各个模式的区别 AckMode模式 AckMode模式 作用 MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment

    2024年02月16日
    浏览(42)
  • 从零到Kafka:万字带你体验Spring Boot整合消息驱动的奇妙之旅

    主页传送门:📀 传送 Spring boot : | 基于Spring的开源框架,用于简化新Spring应用的初始搭建以及开发过程 特性: | 快速开发、轻量级、无代码生成和独立运行等特性 优势: | 简化配置,提供自动配置,减少开发时间 应用场景: | 适用于微服务架构、云原生应用等场景 环境

    2024年02月05日
    浏览(39)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(51)
  • 01_kafka_环境搭建安装_topic管理

    略 hostnamectl set-hostname kafka_1 /etc/sysconfig/network /etc/hosts ping kafka_1 测试 由于 集群中的 Leader 的监控 和 Topic 元数据 存储在 Zookeeper 中 ,所以需要安装 zk; https://zookeeper.apache.org/releases.html 中选择需要的版本; conf 目录 复制zoo_sample.cfg 为 zoo.cfg, 修改其中的配置 dataDir 指定自己的

    2024年02月07日
    浏览(37)
  • java阻塞队列/kafka/spring整合kafka

    queue增加删除元素 增加元素 add方法在添加元素的时候,若超出了度列的长度会直接抛出异常: put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素 offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false 删除元素 pol

    2024年02月12日
    浏览(46)
  • Spring整合kafka

    只用spring-kafka依赖就行  注入KafkaTemplate模板 消息发送  监听消息消费 测试发送 spring-kafka和kafka-clients结合使用(推荐)  消费者组件 生产者组件 生产消息和消费消息  注: 这里记录一下生产发生的问题 关于max.poll.interval.ms配置的问题,根据自己的业务配置poll拉去间隔等待

    2024年02月02日
    浏览(42)
  • 在Windows上搭建Kafka环境的步骤,包括安装Java、下载Kafka、配置Zookeeper和Kafka、启动Zookeeper和Kafka、创建主题和生产者/消费者等

    1. 安装Java Kafka需要Java环境支持。可以从Oracle官网下载JDK,或者使用OpenJDK。 2. 下载Kafka 可以从Kafka官网下载Kafka二进制压缩包。解压后可以看到bin、config、libs等目录。 3. 配置Zookeeper Kafka依赖Zookeeper实现分布式协作。可以使用Kafka自带的Zookeeper,也可以独立安装Zookeeper。 如果使

    2024年02月11日
    浏览(42)
  • 在Spring Boot微服务集成Kafka客户端(spring-kafka)操作Kafka

    记录 :457 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka。使用Spring封装的KafkaTemplate操作Kafka生产者Producer。使用Spring封装的@KafkaListener操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen1

    2024年02月09日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包