配置Kafka消息保留时间

这篇具有很好参考价值的文章主要介绍了配置Kafka消息保留时间。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

生产者发送消息给kafka,消息被追加值日志文件并保留一定周期(基于配置)。本文探讨对Kafk主题配置消息保留时间。

基于时间保留

通过保留期属性,消息就有了TTL(time to live 生存时间)。到期后,消息被标记为删除,从而释放磁盘空间。对于kafka主题中所有消息具有相同的生存时间,但可以在创建主题之前设置属性,或对已存在的主题在运行时修改属性。

接下来我们将学习如何通过代理配置属性进行调整,以设置新主题的保留周期,并通过主题级配置在运行时控制它。

服务器级配置

Apache Kafka支持服务器级配置保留策略,我们可以通过配置以下三个基于时间的配置属性中的一个来进行优化:

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

注意:Kafka用更高精度值覆盖低精度值,所以log.retention.ms具有最高的优先级。

查看默认值

首先让我们检查保留时间的缺省值,在kafka目录下执行下面命令:

$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168

显示默认周期为7天。如果要设置消息保留周期为10分钟,可以通过config/server.properties配置文件的log.retention.minutes 属性进行配置。

log.retention.minutes=10

配置新主题

kafka提供了几个Shell脚本用于执行管理任务,利用它们创建工具脚本functions.sh。下面增加两个函数,分别为创建主题、展示配置:

function create_topic {
    topic_name="$1"
    bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
      --partitions 1 --replication-factor 1 \
      --zookeeper localhost:2181
}

function describe_topic_config {
    topic_name="$1"
    ./bin/kafka-configs.sh --describe --all \
      --bootstrap-server=0.0.0.0:9092 \
      --topic ${topic_name}
}

现在创建两个独立脚本,create-topic.sh、get-topic-retention-time.sh:

bash-5.1# cat create-topic.sh
#!/bin/bash
../functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
../functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?

简单解释下脚本的特殊符号:

$?-表示上一个命令执行状态.

$0-当前脚本的文件名称.

$#-在脚本中使用参数,如$1,$2分别表示第一个参数和第二参数.

$$-当前脚本的进程号,就是当前执行脚本的进程ID.

需要说明的是:describe_topic_config列出给定主题的所有属性配置,因此必须使用awk进行过滤,找出retention.ms property属性值。

现在可以启动kafka环境并验证retention.ms property属性配置:

bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000

通过脚本创建主题,列出描述,可以看到retention.ms 是 600000 (10分钟)。这是默认值,从之前设置server.properties文件中读出来的。

主题级配置

一旦kafka代理已经启动,log.retention.{hours|minutes|ms} 服务器级属性为只读属性。我们获得 retention.ms,但可以通过主题级参数进行调整。我们继续在functions.sh 脚本中增加方法配置主题属性:

function alter_topic_config {
    topic_name="$1"
    config_name="$2"
    config_value="$3"
    ./bin/kafka-configs.sh --alter \
      --add-config ${config_name}=${config_value} \
      --bootstrap-server=0.0.0.0:9092 \
      --topic ${topic_name}
}

然后在alter-topic-config.sh 脚本使用它:

#!/bin/sh
../functions.sh

alter_topic_retention_config $1 $2 $3
exit $?

最后设置test-topic主题保存周期为5分钟,然后查看验证:

bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.

bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000

验证

我们已经配置kafka主题的消息保留周期。现在来验证在超时后消息确实过期。

生产-消费

在 functions.sh脚本增加两个produce_message 和 consume_message 函数,其内部分别使用kafka-console-producer.sh 和 kafka-console-consumer.sh,分别用于产生/消费消息:

function produce_message {
    topic_name="$1"
    message="$2"
    echo "${message}" | ./bin/kafka-console-producer.sh \
    --bootstrap-server=0.0.0.0:9092 \
    --topic ${topic_name}
}

function consume_message {
    topic_name="$1"
    timeout="$2"
    ./bin/kafka-console-consumer.sh \
    --bootstrap-server=0.0.0.0:9092 \
    --from-beginning \
    --topic ${topic_name} \
    --max-messages 1 \
    --timeout-ms $timeout
}

我们看到消费总是从头开始读消息,因为我们需要消费者读主题中任何有效的消息。

下面创建独立的生产者函数:

bash-5.1# cat producer.sh
#!/bin/sh
../functions.sh
topic_name="$1"
message="$2"

produce_message ${topic_name} ${message}
exit $?

最后创建消费者函数:

bash-5.1# cat consumer.sh
#!/bin/sh
../functions.sh
topic_name="$1"
timeout="$2"

consume_message ${topic_name} $timeout
exit $?

消息过期

我们已经准备了工具函数,开始产生单个消息,然后消费两次:

bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages

我们看到消费者重复消费所有有效消息。现在引入延迟机制延迟5分钟,然后再次消费消息:

bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

与我们期望一致,消费者没有发现任何消息,因为消息已经超过了它的保存周期。

限制

在Kafka Broker内部,维护另一个名为log.retention.check.interval.ms的属性,用于决定检查消息是否过期的频率。因此,为了保持保留策略的有效性,必须确保log.retention.check.interval.ms的值低于retention.ms 的属性值。对于任何给定的主题都一样。

总结

本文探索了Apache Kafka消息基于时间的保留策略。通过创建简单的shell脚本来简化管理过程,接着我们创建了独立的消费者和生产者,以验证在保留期之后消息的过期场景。文章来源地址https://www.toymoban.com/news/detail-430550.html

到了这里,关于配置Kafka消息保留时间的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(48)
  • kafka服务端允许生产者发送最大消息体大小

            server.properties中加上的message.max.bytes配置,我目前设置为5242880,即5MB,可以根据实际情况增大。         在生产者端配置max.request.size,这是单个消息最大字节数,根据实际调整,max.request.size 必须小于 message.max.bytes 以及消费者的 max.partition.fetch.bytes。这样消息

    2024年02月15日
    浏览(46)
  • Kafka 入门到起飞系列 - 生产者发送消息流程解析

    生产者通过 producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 生产者通过 send() 方法发送消息,send()方法会经过如下几步 1. 首先将消息交给 拦截器(Interceptor) 处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的

    2024年02月16日
    浏览(48)
  • 07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

    简单来说,就是一个数据项。 ▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。 从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。 ▲ 下面是一个示例事件

    2024年01月20日
    浏览(46)
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用 异步 发送的方式,如下所示。 本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,

    2023年04月13日
    浏览(78)
  • Kafka中的生产者如何处理消息发送失败的情况?

    在Kafka中,生产者可以通过以下方式处理消息发送失败的情况: 同步发送模式(Sync Mode):在同步发送模式下,生产者发送消息后会阻塞等待服务器的响应。如果发送失败,生产者会抛出异常(例如 ProducerRecord 发送异常)或返回错误信息。开发者可以捕获异常并根据需要进行

    2024年02月06日
    浏览(44)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(50)
  • kafka生产者发送消息报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

    报这个错误是因为kafka里的配置要修改下 在config目录下 server.properties配置文件 这下发送消息就不会一直等待,就可以发送成功了

    2024年02月06日
    浏览(46)
  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(40)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包