使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

这篇具有很好参考价值的文章主要介绍了使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用StreamBridge实现RabbitMq && 延时消息

Maven依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

延时消息需要安装插件

下载地址:link
1.下载完成放到rabbitmq安装目录plugins下
2.执行命令启用插件
3.重启mq

rabbitmq-plugins enable rabbitmq_delayed_message_exchange  // 启用插件
//重启mq
rabbitmq-server stop
rabbitmq-server start

Exchanges -> add a new exchange -> type 出现x-delayed-message即安装成功

使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息文章来源地址https://www.toymoban.com/news/detail-512066.html

yml配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: xxxx
    password: xxxx

  function:
      # 与消费者对应(消费者方法名称)
      definition: ackMessage;normal;delay
    stream:
      rabbit:
        bindings:
          ackMessage-in-0:
            consumer:
              acknowledge-mode: manual  # manual手动确认 ,auto 自动确认
          delay-in-0:
            consumer:
              delayedExchange: true # 开启延时
          delay-out-0:
            producer:
              delayedExchange: true  # 开启延时

      bindings:
        delay-in-0:
          destination: delay.exchange.cloud  # mq对应交换机
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual手动确认 ,auto 自动确认
          group: delay-group	# 消息组
          binder: rabbit
        delay-out-0:
          destination: delay.exchange.cloud
          content-type: application/json
          group: delay-group
          binder: rabbit

        ackMessage-in-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: manual # manual手动确认 ,auto 自动确认
          group: ackMessage-group
          binder: rabbit
        ackMessage-out-0:
          destination: ackMessage.exchange.cloud
          content-type: application/json
          group: ackMessage-group
          binder: rabbit
        normal-in-0:
          destination: normal.exchange.cloud
          content-type: application/json
          consumer:
            acknowledge-mode: auto # manual手动确认 ,auto 自动确认
          group: normal-group
          binder: rabbit
        normal-out-0:
          destination: normal.exchange.cloud
          content-type: application/json
          group: normal-group
          binder: rabbit

接口controller


import com.alibaba.fastjson2.JSON;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

/**
 * @Description: RabbitmqController
 */
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/mq")
public class MqController {

	//消息发送者
    private final RabbitMqProducer rabbitMqProducer;

    /**
     * 发送普通消息Rabbitmq
     * bindingName 绑定队列名称
     * @param msg 消息内容
     */
    @GetMapping("/sendMessage/{msg}/{bindingName}")
    public R<Void> sendMessage(@PathVariable("msg") String msg, @PathVariable("bindingName") String bindingName) {
        log.info(bindingName + "发送消息: " + msg);
        rabbitMqProducer.sendMsg(msg, bindingName);
        return R.ok();
    }

    /**
     * 发送延迟消息
     *
     * @param message  消息实体
     * @return
     */
    @PostMapping("/sendDelayedMessage")
    public R<Void> sendDelayedMessage(@RequestBody Message message) {
        log.info(MqTExchangesEnum.delay + "发送延时消息: " + LocalDateTime.now() + "  " + message);
        rabbitMqProducer.sendDelayMsg(JSON.toJSONString(message), message.getBindingName(), message.getSeconds());// 延迟时间(秒)
        return R.ok();
    }
}

发送者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * RabbitMq消息生产者
 */
@Component
public class RabbitMqProducer {

    @Autowired
    private StreamBridge streamBridge;

    /**
     * @Description RabbitMq消息生产者
     * @Param msg 消息内容
     * @Param bindingName  exchange绑定queue名称
     **/
    public void sendMsg(String msg, String bindingName) {
        // 构建消息对象
        Messaging messaging = new Messaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);
        Message<Messaging> message = MessageBuilder.withPayload(messaging).build();
        streamBridge.send(bindingName, message);
    }

    /**
     * 发送延迟消息
     *
     * @param msg
     * @param bindingName
     * @param seconds
     */
    public void sendDelayMsg(String msg, String bindingName, Integer seconds) {
        // 构建消息对象
        Messaging messaging = new Messaging().setMsgId(UUID.randomUUID().toString()).setMsgText(msg);
        Message<Messaging> message = MessageBuilder.withPayload(messaging).setHeader("x-delay", seconds * 1000).build();
        streamBridge.send(bindingName, message);
    }
}

消费者

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;

import java.time.LocalDateTime;
import java.util.function.Consumer;


/**
 * RabbitMq消息消费者
 */
@Component
@Slf4j
public class RabbitMqConsumer {

    /**
     * mq接收ackMessage消息/手动ack确认
     * @methodName 配置文件对应
     **/
    @Bean
    Consumer<Message<Messaging>> ackMessage() {
        log.info("ackMessage-初始化订阅");
        return obj -> {
            Channel channel = obj.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = obj.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
            try {
                log.info("ackMessage-消息接收成功:" + obj.getPayload());
                //业务逻辑处理
                //ack确认
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                //重新回队列-true则重新入队列,否则丢弃或者进入死信队列。
//                    channel.basicReject(deliveryTag, true);
                log.error(e.getMessage());
            }

        };
    }

    /**
     * mq接收normal消息
     **/
    @Bean
    Consumer<Messaging> normal() {
        log.info("normal-初始化订阅");
        return obj -> {
            log.info("normal-消息接收成功:" + obj);
            //业务逻辑处理
        };
    }


    /**
     * mq接收延时消息
     * Messaging 发送实体消息接收实体消息
     **/
    @Bean
    Consumer<Message<Messaging>> delay() {
        log.info("delay-初始化订阅");
        return obj -> {
            Messaging payload = obj.getPayload();
            log.info("delay-消息接收成功:" + LocalDateTime.now() + "  " + payload);
            //业务逻辑处理
        };
    }
}

到了这里,关于使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot集成RabbitMQ之ACK确认机制(第三节)

    目录 开始语 📝简述 🗒️模式NONE application配置 生产者 消费者 结果验证 🗒️模式AUTO application配置 生产者 消费者 结果验证 🗒️模式ACK(重点) application配置 生产者 消费者 结果验证 🗒️生产者确认机制 yml添加配置 修改生产者代码 结果验证 结束语 一位普通的程序员,

    2024年02月04日
    浏览(38)
  • 搭建RabbitMQ消息服务,整合SpringBoot实现收发消息

    作者主页 :Designer 小郑 作者简介 :3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN博客专家,蓝桥云课认证讲师。 消息队列是一种在应用程序之间传递数据的通信机制 ,它基于 发布-订阅 模式,将消息发送者(发布者)和消息接收者

    2024年02月09日
    浏览(57)
  • 基于springboot实现的rabbitmq消息确认

    RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费

    2024年02月06日
    浏览(42)
  • SpringCloudStream集成RabbitMQ实现消息收发

    ​ SpringCloudStream 是一个构建高扩展和事件驱动的微服务系统的框架,用于连接共有消息系统,官网地址: spring.io/projects/sp… 。整体上是把各种花里胡哨的MQ产品抽象成了一套非常简单的统一的编程框架,以实现事件驱动的编程模型。社区官方实现了RabbitMQ,Apache Kafka,Kaf

    2024年02月03日
    浏览(39)
  • SpringBoot-RabbitMQ06-持久化和ACK确认机制

    1.什么是消息确认ACK? 如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那么可能这条正在处理的消息刘没有完成消息消费,数据就会丢失,为了确保数据不会丢失RabbitMQ支持消息确认-ACK 2.ACK的消息确认机制 ACK机制是消费者从RabbitMQ收到消息并处理完成后,反

    2024年04月15日
    浏览(36)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(39)
  • 【MQ 系列】SpringBoot + RabbitMq 消息确认/事务机制的使用姿势

    我们知道 RabbitMq 提供了两种机制,来确保发送端的消息被 brocke 正确接收,本文将主要介绍,在消息确认和事物两种机制的场景下,发送消息的使用姿势 首先创建一个 SpringBoot 项目,用于后续的演示 springboot 版本为 2.2.1.RELEASE rabbitmq 版本为  3.7.5   依赖配置文件 pom.xml 在 a

    2024年01月18日
    浏览(46)
  • (七)「消息队列」之 RabbitMQ 发布者确认(使用 .NET 客户端)

    发布者确认 是一个 RabbitMQ 扩展,用于实现可靠的发布。当在通道上启用发布者确认时,客户端发布的消息将由代理 异步确认 ,这意味着它们已在服务器端得到处理。 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主

    2024年02月16日
    浏览(40)
  • 如何在Window系统中安装RabbitMQ以及在.NET平台上实现收发消息功能

    以下是接收客户端代码: #region RabbitMQ接收客户端 private ConnectionFactory factory; private IConnection connection; private IModel channel; private EventingBasicConsumer consumer; /// /// 开始创建连接对象 /// public void StartReceiving(string ListenIp,string queueName, string QueueUserName,string QueueUserPassword, int Port = 5672) {

    2024年02月11日
    浏览(42)
  • RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

    SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致; 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;

    2024年02月14日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包