【RabbitMQ 实战】09 客户端连接集群生产和消费消息

这篇具有很好参考价值的文章主要介绍了【RabbitMQ 实战】09 客户端连接集群生产和消费消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、部署一个三节点集群

下面的链接是最快最简单的一种集群部署方法
3分钟部署一个RabbitMQ集群
上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。
每个容器应用都映射了宿主机的端口,分别是5602,5612,5622
docker compse文件如下

version: '3'

services:
  stats:
    image: bitnami/rabbitmq
    environment:
      - RABBITMQ_NODE_TYPE=stats
      - RABBITMQ_NODE_NAME=rabbit@stats
      - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
    ports:
      - '15672:15672'
      - '5602:5672'
    volumes:
      - 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'
  queue-disc1:
    image: bitnami/rabbitmq
    environment:
      - RABBITMQ_NODE_TYPE=queue-disc
      - RABBITMQ_NODE_NAME=rabbit@queue-disc1
      - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats
      - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
    ports:
      - '5612:5672'
    volumes:
      - 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'
  queue-ram1:
    image: bitnami/rabbitmq
    environment:
      - RABBITMQ_NODE_TYPE=queue-ram
      - RABBITMQ_NODE_NAME=rabbit@queue-ram1
      - RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats
      - RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
    ports:
      - '5622:5672'
    volumes:
      - 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'

volumes:
  rabbitmqstats_data:
    driver: local
  rabbitmqdisc1_data:
    driver: local
  rabbitmqram1_data:
    driver: local

通过docker-compose up命令,就可以启动三个集群的容器了

[root@localhost mycompose]# docker-compose up

二、配置文件

原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

server:
  port: 8080
spring:
  application:
    name: rabbitmq-demo
  #配置rabbitMq 服务器
  rabbitmq:
#单节点直接可以写host和port
#    host: 192.168.56.201
#    port: 5672
    #集群连接写ip和端口
    addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622
    username: user
    password: bitnami
    #虚拟host
    virtual-host: virtual01
    template:
      mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
    publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功
    publisher-returns: true #是否开启生产者returns
    listener:
      simple:
        acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法
        prefetch: 10 #每个消费者可拉取的,还未ack的消息数量
        concurrency: 3 #消费端(每个Listener)的最小线程数
        max-concurrency: 10 #消费端(每个Listener)的最大线程数

三、代码

生产者

和单节点的发送和消费代码一致,没有变化

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String ROUTING_KEY = "my_routing";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 正常发送并被broker接收
     * @return
     */
    @RequestMapping("send")
    public String send() {
        for (int i = 0; i < 10; i++) {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setAddress("成都市高新区");
            orderInfo.setOrderId(String.valueOf(i));
            orderInfo.setProductName("华为P60:" + i);

            //设置回调关联的一个id
            String messageId = UUID.randomUUID().toString();
            log.info("开始发送消息,当前消息关联id为:{}", messageId);
            CorrelationData correlationData = new CorrelationData(messageId);

            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                    .andProperties(messageProperties).build();
            //设置ack回调
            rabbitTemplate.setConfirmCallback(this);
            //退回消息的回调
            rabbitTemplate.setReturnCallback(this);
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
        }
        return "ok";
    }

    /**
     * 设置一个非法的路由键,模拟消息被broker退回的情况,前提是
     * spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
     * <p>
     * spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功
     *
     * @return
     */
    @RequestMapping("send-return")
    public String sendAndReturn() {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setAddress("成都市高新区");
        orderInfo.setOrderId("111");
        orderInfo.setProductName("小米13");

        //设置回调关联的一个id
        String messageId = UUID.randomUUID().toString();
        log.info("开始发送消息,当前消息关联id为:{}", messageId);
        CorrelationData correlationData = new CorrelationData(messageId);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                .andProperties(messageProperties).build();
        //设置ack回调
        rabbitTemplate.setConfirmCallback(this);
        //退回消息的回调
        rabbitTemplate.setReturnCallback(this);
        //下面这个RoutingKey是没有绑定的,所以发不出去
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);
        return "ok";
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData == null) {
            return;
        }
        String messageId = correlationData.getId();
        if (ack) {
            log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);
        } else {
            log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",
                new String(message.getBody()), replyCode, replyText, exchange, routingKey);

    }
}

消费者

@Slf4j
@Component
public class RabbitOrderConsumer {
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String QUEUE_NAME = "my_queue";
    private static final String ROUTING_KEY = "my_routing";

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})
    public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建
        log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);
        channel.basicAck(tag, false);
    }
}

访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773
开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561
开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0
开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270
开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c
开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de
开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4
接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1
开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab
开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969
【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773
【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561
接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4
接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3
【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0
【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270
【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c
【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de
【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4
【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab
【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969

上述代码仓库:https://gitee.com/syk1234/mqdmo

四、后台管理

登录管理后台页面:http://192.168.56.202:15672/
【RabbitMQ 实战】09 客户端连接集群生产和消费消息,RabbitMQ实战,rabbitmq,ruby,分布式

共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

查看连接情况,发现是连接的是节点rabbit@stats节点【RabbitMQ 实战】09 客户端连接集群生产和消费消息,RabbitMQ实战,rabbitmq,ruby,分布式
查看队列的情况,队列是在rabbit@stats节点上
【RabbitMQ 实战】09 客户端连接集群生产和消费消息,RabbitMQ实战,rabbitmq,ruby,分布式文章来源地址https://www.toymoban.com/news/detail-728020.html

到了这里,关于【RabbitMQ 实战】09 客户端连接集群生产和消费消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka系列 - 生产者客户端架构以及3个重要参数

    整个生产者客户端由两个县城协调运行,这两个线程分别为主线程和Sender线程(发送线程)。 主线程中由KafkaProducer创建消息,然后通过可能的拦截器,序列化器和分区器之后缓存到 消息累加器(RecordAccumulator) 。Sender线程负责从RecordAccumulator中获取消息并将其发送到kafka中。

    2024年02月04日
    浏览(47)
  • RabbitMQ 教程 | 第3章 客户端开发向导

    👨🏻‍💻 热爱摄影的程序员 👨🏻‍🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻‍🏫 一位高冷无情的编码爱好者 大家好,我是 DevOps 工程师 欢迎分享 / 收藏 / 赞 / 在看! 这篇 RabbitMQ 教程为学习者提供了全面的内容,从 RabbitMQ 的简介开始,涵盖了消息中间件的

    2024年02月15日
    浏览(59)
  • 【RabbitMQ】golang客户端教程1——HelloWorld

    本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。如果你使用不同的主机、端口或凭据,则需要调整连接设置。如果你未安装RabbitMQ,可以浏览我上一篇文章Linux系统服务器安装RabbitMQ RabbitMQ是一个消息代理:它接受并转发消息。你可以把它想象成一个邮局:当你把

    2024年02月14日
    浏览(45)
  • java socket Server TCP服务端向指定客户端发送消息;可查看、断开指定连接的客户端;以及设置客户端最大可连接数量。

    首先需要知道java里如何创建一个Socket服务器端。 提示:注意server.accept()方法调用会阻塞,只有新的客户端连接后才返回一个新的socket对象。如果一直未连接那么会一直处于阻塞状态 了解了如何创建一个socket服务器端后。那么如何实现给指定的连接客户端发送消息呢?首先我

    2024年02月11日
    浏览(72)
  • (五)「消息队列」之 RabbitMQ 主题(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月16日
    浏览(55)
  • (四)「消息队列」之 RabbitMQ 路由(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月17日
    浏览(42)
  • hive客户端连接

    hive客户端 第一代客户端直接使用hive指令连接,连接的就是metastore服务,并且只要连接成功就可以操作hive数据库 第一代客户端的启动和使用 第一代客户端,直接启动metastore服务即可 前台启动(当终端任务结束后,立刻结束服务) 前台启动,可以方便查看日志信息 后台启动

    2024年04月23日
    浏览(39)
  • rabbitmq中客户端30分钟未ack报错解决

    错误日志 : ERROR MESSAGE 这个错误发生在 RabbitMQ 客户端,提示连接已经被关闭,并给出了关闭的原因: close-reason,由对等方(Peer)发起; code=406,表示 PRECONDITION_FAILED; text=‘PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be

    2024年02月10日
    浏览(42)
  • 【RabbitMQ】golang客户端教程5——使用topic交换器

    发送到 topic交换器 的消息不能具有随意的 routing_key ——它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的 routing_key 示例: “stock.usd.nyse” , “nyse.vmw” , “quick.orange.rabbit” 。 routing_key 中可以包含任意多个单词,

    2024年02月14日
    浏览(42)
  • QT实现客户端断开连接

    Widget.cpp Widget.h main.cpp

    2024年04月14日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包