09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置)

这篇具有很好参考价值的文章主要介绍了09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

通过修改保存时间来删除消息

★ 删除指定主题的消息

Kafka并没有提供直接删除特定主题下消息的方法,只能是强制让消息过期之后,再来删除消息。

因此需要指定如下两个配置:

  1. 控制将指定主题下消息的保存时间设为一个很短时间: retention.ms(为特定主题设置)

  2. 控制 Kafka 尽快去检查消息是否过期(默认是5分钟检查一次)。 log.retention.check.interval.ms

可通过如下两个属性来强制删除指定主题的消息

retention.ms属性:将该属性设为一个极短的时间,比如1000。

log.retention.check.interval.ms:设置Kafka轮询检查的时间间隔,比如将该该属性设为300000。

这意味着每5分钟,Kafka会对指定主题的消息执行一次检查,检查消息是否过期(超过retention.ms属性设置的时间值),如果过期则删除这些消息。

演示

官网显示:Kafka 默认是5分钟检查一次主题下是否有消息过期

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

1、修改kafka检查过期消息的时间间隔

为了方便演示,我这里改成120秒检查一次,演示完要记得注释掉这个配置,不然这么频繁检查会很卡。

配置修改后要重启下kafka。

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

2、修改主题下消息的过期时间

然后修改test2主题下的消息的保存时间,改成1秒,就是消息存在一秒后就过期。

▲ 将test2主题下的消息的保存时间设为1秒

 kafka-configs --bootstrap-server localhost:9092 ^
 --alter ^
 --entity-name test2 ^
 --entity-type topics ^
 --add-config retention.ms=1000

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

3、查看修改是否生效

修改完消息的过期时间,那么就查看下是否生效:

命令查看:
使用 kafka-configs.bat 命令的 --describe 子命令
——该命令可查看所有对象的信息

 kafka-configs --bootstrap-server localhost:9092 ^
 --describe ^
 --entity-name test2 ^
 --entity-type topics

主题下消息的保存(过期)时间已经修改成1秒了
09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

CMAK界面查看:

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

4、先查看下主题下有没有消息

查看端口为9092的kafka节点,主题是test2,查看内容是从beginning开始就存在的消息:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

此时这个test2主题下没有任何消息,为方便演示,先添加几条消息进去
09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

5、添加几条消息看效果

添加带 key 的消息,相当于生产者发送消息到指定的主题里面去:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.key=true

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

然后马上查看,发现消息发送成功,test2主题下有这些消息:为了方便,我命令再拷贝一份:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

再检查一遍消息的存活时间,是存活1秒没错

 kafka-configs --bootstrap-server localhost:9092 ^
 --describe ^
 --entity-name test2 ^
 --entity-type topics

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

6、查看消息是否被删除

kafka每个120就会去检查是否有过期的消息,有的话就把过期的消息删除掉,我又把消息的过期时间设置为1秒,所以看过期的消息在120后,是否会被kafka检查到并删除掉:

接下来就等120秒,发现过期消息已经被成功删除了:

查看 test2 主题下,从beginning 一开始到现在的所有消息:

 kafka-console-consumer --bootstrap-server localhost:9092 ^
 --topic test2 ^
 --from-beginning ^
 --property print.timestamp=true ^
 --property print.key=true ^
 --property print.offset=true ^
 --property print.partition=true

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

演示完,把这个设置给注释掉,恢复成默认的,默认是每5分钟检查一次。

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

★ 恢复主题的retention.ms配置

若要指定主题的retention.ms配置依然使用默认值,则只要使用kafka-configs.bat命令的 --delete-config 选项删除该配置即可。例如如下命令:

因为上面同通过 --add-config retention.ms=1000 把消息的过期时间设置为1秒,这个设置肯定是不合理的,所以需要把这个设置给删除掉。

1、先查看没修改前的test2主题的配置信息:

 kafka-configs --bootstrap-server localhost:9092 ^
 --describe ^
 --entity-name test2 ^
 --entity-type topics

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

2、 将test2主题下的消息的保存时间删除。

(–delete-config retention.ms 就是删除 retention.ms 这个配置属性)
–alter ^ 表示修改的意思

kafka-configs.bat --alter ^
--bootstrap-server localhost:9092 ^
--entity-type topics ^
--entity-name test2 ^
--delete-config retention.ms

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

3、再查看修改后的test2主题的配置信息:

命令行查看:

 kafka-configs --bootstrap-server localhost:9092 ^
 --describe ^
 --entity-name test2 ^
 --entity-type topics

发现 retention.ms 这个配置属性 已经被成功删除了,表示消息的过期时间恢复成默认的168小时了。
09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

这个就是默认的消息过期时间:

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式

CMAK 查看:
retention.ms 这个配置属性恢复成默认的了

09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),Kafka 系列,kafka,linq,分布式文章来源地址https://www.toymoban.com/news/detail-823914.html

到了这里,关于09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka删除topic消息的三种方式

    kafka删除topic消息的三种方式 方法一:快速配置删除法(确保topic数据不要了) 1.kafka启动之前,在server.properties配置delete.topic.enable=true 2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具删除 注意:如果kafka启动之前没有配置delete.topic.e

    2024年02月16日
    浏览(41)
  • 使用时间戳来消费消息(kafka)

    每条消息都有一个与之相关的 时间戳(timestamp ),可以使用这个时间戳来筛选或消费特定时间范围内的消息。 timestamp() 方法获取消息的时间戳,并检查它是否在指定的时间范围内。 请注意,时间戳是以毫秒为单位的UNIX时间戳。需要根据需要调整 start_timestamp 和 end_timestamp

    2024年01月24日
    浏览(45)
  • Golang 操作 Kafka 设置消息的失效时间

    在使用 Golang 操作 Kafka 时,你可以使用 Sarama 库来设置消息的失效时间。以下是一个示例代码,演示如何在生产者端设置数据失效时间: 上述示例中,我们首先创建了一个 sarama.Config 实例,并通过 config.Message.MaxAge 属性设置了消息的失效时间,此处设定为一天 (time.Hour * 24)。然

    2024年02月11日
    浏览(47)
  • 第7次修改了可删除可持久保存的前端html备忘录

    第7次修改了可删除可持久保存的前端html备忘录  

    2024年01月20日
    浏览(42)
  • 第10次修改了可删除可持久保存的前端html备忘录

    第10次修改了可删除可持久保存的前端html备忘录      

    2024年01月24日
    浏览(57)
  • 学生信息管理系统(录入、查找、删除、修改、排序、统计等功能实现)超详细完整代码,建议保存。

    许多老师都会布置Python期末大作业,作业题目很多就是学生信息管理系统,以前都是练习小题目,几十行代码就能搞定,而完整的写完这个系统我用了差不多400行代码。完整写完这个系统,是对一个学期所学知识的进一步深入了解于学习,还能扩充一些课外知识。篇幅很长,

    2024年02月12日
    浏览(47)
  • 【无标题】【第6次修改了可删除可持久保存的前端html备忘录:去掉第2页面可误删除

    第6次修改了可删除可持久保存的前端html备忘录:去掉第2页面

    2024年01月17日
    浏览(40)
  • kafka 消息日志原理 & 指定偏移量消费 & 指定时间戳消费

    Apache Kafka日志存储在物理磁盘上各种数据的集合,日志按照topic分区进行文件组织,每一个分区日志由一个或者多个文件组成。生产者发送的消息被顺序追加到日志文件的末尾。 如上图所述,Kafka主题被划分为3个分区。在Kafka中,分区是一个逻辑工作单元,其中记录被顺序附

    2024年02月15日
    浏览(46)
  • 70 # 协商缓存的配置:通过修改时间

    比较一下再去决定是用缓存还是重新获取数据,这样会减少网络请求,提高性能。 客户端第一次请求服务器的时候,服务器会把数据进行缓存,同时会生成一个缓存标识符,这个缓存标识符会被发送到客户端,客户端第二次请求服务器的时候,会把缓存标识符发送到服务器,

    2024年02月12日
    浏览(37)
  • kafka通过命令行删除指定topic下所有records

     1、kafka环境配置  由于在windows环境下,在kafka官网下载下来Apache Kafka需要将E:kafka_2.12-3.3.1binwindows下的路径加入到环境变量中,方便直接使用kafka工具,其他系统直接使用bin下的工具即可: 2、配置kafka指定topic的json文件,命名为delete.json,此文件放在任意位置都可: 可以查

    2024年02月14日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包