RabbitMQ-基础学习

这篇具有很好参考价值的文章主要介绍了RabbitMQ-基础学习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在虚拟机上安装Erlang的GCC环境,装erlong,然后安装rabbitmq

参考:安装说明链接

安装web端面板

RabbitMQ-基础学习,rabbitmq,学习,ruby

创建交换机

RabbitMQ-基础学习,rabbitmq,学习,ruby

先学习一下工作模式(详细介绍可见官网)

RabbitMQ-基础学习,rabbitmq,学习,ruby

上代码

1.Hello Word模式

RabbitMQ-基础学习,rabbitmq,学习,ruby

写在测试类中:
Providucer

@Test
	void contextLoads()throws Exception {
		//1.创建链接
		ConnectionFactory factory = new ConnectionFactory();
		//2。设置参数
		factory.setHost("192.168.63.130");
		factory.setPort(5672);
		factory.setVirtualHost("/peng");
		factory.setUsername("peng");
		factory.setPassword("peng");
		//3.创建链接Connection
		Connection connection = factory.newConnection();

		//4.创建Channel
		Channel channel = connection.createChannel();
		//5.创建队列Queue
		//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
		/**
		 * 1.queue 队列名
		 * 2.durable 是否持久化
		 * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
		 * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
		 * 5.arguments:
		 * */

		channel.queueDeclare("peng",true,false,false,null);

		//6.发送消息
		//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		/**
		 * 1.exchange:交换机名称
		 * 2.routingKey:路由名称
		 * 3.props:配置信息
		 * 4.body:发送的消息数据
		 */
		String body="第一个消息";
		channel.basicPublish("","peng",null,body.getBytes());
		//7.释放资源
		channel.close();
		connection.close();
	}

Consumer

@Test
	void contextLoads()throws Exception {
		//1.创建链接
		ConnectionFactory factory = new ConnectionFactory();
		//2。设置参数
		factory.setHost("192.168.63.130");
		factory.setPort(5672);
		factory.setVirtualHost("/peng");
		factory.setUsername("peng");
		factory.setPassword("peng");
		//3.创建链接Connection
		Connection connection = factory.newConnection();

		//4.创建Channel
		Channel channel = connection.createChannel();
		//5.创建队列Queue
		//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
		/**
		 * 1.queue 队列名
		 * 2.durable 是否持久化
		 * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
		 * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
		 * 5.arguments:
		 * */

		channel.queueDeclare("peng",true,false,false,null);
		//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
		/**
		 * 1.queue:队列名
		 * 2.deliverCallback:是否自动确认收到
		 * 3.cancelCallback:回调对象
		 */
		Consumer consumer= new DefaultConsumer(channel){
			/**
			 * 1.consumerTag:
			 * 2.envelope:
			 * 3.properties:
			 * 4.body:
			 * @param consumerTag
			 * @param envelope
			 * @param properties
			 * @param body
			 * @throws IOException
			 */
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.out.println("consumerTag"+consumerTag);
				System.out.println("envelope"+envelope.getExchange());
				System.out.println("properties"+envelope.getRoutingKey());
				System.out.println("properties"+properties);
				System.out.println("body"+new String(body));
			}
		};
		channel.basicConsume("peng",true,consumer);
	}

2.Work Queues模式

RabbitMQ-基础学习,rabbitmq,学习,ruby
生产者生产,两个消费者循环消费
P:

package com.providucer.factory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @ClassName: Providucerfactory
 * @author: 鹏
 * @date: 2023/7/4 14:42
 */

public class ProvideFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */
        channel.queueDeclare("pengwork",true,false,false,null);
        //6.发送消息
        //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        /**
         * 1.exchange:交换机名称
         * 2.routingKey:路由名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */
        for (int i = 1; i <= 10; i++) {
            String body="第"+i+"个消息";
            channel.basicPublish("","pengwork",null,body.getBytes());
        }
        //7.释放资源
        channel.close();
        connection.close();
    }
}

C1:

package com.consumer.factory;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: ConsumerFactory
 * @author: 鹏
 * @date: 2023/7/4 14:44
 */

public class ConsumerFactory {
    public static void main(String[] args)throws Exception {
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("pengwork",true,false,false,null);
        //basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
        /**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */
        Consumer consumer= new DefaultConsumer(channel){
            /**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*                System.out.println("consumerTag"+consumerTag);
                System.out.println("envelope"+envelope.getExchange());
                System.out.println("properties"+envelope.getRoutingKey());
                System.out.println("properties"+properties);*/
                System.out.println("body"+new String(body));
            }
        };
        channel.basicConsume("pengwork",true,consumer);
    }
}

C2:

package com.consumer.factory;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: ConsumerFactory
 * @author: 鹏
 * @date: 2023/7/4 14:44
 */

public class ConsumerFactory1 {
    public static void main(String[] args)throws Exception {
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();

        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建队列Queue
        //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        /**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("pengwork",true,false,false,null);
        //basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
        /**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */
        Consumer consumer= new DefaultConsumer(channel){
            /**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*                System.out.println("consumerTag"+consumerTag);
                System.out.println("envelope"+envelope.getExchange());
                System.out.println("properties"+envelope.getRoutingKey());
                System.out.println("properties"+properties);*/
                System.out.println("body"+new String(body));
            }
        };
        channel.basicConsume("pengwork",true,consumer);
    }
}

消费结果:
RabbitMQ-基础学习,rabbitmq,学习,ruby
RabbitMQ-基础学习,rabbitmq,学习,ruby

3.Publish/Subscribe订阅模式

RabbitMQ-基础学习,rabbitmq,学习,ruby
RabbitMQ-基础学习,rabbitmq,学习,ruby
消费着只需要绑定相应的队列,生产者需要创建交换机

public class PubFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        String exchange="test_fanout";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT,
                true,false,false,null);
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        //1.exchange:交换机名称
        //2.type:交换机类型
        /**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         */
        //3.durable:是否持久化
        //4.autoDelete:是福哦自动删除
        //5.internal: 内部使用一般用false
        //6.arguments: 参数

        //channel.exchangeDeclare();
        //6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定交换机与队列
        /**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"");
        channel.queueBind(queue2Name,exchange,"");
        String body="日志信息:接收成功";
        channel.basicPublish(exchange,"",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

4.Routing路由模式

路由模式相当于增加一层限制,只有通过相应的限制交换机才能将消息发布到对应的队列,也就是在发布的时候路由参数数设置值,且交换机类型必须为direct
channel.basicPublish(exchange,"error",null,body.getBytes());此处限制队列路由为error的可以发送


public class RoutingFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        String exchange="test_direct";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT,
                true,false,false,null);
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        //1.exchange:交换机名称
        //2.type:交换机类型
        /**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         */
        //3.durable:是否持久化
        //4.autoDelete:是福哦自动删除
        //5.internal: 内部使用一般用false
        //6.arguments: 参数

        //channel.exchangeDeclare();
        //6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定交换机与队列
        /**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"error");
        channel.queueBind(queue2Name,exchange,"info");
        channel.queueBind(queue2Name,exchange,"error");
        channel.queueBind(queue2Name,exchange,"warming");
        String body="日志信息:接收成功";
        channel.basicPublish(exchange,"error",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

5. Topics模式

相对于routing在队列增加了匹配规则,让交换机发送与队列接受更加灵活*匹配一个单词,#匹配多个单词
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true,false,false,null);
设置为BuiltinExchangeType.TOPIC
RabbitMQ-基础学习,rabbitmq,学习,ruby文章来源地址https://www.toymoban.com/news/detail-543242.html

public class TopicsFactory {
    public static void main(String[] args) throws Exception{
        //1.创建链接
        ConnectionFactory factory = new ConnectionFactory();
        //2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");
        //3.创建链接Connection
        Connection connection = factory.newConnection();
        //4.创建Channel
        Channel channel = connection.createChannel();
        //5.创建交换机
        //exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
        String exchange="test_topic";
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,
                true,false,false,null);
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        //1.exchange:交换机名称
        //2.type:交换机类型
        /**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         */
        //3.durable:是否持久化
        //4.autoDelete:是福哦自动删除
        //5.internal: 内部使用一般用false
        //6.arguments: 参数

        //channel.exchangeDeclare();
        //6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7.绑定交换机与队列
        /**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"*.*");
        channel.queueBind(queue2Name,exchange,"*.one");
        channel.queueBind(queue2Name,exchange,"*.two");
        channel.queueBind(queue2Name,exchange,"ok.*");
        String body="日志信息:接收成功";
        channel.basicPublish(exchange,"error",null,body.getBytes());
        channel.basicPublish(exchange,"123.one",null,body.getBytes());
        channel.basicPublish(exchange,"123.two",null,body.getBytes());
        //8.释放资源
        channel.close();
        connection.close();
    }
}

到了这里,关于RabbitMQ-基础学习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ学习(五):RabbitMQ持久化

    在上一章内容中我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉后消 息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它将忽视队列 和消息,除非告知它不要这样做。 确保消息不会丢失需要做两件事:我们需

    2024年02月16日
    浏览(42)
  • RabbitMQ学习(二)——Linux下安装RabbitMQ

    1、 先去官网下载RabbitMQ 下载地址 :Downloading and Installing RabbitMQ — RabbitMQ 选择对应的系统版本点击下载,下载后会得到 .rpm 文件   2、下载Erlang RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,需要是安装 Erlang Erlang 和 RabbitMQ 版本对照:RabbitMQ Erlang Version

    2024年02月08日
    浏览(39)
  • 【SpringCloud】RabbitMQ基础

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年02月14日
    浏览(34)
  • RabbitMq-1基础概念

    mq:消息队列,存储消息的中间件 分布式系统通信的两种方式:直接远程调用,借助第三方完成间接通信 消息的发送方是生产者,接收方为消费者 优势:应用解耦,异步提速,削峰填谷 劣势:系统可用性降低,系统复杂度提高,一致性问题 应用解耦 异步提速 削峰填谷 使用

    2024年02月12日
    浏览(34)
  • RabbitMQ基础概念-02

    RabbitMQ是基于AMQP协议开发的一个MQ产品, 首先我们以Web管理页面为 入口,来了解下RabbitMQ的一些基础概念,这样我们后续才好针对这些基础概念 进行编程实战。 可以参照下图来理解RabbitMQ当中的基础概念: 虚拟主机 virtual host 这个在之前搭建时已经体验过了。RabbitMQ出于服务

    2024年02月07日
    浏览(32)
  • SpringBoot整合RabbitMQ(基础)

    一.环境准备 1、在pom文件中引入对应的依赖: 2、在application.yml配置文件中配置RabbitMQ: 二、整合 点对点,简单模式 ①配置文件中声明队列 ②创建生产者 消息发送成功后,在web管理页面查看: 可以看到对应队列中产生了消息 ③创建消费者 启动项目,可以看到消息成功消费:

    2024年02月11日
    浏览(37)
  • RabbitMQ基础核心概念

    了解RabbitMQ的核心概念 1、模型概念 2、Producer (生产者) 、 Consumer(消费者) 3、Exchange (交换机) 4、Queue (消息队列) 5、Broker(服务节点) 6、RabbitMQ的五种工作模式 Producer(生产者) :顾名思义是生产消息的一方 Consumer(消费者) :顾名思义是消费消息的一方 而消息一般由 俩部分组成

    2023年04月09日
    浏览(50)
  • RabbitMQ基础

    目录 MQ  MQ概述 MQ 的优势  1.应用解耦 2.异步提速 3.削峰填谷 MQ 的劣势 1.系统可用性降低 2.系统复杂度提高 3.一致性问题 使用 MQ 需要满足什么条件呢?  RabbitMQ 简介 ​编辑RabbitMQ 中的相关概念 RabbitMQ 提供了 6 种工作模式 JMS java实现Rabbitmq 依赖导入和基础配置 简单模式 Wor

    2024年02月09日
    浏览(10)
  • RabbitMQ基础篇 笔记

    同步调用 一步一步的来,支付业务写完后,如果之后加需求,还需要增加代码,不符合开闭原则。 性能上也有问题,openfeign是同步调用,性能太差。 同步调用耦合太多。 同步的优势是可以立即得到结果,例如查询,查到了就能知道结果。但是拓展性差,性能下降,级联失败

    2024年02月06日
    浏览(29)
  • RabbitMQ 基础介绍

    RabbitMQ 基础介绍 TL;DR RabbitMQ 是一个默认基于 AMQP 协议的消息队列系统,本文解释介绍 RabbitMQ 的基础概念、数据传输机制和流程,帮助快速了解 RabbitMQ 和进行相关开发。本文重点在解释什么是 RabbitMQ、RabbitMQ 是如何运行的。至于为什么使用消息队列?为什么使用 RabbitMQ?请参

    2024年02月03日
    浏览(22)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包