【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

这篇具有很好参考价值的文章主要介绍了【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Kafka是什么?

Apache Kafka是分布式发布-订阅消息系统。
它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

二、Kafka的特点

Apache Kafka与传统消息系统相比,有以下不同:

  • 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。
  • 它被设计为一个分布式系统,易于向外扩展;
  • 它同时为发布和订阅提供高吞吐量;
  • 它支持多订阅者,当失败时能自动平衡消费者;

三、Kafka几个主要的概念

概念 解释
Broker 节点,一个Broker代表是一个Kafka实例节点,多个Broker可以组成Kafka集群
Topic 主题,等同于消息系统中的队列(queue),一个Topic中存在多个Partition
Partition 分区,构成Kafka存储结构的最小单位
Partition offset offset为消息偏移量,以Partition为单位,即使在同一个Topic中,不同Partition的offset也是重新开始计算(也就是会重复)
Group 消费者组,一个Group里面包含多个消费者
Message 消息,是队列中消息的承载体,也就是通信的基本单位,Producer可以向Topic中发送Message

四、Kafka环境准备

windows下安装kafka可以参考这一篇博客:https://blog.csdn.net/w546097639/article/details/88578635

五、SpringBoot整合Kafka消息队列

(1) 消息生产者端

Step1、引入POM文件

在spring boot环境中使用,引入需要依赖的jar包,引入POM文件

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

Step2、新增生产者端的配置文件

spring:
    kafka:
        bootstrap-servers: ${KAFKA_SERVER:IP地址:端口号,IP地址:端口号,IP地址:端口号,IP地址:端口号}
        consumer:
            auto-commit-interval: ${KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL:5000}
            auto-offset-reset: ${KAFKA_CONSUMER_AUTO_OFFSET_RESET:earliest}
            enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}
            group-id: ${KAFKA_CONSUMER_GROUPID:default_kafka_group}
        producer:
            acks: ${KAFKA_PRODUCER_ACKS:all}
            batch-size: ${KAFKA_PRODUCER_BATCH:16384}
            buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMORY:33554432}
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            retries: ${KAFKA_PRODUCER_RETRIES:0}
            

Step3、定义Kafka生产者


@RestController
@RequestMapping(value = "/kafka/v1")
@Slf4j
public class KafkaProducerController {

    public static final String Upstream_C2S_Topic = "Upstream_C2S_Topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @ResponseBody
    @PostMapping(value = "/serviceListChanges", produces = "application/json")
    public JSONObject serviceListChanges(@RequestBody JSONObject jsonData) {
        log.info("URL = {},vin={}, 请求的jsonObject的值 = {}",
                "serviceListChanges", jsonData.getStr("vin"), jsonData);
        try {
            kafkaTemplate.send(Upstream_C2S_Topic, jsonData.toString());
            jsonData.set("success", true);
            return jsonData;
        } catch (Exception e) {
            log.error("KafkaProducerController serviceListChanges error = {}", e.getMessage());
        }
        return new JSONObject();
    }
}

(2) 消息消费者端

Step1、引入POM文件

       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

Step2、 新增消费者的配置文件

spring:
    kafka:
        bootstrap-servers: ${KAFKA_SERVER:IP地址:端口号,IP地址:端口号,IP地址:端口号,IP地址:端口号}
        consumer:
            auto-commit-interval: ${KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL:5000}
            auto-offset-reset: ${KAFKA_CONSUMER_AUTO_OFFSET_RESET:earliest}
            enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}
            group-id: ${KAFKA_CONSUMER_GROUPID:default_kafka_group}
            max-poll-records: ${KAFKA_CONSUMER_MAX_POLL_RECORDS:100}
            properties:
                session:
                    timeout:
                        ms: ${KAFKA_CONSUMER_PROPERTIES_SESSION_TIMEOUT_MS:10000}
        listener:
            concurrency: ${KAFKA_LISTENER_CONCURRENCY:4}
        producer:
            acks: ${KAFKA_PRODUCER_ACKS:all}
            retries: ${KAFKA_PRODUCER_RETRIES:0}
            batch-size: ${KAFKA_PRODUCER_BATCH:16384}
            buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMORY:33554432}
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            properties:
                autoCommitInterval: ${KAFKA_PRODUCER_PROPERTIES_AUTO_COMMIT_INTERVAL:100}
                autoOffsetReset: ${KAFKA_PRODUCER_PROPERTIES_AUTO_OFFSET_RESET:latest}
                concurrency: ${KAFKA_PRODUCER_PROPERTIES_CONCURRENCY:10}
                enableAutoCommit: ${KAFKA_PRODUCER_PROPERTIES_ENABLE_AUTO_COMMIT:true}
                groupId: ${KAFKA_PRODUCER_PROPERTIES_GROUPID:default_kafka_group}
                linger: ${KAFKA_PRODUCER_PROPERTIES_LINGER:0}
                maxPollRecords: ${KAFKA_PRODUCER_PROPERTIES_MAX_POLL_RECORDS:100}
                sessionTimeout: ${KAFKA_PRODUCER_PROPERTIES_SESSION_TIMEOUT:60000}

见底部:配置文件的参数详解

Step3、 定义抽象类 Consumer

@Slf4j
public abstract class Consumer {

    public void listenTopic(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String value = record.value();
        log.info("kafka的key:{},value:{} ", topic, value);
        if (JSONUtil.isJson(value)) {
            consumerTopic(topic, value);
        }
    }

    public void add(String value) {
        this.consumerTopic(null, value);
    }

    //执行消费逻辑
    public abstract void consumerTopic(String topic, String value);
}

Step4、 定义实现类 XXXConsumer

@Component
@Slf4j
public class ServiceListChangesConsumer extends Consumer {

    public static final String Upstream_C2S_Topic = "Upstream_C2S_Topic";

    @Override
    @KafkaListener(topicPattern = Upstream_C2S_Topic)
    public void listenTopic(ConsumerRecord<String, String> record) {
        super.listenTopic(record);
    }

    // 执行消费逻辑
    @Override
    public void consumerTopic(String topic, String value) {
        JSONObject jsonObject = JSONUtil.parseObj(value);
        String vin = jsonObject.getStr("vin");
        log.info("VIN={},ServiceListChangesConsumer消费成功,消息id={} ", vin, jsonObject.getStr("eventID"));
    }

}

(3)业务测试验证

将两个服务正常运行,看到控制台中输出一些Kafka配置的相关参数

【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)配置如下的ApiFox用于接口调试

【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

点击发送之后,能正常收到后台反馈的业务响应

【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

查看控制台输出(生产者的数据)

2023-01-04 22:42:51.805 INFO 10132 — [nio-9091-exec-6] c.d.t.l.g.c.KafkaProducerController : URL = serviceListChanges,vin=LFV2B20L3M4999999, 请求的jsonObject的值 = {“vin”:“LFV2B20L3M4999999”,“time”:“LFV2B20L3M4999999”}

【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)

查看控制台输出(消费者的数据)

2023-01-04 22:42:51.818 INFO 13800 — [ntainer#0-2-C-1] c.d.tsp.logic.analyze.kafka.Consumer : kafka的key:Upstream_C2S_Topic,value:{“vin”:“LFV2B20L3M4999999”,“time”:“LFV2B20L3M4999999”}

2023-01-04 22:42:51.818 INFO 13800 — [ntainer#0-2-C-1] c.d.t.l.a.c.ServiceListChangesConsumer : VIN=LFV2B20L3M4999999,ServiceListChangesConsumer消费成功,消息id=null

【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)文章来源地址https://www.toymoban.com/news/detail-407295.html

六、KafkaUtils, Kafka的工具类

@Component
public class KafkaUtils {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //发送消息到kafka
    public boolean sendMsg(String topic, String json) {
        if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
            return false;
        }
        kafkaTemplate.send(topic, json);
        return true;
    }

    //发送消息到kafka
    public boolean sendMsg(String topic, String key, String json) {
        if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
            return false;
        }
        kafkaTemplate.send(topic, key, json);
        return true;
    }

    //批量存储
    public boolean sendBatchMsg(String topic, List<KafkaMsgBean> msgs) {
        if (CollUtil.isEmpty(msgs)) {
            return false;
        }
        msgs.forEach(msg -> {
            sendMsg(topic, msg.getDeviceId(), msg.getMsgContent());
        });
        return true;
    }
}

七、配置文件的参数详解

(1)入门了解

序号 内容 解释
1 bootstrap-servers 指定kafka 代理地址,可以多个
2 producer 定义生产者
3 batch-size 每次批量发送消息的数量
4 key-serializer 指定消息key的编解码方式
5 value-serializer 指定消息体的编解码方式

(2)详细了解

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: 10.200.8.29:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      bootstrap-servers: 10.200.8.29:9092
      # 可重试错误的重试次数,例如“连接错误”、“无主且未选举出新Leader”
      retries: 1 #生产者发送消息失败重试次数
      # 多条消息放同一批次,达到多达就让Sender线程发送
      batch-size: 16384 # 同一批次内存大小(默认16K)
      # 发送消息的速度超过发送到服务器的速度,会导致空间不足。send方法要么被阻塞,要么抛异常
      # 取决于如何设置max.block.ms,表示抛出异常前可以阻塞一段时间
      buffer-memory: 314572800 #生产者内存缓存区大小(300M = 300*1024*1024)
      #acks=0:无论成功还是失败,只发送一次。无需确认
      #acks=1:即只需要确认leader收到消息
      #acks=all或-1:ISR + Leader都确定收到
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的编解码方法
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的编解码方法
      #开启事务,但是要求ack为all,否则无法保证幂等性
      #transaction-id-prefix: "COLA_TX"
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
      properties:
        #自定义拦截器,注意,这里结尾时classes(先于分区器,快递先贴了标签再指定地址)
        interceptor.classes: cn.com.controller.TimeInterceptor
        #自定义分区器
        #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
        #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
        linger.ms: 1000
        #最大请求大小,200M = 200*1024*1024,与服务器broker的message.max.bytes最好匹配一致
        max.request.size: 209715200
        #Producer.send()方法的最大阻塞时间(115秒)
        # 发送消息的速度超过发送到服务器的速度,会导致空间不足。send方法要么被阻塞,要么抛异常
        # 取决于如何设置max.block.ms,表示抛出异常前可以阻塞一段时间
        max.block.ms: 115000
        #该配置控制客户端等待服务器的响应的最长时间。
        #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。 
        #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
        request.timeout.ms: 115000
        #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
        #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
        delivery.timeout.ms: 120000
        # 生产者在服务器响应之前能发多少个消息,若对消息顺序有严格限制,需要配置为1
        # max.in.flight.requests.per.connection: 1
 spring:
   kafka:
    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      bootstrap-servers: 10.200.8.29:9092
      group-id: auto-dev #消费者组
      #消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
      # earliest:无提交记录,从头开始消费
      #latest:无提交记录,从最新的消息的下一条开始消费
      auto-offset-reset: earliest 
      enable-auto-commit: false #是否自动提交偏移量offset
      auto-commit-interval: 1S #前提是 enable-auto-commit=true。自动提交的频率
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        session.timeout.ms: 120000
        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        #配置控制客户端等待请求响应的最长时间。 
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true
        #poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
        heartbeat.interval.ms: 40000 
        #每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
        #0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
        #仍然会返回该消息,以确保消费者可以进行
        #max.partition.fetch.bytes=1048576  #1M
 
    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
      #type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
      type: batch
      concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
 
    template:
      default-topic: "COLA"

到了这里,关于【项目实战】SpringBoot整合Kafka消息队列(基于KafkaTemplate和@KafkaListener实现)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于springboot+Redis的前后端分离项目之消息队列(六)-【黑马点评】

    🎁🎁资源文件分享 链接:https://pan.baidu.com/s/1189u6u4icQYHg_9_7ovWmA?pwd=eh11 提取码:eh11 我们来回顾一下下单流程 当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤 1、查询优惠卷 2、判断秒杀库存是否足够 3、查询订单

    2024年02月12日
    浏览(41)
  • 第二章 Spring Boot 整合 Kafka消息队列 生产者

    第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证 第二章  Spring Boot 整合 Kafka消息队列 生产者 第三章  Spring Boot 整合 Kafka消息队列 消息者         Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的

    2024年01月25日
    浏览(45)
  • 消息队列——spring和springboot整合rabbitmq

    目录 spring整合rabbitmq——生产者 rabbitmq配置文件信息 倒入生产者工程的相关代码 简单工作模式 spring整合rabbitmq——消费者 spring整合rabbitmq——配置详解 SpringBoot整合RabbitMQ——生产者  SpringBoot整合RabbitMQ——消费者   使用原生amqp来写应该已经没有这样的公司了 创建两个工程

    2024年02月16日
    浏览(54)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(54)
  • Flink订阅Kafka消息队列实战案例

    Kafka是一款开源的分布式消息系统,最初由LinkedIn公司开发并开源。它被设计用于处理海量的实时数据流,可以支持高吞吐量和低延迟的数据传输。 Kafka的设计主要目标是提供一个持久化的、高吞吐量的、可扩展的、分布式发布/订阅消息系统,以解决实时数据处理的需求。它

    2024年02月15日
    浏览(37)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(48)
  • Springboot整合kafka实现高效的消息传递和处理

    Kafka是一个分布式的流处理平台,它可以处理高吞吐量的消息。Spring Boot是一个流行的Java开发框架,提供了快速构建应用程序的能力。将这两者结合起来可以实现高效的消息传递和处理,同时支持多种消息模式。 本篇博客将介绍如何使用Spring Boot整合Kafka,并支持多种消息模式

    2024年02月09日
    浏览(36)
  • SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

    SpringBoot整合SpringCloudStream3.1+版本Kafka 添加死信队列配置文件,添加对应channel 通道绑定配置对应的channel位置添加重试配置 Kafka基本配置(application-mq.yml) 创建死信队列配置文件(application-dql.yml) 注意:这里的valueSerde使用了对象类型,需要搭配 application/json 使用,consumer接收

    2024年02月16日
    浏览(37)
  • SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

    消息的发送方:生产者 消息的接收方:消费者 同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送 异步消息:不需要接收方回应就可以进行下一步的发送 什么是消息队列? 当此时有很多个用户同时访问服务器,需要服务器进行操作,但此

    2024年04月27日
    浏览(52)
  • 消息队列kafka基础,基于go代码举例

    broker就是单个kafka实例。Kafka集群中,一个kafka服务器就是一个broker。 一类消息的集合。在kafka中,消息以主题为单位进行归类,producer负责将消息发送到指定的主题,而consumer负责订阅主题并进行消费。在生产者向kafka发送数据以及消费者订阅数据都要指定具体的topic来消费。

    2024年02月02日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包