golang学习之go连接Kafka

这篇具有很好参考价值的文章主要介绍了golang学习之go连接Kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、kafka是什么

1、Kafka 本质上是⼀个消息队列,一个高吞吐量、持久性、分布式的消息系统。
2、包含生产者(producer)和消费者(consumer),每个consumer属于一个特定的消费者组(Consumer Group)。
3、生产者生产消息(message)写入到kafka服务器(broker,kafka集群的节点),消费者从kafka服务器(broker)读取消息。
4、消息可分为不同的类型即不同的主题(topic)。
5、同一主题(topic)的消息可以分散存储到不同的服务器节点(partition)上,一个分区(partition)只能由一个消费者组内的一个消费者消费。
6、每个partition可以有多个副本,一个Leader和若干个Follower,Leader发生故障时,会选取某个Follower成为新的Leader。

二、kafka的安装

kafka集群管理依赖zookeeper的支持,kafka、zookeeper运行需要java环境。我的kafka安装在了windows wsl环境下。

1、jdk安装

1.1、https://www.oracle.com/cn/java/technologies/downloads/下载需要的jdk.
1.2、解压下载的jdk

tar -zxvf jdk-19_linux-x64_bin.tar.gz

1.3、配置jdk环境变量,/etc/profile影响所有用户,.bashrc影响当前用户。

vi /etc/profile
export JAVA_HOME=/mnt/d/workspace/wsl/java/jdk-19.0.1
export PATH=${JAVA_HOME}/bin:$PATH

1.4、测试jdk是否安装成功

java -version
java version "19.0.1" 2022-10-18
Java(TM) SE Runtime Environment (build 19.0.1+10-21)
Java HotSpot(TM) 64-Bit Server VM (build 19.0.1+10-21, mixed mode, sharing)

2、zookeeper单机安装

2.1、https://zookeeper.apache.org/releases.html zookeeper下载地址
2.2、解压zookeeper包 apache-zookeeper-3.8.0-bin.tar.gz

tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz

2.3、配置zookeeper
解压后zookeeper配置文件名称默认为zoo_sample.cfg需要修改为zoo.cfg

mv zoo_sample.cfg zoo.cfg

2.4、zookeeper启动
进入到zookeeper安装目录,输入启动命令

bin/zkServer.sh start

输出如下信息说明启动成功。

Starting zookeeper ... STARTED

2.5、查看zookeeper运行信息
输入查看命令

bin/zkServer.sh status

输出zookeeper运行信息

ZooKeeper JMX enabled by default
Using config: /mnt/d/ProgramFiles/apache-zookeeper-3.8.0-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

2.6、zookeeper停止运行

bin/zkServer.sh stop

3、kafka单机安装

3.1、kafka包下载https://kafka.apache.org/downloads.html
3.2、解压kafka包

tar -zxvf kafka_2.12-3.3.1.tgz

3.3、kafka配置
3.3.0、kafka配置文件 config/server.properties
3.3.1、kafka数据日志目录:log.dirs=***
3.3.2、zookeeper连接地址:zookeeper.connect=localhost:2181
3.3.3、节点id集群时使用:broker.id=0
3.3.4、kafka服务监听的ip和端口:listeners=PLAINTEXT://172.24.198.152:9092。我的kafka安装在了windows wsl内,172.24.198.152是我的wsl地址可以通过如下命令查看:

命令: ip addr |grep eth0
输出: inet 172.24.198.152/20 brd .......

3.4、kafka启动

bin/kafka-server-start.sh config/server.properties

3.5、创建topic

bin/kafka-topics.sh --create --topic my_topic

3.6、kafka停止文章来源地址https://www.toymoban.com/news/detail-407959.html

bin/kafka-server-stop.sh

三、go连接kafka

1、go kafka安装

go get github.com/segmentio/kafka-go

2、生产者:官方github examples producer-api

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	kafka "github.com/segmentio/kafka-go"
)

func producerHandler(kafkaWriter *kafka.Writer) func(http.ResponseWriter, *http.Request) {
	return http.HandlerFunc(func(wrt http.ResponseWriter, req *http.Request) {
		body, err := ioutil.ReadAll(req.Body)
		if err != nil {
			log.Fatalln(err)
		}
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("address-%s", req.RemoteAddr)),
			Value: body,
		}
		err = kafkaWriter.WriteMessages(req.Context(), msg)

		if err != nil {
			wrt.Write([]byte(err.Error()))
			log.Fatalln(err)
		}
	})
}

func getKafkaWriter(kafkaURL, topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:     kafka.TCP(kafkaURL),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}
}

func main() {
	// get kafka writer using environment variables.
	//kafkaURL := os.Getenv("kafkaURL")
	kafkaURL := "172.24.198.152:9092"

	//topic := os.Getenv("topic")
	topic := "my_topic"

	kafkaWriter := getKafkaWriter(kafkaURL, topic)

	defer kafkaWriter.Close()

	// Add handle func for producer.
	http.HandleFunc("/", producerHandler(kafkaWriter))

	// Run the web server.
	fmt.Println("start producer-api ... !!")
	log.Fatal(http.ListenAndServe(":8081", nil))
}

3、消费者:官方github examples consumer-logger

package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	kafka "github.com/segmentio/kafka-go"
)

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
	brokers := strings.Split(kafkaURL, ",")
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
}

func main() {
	// get kafka reader using environment variables.
	//kafkaURL := os.Getenv("kafkaURL")
	kafkaURL := "172.24.198.152:9092"
	//topic := os.Getenv("topic")
	topic := "my_topic"
	//groupID := os.Getenv("groupID")
	groupID := ""

	reader := getKafkaReader(kafkaURL, topic, groupID)

	defer reader.Close()

	fmt.Println("start consuming ... !!")
	for {
		m, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("message at topic:%v partition:%v offset:%v	%s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

到了这里,关于golang学习之go连接Kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Go操作Kafka之kafka-go

    Kafka是一种高吞吐量的分布式发布订阅消息系统,本文介绍了如何使用kafka-go这个库实现Go语言与kafka的交互。 Go社区中目前有三个比较常用的kafka客户端库 , 它们各有特点。 首先是IBM/sarama(这个库已经由Shopify转给了IBM),之前我写过一篇使用sarama操作Kafka的教程,相较于sar

    2024年04月26日
    浏览(34)
  • 51.Go操作kafka示例(kafka-go库)

    代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go 之前已经介绍过一个操作kafka的go库了,28.windows安装kafka,Go操作kafka示例(sarama库) ,但是这个库比较老了,当前比较流行的库是 github.com/segmentio/kafka-go ,所以本次我们就使用一下它。 我们在 GitHub 直接输入 kafk

    2024年03月17日
    浏览(41)
  • Golang中常用的kafka库

    本文将介绍目前主流的三种第三方kafka库,并结合实际使用中遇到的问题,给出实际的解决方案,本文只涉及kafka数据的消费,且其中对所有库的使用仅为测试过的简单代码,仅记录自己踩坑过程。 问题 :最开始使用的是sarama-cluster库,git地址为链接: github.com/bsm/sarama-cluster,

    2024年02月02日
    浏览(32)
  • Golang 操作 Kafka 设置消息的失效时间

    在使用 Golang 操作 Kafka 时,你可以使用 Sarama 库来设置消息的失效时间。以下是一个示例代码,演示如何在生产者端设置数据失效时间: 上述示例中,我们首先创建了一个 sarama.Config 实例,并通过 config.Message.MaxAge 属性设置了消息的失效时间,此处设定为一天 (time.Hour * 24)。然

    2024年02月11日
    浏览(47)
  • golang分布式中间件之kafka

    Kafka是一个分布式发布-订阅消息系统,由LinkedIn公司开发。它被设计为快速、可靠且具有高吞吐量的数据流平台,旨在处理大量的实时数据。Kafka的架构是基于发布-订阅模型构建的,可以支持多个生产者和消费者。 在本文中,我们将讨论如何使用Go语言来实现Kafka分布式中间件

    2024年02月07日
    浏览(55)
  • go-kafka

    本文使用的是kafka-go 6.5k 这个包 其他包参考: 我们在细分市场中非常依赖GO和Kafka。不幸的是,在撰写本文时,Kafka的GO客户库的状态并不理想。可用选项是: 萨拉玛(Sarama) 10k ,这是迄今为止最受欢迎的,但很难与之合作。它的记录不足,API暴露了Kafka协议的低级概念,并

    2024年02月11日
    浏览(29)
  • go语言kafka入门

    消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据 消息队列相关概念: 生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。 消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。 主题(Topi

    2024年02月10日
    浏览(33)
  • Go语言中使用kafka

    Windows环境下安装zookeeper和kafka

    2024年02月07日
    浏览(45)
  • kafka client for go

    关于 go 的 kafka client 有很多开源项目,例如 sarama: 具有完整协议支持的纯 Go 实现。包括消费者和生产者实施,支持 GZIP 和 Snappy 压缩。 confluent-kafka-go: Confluent 的 Golang Kafka 客户端包装了 librdkafka C 库,提供完整的 Kafka 协议支持,具有出色的性能和可靠性。提供了高级生产者和

    2024年02月03日
    浏览(43)
  • golang kafka客户端 sarama 在 rebalance时异常如何解决

    在使用sarama作为Kafka客户端的过程中,在进行消费者分区的rebalance操作时,可能会发生异常,在解决这些异常一般可以采取以下措施: 1. 异常处理:在consumer rebalance过程中如果发生异常,Sarama库将会发出错误事件(error event)。因此在编写代码时应该注册错误事件处理函数

    2024年02月10日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包