Kafka - 异步/同步发送API

这篇具有很好参考价值的文章主要介绍了Kafka - 异步/同步发送API。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式

异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();
            System.out.println(art.offset());
            System.out.println("over - " + i);
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerWithCallBack {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 添加回调
            // 该方法在Producer收到ack时调用,为异步调用
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {
                // 没有异常,输出信息到控制台
                System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式

控制台

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 通过Future接口的get实现同步阻塞
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

Kafka - 异步/同步发送API,【MQ-Apache Kafka】,kafka,分布式文章来源地址https://www.toymoban.com/news/detail-716016.html

到了这里,关于Kafka - 异步/同步发送API的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(78)
  • ZooKeeper Java API 使用教程 - 同步与异步操作示例

    本教程详细介绍了如何使用ZooKeeper Java API进行节点数据的同步和异步获取。通过具体代码示例,展示了如何连接ZooKeeper服务器,获取子节点列表,以及如何处理节点数据变化的事件。

    2024年04月22日
    浏览(35)
  • Kafka入门,手动提交offset,同步提交,异步提交,指定 Offset 消费(二十三)

    虽然offset十分遍历,但是由于其是基于时间提交的,开发人员难以把握offset提交的实际。因此Kafka还提供了手动提交offset的API 手动提交offset的方法有两种:分别commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交:不同点是

    2024年02月11日
    浏览(49)
  • RabbitMQ-同步和异步通讯、安装和入门案例、SpringAMQP(5个消息发送接收Demo,jackson消息转换器)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月11日
    浏览(32)
  • 【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡

    kafka版本为3.6,部署在3台linux上。 maven依赖如下: 生产者、消费者和topic代码如下: 这里先简单解释一下, kafka的topic只是一个逻辑上的概念,实际上的物理存储是依赖分布在broker中的分区partition来完成的 。kafka依赖的zk中有一个 __consumer_offsets [1]话题,存储了所有consumer和g

    2024年01月19日
    浏览(55)
  • TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

    快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群 创建 changefeed,将 TiDB 增量数据输出至 Kafka 使用 go-tpc 写入数据到上游 TiDB 使用 Kafka console consumer 观察数据被写入到指定的 Topic (可选)配置 Flink 集群消费 Kafka 内数据 部署包含 TiCDC 的 TiDB 集群 在实验或测试环境中,可以使用 TiU

    2024年02月12日
    浏览(58)
  • 微服务中间件--MQ服务异步通信

    MQ的一些常见问题 消息可靠性问题 消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机

    2024年02月11日
    浏览(48)
  • 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月10日
    浏览(58)
  • springboot整合阿里大于并结合mq发送短信

    在 pom.xml 文件中添加以下依赖: 在 application.properties 文件中添加以下配置: 其中, accessKeyId 和 accessKeySecret 是阿里云控制台上的AccessKey, signName 是短信签名, templateCode 是短信模板ID。 在Spring Boot中,我们可以使用MQ来异步发送短信,提高系统的响应速度。这里以ActiveMQ为例

    2024年02月08日
    浏览(45)
  • 利用MQ实现mysql与elasticsearch数据同步

    1.声明exchange、queue、RoutingKey 2. 在hotel-admin中进行增删改(SQL),完成消息发送 3. 在hotel-demo中完成消息监听,并更新elasticsearch数据 4. 测试同步 我这里的mq是挂在了docker上,虚拟机地址是192.168.116.128。到时候这个根据自己的项目改就行 在hotel-demo中,定义配置类,声明队列、交

    2024年02月09日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包