Kafka内容分享(七):Kafka 数据清理和配额限速

这篇具有很好参考价值的文章主要介绍了Kafka内容分享(七):Kafka 数据清理和配额限速。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、Kafka中数据清理(Log Deletion)

1.1、日志删除

1.1.1、定时日志删除任务

1.1.2、基于时间的保留策略

1.1.2.1、设置topic 5秒删除一次

1.1.3、基于日志大小的保留策略

1.1.4、基于日志起始偏移量保留策略

1.2 日志压缩(Log Compaction)

二、Kafka配额限速机制(Quotas)

2.1、限制producer端速率

2.2、限制consumer端速率

2.3、取消Kafka的Quota配置

三、Kafka实战

3.1、生产者

3.1.1、导入依赖

3.1.2、配置文件

3.1.3、发送消息

3.2、消费者

3.2.1、配置类

3.2.2、消费消息


一、Kafka中数据清理(Log Deletion)

Kafka的消息存储在磁盘中,为了控制磁盘占用空间,Kafka需要不断地对过去的一些消息进行清理工作。Kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka中,提供两种日志清理方式:

  • 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
  • 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。

在Kafka的broker或topic配置中:

配置项 配置值 说明
log.cleaner.enable true(默认) 开启自动清理日志功能
log.cleanup.policy delete(默认) 删除日志
log.cleanup.policy compaction 压缩日志
log.cleanup.policy delete,compact 同时支持删除、压缩

1.1、日志删除

日志删除是以段(segment日志)为单位来进行定期清理的。

1.1.1、定时日志删除任务

Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:

  1. 基于时间的保留策略
  2. 基于日志大小的保留策略
  3. 基于日志起始偏移量的保留策略

kafka限制消费速度,MQ(Message Queue)消息队列 内容分享,Kafka 面试题分享,kafka,分布式

1.1.2、基于时间的保留策略

以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:

log.retention.hours=168

也就是,默认日志的保留时间为168小时,相当于保留7天。

删除日志分段时:

  1. 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
  2. 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
  3. Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
1.1.2.1、设置topic 5秒删除一次

设置topic的删除策略

  • key: retention.ms
  • value: 5000
1.1.3、基于日志大小的保留策略

日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。

注意:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。

1.1.4、基于日志起始偏移量保留策略

每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

1.2 日志压缩(Log Compaction)

Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。

kafka限制消费速度,MQ(Message Queue)消息队列 内容分享,Kafka 面试题分享,kafka,分布式

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment
  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
  • 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态

二、Kafka配额限速机制(Quotas)

生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。

2.1、限制producer端速率

为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576‬/s,命令如下:

bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试,观察生产消息的速率

bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1

结果:

50000 records sent, 1108.156028 records/sec (1.06 MB/sec)

2.2、限制consumer端速率

对consumer限速与producer类似,只不过参数名不一样。

为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:

bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试:

bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic test --fetch-size 1048576 --messages 500000

结果为:

MB.sec:1.0743

2.3、取消Kafka的Quota配置

使用以下命令,删除Kafka的Quota配置文章来源地址https://www.toymoban.com/news/detail-782346.html

bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default

三、Kafka实战

3.1、生产者

3.1.1、导入依赖
<dependency>
 	<groupId>org.springframework.kafka</groupId>
 	<artifactId>spring-kafka</artifactId>
</dependency>

3.1.2、配置文件

spring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
spring.kafka.angyan.clientId=TEST_DEMO_MESSAGE
spring.kafka.angyan.producer.compressionType=gzip
spring.kafka.angyan.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.angyan.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 提交延时
spring.kafka.angyan.linger.ms=1000
spring.kafka.angyan.template.defaultTopic=TEST_DEMO_TOPIC
3.1.3、发送消息
@Service
public class KafkaServiceImpl implements KafkaService {

    @Qualifier("kafkaTemplate")
    @Autowired
    private KafkaTemplate kafkaRecordTemplate;


    @Override
    public String sendMessage(String key, byte[] bytes) {
        Map header = new HashMap();
        header.put(KafkaHeaders.KEY,key);
        MessageHeaders messageHeaders = new MessageHeaders(header);
        Message message = MessageBuilder.createMessage(bytes, messageHeaders);
        kafkaRecordTemplate.send(message);

        return null;
    }

}

3.2、消费者

3.2.1、配置类
pring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
spring.kafka.angyan.consumer.group.id=TEST_DEMO_MESSAGE
spring.kafka.angyan.consumer.clientId=TEST_DEMO_MESSAGE
spring.kafka.angyan.defaultTopic=TEST_DEMO_TOPIC
spring.kafka.angyan.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.angyan.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
3.2.2、消费消息
@Component
public class KafkaCustomer {

    @KafkaListener(topics = "${spring.kafka.angyan.defaultTopic}",containerFactory = "kafkaTemplateConsumer")
    public void testKafka(ConsumerRecord<String,byte[]> record){
        //处理业务逻辑

    }
}

到了这里,关于Kafka内容分享(七):Kafka 数据清理和配额限速的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka磁盘写满日志清理操作

    最近项目组的kafka集群,老是由于应用端写入kafka topic的消息太多,导致所在的broker节点占满,导致其他的组件接连宕机。 这里和应用端沟通可以删除1天之前的消息来清理磁盘,并且可以调整topic的消息存活时间。 如上调整topic的消息存活时长为为1天,当执行完之后执行查询

    2024年02月04日
    浏览(40)
  • Kafka3.0.0版本——文件清理策略

    1.1、文件清理策略的概述 Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。 参数 解释 log.retention.hours 小时,最低优先级(默认 7天) log.retention.minutes 分钟 log.retention.ms 毫秒,最高优先级 log.retention.check.interval.ms 负责设置检查周期(默认 5 分钟) 1

    2024年02月09日
    浏览(45)
  • k8s资源配额限制

    为什么会有资源配额管理? 资源配额管理维度解释? 资源配额参数有什么? 计算CPU CPU的Requests和Limits是通过CPU数(cpus)来度量的。 CPU的资源值是绝对值,而不是相对值,比如0.1CPU在单核或多核机器上是一样的,都严格等于0.1 CPU core。 计算Memory 内存的Requests和Limits计量单位

    2024年02月13日
    浏览(51)
  • k8s进阶3——资源配额、资源限制

    为什么会有资源配额管理? 可以提高集群稳定性,确保指定的资源对象在任何时候都不会超量占用系统物理资源,避免业务进程在设计或实现上的缺陷导致整个系统运行紊乱甚至意外宕机。 资源配额管理维度: 容器级别,定义每个Pod上资源配额相关的参数,比如CPU/Memory、

    2024年02月10日
    浏览(33)
  • kafka查看topic和消息内容命令

    ①创建一个测试用的topic ② 用Kafka的console-producer在topic test 生产消息 ③ 用Kafka的console-consumer 消费topic test的消息 ④查询topic,进入kafka目录: ⑤查询topic内容: ⑥查看topic 为 test的 详细信息 ⑦往topic 为 test的内部生产消息 ⑧从topic 为test的内部消费消息 ⑨删除kafka的测试top

    2024年02月11日
    浏览(55)
  • CKA 10_Kubernetes工作负载与调度 资源调度 资源限制 LimitRanger 资源配额 ResourceQuota

    官方文档: 概念 | 策略 | 限制范围 官方文档: 概念 | 策略 | 资源配额 默认情况下, Kubernetes 集群上的容器运行使用的计算资源没有限制。 使用资源配额,集群管理员可以以名字空间为单位,限制其资源的使用与创建。 在命名空间中,一个 Pod 或 Container 最多能够使用命名空

    2024年02月08日
    浏览(67)
  • 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka

    目录 《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目_黑马头条项目_软工菜鸡的博客-CSDN博客 04自媒体文章-自动审核 1)自媒体文章自动审核流程 2)内容安全第三方接口 2.1)概述 2.2)准备工作 2.3)文本内容审核接口 2.4)图片审核接口 2.5)项目集成 3)app端文章保存接口

    2024年02月15日
    浏览(34)
  • 10款优秀的Kafka管理工具分享

    Kafka是一种高性能、可扩展的分布式消息队列系统,被广泛应用于大规模数据流处理和实时数据传输场景。有效地管理和监控Kafka集群对于确保其可靠性和性能至关重要。在本文中,我将分享10款优秀的Kafka管理工具,它们可以帮助您轻松管理和监控您的Kafka环境。 Kafka Manager

    2024年01月25日
    浏览(38)
  • 手记系列之六 ----- 分享个人使用kafka经验

    本篇文章主要介绍的关于本人从刚工作到现在使用kafka的经验,内容非常多,包含了kafka的常用命令,在生产环境中遇到的一些场景处理,kafka的一些web工具推荐等等。由于kafka这块的记录以及经验是从我刚开始使用kafka,从2017年开始,可能里面有些内容过时,请见谅。温馨提

    2024年02月08日
    浏览(47)
  • 分享8个分布式Kafka的使用场景

    Kafka 最初是为海量日志处理而构建的。它保留消息直到过期,并让消费者按照自己的节奏提取消息。与它的前辈不同,Kafka 不仅仅是一个消息队列,它还是一个适用于各种情况的开源事件流平台。 下图显示了典型的 ELK(Elastic-Logstash-Kibana)堆栈。Kafka 有效地从每个实例收集日

    2024年02月08日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包