RabbitMq应用延时消息

这篇具有很好参考价值的文章主要介绍了RabbitMq应用延时消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一.建立绑定关系

package com.lx.mq.bind;

import com.lx.constant.MonitorEventConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:04:03
 */
@Slf4j
@Configuration
public class MonitorRabbitMqBinding {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     * Description: 延迟消息 <br/>
     * Created By: liu wei ping <br/>
     * Creation Time: 2023年6月26日 下午6:59:43 <br/>
     * <br/>
     * @return <br/>
     */
    @Bean("delayExchange")
    public CustomExchange buildDelayedMessageNoticeExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile, "x-delayed-message", Boolean.FALSE, Boolean.FALSE, args);
    }

    @Bean
    public Queue buildDelayedMessageNoticeQueue(){
        return QueueBuilder.durable(MonitorEventConst.MONITOR_DELAYED_MESSAGE_QUEUE + profile).build();
    }

    @Bean
    public Binding buildDelayedMessageNoticeBinding(){
        return BindingBuilder.bind(buildDelayedMessageNoticeQueue()).to(buildDelayedMessageNoticeExchange()).with(MonitorEventConst.MONITOR_DELAYED_MESSAGE_ROUTING_KEY).noargs();
    }

    /**
     * 交车完成事件消息定时处理队列
     */
    @Bean
    public Queue deliveryCompleteEventHandQueue() {
        return QueueBuilder.durable(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + profile).build();
    }

    /**
     * 交车完成事件消息定时处理队列绑定
     */
    @Bean
    public Binding deliveryCompleteBinding() {
        return BindingBuilder.bind(deliveryCompleteEventHandQueue())
                .to(buildDelayedMessageNoticeExchange())
                .with(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY)
                .noargs();
    }

}

二.建立生产者

1.消息实体

package com.lx.dto.monitor;

import lombok.Data;

import java.util.Date;

/**
 * @author liuweiping.com
 * @version 1.0
 * @date 2023-06-26 10:11:06
 */
@Data
public class MonitorEventMessage {

    /**
     * 事件id
     */
    private String eventId;

    /**
     * 事件编码
     */
    private String eventCode;

    /**
     * 业务数据
     */
    private String businessUniqueKey;

    /**
     * 业务类型
     */
    private String businessType;

    /**
     * 到期时间
     */
    private Long expireMillis;

    /**
     *  时间处理唯一版本号
     */
    private Integer eventHandVersion;

    /**
     * 定时处理时间
     */
    private Date timedOperationTime;

    public void setTimedOperationTime(Date timedOperationTime) {
        this.timedOperationTime = timedOperationTime;
        expireMillis = timedOperationTime.getTime() - new Date().getTime();
        if (expireMillis < 0) {
            expireMillis = 0L;
        }
    }
}
package com.lx.mq.producer;

import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 监控事件消息发送类
 */
@Slf4j
@Component
public class MonitorEventMessageProducer {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     *  交车完成监控事件定时发送
     */
    public void sendDeliveryCompleteEventHandMessage(MonitorEventMessage monitorEventMessage) {
        String message = JsonUtil.toJson(monitorEventMessage);;
        rabbitTemplate.convertAndSend(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile,
                MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY,
                message,
                msg -> {
                    msg.getMessageProperties().setDelay(monitorEventMessage.getExpireMillis().intValue());
                    return msg;
                });
        log.info("sending event processing messages: {}", message);//发送事件处理消息
    }

  
}

三.建立消费者

package com.lx.mq.consumer;

import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * 监控事件消息发送类
 */
@Slf4j
@Component
public class MonitorEventMessageConsumer {

    @Value(value = "-${spring.profiles.active}")
    private String profile;

    /**
     *  交车完成事件处理mq监听
     */
    @RabbitListener(queues = MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + "-${spring.profiles.active}")
    public void dealWithDeliveryCompleteEventHandMessage(String eventMessage, Channel channel, Message message) {
        log.info("dealWithDeliveryCompleteEventHandMessage:【{}】", JsonUtil.toJson(eventMessage));
        String str = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received the message of regular loading and unloading of goods: {}", str); //收到商品定时上下架消息
        MonitorEventMessage monitorEventMessage = JsonUtil.toBean(eventMessage, MonitorEventMessage.class);
        try {
            analyzeHand(monitorEventMessage);
        }catch (Exception e){
            log.error("交车完成事件分析失败,参数:{},e:{}",JsonUtil.toJson(monitorEventMessage),JsonUtil.toJson(e));
        }
    }

    /**
     *  事件分析
     * @param monitorEventMessage
     */
    private void  analyzeHand(MonitorEventMessage monitorEventMessage) throws Exception {

    }
}

四.测试类测试

package com.lx.controller;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.lx.conf.MQConfig;
import com.lx.conmon.ResultData;
import com.lx.dto.monitor.MonitorEventMessage;
import com.lx.mq.producer.MonitorEventMessageProducer;
import com.lx.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.lx.constant.RedisMange;
import com.lx.utils.RedisUtil;
import org.thymeleaf.util.DateUtils;


/**
 * @Author : liu wei ping
 * @CreateTime : 2019/9/3
 * @Description :
 **/

@RestController
public class SendMessageController {


    @Autowired
    private MonitorEventMessageProducer messageProducer;

 
    @GetMapping("/sendTopicMessage3")
    public ResultData<String> sendTopicMessage3() {
        MonitorEventMessage monitorEventMessage = new MonitorEventMessage();
        monitorEventMessage.setEventCode("delivery");
        //设置定时处理时间= 当前时间+ 定时处理时长
        monitorEventMessage.setTimedOperationTime(DateUtil.date(DateUtil.getCurrentMillis() + 30 * 1000));
        monitorEventMessage.setBusinessType("deliveryType");
        messageProducer.sendDeliveryCompleteEventHandMessage(monitorEventMessage);
        return new ResultData<>("ok");
    }
}

五.效果如图所示

RabbitMq应用延时消息,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-531982.html

到了这里,关于RabbitMq应用延时消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(48)
  • 分布式消息队列RabbitMQ-Linux下服务搭建,面试完腾讯我才发现这些知识点竟然没掌握全

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 5.修改配置文件 这里面修改{loopback_users, [“guest”]}改为{loopback_users, []} {application, rabbit, %% - - erlang - - [{description, “RabbitMQ”}, {id, “RabbitMQ”}, {vsn, “3.6.5”}, {modules, [‘background_gc’,‘delegate’,‘delegate_sup’,‘dtree’,‘file_han

    2024年04月14日
    浏览(55)
  • RabbitMq应用延时消息

    一.建立绑定关系 二.建立生产者 1.消息实体 三.建立消费者 四.测试类测试 五.效果如图所示

    2024年02月12日
    浏览(37)
  • 微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(五):分布式搜索 ES-下

    聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类: 桶(Bucket)聚合:用来对文档做分组 TermAggregation:按照文档字段值分组 Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组 度量(Metric)聚合:用以计算一些值,比如:最大值

    2024年03月26日
    浏览(65)
  • RabbitMQ——解决分布式事务问题,RabbitMQ的重要作用之一!!!通过可靠生产和可靠消费来完美解决!

    分布式事务是指涉及多个独立的计算机系统(也称为节点或参与者)之间的事务处理。在分布式系统中,每个节点可能各自拥有自己的数据存储和事务管理机制。分布式事务的目标是保证在跨多个节点执行的一系列操作可以以一致和可靠的方式执行和提交,即使在面对故障或

    2024年04月23日
    浏览(48)
  • 微服务学习:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    目录 一、高级篇 二、面试篇 ==============实用篇============== day05-Elasticsearch01 1.初识elasticsearch 1.4.安装es、kibana 1.4.1.部署单点es 1.4.2.部署kibana 1.4.3.安装IK分词器 1.4.4.总结 2.索引库操作 2.1.mapping映射属性 2.2.索引库的CRUD 2.2.1.创建索引库和映射 2.2.2.查询索引库 2.2.3.修改索引库 2.

    2024年02月02日
    浏览(59)
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(45)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(56)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(46)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包