Golang使用消息队列(RabbitMQ)

这篇具有很好参考价值的文章主要介绍了Golang使用消息队列(RabbitMQ)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。

使用RabbitMQ的架构

Golang使用消息队列(RabbitMQ),golang,rabbitmq,开发语言

代码

因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节

  • 注意交换机和队列的绑定,一定要细心
  • 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
  • 下列的内容不包含RabbitMQ持久化的内容
package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"strings"
)

func InitRabbitMQ() *amqp.Connection {
	mq := "amqp"
	host := "127.0.0.1"
	port := "5672"
	user := "root"
	pwd := "root"
	dns := strings.Join([]string{mq, "://", user, ":", pwd,
		"@", host, ":", port, "/"}, "")
	conn, err := amqp.Dial(dns)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	return conn
}

func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {
	// 队列信息
	exchangeName := "main_exchange"
	queueName := fmt.Sprintf("user_queue_%s", userID)
	messageTTL := int32(300000)

	// 声明主交换机
	err := ch.ExchangeDeclare(
		exchangeName, // 交换机名
		"direct",     // Exchange type
		false,        // Durable
		false,        // Auto-deleted
		false,        // Internal
		false,        // No-wait
		nil,          // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare an main exchange: %v", err)
	}

	// 声明用户队列
	_, err = ch.QueueDeclare(
		queueName, // 队列名
		false,     // Durable
		false,     // Delete when unused
		false,     // Exclusive
		false,     // No-wait
		amqp.Table{
			"x-dead-letter-routing-key": "dead",          // routing-key
			"x-dead-letter-exchange":    "dead_exchange", // 死信交换机
			"x-message-ttl":             messageTTL,      // TTL
		},
	)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定
	err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)
	if err != nil {
		log.Fatalf("Failed to bind queue to exchange: %v", err)
	}

	return ch
}

func InitDeadExchangeAndQueue(ch *amqp.Channel) {
	// 声明死信交换机
	err := ch.ExchangeDeclare(
		"dead_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("Failed to declare an dead exchange: %v", err)
	}

	// 声明一个死信队列
	_, err = ch.QueueDeclare(
		"dead_queue",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
		log.Fatalf("Failed to declare a queue: %v", err)
	}

	// 绑定
	err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)
	if err != nil {
		log.Fatalf("Failed to bind queue to exchange: %v", err)
	}
}

func PublishMessage(ch *amqp.Channel, userID, fileID string) {
	// 用户信息
	message := fmt.Sprintf("%s|%s", userID, fileID)
	exchangeName := "main_exchange"
	// 发布用户消息
	err := ch.Publish(
		exchangeName, // Exchange
		userID,       // Routing key
		false,        // Mandatory
		false,        // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	if err != nil {
		log.Fatalf("Failed to publish a message: %v", err)
	}
	log.Printf("Message sent to user %s: %s", userID, message)
}

func ConsumeTTL(ch *amqp.Channel) {
	// 声明死信交换机
	err := ch.ExchangeDeclare(
		"dead_exchange", // 交换机名
		"direct",        // Exchange type
		true,            // Durable
		false,           // Auto-deleted
		false,           // Internal
		false,           // No-wait
		nil,             // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to declare a dead letter exchange: %v", err)
	}

	// 创建消费者并阻塞等待消费死信队列中的消息
	megs, err := ch.Consume(
		"dead_queue", // Queue
		"",           // Consumer
		false,        // Auto-acknowledge
		false,        // Exclusive
		false,        // No-local
		false,        // No-wait
		nil,          // Args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)
	}

	// 使用无限循环一直监听
	fmt.Println("Waiting for message from dead_queue......")
	for d := range megs {
		// 实际中,处理消息的逻辑,例如删除文件或其他操作
		fmt.Println(string(d.Body))

		// 消费完成后手动确认消息
		err = d.Ack(false)
		if err != nil {
			log.Fatalf("Failed to ack message: %v", err)
		}
	}
}

func Consume(ch *amqp.Channel, userID string) {
	// 下面的信息可以通过前后端进行传递
	queueName := fmt.Sprintf("user_queue_%s", userID)

	// 消费消息
	megs, err := ch.Consume(
		queueName, // Queue
		"",        // Consumer
		true,      // Auto-acknowledge
		false,     // Exclusive
		false,     // No-local
		false,     // No-wait
		nil,       // Args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听
	d, ok := <-megs
	if !ok {
		log.Fatalf("Failed to get message: %v", err)
	}
	fmt.Println(string(d.Body))
	// 消息完成后确认消息
	err = d.Ack(true)
	if err != nil {
		log.Fatalf("Failed to ack message: %v", err)
	}
}

func main() {
	// 获取客户端
	client := InitRabbitMQ()
	defer client.Close()

	ch, err := client.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	defer ch.Close()

	//ConsumeTTL(ch)

	// 构造dead_exchange及dead_queue
	// InitDeadExchangeAndQueue(ch)

	// 假设这是web请求信息
	//var userID1 = "test-id1"
	//var fileID1 = "file1"

	// 构造main_exchange及user_queue
	//ch = InitMainExchangeAndQueue(ch, userID1)
	// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中
	//PublishMessage(ch, userID1, fileID1)

	//time.Sleep(20 * time.Second)

	// 模拟后端消费消息
	//Consume(ch, userID1)

	// 针对用户2:模拟其不被后端消费,过期到死信队列中
	var userID2 = "test-id2"
	var fileID2 = "file2"
	ch = InitMainExchangeAndQueue(ch, userID2)
	PublishMessage(ch, userID2, fileID2)
	// 注意这个消息没有被消费,理论上应当被死信队列消费
}

从dead_exchange中消费:
Golang使用消息队列(RabbitMQ),golang,rabbitmq,开发语言文章来源地址https://www.toymoban.com/news/detail-693072.html

到了这里,关于Golang使用消息队列(RabbitMQ)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Golang学习笔记_RabbitMQ的原理架构和使用

    实现了高级消息队列协议(Advanced Message Queuing Protcol)AMQP 消息队列中间件的作用(Redis实现MQ里面有写过,这里简单带过) 解耦 削峰 异步处理 缓存 消息通信 提高扩展性 RabbitMQ 架构理解 binding(绑定):交换机将消息路由给Queue所遵循的规则,可以定义一个路由键,用于交换机

    2024年04月24日
    浏览(32)
  • 【RabbitMQ】RabbitMQ 消息的堆积问题 —— 使用惰性队列解决消息的堆积问题

    消息的堆积问题是指在消息队列系统中,当生产者以较快的速度发送消息,而消费者处理消息的速度较慢,导致消息在队列中积累并达到队列的存储上限。在这种情况下,最早被发送的消息可能会在队列中滞留较长时间,直到超过队列的容量上限。当队列已满且没有更多的可

    2024年02月05日
    浏览(37)
  • RabbitMQ 消息队列使用

    同步调用优点: 时效性强,立即得到结果 缺点: 耦合度高 新业务新需求到来时,需要修改代码 性能和吞吐能力下降 调用服务的响应时间为所有服务的时间之和 资源浪费 调用链中的服务在等待时不会释放请求占用的资源 级联失败 一个服务执行失败会导致调用链后续所有服务失

    2024年01月21日
    浏览(41)
  • RabbitMQ-网页使用消息队列

    几种模式 从最简单的开始 添加完新的虚拟机可以看到,当前admin用户的主机访问权限中新增的刚添加的环境 1.1查看交换机 交换机列表中自动新增了刚创建好的虚拟主机相关的预设交换机。一共7个。前面两个 direct类型的交换机,一个是(AMQP default)还有一个是amq.direct,它们

    2024年02月07日
    浏览(32)
  • Hyperf使用RabbitMQ消息队列

    Hyperf连接使用RabbitMQ消息中间件 使用Docker部署RabbitMQ,-传送门 使用Docker部署Hyperf,-传送门- 安装amqp扩展 安装command命令行扩展 配置参数 假设已经在rabbitmq设置了交换机exchange_test和队列queue_test 新建 /config/autoload/amp.php配置文件,修改地址和用户名密码 创建生产者中间件 exch

    2024年02月13日
    浏览(33)
  • 消息队列RabbitMQ.01.基本使用

    目录  RabbitMQ的作用 Message queue 释义  问题思考  存在的问题 优化方案  案例分析  带来的好处  消息队列特点  Email邮件案例分析 Docker安装部署RabbitMQ 1.下拉镜像 2.运行RabbitMQ  3.打开防火墙端口号并重新运行防火墙 4.容器启动后,可以通过 docker logs 容器 查看日志 6.通过刚才

    2024年01月21日
    浏览(32)
  • Java RabbitMQ消息队列简单使用

    消息队列,即MQ,Message Queue。 消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    2024年02月12日
    浏览(44)
  • 【RabbitMQ】golang客户端教程5——使用topic交换器

    发送到 topic交换器 的消息不能具有随意的 routing_key ——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的 routing_key 示例: “stock.usd.nyse” , “nyse.vmw” , “quick.orange.rabbit” 。 routing_key 中可以包含任意多个单词,

    2024年02月14日
    浏览(31)
  • Thinkphp6使用RabbitMQ消息队列

    Thinkphp6连接使用RabbitMQ(不止tp6,其他框架对应改下也一样),使用Docker部署RabbitMQ,在上一篇已经讲了-传送门-。 开始前先进入RabbitMQ的web管理界面,选择Queues菜单,点击底部的Add a new queue,新建一个test的队列。 安装thinkphp6框架 安装workerman扩展 安装rabbitmq扩展 生产者 在a

    2023年04月23日
    浏览(65)
  • SSM 如何使用 RabbitMQ 实现消息队列

    在分布式系统中,消息队列是一种常见的通信方式,可以实现不同服务之间的异步通信和解耦。RabbitMQ 是一个开源的消息队列软件,本文将介绍如何在 SSM 框架中使用 RabbitMQ 实现消息队列。 本文将使用 Spring Boot 作为 SSM 框架,使用 Maven 进行项目管理。 在开始之前,需要安装

    2024年02月06日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包