Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

这篇具有很好参考价值的文章主要介绍了Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1,版本说明

erlang 和 rabbitmq 版本说明
https://www.rabbitmq.com/which-erlang.html
确认需要安装的mq版本以及对应的erlang版本。

2,下载安装文件

RabbitMQ下载地址:
https://packagecloud.io/rabbitmq/rabbitmq-server

Erlang下载地址:
https://packagecloud.io/rabbitmq/erlang

RabbitMQ延迟消息插件下载
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载文件如图

Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息,java,MQ,centos,rabbitmq,java,延迟消息

3,安装步骤

3.1, 查询是否有安装过erlang、rabbitmq, 查询到有的话需要删除。

	rpm -qa | grep rabbitmq-server
	rpm -qa | grep erlang
	# 删除
	yum -y remove rabbitmq-server.noarch

3.2, 本地安装erlang

	yum localinstall erlang-23.2.7-2.el7.x86_64.rpm
	# 查询安装的版本
	erl -version
	# Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version xxx

3.3, 本地安装rabbitmq

	yum localinstall rabbitmq-server-3.9.0-1.el7.noarch.rpm
	# 启动rabbitmq
	systemctl start rabbitmq-server

	# 查看rabbitmq状态
	systemctl status rabbitmq-server

	# 设置rabbitmq服务开机自启动
	systemctl enable rabbitmq-server

	# 关闭rabbitmq服务
	systemctl stop rabbitmq-server

	# 重启rabbitmq服务
	systemctl restart rabbitmq-server

3.4, mq 端口开放:

	firewall-cmd --zone=public --add-port=5672/tcp --permanent
	firewall-cmd --zone=public --add-port=15672/tcp --permanent
	firewall-cmd --reload
	firewall-cmd --zone=public --list-ports

3.5, 安装mq管理界面

	
	# 启用管理界面插件
	rabbitmq-plugins enable rabbitmq_management

	curl http://localhost:15672 就可以打开web管理页面

	# rabbitmq有一个默认的账号密码guest,但该情况仅限于本机localhost进行访问,所以需要添加一个远程登录的用户

	# 添加用户
	rabbitmqctl add_user 用户名 密码

	rabbitmqctl add_user admin 123456

	# 设置用户角色,分配操作权限
	rabbitmqctl set_user_tags 用户名 角色

	rabbitmqctl set_user_tags admin administrator

	# 为用户添加资源权限(授予访问虚拟机根节点的所有权限)
	rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

	rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

	# 角色有四种:
	# administrator:可以登录控制台、查看所有信息、并对rabbitmq进行管理
	# monToring:监控者;登录控制台,查看所有信息
	# policymaker:策略制定者;登录控制台指定策略
	# managment:普通管理员;登录控制

	# 修改密码
	rabbitmqctl change_ password 用户名 新密码

	# 删除用户
	rabbitmqctl delete_user 用户名

	# 查看用户清单
	rabbitmqctl list_users

3.6, 延迟消息插件安装:

    # 把插件包先复制到	 /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins
    cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins/
	rabbitmq-plugins enable rabbitmq_delayed_message_exchange
	#重启mq		
	systemctl restart rabbitmq-server
	rabbitmq-plugins list

3.7,登录测试

访问地址: ip:15672 账号密码: admin 123456
Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息,java,MQ,centos,rabbitmq,java,延迟消息

找到交换机 exchange,看看类型是否有延迟消息类型的
Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息,java,MQ,centos,rabbitmq,java,延迟消息

然后就可以写代码去连接发消息了。

4, Java代码

4.1, pom 引入:

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

4.2, 配置类:

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

4.3, 消息定义配置类:


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class OrderRabbitMQConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    //================================订单延时=================================
    @Bean
    CustomExchange order_pay_delay_exchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("order_pay_delay_exchange", "x-delayed-message", true, false, args);
    }
    @Bean
    public Queue order_pay_delay_queue() {
        Queue queue = new Queue("order_pay_delay_queue", true, false, false);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding order_pay_delay_binding() {
        return BindingBuilder.bind(order_pay_delay_queue())
                .to(order_pay_delay_exchange()).with("order_pay_delay_routing").noargs();
    }

    //================================订单支付通知======================================
    @Bean
    public DirectExchange order_pay_notify_exchange() {
        return new DirectExchange("order_pay_notify_exchange", true, false);
    }
    @Bean
    public Queue order_pay_notify_direct_queue() {
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("x-max-priority", 5);
        Queue queue = new Queue("order_pay_notify_queue", true, false, false, argsMap);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }
    @Bean
    public Binding ctc_bidding_auction_pay_notify_binding() {
        return BindingBuilder.bind(order_pay_notify_direct_queue())
                .to(order_pay_notify_exchange()).with("order_pay_notify_routing");
    }
}

4.4, 消息发送类:


import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitMQSendUtils {

    private static RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQSendUtils(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * 订单支付延时通知、发送MQ消息
     */
    public static void sendPayDelayMessage(PayOrderNotifyDto dto, final Integer delayTimes) {
        //给延迟队列发送消息
        String msg = JSONUtil.toJsonStr(dto);
        log.info("订单支付延时通知、发送MQ消息: {}, delayTimes={}", msg, delayTimes);
        rabbitTemplate.convertAndSend("order_pay_delay_exchange", "order_pay_delay_routing", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setDelay(delayTimes);
                return message;
            }
        });
    }

    /**
     * 订单支付通知,发送MQ消息
     */
    public static void sendPayNotifyMsg(PayOrderNotifyDto dto) {
        log.info("订单支付通知,发送MQ消息: {}", dto);
        rabbitTemplate.convertAndSend("order_pay_notify_exchange", "order_pay_notify_routing", JSONUtil.toJsonStr(dto));
    }
}

4.5, 消息监听消费类:文章来源地址https://www.toymoban.com/news/detail-715957.html


import cn.hutool.json.JSONUtil;
import com.xxx.rabbitmq.dto.PayOrderNotifyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * MQ消费监听
 */
@Slf4j
@Component
public class OrderMQListener {
    /**
     * 订单延时通知 消息
     */
    @RabbitListener(queues = {"order_pay_delay_queue"})
    public void payDelayNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("【消费】订单延时通知 MQ 消息内容: {}, Message={}", msg, message);
            //支付订单改成超时未支付》取消
            PayOrderNotifyDto dto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);

        } catch (Exception e) {
            log.error("订单延时通知 消息消费失败:", e);
        }
    }
    /**
     * 订单支付通知 消息
     */
    @RabbitListener(queues = {"order_pay_notify_queue"})
    public void payNotify(Message message) {
        try {
            String msg = new String(message.getBody());
            log.info("订单支付通知 MQ 消息内容:{}, {}", msg, message);
            PayOrderNotifyDto payOrderNotifyDto = JSONUtil.toBean(msg, PayOrderNotifyDto.class);
        } catch (Exception e) {
            log.error("订单支付通知 消息消费失败:", e);
        }
    }

}

到了这里,关于Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMq应用延时消息

    一.建立绑定关系 二.建立生产者 1.消息实体 三.建立消费者 四.测试类测试 五.效果如图所示

    2024年02月12日
    浏览(29)
  • RabbitMQ 延时消息实现

    导入Spring 集成RabbitMQ MAEVN 推送消息至延迟队列 - 消息过期自动推送到死信队列 - 消费死信队列 2.1. MQ配置信息 2.1.1. 自定义队列配置 …/bootstrap.yml 2.1.2. 读取自定义MQ配置信息 2.2. 配置文件自动生成队列 2.2.1. 延迟队列 2.2.2. 死信队列 2.3. 生产者推送消息 2.4. 消费者处理消息 设

    2024年04月26日
    浏览(32)
  • 【RabbitMQ】 RabbitMQ 消息的延迟 —— 深入探索 RabbitMQ 的死信交换机,消息的 TTL 以及延迟队列

    消息队列是现代分布式应用中的关键组件,用于实现异步通信、解耦系统组件以及处理高并发请求。消息队列可以用于各种应用场景,包括任务调度、事件通知、日志处理等。在消息队列的应用中,有时需要实现消息的延迟处理、处理未能成功消费的消息等功能。 本文将介绍

    2024年02月05日
    浏览(54)
  • 使用StreamBridge实现RabbitMq 消息收发 && ack确认 && 延时消息

    下载地址:link 1.下载完成放到rabbitmq安装目录plugins下 2.执行命令启用插件 3.重启mq Exchanges - add a new exchange - type 出现x-delayed-message即安装成功

    2024年02月11日
    浏览(30)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(43)
  • Rabbitmq延迟消息

    延迟消息有两种实现方案: 1,基于死信队列 2,集成延迟插件 使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念: 消息的TTL(存活时间)和死信交换机Exchange,通过这两者的组合来实现延迟队列 1.1 消息的TTL(Time To Live) 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和

    2024年02月13日
    浏览(35)
  • RabbitMQ-消息延迟

            一个队列接收到的消息有过期时间,消息过期之后,如果配置有死信队列,消息就会进去死信队列。         当生产者将消息发送到exchange1,然后交换机将消息路由到队列queue1,但是队列queue1没有消费者,所以当该队列里面的值过期时,就会将消息发送到死信

    2024年01月22日
    浏览(25)
  • RabbitMQ实现延迟消息

    1,基于死信队列 2,集成延迟插件 使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可

    2024年02月12日
    浏览(28)
  • RabbitMQ实现延时消息的两种方法

    1、死信队列 1.1消息什么时候变为死信(dead-letter) 消息被否定接收,消费者使用basic.reject 或者 basic.nack并且requeue 重回队列属性设为false。 消息在队列里得时间超过了该消息设置的过期时间(TTL)。 消息队列到达了它的最大长度,之后再收到的消息。 1.2死信队列的原理 当一个

    2024年02月10日
    浏览(29)
  • RabbitMQ-同步和异步通讯、安装和入门案例、SpringAMQP(5个消息发送接收Demo,jackson消息转换器)

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月11日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包