图解系列 图解Kafka之Producer

这篇具有很好参考价值的文章主要介绍了图解系列 图解Kafka之Producer。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


开局一张图,其他全靠吹
图解系列 图解Kafka之Producer,《🌹🍎🍎图解技术🍎🍎》,kafka,分布式
发送消息流程如下

1.初始化流程

  • 指定bootstrap.servers,地址的格式为 host:port。它会连接bootstrap.servers参数指定的所有Broker,Producer启动时会发起与这些Broker的连接。因此,如果你为这个参数指定了1000个Broker连接信息,那么很遗憾,你的Producer启动时会首先创建与这1000个Broker的TCP连接。

    • 在实际使用过程中,我并不建议把集群中所有的Broker信息都配置到bootstrap.servers中,通常你指定3~4台就足以了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息,故没必要为bootstrap.servers指定所有的Broker。
    • props.put("bootstrap.servers", "localhost:9092");
  • 指定Key和Value的序列化方式。

    •  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
  • 指定acks配置,默认值是all(版本3.x)

    • props.put("acks", "all");
    • 设置为0,表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
    • 设置为1,表示leader副本成功写入PageCache就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。
    • 设置成all或-1,表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。
  • producer = new KafkaProducer<>(props);

    • 从配置中获取必要的参数,如transactionalIdclientId
    • 根据clientId创建日志记录上下文(LogContext),用于日志记录。
    • 配置度量(Metrics)相关信息,包括度量标签、度量配置、度量报告器等。
    • 创建度量上下文(MetricsContext)和度量实例(Metrics)。
    • 初始化分区器(Partitioner)。
    • 配置并初始化键(key)和值(value)的序列化器(Serializer)。
    • 配置并初始化拦截器(Interceptors)。
    • 配置集群资源监听器(ClusterResourceListeners)。
    • 设置最大请求大小(maxRequestSize)、内存大小(totalMemorySize)和压缩类型(compressionType)等参数。
    • 配置最大阻塞时间(maxBlockTimeMs)和交付超时时间(deliveryTimeoutMs)。
    • 初始化API版本(apiVersions)和事务管理器(transactionManager)。
    • 创建记录累加器(RecordAccumulator),用于累积记录以进行批量发送。
    • 解析并验证引导服务器地址(addresses)。
    • 如果提供了元数据(metadata),则使用提供的元数据,否则创建新的元数据实例,并通过引导服务器地址进行引导。
    • 初始化错误度量传感器(errors)。
    • 创建并启动IO线程(ioThread)来处理消息发送。
    • 注册应用程序信息,用于JMX度量和监控。
    • 如果在初始化过程中发生任何错误,将调用关闭方法以避免资源泄漏,并向上抛出Kafka异常。

2.发送消息流程

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulatormain 线程将消息发送给 RecordAccumulatorSender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker

  • 构造消息记录ProducerRecord 对象,对象包含了四个属性:Topic,partition,key,value;topic 和 value 是必须的,key 和 partition 是可选的。

  • 同步获取Kafka集群信息(Cluster)。

    • 如果缓存有,并且分区没有超过指定分区范围则返回缓存
    • 否则触发更新,等待从broker获取新的元数据信息
    • 默认强制拉取时间是metadata.max.age.ms: 5分钟
  • 使用键序列化器(keySerializer)将消息的键序列化为字节数组,使用值序列化器(valueSerializer)将消息的值序列化为字节数组。

  • 计算数据发送到那个分区,如果指定了 key,那么相同 key 的消息会发往同一个分区,如果实现了自定义分区器,那么就会走自定义分区器进行分区路由。

    • 如果有Key值,则使用Key值的Hash值来分配分区 murmurhash(key) % 主题分区总数
    • 老版本:如果没有key值,则以Round-Robin的方式分配分区。
    • 新版本:如果没有key值,则以粘性分区的方式分配分区
  • 创建一个TopicPartition对象,表示要发送消息的主题和分区。

  • 判断消息的大小是否超过了我们设置的阈值

  • 异步发送时,给每一条消息都绑定他的回调函数

  • 把消息放入记录累加器(accumulator)(32M的一个内存),然后有accumulator把消息封装成为一个批次一个批次的去发送。

  • 如果批次满了或者新创建出来一个批次, 唤醒sender线程,他才是真正发送数据的线程,发送的时候并不是来一个消息就发送一个消息,这样的话吞吐量比较低,并且频繁的进行网络请求。消息是按照批次来发送的或者等待时间来发的的.

  • Leader Broker接收到消息写入到PageCache,当Producer的acks设置为"all"时,这意味着Producer会等待所有ISR(In-Sync Replicas,即同步副本)都成功确认消息之后才会认为消息发送成功。

  • Leader Partition接收消息:每个Partition都有一个Leader Broker,该Leader Broker负责接收所有消息并处理副本同步。Leader Partition会接收Producer发送的消息。

  • 消息复制到Follower副本:Leader Partition会将消息复制到ISR中的Follower副本(In-Sync Replicas),这些副本是与Leader保持同步的副本。Kafka允许配置多个ISR,以提高可用性。

  • 等待Follower副本确认:Leader Partition会等待ISR中的Follower副本确认已成功复制消息。这些确认信息包括副本在哪个Offset处复制消息。

  • Leader确认消息:一旦ISR中的所有Follower副本都确认了消息,Leader Partition会向Producer发送确认消息。这表示消息已经成功写入Leader Partition并且已在ISR中的所有Follower副本中成功复制。

  • Producer接收确认:Producer收到来自Kafka的确认消息,这时候Producer认为消息发送成功。

  • 定期提交消息到磁盘:Leader Partition会定期将已接收的消息写入磁盘以确保持久性,以防Broker故障。

  • Follower副本的同步:Follower副本会定期从Leader Partition拉取消息,确保与Leader保持同步。如果Follower副本无法赶上Leader,它可能会被认为是“失去同步”,不再被视为ISR的一部分。

示例

Properties props = new Properties();
// 配置broker列表,一般配置3-5个即可,如果你有100个broker,不需要全部配置,能连上一个就行
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
// 持久化机制参数
props.put(ProducerConfig.ACKS_CONFIG, "-1");
// 发送失败会重试发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在 接收者那边做好消息接收的幂等性处理 
// 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 设置重试间隔
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
// 设置发送消息的本地缓冲区大小 32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// kafka本地线程会从缓冲区取数据,批量发送到broker,
// 设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 默认值是0,意思就是消息必须立即被发送,但这样会影响性能
// 一般设置100毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果100毫秒内,这个batch满了16kb就会随batch一起被发送出去
// 如果100毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
// 把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);

for (int i = 1; i <= msgNum; i++) {
     Order order = new Order(i, 100 + i, 1, 1000.00);
     // 1.指定发送分区
     ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));
     // 2.未指定发送分区,指定Key 具体发送的分区计算公式:hash(key) % partitionNum
     ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , order.getOrderId().toString(), JSON.toJSONString(order));
	 // 3.既没有指定分区,也没有指定Key Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,
     //   待该分区的batch已满或者或者linger.ms设置的时间到,Kafka再随机一个分区进行使用(和上一次的分区不同)
     // 4.自定义分区器
     // 同步发送:等待消息发送成功的同步阻塞方法
     RecordMetadata metadata = producer.send(producerRecord).get();
     System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());*/

     // 异步回调方式发送
     producer.send(producerRecord, new Callback() {
          // 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),
          // 如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败
public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送消息失败:" + exception.getStackTrace());
                    }
                    if (metadata != null) {
                        System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                }
            });

生产者发送消息三大模式

  • 发后即忘模式 fire-and-forget 模式

producer.send(producerRecord)

  • 异步发送发送

producer.send(producerRecord, new Callback() )

  • 同步发送模式

producer.send(producerRecord, new Callback() ).get()

生产者核心参数

  • bootstrap.servers: Kafka集群的地址列表,Producer用于发现Broker。这个参数是必需的。一般配置3-5个即可,如果你有100个broker,不需要全部配置,能连上一个就行

  • acks: 默认值是all,控制生产者要求Broker确认消息写入的级别。要求消息可靠性设置-1,要求吞吐量设置为1

    • 设置为0,表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
    • 设置为1,表示leader副本成功写入PageCache就会响应Producer而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。
    • 设置成all或-1,表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。
  • retries: 默认值Integer.MAX_VALUE,生产者在发送消息时的重试次数。如果消息发送失败,Producer会尝试重新发送。设置为大于0的值可以增加消息的可靠性,一般建议设置为3

  • retry.backoff.ms: 两次重试之间的时间间隔,默认是 100ms。

  • max.in.flight.requests.per.connection: 默认值5,建议 1-5,控制每个连接上允许的未确认请求的最大数量。较高的值可以提高吞吐量,但也可能导致更大的内存使用。

  • buffer.memory: 设置发送消息的缓冲区,默认是32M

  • compression.type: 默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,但是会提高cpu的开销

  • batch.size: 设置batch的大小,如果batch太小,会导致频繁的网络请求,吞吐量下降,如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大的压力过多的数据缓存在内存里,默认值是16KB,也就是一个batch满了的时候会把这个batch发送出去,一般在生产环境会适当的增大这个值,如果消息大小大于这个,那么就会使用消息大小(源码中是这样的),一般不改动默认值

  • linger.ms: 这个值默认是0,意思就是消息必须立即被发送,但是这样是不对的,一般设置一个100ms,这样的话就是如果100ms内这个batch满了16KB就会发送出去,如果是0 那么 batch.size 就失效,建议要求实时性设置0,要求吞吐量设置50-100

  • key.serializervalue.serializer: 指定消息的键和值的序列化器,将Java对象转换为字节数组以便发送到Kafka。

  • enable.idempotence :默认true,用于确保生产者产生的消息具有幂等性。幂等性是指无论发送多少次相同的消息,最终的结果都是相同的,不会产生副作用,仅能保证单分区,仅该生成者产生的消息幂等,重启下就废了。

  • max.block.ms: 默认60s,控制在生产者缓冲区已满时调用send方法的行为。如果生产者缓冲区已满,send方法可以选择阻塞等待空间可用,直到指定的超时时间过去。

  • block.on.buffer.full: 默认true,TRUE表示当我们内存用尽时,停止接收新消息记录或者抛出错误。
    默认情况下,这个设置为TRUE。然而某些阻塞可能不值得期待,因此立即抛出错误更好。如果设置为false,则producer抛出一个异常错误:BufferExhaustedException

  • request.timeout.ms: 默认30s,设置一个请求最大等待时间(单位为ms),超过这个时间则会抛Timeout异常。
    超时时间如果设置大一些,如127000(127秒),高并发的场景中,能减少发送失败的情况。

以上Kafka版本为3.X

实际上,当batch.size和linger.ms二者都配置的时候,只要满足其中一个要求,就会发送请求到broker上。

参考文章来源地址https://www.toymoban.com/news/detail-697111.html

  • https://www.clairvoyant.ai/blog/unleash-kafka-producers-architecture-and-internal-workings
  • 尚硅谷 Kafka

到了这里,关于图解系列 图解Kafka之Producer的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(35)
  • 分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

    Kafka 消费者负载均衡策略? Kafka 消费者分区分配策略? 1. 环境准备 创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。 ① 创建主题 test,该主题有5个分区,2个副本: ② 创建3个消费者CustomConsu

    2024年02月13日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(35)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(38)
  • 分布式 - 消息队列Kafka:Kafka消费者分区再均衡(Rebalance)

    01. Kafka 消费者分区再均衡是什么? 消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。 分区

    2024年02月12日
    浏览(27)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(33)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(32)
  • 分布式应用之Zookeeper和Kafka

    1.定义 2.特点 3.数据结构 4.选举机制 第一次选举 非第一次选举 5.部署 1.概念 中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源。 2.消息队列型 3.Web应用型(代理服务器) 1.为什么需要MQ 2.消息队列作用 3.消息队列模式 ①点对

    2024年02月15日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包