springboot+RabbitMQ实现一条消息被所有consumer消费

这篇具有很好参考价值的文章主要介绍了springboot+RabbitMQ实现一条消息被所有consumer消费。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求背景:

        用户认证中心(Authorization center简称ac)使用jwt实现用户请求身份认证,需要支持多副本部署。系统架构如下:

rabbitmq 多个消费者消费同一条消息,spring boot,rabbitmq,系统架构

        用户登录后生成jwt,纵向需要通过socket长连接把jwt下发到应用集成层ws,ws再把jwt下发到应用。前端请求各应用时可以在应用的filter中校验jwt是否有效,无效则向上询问ws jwt是否有效,无效再请求ac jwt是否有效。

        所以,用户登录请求通过负载均衡落到ac副本1(简称ac1)后,ac1生成jwt,除了纵向下发之外,还需要横向同步到ac2 ac3,ac2和ac3再纵向同步jwt,实现全平台的单点登录。

具体需求:

        ac1发送消息到rabbit mq,其他的所有副本ac2和ac3消费消息。

实现方案:

        1.添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        2.添加rabbitMQ配置

spring:
  application:
    name: xxx
  rabbitmq:
    addresses: x.x.x.x:5672
    username: admin
    password: admin
    virtual-host: /
    #    启用消息发布ack
    publisher-confirm-type: correlated
    #    启用发布返回
    publisher-returns: true
    template:
      #启用强制信息;默认false
      mandatory: true
      retry:
        # 发送重试是否可用
        enabled: true
        #最大重试次数
        max-attempts: 3
    #消费端配置
    listener:
      simple:
        missing-queues-fatal: false
        #最大/最小的消费者数量
        concurrency: 1
        max-concurrency: 8
        #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
        acknowledge-mode: manual
        #监听器抛出异常而拒绝的消息是否被重新放回队列,默认值为true。
        default-requeue-rejected: true
        #每次从mq取消息的条数
        prefetch: 1
        retry:
          enabled: true
          #最大重试次数
          max-attempts: 5
          #最大重试时间间隔
          max-interval: 32000
          #第一次和第二次尝试发布或传递消息之间的间隔
          initial-interval: 2000
          #应用于上一重试间隔的乘数
          multiplier: 2

         3.登录service中发送mq(jwt),即mq provider

        
        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void login(Token token){
            //如果是自己发的mq则不需要消费,直接return
            if(cache.contains(token.getJwt())return;
               
            //下发token到ws层
                        server.getNamespace("/ws1").getBroadcastOperations().sendEvent("LOGIN_EVENT",token,
broadcastAckCallback);

            //获取redis中注册的ac节点信息,如果节点数大于1则需要发送mq同步token
            int acNodeCount= redisTemplate.boundValueOps(nodeKey).get();
            if(acNodeCount>1){
            rabbitTemplate.convertAndSend(""ac-topic-exchange"",
                  AcConstant.LOGIN_ROUTING_KEY,
                  token);
        }

4.MQ consumer

通过@Queue注解中的value属性,用spring spel表达式,在每次启动ac的时候生成一个带有随机字符串的名字,绑定到ac的topic exchange。这样,启动3个ac副本,就有3个queue绑定到了ac的exchange,mq message发送到topic exchange, 通过routing key分发到所有符合规则的queue,就能实现所有副本消费同一条消息。

@RabbitListener(
          bindings = {
                  @QueueBinding(
                          value = @Queue(value = "ac-login-queue- #{T(System).currentTimeMillis()}"),
                          exchange = @Exchange(value = "ac-topic-exchange",
                                  type = ExchangeTypes.TOPIC),
                          key = "ac.login.#")
          },
          ackMode = "MANUAL")
  @RabbitHandler
  public void loginConsumer(Token token, Channel channel, Message message) throws IOException {
    log.info("【loginConsumer】 监听到其他ac节点的登录事件,jwt:{}",token.getJwt());
    long messageId = message.getMessageProperties().getDeliveryTag();
    try{
      listenLogin();
      //手动ack
      channel.basicAck(messageId,false);
    }catch (Exception e){
      //失败ack,消息重新入队
      log.error("loginConsumer消费失败:{}",e.getCause().getMessage());
      channel.basicNack(messageId,false,true);
    }
  }

rabbitmq 多个消费者消费同一条消息,spring boot,rabbitmq,系统架构

登出时的实现逻辑相同。 文章来源地址https://www.toymoban.com/news/detail-767380.html

到了这里,关于springboot+RabbitMQ实现一条消息被所有consumer消费的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot: RabbitMQ消息队列之同时消费多条消息

    1. basicQos预取方法参数解析 basicQos(int prefetchCount) basicQos(int prefetchCount, boolean global) basicQos(int prefetchSize, int prefetchCount, boolean global) 参数: prefetchSize:可接收消息的大小 prefetchCount:处理消息最大的数量。 global:是不是针对整个Connection的,因为一个Connection可以有多个Channel,如

    2024年02月01日
    浏览(74)
  • rabbitmq之Consumer Prefetch(消费者预取)

    官方文档: https://www.rabbitmq.com/consumer-prefetch.html https://www.rabbitmq.com/confirms.html#channel-qos-prefetch 测试”消息积压“场景:在消费者没有启动的情况下,生产者先生产很多消息。然后先开启一个a消费者,再开启b消费者,发现只有a消费者不断的消费旧的消息,而b消费者”无动于

    2024年02月11日
    浏览(35)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(72)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(43)
  • 使用kafka-consumer-group.sh查看消息消费情况,CONSUMER-ID,HOST,CLIENT-ID不显示问题分析

    在使用使用kafka-consumer-group.sh查看消息消费情况,消息都已经消费完了,但是CONSUMER-ID,HOST,CLIENT-ID字段的信息不显示,而且,消费实例也在运行中,却出现了 Consumer group \\\'manage.group1\\\' has no active members., 如下图所示: 消费者的代码如下: 之所以出现上面的的问题,是因为使

    2024年02月16日
    浏览(49)
  • (四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

    Lison dreamlison@163.com , v1.0.0 , 2023.06.23 之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。 1、 生产者批量发送消息 2、消费端配置限流机制 3、消费者监听队列 在RabbitMQ中,多个消费者监听同一条队列,则队列

    2024年02月15日
    浏览(43)
  • RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

    我们先看一串代码,并思考一下为什么要先入库然后发MQ: 如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。 ① 消息从生产者发送到Broker 生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接

    2024年02月11日
    浏览(55)
  • 搭建RabbitMQ消息服务,整合SpringBoot实现收发消息

    作者主页 :Designer 小郑 作者简介 :3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN博客专家,蓝桥云课认证讲师。 消息队列是一种在应用程序之间传递数据的通信机制 ,它基于 发布-订阅 模式,将消息发送者(发布者)和消息接收者

    2024年02月09日
    浏览(57)
  • SpringBoot RabbitMQ 实现消息队列功能

    作者:禅与计算机程序设计艺术 在企业级应用中,为了提升系统性能、降低响应延迟、改善用户体验、增加系统的稳定性、提高资源利用率等方面所需的功能之一就是使用消息队列。RabbitMQ是一个开源的AMQP(Advanced Message Queuing Protocol)的实现消息队列,它是用Erlang语言开发的。

    2024年02月09日
    浏览(48)
  • 基于springboot实现的rabbitmq消息确认

    RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费

    2024年02月06日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包