第二章 Spring Boot 整合 Kafka消息队列 生产者

这篇具有很好参考价值的文章主要介绍了第二章 Spring Boot 整合 Kafka消息队列 生产者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证

第二章  Spring Boot 整合 Kafka消息队列 生产者

第三章  Spring Boot 整合 Kafka消息队列 消息者


前言

        Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的Kafka-client,用于在Spring Boot 项目里快速集成kafka。


一、Kafka 是什么?

Apache Kafka是分布式发布-订阅消息系统。
它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

二、生产者

1.引入库

引入需要依赖的jar包,引入POM文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

2.配置文件

spring:
  custom:
    kafka:
      username: admin
      password: admin-secret
      partitions: 1
      enable-auto-commit: false
      batch-listener: false
      bootstrap-servers:
        - 192.168.1.95:9092

3.端启动类

启动类名 EnableAutoKafka
package com.cdkjframework.kafka.producer.annotation;

import com.cdkjframework.kafka.producer.config.KafkaMarkerConfiguration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer.annotation
 * @ClassName: EnableAutoKafka
 * @Description: Kafka自动启动类
 * @Author: xiaLin
 * @Date: 2023/7/18 9:20
 * @Version: 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({KafkaMarkerConfiguration.class})
public @interface EnableAutoKafka {
}

4.spring.factories配置文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.cdkjframework.kafka.producer.config.KafkaAutoConfiguration

5.配置类

5.1 kafka 配置 KafkaConfig

package com.cdkjframework.kafka.producer.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer.config;
 * @ClassName: KafakConfig
 * @Description: Kafak 配置
 * @Author: xiaLin
 * @Version: 1.0
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.custom.kafka")
public class KafkaConfig {

  /**
   * 服务列表
   */
  private List<String> bootstrapServers;

  /**
   * 主题
   */
  private List<String> topics;

  /**
   * 账号
   */
  private String username;

  /**
   * 密码
   */
  private String password;

  /**
   * 延迟为1毫秒
   */
  private Integer linger = 1;

  /**
   * 批量大小
   */
  private Integer batchSize = 16384;

  /**
   * 重试次数,0为不启用重试机制
   */
  private Integer retries = 0;

  /**
   * 人锁
   */
  private Integer maxBlock = 6000;

  /**
   * acks
   */
  private String acks = "1";

  /**
   * security.providers
   */
  private String securityProviders;

  /**
   * 启用自动提交
   */
  private boolean enableAutoCommit = true;

  /**
   * 会话超时
   */
  private String sessionTimeout = "5000";

  /**
   * 会话超时
   */
  private Integer maxPollInterval = 10000;

  /**
   * 组ID
   */
  private String groupId = "defaultGroup";

  /**
   * 最大投票记录
   */
  private Integer maxPollRecords = 1;

  /**
   * 并发性
   */
  private Integer concurrency = 3;

  /**
   * 拉取超时时间
   */
  private Integer pollTimeout = 60000;

  /**
   * 批量监听
   */
  private boolean batchListener = false;

  /**
   * 副本数量
   */
  private String sort = "1";

  /**
   * 分区数
   */
  private Integer partitions = 3;

  /**
   * 消费者默认支持解压
   */
  private String compressionType = "none";

  /**
   * offset偏移量规则设置
   */
  private String autoOffsetReset = "earliest";

  /**
   * 自动提交的频率
   */
  private Integer autoCommitInterval = 100;

  /**
   * 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
   */
  private Integer bufferMemory = 33554432;

  /**
   * 消息的最大大小限制
   */
  private Integer maxRequestSize = 1048576;
}

5.2 kafka 自动配置 KafkaAutoConfiguration

package com.cdkjframework.kafka.producer.config;

import com.cdkjframework.kafka.producer.ProducerConfiguration;
import com.cdkjframework.kafka.producer.util.ProducerUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.function.client.WebClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer.config
 * @ClassName: KafkaAutoConfiguration
 * @Description: kafka 自动配置
 * @Author: xiaLin
 * @Date: 2023/7/18 9:21
 * @Version: 1.0
 */
@Lazy(false)
@RequiredArgsConstructor
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaConfig.class)
@AutoConfigureAfter({WebClientAutoConfiguration.class})
@ImportAutoConfiguration(ProducerConfiguration.class)
@ConditionalOnBean(KafkaMarkerConfiguration.Marker.class)
public class KafkaAutoConfiguration {

  /**
   * 读取配置文件
   */
  private final KafkaConfig kafkaConfig;

  /**
   * kafka topic 启动触发器
   *
   * @return 返回结果
   */
  @Bean(initMethod = "kafkaAdmin")
  @ConditionalOnMissingBean
  public TopicConfig kafkaTopic() {
    TopicConfig trigger = new TopicConfig(kafkaConfig);
    return trigger;
  }

  /**
   * kafka 配置 启动触发器
   *
   * @return 返回结果
   */
  @Bean(initMethod = "start")
  @ConditionalOnMissingBean
  public ProducerUtils Producer() {
    return new ProducerUtils();
  }
}

5.3 kafka 标记配置 KafkaMarkerConfiguration

package com.cdkjframework.kafka.producer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer.config
 * @ClassName: KafkaMarkerConfiguration
 * @Description: Kafka标记配置
 * @Author: xiaLin
 * @Date: 2023/12/6 9:45
 * @Version: 1.0
 */
@EnableKafka
@Configuration(proxyBeanMethods = false)
public class KafkaMarkerConfiguration {

  @Bean
  public Marker kafkaMarker() {
    return new Marker();
  }

  public static class Marker {

  }
}

5.4 topic配置 TopicConfig

package com.cdkjframework.kafka.producer.config;

import com.cdkjframework.constant.IntegerConsts;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer.config
 * @ClassName: TopicConfig
 * @Description: topic配置
 * @Author: xiaLin
 * @Version: 1.0
 */
public class TopicConfig {

  /**
   * 配置
   */
  private final KafkaConfig kafkaConfig;

  /**
   * 构造函数
   */
  public TopicConfig(KafkaConfig kafkaConfig) {
    this.kafkaConfig = kafkaConfig;
  }

  /**
   * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
   */
  public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>(IntegerConsts.ONE);
    // 指定多个kafka集群多个地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
    return new KafkaAdmin(configs);
  }
}

6.生产者配置

生产者配置类 ProducerConfiguration

package com.cdkjframework.kafka.producer;

import com.cdkjframework.kafka.producer.config.KafkaConfig;
import com.cdkjframework.kafka.producer.util.ProducerUtils;
import com.cdkjframework.util.tool.StringUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer
 * @ClassName: ProducerConfiguration
 * @Description: 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
 * @Author: xiaLin
 * @Version: 1.0
 */
@Configuration
@RequiredArgsConstructor
public class ProducerConfiguration {

  /**
   * 配置
   */
  private final KafkaConfig kafkaConfig;

  /**
   * JAAS配置
   */
  private String JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=%s password=%s;";

  /**
   * Producer Template 配置
   */
  @Bean(name = "kafkaTemplate")
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  /**
   * Producer 工厂配置
   */
  @Bean(name = "producerFactory")
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  /**
   * Producer 参数配置
   */
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // 指定多个kafka集群多个地址
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());

    // 重试次数,0为不启用重试机制
    props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfig.getRetries());
    //同步到副本, 默认为1
    // acks=0 把消息发送到kafka就认为发送成功
    // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
    // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
    props.put(ProducerConfig.ACKS_CONFIG, kafkaConfig.getAcks());

    // 生产者空间不足时,send()被阻塞的时间,默认60s
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaConfig.getMaxBlock());
    // security.providers
    props.put(ProducerConfig.SECURITY_PROVIDERS_CONFIG, kafkaConfig.getSecurityProviders());
    // 控制批处理大小,单位为字节
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaConfig.getBatchSize());
    // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
    props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaConfig.getLinger());
    // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaConfig.getBufferMemory());
    // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfig.getMaxRequestSize());
    // 键的序列化方式
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 值的序列化方式
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
    // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfig.getCompressionType());
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaConfig.getPartitions());

    // 账号密码
    if (StringUtils.isNotNullAndEmpty(kafkaConfig.getUsername()) &&
            StringUtils.isNotNullAndEmpty(kafkaConfig.getPassword())) {
      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
      String SASL_MECHANISM = "PLAIN";
      props.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);
      props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG, kafkaConfig.getUsername(), kafkaConfig.getPassword()));
    }

    return props;
  }

}

7. 生产者工具

生产者端 ProducerUtils

package com.cdkjframework.kafka.producer.util;

import com.cdkjframework.constant.IntegerConsts;
import com.cdkjframework.util.log.LogUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.producer.util
 * @ClassName: ProducerUtils
 * @Description: 生产工具
 * @Author: xiaLin
 * @Version: 1.0
 */

public class ProducerUtils {

  /**
   * 日志
   */
  private static LogUtils logUtils = LogUtils.getLogger(ProducerUtils.class);

  /**
   * 模板
   */
  private static KafkaTemplate kafkaTemplate;

  /**
   * 数据模板
   */
  @Resource(name = "kafkaTemplate")
  private KafkaTemplate template;

  /**
   * 初始化工具
   */
  private void start() {
    kafkaTemplate = template;
  }

  /**
   * producer 同步方式发送数据
   *
   * @param topic   topic名称
   * @param message producer发送的数据
   * @throws InterruptedException 异常信息
   * @throws ExecutionException   异常信息
   * @throws TimeoutException     异常信息
   */
  public static void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
    kafkaTemplate.send(topic, message).get(IntegerConsts.TEN, TimeUnit.SECONDS);
  }

  /**
   * producer 异步方式发送数据
   *
   * @param topic   topic名称
   * @param message producer发送的数据
   */
  public static void sendMessageAsync(String topic, String message) {
    kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() {
      @Override
      public void onFailure(Throwable throwable) {
        logUtils.error("topic:" + topic + ",message:" + message);
        logUtils.error(throwable, throwable.getMessage());
      }

      @Override
      public void onSuccess(Object o) {
        logUtils.info("topic:" + topic + ",发送成功");
      }
    });
  }

  /**
   * producer 异步方式发送数据
   *
   * @param topic   topic名称
   * @param key     key值
   * @param message producer发送的数据
   */
  public static void sendMessageAsync(String topic, String key, String message) {
    kafkaTemplate.send(topic, key, message).addCallback(new ListenableFutureCallback() {
      @Override
      public void onFailure(Throwable throwable) {
        logUtils.error("topic:" + topic + ",message:" + message);
        logUtils.error(throwable, throwable.getMessage());
      }

      @Override
      public void onSuccess(Object o) {
        logUtils.info("topic:" + topic + ",发送成功");
      }
    });
  }
}

总结

例如:以上就是今天要讲的内容,本文仅仅简单介绍了 Spring Boot 集成消息生产者的封装,消息者待续。

相对应的开源项目欢迎访问:维基框架文章来源地址https://www.toymoban.com/news/detail-823847.html

到了这里,关于第二章 Spring Boot 整合 Kafka消息队列 生产者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【深入理解Kafka系列】 第二章 生产者

          生产者就是负责向Kafka发送消息的应用程序。Kafka一共两个大版本的客户端,第一个是开源之处使用Scala编写的客户端;第二个是0.9.x版本开始推出的java编写的客户端。 一个正常的生产逻辑需要以下几个步骤: (1)配置生产者客户端参数及创建相应的生产者实例。 (

    2023年04月26日
    浏览(25)
  • 深入Kafka核心设计与实践原理读书笔记第二章

    配置生产者客户端参数及创建相应的生产者实例。 构建待发送的消息。 发送消息 关闭实列 参数说明 bootstrap.servers :用来指定生产者客户端链接Kafka集群搜需要的broker地址清单,具体格式 host1:port1,host2:port2,可以设置一个或多个地址中间,号分割,参数默认 空串。 这里要注意

    2023年04月08日
    浏览(67)
  • 《Spring揭秘》-第二章- 学习记录

    IoC全称为Inversion of Control,中文翻译为控制反转,同时还有一个别名叫 依赖注入DI(Dependency Injection)。大多将IoC与DI看作同等概念,也有部分观点认为 依赖注入可以看作IoC的一种实现方式。 在没有Spring的时候,当我们需要依赖某个类或服务时,一般通过new创建一个对象(或者通

    2023年04月11日
    浏览(64)
  • (SpringBoot)第二章:Spring创建和使用

    注意 :在Java中对象也叫做Bean,所以后续文章中用Be

    2024年02月08日
    浏览(32)
  • 使用 Spring Boot 整合 Kafka:实现高效的消息传递

    Kafka 是一种流处理平台,用于在分布式系统中处理高吞吐量的数据流。它是一种基于发布订阅模式的消息系统,能够处理来自多个应用程序的数据流。Kafka 具有高度的可扩展性、可靠性和性能,使得它成为处理大数据的流行选择。 Spring Boot 是一种开源框架,用于简化 Java 应用

    2024年02月14日
    浏览(34)
  • [第二章—Spring MVC的高级技术] 2.3 处理异常

    各位小猿,程序员小猿开发笔记,希望大家共同进步。 引言 我是谁——异常处理。 来自那——所有功能正常运行,但出现错误 怎么办——如何处理异常和响应客户端 我是谁——Spring框架中的一个注解 用在哪——应用在控制器类或方法上 什么用——用于在控制器方法中指定

    2024年01月22日
    浏览(33)
  • 从零到Kafka:万字带你体验Spring Boot整合消息驱动的奇妙之旅

    主页传送门:📀 传送 Spring boot : | 基于Spring的开源框架,用于简化新Spring应用的初始搭建以及开发过程 特性: | 快速开发、轻量级、无代码生成和独立运行等特性 优势: | 简化配置,提供自动配置,减少开发时间 应用场景: | 适用于微服务架构、云原生应用等场景 环境

    2024年02月05日
    浏览(27)
  • 第二章(第二节):无穷小量和函数

    若 lim f(x) = 0 , 则称函数 f(x) 当 x → x 0 时是无穷小量,简称: 无穷小 。      x→ x 0 定理1. 有限多个 无穷小量的代数和仍是无穷小量 定理2. 有限多个 无穷小量的积也是无穷小量 定理3.常数与无穷小量的积也是无穷小量 定理4.有界变量与无穷小量的积是无穷小量 当 x→

    2024年02月08日
    浏览(30)
  • 第二章 集合

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 例如:第一章 Python 机器学习入门之pandas的使用 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 HashSet 底层就是基于 HashMap 实现的。两者主要区别: 线程是否安全: HashMap 是非

    2024年02月02日
    浏览(46)
  • 第二章 变量和引用

    目录 2.1. 深入认识变量 2.1.1. 什么是变量 2.1.2. 变量的名称 2.1.3. 变量的类型 2.1.4. 变量的定义 2.1.5. 自定义变量 2.1.6. 环境变量 2.1.7. 位置变量 2.2. 变量赋值和作用域 2.2.1. 显示赋值:变量名=变量值 2.2.2. read 从键盘读入变量值 2.2.3. 变量和引号 2.2.4. 变量的作用域 变量是在程序

    2024年02月20日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包