SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

这篇具有很好参考价值的文章主要介绍了SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

消息

消息的发送方:生产者
消息的接收方:消费者
同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送
异步消息:不需要接收方回应就可以进行下一步的发送

消息队列

什么是消息队列?
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
当此时有很多个用户同时访问服务器,需要服务器进行操作,但此时由于操作太多服务器运转不过来,这时将非常多的操作转换成消息的格式储存器来,所有的子服务器从中获取到消息进行操作分担主服务器的压力,而这个中间存储消息的容器我们一般称为消息队列

  • 企业级应用中广泛使用的三种异步消息传递技术(实现高并发的有效处理):
  1. JMS
  2. AMQP
  3. MQTT

JMS

(java Message Service):一个规范,等同于JDBC规范,提供了与消息服务相关的API接口

  • JMS消息模型
  1. peer-2-peer: 点对点模型,消息发送到一个队列中,队列保存信息,队列的消息只能被一个消费者消费,或超时
  2. publish-subscribe:发布订阅模式,消息可以被多个消费者消费,生产者和消费者完全独立,不需要感知对方存在
  • JMS消息种类

TextMessage,MapMessage, BytesMessage,StreamMessage,ObjectMessage,Message(Message只有消息头和属性)

  • 实现JMS规范的MQ

ActiveMQ,Redis,HornetMQ,RabbitMQ,RocketMQ(RocketMQ并未完全遵守JMS规范)

AMQP

AMQP(advanced message queuing protocol):一种协议(高级队列协议,消息代理规范),规范了网络交换的数据格式,兼容JMS

JMS存在一定的问题,JMS规范对对应的语言进行了规范,但若是我使用不是规范语言进行操作的时候就会出现问题,这时我们推出AMQP,这更像是一种协议,规范消息的格式,就是无论用什么语言什么环境都无所谓,它只人消息的格式

优点:跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现

  • AMQP的消息模型

direct exchange,fanout exchange,topic exchange,headers exchange,system exchange

AMQP的消息种类:byte[]

  • 实现AMQP的MQ:

RabbitMQ,StormMQ,RocketMQ

MQTT

(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一

Kafka

kafka,一种高吞吐量的分布式订阅消息系统,提供实时消息功能

Spring整合消息队列

模拟消息队列的工作流程

模拟消息队列的处理过程

import java.util.ArrayList;

@Service
public class Messageservice implements MessageService {
    private ArrayList<String> megList=new ArrayList<String>();

    @Override
    public void sendMessage(String id) {
        System.out.println("将待发送的消息订单纳入到处理队列.id:"+id);
        megList.add(id);
    }

    @Override
    public String doMessage() {
        String remove = megList.remove(0);
        System.out.println("已完成短信业务的发送,id:"+remove);
        return remove;
    }
}

模拟将消息导入到消息队列

@Service
public class orderserviceimpl implements orderService {
    @Autowired
    private MessageService messageService;
    @Override
    public void order(String id) {
        //发送消息队列
        messageService.sendMessage(id);
    }
}

Spring整合ActiveMQ

首先下载activeMQ
下载地址:https://activemq.apache.org/components/classic/download/
下载之后进行解压缩

  • 启动服务

打开x64的bin目录下执行activemq.bat命令启动服务
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
启动成功 ,其中给出其web控制台的访问地址:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
进入其管理界面:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
默认用户名&密码:admin

  • SpringBoot进行整合activemq

添加依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

添加配置:
配置Spirng连接的地址,以及后边消息存入的位置

server:
  port: 80
spring:
  activemq:
  # 说明spring连接的active的端口地址
    broker-url: tcp://localhost:61616

进行消息队列的操作:


@Service
public class Messageservice implements MessageService {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Override
    public void sendMessage(String id) {
        System.out.println("将待发送的消息订单纳入到处理队列.id:"+id);
        jmsMessagingTemplate.convertAndSend(id);
    }
    @Override
    public String doMessage() {
        //将消息队列中的类型转移出来,并在参数中规定转移出来的消息类型
        String s = jmsMessagingTemplate.receiveAndConvert(String.class);
        System.out.println("已完成短信业务的发送,id:"+s);
        return s;
    }
}

在发送和获取期间也可以规定名称

 jmsMessagingTemplate.convertAndSend("order.shishi.id",id);
 String s = jmsMessagingTemplate.receiveAndConvert("order.shishi.id",String.class);

上述之中也有一个小问题,就是在并不是每次消费都需要进行访问,而是当消息队列中有消息就开始消费我们可以创建一个Listener


@Component
public class MessageListener {
    @JmsListener(destination = "order.shishi.id")
    public void receive(String id){
        System.out.println("已完成的短信业务:id:"+id);
    }
}

这样就自动监听指定位置下的消息,一有消息就自动开始消费,从服务开始就一直存在
还有一个消息转发的操作:

@Component
public class MessageListener {
    @JmsListener(destination = "order.shishi.id")
    @SendTo("order.bushi.id")
    public void receive(String id){
        System.out.println("已完成的短信业务:id:"+id);
    }
}

注解 @SendTo的作用是将监听到的消息消费之后将返回值返回到对应的消息中去
上述使用的都是点对点的模型,如果要使用发布订阅的模型,可以在配置文件中进行配置:

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    template:
      default-destination: shishi
    pub-sub-domain: true

Spring整合RabbitMQ

rabbitMQ基于Erlang语言编写,需要安装Erlang
首先需要下载Erlang:
下载地址:https://www.erlang.org/downloads
下载完成之需要重启一下操作系统(重启电脑)
配置环境变量
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
添加path:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
安装完成后下载RabbitMQ
下载地址:https://rabbitmq.com/install-windows.html

  • 启动rabbitMQ

SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
注意:要启动rabbitMQ服务需要命令行进入到管理员身份运行
rabbitMQ的控制台界面(需要手动配置插件):
在sbin目录下找到:rabbitmq:plugins.bat命令
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
执行命令展示其插件列表,通过命令开启插件
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
这样就可以访问它的控制台界面,端口号是15672,地址:http://localhost:15672
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
输入默认密码:guest

Spring进行整合rabbitMQ首先添加依赖:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在配置文件中进行rabbits的配置:

spring:
 activemq:
   broker-url: tcp://localhost:61616
 jms:
   template:
     default-destination: shishi
   pub-sub-domain: true
 rabbitmq:
   host: localhost
   port: 5672

直连交换机模式

使用直连模式的交换机进行消息队列的开发:
首先需要在配置类中进行直连交换机与消息队列的绑定


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfigQM {
   @Bean
   public Queue directQueue(){
       //第一个是消息队列的名称,第一个true表示消息持久化,第二个表示当前的消息队列是否是连接专用(连接一关消息队列就关闭),第三个参数是是否删除(当消费者生产者都不使用就删除)
       return new Queue("direct_queue",true,true,true);
   }
   //我们需要一个交换机去绑定消息队列,此处设置一个交换机
   @Bean
   public DirectExchange directExchange(){
       return new DirectExchange("directexchange");
   }
   @Bean
   public Binding binding(){
       //将消息队列与交换机进行绑定
       return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
   }
}

绑定之后通过直连交换机进行消息的存储

@Service
public class amqpservice implements MessageService{

   @Autowired
   private AmqpTemplate amqpTemplate;
   @Override
   public void sendMessage(String id) {
       //使用直连交换机
       amqpTemplate.convertAndSend("directExchange","direct",id);
   }
   @Override
   public String doMessage() {
       return null;
   }
}

然后从消息队列中读取消息写在rabbitMQ监听器下面:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
   @RabbitListener(queues = "direct_queue")
   public void reveive(String id){
       System.out.println("已完成短信发送业务 id:"+id);
   }
}

主题交换机模式

主题交换机可以模糊设置交换机绑定的名称来达到分发的目的
例如:

    @Bean
   public Binding binding(){
       //将消息队列与交换机进行绑定
       return BindingBuilder.bind(directQueue()).to(directExchange()).with("topic_*_id");
   }

在消息进入消息队列的时候:

        amqpTemplate.convertAndSend("directExchange","topic_ni_id",id);
        amqpTemplate.convertAndSend("directExchange","topic_bu_id",id);

这个两种消息都可以进入到消息队列中去,而且通过这种方式也可以使消息进入到不同的消息队列中去

  • 绑定案件的规则:

*(星号):用来表示一个单词,且该单词必须出现
#(井号):用来表示任意数量
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq

Spring整合RocketMQ

下载地址:https://rocketmq.apache.org/
默认服务端口:9876
配置环境变量:ROCKETMQ_HOME,PATH,NAMESER_ADDR(建议):127.0.0.1:9876

  • 命名服务器与broker

SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
当后期的业务服务器增多时,就需要不停的进行服务器之间的连接,会变得非常繁琐,但是如果我们有一台服务器将所有的业务服务器注册进行,消费者与生产者只需要连接命名服务器即可

  • 首先启动命名服务器
    SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
    双击文件启动命名服务器
    SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
    然后双击mqbroker文件启动服务器:
    SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
    如何测试服务器是否正常启动:
    在bin目录下启动cmd:
    SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq

首先使用第一个命名生成对应的消息:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
再使用第二个命令对生成的消息进行消费:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
进行整合:
首先导入依赖坐标:

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>

在配置文件中配置其命名服务器:
rocketmq是与spring在同一层次下

rocketmq:
   name-server: localhost:9876
   producer:
      group: group_rocketmq

进行消息队列的相关操作:

@Service
public class MessageRocketmqimpl implements MessageService {
   @Autowired
   private RocketMQTemplate rocketMQTemplate;
   @Override
   public void sendMessage(String id) {
       rocketMQTemplate.convertAndSend("sdasda",id);
   }
   @Override
   public String doMessage() {
       return null;
   }
}

消费者监听器:

 
@Component
@RocketMQMessageListener(topic = "sdasda",consumerGroup = "group_rocketmq")
public class MessageRocketmqListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.println("id:"+s);
        
    }
}

使用异步方式进行发送:


@Service
public class MessageRocketmqimpl implements MessageService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public void sendMessage(String id) {
        SendCallback callback=new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送失败");
            }
        }
//        rocketMQTemplate.convertAndSend("sdasda",id);
        rocketMQTemplate.asyncSend("sdasda",id,callback);
    }
  • 同步发送与异步发送的区别:

同步发送和异步发送是两种不同的消息发送方式。在同步发送中,发送线程会等待消息发送完成并收到发送结果后继续执行,而在异步发送中,发送线程不会阻塞,可以立即执行后续逻辑。选择哪种方式取决于业务需求和对消息发送结果的要求。

Spring整合kafka

下载地址:https://kafka.apache.org/downloads
下载之后进行解压缩文件
解压之后首先需要运行:zookeeper-server-start.bat文件
这个文件相当于一个注册中心,需要先进行注册才能够启动kafka服务器,作用相当于RocketMQ中的命名服务器,需要在对应目录下cmd命令携带参数进行启动:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
启动注册服务器后,然后开启kafka服务器:
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka),javaWeb进阶,java-rabbitmq,java-activemq,java-rocketmq
spring进行整合kafka:
导入依赖坐标:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

在配置文件中进行配置,配置注册服务器的地址:

  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order
@Service
public class kafka implements MessageService {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    @Override
    public void sendMessage(String id) {
        kafkaTemplate.send("adad",id);
    }

    @Override
    public String doMessage() {
        return null;
    }
}

创建消费者监听器:文章来源地址https://www.toymoban.com/news/detail-859848.html


@Component
public class kafkaListener {
    @KafkaListener(topics = "adad")
    public void onMessage(ConsumerRecord<String,String> consumerRecord){
        System.out.println("id:"+consumerRecord.value());
    }
}

到了这里,关于SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 想学高并发技能,这些常用的消息中间件( RabbitMQ、Kafka、ActiveMQ、Redis、NATS )你要必知

    对于全栈或者后端工程师来说,解决高并发是一个必备的技能,一说到高并发时,我们第一反应是分布式系统,那么,消息中间件( RabbitMQ 、 Kafka 、 ActiveMQ 、 Redis 、 NATS 等)的出现是为了解决分布式系统中的消息传递和异步通信的问题,以及提供可靠的消息传递机制。它们

    2024年04月15日
    浏览(47)
  • 消息中间件ActiveMQ介绍

    一、消息中间件的介绍   介绍 ​ 消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于 数据通信 来进行分布式系统的集成。   特点(作用) 应用解耦 异步通信 流量削峰 (海量)日志处理 消息通讯 …... 应用场景 根据消息队列的特点,可以衍生出

    2024年02月15日
    浏览(36)
  • ActiveMQ消息中间件简介

    一、ActiveMQ简介   ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provide实现。尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然扮演这特殊的地位。   二、ActiveMQ应用场景 消息队列在大型电子商务类网

    2024年02月07日
    浏览(80)
  • ActiveMQ消息中间件应用场景

    一、ActiveMQ简介   ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provide实现。尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中仍然扮演这特殊的地位。   二、ActiveMQ应用场景 消息队列在大型电子商务类网

    2024年02月15日
    浏览(36)
  • 消息中间件 —— ActiveMQ 使用及原理详解

    目录 一. 前言 二. JMS 规范 2.1. 基本概念 2.2. JMS 体系结构 三. ActiveMQ 使用 3.1. ActiveMQ Classic 和 ActiveMQ Artemis 3.2. Queue 模式(P2P) 3.3. Topic 模式(Pub/Sub) 3.4. 持久订阅 3.5. 消息传递的可靠性 3.5.1. 事务型会话与非事务型会话 3.5.2. 持久化与非持久化消息的存储策略 3.6. 消息发

    2024年02月03日
    浏览(34)
  • RabbitMQ 消息中间件 消息队列

    RabbitMQ 1、RabbitMQ简介 RabbiMQ是⽤Erang开发的,集群⾮常⽅便,因为Erlang天⽣就是⼀⻔分布式语⾔,但其本身并 不⽀持负载均衡。支持高并发,支持可扩展。支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 2、RabbitMQ 特点 可

    2024年02月03日
    浏览(49)
  • RabbitMQ消息中间件

    RabbitMQ消息中间件 RabbitMQ简介 windows下安装RabbitMQ RabbitMQ基本概念 RabbitMQ简单模式 RabbitMQ工作队列模式 RabbitMQ发布订阅模式 RabbitMQ路由模式 RabbitMQ主题模式 RabbitMQ RPC模式 RabbitMQ发布确认模式

    2024年02月10日
    浏览(44)
  • 消息中间件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年01月17日
    浏览(63)
  • 消息中间件之RabbitMQ

    1.基于AMQP协议Erlang语言开发的一款消息中间件,客户端语言支持比较多, 比如Python,Java,Ruby,PHP,JS,Swift.运维简单,灵活路由,但是性能不高, 可以满足一般场景下的业务需要,三高场景下吞吐量不高,消息持久化没有采取 零拷贝技术,消息堆积时,性能会下降 2.消息吞吐量在

    2024年01月19日
    浏览(78)
  • 消息中间件RabbitMQ详解

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件机制的系统中

    2024年02月16日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包