Java - RabbitMq的安装&使用

这篇具有很好参考价值的文章主要介绍了Java - RabbitMq的安装&使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、Linux(ubuntu)安装RabbitMQ

(1)首先确认Linux 内核版本,确定是Ubuntu还是CentOS版本。

(2)rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang。

(3)确定relang是否安装成功。

(4)更新一下软件包,确保软件最新版本。

(5)安装最新版本的RabbitMQ

(5.1)安装指定版本 (先准备环境)

(5.2)查询目前可以安装的版本

(5.3)选择版本

(6)查看状态,验证是否安装成功。

(7)启动插件,允许web访问。(很关键)

(8)重启一下RabbitMQ服务

(9)添加用户方便web访问

(10)登录

(11)注意:如果遇见没有死信队列可用,可以在官网下载 (以下为帮助文档)

二、RabbitMQ整合SpringBoot使用

(1)导入依赖

(2)配置文件

(3)声明exchange、queue

(4)发布消息到RabbitMQ

(5)创建消费者监听消息

(6)ACK

(7)添加配置文件

(8)手动ack

三、消息的可靠性

(1)RabbitMQ的事务

(2)事务模型

(3)普通Confirm方式

(4)批量Confirm方式。

(5)异步Confirm方式。

(6)Return机制

(7)SpringBoot整合实现

(8)避免消息重复消费

 (9)SpringBoot实现

 四、使用死信队列实现订单超时取消


一、Linux(ubuntu)安装RabbitMQ

(1)首先确认Linux 内核版本,确定是Ubuntu还是CentOS版本。
uname -a

Java - RabbitMq的安装&使用,java,开发语言

(2)rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang。
apt-get install erlang-nox
(3)确定relang是否安装成功。
erl
(4)更新一下软件包,确保软件最新版本。
sudo apt-get update
(5)安装最新版本的RabbitMQ
sudo apt-get install rabbitmq-server
(5.1)安装指定版本 (先准备环境)
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
(5.2)查询目前可以安装的版本
apt-cache madison rabbitmq-server 
(5.3)选择版本
sudo apt-get install rabbitmq-server=3.7.7-1
(6)查看状态,验证是否安装成功。
systemctl status rabbitmq-server
(7)启动插件,允许web访问。(很关键)
rabbitmq-plugins enable rabbitmq_management
(8)重启一下RabbitMQ服务
service rabbitmq-server restart
(9)添加用户方便web访问
rabbitmqctl add_user admin adimin // 增加普通用户
abbitmqctl set_user_tags admin administrator // 给普通用户分配管理员角色
(10)登录

Java - RabbitMq的安装&使用,java,开发语言

Java - RabbitMq的安装&使用,java,开发语言

(11)注意:如果遇见没有死信队列可用,可以在官网下载 (以下为帮助文档)

https://blog.csdn.net/u010404909/article/details/125329160

         下载好以后上传插件到Linux

// 插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

// 重启mq
systemctl restart rabbitmq-server

二、RabbitMQ整合SpringBoot使用

(1)导入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置文件
spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
(3)声明exchange、queue
@Configuration
public class RabbitMQConfig {
    //1. 创建exchange - topic
    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("boot-topic-exchange",true,false);
    }

    //2. 创建queue
    @Bean
    public Queue getQueue(){
        return new Queue("boot-queue",true,false,false,null);
    }

    //3. 绑定在一起
    @Bean
    public Binding getBinding(TopicExchange topicExchange,Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
}
(4)发布消息到RabbitMQ
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
(5)创建消费者监听消息
@Component
public class Consumer {

    @RabbitListener(queues = "boot-queue")
    public void getMessage(Object message){
        System.out.println("接收到消息:" + message);
    }

}
(6)ACK

        消费者在消费消息时,为了保证消息的可靠性,可以设置ack模式,当没有设置时,为默认自动ACK。当消费成功后,会对mq进行一个响应,mq收到成功消费的响应后,才会移除消息。自动ACK是默认设置,当消费过程中没有异常并顺利执行完毕,会自动响应mq服务器,当消费过程中出现异常,则不会响应,会被mq视为ACK失败。

        手动ACK:当消息成功后,需要手动ACK响应mq服务器,否则在连接未断开前,一直是unacked状态,不会重新分配消费者,直到连接断开才会回到ready状态,并重新分配消费者。

        消息拒绝:当消息被拒绝时,如果设置重新入列,会继续将该消息返回到mq重新分配消费者,如果设置false,则直接丢弃消息。

        ACK也支持事务,与生产者投递消息时事务代码几乎一样。但是必须设置为手动ACK才能支持事务,自动ack情形下事务是无效的。

(7)添加配置文件
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  #手动确定

ACK有三种确认方式:

• 自动确认:acknowledge = none

• 手动确认:acknowledge = manual

• 根据异常情况确认:acknowledge = auto

(8)手动ack
    @RabbitListener(queues = "myqueue1") // 监听mq
    public void consumer1(String msg, Channel channel, Message message) throws IOException{
        System.out.println("consumer1===" + msg);
        // 当设置了手动ack时,没有进行ack,mq会将该消息设置为unacked状态,此时如果连接断开,会将消息重新设置为ready状态,继续分配消费者
        // 消息的拒绝,会将消息直接设置为ready状态,继续分配消费者
        // 参数:1,消息对象的标识, 2、requeue,是否重新回到队列
        // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        // 手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

三、消息的可靠性

(1)RabbitMQ的事务

        事务可以保证消息正确传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100。RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

(2)事务模型

        在投递消息时开启事务,投递完消息无误的情况下提交事务,有误的情况下回滚事务。性能非常低下。一般情况下mq就是用来解耦进行削峰填谷,使用事务不推荐。

        注意:在连接关闭前,事务都没有commit时,等同于rollback。

        try (
                // 获得连接
                Connection connection = MQConnections.getConnection();
                // 创建通道
                Channel channel = connection.createChannel();
                ){
            // 开启事务
            channel.txSelect();
            // 直接发送简单消息到队列
            // 参数:1、交换机 2、队列名称 3、消息携带的properties 4、消息主体
            channel.basicPublish("myex1", "", null, msg.getBytes());
            System.out.println("消息发送成功");
            // 提交事务
            channel.txCommit();
            // 回滚事务
            // channel.txRollback();
        }catch (Exception e){
            e.printStackTrace();

        }
(3)普通Confirm方式

        开启了确认机制后,也会降低性能,大概是原来的10倍。不论生产者是否接收确认消息,消息都已经成功发送了。

//3.1 开启confirm
channel.confirmSelect();
//3.2 发送消息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判断消息发送是否成功
if(channel.waitForConfirms()){
    System.out.println("消息发送成功");
}else{
    System.out.println("发送消息失败");
}
(4)批量Confirm方式。
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 确定批量操作是否成功
channel.waitForConfirmsOrDie();     // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
(5)异步Confirm方式。
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {

    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
    }

    
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
    }
});

消息传递可靠性  

Java - RabbitMq的安装&使用,java,开发语言

(6)Return机制

        Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。而且exchange是不能持久化消息的,queue是可以持久化消息。采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

Java - RabbitMq的安装&使用,java,开发语言

开启Return机制,并在发送消息时,指定mandatory为true

// 开启return机制
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 当消息没有送达到queue时,才会执行。
        System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
    }
});

// 在发送消息时,指定mandatory参数为true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());
(7)SpringBoot整合实现
  •         配置文件
spring:
  rabbitmq:
    publisher-confirm-type: simple
    publisher-returns: true
  •        开启confirm和return
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct  // init-method
    public void initMethod(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("消息已经送达到Exchange");
        }else{
            System.out.println("消息没有送达到Exchange");
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息没有送达到Queue");
    }
}
(8)避免消息重复消费

        重复消费消息,会对非幂等行操作造成问题。原因是消费者没有给RabbitMQ一个ack。

重复消费

Java - RabbitMq的安装&使用,java,开发语言

        为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中。

                id-0(正在执行业务)

                id-1(执行业务成功)

        如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

        极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

        生产者,发送消息时,指定messageId

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(1)     //指定消息书否需要持久化 1 - 需要持久化  2 - 不需要持久化
    .messageId(UUID.randomUUID().toString())
    .build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

       消费者,在消费消息时,根据具体业务逻辑去操作redis

DefaultConsumer consume = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        Jedis jedis = new Jedis("192.168.199.109",6379);
        String messageId = properties.getMessageId();
        //1. setnx到Redis中,默认指定value-0
        String result = jedis.set(messageId, "0", "NX", "EX", 10);
        if(result != null && result.equalsIgnoreCase("OK")) {
            System.out.println("接收到消息:" + new String(body, "UTF-8"));
            //2. 消费成功,set messageId 1
            jedis.set(messageId,"1");
            channel.basicAck(envelope.getDeliveryTag(),false);
        }else {
            //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
            String s = jedis.get(messageId);
            if("1".equalsIgnoreCase(s)){
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        }
    }
};
 (9)SpringBoot实现
  •         依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  •        配置文件
spring:
  redis:
    host: 192.168.199.109
    port: 6379
  •       修改生产者
@Test
void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
    System.in.read();
}
  •      修改消费者
@Autowired
private StringRedisTemplate redisTemplate;


@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
    //0. 获取MessageId
    String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    //1. 设置key到Redis
    if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
        //2. 消费消息
        System.out.println("接收到消息:" + msg);

        //3. 设置key的value为1
        redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
        //4.  手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else {
        //5. 获取Redis中的value即可 如果是1,手动ack
        if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

 四、使用死信队列实现订单超时取消

死信队列:DLX,dead-letter-exchange

利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

 消息变成死信有以下几种情况:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false

  • 消息TTL过期

  • 队列达到最大长度 

 死信处理过程:

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

  • 可以监听这个队列中的消息做相应的处理。

具体实现:

上面的代码中,修改RabbitConfig类,配置延迟队列和死信队列

@Configuration
public class RabbitConfig {
    // 创建队列
    @Bean
    public Queue orderDelayQueue(){
        // 将订单延迟队列绑定死信队列
        Map map = new HashMap();
        map.put("x-dead-letter-exchange", "dead_exchange");
        map.put("x-dead-letter-routing-key", "dead_route_key");
        return new Queue("orderDelayQueue", true, false, false, map);
    }

    // 创建Fanout类型交换机
    @Bean
    public DirectExchange orderDelayExchange(){
        return new DirectExchange("orderDelayExchange");
    }

    // 将队列绑定到交换机
    @Bean
    public Binding simpleBinding(){
        return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with("orderDelayKey");
    }

    // 创建死信队列
    @Bean
    public Queue deadQueue(){
        // 将订单延迟队列绑定死信队列
        return new Queue("deadQueue");
    }

    // 创建Fanout类型交换机
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead_exchange");
    }

    // 将队列绑定到交换机
    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead_route_key");
    }
}
@RestController
public class OrderController {
    @Resource
    private OrderService orderService;
    @Resource
    private MySender mySender;

    @RequestMapping("/add")
    public String save(Orders order){
        order.setCreateTime(new Date());
        order.setUpdateTime(new Date());
        order.setNo(UUID.randomUUID().toString().replace("-", ""));
        order.setStatus(0);
        orderService.save(order);
        // 投递到订单延迟队列中,该队列没有消费者,作用是为了超时后投递到死信队列中
        mySender.sendOrder(order);
        return "success";
    }
}

@Component
public class MySender {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendOrder(Orders orders){
        rabbitTemplate.convertAndSend("orderDelayExchange", "orderDelayKey", orders, message -> {
                // 设置超时时间,单位毫秒
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        );
        System.out.println(orders.getNo() + "订单已经放入队列中...");
    }
}

@Component
public class MyConsumer {
    @Resource
    private OrderDAO orderDAO;

    @RabbitListener(queues = "deadQueue")
    public void consume(Orders orders){
        orders.setStatus(2);
        QueryWrapper queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("no", orders.getNo());
        orderDAO.update(orders, queryWrapper);
        System.out.println(orders.getNo() + "取消成功");
    }
}

@Service
public class OrderService {
    @Resource
    private OrderDAO orderDAO;

    public void save(Orders orders){
        orderDAO.insert(orders);
    }
}

通过controller中添加方法,测试添加一条订单,等待10秒后,看是否会取消订单。文章来源地址https://www.toymoban.com/news/detail-768339.html

到了这里,关于Java - RabbitMq的安装&使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • doker安装RabbitMQ以及用java连接

    目录 doker安装:  RabitMq安装:  java链接 参考链接(非常详细): docker安装以及部署_docker bu shuminio_春风与麋鹿的博客-CSDN博客 安装好后开启doker 1. 拉取do\\\'ke中的rabbitmq镜像 2.启动 3.浏览器访问 输入虚拟机ip和端口号进行访问 链接之前在防火墙中开发5672端口,否则会连接超时

    2024年02月11日
    浏览(21)
  • Java之IDE开发工具eclipse下载安装使用详细教程

    熟练eclipse的使用 Eclipse是一个IDE(集成开发环境),集成了代码编写功能,分析功能,编译功能,调试功能等一体化的开发软件。 免费 Java语言编写 免安装 扩展性强 下载和安装 下载地址:http://eclipse.org 绿色版 解压就可以使用(Eclipse) 版本代号 平台版本 需要的JDK最低版本 Gal

    2024年04月14日
    浏览(65)
  • kafka和rabbitmq区别面试题,十年Java编程开发生涯

    前言 作为同时具备高性能、高可靠和高可扩展性的典型键值数据库,Redis不仅功能强大,而且稳定,理所当然地成为了大型互联网公司的首选。 众多大厂在招聘的时候,不仅会要求面试者能简单地使用Redis,还要能深入地理解底层实现原理,并且具备解决常见问题的能力。可

    2024年04月25日
    浏览(30)
  • 大型医院云HIS系统:采用前后端分离架构,前端由Angular语言、JavaScript开发;后端使用Java语言开发 融合B/S版电子病历系统

    一套医院云his系统源码 采用前后端分离架构,前端由Angular语言、JavaScript开发;后端使用Java语言开发。融合B/S版电子病历系统,支持电子病历四级,HIS与电子病历系统均拥有自主知识产权。 文末卡片获取联系! 基于云计算技术的B/S架构的医院管理系统(简称云HIS),采用前后

    2024年02月03日
    浏览(35)
  • RabbitMQ的使用(JAVA)

    目录 一、MQ的简介 二、MQ的作用 1、异步处理(同时处理多件事情) ​编辑 2、应用解耦(添加了一个中间件) ​编辑 3、流量控制(当秒杀的时候,可以限制流量) 三、Docker安装MQ  四、MQ的执行顺序 五、其他 1、Exchange 类型 ①Exchange的direct(默认) ②Exchange的fanout ③Exchange的

    2023年04月08日
    浏览(18)
  • java中使用rabbitmq

    mq常用于业务解耦、流量削峰和异步通信,rabbitmq是使用范围较广,比较稳定的一款开源产品,接下来我们使用springboot的starter来引入rabbitmq,了解mq的几种使用模式,通过几个简单的案例,让你可以快速地了解到该使用哪种模式来对应业务场景,使用rabbitmq看这一篇就够了,下方附安

    2024年03月28日
    浏览(27)
  • 使用Java进行操作RabbitMQ

    使用Java操作消息队列 现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式: 依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责

    2024年02月10日
    浏览(26)
  • RabbitMQ:使用Java进行操作

    使用Java操作消息队列 现在我们来看看如何通过Java连接到RabbitMQ服务器并使用消息队列进行消息发送(这里一起讲解,包括Java基础版本和SpringBoot版本),首先我们使用最基本的Java客户端连接方式: 依赖导入之后,我们来实现一下生产者和消费者,首先是生产者,生产者负责

    2024年02月16日
    浏览(27)
  • Rabbitmq在java中的使用

    1.1、maven导入相关依赖 1.2、通用类及常用方法讲解 常用方法可以参考: RabbitMQ 常用方法介绍(二) 1.3、编写不同的交换机类型 Direct exchange(直连交换机) 直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下: 将一个队列绑定

    2024年02月06日
    浏览(31)
  • Java RabbitMQ消息队列简单使用

    消息队列,即MQ,Message Queue。 消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    2024年02月12日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包