多图详解 kafka 生产者消息发送过程

这篇具有很好参考价值的文章主要介绍了多图详解 kafka 生产者消息发送过程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

多图详解 kafka 生产者消息发送过程

生产者客户端代码

public class SzzTestSend {
  
    public static final String bootStrap = "xxxxxx:9090";    public static final String topic = "t_3_1";
    public static void main(String[] args) {
          Properties properties = new Properties();        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);        // 序列化协议  下面两种写法都可以        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");        //过滤器 可配置多个用逗号隔开        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");        //构造 KafkaProducer        KafkaProducer producer = new KafkaProducer(properties);        //  发送消息, 并设置 回调(回调函数也可以不要)        ProducerRecord<String,String> record = new ProducerRecord(topic,"Hello World!");        try {
              producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));        }catch (Exception e){
              e.printStackTrace();        }    }
    /**     * 发送成功回调类     */    public static class SzzTestCallBack implements Callback{
          private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class);        private String topic;        private String key;        private String value;
        public SzzTestCallBack(String topic, String key, String value) {
              this.topic = topic;            this.key = key;            this.value = value;
        }        public void onCompletion(RecordMetadata metadata, Exception e) {
              if (e != null) {
                  log.error("Error when sending message to topic {} with key: {}, value: {} with error:",                        topic, key,value, e);            }else {
                  log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}",                        topic, key,value,metadata.partition(),metadata.offset());            }        }    }}

1 构造 KafkaProducer

KafkaProducer 通过解析producer.propeties文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器、RecordAccumulator消息累加器元信息更新器、启动发送请求的后台线程

        //构造 KafkaProducer        KafkaProducer producer = new KafkaProducer(properties);

生产者元信息更新器

我们之前有讲过. 客户端都会保存集群的元信息,例如生产者的元信息是 ProducerMetadata. 消费组的是 ConsumerMetadata 。

 

相关的 Producer 配置有:

虽然 Producer 元信息会自动更新, 但是有可能在生产者发送消息的时候,发现某个 TopicPartition 不存在,这个时候可能就需要立刻发起一个元信息更新了。

集群资源变更监听器

org.apache.kafka.common.ClusterResourceListener

在构造 KafkaConsumer 的时候, 还会构造一个 集群资源变更监听器 ClusterResourceListener

当用户希望收到有关集群元数据更改的通知时,可以实现回调接口。

需要在拦截器指标采样器序列化器反序列化器 中访问集群元数据的用户可以实现此接口。

public interface ClusterResourceListener {
      /**     * 用户可以实现以获取 ClusterResource 更新的回调方法。     * @param clusterResource cluster metadata     */    void onUpdate(ClusterResource clusterResource);}

下面描述了每种类型的方法调用顺序。

Clients

在每个元数据响应之后都会调用一次 onUpdate(ClusterResource)

当在org.apache.kafka.clients.producer.ProducerInterceptor实现的 ClusterResourceListener 的时候

调用顺序为: ProducerInterceptor.onSend() -> onUpdate(ClusterResource) -> ProducerInterceptor.onAcknowledgement()

当在org.apache.kafka.clients.consumer.ConsumerInterceptor实现的 ClusterResourceListener 的时候

调用顺序为:onUpdate() - > ConsumerInterce文章来源地址https://www.toymoban.com/news/detail-407479.html

到了这里,关于多图详解 kafka 生产者消息发送过程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(36)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(33)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(36)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(30)
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(68)
  • Kafka中的生产者如何处理消息发送失败的情况?

    在Kafka中,生产者可以通过以下方式处理消息发送失败的情况: 同步发送模式(Sync Mode):在同步发送模式下,生产者发送消息后会阻塞等待服务器的响应。如果发送失败,生产者会抛出异常(例如 ProducerRecord 发送异常)或返回错误信息。开发者可以捕获异常并根据需要进行

    2024年02月06日
    浏览(31)
  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    报这个错误是因为kafka里的配置要修改下 在config目录下 server.properties配置文件 这下发送消息就不会一直等待,就可以发送成功了

    2024年02月06日
    浏览(26)
  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(30)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(35)
  • Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验

    1.1 生产者消息发送流程 1.1.1 发送原理 在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。 batch.size:只有数据积累到bat

    2024年02月09日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包