Rabbitmq学习

这篇具有很好参考价值的文章主要介绍了Rabbitmq学习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

RabbitMQ

1 同步调用和异步调用

1 同步调用适用于大多数场景 比如差一个订单状态 我们要求时效性
2 异步调用适用于高并发场景,依赖于异步管理器,能提高吞吐量,反应速度。
Rabbitmq学习,rabbitmq,rabbitmq,分布式
Rabbitmq学习,rabbitmq,rabbitmq,分布式

2 常见的MQ对比

Rabbitmq学习,rabbitmq,rabbitmq,分布式
Rabbitmq学习,rabbitmq,rabbitmq,分布式
Rabbitmq学习,rabbitmq,rabbitmq,分布式

3 安装RabbitMQ

我使用的是centos8

直接安装rabbitmq会有很多问题 因为centos8

021年12月31日CentOS 8操作系统版本结束了生命周期(EOL),Linux社区已不再维护该操作系统版本。后续新的服务器建议使用CentOS Stream,或者其他linux版本,按照社区规则,CentOS 8的源地址http://mirror.centos.org/centos/8/内容已移除

这里使用Docker的方式进行安装

1 安装rabbitmq

docker pull rabbitmq

Rabbitmq学习,rabbitmq,rabbitmq,分布式

2 启动rabbitmq
默认方式
登录的时候用户名密码都是guest

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

配置用户名密码

docker run \
 -e RABBITMQ_DEFAULT_USER=dongjiming \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

3 安装web插件

先执行docker ps 拿到当前的镜像ID

进入容器

安装插件

ctrl+p+q退出当前容器

docker ps 
docker exec -it 镜像ID /bin/bash
rabbitmq-plugins enable rabbitmq_management

访问地址

http://linuxip地址:15672,这里的用户名和密码输你配置的

可以配置用户信息

# 创建一个rabbitmq用户
rabbitmqctl add_user [账号] [密码]
# 给具体的一个用户设置身份权限
rabbitmqctl set_user_tags [账号] administrator
# 给具体的一个用户修改密码
rabbitmqctl change_password [username] [new password]
# 删除一个用户
rabbitmqctl delete_user [username]
# 列出所有用户清单
rabbitmqctl list_users
# 为用户设置 administrator 角色
rabbitmqctl.bat set_permission -p / [username] ".*" ".*" ".*"
rabbitmqctl.bat set_permission -p / root ".*" ".*" ".*"

Rabbitmq学习,rabbitmq,rabbitmq,分布式

4 RabbitMQ学习

Rabbitmq学习,rabbitmq,rabbitmq,分布式

4.1 helloworld学习

依赖

  <dependencies>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.29</version>
    </dependency>
  </dependencies>

生产者

package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:     Hello World 的发送类,连接到RabbitMQ服务端,然后发送一条消息,然后退出。
 */
public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("139.224.237.247");
        factory.setUsername("admin");
        factory.setPassword("11111");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发布消息
        String message = "Hello World!11";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + message);
        //关闭连接
        channel.close();
        connection.close();
    }
}

消费者

package helloworld;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 描述:     接收消息,并打印,持续运行
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("139.224.237.247");
        factory.setUsername("admin");
        factory.setPassword("1111");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //接收消息并消费
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        });
    }
}

5 Spring AMQP

Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.1 AMQP的入门案例(使用rabbittemplate进行消息发送和接受)

发送端send
1 导入依赖

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

2 配置文件

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 139.224.237.247 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: 11111
    virtual-host: /

3 引入使用

@SpringBootTest
public class SpringAmqpTest {

//注入模板
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() {
    String queueName = "simple.queue";
    String message = "hello, spring amqp!";
    //指定队列发送消息
    rabbitTemplate.convertAndSend(queueName, message);
}

}

接收端
1 导入依赖
2 配置文件

这两步和上面相同

3 编写类注册Bean开启监听

@Component
public class SpringRabbitListener {

     @RabbitListener(queues = "simple.queue")
     public void listenSimpleQueue(String msg) {
         System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
     }
     }

Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.2 RabbitMQ的workquene

workquene就是一个队列 多个消费者消费
每条信息只能被一个消费者消费
其目的是提高性能 避免消息队列中信息堆积
Rabbitmq学习,rabbitmq,rabbitmq,分布式
Rabbitmq学习,rabbitmq,rabbitmq,分布式

Rabbitmq学习,rabbitmq,rabbitmq,分布式

Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.3 发布订阅模型(exchange(广播fanout 路由direct 话题topic))

Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.3.1 fanout 广播

把消息发给交换机
交换机会把消息发送给每一个和它绑定的队列Rabbitmq学习,rabbitmq,rabbitmq,分布式

1 首先使用配置类 配置Bean的方式 声明交换机 队列1 队列2 并进行交换机和队列的绑定

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    // fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    // 绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    // fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    // 绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }

2 配置Bean 加 @rabbitlistener(指定队列名称) 的方式进行监听消费

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    }
}

3 向交换机发送消息(发送前先启动消费者端 进行监听)

    @Test
    public void testSendFanoutExchange() {
        // 交换机名称
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, every one!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

发现消息同时被两个消息队列消费
Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.3.2 direct 路由

生产者 发送消息时会指定 交换机 和 routingkey
交换机再根据 routingkey 和 队列绑定的bindingkey比较
相同则会把消息发给这个队列
Rabbitmq学习,rabbitmq,rabbitmq,分布式

1 使用@rabbitlistener的方式 声明交换机 队列 绑定 binding-key 消费方法

@Component
public class SpringRabbitListener {
	//生命交换机 队列 进行绑定 并且指定bindingkey 和消费方法
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
}

2 发送端编写代码 指定routingkey和消息

    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, red!";
        // 发送消息 (指定routingkey为red 这样两个队列都会收到消息)
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

指定routingkey red
Rabbitmq学习,rabbitmq,rabbitmq,分布式
指定routingkey yellow
Rabbitmq学习,rabbitmq,rabbitmq,分布式

Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.3.3 topic 话题

和direct 几乎一样 不过topic的key 以 . 分割并且可以使用通配符
Rabbitmq学习,rabbitmq,rabbitmq,分布式

@Component
public class SpringRabbitListener {
   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
}
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "今天天气不错,我的心情好极了!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
    }

Rabbitmq学习,rabbitmq,rabbitmq,分布式

5.3.4 消息转换器(默认我们传一个对象给rabbitmq spring会使用默认的jdk objectoutputstream进行序列化)

Rabbitmq学习,rabbitmq,rabbitmq,分布式
Rabbitmq学习,rabbitmq,rabbitmq,分布式
Rabbitmq学习,rabbitmq,rabbitmq,分布式

Rabbitmq学习,rabbitmq,rabbitmq,分布式

总结

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。文章来源地址https://www.toymoban.com/news/detail-646476.html

到了这里,关于Rabbitmq学习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ——解决分布式事务问题,RabbitMQ的重要作用之一!!!通过可靠生产和可靠消费来完美解决!

    分布式事务是指涉及多个独立的计算机系统(也称为节点或参与者)之间的事务处理。在分布式系统中,每个节点可能各自拥有自己的数据存储和事务管理机制。分布式事务的目标是保证在跨多个节点执行的一系列操作可以以一致和可靠的方式执行和提交,即使在面对故障或

    2024年04月23日
    浏览(38)
  • Python爬虫分布式架构 - Redis/RabbitMQ工作流程介绍

    在大规模数据采集和处理任务中,使用分布式架构可以提高效率和可扩展性。本文将介绍Python爬虫分布式架构中常用的消息队列工具Redis和RabbitMQ的工作流程,帮助你理解分布式爬虫的原理和应用。 为什么需要分布式架构? 在数据采集任务中,单机爬虫可能面临性能瓶颈和资

    2024年02月11日
    浏览(34)
  • 分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ

    在现代分布式系统中,消息队列是一种常见的异步通信模式,它可以帮助系统处理高并发、高可用性以及容错等问题。在这篇文章中,我们将深入探讨三种流行的分布式消息队列:Apache Kafka、RabbitMQ和ActiveMQ。我们将讨论它们的核心概念、算法原理、特点以及使用场景。 随着

    2024年02月02日
    浏览(51)
  • (黑马出品_07)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 聚合 可以让我们极其方便

    2024年03月12日
    浏览(40)
  • 分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于实现全文搜索功能,例如商城中对商品搜索、搜索、分类搜索、订单搜索、客户搜索等。它支持复杂的查询语句、中文分词、近似搜索等功能,可以快速地搜索并返回匹配的结果。 2、日志分析 Elastic search可以用于实现实时日志分析,例

    2024年02月04日
    浏览(37)
  • 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

    本文参考黑马 分布式Elastic search Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 同步调用 方案一:同步调用 基本步骤如下: hotel-demo对外提供接口,用来修改elasticsearch中的数据 酒店管理服务在完成数据库操

    2024年04月11日
    浏览(36)
  • 【103期】RabbitMQ 实现多系统间的分布式事务,保证数据一致性

    org.springframework.boot spring-boot-starter-amqp mysql mysql-connector-java runtime org.projectlombok lombok true org.springframework.boot spring-boot-starter-jdbc com.alibaba fastjson 1.2.17 3.2.1.2配置文件内容: server: port: 8080 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/test?useUnicode=tru

    2024年04月14日
    浏览(48)
  • (黑马出品_高级篇_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

    [此文档是在心向阳光的天域的博客加了一些有助于自己的知识体系,也欢迎大家关注这个大佬的博客](https://blog.csdn.net/sinat_38316216/category_12263516.html) [是这个视频](https://www.bilibili.com/video/BV1LQ4y127n4/?p=5spm_id_from=pageDrivervd_source=9beb0a2f0cec6f01c2433a881b54152c) 消息队列在使用过程中,面

    2024年03月19日
    浏览(38)
  • SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈

    我们发现在微服务中有一个令人头疼的问题——部署,用Docker去解决这个部署难题 1、项目部署的问题 2、Docker 扔到一台机器上,它们的依赖难道没有干扰吗?不会,docker将打包好的程序放到一个隔离容器去运行,使用沙箱机制,避免互相干扰,之间不可见,这样就解决了混

    2023年04月24日
    浏览(35)
  • RabbitMQ:高效传递消息的魔法棒,一篇带你助力构建可靠的分布式系统(上篇)

    MQ是消息队列( Message Queue )的缩写,是一种在应用程序之间传递消息的技术。通常用于 分布式系统 或 异步通信 中,其中 发送者 将消息放入队列,而 接收者 从队列中获取消息。 这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用

    2024年02月15日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包