Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战

这篇具有很好参考价值的文章主要介绍了Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在项目中经常有延迟业务处理的背景,此时可以借助于Rabbitmq的延迟队列进行实现,但Rabbitmq本身并不支持延迟队列,但可以通过安装插件的方式实现延迟队列

环境准备

  1. 首先确认目前项目使用的Rabbitmq的版本,这里博主的版本是3.9.15的。
    docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
  2. 访问 Rabbitmq的github网址,检索 delay 找到插件rabbitmq-delayed-message-exchange ,如下图所示:
    docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
  3. 找到延迟队列插件相应的版本并进行下载。
    docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
    博主的Rabbitmq是3.9版本的,所以这里选择3.9版本即可如下图所示:
    docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
    下载 rabbitmq_delayed_message_exchange-3.9.0.ez
    docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式

安装延迟队列

上面我们已经成功下载了延迟队列插件,接下来我们需要将此插件上传到Linux系统服务器上,然后再将插件拷贝到Rabbitmq的容器中。

  1. 查看Rabbitmq容器进程
    docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
  2. 将Rabbitmq延迟队列插件拷贝到容器中
# c94f1e7dbfc0 代表容器Id,通过上面1的步骤即可查看
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez c94f1e7dbfc0:/opt/rabbitmq/plugins
  1. 进入容器并安装延迟队列插件
docker exec -it c94f1e7dbfc0 /bin/bash

docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
查看 /opt/rabbitmq/plugins,是否已经有了延迟队列插件。如下图所示:
docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
安装插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
4. 安装消息管理插件

 rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
4. 查看已经安装的Rabbitmq插件
docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
5. 控制台查看验证延迟队列类型如下图:
docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式

延迟队列实战

SpringBoot项目中引入Rabbitmq的依赖项

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

新增一个交换机、一个队列然后绑定队列与交换机如下图所示:

@Configuration
public class RabbitmqConfig {
    @Bean(name = "updateAmount")
    public TopicExchange lazyExchange() {
        Map<String, Object> pros = new HashMap<>();
        //设置交换机支持延迟消息推送
        pros.put("x-delayed-type", "direct");
        TopicExchange exchange = new TopicExchange("updateAmount", true, false,pros);
        // 这一行是重点,指定交换机类型
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue lazyQueue(){
        return new Queue("LAZY_QUEUE", true);
    }

    @Bean
    public Binding lazyBinding(){
        return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with("");
    }
}

新增发送延迟队列的工具类代码逻辑:

public class RabbitmqUtils {


    /**
     * 发送延迟消息
     * @param rabbitTemplate rabbitTemplate
     * @param millisecond 延迟毫秒
     * @param messageContent 发送字符串
     * @param busiId 业务主键id
     */
    public static void sendDelayMessage(RabbitTemplate rabbitTemplate, Integer millisecond, Object messageContent, Long busiId) {
        CorrelationData correlationData = new CorrelationData(busiId.toString() + System.currentTimeMillis());
        rabbitTemplate.convertAndSend("updateAmount", "", messageContent,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    message.getMessageProperties().setDelay(millisecond);
                    return message;
                }, correlationData);
    }
}

新增一个监听器代码消费延迟队列的消息:

@Component
@Slf4j
@RequiredArgsConstructor
@RabbitListener(queues = "LAZY_QUEUE")
public class RabbitmqListener {

    private final ObjectMapper objectMapper;


    @RabbitHandler
    public void onCustomBookingMessage(@Payload String message, Channel channel, @Headers Map<String, Object> headers) {
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            Map map = objectMapper.readValue(message, Map.class);
            log.info("接收消息成功,消息内容:"+map);
            ack(channel,deliveryTag,"");
        } catch (JsonProcessingException e) {
            nack(channel,deliveryTag,"");
        }
    }


    /**
     * 确认消息成功
     */
    public void ack(Channel channel, long deliveryTag, String info) {
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            log.error(info + ":消息应答出错", e);
        }
    }

    /**
     * 确认消息失败
     */
    public void nack(Channel channel, long deliveryTag, String info) {
        try {
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException e) {
            log.error(info + ":消息拒绝出错", e);
        }
    }
}

新增一个生产者发送消息的代码:

@RequestMapping("/rabbit")
@RequiredArgsConstructor
@RestController
@Slf4j
public class RabbitmqController {
    private final RabbitTemplate rabbitTemplate;

    private final ObjectMapper objectMapper;


    @GetMapping("/sendMessage")
    public void sendDeLayMessage() throws JsonProcessingException {
        Map<String,Object> map = new HashMap<>();
        map.put("username","zhangsan");
        map.put("tranferMoney",new BigDecimal("1200.23"));
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        // 发送消息
        RabbitmqUtils.sendDelayMessage(rabbitTemplate,7000,objectMapper.writeValueAsString(map),System.currentTimeMillis());
        log.info("发送延迟队列消息成功:消息内容:"+map);
    };
}

在浏览器访问:http://127.0.0.1:8080/rabbit/sendMessage 后可以看到控制台输出如下日志:
docker rabbitmq wget下载延时队列插件,rabbitmq,rabbitmq,docker,分布式
因为我们设定延迟时间为7s,可以清晰看到日志打印的时间刚好相差了7s,证明我们延迟队列发送消息成功。文章来源地址https://www.toymoban.com/news/detail-785990.html

到了这里,关于Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ实现延迟消息,RabbitMQ使用死信队列实现延迟消息,RabbitMQ延时队列插件

    假设有一个业务场景:超过30分钟未付款的订单自动关闭,这个功能应该怎么实现? RabbitMQ使用死信队列,可以实现消息的延迟接收。 队列有一个消息过期属性。就像丰巢超过24小时就收费一样,通过设置这个属性,超过了指定事件的消息将会被丢弃。 这个属性交:x-message

    2024年02月13日
    浏览(59)
  • Rabbitmq 延迟队列---插件

            解决没法优先发送延时时间短的消息。 插件安装 配置类 生产者 消费者

    2024年02月12日
    浏览(51)
  • RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

    1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件 RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html 2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明)

    2024年02月16日
    浏览(35)
  • 项目实战之RabbitMQ死信队列应用

    🧑‍💻作者名称:DaenCode 🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。 😎人生感悟:尝尽人生百味,方知世间冷暖。 死信队列架构图 本地消息表 接受到订单服务大哥锁定库存请求时,进行锁定库存消息的发送。 发送消息的同时,在本地消息表插入

    2024年02月04日
    浏览(36)
  • Rabbitmq入门与应用(五)-延迟队列的设计与实现

    在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。 常规使用rabbitmq设计延迟队列有两种方式 使用创建一个延迟队列阻塞消息 使用延迟队列插件 Dead Letter Exchanges — RabbitMQ 配置 To set the DLX for a queue, s

    2024年02月21日
    浏览(36)
  • [超详细]RabbitMQ安装延迟消息插件

    Community Plugins — RabbitMQ https://www.rabbitmq.com/community-plugins.html 进入以上地址以后,找到Routing里边的rabbitmq_delayed_message_exchange然后点击Releases   下载完成以后  然后解压到plugins文件中  然后再sbin目录下运行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange  查看交换机类型中是否有

    2024年02月07日
    浏览(42)
  • Linux安装rabbitMq RPM安装 以及带延迟插件

    文档中rabbitmq下载链接 以及延迟插件 网盘下载 目前下载文件中版本已经过多个服务器安装测试 完全成功 rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force --nodeps rpm -ivh libnsl-2.34-28.el9_0.x86_64.rpm --force --nodeps rpm -ivh erlang-23.3-2.el7.x86_64.rpm --force --nodeps rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm --fo

    2024年02月03日
    浏览(39)
  • RabbitMQ:概念和安装,简单模式,工作,发布确认,交换机,死信队列,延迟队列,发布确认高级,其它知识,集群

    1.1.1.什么是MQ MQ(message queue:消息队列) ,从字面意思上看,本质是个 队列 , FIFO 先入先出 ,只不过队列中存放的 内容是message 而已 ,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “逻辑解耦+物理解耦” 的消息通信服

    2024年01月20日
    浏览(54)
  • RabbitMQ实现延迟消息的方式-死信队列、延迟队列和惰性队列

    当一条消息因为一些原因无法被成功消费,那么这这条消息就叫做死信,如果包含死信的队列配置了dead-letter-exchange属性指定了一个交换机,队列中的死信都会投递到这个交换机内,这个交换机就叫死信交换机,死信交换机再绑定一个队列,死信最终会进入到这个存放死信的

    2024年02月19日
    浏览(41)
  • 【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

    在电商平台下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。 该如何实现? 定期轮询(数据库等) 用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定

    2024年01月17日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包