go之kafka库sarama的使用

这篇具有很好参考价值的文章主要介绍了go之kafka库sarama的使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

saram是一个使用纯go语言编写的kafka库。文章来源地址https://www.toymoban.com/news/detail-631653.html

sarama安装

go get github.com/Shopify/sarama

生产消息(异步)

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "log"
    "time"
)

var address = []string{"192.168.68.142:9092"}

func main() {
    // 配置
    config := sarama.NewConfig()
    // 等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    // 版本
    config.Version = sarama.V0_10_2_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, err := sarama.NewAsyncProducer(address, config)
    if err != nil {
        log.Printf("new async producer error: %s \n", err.Error())
        return
    }
    defer producer.AsyncClose()

    // 循环判断哪个通道发送过来数据
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for {
            select {
            case suc := <-p.Successes():
                fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("error: ", fail.Error())
            }
        }
    }(producer)

    var value string
    for i := 0; ; i++ {
        // 每隔两秒发送一条消息
        time.Sleep(2 * time.Second)

        // 创建消息
        value = fmt.Sprintf("async message, index = %d", i)
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系
        msg := &sarama.ProducerMessage{
            Topic: "web_log",
            Value: sarama.ByteEncoder(value),
        }

        // 使用通道发送
        producer.Input() <- msg
    }
}

消费消息

package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

type AAAConsumerGroupHandler struct{}

func (AAAConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
func (AAAConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// 这个方法用来消费消息的
func (h AAAConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// 获取消息
	for msg := range claim.Messages() {
		fmt.Printf("topic:%q partition:%d offset:%d value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// 将消息标记为已使用
		sess.MarkMessage(msg, "")
	}
	return nil
}

// 接收数据
func main() {
	// 先初始化 kafka
	config := sarama.NewConfig()
	// Version 必须大于等于  V0_10_2_0
	config.Version = sarama.V0_10_2_1
	config.Consumer.Return.Errors = true
	fmt.Println("start connect kafka")
	// 开始连接kafka服务器
	group, err := sarama.NewConsumerGroup([]string{"192.168.68.142:9092"}, "AAA-group", config)

	if err != nil {
		fmt.Println("连接kafka失败:", err)
		return
	}
	// 检查错误
	go func() {
		for err := range group.Errors() {
			fmt.Println("分组错误 : ", err)
		}
	}()

	ctx := context.Background()
	fmt.Println("开始获取消息")
	// for 是应对 consumer rebalance
	for {
		// 需要监听的主题
		topics := []string{"web_log"}
		handler := AAAConsumerGroupHandler{}
		// 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里
		err := group.Consume(ctx, topics, handler)

		if err != nil {
			fmt.Println("消费失败; err : ", err)
			return
		}
	}

}

到了这里,关于go之kafka库sarama的使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • GO开篇:手握Java走进Golang的世界

    Go(又称 Golang)是 Google 的 Robert Griesemer,Rob Pike 及 Ken Thompson 开发的一种计算机高级编程语言。 Go的官方文档: https://golang.org Go的中文api文档: https://studygolang.com/pkgdoc Go中文社区网站: https://studygolang.com/ 在go退推出之前,已经存在很多高级编程语言了,比如:C、C++、C#、

    2024年02月06日
    浏览(38)
  • 【Golang中的Go Module使用】

    Golang中的Go Module是一个用于包管理和版本控制的工具。在本文中,我们将深入探讨Go Module的相关知识,包括其定义、使用方法以及一些常见的应用场景。 Go Module是Golang中的包管理和版本控制工具,它的发展历程、用法、意义以及相关指令都对于Golang开发者来说非常重要。在本

    2024年02月16日
    浏览(47)
  • 使用 Go (Golang) 使用 OpenCV 绘制对象 GoCV

    本文将向你展示如何使用 OpenCV 使用 Go 编程语言 (Golang) 和 GoCV 包绘制直线、正方形、圆形和椭圆形对象。 OpenCV 是一个主要用于实时计算机视觉的编程函数库。最初由 Intel 开发,然后由 Willow Garage 和 Itseez 提供支持。这个库是跨平台的,可以在开源 Apache 2 许可下免费使用。

    2024年02月07日
    浏览(55)
  • 【GoLang】哪些大公司正在使用Go语言

    前言: 随着计算机科学和软件开发的快速发展,编程语言的选择变得愈加关键。 在这个多元化的编程语境中,Go语言(简称Golang)以其简洁、高效、并发处理能力等特性逐渐受到业界关注。 越来越多的大型科技公司纷纷采用Go语言作为其软件开发的首选语言,这种趋势反映了

    2024年02月04日
    浏览(65)
  • 【Golang】Golang进阶系列教程--为什么 Go 语言 struct 要使用 tags

    在 Go 语言中,struct 是一种常见的数据类型,它可以用来表示复杂的数据结构。在 struct 中,我们可以定义多个字段,每个字段可以有不同的类型和名称。 除了这些基本信息之外,Go 还提供了 struct tags,它可以用来指定 struct 中每个字段的元信息。 在本文中,我们将探讨为什

    2024年02月15日
    浏览(80)
  • golang IDE 使用 go-1.7 无法识别 goroot问题

    当前使用了 golang IDE 要设定 go-1.17 版本作为默认 GOROOT 系统环境变量已经定义好 打开了 ide 会出现下面问题,选择 1.17 后会出现下面报错 The selected directory is not a valid horne for GO SDK 修改 $GOROOT 下文件增加一个变量 再次在 IDC 选择 GOROOT 就可以找到 go 1.17.2 版本 选择后,需要关闭

    2024年02月16日
    浏览(55)
  • golang利用go mod巧妙替换使用本地项目的包

      拉了两个项目下来,其中一个项目依赖另一个项目,因为改动了被依赖的项目,想重新导入测试一下。   go.mod文件的require中想要被代替的包名在replace中进行一个替换,注意:用来替换的需要用绝对路径,一开始我用~/Documents/xboot/xboot/tools/reflect没有效果。   这样原

    2024年02月15日
    浏览(56)
  • CentOS 9 x64 使用 Nginx、Supervisor 部署 Go/Golang 服务

    在 CentOS 9 x64 系统上,可以通过以下步骤来部署 Golang 服务。 安装以下软件包: Golang:Golang 编程语言 Nginx:Web 服务器 Supervisor:进程管理工具 Git:版本控制工具 EPEL:扩展软件包 可以通过以下命令来安装: 为 Git 生成 SSH 密钥,以便于进行代码管理。可以通过以下命令来生成

    2024年02月12日
    浏览(55)
  • Go语言中使用kafka

    Windows环境下安装zookeeper和kafka

    2024年02月07日
    浏览(46)
  • 一个golang小白使用vscode搭建Ununtu20.04下的go开发环境

    先交代一下背景,距离正式接触golang这门语言已经有5年时间,平时偶尔也会用go写写工具和功能,但其实充其量就是语言小白,基本上就是按照教程配置好环境,按照需求写写逻辑,能跑起来就行了。golang随着这几年的变化,这门语言的变化还是非常大的,之前写过一篇《

    2024年01月22日
    浏览(77)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包