第三章 Spring Boot 整合 Kafka消息队列 消息者

这篇具有很好参考价值的文章主要介绍了第三章 Spring Boot 整合 Kafka消息队列 消息者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 系列文章目录

第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证

第二章  Spring Boot 整合 Kafka消息队列 生产者

第三章  Spring Boot 整合 Kafka消息队列 消息者


前言

        Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的Kafka-client,用于在Spring Boot 项目里快速集成kafka。


一、Kafka 是什么?

Apache Kafka是分布式发布-订阅消息系统。
它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

二、消息者

 1.引入库

引入需要依赖的jar包,引入POM文件

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

2.配置文件

spring:
  custom:
    kafka:
      username: admin
      password: admin-secret
      partitions: 1
      enable-auto-commit: false
      topics: CHANNEL-BodyBusiness-dataDev,CHANNEL-BodyBusiness-pushDev
      groupId: consumer-group-lms-dev
      batch-listener: false
      bootstrap-servers:
        - 192.168.1.95:9092

3.端启动类

启动类名 EnableAutoKafkaClient
package com.cdkjframework.kafka.consumer.annotation;

import com.cdkjframework.kafka.consumer.config.KafkaClientMarkerConfiguration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.annotation
 * @ClassName: EnableAutoKafkaClient
 * @Description: Kafka客户端自动启动类
 * @Author: xiaLin
 * @Date: 2023/7/18 9:20
 * @Version: 1.0
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({KafkaClientMarkerConfiguration.class})
public @interface EnableAutoKafkaClient {
}

4.spring.factories配置文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.cdkjframework.kafka.consumer.config.KafkaClientAutoConfiguration

5.配置类

5.1 Kafka客户端配置

package com.cdkjframework.kafka.consumer.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.config
 * @ClassName: KafkaClientConfig
 * @Description: Kafka客户端配置
 * @Author: xiaLin
 * @Version: 1.0
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.custom.kafka")
public class KafkaClientConfig {

  /**
   * 服务列表
   */
  private List<String> bootstrapServers;

  /**
   * 主题
   */
  private List<String> topics;

  /**
   * 账号
   */
  private String username;

  /**
   * 密码
   */
  private String password;

  /**
   * 延迟为1毫秒
   */
  private Integer linger = 1;

  /**
   * 批量大小
   */
  private Integer batchSize = 16384;

  /**
   * 重试次数,0为不启用重试机制
   */
  private Integer retries = 0;

  /**
   * 人锁
   */
  private Integer maxBlock = 6000;

  /**
   * acks
   */
  private String acks = "1";

  /**
   * security.providers
   */
  private String securityProviders;

  /**
   * 启用自动提交
   */
  private boolean enableAutoCommit = true;

  /**
   * 会话超时
   */
  private String sessionTimeout = "5000";

  /**
   * 会话超时
   */
  private Integer maxPollInterval = 10000;

  /**
   * 组ID
   */
  private String groupId = "defaultGroup";

  /**
   * 最大投票记录
   */
  private Integer maxPollRecords = 1;

  /**
   * 并发性
   */
  private Integer concurrency = 3;

  /**
   * 拉取超时时间
   */
  private Integer pollTimeout = 60000;

  /**
   * 批量监听
   */
  private boolean batchListener = false;

  /**
   * 副本数量
   */
  private String sort = "1";

  /**
   * 分区数
   */
  private Integer partitions = 3;

  /**
   * 消费者默认支持解压
   */
  private String compressionType = "none";

  /**
   * offset偏移量规则设置
   */
  private String autoOffsetReset = "earliest";

  /**
   * 自动提交的频率
   */
  private Integer autoCommitInterval = 100;

  /**
   * 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
   */
  private Integer bufferMemory = 33554432;

  /**
   * 消息的最大大小限制
   */
  private Integer maxRequestSize = 1048576;
}

5.2 Kafka客户端自动配置

package com.cdkjframework.kafka.consumer.config;

import com.cdkjframework.kafka.consumer.ConsumerConfiguration;
import com.cdkjframework.kafka.consumer.service.ConsumerService;
import com.cdkjframework.kafka.consumer.listener.ConsumerListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.function.client.WebClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.config
 * @ClassName: KafkaClientAutoConfiguration
 * @Description: Kafka客户端自动配置
 * @Author: xiaLin
 * @Date: 2023/7/18 9:21
 * @Version: 1.0
 */
@Lazy(false)
@RequiredArgsConstructor
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaClientConfig.class)
@AutoConfigureAfter({WebClientAutoConfiguration.class})
@ImportAutoConfiguration(ConsumerConfiguration.class)
@ConditionalOnBean(KafkaClientMarkerConfiguration.Marker.class)
public class KafkaClientAutoConfiguration {

  /**
   * 消费者服务接口
   */
  private final ConsumerService consumerService;

  /**
   * kafka topic 启动触发器
   *
   * @return 返回结果
   */
  @Bean
  @ConditionalOnMissingBean
  public ConsumerListener kafkaConsumer() {
    return new ConsumerListener(consumerService);
  }
}

5.3 Kafka客户端标记配置

package com.cdkjframework.kafka.consumer.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer.config
 * @ClassName: KafkaClientMarkerConfiguration
 * @Description: Kafka客户端标记配置
 * @Author: xiaLin
 * @Date: 2023/12/6 13:11
 * @Version: 1.0
 */
@EnableKafka
@Configuration(proxyBeanMethods = false)
public class KafkaClientMarkerConfiguration {

  @Bean
  public Marker kafkaMarker() {
    return new Marker();
  }

  public static class Marker {

  }
}

6.消费者配置

package com.cdkjframework.kafka.consumer;

import com.cdkjframework.kafka.consumer.config.KafkaClientConfig;
import com.cdkjframework.util.log.LogUtils;
import com.cdkjframework.util.tool.JsonUtils;
import com.cdkjframework.util.tool.StringUtils;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer
 * @ClassName: ProducerConfiguration
 * @Description: 设置@Configuration、@EnableKafka两个注解,声明Config并且打开KafkaTemplate能力。
 * @Author: xiaLin
 * @Version: 1.0
 */
@Configuration
@RequiredArgsConstructor
public class ConsumerConfiguration {

  /**
   * 日志
   */
  private final LogUtils logUtils = LogUtils.getLogger(ConsumerConfiguration.class);

  /**
   * JAAS配置
   */
  private String JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=%s password=%s;";

  /**
   * 配置
   */
  private final KafkaClientConfig kafkaClientConfig;

  /**
   * 监听容器工厂
   *
   * @return 返回结果
   */
  @Bean(name = "kafkaListenerContainerFactory")
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String>
            factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 设置消费者工厂
    factory.setConsumerFactory(consumerFactory());
    // 消费者组中线程数量
    factory.setConcurrency(kafkaClientConfig.getConcurrency());
    // 拉取超时时间
    factory.getContainerProperties().setPollTimeout(kafkaClientConfig.getPollTimeout());
    // 当使用批量监听器时需要设置为true
    factory.setBatchListener(kafkaClientConfig.isBatchListener());
    // 将单条消息异常处理器添加到参数中
    factory.setErrorHandler(new ConsumerAwareErrorHandler() {
      @Override
      public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        logUtils.error("// 将单条消息异常:" + thrownException.getMessage());
        logUtils.error("// 将单条消息:" + data.toString() + "," + consumer.toString());
        Iterator<TopicPartition> iterator = consumer.assignment().iterator();
        if (iterator.hasNext()) {
          // 提交重新消费
          consumer.seek(iterator.next(), data.offset());
        }
      }
    });
    if (kafkaClientConfig.isBatchListener()) {
      // 将批量消息异常处理器添加到参数中
      factory.setBatchErrorHandler(new BatchErrorHandler() {
        @Override
        public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
          logUtils.error("// 将批量消息异常:" + thrownException.getMessage());
          logUtils.error(thrownException);
          logUtils.error(JsonUtils.objectToJsonString(data));
        }
      });
    }
//        factory.setContainerCustomizer();

    return factory;
  }

  /**
   * 消费者工厂
   *
   * @return 返回消费工厂
   */
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
  }

  /**
   * 消费者配置
   *
   * @return 返回结果
   */
  @Bean
  public Map<String, Object> consumerConfig() {
    Map<String, Object> propsMap = new HashMap<>();
    // Kafka地址
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClientConfig.getBootstrapServers());
    //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaClientConfig.getGroupId());
    // 是否自动提交offset偏移量(默认true)
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaClientConfig.isEnableAutoCommit());
    // 心跳机制
    propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaClientConfig.getMaxPollInterval());
    // 每次读取最大记录
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaClientConfig.getMaxPollRecords());
    if (kafkaClientConfig.isEnableAutoCommit()) {
      // 自动提交的频率(ms)
      propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaClientConfig.getAutoCommitInterval());
    }
    // 键的反序列化方式
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // 值的反序列化方式
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // offset偏移量规则设置:
    // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaClientConfig.getAutoOffsetReset());

    // 安全认证 账号密码
    if (StringUtils.isNotNullAndEmpty(kafkaClientConfig.getUsername()) &&
            StringUtils.isNotNullAndEmpty(kafkaClientConfig.getPassword())) {
      propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
      String SASL_MECHANISM = "PLAIN";
      propsMap.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);
      propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG, kafkaClientConfig.getUsername(), kafkaClientConfig.getPassword()));
    }

    return propsMap;
  }
}

7.消费者服务

package com.cdkjframework.kafka.consumer.service;

import com.cdkjframework.exceptions.GlobalException;

/**
 * @ProjectName: cdkj-framework
 * @Package: com.cdkjframework.kafka.consumer
 * @ClassName: com.cdkjframework.kafka.consumer.service.ConsumerService
 * @Description: 消费者服务
 * @Author: xiaLin
 * @Version: 1.0
 */
public interface ConsumerService {

    /**
     * 消息内容
     *
     * @param topics  主题
     * @param message 内容
     * @throws GlobalException 异常信息
     */
    void onMessage(String topics, String message) throws GlobalException;
}

8.消费者监听器

package com.cdkjframework.kafka.consumer.listener;

import com.cdkjframework.kafka.consumer.service.ConsumerService;
import com.cdkjframework.util.log.LogUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @ProjectName: cdkj-kafka-client
 * @Package: com.cdkjframework.kafka.consumer
 * @ClassName: com.cdkjframework.kafka.consumer.listener.ConsumerListener
 * @Description: 消费者监听器
 * @Author: xiaLin
 * @Version: 1.0
 */
public class ConsumerListener {

    /**
     * 日志
     */
    private final LogUtils logUtils = LogUtils.getLogger(ConsumerListener.class);

    /**
     * 消费者服务接口
     */
    private final ConsumerService consumerService;

    /**
     * 构造函数
     *
     * @param consumerService 消费者服务接口
     */
    public ConsumerListener(ConsumerService consumerService) {
        this.consumerService = consumerService;
    }

    /**
     * 单条监听MQ消息
     *
     * @param data     消息内容
     * @param consumer 消息者
     */
    @KafkaListener(topics = "#{'${spring.custom.kafka.topics}'.split(',')}", groupId = "${spring.custom.kafka.groupId}")
    public void listener(ConsumerRecord<String, String> data, Consumer consumer) {
        try {
            consumerService.onMessage(data.topic(), data.value());
            consumer.commitAsync();
        } catch (Exception e) {
            logUtils.error(e);
            // 抛出异常,以重试消费
            throw new RuntimeException(e.getMessage());
        }
    }
}

总结

 例如:以上就是今天要讲的内容,本文仅仅简单介绍了 Spring Boot 集成消息消费者的封装。

相对应的开源项目欢迎访问:维基框架文章来源地址https://www.toymoban.com/news/detail-836955.html

到了这里,关于第三章 Spring Boot 整合 Kafka消息队列 消息者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务 第三章 Spring Cloud 简介

    第一章 Java线程池技术应用 第二章 CountDownLatch和Semaphone的应用 第三章 Spring Cloud 简介 Spring Cloud 并不是一个拿来即可用的框架,它是一种微服务规范,共有以下 2 代实现: 第一代实现:Spring Cloud Netflix 第二代实现:Spring Cloud Alibaba Spring Cloud 组件 描述 Spring Cloud Netflix Eureka Sp

    2024年02月08日
    浏览(40)
  • Spring Security in Action 第三章 SpringSecurity管理用户

    本专栏将从基础开始,循序渐进,以实战为线索,逐步深入SpringSecurity相关知识相关知识,打造完整的SpringSecurity学习步骤,提升工程化编码能力和思维能力,写出高质量代码。希望大家都能够从中有所收获,也请大家多多支持。 专栏地址:SpringSecurity专栏 本文涉及的代码都已

    2024年02月07日
    浏览(31)
  • 【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程

    1.1.1 消费者群组         Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。         如上图,主题 T 有 4 个分区,群组中只有一个消费者,则该消费者将收到主题 T1 全部 4 个分区的消息。      

    2024年02月22日
    浏览(36)
  • 从零到Kafka:万字带你体验Spring Boot整合消息驱动的奇妙之旅

    主页传送门:📀 传送 Spring boot : | 基于Spring的开源框架,用于简化新Spring应用的初始搭建以及开发过程 特性: | 快速开发、轻量级、无代码生成和独立运行等特性 优势: | 简化配置,提供自动配置,减少开发时间 应用场景: | 适用于微服务架构、云原生应用等场景 环境

    2024年02月05日
    浏览(27)
  • Linux第三章

    无论是Windows、MacOS、Linux均采用多用户的管理模式进行权限管理。在Linux系统中,拥有最大权限的账户名为:root(超级管理员) root用户拥有最大的系统操作权限,而普通用户在许多地方的权限是受限的(普通用户的权限,一般在其HOME目录内是不受限的,一旦出了HOME目录,大

    2023年04月26日
    浏览(35)
  • 第三章nginx详解

    特点: 1,稳定性高。(没有apache稳定) 2,系统资源消耗地较低。(处理http请求的并发能力非常高,单台物理服务器可以处理30000-50000个并发请求) 稳定:一般在企业中,为了保持服务器的稳定,并发量的设置在20000个左右。占用内存2M左右。 nginx主要功能: 1,静态文件服

    2024年02月12日
    浏览(30)
  • 【计组】第三章练习

    4、设有一个具有20位地址和32位字长的存储器,问: (1)该存储器能存储多少个字节的信息? 220 × 32 bits = 1M × 4B = 4MB (220是2的20次方,上标打不出来…) (2)如果存储器由512K * 8位SRAM芯片组成,需要多少片? (1024K * 32)/(512K * 8) = 8 片 (3)需要多少位地址做芯片选择? 存

    2024年02月04日
    浏览(62)
  • 第三章:函数

    1.定义 设 x , y 是两个变量,x的变化范围是实数集D,如果对于任何的x∈D,按照一定的法则都有唯一确定的y值与之对应。则称变量y是变量x的函数。记为 y = f(x) 。称D为函数的 定义域 ,x为自变量,y为因变量。全体函数值的集合称为函数y的 值域 。 2.函数的表示方法 1. 公式

    2024年02月01日
    浏览(57)
  • 第三章 Elasticsearch简介

    Elasticsearch (后称为 ES )是一个天生支持分布式的搜索、聚合分析和存储引擎。 搜索引擎 全文检索引擎 分布式文档系统 分布式数据库 OLAP系统 分布式搜索中间件 不要去死背概念,概念应该作为一种辅助的手段帮助我们去理解一项技术或知识,总之,等你真正会用了,你就

    2024年02月06日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包