springboot集成kafka详细步骤(发送及监听消息示例)

这篇具有很好参考价值的文章主要介绍了springboot集成kafka详细步骤(发送及监听消息示例)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、本机的kafka环境配置,不再赘述

2、添加 pom 文件

 <!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.6</version>
        </dependency>

3、配置application.yml

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093
    producer:
      # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      acks: all
    consumer:
      # 默认的消费组ID
      group-id: java-group
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
       # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量一次最大拉取数据量
      max-poll-records: 65535
      #监测消费端心跳时间
      heartbeat-interval: 30000
      # 批量拉取间隔,要大于批量拉取数据的处理时间,时间间隔太小会有重复消费
      max.poll.interval.ms: 50000
      listener:
        #手工ack,调用ack后立刻提交offset
        ack-mode: MANUAL_IMMEDIATE
        #容器运行的线程数
        concurrency: 4

4、复写kafka的相关配置类:生产、消费相关配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
public class KafkaMQConfig {
    // 此处配置代替zk
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    // 消费组标识
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    // 偏移量的起始点
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    // 偏移量的提交方式
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    // 一次从kafka服务拉取的数据量
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
    // 监测消费端心跳时间
    @Value("${spring.kafka.consumer.heartbeat-interval}")
    private String heartbeatInterval;
    // 两次拉取数据的最大时间间隔
    @Value("${spring.kafka.consumer.max.poll.interval.ms}")
    private String maxPollIntervalMs;
    //生产者相关配置
    @Bean
    public KafkaProducer kafkaProducer() {
        Properties props = new Properties();
        // 设置接入点,请通过控制台获取对应 Topic 的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Kafka 消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // 构造 Producer 对象,注意,该对象是线程安全的
        // 一般来说,一个进程内一个 Producer 对象即可
        // 如果想提高性能,可构造多个对象,但最好不要超过 5 个
        return new KafkaProducer<String, String>(props);
    }
    // 消费端相关配置
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

5、生产、消费的伪代码

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaUtils {

    private final KafkaTemplate<String, String> kafkaTemplate;


    /**
     * 发送消息
     */
    public void kafkaSendMsg(String topicName, String msg) {
        kafkaTemplate.send(topicName, msg);
        log.info("kafka成功发送消息给:" + topicName + ",内容为:" + msg);
    }

    /**
     * 监听消息
     */
    @KafkaListener(topics = {"test"}, groupId = "java-group",containerFactory="kafkaListenerContainerFactory")
    public void kafkaListener(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("这是消费者在消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());

        ack.acknowledge();
    }

}

6、测试消息发送

@RestController
@RequestMapping("/v1")
public class TestController {

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * 测试卡夫卡消息
     * @return 结果
     */
    @GetMapping("/kafkaTest")
    public JSONObject kafkaTest() {
        kafkaUtils.kafkaSendMsg("test", "2022年11月10日上午发送的消息!!!");
        return null;
    }
}

经过测试!文章来源地址https://www.toymoban.com/news/detail-511691.html

到了这里,关于springboot集成kafka详细步骤(发送及监听消息示例)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(53)
  • SpringBoot集成Mqtt发送消息

    MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即 消息队列传输探测 ,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。 三种身份: 客户端(Client):MQTT 客户端是

    2024年04月25日
    浏览(36)
  • Kafka:springboot集成kafka收发消息

    kafka环境搭建参考Kafka:安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 2、配置application.yml 传递String类型的消息 3、controller实现消息发送接口 4、component中实现接收类HelloListener  5、测试 浏览器访问该接口并查看控制台         接收成功   传递对象类型的消息

    2024年02月13日
    浏览(41)
  • SpringBoot集成RocketMQ实现三种消息发送方式

    目录 一、pom文件引入依赖 二、application.yml文件添加内容 三、创建producer生产者 四、创建Consumer消费者(创建两个消费者,所属一个Topic) 五、启动项目测试 RocketMQ 支持3 种消息发送方式: 同步 (sync)、异步(async)、单向(oneway)。 同步 :发送者向 MQ 执行发送消息API 时

    2024年02月13日
    浏览(46)
  • SpringBoot3集成Kafka优雅实现信息消费发送

           首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。        这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据

    2024年02月02日
    浏览(53)
  • Springboot Kafka整合(开发实例、连接、配置TOPICS、发送消息)—官方原版

    Spring for Apache Kafka项目将Spring的核心概念应用于基于Kafka的消息传递解决方案的开发。我们提供了一个“模板”作为发送消息的高级抽象。 本快速教程适用于以下版本: Apache Kafka 客户端 3.3.x Spring Framework 6.0.x 最低 Java 版本:17 以下是一个不使用Spring Boot的应用程序示例;它既

    2024年02月06日
    浏览(62)
  • Springboot实战14 消息驱动:如何使用 KafkaTemplate 集成 Kafka?

    从今天开始,我们将进入 Spring Boot 中另一个重要话题的讨论,即消息通信。 消息通信是 Web 应用程序中间层组件中的代表性技术体系,主要用于构建复杂而又灵活的业务流程。在互联网应用中,消息通信被认为是实现系统解耦和高并发的关键技术体系。本节课我们将在 Spri

    2024年02月04日
    浏览(37)
  • 简单的RabbitMQ集成Springboot实现订单异步发送功能示例以及RabbitMQ的相关问题

    引入RabbitMQ的依赖,在pom.xml文件中添加以下代码: 在application.properties文件中配置RabbitMQ的相关信息: 创建消息队列,并定义消息接收者: 定义消息发送者: 在需要发送订单信息的地方调用OrderSender的send方法即可: RabbitMQ是一个开源的消息中间件,主要用于实现应用之间的异

    2024年02月09日
    浏览(33)
  • 微信小程序的订阅消息是一个允许开发者向用户发送重要通知的功能。这里为您展示如何实现小程序订阅消息的基本步骤和代码示例

    步骤 1: 获取模板 ID 首先,您需要登录微信公众平台,进入「小程序管理」后台,找到“设置” “开发设置” “订阅消息”,然后选择并配置所需的模板,记录模板 ID。 步骤 2: 小程序前端请求订阅 在小程序的某个页面或组件中,当用户执行某个操作(例如点击按钮)时,可

    2024年02月04日
    浏览(89)
  • 使用SpringBoot集成Mybatis的详细步骤

    这里mybatis采用的是3.4.5版本,注意:mybatis3.4.0版本以上才支持@Mapper注解 相关依赖代码如下: pom文件完整代码如下: 导入pom文件后,若依赖报红,需要重新import下(点击下图里的刷新图标) 这边主要设计到的包有Controller(控制层),Entity(实体层),Service(接口层),ServiceImpl(接口

    2024年02月09日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包