1. Kafka 发送消息的主要步骤
- 创建一个 ProducerRecord 对象,需要包含目标主题和要发送的内容,还可以指定键、分区、时间戳或标头。
- 在发送 ProducerRecord 对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。
- 如果没有显式地指定分区,那么数据将被传给分区器。分区器通常会基于 ProducerRecord 对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条消息了。
- 紧接着,该消息会被添加到一个消息批次里,这个批次里的所有消息都将被发送给同一个主题和分区。有一个独立的线程负责把这些消息批次发送给目标 broker。
- broker 在收到这些消息时会返回一个响应。如果消息写入成功,就返回一个 RecordMetaData 对象,其中包含了主题和分区信息,以及消息在分区中的偏移量。如果消息写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,重试几次之后如果还是失败,则会放弃重试,并返回错误信息。
2.创建 Kafka 生产者
- 要向 Kafka 写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka 生产者有 3
个必须设置的属性。- bootstrap.servers:broker 的地址。由多个 host:port 组成,地址之间以逗号分隔。生产者用它们来建立初始的 Kafka 集群连接。它不需要包含所有的 broker 地址,因为生产者在建立初始连接之后可以从给定的 broker 那里找到其他 broker 的信息。不过还是建议至少提供两个 broker 地址,因为一旦其中一个停机,则生产者仍然可以连接到集群。
- key.serializer:一个类名,用来序列化消息的键。broker 希望接收到的消息的键和值都是字节数组。生产者可以把任意 Java 对象作为键和值发送给 broker,但它需要知道如何把这些 Java 对象转换成字节数组。key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会用这个类把键序列化成字节数组。Kafka客户端默认提供了 ByteArraySerializer、StringSerializer 和 IntegerSerializer 等,如果你只使用常见的几种 Java 对象类型,就没有必要实现自己的序列化器。需要注意的是,必须设置 key.serializer 这个属性,尽管你可能只需要将值发送给 Kafka。如果只需要发送值,则可以将 Void 作为键的类型,然后将这个属性设置为 VoidSerializer。
- value.serializer:一个类名,用来序列化消息的值。与设置 key.serializer 属性一样,需要设置成可以序列化消息值对象的类。
/* 创建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 创建一个生产者对象,设置键和值得类型,传入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
3.发送消息到 Kafka
- 发送消息主要有以下 3 种方式:发送并忘记、同步发送、异步发送。
(1)发送并忘记
- 把消息发送给服务器,但并不关心它是否成功送达。大多数情况下,消息可以成功送达,因为 Kafka 是高可用的,而且生产者有自动尝试重发的机制。但是,如果发生了不可重试的错误或超时,那么消息将会丢失,应用程序将不会收到任何信息或异常。
- 在发送消息之前,生产者仍有可能抛出其他的异常。这些异常可能是 SerializationException(序列化消息失败)、BufferExhaustedException 或 TimeoutException(缓冲区已满),或者InterruptException(发送线程被中断)
/* 创建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 创建一个生产者对象,设置键和值得类型,传入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
/* 指定目标主题、消息的键和值 */
ProducerRecord<String, String> record = new ProducerRecord<>(
"CustomerCountry", "Precision Products", "France"
);
/*
* 发送 ProducerRecord 对象
* 消息会先被放进缓冲区,然后通过单独的线程发送给服务器端。send() 方法会返回一个包含 RecordMetadata 的 Future 对象。因为我们选择忽略返回值,所以不知道消息是否发送成功。
* 在发送消息之前,生产者仍有可能抛出其他的异常。这些异常可能是 SerializationException(序列化消息失败)、BufferExhaustedException 或 TimeoutException(缓冲区已满),或者InterruptException(发送线程被中断)。
*/
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
(2)同步发送
- 一般来说,生产者是异步的——我们调用 send() 方法发送消息,它会返回一个 Future对象。可以调用 get() 方法等待 Future 完成,这样就可以在发送下一条消息之前知道当前消息是否发送成功。如果采用同步发送方式,那么发送线程在这段时间内就只能等待,什么也不做,甚至都不发送其他消息,这将导致糟糕的性能。
/* 创建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 创建一个生产者对象,设置键和值得类型,传入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
/* 指定目标主题、消息的键和值 */
ProducerRecord<String, String> record = new ProducerRecord<>(
"CustomerCountry", "Precision Products", "France"
);
/*
* 如果消息没有发送成功,那么这个方法将抛出一个异常。如果没有发生错误,那么我们将得到一个 RecordMetadata 对象,并能从中获取消息的偏移量和其他元数据。
*/
try {
RecordMetadata recordMetadata = producer.send(record).get();
log.info("================topic: " + recordMetadata.topic() + ", offset: " + recordMetadata.offset() + ", partition: " + recordMetadata.partition());
} catch (Exception e) {
e.printStackTrace();
}
(3)异步发送
- 大多数时候,并不需要等待响应——尽管 Kafka 会把消息的目标主题、分区信息和偏移量返回给客户端,但对客户端应用程序来说可能不是必需的。不过,当消息发送失败,需要抛出异常、记录错误日志或者把消息写入“错误消息”文件以便日后分析诊断时,就需要用到这些信息了。为了能够在异步发送消息时处理异常情况,生产者提供了回调机制。
- 为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion 方法。
- 如果 Kafka 返回错误,那么 onCompletion 方法会收到一个非空(nonnull)异常。这里只是简单地把它打印了出来,但在生产环境中应该使用更好的处理方式。
/* 创建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
/* 创建一个生产者对象,设置键和值得类型,传入配置信息 */
producer = new KafkaProducer<String, String>(kafkaProps);
/* 指定目标主题、消息的键和值 */
ProducerRecord<String, String> record = new ProducerRecord<>(
"CustomerCountry", "Precision Products", "France"
);
try {
producer.send(record, new Callback(){
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
log.info("================topic: " + recordMetadata.topic() +
", offset: " + recordMetadata.offset() +
", partition: " + recordMetadata.partition()
);
}
});
} catch (Exception e) {
e.printStackTrace();
}
4.生产者配置
/* 创建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "203.2.195.142:9092,203.2.195.143:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("client.id","granola123");
kafkaProps.put("ack","all");
kafkaProps.put("max.block.ms","60000"); // 默认 60000,即 60s
kafkaProps.put("batch.size","16384"); // 默认 16384,即 16KB
kafkaProps.put("linge.ms","100"); // 改,默认 0ms
kafkaProps.put("request.timeout.ms","30000"); // 默认 30000,即 30s
kafkaProps.put("retries","3"); // 改,默认 0次
kafkaProps.put("retry.backoff.ms","100"); // 默认 100ms
kafkaProps.put("max.request.size","10485760"); // 默认 1048576,即 1M
kafkaProps.put("compression.type","snappy "); // 默认不压缩
kafkaProps.put("buffer.memory","33554432"); // 默认 33554432,即 32M
kafkaProps.put("receive.buffer.bytes","32768"); // 默认 32768,即 32KB
kafkaProps.put("send.buffer.bytes","131072"); // 默认 33554432,即 128KB
kafkaProps.put("max.in.flight.requests.per.connection","2"); // 默认 5
kafkaProps.put("enable.idempotence",true); // 默认 false
(1)client.id
- 客户端标识符。值可以是任意字符串,broker 用它来识别从客户端发送过来的消息。被用在日志、指标和配额中。
(2)ack
- 指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。默认情况下,Kafka 会在首领副本收到消息后向客户端回应消息写入成功。acks 设置的值越小,生产者发送消息的速度就越快。也就是说,我们通过牺牲可靠性来换取较低的生产者延迟。不过,端到端延迟是指从消息生成到可供消费者读取的时间,这对 3 种配置来说都是一样的。这是因为为了保持一致性,在消息被写入所有同步副本之前,Kafka 不允许消费者读取它们。因此,如果你关心的是端到端延迟,而不是生产者延迟,那么就不需要在可靠性和低延迟之间做权衡了:你可以选择最可靠的配置,但仍然可以获得相同的端到端延迟。建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。
- acks=0: 生产者不会等待任何来自 broker 的响应。如果 broker 因为某些问题没有收到消息,那么生产者便无从得知,消息也就丢失了。生产者不需要等待 broker 返回响应,所以它们能够以网络可支持的最大速度发送消息,从而达到很高的吞吐量。
- acks=1: 只要集群的首领副本收到消息,生产者就会收到消息成功写入的响应。如果消息无法到达首领副本(比如首领副本发生崩溃,新首领还未选举出来),那么生产者会收到一个错误响应。为了避免数据丢失,生产者会尝试重发消息。不过,在首领副本发生崩溃的情况下,如果消息还没有被复制到新的首领副本,则消息还是有可能丢失。
- acks=all:只有当所有副本全部收到消息时,生产者才会收到消息成功写入的响应。它的延迟比 acks=1 高,因为生产者需要等待不止一个 broker 确认收到消息。
(3)调用 send 方法后消息的传递时间(Kafka 返回成功响应或放弃重试并承认发送失败的时间)
- 从 Kafka 2.1 开始,我们将 ProduceRecord 的发送时间分成如下两个时间间隔,它们是被分开处理的。
- 异步调用 send() 所花费的时间。在此期间,调用 send() 的线程将被阻塞。在发送消息之前,生产者仍有可能抛出其他的异常。这些异常可能是 SerializationException(序列化消息失败)、BufferExhaustedException 或 TimeoutException(缓冲区已满),或者InterruptException(发送线程被中断)。
- 从异步调用 send() 返回到触发回调(不管是成功还是失败)的时间,也就是从 ProduceRecord 被放到批次中直到Kafka成功响应、出现不可恢复异常或发送超时的时间。
文章来源:https://www.toymoban.com/news/detail-485806.html
- max.block.ms:用于控制在调用 send() 或通过 partitionsFor() 显式地请求元数据时生产者可以发生阻塞的时间。
- 当生产者的发送缓冲区被填满或元数据不可用时,这些方法就可能发生阻塞。当达到 max.block.ms 配置的时间时,就会抛出一个超时异常。
- 默认值 60s。
- batch.size:当有多条消息被发送给同一个分区时,生产者会把它们放在同一个批次里。这个参数指定了一个批次可以使用的内存大小。需要注意的是,该参数是按照字节数而不是消息条数来计算的。
- 当批次被填满时,批次里所有的消息都将被发送出去。但是生产者并不一定都会等到批次被填满时才将其发送出去。那些未填满的批次,甚至只包含一条消息的批次也有可能被发送出去。
- 就算把批次大小设置得很大,也不会导致延迟,只是会占用更多的内存而已。但如果把批次大小设置得太小,则会增加一些额外的开销,因为生产者需要更频繁地发送消息。
- 默认值为 16384,即 16KB。合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到 32KB,调大也意味着消息会有相对较大的延时。
- linger.ms:指定了生产者在发送消息批次之前等待更多消息加入批次的时间。生产者会在批次被填满或等待时间达到 linger.ms 时把消息批次发送出去。
- 在默认情况下,只要有可用的发送者线程,生产者都会直接把批次发送出去,就算批次中只有一条消息。
- 默认值为 0,表示消息需要被立即发送,把 linger.ms 设置成比 0 大的数,比如设置为 100ms,可以让生产者在将批次发送给服务器之前等待一会儿,以使更多的消息加入批次中。虽然这样会增加一点儿延迟,但也极大地提升了吞吐量。这是因为一次性发送的消息越多,每条消息的开销就越小,如果启用了压缩,则计算量也更少了。
- request.timeout.ms:用于控制生产者在发送消息时等待服务器响应的时间。
- 需要注意的是,这是指生产者在放弃之前等待每个请求的时间。如果设置的值已触及,但服务器没有响应,那么生产者将重试发送,或者执行回调,并传给它一个 TimeoutException。
- 默认值为 30000,即 30s,如果生产端负载很大,可以适当调大以避免超时,比如可以调到 60000。
- retries 和 retry.backoff.ms:当生产者收到来自服务器的错误消息时,这个错误有可能是暂时的(例如,一个分区没有首领)。在这种情况下,retries 参数可用于控制生产者在放弃发送并向客户端宣告失败之前可以重试多少次,默认值为0,表示不进行重试,这个参数一般是为了解决因瞬时故障导致的消息发送失败,比如网络抖动、leader换主,其中瞬时的leader重选举是比较常见的,建议设置为一个大于 0 的值,比如 3 或者更大值。
- 在默认情况下,重试时间间隔是 100 毫秒,但可以通过 retry.backoff.ms 参数来控制重试时间间隔,通常可以不调整。
- 可以测试一下 broker 在发生崩溃之后需要多长时间恢复(也就是直到所有分区都有了首领副本),让重试时间大于 Kafka 集群从崩溃中恢复的时间,以免生产者过早放弃重试。
- 生产者并不会重试所有的错误。有些错误不是暂时的,生产者就不会进行重试(例如,“消息太大”错误)。对于可重试的错误,生产者会自动进行重试,所以不需要在应用程序中处理重试逻辑。你要做的是集中精力处理不可重试的错误或者当重试次数达到上限时的情况。
- 如果想完全禁用重试,那么唯一可行的方法是将 retries 设置为 0。
- delivery.timeout.ms:用于控制从消息准备好发送(send() 方法成功返回并将消息放入批次中)到
broker 响应或客户端放弃发送(包括重试)所花费的时间。- 这个时间应该大于 linger.ms 和request.timeout.ms。如果配置的时间不满足这一点,则会抛出异常。
- 如果生产者在重试时超出了 delivery.timeout.ms,那么将执行回调,并会将 broker 之前返回的错误传给它。如果消息批次还没有发送完毕就超出了 delivery.timeout.ms,那么也将执行回调,并会将超时异常传给它。
- 可以将这个参数配置成你愿意等待的最长时间,通常是几分钟,并使用默认的重试次数(几乎无限制)。基于这样的配置,只要生产者还有时间(或者在发送成功之前),它都会持续重试。这是一种合理的重试方式。
(4)max.request.size
- 限制了可发送的单条最大消息的大小和单个请求的消息总量的大小。假设这个参数的值为 1 MB,那么可发送的单条最大消息就是 1 MB,或者生产者最多可以在单个请求里发送一条包含 1024 个大小为 1 KB 的消息。另外,broker 对可接收的最大消息也有限制(message.max.bytes),其两边的配置最好是匹配的,以免生产者发送的消息被 broker 拒绝。
- 默认值为1048576,即1M。
(5)compression.type
- 默认情况下,生产者发送的消息是未经压缩的。这个参数可以被设置为 snappy、gzip、lz4 或 zstd,这指定了消息被发送给 broker 之前使用哪一种压缩算法。
- snappy 压缩算法由谷歌发明,虽然占用较少的 CPU 时间,但能提供较好的性能和相当可观的压缩比。如果同时有性能和网络带宽方面的考虑,那么可以使用这种算法。
- gzip 压缩算法通常会占用较多的 CPU 时间,但提供了更高的压缩比。如果网络带宽比较有限,则可以使用这种算法。使用压缩可以降低网络传输和存储开销,而这些往往是向 Kafka 发送消息的瓶颈所在。
(6)buffer.memory
- 设置生产者要发送给服务器的消息的内存缓冲区大小。如果应用程序调用send() 方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的 send() 方法调用会等待内存空间被释放,如果在 max.block.ms 之后还没有可用空间,就抛出异常。
- 需要注意的是,这个异常与其他异常不一样,它是 send() 方法而不是 Future 对象抛出来的。
- 默认值为33554432,即 32M。
(7)receive.buffer.bytes 和 send.buffer.bytes
- TCP socket 接收和发送数据包的缓冲区大小。被设为 –1,就使用操作系统默认值。
- receive.buffer.bytes 默认是 32768,32KB,send.buffer.bytes 默认是 131072,128KB。
(8)max.in.flight.requests.per.connection
- 指定了生产者在收到服务器响应之前可以发送多少个消息批次。它的值越大,占用的内存就越多,不过吞吐量也会得到提升。在单数据中心环境中,该参数被设置为 2 时可以获得最佳的吞吐量,但使用默认值 5 也可以获得差不多的性能。
☆☆☆ Kafka 中顺序保证
- Kafka 可以保证同一个分区中的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息,那么 broker 会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,向一个账户中存入 100 元再取出来与先从账户中取钱再存回去是截然不同的!不过,有些场景对顺序不是很敏感。
- 假设我们把 retries 设置为非零的整数,并把 max.in.flight.requests.per.connection 设置为比 1 大的数。如果第一个批次写入失败,第二个批次写入成功,那么 broker 会重试写入第一个批次,等到第一个批次也写入成功,两个批次的顺序就反过来了。
- 我们希望至少有 2 个正在处理中的请求(出于性能方面的考虑),并且可以进行多次重试(出于可靠性方面的考虑),这个时候,最好的解决方案是将enable.idempotence 设置为 true。这样就可以在最多有 5 个正在处理中的请求的情况下保证顺序,并且可以保证重试不会引入重复消息。第 8 章将深入探讨幂等生产者。
(9)enable.idempotence
- 从 0.11 版本开始,Kafka 支持精确一次性(exactly once)语义。幂等生产者是它的一个简单且重要的组成部分。
- 假设为了最大限度地提升可靠性,你将生产者的 acks 设置为 all,并将 delivery.timeout.ms 设置为一个比较大的数,允许进行尽可能多的重试。这些配置可以确保每条消息被写入 Kafka 至少一次。但在某些情况下,消息有可能被写入 Kafka 不止一次。假设一个 broker 收到了生产者发送的消息,然后消息被写入本地磁盘并成功复制给了其他 broker。此时,这个 broker 还没有向生产者发送响应就发生了崩溃。而生产者将一直等待,直到达到request.timeout.ms,然后进行重试。重试发送的消息将被发送给新的首领,而这个首领已经有这条消息的副本,因为之前写入的消息已经被成功复制给它了。现在,你就有了一条重复的消息。
- 为了避免这种情况,可以将 enable.idempotence 设置为 true。当幂等生产者被启用时,生产者将给发送的每一条消息都加上一个序列号。如果 broker 收到具有相同序列号的消息,那么它就会拒绝第二个副本,而生产者则会收到 DuplicateSequenceException,这个异常对生产者来说是无害的。
- 如果要启用幂等性,那么 max.in.flight.requests.per.connection 应小于或等于 5、retries 应大于 0,并且 acks 被设置为 all。如果设置了不恰当的值,则会抛出 ConfigException 异常。
- 默认为 false。
5.分区
(1)键
- ProducerRecord 对象包含了主题名称、记录的键和值。Kafka 消息就是一个个的键 – 值对,ProducerRecord 对象可以只包含主题名称和值,键默认情况下是 null。如果要创建键为 null 的消息,那么不指定键就可以了。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "USA");
- 大多数应用程序还是会用键来发送消息。键有两种用途:一是作为消息的附加信息与消息保存在一起,二是用来确定消息应该被写入主题的哪个分区。具有相同键的消息将被写入同一个分区。如果一个进程只从主题的某些分区读取数据,那么具有相同键的所有记录都会被这个进程读取。要创建一个包含键和值的记录,只需像下面这样创建一个 ProducerRecord 即可。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
(2)默认的分区器
键为 null
- 如果键为 null,并且使用了默认的分区器,那么记录将被随机发送给主题的分区。分区器使用轮询调度(round-robin)算法将消息均衡地分布到各个分区中。
- 从 Kafka 2.4 开始,在处理键为 null 的记录时,默认分区器使用的轮询调度算法具备了黏性。
- kafka producer 发送数据并不是一个一个消息发送,而是取决于两个 producer 端参数。一个是
linger.ms
,默认是 0ms,当达到这个时间后,kafka producer 就会立刻向 broker 发送数据。另一个参数是batch.size
,默认是 16kb,当产生的消息数达到这个大小后,就会立即向 broker 发送数据。 - 按照这个设计,从直观上思考,肯定是希望每次都尽可能填满一个 batch 再发送到一个分区。
- 但实际决定 batch 如何形成的一个因素是分区策略(partitionerstrategy)。
- 在 Kafka 2.4 版本之前,在 producer 发送数据默认的分区策略是轮询策略(没指定 key 的情况。如果多条消息不是被发送到相同的分区,它们就不能被放入到一个batch中。
- 所以如果使用默认的轮询 partition 策略,可能会造成一个大的 batch 被轮询成多个小的 batch 的情况。
- 鉴于此,社区于 2.4 版本引入了黏性分区策略(Sticky Partitioning Strategy),该策略是一种全新的策略,能够显著地降低给消息指定分区过程中的延时,并减少 broker 的负载。它会随机地选择一个分区并会尽可能地坚持使用该分区——即所谓的粘住这个分区。
- kafka producer 发送数据并不是一个一个消息发送,而是取决于两个 producer 端参数。一个是
键不为 null
- 如果键不为空且使用了默认的分区器,那么 Kafka 会对键进行哈希(使用 Kafka 自己的哈希算法,即使升级 Java 版本,哈希值也不会发生变化),然后根据哈希值把消息映射到特定的分区。这里的关键在于同一个键总是被映射到同一个分区,所以在进行映射时,会用到主题所有的分区,而不只是可用的分区。这也意味着,如果在写入数据时目标分区不可用,那么就会出错。
(3)其它分区器
- 除了默认的分区器,Kafka 客户端还提供了 RoundRobinPartitioner 和 UniformStickyPartitioner。在消息包含键的情况下,可以用它们来实现随机分区分配和黏性随机分区分配。对某些应用程序(例如,ETL 应用程序会将数据从 Kafka 加载到关系数据库中,并使用 Kafka 记录的键作为数据库的主键)来说,键很重要,但如果负载出现了倾斜,那么其中某些键就会对应较大的负载。这个时候,可以用 UniformStickyPartitioner 将负载均衡地分布到所有分区。
- 默认的分区器,如果有key的话,那么它是按照 key 来决定分区的,这个时候并不会使用粘性分区。UniformStickyPartitioner 不管你有没有 key,统一都用粘性分区来分配。
Properties kafkaProps = new Properties();
kafkaProps .put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");
(4)自定义分区策略
- 假设你是 B2B 供应商,你有一个大客户,它是手持设备 Banana 的制造商。你的日常交易中有 10% 以上的交易与这个客户有关。如果使用默认的哈希分区算法,那么与 Banana 相关的记录就会和其他客户的记录一起被分配给相同的分区,导致这个分区比其他分区大很多。服务器可能会出现存储空间不足、请求处理缓慢等问题。因此,需要给 Banana 分配单独的分区,然后使用哈希分区算法将其他记录分配给其他分区。
package com.chb.partitioner;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages "+ "to have customer name as key");
if (((String) key).equals("Banana"))
return numPartitions - 1; // Banana的记录总是被分配到最后一个分区
// 其他记录被哈希到其他分区
return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
}
public void close() {}
}
- 在 Kafka 配置参数时设置分区器的类。
/* 创建配置文件 */
Properties kafkaProps = new Properties();
kafkaProps.put("partitioner.class", "com.chb.partitioner.BananaPartitioner");
6.标头
- 除了键和值,记录还可以包含标头。可以在不改变记录键 – 值对的情况下向标头中添加一些有关记录的元数据。
- 标头指明了记录数据的来源,可以在不解析消息体的情况下根据标头信息来路由或跟踪消息(消息有可能被加密,而路由器没有访问加密数据的权限)。
- 标头由一系列有序的键值对组成。键是字符串,值可以是任意被序列化的对象,就像消息里的值一样。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
record.headers().add("privacy-level","YOLO".getBytes(StandardCharsets.UTF_8));
7.拦截器
- 有时候,你希望在不修改代码的情况下改变 Kafka 客户端的行为。这或许是因为你想给公司所有的应用程序都加上同样的行为,或许是因为无法访问应用程序的原始代码。常见的生产者拦截器应用场景包括:捕获监控和跟踪信息、为消息添加标头,以及敏感信息脱敏。
- 下面是一个非常简单的生产者拦截器示例,它只是简单地统计在特定时间窗口内发送和接
收的消息数量。- 可以覆盖 configure 方法,并在调用其他方法之前设置好需要的东西。这个方法的参数包含了生产者所有的配置属性,你可以随意访问它们。示例中,添加了一个自己的配置属性。
- close 方法会在生产者被关闭时调用,我们可以借助这个机会清理拦截器的状态。示例中,关闭了之前创建的线程。如果你打开了文件句柄、与远程数据库建立了接,或者做了其他类似的操作,那么可以在这里关闭所有的资源,以免发生资源泄漏。
public class CountingProducerInterceptor implements ProducerInterceptor {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
static AtomicLong numSent = new AtomicLong(0);
static AtomicLong numAcked = new AtomicLong(0);
public void configure(Map<String, ?> map) {
Long windowSize = Long.valueOf((String) map.get("counting.interceptor.window.size.ms"));
executorService.scheduleAtFixedRate(CountingProducerInterceptor::run,windowSize, windowSize, TimeUnit.MILLISECONDS);
}
public ProducerRecord onSend(ProducerRecord producerRecord) {
numSent.incrementAndGet();
return producerRecord;
}
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
numAcked.incrementAndGet();
}
public void close() {
executorService.shutdownNow();
}
public static void run() {
System.out.println(numSent.getAndSet(0));
System.out.println(numAcked.getAndSet(0));
}
}
- Kafka 的 ProducerInterceptor 拦截器包含两个关键方法。
- ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) 这个方法会在记录被发送给 Kafka 之前,甚至是在记录被序列化之前调用。如果覆盖了这个方法,那么你就可以捕获到有关记录的信息,甚至可以修改它。只需确保这个方法返回一个有效的 ProducerRecord 对象。这个方法返回的记录将被序列化并发送给 Kafka。
- void onAcknowledgement(RecordMetadata metadata, Exception exception) 这个方法会在收到 Kafka 的确认响应时调用。如果覆盖了这个方法,则不可以修改 Kafka 返回的响应,但可以捕获到有关响应的信息。
8.配额和节流
- Kafka 可以限制生产消息和消费消息的速率,这是通过配额机制来实现的。Kafka 提供了 3 种配额类型:生产、消费和请求。生产配额和消费配额限制了客户端发送和接收数据的速率(以字节 / 秒为单位)。请求配额限制了 broker 用于处理客户端请求的时间百分比。
- 可以为所有客户端(使用默认配额)、特定客户端、特定用户,或特定客户端及特定用户设置配额。特定用户的配额只在集群配置了安全特性并对客户端进行了身份验证后才有效。
- 默认的生产配额和消费配额是 broker 配置文件的一部分。如果要限制每个生产者平均发送的消息不超过 2 MBps,那么可以在 broker 配置文件中加入 quota.producer.default=2M。
- 也可以覆盖 broker 配置文件中的默认配额来为某些客户端配置特定的配额,尽管不建议这么做。如果允许 clientA 的配额达到 4 MBps、clientB 的配额达到 10 MBps,则可以这样配置:quota.producer.override=“clientA:4M,clientB:10M”。
- 在配置文件中指定的配额都是静态的,如果要修改它们,则需要重启所有的 broker。因为随时都可能有新客户端加入,所以这种配置方式不是很方便。因此,特定客户端的配额通常采用动态配置。可以用 kafka-config.sh 或 AdminClient API 来动态设置配额。
- 下面来看一些例子。
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024' --entity-name clientC --entity-type clients ➊
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_
byte_rate=1024,consumer_byte_rate=2048' --entity-name user1 --entity-type users ➋
bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'consumer_
byte_rate=2048' --entity-type users ➌
➊ 限制 clientC(通过客户端 ID 来识别)每秒平均发送不超过 1024 字节。
➋ 限制 user1(通过已认证的账号来识别)每秒平均发送不超过 1024 字节以及每秒平均消
费不超过 2048 字节。
➌ 限制所有用户每秒平均消费不超过 2048 字节,有覆盖配置的特定用户除外。这也是动态修改默认配置的一种方式。文章来源地址https://www.toymoban.com/news/detail-485806.html
- 当客户端触及配额时,broker 会开始限制客户端请求,以防止超出配额。这意味着 broker将延迟对客户端请求做出响应。对大多数客户端来说,这样会自动降低请求速率(因为执行中的请求数量也是有限制的),并将客户端流量降到配额允许的范围内。但是,被节流的客户端还是有可能向服务器端发送额外的请求,为了不受影响,broker 将在一段时间内暂停与客户端之间的通信通道,以满足配额要求。
- 节流行为通过 produce-throttle-time-avg、produce-throttle-time-max、fetch-throttle-time-avg 和fetch-throttle-time-max 暴露给客户端,这几个参数是生产请求和消费请求因节流而被延迟的平均时间和最长时间。需要注意的是,这些时间对应的是生产消息和消费消息的吞吐量配额、请求时间配额,或两者兼而有之。其他类型的客户端请求只会因触及请求时间配额而被节流,这些节流行为也会通过其他类似的指标暴露出来。
- 如果你异步调用 Producer.send(),并且发送速率超过了 broker 能够接受的速率(无论是由于配额的限制还是由于处理能力不足),那么消息将会被放入客户端的内存队列。如果发送速率一直快于接收速率,那么客户端最终将耗尽内存缓冲区,并阻塞后续的 Producer.send() 调用。如果超时延迟不足以让 broker 赶上生产者,使其清理掉一些缓冲区空间,那么 Producer.send() 最终将抛出 TimeoutException 异常。或者,批次里的记录因为等待时间超过了 delivery.timeout.ms 而过期,导致执行 send() 的回调,并抛出TimeoutException 异常。因此,要做好计划和监控,确保 broker 的处理能力总是与生产者发送数据的速率相匹配。
到了这里,关于(三)Kafka 生产者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!