SpringBoot-Learning系列之Kafka整合

这篇具有很好参考价值的文章主要介绍了SpringBoot-Learning系列之Kafka整合。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot-Learning系列之Kafka整合

本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。

SpringBoot-Learning系列之Kafka整合

  • 消息系统

    • 主要应用场景
      • 流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦)
      • 异步处理、顺序处理
      • 实时数据传输管道
      • 异构语言架构系统之间的通信
        • 如 C语言的CS客户端的HIS系统与java语言开发的互联网在线诊疗系统的交互
  • Kafka是什么

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。

    核心概念:

    • 生产者(Producer) 生产者应用向主题队列中投送消息数据
    • 消费者 (Consumer) 消费者应用从订阅的Kafka的主题队列中获取数据、处理数据等后续操作
    • 主题 (Topic) 可以理解为生产者与消费者交互的桥梁
    • 分区 (Partition) 默认一个主题有一个分区,用户可以设置多个分区。每个分区可以有多个副本(Replica)。分区的作用是,将数据划分为多个小块,提高并发性和可扩展性。每个分区都有一个唯一的标识符,称为分区号。消息按照键(key)来进行分区,相同键的消息会被分配到同一个分区中。分区可以有不同的消费者同时消费。副本的作用是提供数据的冗余和故障恢复。每个分区可以有多个副本,其中一个被称为领导者(leader),其他副本被称为追随者(follower)。领导者负责处理读写请求,而追随者只负责复制领导者的数据。如果领导者宕机或不可用,某个追随者会被选举为新的领导者,保证数据的可用性。
  • windows 安装kafka

    本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka

    2.8.0后不需要依赖zk了

    • 拉取镜像

      docker pull wurstmeister/zookeeper
      
      docker pull wurstmeister/kafka
      
    • 创建网络

      docker network create kafka-net --driver bridge
      
    • 安装zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
      
    • 安装kafka

      docker run -d --name kafka --publish 9092:9092 \
      --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
      --env KAFKA_ADVERTISED_PORT=9092  \
      --volume /etc/localtime:/etc/localtime \
      wurstmeister/kafka:latest
      
    • 测试

      telnet localhost:9092
      
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依赖

      								```
      										<?xml version="1.0" encoding="UTF-8"?>
      										<project xmlns="http://maven.apache.org/POM/4.0.0"
      														 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      														 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      												<modelVersion>4.0.0</modelVersion>
      												<parent>
      														<groupId>org.springframework.boot</groupId>
      														<artifactId>spring-boot-starter-parent</artifactId>
      														<version>3.1.0</version>
      														<relativePath/> <!-- lookup parent from repository -->
      												</parent>
      												<groupId>io.github.vino42</groupId>
      												<artifactId>springboot-kafka</artifactId>
      												<version>1.0-SNAPSHOT</version>
      
      												<properties>
      														<java.version>17</java.version>
      														<maven.compiler.source>17</maven.compiler.source>
      														<maven.compiler.target>17</maven.compiler.target>
      														<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      												</properties>
      
      
      												<dependencies>
      														<dependency>
      																<groupId>org.projectlombok</groupId>
      																<artifactId>lombok</artifactId>
      																<optional>true</optional>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-test</artifactId>
      																<scope>test</scope>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-web</artifactId>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-log4j2</artifactId>
      														</dependency>
      														<!--kafka-->
      														<dependency>
      																<groupId>org.springframework.kafka</groupId>
      																<artifactId>spring-kafka</artifactId>
      																<exclusions>
      																		<!--排除掉 自行添加最新的官方clients依赖-->
      																		<exclusion>
      																				<groupId>org.apache.kafka</groupId>
      																				<artifactId>kafka-clients</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.apache.kafka</groupId>
      																<artifactId>kafka-clients</artifactId>
      																<version>3.5.1</version>
      														</dependency>
      														<dependency>
      																<groupId>com.google.code.gson</groupId>
      																<artifactId>gson</artifactId>
      																<version>2.10.1</version>
      														</dependency>
      														<dependency>
      																<groupId>cn.hutool</groupId>
      																<artifactId>hutool-all</artifactId>
      																<version>5.8.21</version>
      														</dependency>
      
      												</dependencies>
      												<build>
      														<plugins>
      																<plugin>
      																		<groupId>org.springframework.boot</groupId>
      																		<artifactId>spring-boot-maven-plugin</artifactId>
      																		<version>3.1.0</version>
      																</plugin>
      														</plugins>
      												</build>
      										</project>
      						```
      
    • 配置

      spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量发送消息的数量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息体的编解码方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
      #      MANUAL	poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
            #      MANUAL_IMMEDIATE	每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
            #      RECORD	当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
            #      BATCH	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
            #      TIME	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
            #      COUNT	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
            #      COUNT_TIME	TIME或COUNT满足其中一个时提交
            ack-mode: manual_immediate
          consumer:
            group-id: test
            # 是否自动提交
            enable-auto-commit: false
            max-poll-records: 100
            #      用于指定消费者在启动时、重置消费偏移量时的行为。
            #      earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。
            #      latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。
            #      none:如果找不到已保存的消费偏移量,消费者会抛出一个异常
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息体的编解码方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      server:
        port: 8888spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量发送消息的数量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息体的编解码方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
            ack-mode: manual_immediate
          consumer:
            group-id: test
            enable-auto-commit: false
            max-poll-records: 100
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息体的编解码方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      
    • 生产者代码示例

      package io.github.vino42.publiser;
      
      import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      
      /**
       * =====================================================================================
       *
       * @Created :   2023/8/30 21:29
       * @Compiler :  jdk 17
       * @Author :    VINO
       * @Copyright : VINO
       * @Decription : kafak 消息生产者
       * =====================================================================================
       */
      @Component
      public class KafkaPublishService {
          @Autowired
          KafkaTemplate kafkaTemplate;
      
          /**
           * 这里为了简单 直接发送json字符串
           *
           * @param json
           */
          public void send(String topic, String json) {
              kafkaTemplate.send(topic, json);
          }
      }
      
      
          @RequestMapping("/send")
          public String send() {
              IntStream.range(0, 10000).forEach(d -> {
                  kafkaPublishService.send("test", RandomUtil.randomString(16));
              });
              return "ok";
          }
      
      
    • 消费者

      @Component
      @Slf4j
      public class CustomKafkaListener {
      
          @org.springframework.kafka.annotation.KafkaListener(topics = "test")
          public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) {
              try {
                  String key = String.valueOf(record.key());
                  String body = record.value();
                  log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);
                  log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //手动ack
                  acknowledgment.acknowledge();
              }
          }
      }
      

SpringBoot Learning系列 是笔者总结整理的一个SpringBoot学习集合。可以说算是一个SpringBoot学习的大集合。欢迎Star关注。谢谢观看。

SpringBoot-Learning系列之Kafka整合
关注公众号不迷路文章来源地址https://www.toymoban.com/news/detail-706486.html

到了这里,关于SpringBoot-Learning系列之Kafka整合的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • springboot整合kafka-笔记

    这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 测试发送kafka消息-控制台日志

    2024年02月12日
    浏览(45)
  • springboot整合kafka入门

    producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。 consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,

    2024年02月07日
    浏览(46)
  • 什么是kafka,如何学习kafka,整合SpringBoot

    目录 一、什么是Kafka,如何学习 二、如何整合SpringBoot 三、Kafka的优势   Kafka是一种分布式的消息队列系统,它可以用于处理大量实时数据流 。学习Kafka需要掌握如何安装、配置和运行Kafka集群,以及如何使用Kafka API编写生产者和消费者代码来读写数据。此外,还需要了解Ka

    2024年02月10日
    浏览(39)
  • Kafka入门(安装和SpringBoot整合)

    app-tier:网络名称 –driver:网络类型为bridge Kafka依赖zookeeper所以先安装zookeeper -p:设置映射端口(默认2181) -d:后台启动 安装并运行Kafka, –name:容器名称 -p:设置映射端口(默认9092 ) -d:后台启动 ALLOW_PLAINTEXT_LISTENER任何人可以访问 KAFKA_CFG_ZOOKEEPER_CONNECT链接的zookeeper K

    2024年02月11日
    浏览(81)
  • 实战:彻底搞定 SpringBoot 整合 Kafka

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。 除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。

    2024年02月10日
    浏览(44)
  • Kafka 基础整理、 Springboot 简单整合

    定义: Kafka 是一个分布式的基于发布/订阅默认的消息队列 是一个开源的分布式事件流平台,被常用用于数据管道、流分析、数据集成、关键任务应用 消费模式: 点对点模式 (少用) 消费者主动拉取数据,消息收到后清除消息 发布/订阅模式 生产者推送消息到队列,都消费者

    2024年02月03日
    浏览(79)
  • springboot整合kafka多数据源

    在很多与第三方公司对接的时候,或者处在不同的网络环境下,比如在互联网和政务外网的分布部署服务的时候,我们需要对接多台kafka来达到我们的业务需求,那么当kafka存在多数据源的情况,就与单机的情况有所不同。 单机的情况 如果是单机的kafka我们直接通过springboot自

    2024年02月13日
    浏览(49)
  • springboot整合ELK+kafka采集日志

    在分布式的项目中,各功能模块产生的日志比较分散,同时为满足性能要求,同一个微服务会集群化部署,当某一次业务报错后,如果不能确定产生的节点,那么只能逐个节点去查看日志文件;logback中RollingFileAppender,ConsoleAppender这类同步化记录器也降低系统性能,综上一些

    2024年02月15日
    浏览(42)
  • SpringBoot整合Kafka简单配置实现生产消费

    *本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考 spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html 搭建Kafka环境,参考Kafka集群环境搭建及使用 Java环境:JDK1.8 Maven版本:apach

    2024年02月16日
    浏览(52)
  • 【SpringBoot整合系列】SpringBoot整合FastDFS(一)

    FastDFS(Fast Distributed File System)是一款开源的分布式文件系统,它提供了高性能、高可靠性、高扩展性和高容错性的分布式文件存储解决方案。 FastDFS采用了类似于Google File System(GFS)的架构,它的设计目标是解决大规模数据存储和高访问速度的问题。 分布式架构: FastDFS采用

    2024年04月27日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包