go对rabbitmq基本操作

这篇具有很好参考价值的文章主要介绍了go对rabbitmq基本操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、安装rabbitmq

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit01 \
     --hostname rabbit01 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     -v mq-plugins:/plugins \
     --name rabbit02 \
     --hostname rabbit02 --restart=always \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3.8-management
    

二、go语言对rabbitmq基本操作

  • 1、安装依赖包

    go get github.com/streadway/amqp
    
  • 2、基本的连接操作

    package main
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func main() {
    	// 连接rabbitmq
        // conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称")   // 端口号:5672
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	defer conn.Close()
    
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	defer ch.Close()
    }
    
  • 3、由于部分每个地方都要使用,封装成一个方法

    package utils
    
    import (
    	"fmt"
    	"github.com/streadway/amqp"
    )
    
    func RabbitmqUtils() *amqp.Channel {
    	// 连接rabbitmq
    	conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672
    	//defer conn.Close()
    	// 打开通道
    	ch, err := conn.Channel()
    	fmt.Println(err)
    	//defer ch.Close()
    	return ch
    }
    
  • 4、创建一个队列,然后到可视化界面查看是否自动创建

    func main() {
    	// 创建一个队列
        // durable, autoDelete, exclusive, noWait bool
    	queue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil)
    	fmt.Println(queue.Name, err)
    }
    

    gostack mq启动命令,go,golang,rabbitmq,开发语言

  • 5、关于创建队列几个参数的介绍

    • 第一个参数是队列名称
    • 第二个参数是队列是否持久化
    • 第三个参数是是否自动删除
    • 第四个参数是队列是否可以被其他队列访问
    • 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度

三、简单模式

  • 1、根据官网图来看,简单模式是不需要交换机的

    gostack mq启动命令,go,golang,rabbitmq,开发语言

  • 2、定义生产者,向队列中发送消息(注意要先创建队列)

    func main() {
        /**
    	第一个参数是交换机名称
    	第二个参数是队列名称
    	第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃
    	第四个参数是 路由的时候
    	第五个参数是消息体
    	*/
    	err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    	fmt.Println(err)
    }
    
  • 3、查看可是界面是否存在一条消息

  • 4、创建消费者,来获取消息内容

    /**
    第一个参数是队列名称
    第二个参数自己给当前消费者命名
    第三个参数是否自动应答
    第三个参数队列是否可以被其他队列访问
    第四个参数
    第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
    */
    msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil)
    fmt.Println(err)
    for msg := range msgChan {
        fmt.Println(string(msg.Body))
    }
    

四、工作模式

  • 1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息

  • 2、定义2个消费者来消费消息

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 3、生产多条消息

    func main() {
    	for i := 0; i < 10; i++ {
    		_ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{
    			Body: []byte(fmt.Sprintf("hello word %d", i)),
    		})
    	}
    }
    
  • 4、消费结果

    gostack mq启动命令,go,golang,rabbitmq,开发语言

五、发布订阅模式

  • 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息

  • 2、使用goapi来创建交换机和队列

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil)
    	// 2.创建一个交换机
    	_ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil)
    }
    
  • 3、消费者只需要绑定队列来消费消息就可以

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息

    func main() {
    	_ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    
  • 5、可以查看控制台两个消费者都接收到消息

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {
    	// 1.创建2个队列
    	queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)
    	queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)
    	// 2.创建一个交换机
    	err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)
    	if err != nil {
    		fmt.Println(err)
    	}
    	// 3.队列和交换机绑定在一起
    	_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)
    	_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {
    	msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)
    	fmt.Println(err)
    	for msg := range msgChan {
    		fmt.Println("消费者1:", string(msg.Body))
    	}
    }
    
  • 4、定义生产者

    func main() {
        // 消费者会根据绑定的路由key来获取消息
    	_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{
    		Body: []byte("hello word"),
    	})
    }
    

七、主题模式

  • 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
    • *表示只匹配一个单词
    • #表示匹配多个单词

八、简单对其封装

  • 1、封装代码

    package utils
    
    import (
    	"errors"
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    )
    
    // MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"
    
    type RabbitMQ struct {
    	conn    *amqp.Connection
    	channel *amqp.Channel
    	MQUrl   string
    }
    
    // NewRabbitMQ 创建RabbitMQ的结构体实例
    func NewRabbitMQ() *RabbitMQ {
    	rabbitMQ := &RabbitMQ{
    		MQUrl: MQURL,
    	}
    	var err error
    	// 创建rabbitMQ连接
    	rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)
    	if err != nil {
    		rabbitMQ.failOnErr(err, "创建连接错误")
    	}
    	rabbitMQ.channel, err = rabbitMQ.conn.Channel()
    	if err != nil {
    		rabbitMQ.failOnErr(err, "获取channel失败")
    	}
    	return rabbitMQ
    }
    
    // Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {
    	// 1.创建1个队列
    	queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)
    	if err != nil {
    		r.failOnErr(err, "创建队列失败")
    	}
    	if exchange != "" && key == "" {
    		r.failOnErr(errors.New("错误"), "请传递交换机链接类型")
    	}
    	if exchange != "" {
    		// 2.创建一个交换机
    		err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)
    		if err1 != nil {
    			r.failOnErr(err, "创建交换机失败")
    		}
    		// 3.队列和交换机绑定在一起
    		if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {
    			fmt.Println("1111")
    			r.failOnErr(err, "交换机和队列绑定失败")
    		}
    	}
    	fmt.Println("创建成功")
    }
    
    // failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {
    	if err != nil {
    		log.Fatalf("%s:%s", message, err)
    		panic(fmt.Sprintf("%s:%s", message, err))
    	}
    }
    
    func (r *RabbitMQ) Close() {
    	defer func(Conn *amqp.Connection) {
    		err := Conn.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭链接失败")
    		}
    	}(r.conn)
    	defer func(Channel *amqp.Channel) {
    		err := Channel.Close()
    		if err != nil {
    			r.failOnErr(err, "关闭通道失败")
    		}
    	}(r.channel)
    }
    
    func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {
    	err := r.channel.Qos(prefetchCount, prefetchSize, global)
    	if err != nil {
    		r.failOnErr(err, "限流失败")
    	}
    }
    
    // Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {
    	// 2.发送数据到队列中
    	if err := r.channel.Publish(
    		exchange,
    		routerKey,
    		false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者
    		false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者
    		amqp.Publishing{
    			Body: []byte(message),
    		},
    	); err != nil {
    		r.failOnErr(err, "发送消息失败")
    	}
    	fmt.Println("恭喜你,消息发送成功")
    }
    
    // Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {
    	// 2.接收消息
    	message, err := r.channel.Consume(
    		queueName,
    		"",    // 区分多个消费者
    		true,  // 是否自动应答
    		false, // 是否具有排他性
    		false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者
    		false, // 队列消费是否阻塞
    		nil,
    	)
    	if err != nil {
    		r.failOnErr(err, "接收消息失败")
    	}
    	fmt.Println("消费者等待消费...")
    	forever := make(chan bool)
    	// 使用协程处理消息
    	go func() {
    		for d := range message {
    			log.Printf("接收到的消息:%s", d.Body)
    			callback(d.Body)
    		}
    	}()
    	<-forever
    }
    
  • 2、简单模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("simple_queue1", func(message []byte) {
    		fmt.Println(string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("simple_queue1", "", "", "")
    	defer mq.Close()
    	mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("work_queue1", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	for i := 0; i < 10; i++ {
    		mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))
    	}
    }
    
  • 4、交换机带路由的时候文章来源地址https://www.toymoban.com/news/detail-816142.html

    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")
    	mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")
    	mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	mq.Consumer("first_queue2", func(message []byte) {
    		fmt.Println("消费者2", string(message))
    	})
    	defer mq.Close()
    }
    
    func main() {
    	mq := utils.NewRabbitMQ()
    	defer mq.Close()
    	mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

到了这里,关于go对rabbitmq基本操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HDFS基本操作命令

    hdfs shell cli支持操作多种文件系统,包括本地文件系统(file:///),分布式文件系统(hdfs:nn:8020)等 操作的是什么文件系统取决于URL种的前缀协议 如果没有指定前缀,则将会读取环境变量汇总的fs.defaultFS属性,以该属性作为默认文件系统 hdfs dfs -ls file:/// #操作本地文件系统 hdfs dfs

    2024年02月16日
    浏览(30)
  • Git基本操作命令

    目录 一、简介 二、基本命令使用 (1) git add ---将该文件添加到暂存区 (2) git status --- 命令用于查看在上次提交之后是否有对文件进行再次修改 (3) git diff -- 比较文件在暂存区和工作区的差异 (4) git commit  --- 将暂存区内容添加到本地仓库中 (5) git reset --指定退回某一次提交的版

    2024年02月05日
    浏览(30)
  • Docker基本操作命令(一)

    docker search命令搜索存放在 Docker Hub中的镜像,此命令默认Docker会在Docker Hub中搜索镜像,可以配置了其他镜像仓库 [root@zch01 ~]# docker search centos NAME:镜像仓库名称 DESCRIPTION:镜像仓库描述 STARS:镜像仓库收藏数,表示该镜像仓库的受欢迎程度,类似于 GitHub的 stars OFFICAL: 表示是否为

    2024年02月12日
    浏览(31)
  • HBase的基本命令操作

    所有命令来源帮助文档; processlist – 查看当前正在执行的任务 status – 查看服务器的状态信息 table_help – 查看表操作帮助信息。 version – 显示版本信息。 whoami – 用户信息。 create_namespace – 创建命名空间 describe_namespace – 显示指定命名空间的详细信息 alter_namespace – 修改命

    2024年04月14日
    浏览(29)
  • MYSQL基本命令和操作

    🖊作者 : D. Star. 📘专栏 : JAVA 😆今日提问 : 你好,我的朋友,在你的人生途中,会面临很多选择,不管选什么样的结果,我们都多少会有些后悔。如果是你,你会选择爱你的,还是懂你的? Oracle(最好),Mysql( 最广泛–免费 ),SQL Server(好用不火),SQLite(轻量级) MongoDB,Redis,

    2024年02月12日
    浏览(33)
  • redis操作基本命令

    Redis —— Remote Dictionary Server ,它是一个开源的使用ANSI C语言编写、 支持网络 、可基于 内存亦可持久化 的日志型、 Key-Value 数据库,并提供多种语言的API,我们可使用它构建高性能,可扩展的Web应用程序。 具有以下特点: 异常快速:  Redis数据库完全在 内存 中,因此处理速

    2024年02月09日
    浏览(27)
  • 命令行hive的基本操作

    使用SQL语句在命令行创建Hive库: 其中, database_name 是要创建的数据库的名称。例如: 这将在Hive中创建一个名为\\\"mydatabase\\\"的数据库。 使用SQL语句在命令行创建Hive表: 其中, table_name 是要创建的表的名称, column1_name 和 column2_name 是表的列名, column1_type 和 column2_type 是对应列

    2024年03月11日
    浏览(28)
  • FFMPEG基本操作命令指南(二)

    FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。FFmpeg有非常强大的功能包括视频采集功能、视频格式转换、视频抓图、给视频加水印等。 接上第一篇基础的几个命令,下面介绍几个看起来比较复杂的 一:视频剪切 命令示例: ffmpeg -

    2023年04月24日
    浏览(25)
  • Linux学习笔记——基本操作命令

      Linux 操作系统的 Shell 作为操作系统的外壳,为用户提供使用操作系统的接口。它是命令语言、命令解释程序及程序设计语言系统。   Shell 是用户和 Linux 内核之间的接口程序,如果把 Linux 内核想象成一个球体的中心,Shell 就是围绕内核的外层。当从 Shell 或其他程序向

    2024年03月12日
    浏览(58)
  • Git 的基本操作 ——命令行

    详解如下: 本地仓库: 是在开发人员自己电脑上的Git仓库,存放我们的代码(.git 隐藏文件夹就是我们的本地仓库) 远程仓库: 是在远程服务器上的Git仓库,存放代码(可以是github.com或者gitee.com 上的仓库,或者自己该公司的服务器) 工作区: 我们自己写代码(文档)的地方 暂存区:

    2024年02月05日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包