在Go项目中二次封装Kafka客户端功能

这篇具有很好参考价值的文章主要介绍了在Go项目中二次封装Kafka客户端功能。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.摘要

在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。

在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.。

2.功能结构组织

为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为:

kafka (目录)
  |
  ----- consumer.go   (消费者方法实现)
  |
  ----- producer.go   (生产者方法实现)
  |
  ----- kafka.go      (定义接口)
  |
  ----- kafka_test.go (单元功能测试)

为方便项目使用,在此基础上做了二次封装。

3.消费者实现

第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息:

type KafkaConsumer struct {
	Hosts    string          // Kafka主机IP:端口,例如:192.168.201.206:9092
	Ctopic   string          // topic名称
	Kchan    chan string     // 接收信息通道
	Consumer sarama.Consumer // 消费者对象
}

接下来是消费者初始化函数:

func (k *KafkaConsumer) kafkaInit() {
  // 定义配置选项 
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_10_2_0
	
	// 初始化一个消费对象
	consumer, err := sarama.NewConsumer(k.Hosts, config)
	if err != nil {
		err = errors.New("NewConsumer错误,原因:" + err.Error())
		fmt.Println(err.Error())
		return
	}
	
	// 获取所有Topic
	topics, err := consumer.Topics()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	
	// 判断是否有自定义的Topic
	var topicsName = ""
	for _, e := range topics {
		if e == k.Ctopic {
			topicsName = e
			break
		}
	}
	
	// 没有自定义的Topic则报错
	if topicsName == "" {
		err = errors.New("找不到topics内容")
		fmt.Println(err.Error())
		return
	}
	
	// 将消费对象保存到结构体以备后面使用
	k.Consumer = consumer
}

在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。

接下来是消费监控函数实现,代码如下:

func (k *KafkaConsumer) kafkaProcess() {
	var wg sync.WaitGroup
	
	// 遍历指定Topic分区持续监控消息
	Partitions, _ := k.Consumer.Partitions(k.Ctopic)
	for _, subPartitions := range Partitions {
		pc, err := k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)
		if err != nil {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 这里进入另一个函数可以过滤消息内容
			k.processPartition(pc)
		}()
	}
	wg.Wait()
}

函数processPartition()的实现代码如下:

func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {
	defer pc.AsyncClose()
	for msg := range pc.Messages() {
	  // 这里可以过滤不需要的Topic的信息
		if strings.Contains(string(msg.Value), "group_state2") {
			continue
		}
		// 这里将获取到的Topic信息发送到通道
		k.Kchan <- string(msg.Value)
	}
}

4.生产者实现

为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。

定义生产者的结构体如下:

type KafkaProducer struct {
	hosts         string               // Kafka主机
	sendmsg       string               // 消费方返回给生产方的消息
	ptopic        string               // Topic
	AsyncProducer sarama.AsyncProducer // Kafka生产者接口对象
}

对应的生产者初始化函数实现如下:

func (k *KafkaProducer) kafkaInit() {
  // 定义配置参数
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true
	config.Version = sarama.V0_10_2_0
	
	// 初始化一个生产者对象
	producer, err := sarama.NewAsyncProducer(k.hosts, config)
	if err != nil {
		err = errors.New("NewAsyncProducer错误,原因:" + err.Error())
		fmt.Println(err.Error())
		return
	}
	
	// 保存对象到结构体
	k.AsyncProducer = producer
}

给生产者回复信息的函数实现如下:

func (k *KafkaProducer) kafkaProcess() {
	msg := &sarama.ProducerMessage{
		Topic: k.ptopic,
	}
	// 信息编码
	msg.Value = sarama.ByteEncoder(k.sendmsg)
	
	// 将信息发送给通道
	k.AsyncProducer.Input() <- msg
}

5.接口定义实现

首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下:

// Kafka方法接口
type IKafkaMethod interface {
	kafkaInit()     // 初始化方法
	kafkaProcess()  // 执行方法
}

为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数:

// 接口管理结构体
type KafkaManager struct {
	kafkaMethod IKafkaMethod  // 接口对象
}

// 定义实现Set方法
func (km *KafkaManager) Set(m IKafkaMethod) {
	km.kafkaMethod = m  // 将指定的方法赋给接口
}

// 定义实现Run方法
func (km *KafkaManager) Run() {
  km.kafkaMethod.kafkaInit()
  go km.kafkaMethod.kafkaProcess()
}

最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针:

type KafkaMessager struct {
	KafkaManager  *KafkaManager   // 接口管理对象指针
	KafkaProducer *KafkaProducer  // 生产者对象指针
	KafkaConsumer *KafkaConsumer  // 消费者对象指针
	Hosts         string          // Kafka主机
	topic         string          // topic
}

// 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量
func NewKafkaMessager(hosts, topic string) *KafkaMessager {
	km := &KafkaMessager{
		KafkaManager:  new(KafkaManager),
		KafkaProducer: new(KafkaProducer),
		KafkaConsumer: new(KafkaConsumer),
		Hosts:         hosts,
		topic:         topic,
	}
	return km
}

6.功能调用和验证

在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下:

func TestKafka(t *testing.T) {
    ....
}

使用单元测试函数的好处是可以单独调试, 专注核心功能本身。

我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图:

在Go项目中二次封装Kafka客户端功能,编程语言应用,kafka,分布式,golang

下面是我模拟用户调用的客户端代码片段:

// 这里选择我自己搭建的Kafka所在服务器,Topic为test123
// 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092
hosts := "192.168.201.206:9092"
topic := "test123"

// 调用初始化函数,并将上面的内容作为参数传进去
nkm := NewKafkaMessager(hosts, topic)

// 初始化消费者,当生产者发出消息,消费者自动消费
nkm.KafkaConsumer.Hosts = hosts             // 消费者host赋值
nkm.KafkaConsumer.Ctopic = topic            // 消费者topic赋值
nkm.KafkaConsumer.Kchan = make(chan string) // 初始化消息通道
nkm.KafkaManager.Set(nkm.KafkaConsumer)     // 接口赋值,设置成操作消费者方法
nkm.KafkaManager.Run()                  // 执行消费者初始化方法


// 监听通道,接收生产客户端发过来的消息
recv := <- nkm.KafkaConsumer.Kchan
fmt.Println(recv)  // 打印接收到的消息

现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送:

在Go项目中二次封装Kafka客户端功能,编程语言应用,kafka,分布式,golang

可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。

--- END --文章来源地址https://www.toymoban.com/news/detail-724060.html

到了这里,关于在Go项目中二次封装Kafka客户端功能的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka 02——三个重要的kafka客户端

    请参考下面的文章: Kafka 01——Kafka的安装及简单入门使用. AdminClient API: 允许管理和检测Topic、Broker以及其他Kafka对象。 Producer API: 发布消息到一个或多个API。 Consumer API: 订阅一个或多个Topic,并处理产生的消息。 如下: 完整的pom 关于配置,可参考官网: https://kafka.apa

    2024年02月13日
    浏览(45)
  • kafka客户端应用参数详解

    Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可: 1、消息发送者主流程  然后可以使用Kafka提供的Producer类,快速发送消息。 ​ 整体来说,构建Producer分为三个步骤: 设置Producer核心属性  :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTST

    2024年02月07日
    浏览(52)
  • kafka客户端工具(Kafka Tool)的安装

    官方下载 根据不同的系统下载对应的版本,点击下载后双击,如何一直下一步,安装 kafka环境搭建请参考:CentOS 搭建Kafka集群 (1)连接kafka (2)简单使用  

    2024年04月23日
    浏览(70)
  • kafka之java客户端实战

            Kafka提供了两套客户端API, HighLevel API和LowLevel API 。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,

    2024年01月17日
    浏览(59)
  • 自定义kafka客户端消费topic

    使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。 后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没

    2024年02月02日
    浏览(51)
  • kafka:java集成 kafka(springboot集成、客户端集成)

    摘要 对于java的kafka集成,一般选用springboot集成kafka,但可能由于对接方kafka老旧、kafka不安全等问题导致kafak版本与spring版本不兼容,这个时候就得自己根据kafka客户端api集成了。 一、springboot集成kafka 具体官方文档地址:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/

    2023年04月22日
    浏览(57)
  • c#客户端Kafka的使用方法

    Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,现在是Apache软件基金会的顶级项目之一。Kafka能够处理大规模的实时数据流,支持高可靠性、高可扩展性、低延迟和高吞吐量。它主要用于构建实时数据管道和流式处理应用程序。 Kafka的核心概念包括:Producer(生产者)

    2024年02月12日
    浏览(60)
  • Kafka客户端程序无法连接到Kafka集群的解决方法

    Kafka是一个高性能、分布式的流式数据平台,广泛用于构建实时数据流处理应用程序。然而,有时候我们可能会遇到Kafka客户端程序无法连接到Kafka集群的问题。在本文中,我将介绍一些可能导致连接问题的常见原因,并提供相应的解决方案。 网络配置问题 首先,确保Kafka集群

    2024年01月21日
    浏览(59)
  • 【Kafka】Kafka客户端认证失败:Cluster authorization failed.

    kafka客户端是公司内部基于spring-kafka封装的 spring-boot版本:3.x spring-kafka版本:2.1.11.RELEASE 集群认证方式:SASL_PLAINTEXT/SCRAM-SHA-512 经过多年的经验,以及实际验证,配置是没问题的,但是业务方反馈用相同的配置,还是报错! 封装的kafka客户端版本过低,高版本的配置项:secu

    2024年01月17日
    浏览(51)
  • client-go源码结构及客户端对象

    G  Goup 资源组,包含一组资源操作的集合 V Version 资源版本,用于区分不同API的稳定程度及兼容性 R Resource 资源信息,用于区分不同的资源API K Kind 资源对象类型,每个资源对象都需要Kind来区分它自身代表的资源类型 (1)通过 GVR 可以构造 REST Api  进行接口调用,而 GVK 可以

    2024年04月26日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包