配置Kafka发送大消息

这篇具有很好参考价值的文章主要介绍了配置Kafka发送大消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。

问题描述

Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要调整这些参数,本文使用Kafka2.5,在进入配置部分之前,首先需要安装Kafka。

安装

这里搭建单节点kafka代理,生产者应用发送消息给指定主题,该主题为单分区主题。

我们看到整个过程涉及多个环节,kafka生产者、kafka代理、主题、kafka消费者。因此,所有这些配置需要调整,以适用大消息传输。我们的目标是调整参数能够发送20M大消息。

Kafka生产者配置

第一步是消息产生地,我们使用Spring Kafka从应用发送消息给Kafka服务器。因此,首先需要更新 max.request.size 属性。详细细节可参考官方文档,该有效值为常量,对于Kafka Client库中的ProducerConfig.MAX_REQUEST_SIZE_CONFIG 定义,需要增加Spring Kafka依赖。我们配置其值为20971520字节:

public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

    return new DefaultKafkaProducerFactory<>(configProps);
}

Kafka主题配置

生产者发送消息给Kafka代理上的主题。因此,接下来选哟配置kafka主题,这意味着需要更新max.message.bytes属性,其默认值为1M。

该参数控制kafka压缩后(如果启用了压缩)最大记录批次大小,详细内容参考官方文档。我们可以通过cli命令手动配置该属性:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520 

当然也可以通过Kafka Client进行配置:

public NewTopic topic() {
    NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
    Map<String, String> configs = new HashMap<>();
    configs.put("max.message.bytes", "20971520");
    newTopic.configs(configs);

    return newTopic;
}

要发送大消息,至少需要配置这两个参数。

Kafka 代理配置

一个可选配置属性为:message.max.bytes,用于允许所有在代理上的主题接收大于1M的消息。该属性控制kafka压缩后(如何启用了压缩)允许的最大记录批次大小,详细内容参考官方文档。

通过在server.properties配置文件中增加下列属性:

message.max.bytes=20971520

另外,该属性将使用message.max.bytesmax.message.bytes 两者中的最大值。

Kafka消费者配置

下面讨论Kafka消费端配置属性。虽然这些变化对消费大消息不是必须的,为了避免消费端程序性能问题,最好也调整相应参数:

  • max.partition.fetch.bytes: 该属性限制从主题分区能消费的字节数,详细内容可参考官方文档。在Kafka Client 库中对应为ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG常量。

  • fetch.max.bytes: 该属性限制消费者从kafka服务器获取的字节数,kafka能够监听多个分区,详细内容参考官方文档。在Kafka Client 库中对应为ConsumerConfig.FETCH_MAX_BYTES_CONFIG 常量。

因此对配置消费者,需要创建KafkaConsumerFactory。另外需要说明的是:应该配置比topic/broker配置较大的值。

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");

    return new DefaultKafkaConsumerFactory<>(props);
}

这里配置值于前面一样,是因为创建的主题仅有一个分区。但是FETCH_MAX_BYTES_CONFIG 值应该高于MAX_PARTITION_FETCH_BYTES_CONFIG。当消费者监听多个分区时,FETCH_MAX_BYTES_CONFIG 表示从多个分区获取消息的大小,而MAX_PARTITION_FETCH_BYTES_CONFIG表示从单个分区获取消息的大小。

其他选项

前面提及配置Kafka生产者、主题、代理和Kafka消费者中的不同参数,以实现发送、接收大消息。单通常应该避免使用Kafka发送大消息,处理大消息会消耗生产者和消费者更多的CPU和内存,最终在一定程度上限制了它们处理其他任务的能,还有可能导致终端用户高延迟问题。

还有其他一些调优方式:

  • Kafka生产者提供了压缩消息选项,它支持不同的压缩方法。我们可以compression.type property属性配置。
  • 可以将大消息存储在共享存储位置,并通过Kafka消息发送该位置。这可能是更快的方式,具有最小的处理开销。
  • 另一种选择是在生产者端将大消息拆分为每个大小为1KB的小消息。之后,我们可以使用分区键将所有这些消息发送到单个分区,以确保正确的顺序。然后在消费者端可以从较小消息重构为完整大消息。

总结

在本文中,我们介绍了配置调优Kafka选项以发送大于1MB的大消息。包括生产者端、主题、代理服务和消费者端的配置选项。其中一些选项是强制配置,一些是可选配置,虽然消费者配置是可选的,但可以避免负面的性能影响。最后,我们还介绍了发送大消息的其他可能选项。

内容参考:[Send Large Messages With Kafka](Send Large Messages With Kafka)文章来源地址https://www.toymoban.com/news/detail-576576.html

到了这里,关于配置Kafka发送大消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据篇Kafka消息队列指定Topic打印Key、Value、Offset和Partition

    说到Apache Kafka消息传递系统时,以下是一些关键概念的解释: Key(键):Kafka消息由Key和Value组成。Key是一个可选的字段,它通常用于消息的路由和分区策略。Key的目的是确保具有相同Key的消息被写入同一个分区。当消费者接收到消息时,可以使用Key来进行消息处理和路由操

    2024年02月16日
    浏览(52)
  • Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

    kafka尚硅谷视频: 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​      1. producer初始化:加载默认配置,以及配置的参数,开启网络线程      2. 拦截器拦截      3. 序列化器进行消息key, value序列化      4. 进行分区      5. kafka broker集群 获取metaData      6. 消息缓存到

    2024年02月11日
    浏览(47)
  • C# Kafka重置到最新的偏移量,即从指定的Partition订阅消息使用Assign方法

    在使用Kafka的过程中,消费者断掉之后,再次开始消费时,消费者会从断掉时的位置重新开始消费。 场景再现:比如昨天消费者晚上断掉了,今天上午我们会发现kafka消费的数据不是最新的,而是昨天晚上的数据,由于数据量比较多,也不会及时的消费到今天上午的数据,这

    2023年04月11日
    浏览(39)
  • 全网最详细地理解Kafka中的Topic和Partition以及关于kafka的消息分发、服务端如何消费指定分区、kafka的分区分配策略(range策略和RoundRobin策略)

    最近在学习kafka相关的知识,特将学习成功记录成文章,以供大家共同学习。 首先要注意的是, Kafka 中的 Topic 和 ActiveMQ 中的 Topic 是不一样的。 在 Kafka 中, Topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 Kafka 集群的消息都有一个类别。 物理上

    2024年01月25日
    浏览(41)
  • kafka入门(一):kafka消息发送与消费

    kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。 Consumer Group (消费者组) 每个消费

    2024年04月12日
    浏览(40)
  • kafka入门(五):kafka生产者发送消息

    构建消息,即创建 ProduceRecord 对象。 (1) kafka发送消息,最常见的构造方法是: topic 表示主题, value 表示值。 (2) kafka发送消息指定key,ProducerRecord 的 key ,既可以作为消息的唯一id,也可以用来决定消息该被写到主题的哪个分区。拥有相同key 的消息,将被写到同一个分区。

    2024年01月17日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(40)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(80)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(50)
  • Kafka可视化利器 KafkaTool 发送kafka消息

    kafak是我们工作中常用的消息组件,今天在跟合作方联调的时候被告知上游没有准备好消息数据,没有办法从依赖方拿到消息但是我们还又想测试自己写的消息消费的代码,怎么办呢?常规的做法就是把消息消费的代码抽离出来包装成一个公共方法,提供一个Controller直接构造

    2024年01月19日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包