【RabbitMQ】golang客户端教程5——使用topic交换器

这篇具有很好参考价值的文章主要介绍了【RabbitMQ】golang客户端教程5——使用topic交换器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

topic交换器(主题交换器)

发送到topic交换器的消息不能具有随意的routing_key——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的routing_key示例:“stock.usd.nyse”“nyse.vmw”“quick.orange.rabbit”routing_key中可以包含任意多个单词,最多255个字节。

绑定键也必须采用相同的形式。topic交换器背后的逻辑类似于direct交换器——用特定路由键发送的消息将传递到所有匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况: - *(星号)可以代替一个单词。 - #(井号)可以替代零个或多个单词。

通过下面这个示例可以很容易看明白这一点:

【RabbitMQ】golang客户端教程5——使用topic交换器,RabbitMQ,rabbitmq,golang,中间件,后端
在这个例子中,我们将发送一些都是描述动物的信息。将使用包含三个词(两个点)的路由密钥发送消息。路由键中的第一个单词将描述速度,第二个是颜色,第三个是种类:“<speed>.<colour>.<species>”

我们创建了三个绑定关系:Q1与绑定键“ * .orange. * ”绑定,Q2与“* .* .rabbit”和“lazy.#”绑定。

这些绑定可以总结为:

  • Q1对所有橙色动物都感兴趣
  • Q2想接收有关兔子(rabbit)的一切消息,以及有关懒惰(lazy)动物的一切消息。

路由键设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将发送给他们两个。另一方面,“quick.orange.fox”将仅进入第一个队列,而“lazy.brown.fox”将仅进入第二个队列。即使“lazy.pink.rabbit”与两个绑定匹配(匹配Q2的两个绑定),也只会传递到第二个队列一次。 “quick.brown.fox”与任何绑定都不匹配,因此将被丢弃。

如果我们打破约定并发送一个或四个单词的消息,例如“orange”“quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不匹配任何绑定,并且将会丢失。

另外,“lazy.orange.male.rabbit”即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。

topic交换器

topic交换器功能强大,可以像其他交换器一样运行。

当队列用“#”(井号)绑定键绑定时,它将接收所有消息,而与路由键无关,就像在fanout交换器中一样。

当在绑定中不使用特殊字符“*”(星号)和“#”(井号)时,topic交换器的行为就像direct交换器一样。

完整示例

我们将在日志记录系统中使用topic交换器。我们将从一个可行的假设开始,即日志的路由键将包含两个词:“<facility>.<severity>”

emit_log_topic.go的代码:

package main

import (
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs_topic",          // exchange
                severityFrom(os.Args), // routing key
                false, // mandatory
                false, // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || os.Args[2] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}

func severityFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "anonymous.info"
        } else {
                s = os.Args[1]
        }
        return s
}

receive_logs_topic.go 的代码:

package main

import (
        "log"
        "os"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        if len(os.Args) < 2 {
                log.Printf("Usage: %s [binding_key]...", os.Args[0])
                os.Exit(0)
        }
  			// 绑定topic
        for _, s := range os.Args[1:] {
                log.Printf("Binding queue %s to exchange %s with routing key %s",
                        q.Name, "logs_topic", s)
                err = ch.QueueBind(
                        q.Name,       // queue name
                        s,            // routing key
                        "logs_topic", // exchange
                        false,
                        nil)
                failOnError(err, "Failed to bind a queue")
        }

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto ack
                false,  // exclusive
                false,  // no local
                false,  // no wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf(" [x] %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
        <-forever
}

想要接收所有的日志:

go run receive_logs_topic.go "#"

要从“kern”接收所有日志:

go run receive_logs_topic.go "kern.*"

或者,如果你只想接收“critical”日志:

go run receive_logs_topic.go "*.critical"

你可以创建多个绑定:

go run receive_logs_topic.go "kern.*" "*.critical"

并发出带有路由键“kern.critical”的日志:

go run emit_log_topic.go "kern.critical" "A critical kernel error"

你可以自己尝试玩一下这个程序。请注意,代码没有对路由键或绑定键进行任何假设,你可能希望使用两个以上的路由键参数。

源自:https://www.rabbitmq.com/getstarted.html文章来源地址https://www.toymoban.com/news/detail-634730.html

到了这里,关于【RabbitMQ】golang客户端教程5——使用topic交换器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(32)
  • rabbitmq笔记-rabbitmq客户端开发使用

    1.创建ConnectionFactory,给定参数ip地址,端口号,用户名和密码等 2.创建ConnectionFactory,使用uri方式实现,创建channel。 注意: Connection可以用来创建多个channel实例,但channel实例不能在线程间共享,应用程序为每个线程开辟一个channel。多线程间共享channel实例是非线程安全的。

    2024年02月11日
    浏览(34)
  • 自定义kafka客户端消费topic

    使用自定义的KafkaConsumer给spring进行管理,之后在注入topic的set方法中,开单线程主动订阅和读取该topic的消息。 后端服务不需要启动时就开始监听消费,而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 使用的spring集成2.1.8.RELEASE的版本,在@KafkaListener注解中没

    2024年02月02日
    浏览(41)
  • (五)「消息队列」之 RabbitMQ 主题(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月16日
    浏览(46)
  • (四)「消息队列」之 RabbitMQ 路由(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月17日
    浏览(29)
  • Golang笔记:使用ssh包作为客户端与SSH服务器交互

    Golang中可以使用 golang.org/x/crypto/ssh 包作为SSH客户端或者SSH服务使用。这篇文章将简单记录下作为客户端使用的一些内容。 Package ssh implements an SSH client and server. 作为客户端与SSH服务器操作上来说主要分为三步: 使用一定的参数与SSH服务器建立连接得到 Client 对象; 在 Client 之

    2024年02月09日
    浏览(37)
  • (七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

    发布者确认 是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理 异步确认 ,这意味着它们已在服务器端得到处理。 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主

    2024年02月16日
    浏览(28)
  • Golang编写客户端SDK,并开源发布包到GitHub,供其他项目import使用

    如果希望其他项目能够使用该SDK,可以将该SDK打包为一个Go模块,并将其发布到Go模块仓库中。这将使其他项目能够通过Go的模块依赖机制来使用该SDK。可以轻松地引用和使用你的代码。 登录到你的 GitHub 帐户。 在 GitHub 主页点击右上角的加号(+),然后选择 “New repository”(

    2024年02月09日
    浏览(34)
  • SVN客户端使用教程

    正式进入公司项目后,我们需要和同事进行协同开发,此时代码管理工具是必不可少的,目前常用的两款工具是:SVN 和 Git。本人因为要搭建一个自动化测试的框架,对SVN和Git都不熟悉,在调研后选择了操作更为简单的代码管理工具:SVN。 SVN是什么? 全称Subversion,属于集中

    2024年02月08日
    浏览(32)
  • Trojan客户端使用教程

    此教程使用的是 Centos7 x86_64系统 使用此命令下载Trojan客户端 官方版本(GitHub): cd /usr/src wget https://github.com/trojan-gfw/trojan/releases/download/v1.15.1/trojan-1.15.1-linux-amd64.tar.xz 解压Trojan文件 tar xvf trojan-1.15.1-linux-amd64.tar.xz 打开配置文件 cd /usr/src/trojan vi config.json 按i进入编辑模式 run_typ

    2024年02月02日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包