Go语言中使用kafka

这篇具有很好参考价值的文章主要介绍了Go语言中使用kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.Windows环境下安装zookeeper和kafka

Windows环境下安装zookeeper和kafka文章来源地址https://www.toymoban.com/news/detail-725077.html

2.运行zookeeper

3.运行kaka

4.生产者

import (
	"encoding/json"
	"github.com/Shopify/sarama"
	"strconv"
)

type Product struct {
	Id    int
	Name  string
	Title string
}

func NewProduct() error {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true
	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		return err
	}
	p := &Product{
		Id: 1,
		Name: "钻戒",
		Title: "那戒指的质地似乎是钻石制成的吧,闪闪发光又不失内敛,清雅又不失高贵,阳光洒下来,发出淡淡的光,和淡淡的清香,有着像是通了灵般的仙气",
	}

	key := sarama.StringEncoder(strconv.Itoa(p.Id))
	value, err := json.Marshal(p)
	if err != nil {
		return err
	}
	msg := &sarama.ProducerMessage{
		Topic: "new-products",
		Key:   key,
		Value: sarama.ByteEncoder(value),
	}
	producer.Input() <- msg
	return nil
}

5.消费者

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"log"
)

func Consume() error {
	// 初始化 Kafka 消费者
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	config.Producer.Return.Successes = true
	brokers := []string{"localhost:9092"}
	consumer, err := sarama.NewConsumer(brokers, config)
	partitionConsumer, err := consumer.ConsumePartition("newProduct", 0, sarama.OffsetNewest)
	if err != nil {
		log.Printf("Error consuming partition: %v", err)
		return err
	}
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			var product Product
			err = json.Unmarshal(msg.Value, &product)
			if err != nil {
				log.Printf("Error unmarshaling product: %v", err)
				return err
			} else {
				fmt.Printf("New product: %+v\n", product)
			}
		case err = <-partitionConsumer.Errors():
			log.Printf("Error consuming message: %v", err)
			return err
		}
	}
}

6.main函数

import (
	"fmt"
	"golang_test/kafka_test/kafka"
	"log"
	"sync"
)

var wg sync.WaitGroup

func main() {
	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := kafka.NewProduct(); err != nil {
			log.Println("kafka生产者运行失败")
			return
		}
	}()
	go func() {
		defer wg.Done()
		if err := kafka.Consume(); err != nil {
			log.Println("kafka生产者运行失败")
			return
		}
	}()
	wg.Wait()
	fmt.Println("运行结束")
}

到了这里,关于Go语言中使用kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 一文教你Go语言如何轻松使用kafka

    本文档详细介绍了如何使用Go语言对Kafka进行基础操作。我们将介绍如何使用Go连接Kafka、生产消息、消费消息。以下是详细操作步骤: 首先,使用以下命令安装Sarama,一个优秀的Kafka Go客户端库: 导入必要的依赖包: 创建一个函数,用于连接并返回一个Kafka生产者: 创建一个

    2024年02月12日
    浏览(40)
  • golang学习之go连接Kafka

    1、Kafka 本质上是⼀个消息队列,一个高吞吐量、持久性、分布式的消息系统。 2、包含生产者(producer)和消费者(consumer),每个consumer属于一个特定的消费者组(Consumer Group)。 3、生产者生产消息(message)写入到kafka服务器(broker,kafka集群的节点),消费者从kafka服务器

    2023年04月09日
    浏览(37)
  • 一个golang小白使用vscode搭建Ununtu20.04下的go开发环境

    先交代一下背景,距离正式接触golang这门语言已经有5年时间,平时偶尔也会用go写写工具和功能,但其实充其量就是语言小白,基本上就是按照教程配置好环境,按照需求写写逻辑,能跑起来就行了。golang随着这几年的变化,这门语言的变化还是非常大的,之前写过一篇《

    2024年01月22日
    浏览(77)
  • Golang:Go语言结构

    在我们开始学习 Go 编程语言的基础构建模块前,让我们先来了解 Go 语言最简单程序的结构。 Go 语言的基础组成有以下几个部分: 包声明 引入包 函数 变量 语句 表达式 注释 接下来让我们来看下简单的代码,该代码输出了\\\"Hello World!\\\": 让我们来看下以上程序的各个部分: 第一

    2024年02月10日
    浏览(59)
  • 使用Go语言进行安卓开发

    本文将介绍如何使用Go语言进行安卓开发。我们将探讨使用Go语言进行安卓开发的优点、准备工作、基本概念和示例代码。通过本文的学习,你将了解如何使用Go语言构建高效的安卓应用程序。 随着移动互联网的快速发展,安卓应用程序的需求越来越旺盛。使用传统的Java和K

    2024年02月06日
    浏览(46)
  • 【GoLang】MAC安装Go语言环境

    小试牛刀 首先安装VScode软件 或者pycharm mac安装brew软件  brew install go 报了一个错误 不提供这个支持  重新brew install go 之后又重新brew reinstall go 使用go version 可以看到go 的版本 使用go env  可以看到go安装后的配置 配置一个环境变量 vim ~/.zshrc,  

    2024年02月15日
    浏览(60)
  • Go语言(Golang)数据库编程

    要想连接到 SQL 数据库,首先需要加载目标数据库的驱动,驱动里面包含着于该数据库交互的逻辑。 sql.Open() 数据库驱动的名称 数据源名称 得到一个指向 sql.DB 这个 struct 的指针 sql.DB 是用来操作数据库的,它代表了0个或者多个底层连接的池,这些连接由sql 包来维护,sql 包会

    2024年02月03日
    浏览(93)
  • 【Golang】VScode配置Go语言环境

    安装VScode请参考我的上一篇博客:VScode安装_㫪548的博客-CSDN博客 接下来我们直接进入正题: Go语言(又称Golang)是一种开源的编程语言,由Google开发并于2009年首次发布。Go语言具有简洁、高效、可靠和易于阅读的特点,被设计用于解决大型项目的开发需求。它结合了静态类型

    2024年02月03日
    浏览(66)
  • 【Golang】Golang进阶系列教程--Go 语言 map 如何顺序读取?

    Go 语言中的 map 是一种非常强大的数据结构,它允许我们快速地存储和检索键值对。 然而,当我们遍历 map 时,会有一个有趣的现象,那就是输出的键值对顺序是不确定的。 先看一段代码示例: 当我们多执行几次这段代码时,就会发现,输出的顺序是不同的。 首先,Go 语言

    2024年02月14日
    浏览(69)
  • 【Golang】Golang进阶系列教程--Go 语言数组和切片的区别

    在 Go 语言中,数组和切片看起来很像,但其实它们又有很多的不同之处,这篇文章就来说说它们到底有哪些不同。 数组和切片是两个常用的数据结构。它们都可以用于存储一组相同类型的元素,但在底层实现和使用方式上存在一些重要的区别。 Go 中数组的长度是不可改变的

    2024年02月15日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包