kafka调优配置

这篇具有很好参考价值的文章主要介绍了kafka调优配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka生产者核心参数配置

kafka调优配置,kafka,数据库,分布式来源于尚硅谷

参数名称 描述
bootstrap.servers 生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。
key.serializer和value.serializer 指定发送消息的key和value的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator缓冲区总大小,默认32m。
batch.size 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。
max.in.flight.requests.per.connection 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是100ms。
enable.idempotence 是否开启幂等性,默认true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

生产者如何提高吞吐量

参数名称 描述
buffer.memory RecordAccumulator缓冲区总大小,默认32m。
batch.size 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间
compression.type 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

数据可靠性

acks 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据去重: 检查是否开启幂等性 默认true,表示开启幂等性

// 1初始化事务
void initTransactions();

// 2开启事务
void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务
void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

数据有序 :单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序;

Kafka Broker 核心参数配置

kafka调优配置,kafka,数据库,分布式

参数 描述
replica.lag.time.max.ms ISR中,如果Follower长时间未向Leade r发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。
auto.leader.rebalance.enable 默认是true。 自动Leader Partition 平衡。建议关闭。
leader.imbalance.per.broker.percentage 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
leader.imbalance.check.interval.seconds 默认值300秒。检查leader负载是否平衡的间隔时间。
log.segment.bytes Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。
log.index.interval.bytes 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。
log.retention.hours Kafka中数据保存的时间,默认7天。
log.retention.minutes Kafka中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是5分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。
log.cleanup.policy 默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。
num.io.threads 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。
num.replica.fetchers 默认是1。副本拉取线程数,这个参数占总核数的50%的1/3
num.network.threads 默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

自动创建主题:如果broker端配置参数auto.create.topics.enable设置为true(默认值是true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false。

Kafka消费者核心参数配置

kafka调优配置,kafka,数据库,分布式
kafka调优配置,kafka,数据库,分布式

参数 描述
bootstrap.servers 向Kafka集群建立初始连接用到的host/port列表。
key.deserializer和value.deserializer 指定接收消息的key和value的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。
auto.offset.reset 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets的分区数,默认是50个分区。不建议修改。
heartbeat.interval.ms Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于session.timeout.ms ,也不应该高于 session.timeout.ms的1/3。不建议修改。
session.timeout.ms Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认1个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次poll拉取数据返回消息的最大条数,默认是500条。

消费者再平衡

参数名称 描述
heartbeat.interval.ms Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。
session.timeout.ms Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy 消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

如何提升吞吐量
1)提升生产吞吐量
(1)buffer.memory:发送消息的缓冲区大小,默认值是32m,可以增加到64m。
(2)batch.size:默认是16k。如果batch设置太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(3)linger.ms,这个值默认是0,意思就是消息必须立即被发送。一般设置一个5-100毫秒。如果linger.ms设置的太小,会导致频繁网络请求,吞吐量下降;如果linger.ms太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。
(4)compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的CPU开销。
2)增加分区
3)消费者提高吞吐量
(1)调整fetch.max.bytes大小,默认是50m。
(2)调整max.poll.records大小,默认是500条。
4)增加下游消费者处理能力

如何保证数据精准发送接收

1)生产者角度

  • acks设置为-1 (acks=-1)
  • 幂等性(enable.idempotence = true) + 事务 。

2)broker服务端角度

  • 分区副本大于等于2 (–replication-factor 2)。
  • ISR里应答的最小副本数量大于等于2 (min.insync.replicas = 2)。

3)消费者

  • 事务 + 手动提交offset (enable.auto.commit = false)。
  • 消费者输出的目的地必须支持事务(MySQL、Kafka)。

合理设置分区数
(1)创建一个只有1个分区的topic。
(2)测试这个topic的producer吞吐量和consumer吞吐量。
(3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
(4)然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)。
例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;
分区数 = 100 / 20 = 5分区
分区数一般设置为:3-10个
分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。

单条日志大于1m文章来源地址https://www.toymoban.com/news/detail-682413.html

参数名称 描述
message.max.bytes 默认1m,broker端接收每个批次消息最大值。
max.request.size 默认1m,生产者发往broker每个请求消息最大值。针对topic级别设置消息体的大小。
replica.fetch.max.bytes 默认1m,副本同步数据,每个批次消息最大值。
fetch.max.bytes 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

到了这里,关于kafka调优配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式数据库HBase

    HBase是一个高可靠、高性能、 面向列 、可伸缩的分布式数据库,是谷歌BigTable的开源实现,主要用来存储非结构化和把结构化的松散数据。 HBase的目标是处理非常庞大的表,可以通过水平扩展的方式,利用 廉价计算机集群 处理由超过10亿行数据和数百万列元素组成的数据表。

    2024年02月09日
    浏览(57)
  • 分布式数据库NoSQL(二)——MongoDB 数据库基本操作

    MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。 MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似 json 的

    2024年02月06日
    浏览(52)
  • 分布式数据库-事务一致性

    version: v-2023060601 author: 路__ 分布式数据库的“强一致性”应该包含两个方面: serializability(串行) and linearizability(线性一致) ,上述图为“Highly Available Transactions: Virtues and Limitations”论文中对于一致性模型的介绍。图中箭头表示一致性模型之间的关系。对于异步网络上的分

    2024年02月08日
    浏览(53)
  • tim实践系列——分布式数据存储与动态数据库扩容

    前言: tim是去中心化分布式即时通讯引擎。不依赖于任何中心服务器,采用去中心化分布式架构,解决传统中心化通讯方式的问题,去中心化分布式架构的通讯引擎的各个节点之间相互连接,形成一个庞大的分布式网络。可以轻松地扩展服务规模,支持更多的用户和业务需求

    2024年02月02日
    浏览(50)
  • 聊聊分布式 SQL 数据库Doris(三)

    在 Doris 的存储引擎规则: 表的数据是以分区为单位存储的,不指定分区创建时,默认就一个分区. 用户数据首先被划分成若干个分区(Partition),划分的规则通常是按照用户指定的分区列进行范围划分,比如按时间划分。 在每个分区内,数据被进一步的按照Hash的方式分桶,分

    2024年02月05日
    浏览(56)
  • 聊聊分布式 SQL 数据库Doris(一)

    MPP:Massively Parallel Processing, 即大规模并行处理. 一般用来指多个SQL数据库节点搭建的数据仓库系统. 执行查询的时候, 查询可以分散到多个SQL数据库节点上执行, 然后汇总返回给用户. Doris 作为一款开源的 MPP 架构 OLAP 高性能、实时的分析型数据库,能够运行在绝大多数主流的商

    2024年02月05日
    浏览(46)
  • 聊聊分布式 SQL 数据库Doris(六)

    此处的负载均衡指的是FE层的负载均衡. 当部署多个 FE 节点时,用户可以在多个 FE 之上部署负载均衡层来实现 Doris 的高可用。官方文档描述: 负载均衡 。 实现方式 实现方式有多种,如下列举。 开发者在应用层自己进行重试与负载均衡。 JDBC Connector 发现一个连接挂掉,就自

    2024年02月05日
    浏览(54)
  • 聊聊分布式 SQL 数据库Doris(二)

    Doris中,Leader节点与非Leader节点和Observer节点之间的元数据高可用和一致性,是通过bdbje(全称:Oracle Berkeley DB Java Edition)的一致性和高可用实现的。 元数据与同步流程 元数据主要存储四类数据: 用户数据信息. 包括数据库, 表的schema, 分片信息等 各类作业信息. 如导入作业, clo

    2024年02月05日
    浏览(68)
  • 聊聊分布式 SQL 数据库Doris(五)

    阅读 Doris SQL 原理解析,总结下Doris中SQL解析流程: 词法识别:解析原始SQL文本,拆分token 语法识别:将token转换成AST 单机逻辑查询计划:将AST经过一系列的优化(比如,谓词下推等)成查询计划,提高执行性能与效率。 分布式逻辑查询计划:根据分布式环境(数据分布信息

    2024年02月05日
    浏览(55)
  • 聊聊分布式 SQL 数据库Doris(八)

    密集索引:文件中的每个搜索码值都对应一个索引值,就是叶子节点保存了整行. 稀疏索引:文件只为索引码的某些值建立索引项. 稀疏索引的创建过程包括将集合中的元素分段,并给每个分段中的最小元素创建索引。在搜索时,先定位到第一个大于搜索值的索引的前一个索引

    2024年02月05日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包