【项目实战】Java 开发 Kafka 生产者

这篇具有很好参考价值的文章主要介绍了【项目实战】Java 开发 Kafka 生产者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏

【项目实战】Java 开发 Kafka 生产者,中间件,java,kafka,生产者,项目实战,原力计划

1、什么是 Kafka 生产者

【项目实战】Java 开发 Kafka 生产者,中间件,java,kafka,生产者,项目实战,原力计划

Kafka 生产者是指使用 Apache Kafka 的应用程序,用于向 Kafka 集群发送消息。生产者将消息发布到 Kafka 主题(topic),然后消费者可以从该主题订阅并接收这些消息。Kafka 生产者是实现消息发布的一方,可以是任何编程语言中的应用程序。

2、Java 如何使用 Kafka 生产者

  1. 首先,在Java项目中添加Kafka客户端依赖项。您可以在构建工具(如Maven或Gradle)中添加以下依赖项:
<dependency>
 	<groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>
  1. 创建Kafka生产者配置。您需要指定Kafka集群的地址和端口等配置信息。以下是一个示例配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器

Producer<String, String> producer;
try {
	producer = new KafkaProducer<>(props);
	
	String topic = "your-topic-name";
	String key = "your-message-key";
	String value = "your-message-value";
	ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
	producer.send(record);
} catch (Exception ex) {
} finally {
	try {
		producer.close();
	} catch (Exception ex) {
	}
}

但是在 SpringBoot 的项目中我们会使用 KafkaTemplate 去实现生产消息的发送。

3、SpringBoot 如何使用 Kafka 生产者

都需添加以下依赖项:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>		
</dependency>

3.1、方式一:代码

@Configuration
public class KafkaProducerConfig {
	/**
	 * kafka 地址
	 */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

使用如下:

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}

3.2、方式二:配置文件

可以 application.properties: 加上:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value-serializer=org.apache.kafka.common.serialization.StringSerializer

或者 yml 里面加上

spring:
  kafka:
    bootstrap-servers: localhost:9092
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

直接使用

@Service
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}

以上也只是一个简单的实例,后面我们根据 【项目实战】手把手教你搭建前后端分离项目 SpringBoot + Vue + Element UI + Mysql, 在这个教程的基础上,我们写如何实战。

4、Kafka Properties 的详细讲解

以下是所有参数的详细解释:

  1. bootstrap.servers :生产者用于与Kafka集群建立初始连接的主机和端口列表。
  2. acks :生产者要求leader在认为请求完成之前接收的确认数。可能的值有:
  • 0 :生产者不等待任何确认。
  • 1 :生产者等待leader确认请求。
  • all :生产者等待所有同步副本确认请求。
  1. retries :在放弃之前,生产者将重试发送失败的消息的次数。设置大于0的值以启用重试。
  2. batch.size :生产者尝试发送到Kafka代理的批次的大小(以字节为单位)。较大的批次大小可以提高吞吐量,但会增加消息传递的延迟。
  3. linger.ms :生产者在将批次发送到Kafka代理之前等待更多消息累积的时间(以毫秒为单位)。这有助于批处理,减少发送到代理的请求数量。
  4. buffer.memory :生产者用于缓冲等待发送到Kafka代理的消息的总内存量。
  5. key.serializer :用于将键对象序列化为字节的类。常见的选项是 StringSerializerByteArraySerializer
  6. value.serializer :用于将值对象序列化为字节的类。常见的选项是 StringSerializerByteArraySerializer
  7. compression.type :用于消息的压缩类型。支持的值有 nonegzipsnappylz4 。压缩可以减少网络带宽和存储要求。
  8. max.in.flight.requests.per.connection :在阻塞之前,生产者可以有的未确认请求的最大数量。将此值设置为较高的值可以增加吞吐量,但也会增加用于缓冲的内存。
  9. request.timeout.ms :生产者在考虑请求失败之前,从Kafka代理等待响应的最长时间(以毫秒为单位)。
  10. max.block.ms :当缓冲区已满或元数据不可用时,生产者在 send() 方法中阻塞的最长时间(以毫秒为单位)。

以上这些是Kafka生产者配置中常用的一些属性,使用方法如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址和端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 键的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器

5、Spring-Kafka Yml 配置参数

spring:
  kafka:
    bootstrap-servers: <bootstrap-servers>
    producer:
      key-serializer: <key-serializer>
      value-serializer: <value-serializer>
      retries: <retries>
      batch-size: <batch-size>
      linger-ms: <linger-ms>
      buffer-memory: <buffer-memory>
      compression-type: <compression-type>
    consumer:
      group-id: <group-id>
      key-deserializer: <key-deserializer>
      value-deserializer: <value-deserializer>
      auto-offset-reset: <auto-offset-reset>
      enable-auto-commit: <enable-auto-commit>
      max-poll-records: <max-poll-records>

以下是每个参数的解释:

  • bootstrap-servers :Kafka broker地址的逗号分隔列表。
  • producer.key-serializer :用于将键对象序列化为字节的类。
  • producer.value-serializer :用于将值对象序列化为字节的类。
  • producer.retries :在放弃之前,生产者将重试发送失败的消息的次数。
  • producer.batch-size :生产者将尝试发送到Kafka broker的批次的大小(以字节为单位)。
  • producer.linger-ms :生产者在将批次发送到Kafka broker之前等待更多消息累积的时间(以毫秒为单位)。
  • producer.buffer-memory :生产者用于缓冲等待发送到Kafka broker的消息的总内存量。
  • producer.compression-type :消息的压缩类型。
  • consumer.group-id :消费者组ID。
  • consumer.key-deserializer :用于将键对象从字节反序列化的类。
  • consumer.value-deserializer :用于将值对象从字节反序列化的类。
  • consumer.auto-offset-reset :当Kafka中没有初始偏移量或当前偏移量不再存在时,使用的策略。
  • consumer.enable-auto-commit :消费者的偏移量是否应自动提交。
  • consumer.max-poll-records :消费者在一次轮询中最多获取的记录数。

6、Kafka 生产者异步回调方式生产消息

6.1、什么是异步回调

什么是异步回调要搞清楚,异步回调指的是我发送完成了,我就不管了,我不需要等你的返回。具体的定义如下:

异步回调是一种编程模式,用于处理异步操作的结果。在异步回调中,当一个操作被触发时,程序不会立即阻塞等待结果,而是继续执行其他任务。当操作完成后,系统会调用预先定义的回调函数来处理操作的结果。

异步回调常用于处理需要等待时间较长的操作,例如网络请求、数据库查询等。通过使用异步回调,可以提高系统的响应性能和并发处理能力,避免阻塞和等待的情况。

在异步回调中,通常将回调函数作为参数传递给异步操作的方法。当操作完成后,系统会调用回调函数,并将操作的结果作为参数传递给回调函数,以便进行后续处理。

异步回调在编写异步代码时非常有用,可以帮助开发人员处理异步操作的结果,而无需显式地等待操作完成。这种方式可以提高系统的性能和可伸缩性,同时保持代码的简洁性和可读性。

6.2、匿名内部类的方式做异步回调

public class KafkaProducerExample{
     
   private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            String message = "Hello, Kafka! This is message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 匿名内部类的方式做异步回调
                }
            });
        }

        producer.close();
    }
}

6.3、 KafkaTemplate 的异步回调

package com.pany.camp.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

/**
 *
 * @description:  生产者
 * @copyright: @Copyright (c) 2022
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0
 * @createTime: 2023-06-26 18:10
 */
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);

        future.addCallback(new ListenableFutureCallback<>() {

            @Override
            public void onSuccess(Object o) {
            }

            @Override
            public void onFailure(Throwable ex) {
                // Handle failure callback
                System.err.println("Failed to send message: " + ex.getMessage());
            }
        });
    }

}

💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃
【项目实战】Java 开发 Kafka 生产者,中间件,java,kafka,生产者,项目实战,原力计划文章来源地址https://www.toymoban.com/news/detail-570015.html

到了这里,关于【项目实战】Java 开发 Kafka 生产者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(44)
  • 三、Kafka生产者1---Kafka生产者初始化-new KafkaProducer

    概述 本文主要是分享Kafka初始化生产者的 大体过程 初始化过程中会新建很多对象,本文暂先分享部分对象 1.分区器---Partitioner partitioner 2.重试时间---long retryBackoffMs 3.序列化器---SerializerK keySerializer,SerializerV valueSerializer 4.拦截器--- ListProducerInterceptorK, V interceptorList 5.累加器-

    2024年03月14日
    浏览(62)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(47)
  • 「Kafka」生产者篇

    在消息发送的过程中,涉及到了 两个线程 —— main 线程 和 Sender 线程 。 在 main 线程中创建了 一个 双端队列 RecordAccumulator 。 main线程将消息发送给RecordAccumulator,Sender线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。 main线程创建 Producer 对象,调用 send 函数发送消息,

    2024年01月19日
    浏览(40)
  • Kafka-生产者

    Kafka在实际应用中,经常被用作高性能、可扩展的消息中间件。 Kafka自定义了一套网络协议,只要遵守这套协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息。 在实践生产过程中,一套API封装良好、灵活易用的客户端可以避免开发人员重复劳动,提高开发效率,也

    2024年01月20日
    浏览(35)
  • Kafka(生产者)

    目 前 企 业 中 比 较 常 见 的 消 息 队 列 产 品 主 要 有 Kafka(在大数据场景主要采用 Kafka 作为消息队列。) ActiveMQ RabbitMQ RocketMQ 1.1.1 传统消息队列的应用场景 传统的消息队列的主要应用场景包括: 缓存/消峰 、 解耦 和 异步通信 。 缓冲/消峰: 有助于控制和优化数据流经过

    2024年02月11日
    浏览(45)
  • 三、Kafka生产者

    1 发送原理 在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 【RecordAccumulator缓冲的结构: 每一个分区对应一

    2024年02月12日
    浏览(38)
  • Kafka生产者

    1.acks 如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。 缺点:如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息就丢失了 优点:因为生产者不需要等待服务器的响应,所有他可以以网络能够支持的最大速度发送消息,从而

    2024年01月19日
    浏览(37)
  • kafka学习-生产者

    目录 1、消息生产流程 2、生产者常见参数配置 3、序列化器 基本概念 自定义序列化器 4、分区器 默认分区规则 自定义分区器 5、生产者拦截器 作用 自定义拦截器 6、生产者原理解析 在Kafka中保存的数据都是字节数组。 消息发送前,需要将消息序列化为字节数组进行发送。

    2024年02月09日
    浏览(39)
  • (三)Kafka 生产者

    创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基

    2024年02月09日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包