Golang中常用的kafka库

这篇具有很好参考价值的文章主要介绍了Golang中常用的kafka库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文将介绍目前主流的三种第三方kafka库,并结合实际使用中遇到的问题,给出实际的解决方案,本文只涉及kafka数据的消费,且其中对所有库的使用仅为测试过的简单代码,仅记录自己踩坑过程。
问题:最开始使用的是sarama-cluster库,git地址为链接: github.com/bsm/sarama-cluster,但是这个库在使用过程中,存在从头消费kafka数据问题,实际消费场景是实时消费kafka数据,其中有关kafka的配置如下:

package main

import (
	"time"
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	clusterConfig := cluster.NewConfig()
	clusterConfig.Consumer.Return.Errors = true
	clusterConfig.Group.Return.Notifications = true
	clusterConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	clusterConfig.Version = sarama.V0_11_0_0
	clusterConfig.Consumer.Offsets.CommitInterval = 1 * time.Second
	clusterConfig.Consumer.Offsets.Initial = sarama.OffsetNewest

	// TODO: 添加你的业务逻辑
}

sarama-cluster已经废弃了,没有人维护了,所以遇到的问题不易找到解决方案,故考虑换库。
注意:在实际使用中,需要考虑一个现实问题,生产数据的kafka集群和消费kafka的集群在两台服务器,且两台服务器之间并没有进行时钟同步,故基于此情况下考虑去消费实时数据

  1. kafka-go

首先考虑比较轻量的kafka-go库,github的地址为链接: github.com/segmentio/kafka-go,具体的代码示例如下:

package main

import (
   "time"
   "github.com/segmentio/kafka-go"
)

func main() {
   brokers := "192.168.13.22:9092"
   groupID := "test"
   topic := "example"
   consumer := kafka.NewReader(kafka.ReaderConfig{
   	Brokers:     brokers,
   	GroupID:     groupID,
   	Topic:       topic,
   	MinBytes:    10e3, // 10KB
   	MaxBytes:    10e6, // 10MB
   	StartOffset: kafka.LastOffset, // 这个很关键,决定了是否是从最新的位置消费数据
   })
   // 消费数据
   for {
   	msg, err := consumer.ReadMessage(context.Background())
   	if err != nil {
   		fmt.Printf("kafka消费异常,err:%v\n", err)
   		continue
   	}
   	fmt.Printf("Message on partition%d, offset %d, topic %s, msg:%s, time[%v]\n", msg.Partition,
   		msg.Offset, msg.Topic, string(msg.Value), msg.Time)
   	// TODO: 添加你的业务逻辑
   }
}

踩坑点:经过多次尝试和实际代码调试,kafka-go库中需要注意以下几点:

  • broker是slice,故可以设置多个,但是topic是string类型只能是一个,所以如果有多个topic需要消费,只能采取使用NewReader去创建多个kafka消费者实例
  • 关于group id的设置,真的是踩坑无数,如果group id 设置为空的话,只会消费partition为0的数据。group id相同可能会接着上次未消费的数据进行消费而不是实时消费,这个结论没有验证,但是为了保险起见,多个consumer实例设置为不同的group id没有任何问题
  • 如果需要实时消费,即消费最新的数据,一定要设置StartOffset字段为kafka.LastOffset,这个是尝试了很多才找到的
  1. confluent-kafka-go
    git上的地址为:github.com/confluentinc/confluent-kafka-go/kafka
    创建kafkaConsumer实例及消费数据如下:
func NewConFluentConsumer(broker string, groupId string, topics []string) (*kafka.Consumer, error) {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          groupId,
		"auto.offset.reset": "latest",
	})
	if err != nil {
		fmt.Printf("new confluent consumer failed, err[%v]\n", err)
		return nil, err
	}

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Printf("subscribe topic[%v] failed, err[%v]\n", topics, err)
		return nil, err
	}

	fmt.Printf"new confluent consumer success, broker[%s], groupId[%s], topic[%v]\n", broker, groupId, topics)
	return c, nil
}

func ConsumerMessageWorker(consumer *kafka.Consumer) {
	if consumer == nil {
		return
	}
	for {
		msg, err := consumer.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on partition%d, topic %s, msg:%s\n", msg.TopicPartition.Partition,*msg.TopicPartition.Topic, string(msg.Value))
		// TODO 增加消费到的消息处理逻辑
		} else if !err.(kafka.Error).IsTimeout() {
			// 没超时,但是报错了
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}
}

其中kafka消费配置使用的map参数可以在https://github.com/confluentinc/librdkafka/tree/master/CONFIGURATION.md中找到相关的描述
经测试,是可以支持单个broker,多个topic对数据进行消费,但是考虑到此库使用了c库,故在跨平台方面可能对程序的移植存在兼容性问题,故弃用。
阿里云的相关网站上也有kafka消费的一些示例,可以参考,链接如下:https://help.aliyun.com/zh/sls/user-guide/use-confluent-kafka-go-to-achieve-kafka-consumption
3. sarama
比较冗余和复杂,还有考虑到当时的时间问题,没有进行具体的测试文章来源地址https://www.toymoban.com/news/detail-780983.html

到了这里,关于Golang中常用的kafka库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • golang redis第三方库github.com/go-redis/redis/v8实践

    这里示例使用 go-redis v8 ,不过 go-redis latest 是 v9 安装v8:go get github.com/go-redis/redis/v8 Redis 5 种基本数据类型:  string 字符串类型;list列表类型;hash哈希表类型;set集合类型;zset有序集合类型   最基本的Set/Get操作 # setget.go package  main import  ( \\\"context\\\" \\\"fmt\\\" \\\"time\\\" \\\"github.com/go-re

    2024年02月12日
    浏览(59)
  • Golang中常用的kafka库

    本文将介绍目前主流的三种第三方kafka库,并结合实际使用中遇到的问题,给出实际的解决方案,本文只涉及kafka数据的消费,且其中对所有库的使用仅为测试过的简单代码,仅记录自己踩坑过程。 问题 :最开始使用的是sarama-cluster库,git地址为链接: github.com/bsm/sarama-cluster,

    2024年02月02日
    浏览(24)
  • 【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    kafka客户端是公司内部基于spring-kafka封装的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群认证方式:SASL_PLAINTEXT/SCRAM-SHA-512 经过多年的经验,以及实际验证,配置是没问题的,但是业务方反馈用相同的配置,还是报错! 封装的kafka客户端版本过低,高版本的配置项:secu

    2024年01月17日
    浏览(41)
  • 分布式存储系统举例剖析(elasticsearch,kafka,redis-cluster)

    1. 概述 对于分布式系统,人们首先对现实中的分布式系统进行高层抽象,然后做出各种假设,发展了诸如CAP, FLP 等理论,提出了很多一致性模型,Paxos 是其中最璀璨的明珠。我们对分布式系统的时序,复制模式,一致性等基础理论特别关注。 在共识算法的基础上衍生了选举

    2024年02月12日
    浏览(32)
  • 正在连接 github.com (github.com)|127.0.0.1|:443... 失败:拒绝连接。

     问题1wget https://github.com/vulhub/vulhub/archive/master.zip -O vulhub-master.zip --2023-07-05 11:58:38--  https://github.com/vulhub/vulhub/archive/master.zip 正在解析主机 github.com (github.com)... 127.0.0.1 正在连接 github.com (github.com)|127.0.0.1|:443... 失败:拒绝连接。 问题2sudo git pull fatal: 无法访问 \\\'https://github.co

    2024年01月21日
    浏览(38)
  • 使用 Kafka Tools(现已更名为 Offeset Exploer)无法连接虚拟机的 Kafka 集群,报错error connecting to the cluster

    学习 Kafka 的使用,结果发现使用 Kafka Tools(现已更名为 Offeset Exploer)无法连接虚拟机的 Kafka 集群,报错信息:error connecting to the cluster. unable to connect to zookeeper server xxx.xxx.xxx.xxx2181 with timeout of 10000ms 电脑系统版本:Windows 10 64bit VMware Workstation:VMware Workstation 15Pro 15.1.0 build-1

    2024年01月17日
    浏览(27)
  • fatal: unable to connect to github.com: github.com[0: 20.205.243.166]: errno=???

    错误 fatal: unable to connect to github.com: github.com[0: 20.205.243.166]: errno=连接超时 处理 :执行以下命令 然后再重新输入你的克隆命令就完美解决。嘿嘿嘿

    2024年02月03日
    浏览(43)
  • 快速访问github.com

    例在国内克隆github里的代码时,经常遇到下载速度十分缓慢的情况,通常只有十几kB/s的样子。 1.2.1 只对github.com添加代理(强烈推荐!!!)  打开终端(Ctrl + Alt + T),在终端中输入如下命令  其中,127.0.0.1:2340的后4位是系统中代理的端口号(可以打开网络代理查看,前提你得

    2023年04月09日
    浏览(29)
  • fatal: unable to access ‘https://github.com/......‘: Failed to connect to github.com

    1、在git内重置proxy 2、在cmd内执行ipconfig/flushdns 清理DNS缓存

    2024年02月17日
    浏览(37)
  • Failed toconnect to github.com port 443: 拒绝连接 Could not resolve host: github.com

     下面的命令只针对 github.com ,在国内还可能会用到 gitee.com ,所以不能将所有的 git 站点都添加上代理。  使用cdn加速 或者 2.1 打开hosts文件  2.2 在添加github.com域名(localhost下添加即可)  如果不行或者网速过慢可以尝试换个域名,在网上能搜到

    2024年02月08日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包