JavaWeb_SpringCloud微服务_Day4-MQ, RabbitMQ, SpringAMQP

这篇具有很好参考价值的文章主要介绍了JavaWeb_SpringCloud微服务_Day4-MQ, RabbitMQ, SpringAMQP。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MQ

同步通讯

  • 优点:
    • 时效性强, 可以立即得到结果
  • 缺点:
    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

异步通讯

  • 优点:
    • 耦合度低
    • 吞吐量提升
    • 故障隔离
    • 流量削峰
  • 缺点:
    • 依赖于Broker的可靠性, 安全性, 吞吐能力
    • 架构复杂, 业务没有明显的流程线, 不好追踪管理

mq常见技术

RabbitMq ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang java java Scala&java
协议支持 AMQP, XMPP, SMTP, STOMP OpenWire, STOMP, REST, XMPP, AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

RabbitMQ

下载安装

  • docker下载
docker pull rabbitmq:3-management
  • docker运行
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

介绍

  • channel: 操作MQ的工具
  • exchange: 路由消息到队列中
  • queue: 缓存消息
  • virtual host: 虚拟主机, 是对queue, exchange等资源的逻辑分组

SimpleQueue模型

  • publisher
public void testSendMessage() throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.174.133");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123321");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.发送消息
    String message = "hello, rabbitmq!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("发送消息成功:【" + message + "】");

    // 5.关闭通道和连接
    channel.close();
    connection.close();
}
  • consumer
public static void main(String[] args) throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.174.133");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123321");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

    // 2.创建通道Channel
    Channel channel = connection.createChannel();

    // 3.创建队列
    String queueName = "simple.queue";
    channel.queueDeclare(queueName, false, false, false, null);

    // 4.订阅消息
    channel.basicConsume(queueName, true, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                    AMQP.BasicProperties properties, byte[] body) throws IOException {
            // 5.处理消息
            String message = new String(body);
            System.out.println("接收到消息:【" + message + "】");
        }
    });
    System.out.println("等待接收消息。。。。");
}

SpringAMQP

介绍

  • AMQP: 应用间消息通信的一种协议, 与语言和平台无关.
  • 依赖:
    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

SimpleQueue模型

  • 配置
    spring:
    rabbitmq:
        host: 192.168.174.133 # 主机名S
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
    
  • publisher
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue()
        {
            String queueName = "simple.queue";
            String message = "hello, spring amqp!";
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    
  • comsumer
    @Component
    public class SpringRabbitListener {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg)
        {
            System.out.println("spring 消费者接收到消息: ["+msg+"]");
        }
    }
    

WorkQueue模型

  • publisher
    @Test
    public void testWorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message);
            Thread.sleep(20);
        }
    }
    
  • consumer
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(200);
    }
    
  • 配置
    spring:
    rabbitmq:
        listener:
        simple:
            prefetch: 1 # 每次只能获取下一消息, 处理完成才能获取下一个消息
    
  • work模型总结:
    • 多个消费者绑定到一个队列, 同一条消息只会被一个消费者处理
    • consumer会预取消息, 会导致性能差的consumer堆积消息, 可以通过设置prefetch来控制消费者预取的消息数量.

发布订阅模型

介绍

  • 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者, 实现方式就是加入了exchange(交换机).
  • 常见exchange类型:
    • Fanout: 广播
    • Direct: 路由
    • Topic: 话题

FanoutExchange

  • config
    @Configuration
    public class FanoutConfig {
    
        @Bean
        public FanoutExchange fanoutExchange()
        {
            return new FanoutExchange("itcast.fanout");
        }
    
        @Bean
        public Queue fanoutQueue1()
        {
            return new Queue("fanout.queue1");
        }
    
        @Bean
        public Queue fanoutQueue2()
        {
            return new Queue("fanout.queue2");
        }
    
        @Bean
        public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange)
        {
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
        @Bean
        public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange)
        {
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
    
  • consumer
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息: ["+msg+"]"+ LocalTime.now());
        Thread.sleep(200);
    }
    
  • publisher
    @Test
    public void testSendFanoutExchange()
    {
        // 交换机名称
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, every one!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
    
  • 总结:
    • 交换机的作用
      • 接收publisher发送的消息
      • 将消息按照规则路由到与之绑定的队列
      • 不能缓存消息, 路由失败, 消息丢失
      • FanoutExchange的会将消息路由到每个绑定的队列
    • 声明队列的Bean: Queue
    • 声明交换机的Bean: FanoutExchange
    • 声明绑定关系的Bean: Binding

Direct Exchange

  • 介绍
    • 每一个Queue都与Exchange设置一个BindingKey
    • 发布者发送消息时, 指定消息的RoutingKey
    • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
  • consumer
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息: ["+msg+"]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息: ["+msg+"]");
    }
    
  • publisher
    @Test
    public void testSendDirectExchange()
    {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, every blue!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
    

Topic Exchange

  • consumer
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息: ["+msg+"]");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到消息: ["+msg+"]");
    }
    
  • publisher
    @Test
    public void testSendTopicExchange()
    {
        // 交换机名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "hello, china.news";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
    
  • 总结
    • TopicExchange与DirectExchange类似, 区别在于routingKey必须是多个单词的列表, 并且以.分割.

消息转换器

  • 依赖
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
  • MessageConverter
    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }
    
  • publisher
    @Test
    public void testSendObjectQueue()
    {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "柳岩");
        msg.put("age", 21);
        rabbitTemplate.convertAndSend("object.queue", msg);
    }
    
  • consumer
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String, Object> msg)
    {
        System.out.println("接收到object.queue的消息: "+msg);
    }
    

来源

黑马程序员. SpringCloud微服务文章来源地址https://www.toymoban.com/news/detail-602873.html

到了这里,关于JavaWeb_SpringCloud微服务_Day4-MQ, RabbitMQ, SpringAMQP的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C++linux高并发服务器项目实践 day4

    int access(const char * pathname ,int mode); int chmod(const char * filename,int mode); int chown(const char* path,uid_t owner,gid_t group); int truncate(const char* path,off_t length); #include unistd.h int access(const char *pathname, int mode); 作用:判断某个文件是否有某个权限,或者判断文件是否存在 参数: pathname:判断文件路

    2023年04月16日
    浏览(29)
  • DAY4,Qt(事件处理机制的使用,Qt中实现服务器的原理)

    ---chatser.h---头文件 ---chatser.cpp---函数实现文件 ---main.cpp---测试文件 结果展示---     

    2024年02月15日
    浏览(44)
  • QT--day4(定时器事件、鼠标事件、键盘事件、绘制事件、实现画板、QT实现TCP服务器)

     QT实现tcpf服务器代码:(源文件) 头文件:

    2024年02月15日
    浏览(39)
  • day08-SpringCloud Gateway-服务网关

    没有使用网关服务时: 使用网关服务后: 官网:Spring Cloud Gateway Gateway是Spring生态系统之上构建的API网关服务,基于Spring、SpringBoot和Project Reactor等技术 Gateway旨在提供一种简单有效的方式来对API进行路由,以及提供一切强大的过滤器功能,例如:熔断、限流、重试等 鉴权 流

    2024年02月07日
    浏览(32)
  • SpringCloud_微服务基础day1(走进微服务,认识springcloud,微服务(图书管理)项目搭建(一))

    官方网站:柏码 - 让每一行代码都闪耀智慧的光芒! (itbaima.net) 注意: 此阶段学习推荐的电脑配置,至少配备4核心CPU(主频3.0Ghz以上)+16GB内存,否则卡到你怀疑人生。 前面我们讲解了SpringBoot框架,通过使用SpringBoot框架,我们的项目开发速度可以说是得到了质的提升。同时

    2024年02月07日
    浏览(36)
  • SpringCloud入门Day01-服务注册与发现、服务通信、负载均衡与算法

    伴随互联网的发展,使用互联网的人群越来越多,软件应用的体量越来越大和复杂。 而传统单体应用 可能不足以支撑大数据量以及发哦并发场景 应用的框架也随之进行演变 从最开始的单体应用架构到分布式(SOA)架构到 今天比较火的微服务框架,以及微服务网格架构。 ​

    2024年02月13日
    浏览(31)
  • 计算机网络 day4 IP地址的两部分-A、B、C、D、E五类IP地址-私有地址-子网掩码-DNA服务器-域名解析服务

    目录 三创网络拓扑结构图:  普通家庭网络拓扑结构图:(也可以直接使用 子母路由器 (母:无线路由器)(子:信号放大器、中继器)) 网络层:(network layer) 搜索IP地址所在地:iP地址查询--手机号码查询归属地 | 邮政编码查询 | iP地址归属地查询 | 身份证号码验证在

    2024年02月01日
    浏览(39)
  • Qt,day4

    闹钟

    2024年02月13日
    浏览(26)
  • c++ day4

       

    2024年02月11日
    浏览(23)
  • QT DAY4

    做一个闹钟,并播报填写内容  widget.h 文件  widget.cpp文件 实现效果         在没有点击启动按钮时,时间和信息都可以修改,点击关闭按钮无效。          信息和时间填写完后,点击启动按钮后,启动按钮被锁定,时间框和信息框无法再修改内容,只有关闭按钮可以点

    2024年02月13日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包