Spring Kafka生产者实现

这篇具有很好参考价值的文章主要介绍了Spring Kafka生产者实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求

我们需要通过Spring Kafka库,将消息推送给Kafka的topic中。这里假设Kafka的集群和用户我们都有了。这里Kafka认证采取SASL_PLAINTEXT方式接入,SASL 采用 SCRAM-SHA-256 方式加解密。

pom.xml

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

我这里不需要写版本号,应为我使用的Spring Boot。Spring Boot会自动帮我挑选spring-kafka应该使用哪个版本合适。

application.yml

spring:
  kafka:
    producer:
	  # kafka集群地址
      bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092
	  client-id: producer-dev
      # SASL_PLAINTEXT 接入方式
      security:
        protocol: SASL_PLAINTEXT
      # 序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # SASL 采用 SCRAM-SHA-256 方式
        sasl:
          mechanism: SCRAM-SHA-256
    # jaas配置
    jaas:
      options:
        username: kafkauser
        password: kafkapwd
      enabled: true
      login-module: org.apache.kafka.common.security.scram.ScramLoginModule
      control-flag: required

以上,是关于Spring Kafka的全部配置。下面摘要出来的配置,是可以单独配置在配置中心的:

topic:
  # 接收消息的主题配置
  save: hello_kafka_topic
spring:
  kafka:
    producer:
      client-id: producer-dev
      # kafka集群地址
      bootstrap-servers: xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092,xx.xx.xx.xxx:9092
    # jaas配置
    jaas:
      options:
        username: kafkauser
        password: kafkapwd

Java

KafkaProducerService.java


public interface KafkaProducerService {

    /**
     * 转发消息到kafka
     */
    void sendToKafka(String msg);

}

KafkaProducerServiceImpl.java



import cn.com.xxx.service.KafkaProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * 转发消息到kafka
 */
@RefreshScope
@Slf4j
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * kafka接收消息的主题
     */
    @Value("${topic.save}")
    private String topic;


    @Override
    public void sendToKafka(String msg) {
        log.info(String.format("$$$$ => Producing message: %s", msg));

        ProducerRecord<String, String> recordKafka = new ProducerRecord<>(topic, msg);

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(recordKafka);
        future.addCallback(new KafkaSendCallback<String, String>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("成功发消息:{}给Kafka:{}", msg, result);
            }

            @Override
            public void onFailure(KafkaProducerException ex) {
                log.error("发消息:{}给Kafka:{}", msg, recordKafka, ex);
            }
        });
    }
}

到这里为止Spring Kafka生产者所有配置就都可以了。这里使用的异步监听kafka回调的方式发送消息。

总结

这里使用Spring Kafka库异回调步给Kafka消息。这里使用的Spring Kafka库是老版本,所以,这里的使用的回调类是ListenableFuture类。文章来源地址https://www.toymoban.com/news/detail-714816.html

参考:

  • Spring for Apache Kafka2.8.3
  • Spring for Apache Kafka

到了这里,关于Spring Kafka生产者实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(37)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(50)
  • Apache Kafka - 重识Kafka生产者

    Kafka 生产者是 Apache Kafka 中的一个重要组件,它负责将数据发送到 Kafka 集群中。在实时数据处理和流式处理应用程序中,Kafka 生产者扮演着非常重要的角色。 这里我们将介绍 Kafka 生产者的概念、工作原理以及如何使用 Kafka 生产者。 Kafka 生产者是一种用于将数据发送到 Kafk

    2024年02月05日
    浏览(31)
  • [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题

    [kafka消息生产被阻塞] - 如何解决Kafka生产者阻塞的问题 Kafka是一个高度可扩展的分布式流平台,用于构建实时数据管道和流处理应用程序。作为一个广泛使用的消息代理系统,Kafka在数据传输方面表现出色,但是在极端情况下,它可能会出现生产者阻塞的问题。这可能会导致

    2024年02月11日
    浏览(35)
  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(32)
  • Kafka 生产者

    目录 一、kafka生产者原理 二、kafka异步发送 配置kafka 创建对象,发送数据 带回调函数的异步发送 同步发送   三、kafka生产者分区 分区策略 指定分区:  指定key: 什么都不指定: 自定义分区器 四、生产者提高吞吐量 五、数据的可靠性 ACK应答级别 数据完全可靠条件 可靠性

    2023年04月15日
    浏览(36)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(31)
  • 三、Kafka生产者

    1 发送原理 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 【RecordAccumulator缓冲的结构: 每一个分区对应一

    2024年02月12日
    浏览(31)
  • kafka学习-生产者

    目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 在Kafka中保存的数据都是字节数组。 消息发送前,需要将消息序列化为字节数组进行发送。

    2024年02月09日
    浏览(31)
  • Kafka(生产者)

    目 前 企 业 中 比 较 常 见 的 消 息 队 列 产 品 主 要 有 Kafka(在大数据场景主要采用 Kafka 作为消息队列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 传统消息队列的应用场景 传统的消息队列的主要应用场景包括: 缓存/消峰 、 解耦 和 异步通信 。 缓冲/消峰: 有助于控制和优化数据流经过

    2024年02月11日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包