51.Go操作kafka示例(kafka-go库)

这篇具有很好参考价值的文章主要介绍了51.Go操作kafka示例(kafka-go库)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go

一、简介

之前已经介绍过一个操作kafka的go库了,28.windows安装kafka,Go操作kafka示例(sarama库) ,但是这个库比较老了,当前比较流行的库是github.com/segmentio/kafka-go,所以本次我们就使用一下它。

我们在GitHub直接输入kafka并带上language标签为Go时,可以可以看到当前get github.com/segmentio/kafka-go库是最流行的。
go kafka库,go,golang,kafka,开发语言

首先启动kafka的服务器,然后在项目中go get github.com/segmentio/kafka-go

接着我们就可以创建生产者和消费者了,注意:在实际工作中,一般是一个服务为生产者,另一个服务作为消费者,但是本案例中不涉及微服务,就是演示一下生成和消费的示例代码,因此写到了一个服务当中。代码文件组织如下:
go kafka库,go,golang,kafka,开发语言
user.go :用于测试发送和消费结构体字符串消息

package model

type User struct {
	Id       int64  `json:"id"`
	UserName string `json:"user_name"`
	Age      int64  `json:"age"`
}

二、生产者

启动zookeeperkafka,并创建名为testtopic,步骤可以参考:28.windows安装kafka,Go操作kafka示例(sarama库)

producer.go

package producer

import (
	"context"
	"encoding/json"
	"fmt"
	"golang-trick/31-kafka-go/model"
	"time"

	"github.com/segmentio/kafka-go"
)

var (
	topic    = "user"
	Producer *kafka.Writer
)

func init() {
	Producer = &kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"), //TCP函数参数为不定长参数,可以传多个地址组成集群
		Topic:                  topic,
		Balancer:               &kafka.Hash{}, // 用于对key进行hash,决定消息发送到哪个分区
		MaxAttempts:            0,
		WriteBackoffMin:        0,
		WriteBackoffMax:        0,
		BatchSize:              0,
		BatchBytes:             0,
		BatchTimeout:           0,
		ReadTimeout:            0,
		WriteTimeout:           time.Second,       // kafka有时候可能负载很高,写不进去,那么超时后可以放弃写入,用于可以丢消息的场景
		RequiredAcks:           kafka.RequireNone, // 不需要任何节点确认就返回
		Async:                  false,
		Completion:             nil,
		Compression:            0,
		Logger:                 nil,
		ErrorLogger:            nil,
		Transport:              nil,
		AllowAutoTopicCreation: false, // 第一次发消息的时候,如果topic不存在,就自动创建topic,工作中禁止使用
	}
}

// 生产消息,发送user信息
func SendMessage(ctx context.Context, user *model.User) {
	msgContent, err := json.Marshal(user)
	if err != nil {
		fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))
	}
	msg := kafka.Message{
		Topic:         "",
		Partition:     0,
		Offset:        0,
		HighWaterMark: 0,
		Key:           []byte(fmt.Sprintf("%d", user.Id)),
		Value:         msgContent,
		Headers:       nil,
		WriterData:    nil,
		Time:          time.Time{},
	}

	err = Producer.WriteMessages(ctx, msg)
	if err != nil {
		fmt.Println(fmt.Sprintf("写入kafka失败,user:%v,err:%v", user, err))
	}
}

main.go: 测试消息发送

package main

import (
	"context"
	"fmt"
	"golang-trick/31-kafka-go/model"
	"golang-trick/31-kafka-go/producer"
)

func main() {
	ctx := context.Background()
	for i := 0; i < 5; i++ {
		user := &model.User{
			Id:       int64(i + 1),
			UserName: fmt.Sprintf("lym:%d", i),
			Age:      18,
		}
		producer.SendMessage(ctx, user)
	}
	producer.Producer.Close() // 消息发送完毕后,关闭生产者
}

可以看到五条消息都发送成功
go kafka库,go,golang,kafka,开发语言

三、消费者

consumer.go

package consumer

import (
	"context"
	"encoding/json"
	"fmt"
	"golang-trick/24-gin-learning/class08/model"
	"time"

	"github.com/segmentio/kafka-go"
)

var (
	topic    = "user"
	Consumer *kafka.Reader
)

func init() {
	Consumer = kafka.NewReader(kafka.ReaderConfig{
		Brokers:                []string{"localhost:9092"}, // broker地址 数组
		GroupID:                "test",                     // 消费者组id,每个消费者组可以消费kafka的完整数据,但是同一个消费者组中的消费者根据设置的分区消费策略共同消费kafka中的数据
		GroupTopics:            nil,
		Topic:                  topic, // 消费哪个topic
		Partition:              0,
		Dialer:                 nil,
		QueueCapacity:          0,
		MinBytes:               0,
		MaxBytes:               0,
		MaxWait:                0,
		ReadBatchTimeout:       0,
		ReadLagInterval:        0,
		GroupBalancers:         nil,
		HeartbeatInterval:      0,
		CommitInterval:         time.Second, // offset 上报间隔
		PartitionWatchInterval: 0,
		WatchPartitionChanges:  false,
		SessionTimeout:         0,
		RebalanceTimeout:       0,
		JoinGroupBackoff:       0,
		RetentionTime:          0,
		StartOffset:            kafka.FirstOffset, // 仅对新创建的消费者组生效,从头开始消费,工作中可能更常用从最新的开始消费kafka.LastOffset
		ReadBackoffMin:         0,
		ReadBackoffMax:         0,
		Logger:                 nil,
		ErrorLogger:            nil,
		IsolationLevel:         0,
		MaxAttempts:            0,
		OffsetOutOfRangeError:  false,
	})
}

// 消费消息
func ReadMessage(ctx context.Context) {
	// 消费者应该通过协程一直开着,一直消费
	for {
		if msg, err := Consumer.ReadMessage(ctx); err != nil {
			fmt.Println(fmt.Sprintf("读kafka失败,err:%v", err))
			break // 当前消息读取失败时,并不退出for终止所有后续消费,而是跳过该消息即可
		} else {
			user := &model.User{}
			err := json.Unmarshal(msg.Value, user)
			if err != nil {
				fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))
				break // 当前消息处理失败时,并不退出for终止所有后续消费,而是跳过该消息即可
			}

			fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))
		}
	}
}

main.go: 测试接收消息

package main

import (
	"context"
	"fmt"
	"golang-trick/31-kafka-go/consumer"
	"os"
	"os/signal"
	"syscall"
)

// 需要监听信息2和15,在程序退出时,关闭Consumer
func listenSignal() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	sig := <-c
	fmt.Printf("收到信号 %s ", sig.String())
	if consumer.Consumer != nil {
		consumer.Consumer.Close()
	}
	os.Exit(0)
}

func main() {
	ctx := context.Background()
	//for i := 0; i < 5; i++ {
	//	user := &model.User{
	//		Id:       int64(i + 1),
	//		UserName: fmt.Sprintf("lym:%d", i),
	//		Age:      18,
	//	}
	//	producer.SendMessage(ctx, user)
	//}
	//producer.Producer.Close()

	go consumer.ReadMessage(ctx)
	listenSignal()
}

启动后,因为我们设置的从头开始消费,所以原有的五条消息消费成功,然后在等待着队列中有消息时继续消费
go kafka库,go,golang,kafka,开发语言
我们可以通过kafka客户端发两条消息,看看我们的消费者程序是否能消费到

go kafka库,go,golang,kafka,开发语言
最后关闭服务停止消费
go kafka库,go,golang,kafka,开发语言文章来源地址https://www.toymoban.com/news/detail-840758.html

到了这里,关于51.Go操作kafka示例(kafka-go库)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Golang:Go语言结构

    在我们开始学习 Go 编程语言的基础构建模块前,让我们先来了解 Go 语言最简单程序的结构。 Go 语言的基础组成有以下几个部分: 包声明 引入包 函数 变量 语句 表达式 注释 接下来让我们来看下简单的代码,该代码输出了\\\"Hello World!\\\": 让我们来看下以上程序的各个部分: 第一

    2024年02月10日
    浏览(58)
  • 【Golang】三分钟让你快速了解Go语言&为什么我们需要Go语言?

    博主简介: 努力学习的大一在校计算机专业学生,热爱学习和创作。目前在学习和分享:数据结构、Go,Java等相关知识。 博主主页: @是瑶瑶子啦 所属专栏: Go语言核心编程 近期目标: 写好专栏的每一篇文章 Go 语言从 2009 年 9 月 21 日开始作为谷歌公司 20% 兼职项目,即相关

    2023年04月21日
    浏览(60)
  • Go语言(Golang)数据库编程

    要想连接到 SQL 数据库,首先需要加载目标数据库的驱动,驱动里面包含着于该数据库交互的逻辑。 sql.Open() 数据库驱动的名称 数据源名称 得到一个指向 sql.DB 这个 struct 的指针 sql.DB 是用来操作数据库的,它代表了0个或者多个底层连接的池,这些连接由sql 包来维护,sql 包会

    2024年02月03日
    浏览(88)
  • 【GoLang】MAC安装Go语言环境

    小试牛刀 首先安装VScode软件 或者pycharm mac安装brew软件  brew install go 报了一个错误 不提供这个支持  重新brew install go 之后又重新brew reinstall go 使用go version 可以看到go 的版本 使用go env  可以看到go安装后的配置 配置一个环境变量 vim ~/.zshrc,  

    2024年02月15日
    浏览(57)
  • 【Golang】VScode配置Go语言环境

    安装VScode请参考我的上一篇博客:VScode安装_㫪548的博客-CSDN博客 接下来我们直接进入正题: Go语言(又称Golang)是一种开源的编程语言,由Google开发并于2009年首次发布。Go语言具有简洁、高效、可靠和易于阅读的特点,被设计用于解决大型项目的开发需求。它结合了静态类型

    2024年02月03日
    浏览(65)
  • Golang区块链钱包_go语言钱包

    Golang区块链钱包的特点 Golang区块链钱包具有以下几个特点: 1. 高性能 Golang是一种编译型语言,具有快速的执行速度和较低的内存消耗。这使得Golang区块链钱包在处理大规模交易数据时表现出色,能够满足高性能的需求。 2. 并发支持 Golang内置了轻量级线程——goroutine,以及

    2024年04月15日
    浏览(61)
  • 【GoLang】哪些大公司正在使用Go语言

    前言: 随着计算机科学和软件开发的快速发展,编程语言的选择变得愈加关键。 在这个多元化的编程语境中,Go语言(简称Golang)以其简洁、高效、并发处理能力等特性逐渐受到业界关注。 越来越多的大型科技公司纷纷采用Go语言作为其软件开发的首选语言,这种趋势反映了

    2024年02月04日
    浏览(64)
  • Golang(Go语言)IP地址转换函数

    String形式的IP地址和Int类型互转函数 代码 输出如下:  

    2024年02月05日
    浏览(52)
  • 【Go语言】Golang保姆级入门教程 Go初学者chapter3

    下划线“_”本身在Go中一个特殊的标识符,成为空标识符。可以代表任何其他的标识符,但是他对应的值就会被忽略 仅仅被作为站维度使用, 不能作为标识符使用 因为Go语言中没有private public 所以标记变量首字母大写代表其他包可以使用 小写就是不可使用的 注意:Go语言中

    2024年02月13日
    浏览(59)
  • 【Go语言】Golang保姆级入门教程 Go初学者chapter2

    setting的首选项 一个程序就是一个世界 变量是程序的基本组成单位 变量的使用步骤 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zuxG8imp-1691479164956)(https://cdn.staticaly.com/gh/hudiework/img@main/image-20230726152905139.png)] 变量表示内存中的一个存储区 注意:

    2024年02月14日
    浏览(124)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包