【深入理解Kafka系列】 第二章 生产者

这篇具有很好参考价值的文章主要介绍了【深入理解Kafka系列】 第二章 生产者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

      生产者就是负责向Kafka发送消息的应用程序。Kafka一共两个大版本的客户端,第一个是开源之处使用Scala编写的客户端;第二个是0.9.x版本开始推出的java编写的客户端。

1、客户端开发

一个正常的生产逻辑需要以下几个步骤:

(1)配置生产者客户端参数及创建相应的生产者实例。

(2)构建待发送的消息

(3)发送消息

(4)关闭生产者实例

需要单独说明下构建消息的ProducerRecord,它包含了多个属性,定义如下:

public class ProducerRecord<K, V> { 
private final String topic; // 主题
private final Integer partit on //分区号
private nal Headers headers; // 消息头部
private final K key; //键
private nal V value ; //值
private nal Long timestamp ; // 消息的时间戳
//省略其他成员方法和构造方法
}
        topic和 partition 字段分别代表消息要发往的主题和分区号。 key 是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算 分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个 key 以让消息再进行二次归类,同 一个 key 的消息会被划分到同一个分区 value 是指消息体,一 般不为空, 如果为空则表示特定的消息一一墓碑消息。   time stamp 是指消息的时间戳, 它有 Create Time LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到 日志文件的时间。

1.1、必要的参数

        创建真正的生产者实例前需要配置相应的参数,在 Kafka 生产者客户端 KafkaProducer 中有3个参数是必填的。

  • bootstrap.servers :该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 hostl:portl,hos t2:port2 ,可以设置1个或多个地址,中间以逗号隔开,注意这里并非需要所有的 broker址,因为生产者会从给定的 broker 里查找到其他 broker 的信息 ,建议设置2个。
  • key.serializer和value.serializer: broker 端接收的消息必须以字节数组 (byte [])的形式存在。key.serializer value .seriaizer 这两个参数分别用来指定 key、value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

1.2、发送消息

KafkaProducer的send()方法并非是void类型,而是Future<RecordMetadata>类型,send()
方法有2个重载方法,具体定义如下:

publicFuture<RecordMetadata>send(ProducerRecord<K,V>record)
publicFuture<RecordMetadata>send(ProducerRecordcK,V> record,Callback callback)

发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

  • 发后既忘:直接调用send()方法,它只管往Kafka发送消息不关系是否到达。
  • 同步发送:可以利用返回的Future对象实现,示例如下:
try{
    producer.send(record).get();
} catch (ExecutionException|InterruptedExceptione) {
    e.printStackTrace();
}

实际上 send()方法本身就是异步的 , send()方法返回的 Future 对象可以使调用方稍后获得发送的结果。

try {
    Future<RecordMetadata> future= producer.send(record) ;
    RecordMetadata metadata =future.get();
} catch (ExecutionException I InterruptedException e) {

}

可以获取到一个RecordMetadata对象,对象中包含了消息的一些元数据信息,比如消息的主题、分区号、分区中偏移量等。

  • 异步发送:一般是在send()方法里指定一个Callback的回调函数,kafka在返回响应时调用该函数实现异步的发送确认。


KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。

  • 常见的可重试异常:NetworkException、LeaderNotAvailableException等,对于可重试的异常。 如果配置了retries参数,如果只要在规定的重试次数内自行恢复,就不会抛出异常。retries默认值为0。
  • 不可重试异常比如RecordToolLargeExcetption,表示发送的消息太大,KafkaProducer不会进行重试,直接抛出异常。

1.3、序列化

      生产者需要用序列化器( Serializer)把对象转换成字节数组才能通过网络发送给 Kafka,而消费者需要使用反序列化器(Deserializer)把从Kafka收到的字节数组转换成相应的对象。生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。

1.4、分区器

      如果消息ProducerRecord中指定了partition字段,那么就不需要分区器,因为partition就是所要发往的分区号,如果没有指定,就需要依赖分区器,根据key这个字段来计算partition的值。

      默认分区器DefaultPartitioner的主要逻辑:如果key不为null,那么就对key进行哈希根据得到的哈希值计算分区号,如果key为null,那么消息以轮询的方式发往主题内的各个可用分区

1.5、生产者拦截器

      生产者拦截器用来在消息发送前做一些准备工作,如按照某个规则过滤不符合消息、修改消息内容、计数等。

      生产者拦截器的实现:自定义实现org.apache.kafka.clients.producer.Producerlnterceptor接口。接口主要包含3个方法:

public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record );
public void onAcknowledgement(RecordMetadata metadata , Excepti on exception );
public void close() ;

2、原理分析

2.1、整体架构

【深入理解Kafka系列】 第二章 生产者

       整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 (发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator,也称为消息收集器〉中。 Sender 线程负责从RecordAccumulator 中 获取消息并将其发送到 Kafka 中 。
      RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 。
      主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时 ,从双端队列的头部读取。

2.2、重要的生产者参数

· acks

指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks参数有3种类型的值。

  • acks = 1默认值即为 1。生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应 。 acks 设置为1,是消息可靠性和吞吐量之 间的折中方案。
  • acks = 0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下, acks设置为0可以达到到最大的吞吐量。
  • acks =-1或 acks =all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。

· retries和retry.backoff.ms:

retries参数用来配置生产者重试的次数,默认值为0。

重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100 , 它用来设定两次重试之间的时间间隔,避免无效的频繁重试。


Kafka保证同一分区中的消息有序:
一般而言,在需要保证消息顺序的场合建议把参数max.in.flght.requestser.connection 配置为 1,而不是把 retries配置为 0, 不过这样也会影响整体的吞吐。

max.in.flght.requestser.connection:生产者在收到服务器响应之前可以发送多少消息;值越大,吞吐量越大(值为1可以保证消息按发送顺序写入服务器,即使发生了重试)。

·compression.type

指定消息的压缩方式,默认值为"none"。默认情况下消息不会压缩。可以配置为"gzip"等。消息压缩是一时间换空间的优化方式,如果对时延有要求,则不推荐压缩。

· linge.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入
ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过
linger.ms 值时发迭出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞
吐量。

部分其他客户端参数:

【深入理解Kafka系列】 第二章 生产者

【深入理解Kafka系列】 第二章 生产者文章来源地址https://www.toymoban.com/news/detail-426288.html

到了这里,关于【深入理解Kafka系列】 第二章 生产者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 第二章(第二节):无穷小量和函数

    若 lim f(x) = 0 , 则称函数 f(x) 当 x → x 0 时是无穷小量,简称: 无穷小 。      x→ x 0 定理1. 有限多个 无穷小量的代数和仍是无穷小量 定理2. 有限多个 无穷小量的积也是无穷小量 定理3.常数与无穷小量的积也是无穷小量 定理4.有界变量与无穷小量的积是无穷小量 当 x→

    2024年02月08日
    浏览(53)
  • 第二章 翻译

    Section Ⅲ Translation Directions: In this section, there is a text in English. Translate it into Chinese. Write your translation on ANSWER SHEET 2. (15points) “Sustainability” has become a popular word these days, but to Ted Ning, the concept will always have personal meaning. Having endured a painful period of unsustainability in his own life made it

    2024年02月08日
    浏览(66)
  • 第二章 进程管理

    目录 2.1  进程的引入 2.1.1程序的顺序执行 1.程序的顺序执行 2.程序顺序执行时的特征 2.1.2  程序的并发执行及其特征 1.并发执行的概念 2.程序并发执行时的特征 2.1.3  进程的定义与特征 1.进程的定义 2.进程的特征 2.1.4  进程的基本状态及转换 1.进程的三个基本状态

    2024年02月04日
    浏览(59)
  • 操作系统——第二章

    一.单选题(共30题,60.0分) 1 ()是指从作业提交给系统到作业完成的时间间隔 (2.0分) A、 周转时间 B、 响应时间 C、 等待时间 D、 运行时间 正确答案: A 2 引入多道程序设计技术之后,处理器的利用率() (2.0分) A、 有所改善 B、 极大提高 C、 降低 D、 无变化 正确答

    2023年04月08日
    浏览(52)
  • 第二章-算法

    算法是解决特定问题求解步骤的描述,在计算机中表现为指令的有限序列,并且每条指令表示一个或多个操作。 算法有五个基本特征:输入、输出、有穷性、确定性和可行性。 输入:算法具有零个或者多个输入。 输出:算法至少有一个或多个输出。 有穷性:算法在执行了有

    2024年02月14日
    浏览(40)
  • 第二章 re模块

    在处理字符串时,经常会有查找符合某些复杂规则的字符串的需求。正则表达式就是用于描述这些规则的工具。换句话说,正则表达式就是记录文本规则的代码。 Python 提供了 re 模块用于实现正则表达式的操作。在实现时,可以使用 re 模块提供的方法,如search()、match()、fi

    2024年02月09日
    浏览(39)
  • 第二章 集合

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 例如:第一章 Python 机器学习入门之pandas的使用 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 HashSet 底层就是基于 HashMap 实现的。两者主要区别: 线程是否安全: HashMap 是非

    2024年02月02日
    浏览(60)
  • 第二章 变量和引用

    目录 2.1. 深入认识变量 2.1.1. 什么是变量 2.1.2. 变量的名称 2.1.3. 变量的类型 2.1.4. 变量的定义 2.1.5. 自定义变量 2.1.6. 环境变量 2.1.7. 位置变量 2.2. 变量赋值和作用域 2.2.1. 显示赋值:变量名=变量值 2.2.2. read 从键盘读入变量值 2.2.3. 变量和引号 2.2.4. 变量的作用域 变量是在程序

    2024年02月20日
    浏览(54)
  • 第二章 编程基础

    内容框图 单行注释: 快速注释: 多行注释: 使用+号拼接 使用拼接函数 列表 列表是一个有序的序列结构,可以存放不同数据类型的数据。 列表每一个元素有一个索引。 列表可以进行一系列操作,添加,删除,修改元素。 元组是一个有序的序列结构,基本结构和列表类似。

    2024年02月06日
    浏览(65)
  • 第二章:基本概念(下)

    人们往往将信号称为**“软件中断”**。进程收到信号,就意味着某一事件或异常情况的发生。 信号的类型很多,每一种分别标识不同的事件或情况。采用 不同的整数 来标识各种信号类型,并以SIGxxxx 形式的符号名加以定义。 内核、其他进程(只要具有相应的权限)或进程自

    2024年02月08日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包