如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

这篇具有很好参考价值的文章主要介绍了如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议

rabbitmq实现思路:

选型:发布订阅模式(Publish/Subscribe)

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。

这种情况下,我们有四种交换机可供选择,分别是:

  • Direct
  • Fanout
  • Topic
  • Header

由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用

代码实现:
1.pom文件引入rabbitmq依赖

 <!-- rabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置文件

server:
  port: 9091
spring:
  application:
    name: rabbitmq
  # rabbitmq配置
  rabbitmq:
    host: 192.168.8.142
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost

3.constant类

package com.anychat.rabbitmqtest.constant;

/**
 * @author Liby
 * @date 2022-05-05 10:02
 * @description:
 * @version:
 */

public class RabbitmqConstant {
    public  static final String  MEETING_FANOUT_EXCHANGE = "meeting_exchange";
}

4.用户实体类

package com.anychat.rabbitmqtest.entity;

/**
 * @author Liby
 * @date 2022-05-06 09:39
 * @description:
 * @version:
 */

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public User(Integer userId, String username) {
        this.userId = userId;
        this.username = username;
    }
}

5.工具类

package com.anychat.rabbitmqtest.util;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @author Liby
 * @date 2022-04-28 10:27
 * @description:
 * @version:
 */

public class RabbitmqUtil {
    @Autowired
    private static RabbitTemplate rabbitTemplate;

    public static Channel getChannel() {
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
        return channel;

    }


}

6.消费者类

package com.anychat.rabbitmqtest.consumer;

import cn.hutool.core.util.StrUtil;
import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.entity.User;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Liby
 * @date 2022-04-25 11:18
 * @description:消费者,动态创建临时队列
 * @version:
 */
@Slf4j
@Component
public class FanoutConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void createQueue(User user) {

        //创建信道
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);

        try {
            //声明一个交换机与生产者相同

            channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
            //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除
            String queueName = channel.queueDeclare().getQueue();
            //用户Id与队列名绑定
            ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>();
            userQueueMap.putIfAbsent(queueName, user.getUserId());
            //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串
            // channel.queueBind(queue, exchange, routingKey)
            channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, "");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //对信息进行操作
                    String message = new String(body, "UTF-8");
                    if (StrUtil.isNotBlank(message)) {
                        String[] receiveIds = message.split(",");
                        Integer userId = userQueueMap.get(queueName);
                        for (String id : receiveIds) {
                            if (userId.equals(Integer.valueOf(id))) {
                                log.info("用户{}收到入会邀请", id);
                            }

                        }

                    }

                }
            };
            //true 自动回复ack
            channel.basicConsume(queueName, true, consumer);
        } catch (Exception ex) {
        }
    }
}


7.controller类

package com.anychat.rabbitmqtest.controller;

import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.consumer.FanoutConsumer;
import com.anychat.rabbitmqtest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Liby
 * @date 2022-04-24 16:34
 * @description:生产者
 * @version:
 */
@RestController
@Slf4j
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private FanoutConsumer fanoutConsumer;
    /**
    * 模拟用户登录后,创建一个临时队列,与该用户绑定
    */
    @PostMapping("/login")
    public String login(){
        //模拟三个用户登录
        int userNum=3;

        for (int i = 0; i < userNum; i++) {
            //用户绑定临时队列,并监听队列
            fanoutConsumer.createQueue(new User(i, "用户" + i));
            log.info("用户{}登录成功",i);
        }
        return "用户登录成功";

    }

    @PostMapping("/meeting")
    public String meeting(){
        String message="1,2";
        log.info("邀请用户{}进入会议",message);
        //发送消息,要求userId为2和3的用户进入会议
        rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message);
        return "发送成功";

    }
}

postman分别调用login和meeting两个接口
可以看到日志打印如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息文章来源地址https://www.toymoban.com/news/detail-424650.html

到了这里,关于如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(40)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(54)
  • RabbitMQ之生产者可靠性

    有的时候由于网络波动,可能会出现客户端连接RabbitMQ失败的情况。通过配置我们可以开启 连接失败后 的重连机制 注:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前

    2024年01月22日
    浏览(47)
  • RabbitMQ生产者的可靠性

    目录 MQ使用时会出现的问题 生产者的可靠性 1、生产者重连 2、生产者确认 3、数据持久化 交换机持久化 队列持久化 消息持久化 LazyQueue懒加载 MQ使用时会出现的问题 发送消息时丢失: 生产者发送消息时连接MQ失败 生产者发送消息到达MQ后未找到 Exchange 生产者发送消息到达

    2024年02月08日
    浏览(41)
  • rabbitmq消费者与生产者

    在第一次学习rabbitmq的时候,遇到了许多不懂得 第一步导包 第二步新增生产者 在这里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填写自己的队列名称,如果你的为”/“则填写\\\'\\\'/\\\'\\\' 第三步新增消费者 消息获取成功 注意如果你用的云服务器需要打开这两个端口 5672 15672 如果你使

    2024年02月11日
    浏览(46)
  • RabbitMQ 生产者-消息丢失 之 场景分析

      生产者发送消息的流程如下:首先生产者和RabbitMQ服务器建立连接,然后创建信道,通过信道发送消息给RabbitMQ服务器,RabbitMQ服务器接收到消息后交由交换机进行消息存储,交换机根据不同策略将消息路由到指定队列中。在此过程中,可能会存在以下消息丢失的场景:

    2024年02月14日
    浏览(43)
  • Spring整合RabbitMQ——生产者(利用配置类)

    配置RabbitMQ的基本信息,用来创建连接工厂的 编写启动类 编写配置类

    2024年02月07日
    浏览(42)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(89)
  • RabbitMQ消息可靠性(一)-- 生产者消息确认

    目录 前言 一、消息确认流程图 二、生产者消息确认 1、publisher-confirm(发送者确认) 2、publisher-return(发送者回执) 三、代码实现 1、修改application.yml 配置 2、ConfirmCallback函数和ReturnCallback函数 在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一

    2024年02月04日
    浏览(72)
  • 【RabbitMQ实战】 03 SpringBoot RabbitMQ生产者和消费者示例

    上一节我们写了一段原生API来进行生产和消费的例子。实际上SpringBoot对原生RabbitMQ客户端做了二次封装,让我们使用API的代价更低。 依赖引入 RabbitMQ的配置如下 每个配置的具体含义,详见配置 代码说明 使用RabbitTemplate可以发送消息 这个Controller定义了一个发送的接口,调用

    2024年02月07日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包