Kafka消息发送流程

这篇具有很好参考价值的文章主要介绍了Kafka消息发送流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka消息发送流程

消息发送流程

我们通过创建 KafkaProducer 对象来发送消息,KafkaProducer有两个线程
Producer主线程:把消息发送到内存缓冲区
Sender线程:把内存缓冲区的消息发送到 broker

Producer主线程

Kafka消息发送流程
Producer 主线程的的流程如图所示

拉取元数据:每个 topic 有多个分区,需要知道对应的broker地址
序列化器:将 Java 对象转为字节数组
分区器:决定消息发送到哪个分区?

消息不会实时发送,消息会缓存在 RecordAccumlator,发送的消息会被聚合成 ProducerBatch,ProducerBatch 的大小默认为 16k(可以通过 batch.size 设置 ProducerBatch 的大小)

当 ProducerBatch 满了的话会被 Sender 线程发送出去,超过 linger.ms 指定的时间,如果 ProducerBatch 还没满,也会被发送出去

ProducerBatch 底层就是 ByteBuffer,每次创建和销毁 ByteBuffer 可能会耗费大量的时间,所以一个 BufferPool,提前创建了一堆 ByteBuffer 让用户来使用,用的时候从 BufferPool 里面取,用完再还回来,这样就提高了吞吐量。如果创建的 ByteBuffer 大于16k,则不从 BufferPool 中取,实时创建

BufferPool有两个比较重要的参数
buffer.memory:设置 buffer pool 的大小
max.block.ms:buffer pool 没有剩余空间时,producer 阻塞等待的时间。当 buffer pool 都没用完时,send方法是会阻塞等待的。

Sender线程

Kafka消息发送流程

Sender线程 不断获取可以发送的 ProducerBatch,并在 broker 维度进行聚合
把多个 ProducerBatch 再聚合成一个 request,最终发送的是 request
要发送的请求都会放到 inFightRequest 队列中,然后通过 Sender 发送到 broker,当 inFightRequest 中的请求收到响应后会从队列中删除,并删除对应的 ProducerBatch

https://kafka.apache.org/0110/documentation.html

参数名 描述 默认值
acks 可选值为[all, -1, 0, 1] 1
batch.size batch的大小,默认为16kb,如果batch太小,会导致频繁网络请求,吞吐量下降,如果batch太大,会导致一条消息需要等待很久才能被发送出去 16k
linger.ms 超过linger.ms指定的时间,batch还没满,也会被发送出去,避免消息的延迟太长 0
max.request.size 限制发送出去的消息大小 1m
request.timeout.ms 消息发送的超时时间,默认30s,如果30s内收不到响应,会抛出 TimeoutException 30s
buffer.memory buffer pool的大小 30m
max.block.ms buffer pool没有剩余空间时,producer阻塞等待的时间 60s
max.in.flight.requests.per.connection 单个连接最大可以允许发送中的请求数 5

应答acks:

数字 含义
0 生产者发送过来的数据,不需要等待数据落盘
1 leader 收到数据后应答
-1 leader 和 isr 队列里面的所有节点收到数据后应答。-1 和 all 等价

Producer 开发

public class KafkaProperties {

    public static final String SERVER_URL = "s1:9092";
    public static final String TOPIC = "quickstart";
    public static final String TOPIC2 = "user";
    public static final String GROUP_ID = "quickstartGroup";
}
public class QuickstartProducer {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.SERVER_URL);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            RecordMetadata metadata = producer.send(new ProducerRecord<>(KafkaProperties.TOPIC, "test" + i)).get();
            System.out.printf("topic: %s, partition: %s, offset: %s %n", metadata.topic(), metadata.partition(), metadata.offset());
        }

        producer.close();
    }
}

后续演示用到的代码参考:https://github.com/erlieStar/kafka-examples

拦截器

在消息发送的流程中,序列化器和分区器是必须的,但是拦截器不是必须的

很多框架都有拦截器这个概念,比如 Servlet Filter,Spring MVC Interceptor,Dubbo Filter。是责任链模式的典型应用,方便用户在消息发送和消息响应之前增加一些自定义的逻辑

想自定义拦截器,只需要实现 ProducerInterceptor 接口即可

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private int errorCount = 0;
    private int successCount = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("消息发送之前被调用");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("消息被发送到分区之后或者发送失败被调用");
        if (exception == null) {
            successCount++;
        } else {
            errorCount++;
        }
    }

    @Override
    public void close() {
        System.out.println("拦截器关闭时调用");
        System.out.println("success count " + successCount);
        System.out.println("error count " + errorCount);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("拦截器实例创建后调用,用于配置拦截器");
    }
}

序列化器

public class MySerialize implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, User data) {
        return JSON.toJSONBytes(data);
    }

    @Override
    public void close() {

    }
}

分区器

producer 的默认分区器为 DefaultPartitioner,具体实现为

  1. 如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息
  2. 如果 key 不为 null:对 Key 值进行 Hash 计算,从所有分区中根据 Key 的 Hash 值计算出一个分区号;拥有相同 Key 值的消息被写入同一个分区;

Kafka消息发送流程
当自定义分区器时,只需要实现 Partitioner 接口即可,比如下面这个例子,将所有的消息都发送到分区0

public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

只往分区0发送消息

发送方式

Kafka消息发送流程

参数名 描述 默认值
bootstrap.servers 格式为host1:port1,host2:port2,…
interceptor.classes 拦截器
key.serializer 消息key的序列化器
value.serializer 消息value的序列化器
partitioner.class 分区器 org.apache.kafka.clients.producer.internals.DefaultPartitioner
compression.type 是否压缩消息,默认是 none,即不压缩消息 none
retries 对于可重试的错误进行重试的次数 0
retry.backoff.ms 上次发送失败,重试的时间间隔 100

单向发送

for (int i = 0; i < 5; i++) {
    producer.send(new ProducerRecord<>(KafkaProperties.TOPIC, "test" + i));
}

同步发送

for (int i = 0; i < 5; i++) {
    RecordMetadata metadata = producer.send(new ProducerRecord<>(KafkaProperties.TOPIC, "test" + i)).get();
    System.out.printf("topic: %s, partition: %s, offset: %s %n", metadata.topic(), metadata.partition(), metadata.offset());
}

异步发送

for (int i = 0; i < 5; i++) {
    producer.send(new ProducerRecord<>(KafkaProperties.TOPIC, "test" + i), (metadata, exception) -> {
        if (metadata != null) {
            System.out.printf("topic: %s, partition: %s, offset: %s %n", metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
}

参考博客

[1]https://qiankunli.github.io/2017/12/08/kafka_clients.html
[2]https://cloud.tencent.com/developer/article/1990530
[3]https://juejin.cn/post/6967148080810426399文章来源地址https://www.toymoban.com/news/detail-415924.html

到了这里,关于Kafka消息发送流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

    项目地址: A generic message notification system[Github] Websocket Connection Pool Websocket Provider websocket接口 结果截图

    2024年02月10日
    浏览(39)
  • Kafka消息队列实现消息的发送和接收

    消息在Kafka消息队列中发送和接收过程如下图所示: 消息生产者Producer产生消息数据,发送到Kafka消息队列中,一台Kafka节点只有一个Broker,消息会存储在Kafka的Topic(主题中),不同类型的消息数据会存储在不同的Topic中,可以利用Topic实现消息的分类,消息消费者Consumer会订阅

    2024年02月11日
    浏览(48)
  • kafka入门(一):kafka消息发送与消费

    kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Consumer Group (消费者组) 每个消费

    2024年04月12日
    浏览(40)
  • 配置Kafka发送大消息

    Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。 Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要

    2024年02月16日
    浏览(33)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(40)
  • kafka实现消息接受和发送

    1、下载镜像 2、创建容器

    2024年02月12日
    浏览(34)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(50)
  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)
  • RockerMQ发送消息流程

    部分适用场景 当发送的消息不重要时,采用one-way方式,以提升吞吐量 当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式 当发送的消息很重要时,且对响应时间很是敏感的时候采用async方式  监听类 消息中两id必须一致

    2024年01月20日
    浏览(21)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包