网上有很多文章讲述 Kafka rebalance 的原理,本文是列举常见的几种 rebalance 场景。文章来源:https://www.toymoban.com/news/detail-430348.html
rebalance 期间,当前 consumer group 的所有 consumer 都要暂停消费,开销较大。因此应该尽量减少 rebalance ,而 relalance 的原因通常是 consumer 数量变化,常见的几种情况如下:文章来源地址https://www.toymoban.com/news/detail-430348.html
- 如果一个 consumer 刚启动,则会向 broker 发送 JoinGroup 请求,加入 group ,被分配一个 member id ,触发一次 rebalance 。
- 如果一个 consumer 终止,不再运行。则等到 Heartbeat 超时,broker 会认为该 consumer 下线,触发一次 rebalance 。
- 上述 consumer 启动、终止的情况一般不频繁,可以容忍它触发 rebalance 。但有的情况下,consumer 会频繁启动、终止,比如被 k8s HPA 改变 consumer 数量。
- 解决方案:额外开发一个应用,称为 dispatcher ,让它作为唯一的 consumer 连接到 broker ,获取消息。而原本的应用连接到 dispatcher ,间接获取消息。
- 使用 dispatcher 还能解决另一个问题:group 中的 consumer 数量,大于当前 topic 的 partition 数量,导致部分 consumer 空闲、不能消费。
- 如果一个 consumer 消费太慢,连续调用 poll() 的时间间隔超过 max.poll.interval.ms ,也会导致 Heartbeat 超时,触发 rebalance 。
- 解决方案:增加 max.poll.interval.ms 阈值,或者优化 consumer 客户端代码,例如减少每次拉取的数据量从而减少消费耗时、更快地开始下一次消费,例如从同步消费改为异步消费。
- 如果一个 consumer 终止,然后又重启。则不记得自己之前的 member id ,依然会发送 JoinGroup 请求,加入 group ,被分配一个新的随机 member id ,触发一次 rebalance 。
- 而旧的 member id 不再使用,等到 Heartbeat 超时,又会触发一次 rebalance 。因此 consumer 重启时会触发两次 rebalance 。
- 解决方案:Kafka v2.3 开始,consumer 增加了配置参数 group.instance.id ,用于避免 consumer 重启时触发 rebalance 。
- 给该参数赋值为非空字符串时,consumer 会从默认的 Dynamic Member 类型变成 Static Member 类型,并采用该参数的值作为 client.id。
当 consumer 重启之后发送 JoinGroup 请求时,Coordinator 会识别出它是 Static Member ,属于 rejoin ,因此分配一个新 member id ,并删除旧的 member id 。
这样不会触发 rebalance ,除非 consumer 重启太慢,导致 Heartbeat 超时。 - 日志示例:
INFO [GroupCoordinator 1]: Static member Some(static_member_1) of group test_group_1 with unknown member id rejoins, assigning new member id static_member_1-cdf1c4ea-2f1c-4f4d-bc46-bf443e5f7322, while old member id static_member_1-8b5d89b3-0757-4441-aeaa-50e7f9f55cee will be removed. INFO [GroupCoordinator 1]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.
- 如果一个 group 中只运行了一个 consumer ,则用户可以配置一个固定的 group.instance.id 值。
- 如果一个 group 中运行了多个 consumer ,则用户需要在客户端增加一些代码,给每个 consumer 配置一个互不相同的、长期不变的 group.instance.id 值。
例如以 k8s StatefulSet 方式部署多个 consumer ,它们的 Pod 名称会从 0 开始编号。可让每个 consumer 通过环境变量读取自己的 POD_NAME ,用作 group.instance.id 。
- 给该参数赋值为非空字符串时,consumer 会从默认的 Dynamic Member 类型变成 Static Member 类型,并采用该参数的值作为 client.id。
到了这里,关于Kafka rebalance 的几种原因与解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!