基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

这篇具有很好参考价值的文章主要介绍了基于golang多消息队列中间件的封装nsq,rabbitmq,kafka。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

场景

在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中间件;

接口模型

这个模型的核心思想是消息队列的核心功能生产者生产消息方法和消费者消费消息,任何消息队列都必须有这两个功能;根据如下代码消息队列中间件是可扩展的,只需在实例化消息队列对象那里添加新消息队列的实现;

// MQer 消息队列接口
type MQer interface {
	Producer(topic string, data []byte)
	Consumer(topic, channel string, ch chan []byte, f func(b []byte))
}

// NewMQ 实例化消息队列对象
func NewMQ() MQer {
	switch conf.Conf.Default.Mq { // mq 设置的类型
	case "nsq":
		return new(MQNsqService)
	case "rabbit":
		return new(MQRabbitService)
	case "kafka":
		return new(MQKafkaService)
	default:
		return new(MQNsqService)
	}
}

/*
配置文件结构设计

mqType: "" # nsq, rabbit, kafka  这三个值然当然了是可扩展的

nsq:
  producer: ""
  consumer: ""

rabbit:
  addr: ""
  user: ""
  password: ""

kafka:
  addr: ""
*/
各个消息队列的实现
1. 依赖库
  • nsq : github.com/nsqio/go-nsq
  • rabbitmq : github.com/streadway/amqp
  • kafka : github.com/Shopify/sarama
2. nsq

nsq结构体

// MQNsqService NSQ消息队列
type MQNsqService struct {
}

生产者

// Producer 生产者
func (m *MQNsqService) Producer(topic string, data []byte) {
	nsqConf := &nsq.Config{}
	client, err := nsq.NewProducer(nsqServer, nsqConf)
	if err != nil {
		log.Error("[nsq]无法连接到队列")
		return
	}
	log.DebugF(fmt.Sprintf("[生产消息] topic : %s -->  %s", topic, string(data)))
	err = client.Publish(topic, data)
	if err != nil {
		log.Error("[生产消息] 失败 : " + err.Error())
	}
}

消费者

var (
	nsqServer   = conf.Conf.Default.Nsq.Producer // nsqServer
)

// Consumer 消费者
func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte)) {
	mh, err := NewMessageHandler(nsqServer, channel)
	if err != nil {
		log.Error(err)
		return
	}

	go func() {
		mh.SetMaxInFlight(1000)
		mh.Registry(topic, ch)
	}()

	go func() {
		for {
			select {
			case s := <-ch:
				f(s)
			}
		}
	}()
	log.DebugF("[NSQ] ServerID:%v => %v started", channel, topic)
}

// MessageHandler MessageHandler
type MessageHandler struct {
	msgChan     chan *goNsq.Message
	stop        bool
	nsqServer   string
	Channel     string
	maxInFlight int
}

// NewMessageHandler return new MessageHandler
func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error) {
	if nsqServer == "" {
		err = fmt.Errorf("[NSQ] need nsq server")
		return
	}
	mh = &MessageHandler{
		msgChan:   make(chan *goNsq.Message, 1024),
		stop:      false,
		nsqServer: nsqServer,
		Channel:   channel,
	}
	return
}

// Registry register nsq topic
func (m *MessageHandler) Registry(topic string, ch chan []byte) {
	config := goNsq.NewConfig()
	if m.maxInFlight > 0 {
		config.MaxInFlight = m.maxInFlight
	}
	consumer, err := goNsq.NewConsumer(topic, m.Channel, config)
	if err != nil {
		panic(err)
	}
	consumer.SetLogger(nil, 0)
	consumer.AddHandler(goNsq.HandlerFunc(m.handlerMessage))
	err = consumer.ConnectToNSQLookupd(m.nsqServer)
	if err != nil {
		panic(err)
	}
	m.process(ch)
}

  1. rabbitmq
    结构体
// MQRabbitService Rabbit消息队列
type MQRabbitService struct {
}

生产者

// Producer 生产者
func (m *MQRabbitService) Producer(topic string, data []byte) {
	mq, err := NewRabbitMQPubSub(topic)
	if err != nil {
		log.Error("[rabbit]无法连接到队列")
		return
	}
	//defer mq.Destroy()
	log.DebugF(fmt.Sprintf("[生产消息] topic : %s -->  %s", topic, string(data)))
	err = mq.PublishPub(data)
	if err != nil {
		log.Error("[生产消息] 失败 : " + err.Error())
	}
}

// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例  (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {
	mq, err := NewRabbitMQ("", exchangeName, "", "")
	if mq == nil || err != nil {
		return nil, err
	}
	//获取connection
	mq.conn, err = amqp.Dial(mq.MqUrl)
	mq.failOnErr(err, "failed to connect mq!")
	if mq.conn == nil || err != nil {
		return nil, err
	}
	//获取channel
	mq.channel, err = mq.conn.Channel()
	mq.failOnErr(err, "failed to open a channel!")
	return mq, err
}

...其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 

消费者

// Consumer 消费者
func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {
	mh, err := NewRabbitMQPubSub(topic)
	if err != nil {
		log.Error("[rabbit]无法连接到队列")
		return
	}
	msg := mh.RegistryReceiveSub()
	go func(m <-chan amqp.Delivery) {
		for {
			select {
			case s := <-m:
				f(s.Body)
			}
		}
	}(msg)
	log.DebugF("[Rabbit] ServerID:%v => %v started", serverId, topic)
}

// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例  (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {
	mq, err := NewRabbitMQ("", exchangeName, "", "")
	if mq == nil || err != nil {
		return nil, err
	}
	//获取connection
	mq.conn, err = amqp.Dial(mq.MqUrl)
	mq.failOnErr(err, "failed to connect mq!")
	if mq.conn == nil || err != nil {
		return nil, err
	}
	//获取channel
	mq.channel, err = mq.conn.Channel()
	mq.failOnErr(err, "failed to open a channel!")
	return mq, err
}

... 其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 
  1. kafka
    结构体
// MQKafkaService Kafka消息队列
type MQKafkaService struct {
}

生产者

func (m *MQKafkaService) Producer(topic string, data []byte) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follower都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,我们默认设置32个分区
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.ByteEncoder(data)
	// 连接kafka
	client, err := sarama.NewSyncProducer(kafkaServer, config)
	if err != nil {
		log.Error("Producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		log.Error("send msg failed, err:", err)
		return
	}
	log.InfoF("pid:%v offset:%v\n", pid, offset)
}

消费者

// Consumer 消费者
func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {
	var wg sync.WaitGroup
	consumer, err := sarama.NewConsumer(kafkaServer, nil)
	if err != nil {
		log.ErrorF("Failed to start consumer: %s", err)
		return
	}
	partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区
	if err != nil {
		log.Error("Failed to get the list of partition: ", err)
		return
	}
	log.Info(partitionList)
	for partition := range partitionList { // 遍历所有的分区
		pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者
		if err != nil {
			log.ErrorF("Failed to start consumer for partition %d: %s\n", partition, err)
		}
		wg.Add(1)
		go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值
			for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待
				log.DebugF("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
				f(msg.Value)
			}
			defer pc.AsyncClose()
			wg.Done()
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}
总结

golang的接口是一种抽象类型,是对其他类型行为的概括与抽象,从语法角度来看,接口是一组方法定义的集合,文本的封装使用了golang接口这一特性,把所有的消息队列中间件抽象为一个MQer拥有生产和消费两个方法,具体的各个消息队列中间件去实现这两个方法即可,最明显的优点在于扩展性,解耦性,选择性,维护性这几个表象上。

完整代码

https://github.com/mangenotwork/common/tree/main/mq

你的星星是我分享的最大动力 : )文章来源地址https://www.toymoban.com/news/detail-628962.html

到了这里,关于基于golang多消息队列中间件的封装nsq,rabbitmq,kafka的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 中间件RabbitMQ消息队列介绍

    1.1 什么是 MQ MQ ( message queue ),从字面意思上看,本质是个队列, FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中, MQ 是一种非常常 见的上下游 逻辑解耦+物理解耦 的消息通信服务。使用了 MQ 之

    2024年02月13日
    浏览(72)
  • 深入理解Java消息中间件-组件-消息队列

    引言: 消息中间件在现代分布式系统中扮演着至关重要的角色,它解决了系统之间异步通信和解耦的需求。而在消息中间件的架构中,核心组件之一就是消息队列。本文将深入探讨消息队列的架构组件,帮助读者加深对消息中间件的理解和应用。 一、什么是消息队列 消息队列

    2024年04月27日
    浏览(51)
  • 消息队列中间件 MetaQ/RocketMQ

    推荐电子书:云原生架构白皮书 2022版-藏经阁-阿里云开发者社区 (aliyun.com) 简介—— 消息队列中间件 MetaQ/RocketMQ 中间件 MetaQ 是一种基于队列模型的消息中间件,MetaQ 据说最早是受 Kafka 的影响开发的,第一版的名字 \\\"metamorphosis\\\",是奥地利作家卡夫卡的名作——《变形记》。

    2024年02月14日
    浏览(57)
  • 消息队列中间件(二)- RabbitMQ(一)

    接收,存储,转发消息 生产者 交换机 队列 消费者 简单模式 工作模式 发布 路由模式 主题模式 发布订阅模式 Broker 接收和分发消息的应用 Virtual host 虚拟分组 Connection: TCP连接 Channel: 节省连接,每次访问建立一次Connection消耗太大,所以使用信道代替连接 交换机 队列 www.r

    2024年02月11日
    浏览(68)
  • 「中间件」rabbitmq 消息队列基础知识

    RabbitMQ是一个消息队列软件,用于在应用程序之间转发消息。以下是RabbitMQ的基本概念: 消息:RabbitMQ中的消息是传递的基本单位,它由消息头和消息体组成。 队列(Queue):队列是消息的缓冲区,用于存储待处理的消息。 交换器(Exchange):交换器是接收生产者发送的消息并

    2024年02月07日
    浏览(62)
  • 深入详解高性能消息队列中间件 RabbitMQ

      目录 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 优势 4、RabbitMQ 整体架构剖析 4.1、发送消息流程 4.2、消费消息流程 5、RabbitMQ 应用 5.1、广播 5.2、RPC VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++软件异常排查从入

    2024年02月05日
    浏览(80)
  • 常用的消息队列和中间件都有哪些

    常用的消息队列和中间件有以下几种: RabbitMQ:RabbitMQ是一个开源的消息队列中间件,使用Erlang语言编写。它具有可靠性、灵活性和易用性的特点,支持多种消息协议。 Kafka:Kafka是一个高吞吐量的分布式发布订阅消息系统,由Apache开发。它主要用于处理大规模的实时数据流,

    2024年01月17日
    浏览(58)
  • 消息队列中间件 - Docker安装RabbitMQ、AMQP协议、和主要角色

    不管是微服务还是分布式的系统架构中,消息队列中间件都是不可缺少的一个重要环节,主流的消息队列中间件有RabbitMQ、RocketMQ等等,从这篇开始详细介绍以RabbitMQ为代表的消息队列中间件。 AMQP协议 AMQP协议是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与

    2024年02月03日
    浏览(61)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 基于Promise.resolve实现Koa请求队列中间件

    本文作者为360奇舞团前端工程师 最近在做一个 AIGC 项目,后端基于 Koa2 实现。其中有一个需求就是调用兄弟业务线服务端 AIGC 能力生成图片。但由于目前兄弟业务线的 AIGC 项目也是处于测试阶段,能够提供的服务器资源有限,当并发请求资源无法满足时,会响应【服务器繁忙

    2024年02月13日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包