golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费

这篇具有很好参考价值的文章主要介绍了golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

当我们使用kafka的时候存在这样一个场景:

有一个消费组正在正常消费中并且消息偏移量策略为lastoffset(最新偏移量),这个时候在kafka服务器中为当前主题下新增了一个分区,各个生产者纷纷将消息投递到了这个新增分区中。当然我们知道针对于这种场景消费者方可以触发重平衡回调方法,不过需要注意的一点是这个过程并非即时触发,它中间是会有一段时间的空档期,这个空档期决策与消费者刷新kafka集群元数据时间参数有关,一般都会设置为分钟级。那么问题就来了,在空档期中新分区的消息没有任何消费者接管,这就导致了即使过了这个空档期触发了重平衡机制也无法消费到之前的消息,因为我们的偏移量策略为lastoffset(最新偏移量)。

针对于这个场景我个人给出的一些解决方案和思路,供大家参考:

我的实现思路:

一、操作kafka系统命令实现(简单暴力,需要手动启停相关消费者进程):

1. 执行强行重置某个主题下某个分区的消费者组消息偏移量为0的命令:

需要注意的是如果当前myGroup消费者组处于活跃状态(有消费者进程运行)的情况,该命令会报错,所以需要停止掉当前消费者组下的所有消费者进程。

kafka-consumer-groups.sh --bootstrap-server kafka-kraft:9092  --group myGroup --reset-offsets --topic myTopic:1 --to-offset 0 --execute

2. 启动当前这个消费者组下的消费者进程,这个时候消费者组检测当前分区偏移量为0,自然就会重头开始消费这个分区的所有消息。

二、消费者客户端代码层实现(无需启停相关消费者进程):

  1. 自行维护当前主题的某个消费者组的分区数;

  1. 需要知道本次周期新增了哪些分区;

  1. 在重平衡方法中将这个新分区的offset置为0,也就是重头消费这个新分区的所有消息;

我的实现方案:

在重平衡回调方法中使用redis的无序集合储存当前主题、当前消费者组下的全部分区信息,然后在根据当前消费者会话去和上一代的全部分区信息进行差集对比,对比的结果就是新增的分区。得到新增的分区后将这些分区的偏移量重置为0即重头开始消费,当前存在重复消费的情况,需要你的业务逻辑上做好幂等性。最后再将这些差集新增分区ID更新至redis存储的当前主题、当前消费者组下的全部分区信息内形成了一个闭环,即使一个消费者组下有多个消费者进程也不会出现数据覆盖更新等问题,因为如果存在差集新增分区只会将更新差集分区ID。

以下是代码演示:文章来源地址https://www.toymoban.com/news/detail-704074.html

package main

import (
    "context"
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/go-redis/redis/v8"
    "log"
    "os"
    "strconv"
    "time"
)

type ConsumerGroupHandler struct {
    Client  sarama.Client
    GroupId string
}

var redisClient *redis.Client

func init() {
    redisClient = redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // no password set
        DB:       0,  // use default DB
    })
}

func SliceDiff(a, b []int32) (diff []int32) {
    set := make(map[int32]struct{})

    for _, v := range b {
        set[v] = struct{}{}
    }

    for _, v := range a {
        if _, ok := set[v]; !ok {
            diff = append(diff, v)
        }
    }

    return
}

// c *ConsumerGroupHandler.getNewPartition 获取新增分区
func (c *ConsumerGroupHandler) getNewPartition(topic, groupId string, currentPartition []int32) ([]int32, error) {

    key := fmt.Sprintf("kafkaPartition.%s:%s", topic, groupId)
    if exists, err := redisClient.Exists(context.Background(), key).Result(); exists == 0 || err != nil {
        allPartitions, err := c.Client.Partitions(topic)
        err = c.syncPartition(topic, groupId, allPartitions)
        return nil, err
    }

    oldPartition := make([]int32, 0)
    cursor := uint64(0)

    for {
        keys, cursor, err := redisClient.SScan(context.Background(), key, cursor, "*", 100).Result()
        if err != nil {
            return nil, err
        }
        newKeys := make([]int32, 0, len(keys))
        for _, k := range keys {
            i, _ := strconv.ParseInt(k, 10, 32)
            newKeys = append(newKeys, int32(i))
        }
        oldPartition = append(oldPartition, newKeys...)
        if cursor == 0 {
            break
        }
    }
    diffPartition := SliceDiff(currentPartition, oldPartition)
    log.Printf("当前分区集合: %v 上一代分区集合: %v 差集新增分区集合: %v \n", currentPartition, oldPartition, diffPartition)
    return diffPartition, nil
}

// c *ConsumerGroupHandler.syncPartition 同步更新分区信息
func (c *ConsumerGroupHandler) syncPartition(topic, groupId string, partition []int32) (err error) {
    partitionStrings := make([]string, 0, len(partition))
    for _, p := range partition {
        partitionStrings = append(partitionStrings, strconv.Itoa(int(p)))
    }
    _, err = redisClient.SAdd(context.Background(), fmt.Sprintf("kafkaPartition.%s:%s", topic, groupId), partitionStrings).Result()
    return
}

func (c *ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
    log.Println("Setup CallBack Start...")
    for topic, partition := range sess.Claims() {
        log.Printf("主题:%s 分区:%+v \n", topic, partition)
        newPartition, err := c.getNewPartition(topic, c.GroupId, partition)
        if newPartition != nil && err == nil {
            for _, p := range newPartition {
                //这里之所以调用这个方法是因为当消费者没有消费过这个新分区的任何一条消息时
                //kafka内部__consumer_offsets这个主题下分区偏移量为-1
                //当分区偏移量为-1时sess.ResetOffset方法是无效的,所以先将偏移量提交至
                //kafka集群并且0,这里不必担心偏移量出现问题,因为sess.MarkOffset方法
                //不会提交真实分区偏移量小于第三个参数的情况
                sess.MarkOffset(topic, p, 0, "")
                sess.ResetOffset(topic, p, 0, "")
                log.Printf("主题:%s 分区:%d 已完成偏移量重置. \n", topic, p)
            }
            if err = c.syncPartition(topic, c.GroupId, newPartition); err != nil {
                log.Println("Setup CallBack syncPartition Error: ", err)
                return err
            }
        } else if err != nil {
            log.Println("Setup CallBack getNewPartition ERROR:", err)
            return err
        }

        log.Println("Setup CallBack syncPartition Success!")
    }
    log.Println("Setup CallBack Finish...")
    return nil
}
func (*ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
    log.Println("\n Cleanup CallBack...")
    return nil
}
func (c *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        if err := c.handler(msg); err != nil {
            log.Println("消息处理失败: ", err, " msg: ", msg)
        }
        sess.MarkMessage(msg, "")
        sess.Commit()
    }
    return nil
}

func (c *ConsumerGroupHandler) handler(msg *sarama.ConsumerMessage) error {
    log.Printf("Message key:%v value:%v topic:%q partition:%d offset:%d \n", string(msg.Key), string(msg.Value), msg.Topic, msg.Partition, msg.Offset)
    return nil
}

func main() {

    groupId := os.Args[1]

    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Consumer.Offsets.AutoCommit.Enable = false
    config.Version = sarama.V3_2_0_0
    config.Metadata.RefreshFrequency = time.Minute

    client, err := sarama.NewClient([]string{"192.168.6.105:9093"}, config)
    if err != nil {
        panic(any(err))
    }
    defer func() { _ = client.Close() }()

    handler := &ConsumerGroupHandler{Client: client, GroupId: groupId}

    group, err := sarama.NewConsumerGroup([]string{"192.168.6.105:9093"}, handler.GroupId, config)
    if err != nil {
        panic(any(err))
    }

    defer func() { _ = group.Close() }()

    // Track errors
    go func() {
        for err = range group.Errors() {
            fmt.Println("ERROR", err)
        }
    }()

    ctx := context.Background()
    topics := []string{"myTopic"}
    for {
        err = group.Consume(ctx, topics, handler)
        if err != nil {
            panic(any(err))
        }
    }
}
shopify/sarama,golang,kafka,golang,kafka,Powered by 金山文档

到了这里,关于golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(47)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(44)
  • Kafka-消费者组消费流程

    消费者向kafka集群发送消费请求,消费者客户端默认每次从kafka集群拉取50M数据,放到缓冲队列中,消费者从缓冲队列中每次拉取500条数据进行消费。   

    2024年02月12日
    浏览(46)
  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(46)
  • Kafka3.0.0版本——消费者(消费者组原理)

    1.1、消费者组概述 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 注意: (1)、消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。 (2)、消费者组之间互不影响。所有的消费者

    2024年02月09日
    浏览(54)
  • 【Kafka】Kafka消费者

    pull(拉)模式:consumer采用从broker中主动拉取数据。 Kafka采用这种方式。 push(推)模式:Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有的消费者的消费速率。例如推送的速度是50m/s,consumer1和consumer2旧来不及处理消息。 pull模式不足之处是,如果Kafka没有数

    2024年02月13日
    浏览(48)
  • Kafka消费者无法消费数据,解决

    作为一个在项目中边学边用的实习生,真的被昨天还好好的今天就不能消费数据的kafka折磨到了,下面提供一点建议,希望能对大家有所帮助。 //操作前集群都关了 1.首先去kafka-home的config目录下找到server.properties文件, 加入advertised.listeners=PLAINTEXT://ip:9092    如果有配置liste

    2024年02月17日
    浏览(52)
  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(47)
  • Kafka进阶篇-消费者详解&Flume消费Kafka原理

    由于挺多时候如果不太熟系kafka消费者详细的话,很容易产生问题,所有剖析一定的原理很重要。 消费方式 消费者总体工作流程 消费者组初始化流程   消费者详细消费流程   消费者重要参数  bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。 key.deserializervalu

    2024年02月15日
    浏览(49)
  • Kafka及Kafka消费者的消费问题及线程问题

    Topic:是 Kafka 消息发布和订阅的基本单元,同时也是消息的容器。Topic 中的消息被分割成多个分区进行存储和处理。 Partition:是 Topic 分区,将 Topic 细分成多个分区,每个分区可以独立地存储在不同的 Broker 中,从而增加了消息的并发性、可扩展性和吞吐量。 Broker:是 Kafka

    2024年02月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包