【注意】Kafka生产者异步发送消息仍有可能阻塞

这篇具有很好参考价值的文章主要介绍了【注意】Kafka生产者异步发送消息仍有可能阻塞。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题描述

Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用异步发送的方式,如下所示。

@Slf4j
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendAsync(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .addCallback(
                        sendResult -> log.info("Send success"),
                        e -> log.error("Send failed", e));
    }
}

本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,调用sendAsync()方法的主线程会长时间阻塞。这点是出乎意料的。

原因分析

跟踪源码可知,Kafka生产者在第一次发送消息时,会尝试从Broker获取元数据Metadata(见KafkaProducerwaitOnMetadata()方法),如果Broker连接失败,则会一直阻塞于此,循环尝试获取,直至超时(超时时间由max.block.ms定义)。

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param partition A specific partition expected to exist in metadata, or null if there's no preference
     * @param nowMs The current time in ms
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The cluster containing topic metadata and the amount of time we waited in ms
     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        metadata.add(topic, nowMs);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long remainingWaitMs = maxWaitMs;
        long elapsed = 0;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic, nowMs + elapsed);
            int version = metadata.requestUpdateForTopic(topic);
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

也就是说,Kafka生产者在发送消息前,要先获取到Metadata。对于异步发送,虽然消息发送的过程是非阻塞的,但获取Metadata的过程是阻塞的。如果因为Broker连接失败、Topic未创建等原因而一直获取不到Metadata,主线程将长时间阻塞。

解决办法

解决办法也很简单。如果Kafka发送消息并非关键业务,为了不影响主业务流程的进行,可以创建线程池来专门执行消息发送工作,保证sendAsync()方法一定是异步执行的。注意,线程池大小和工作队列长度需要合理限定,避免因阻塞任务过多而OOM;拒绝策略可以视情况选择DiscardPolicy。

另外,还可以考虑指定max.block.ms,来限制获取Metadata的最大阻塞时间(默认60000ms):

spring:
  kafka:
    producer:
      properties:
        max.block.ms: 1000

实际上,在异步发送消息的过程中,除了因为获取不到Metadata而阻塞外,还可能因为消息缓冲池已满而阻塞(参考:Kafka Producer 异步发送消息居然也会阻塞?)。这2种阻塞的超时时间均由max.block.ms定义。

总结

Kafka生产者异步发送消息的方法(如Spring Boot中的kafkaTemplate.send()),看似异步,实则可能阻塞。由于发送消息前需要获取元数据Metadata,如果一直获取失败(可能原因包括Broker连接失败、Topic未创建等),将导致长时间阻塞。这点与我们的一般理解不符,需要特别注意。文章来源地址https://www.toymoban.com/news/detail-412084.html

到了这里,关于【注意】Kafka生产者异步发送消息仍有可能阻塞的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 多图详解 kafka 生产者消息发送过程

    生产者客户端代码 KafkaProducer 通过解析 producer.propeties 文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器、 RecordAccumulator消息累加器 、 元信息更新器 、启动发送请求的后台线程 生产者元信息更新器 我们之前有讲过. 客户端都会保存集群的元信息,例如

    2023年04月09日
    浏览(29)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(37)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(36)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(33)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(36)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(30)
  • Kafka中的生产者如何处理消息发送失败的情况?

    在Kafka中,生产者可以通过以下方式处理消息发送失败的情况: 同步发送模式(Sync Mode):在同步发送模式下,生产者发送消息后会阻塞等待服务器的响应。如果发送失败,生产者会抛出异常(例如 ProducerRecord 发送异常)或返回错误信息。开发者可以捕获异常并根据需要进行

    2024年02月06日
    浏览(31)
  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    报这个错误是因为kafka里的配置要修改下 在config目录下 server.properties配置文件 这下发送消息就不会一直等待,就可以发送成功了

    2024年02月06日
    浏览(26)
  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包