Golang学习笔记_RabbitMQ的原理架构和使用

这篇具有很好参考价值的文章主要介绍了Golang学习笔记_RabbitMQ的原理架构和使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ 简介

  1. 实现了高级消息队列协议(Advanced Message Queuing Protcol)AMQP
  2. 消息队列中间件的作用(Redis实现MQ里面有写过,这里简单带过)
    1. 解耦
    2. 削峰
    3. 异步处理
    4. 缓存
    5. 消息通信
    6. 提高扩展性

RabbitMQ 架构理解

  1. binding(绑定):交换机将消息路由给Queue所遵循的规则,可以定义一个路由键,用于交换机筛选特定的Queue
    1. Routing_Key(路由键):Producer 和 Consumer 协商一致的 key 策略。主要在交换机的 direct(直连)和 topic(主题) 模式下使用,fanout(广播)模式下不使用Routing_Key
  2. Exchange(交换机):主要功能是分发消息给特定的Queue,只负责转发,不具备存储消息的功能。Exchange有以下四种模式:
    1. direct(直连模式),根据携带的Routing_Key来筛选特定的Queue进行消息投递。是RabbitMQ的默认类型,可以不指定Routing_Key,在创建时会默认生成与Queue重名。
    2. hander(头模式),使用场景不多,消息路由涉及多个属性的时候,交换机使用多属性来代替Routing_key建立路由规则,还可以定义匹配单词的个数,例如any为有一个单词满足条件就匹配成功。all为所有单词都满足条件才匹配成功。
    3. fanout(广播模式),不看Routing_Key。只根据Exchange和Queue的binding情况来分发信息。所有与之binding的queue都将接收到同一条消息。
    4. topic(主题模式),相当于模糊查询。topic的routing_key是使用 . 来进行隔断的。有两种匹配方法:
      1. " * " 匹配一个单词,例子如下
      2. " # " 匹配0个~多个单词,例子如下
rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic
rabbitMQ.# == rabbit.topic == rabbit.topic.topic
  1. Queue(消息队列的存储数据结构):
    1. 存储方式:
      • 持久化,在Server本地硬盘存储一份
      • 临时队列,重启后丢失数据
      • 自动删除,不存在用户连接则删除queue
    2. 队列对ACK请求的不同情况
      • consumer 接收并 ack,queue 删除数据并向 consumer 发送新消息
      • consumer 接收但是未 ack 就断开了连接,queue 会认为消息并未传送成功,consumer 再次连接时会重新发送消息
      • 如果consumer 接收消息成功 ,但是忘记 ack 则 queue 不会重复发送消息
      • 如果 consumer 拒收消息,则 queue 会向另外满足条件的 consumer 继续发送这条消息

RabbitMQ 工作流程

Producer方向
  1. Producer 与 RabbitMQ Broker 建立连接,开启一个信道 channel
  2. 声明交换机并设置属性(交换机类型、持久化等)
  3. 声明Queue并设置属性(持久化,自动删除等)
  4. 通过Routing_key来binding交换机和Queue
  5. 发送信息给交换,交换机根据Routing_key来确认投递的queue
  6. 查找成功后将消息存到queue
  7. 查找失败将消息丢弃或抛回给生产者
  8. 关闭channel
Consumer方向
  1. 与 queue 建立连接,开启channel
  2. 向queue请求队列中的msg
  3. 等待queue回应,开始接收消息
  4. 消息处理完成后 返回回调确认ack
  5. queue 将确认的消息从队列中删除
  6. 关闭channel

RabbitMQ的两种部署方式

Meta Data : 元数据(描述数据的数据)

  • vhost meta data : 为Queue、Exchange、Binding提供命名空间级别的隔离
  • exchange meta data:记录路由的名称类型和属性
  • binding mate data:映射 routing_key和queue之间的绑定关系
  • queue mate data:表队列名称和属性
普通模式

对于该模式的两个节点,消息只会存在其中一个节点,另一个节点只保存mate data,当consumer 连接节点2访问节点1的数据信息时,消息会在两个节点中传递。
该模式下p和c应尽量连接每个节点,这样起到线性拓展的作用。
但存在一个问题,如果节点上还有未消费的消息,但是节点挂了。如果节点设置了持久化,则需要在节点重启的时候消息才会恢复。如果未设置持久化,则消息会丢失。

镜像模式

消息存在多个节点中,消息会在节点与节点之间同步,可实现高可用(当一个节点挂了,另一个节点可以接替其位置,继续工作)但会降低性能,因为大量消息进入和同步,会占用大量带宽,但是为了保证高可靠性需要取舍。

面试题

  • Q:如何保证消息不被重复消费?
    • A:MQ通过确认机制ACK,进行确认。确认后消息从queue中删除,保证消息不被重复消费的。如果因为网络原因ack没有成功发出,导致消息重新投递。可以使用全局唯一消息id来避免。
    1. 消息发送者发送消息时携带一个全局唯一的消息id
    2. 消费者监听到消息后,根据id在redis或者db中查询是否存在消费记录
    3. 如果没有消费就正常消费,消费完毕后,写入redis或者db
    4. 如果消息消费过则直接丢弃
  • Q:如何保证消息的消费顺序?
    • A:RabbitMQ中存在一个设置,叫独占队列。即在同一时间只有一个消费者会消费消息。从而制止了异步操作,保证消费顺序。或者一个Producer对一个Consumer
  • Q:如何保证数据一致性?
    • A:因为MQ的使用场景多为分布式系统,所以一般不追求强一致性。而保证最终一致性就可以。
    • 而保证数据最终一致性,可以采用消息补偿机制。即消息在消费者处理完之后调用生产者的API修改数据状态。如未调用API则判断为消息处理失败或出错。此时间隔一段时间后重新投递消息进行再次操作。
    • 消费者收到消息,处理完毕后,发送一条响应消息给生产者也是消息补偿机制,本意是确认消费者成功消费消息。ACK也是处理方法

RabbitMQ的使用(Golang使用amqp包)

代码部分参考 upup小亮的博客

代码只是简单的操作,主要是熟悉流程。对于如何创建Queue和绑定Exchange之类的操作有个了解。

Simple(简单收发模式,只有一个Queue)

Simple运行机制与WorkQueue相似,只是一个Consumer与多个Consumer的区别。多个Consumer之间存在竞争关系,所以工作队列是创建多个Consumer,多个竞争只有一个可以获取消息消费。消费成功后ack消息删除。
演示代码放到一起了:

WorkQueue 工作队列

生产者

// simple and work queue
func main2() {
	// 连接到 rabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("无法创建连接:%s", err)
		return
	}
	// 默认关闭
	defer conn.Close()

	// 创建通道Channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("无法创建channel:%s", err)
		return
	}
	// 通道关闭
	defer ch.Close()

	// 创建存储队列
	queue, err := ch.QueueDeclare(
		"hello", // 队列名称
		false, // 持久化设置,可以为true根据需求选择
		false, // 自动删除,没有用户连接删除queue一般不选用
		false, //独占
		false, //等待服务器确认
		nil)   //参数
	if err != nil {
		fmt.Println(err)
		log.Fatalf("无法声明队列:%s", err)
		return
	}

	var body string
	// 发送信息
	for i := 0; i < 10; i++ {
		fmt.Println(i)
		body = "Hello RabbitMQ" + string(i)
		err = ch.Publish(
			"",
			queue.Name,
			false, // 必须发送到消息队列
			false, // 不等待服务器确认
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			})
		if err != nil {
			log.Fatalf("消息生产失败:%s", err)
			continue
		}
	}
}

消费者

	// create conn
	// 如果同时运行两个这样的consumer代码,就是工作队列。只有一个consumer就是simple
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("无法创建连接:%s", err)
		return
	}
	defer conn.Close()

	// create channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("无法创建channel:%s", err)
		return
	}
	defer ch.Close()
	// create queue
	queue, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil)
	if err != nil {
		log.Fatalf("无法创建queue:%s", err)
		return
	}
	
	// 消费信息

	msgs, err := ch.Consume(
		queue.Name,
		"",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
		log.Fatalf("无法消费信息:%s", err)
		return
	}
	for msg := range msgs {
		log.Println(string(msg.Body))
	}
	return

pub/sub 发布订阅模式

发布订阅模式可以创建两个Queue,绑定到同一个Exchange中
生产者这边只需要跟交换机对接,而交换机类型为fanout

func main() {
	// 连接到 rabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("无法创建连接:%s", err)
	}
	// 默认关闭
	defer conn.Close()

	// 创建通道Channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("无法创建channel:%s", err)
	}
	defer ch.Close()

	// create exchange
	ex := ch.ExchangeDeclare(
		"exchange1", // 交换机名称
		"fanout",    // 交换机类型
		true,        // 是否持久化
		false,       // 是否自动删除
		false,       // 是否内部使用
		false,       // 是否等待服务器响应
		nil,         // 其他属性
	)
	fmt.Println(ex)

	body := "Hello RabbitMQ for Pub/Sub"
	err = ch.Publish(
		"exchange1",
		"", // routing key 可以为空,因为fanout不看routing key
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	if err != nil {
		log.Fatalf("err %s:", err)
	}
	log.Println(body)
}

消费者:创建交换机,类型为fanout,创建队列,绑定交换机(创建多个consumer绑定同一个queue和同一个交换机。这样发送一个消息,所有的consumer都能收到。== 发布订阅模型)

	// Pub/Sub
	// Create conn
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil{
		log.Fatalf(err)
	}
	defer conn.Close()

	// channel create
	ch, err := conn.Channel()
	if err != nil{
		log.Fatalf(err)
	}
	defer ch.Close()

	// exchange create
	ex := ch.ExchangeDeclare(
		"exchange1",
		"fanout",
		true,
		false,
		false,
		false,
		nil)

	fmt.Println(ex)

	// queue create
	queue, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil)
	if err != nil{
		log.Fatalf(err)
	}
	err = ch.QueueBind(
		queue.Name,
		"",
		"exchange1",
		false,
		nil)
	if err != nil{
		log.Fatalf(err)
	}

	msgs, err := ch.Consume(
		queue.Name,
		"",
		true,
		false,
		false,
		false,
		nil)
	if err != nil{
		log.Fatalf(err)
	}
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf("Waiting for messages. To exit press CTRL+C")
	<-make(chan struct{}) // 阻塞主goroutine
}

Routing 模式(对特定的队列投递消息)

生产者

func main() {
	// 连接到 rabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("无法创建连接:%s", err)
	}
	// 默认关闭
	defer conn.Close()

	// 创建通道Channel
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("无法创建channel:%s", err)
	}
	defer ch.Close()

	// create exchange
	ex := ch.ExchangeDeclare(
		"exchange1", // 交换机名称
		"direct",    // 交换机类型
		true,        // 是否持久化
		false,       // 是否自动删除
		false,       // 是否内部使用
		false,       // 是否等待服务器响应
		nil,         // 其他属性
	)
	fmt.Println(ex)
	body := "Hello RabbitMQ for direct routing"
		// 发布消息到交换机,并指定路由键
	err = ch.Publish(
		"logs_direct", // 交换机名称
		"routing_key", // 路由键
		false,         // 是否等待服务器响应
		false,         // 是否立即将消息写入磁盘
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)
	if err != nil{
		log.Fatalf("无法创建send msg:%s", err)
	}
	log.Printf("Sent message: %s", message)

消费者

func main() {
	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil{
		log.Fatalf("无法创建send msg:%s", err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil{
		log.Fatalf("无法创建send msg:%s", err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		"logs_direct", // 交换机名称
		"direct",      // 交换机类型
		true,          // 是否持久化
		false,         // 是否自动删除
		false,         // 是否内部使用
		false,         // 是否等待服务器响应
		nil,           // 其他属性
	)
	if err != nil{
		log.Fatalf("无法创建send msg:%s", err)
	}

	// 声明一个临时队列
	q, err := ch.QueueDeclare(
		"",    // 队列名称,留空表示由RabbitMQ自动生成,因为定义了key所以队列名可以是随意的,毕竟是依靠key来进行匹配的
		false, // 是否持久化
		false, // 是否自动删除(当没有任何消费者连接时)
		true,  // 是否排他队列(仅限于当前连接)
		false, // 是否等待服务器响应
		nil,   // 其他属性
	)
	// 将队列绑定到交换机上,并指定要接收的路由键
	err = ch.QueueBind(
		q.Name,        // 队列名称
		"routing_key",      // 路由键
		"logs_direct", // 交换机名称
		false,         // 是否等待服务器响应
		nil,           // 其他属性
	)
	if err != nil{
		log.Fatalf("无法创建send msg:%s", err)
	}

	// 订阅消息
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标识符,留空表示由RabbitMQ自动生成
		true,   // 是否自动应答
		false,  // 是否独占模式(仅限于当前连接)
		false,  // 是否等待服务器响应
		false,  // 其他属性
		nil,    // 其他属性
	)
	failOnError(err, "Failed to register a consumer")

	// 接收消息的goroutine
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf("Waiting for messages. To exit press CTRL+C")
	<-make(chan struct{}) // 阻塞主goroutine

topic

func main() {
	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil{
		log.Fatalf(err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil{
		log.Fatalf(err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		"logs_topic", // 交换机名称
		"topic",      // 交换机类型
		true,         // 是否持久化
		false,        // 是否自动删除
		false,        // 是否内部使用
		false,        // 是否等待服务器响应
		nil,          // 其他属性
	)
	if err != nil{
		log.Fatalf(err)
	}

	// 定义要发送的消息的路由键和内容
	routingKey := "example.key.das"
	message := "Hello, RabbitMQ!"

	// 发布消息到交换机,并指定路由键
	err = ch.Publish(
		"logs_topic", // 交换机名称
		routingKey,   // 路由键
		false,        // 是否等待服务器响应
		false,        // 是否立即发送
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
	if err != nil{
		log.Fatalf(err)
	}

	log.Printf("Sent message: %s", message)
}

消费者文章来源地址https://www.toymoban.com/news/detail-856787.html

func main() {
	// 连接到RabbitMQ服务器
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil{
		log.Fatalf(err)
	}
	defer conn.Close()

	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil{
		log.Fatalf(err)
	}
	defer ch.Close()

	// 声明一个交换机
	err = ch.ExchangeDeclare(
		"logs_topic", // 交换机名称
		"topic",      // 交换机类型
		true,         // 是否持久化
		false,        // 是否自动删除
		false,        // 是否内部使用
		false,        // 是否等待服务器响应
		nil,          // 其他属性
	)
	if err != nil{
		log.Fatalf(err)
	}

	// 声明一个临时队列
	q, err := ch.QueueDeclare(
		"",    // 队列名称,留空表示由RabbitMQ自动生成
		false, // 是否持久化
		false, // 是否自动删除(当没有任何消费者连接时)
		true,  // 是否排他队列(仅限于当前连接)
		false, // 是否等待服务器响应
		nil,   // 其他属性
	)
	if err != nil{
		log.Fatalf(err)
	}

	// 将队列绑定到交换机上,并指定要接收的路由键
	err = ch.QueueBind(
		q.Name,       // 队列名称
		"example.#",  // 路由键,可以使用通配符*匹配一个单词
		"logs_topic", // 交换机名称
		false,        // 是否等待服务器响应
		nil,          // 其他属性
	)
	if err != nil{
		log.Fatalf(err)
	}
	// 创建一个消费者通道
	msgs, err := ch.Consume(
		q.Name, // 队列名称
		"",     // 消费者标识符,留空表示由RabbitMQ自动生成
		true,   // 是否自动应答
		false,  // 是否排他消费者
		false,  // 是否阻塞
		false,  // 是否等待服务器响应
		nil,    // 其他属性
	)
	if err != nil{
		log.Fatalf(err)
	}
	// 接收和处理消息
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf("Waiting for messages...")
	// 阻塞
	<-forever
}

到了这里,关于Golang学习笔记_RabbitMQ的原理架构和使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【RabbitMQ】golang客户端教程5——使用topic交换器

    发送到 topic交换器 的消息不能具有随意的 routing_key ——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的 routing_key 示例: “stock.usd.nyse” , “nyse.vmw” , “quick.orange.rabbit” 。 routing_key 中可以包含任意多个单词,

    2024年02月14日
    浏览(31)
  • Golang快速入门到实践学习笔记

    Go程序设计的一些规则 Go之所以会那么简洁,是因为它有一些默认的行为: 大写字母开头的变量是可导出的,也就是其它包可以读取 的,是公用变量;小写字母开头的就是不可导出的,是私有变量。 大写字母开头的函数也是一样,相当于class 中的带public的公有函数;

    2024年02月20日
    浏览(42)
  • 【RabbitMQ】golang客户端教程3——发布订阅(使用fanout交换器)

    在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务只传递给一个工人。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者传递一个消息。这就是所谓的 “订阅/发布模式” 。 为了说明这种模式,我们将构建一个简单的日志系统。

    2024年02月14日
    浏览(32)
  • golang学习笔记(defer基础知识)

    defer语句用于golang程序中延迟函数的调用, 每次defer都会把一个函数压入栈中, 函数返回前再把延迟的函数取出并执行。 为了方便描述, 我们把创建defer的函数称为主函数, defer语句后面的函数称为延迟函数。延迟函数可能有输入参数, 这些参数可能来源于定义defer的函数,

    2024年04月25日
    浏览(36)
  • 【后端学习笔记·Golang】邮箱邮件验证

    流程: 接收用户请求后生成随机验证码,并将验证码存入Redis中,并设置TTL 通过gomail发送验证码给用户邮箱 接收用户输入的验证码,与Redis中存放的验证码进行比对 ​ 随机种子通过 time.Now().UnixNano() 进行设置,以确保对于同一个用户每次请求都使用不同的种子。然后,定义

    2024年04月26日
    浏览(38)
  • Golang 中高级工程师学习笔记

    闭包(Closure)是一种函数值,它可以引用在其外部定义的变量。闭包允许这些变量保持在函数内部,而不是被每次调用时重新创建。闭包的作用主要体现在以下几个方面 封装: 闭包允许函数访问其外部作用域中的变量,形成了一种封装。这意味着可以在闭包内部定义私有的

    2024年01月21日
    浏览(34)
  • Golang笔记:UDP基础使用与广播

    UDP是比较基础常用的网络通讯方式,这篇文章将介绍Go语言中UDP基础使用的一些内容。 本文中使用 Packet Sender 工具进行测试,其官网地址如下: https://packetsender.com/ UDP是一种面向无连接的通讯,抛开业务逻辑来说UDP使用上不需要像TCP那样先建立连接才能使用,收就是收、发就

    2024年02月09日
    浏览(31)
  • golang对接rabbitMQ

    此文参考如下链接: Golang 使用 RabbitMQ - 知乎 以go rabbitmq为例子--用最少的时间最好的掌握消息队列_胡桃姓胡,蝴蝶也姓胡的博客-CSDN博客 rabbitmq的个组件、消息传递原理就不做解释了,百度一大堆。 首先要明确的是,tabbitmq是c/s架构的软件,对接rabbitMQ也就是通过client(客户

    2023年04月15日
    浏览(29)
  • Golang RabbitMQ实现的延时队列

    之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。 延迟队列是一种特殊类型的消息队列 ,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景

    2024年02月10日
    浏览(43)
  • 编程笔记 Golang基础 007 第一个程序:hello world 使用Goland

    开始在Goland环境中编程go语言代码啦。 打开GoLand软件。 选择 “File”(文件)菜单,然后点击 “New Project”(新建项目)或使用快捷键 Ctrl+Shift+A 并搜索 “New Project”。 在新建项目向导中,选择 “Go” 并点击 “Next” 按钮。 配置项目设置: 为项目选择一个合适的保存位置。

    2024年02月20日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包