一文详解RocketMQ-Spring的源码解析与实战

这篇具有很好参考价值的文章主要介绍了一文详解RocketMQ-Spring的源码解析与实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

摘要:这篇文章主要介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。

本文分享自华为云社区《RocketMQ-Spring : 实战与源码解析一网打尽》,作者:勇哥java实战分享。

RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。

这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。

一 SDK 简介

项目地址:https://github.com/apache/rocketmq-spring

rocketmq-spring 的本质是一个 Spring Boot starter 。

Spring Boot 基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。

Spring Boot starter 构造的启动器使用起来非常方便,开发者只需要在 pom.xml 引入 starter 的依赖定义,在配置文件中编写约定的配置即可。

下面我们看下 rocketmq-spring-boot-starter 的配置:

1、引入依赖

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
 <version>2.2.3</version>
</dependency>

2、约定配置

接下来,我们分别按照生产者和消费者的顺序,详细的讲解消息收发的操作过程。

二 生产者

首先我们添加依赖后,进行如下三个步骤:

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic

生产者配置非常简单,主要配置名字服务地址和生产者组。

2、需要发送消息的类中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;
​
@Value("${rocketmq.topic}")
private String smsTopic;

3、发送消息,消息体可以是自定义对象,也可以是 Message 对象

rocketMQTemplate 类包含多钟发送消息的方法:

  1. 同步发送 syncSend
  2. 异步发送 asyncSend
  3. 顺序发送 syncSendOrderly
  4. oneway发送 sendOneWay

下面的代码展示如何同步发送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
 rocketMQTemplate.syncSend(
            destination, 
 MessageBuilder.withPayload(messageContent).
 setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
 build()
 );
if (sendResult != null) {
 if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
 // send message success ,do something 
 }
}

syncSend 方法的第一个参数是发送的目标,格式是:topic + ":" + tags ,

第二个参数是:spring-message 规范的 message 对象 ,而 MessageBuilder 是一个工具类,方法链式调用创建消息对象。

三 消费者

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic

2、实现消息监听器

@Component
@RocketMQMessageListener(
 consumerGroup = "${rocketmq.consumer1.group}", //消费组
    topic = "${rocketmq.consumer1.topic}" //主题
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
 public void onMessage(String message) {
 System.out.println("普通短信:" + message);
 }
}

消费者实现类也可以实现 RocketMQListener<MessageExt>, 在 onMessage 方法里通过 RocketMQ 原生消息对象 MessageExt 获取更详细的消息数据 。

public void onMessage(MessageExt message) {
 try {
        String body = new String(message.getBody(), "UTF-8");
        logger.info("普通短信:" + message);
 } catch (Exception e) {
 logger.error("common onMessage error:", e);
 }
}

四 源码概览

最新源码中,我们可以看到源码中包含四个模块:

1、rocketmq-spring-boot-parent

该模块是父模块,定义项目所有依赖的 jar 包。

2、rocketmq-spring-boot

核心模块,实现了 starter 的核心逻辑。

3、rocketmq-spring-boot-starter

SDK 模块,简单封装,外部项目引用。

4、rocketmq-spring-boot-samples

示例代码模块。这个模块非常重要,当用户使用 SDK 时,可以参考示例快速开发。

五 starter 实现

我们重点分析下 rocketmq-spring-boot 模块的核心源码:

spring-boot-starter 实现需要包含如下三个部分:

1、定义 Spring 自身的依赖包和 RocketMQ 的依赖包 ;

2、定义spring.factories 文件

在 resources 包下创建 META-INF 目录后,新建 spring.factories 文件,并在文件中定义自动加载类,文件内容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 会根据文件中配置的自动化配置类来自动初始化相关的 Bean、Component 或 Service。

3、实现自动加载类

在 RocketMQAutoConfiguration 类的具体实现中,我们重点分析下生产者和消费者是如何分别启动的。

▍生产者发送模板类:RocketMQTemplate

RocketMQAutoConfiguration 类定义了两个默认的 Bean :

首先SpringBoot项目中配置文件中的配置值会根据属性条件绑定到 RocketMQProperties 对象 中,然后使用 RocketMQ 的原生 API 分别创建生产者 Bean 和拉取消费者 Bean , 分别将两个 bean 设置到 RocketMQTemplate 对象中。

两个重点需要强调:

  • 发送消息时,将 spring-message 规范下的消息对象封装成 RocketMQ 消息对象
  • 默认拉取消费者 litePullConsumer 。拉取消费者一般用于大数据批量处理场景 。

RocketMQTemplate 类封装了拉取消费者的receive方法,以方便开发者使用。

▍自定义消费者类

下图是并发消费者的例子:

那么 rocketmq-spring 是如何自动启动消费者呢 ?

spring 容器首先注册了消息监听器后置处理器,然后调用 ListenerContainerConfiguration 类的 registerContainer 方法 。

对比并发消费者的例子,我们可以看到: DefaultRocketMQListenerContainer 是对 DefaultMQPushConsumer 消费逻辑的封装。

封装消费消息的逻辑,同时满足 RocketMQListener 泛化接口支持不同参数,比如 String 、MessageExt 、自定义对象 。

首先DefaultRocketMQListenerContainer初始化之后, 获取 onMessage 方法的参数类型 。

然后消费者调用 consumeMessage 处理消息时,封装了一个 handleMessage 方法 ,将原生 RocketMQ 消息对象 MessageExt 转换成 onMessage 方法定义的参数对象,然后调用 rocketMQListener 的 onMessage 方法。

上图右侧标红的代码也就是该方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

六 写到最后

开源项目 rocketmq-spring 有很多值得学习的地方 ,我们可以从如下四个层面逐层进阶:

1、学会如何使用 :参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接收消息,快速编码;

2、模块设计:学习项目的模块分层 (父模块、SDK 模块、核心实现模块、示例代码模块);

3、starter 设计思路 :定义自动配置文件 spring.factories 、设计配置属性类 、在 RocketMQ client 的基础上实现优雅的封装、深入理解 RocketMQ 源码等;

4、举一反三:当我们理解了 rocketmq-spring 的源码,我们可以尝试模仿该项目写一个简单的 spring boot starter。

 

点击关注,第一时间了解华为云新鲜技术~文章来源地址https://www.toymoban.com/news/detail-423688.html

到了这里,关于一文详解RocketMQ-Spring的源码解析与实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMq消费原理及源码解析

      先简单说下常见的rocketMq的部署方式,上图中broker为真正计算和存储消息的地方,而nameServer负责维护broker地   图中右侧consume message部分即是本文重点描述的部分,主要分为ConsumerGroup和Consumer,consumerGroup可以参考 https://rocketmq.apache.org/docs/domainModel/07consumergroup/ 。简单的说,

    2024年02月15日
    浏览(34)
  • RocketMQ 消费者Rebalance 解析——图解、源码级解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年4月15日 🍊 新专栏筹备中,还是熟悉的源码,还是熟悉的感觉! 🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力

    2023年04月18日
    浏览(52)
  • [RocketMQ] Broker启动流程源码解析 (二)

    1.Brocker介绍 Broker主要负责消息的存储、投递和查询以及服务高可用保证, Broker包含了以下几个重要子模块。 Remoting Module: 整个Broker的实体, 负责处理来自Client端的请求 Client Manager: 负责管理客户端 Producer和Consumer, 维护Consumer的Topic订阅信息 Store Service: 提供方便简单的API接口处理

    2024年02月09日
    浏览(52)
  • RocketMQ 消费者Rebalance算法 解析——图解、源码级解析

    🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2022年10月15日 🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是

    2024年01月19日
    浏览(42)
  • RocketMQ快速实战以及集群架构详解

    MQ:MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。 这个词可以分两个部分来看,一是Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上。二是Queue:队列。队列原意是指一种具有FIFO(先进

    2024年02月03日
    浏览(47)
  • [RocketMQ] Broker CommitLogDispatcher 异步构建ConsumeQueue和IndexFile源码解析 (十四)

    CommitLogDispatcherBuildConsumeQueue: 异步构建ConsumerQueue。 CommitLogDispatcherBuildIndex: 异步构建IndexFile。 1.CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue CommitLogDispatcherBuildConsumeQueue用于接收分发请求并构建ConsumeQueue。 对于非事务消息或者是事务commit消息, 调用DefaultMessageStore#putMessagePositionI

    2024年02月17日
    浏览(42)
  • RocketMQ 单机源码部署 自定义配置文件和端口以及acl权限配置解析

    1、我们首先配置完 namesrv和broker和acl认证的配置文件,然后直接使用-c指定配置文件来启动程序,就会非常明了,用户名密码要大于6,第一个用户我测试着不知道为什么始终有最高权限,大家尽量不要吧第一个用户给别人用。 ​ 1、创建 conf/namesrv.conf 文件 ​ 2、修改 conf/br

    2024年02月13日
    浏览(43)
  • 【RocketMQ专题】快速实战及集群架构原理详解

    基本介绍 MQ:即MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。 这个词可以分两个部分来看: Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上 Queue:队列。队列原意是指一种具有FIFO(先

    2024年02月07日
    浏览(29)
  • RocketMQ 5.1.0 源码详解 | Producer 启动流程

    初始化一个 DefaultMQProducer 对象的代码如下 在初始化 DefaultMQProducer 时会初始化一个 DefaultMQProducerImpl 实例并赋值给 producer 的成员变量 同时,在初始化 DefaultMQProducerImpl 实例时也会将 producer 对象作为成员变量保存在 DefaultMQProducerImpl 实例中 构造 defaultMQProducerImpl 的代码如下 因此

    2024年02月15日
    浏览(34)
  • 一文读懂强化学习:RL全面解析与Pytorch实战

    在本篇文章中,我们全面而深入地探讨了强化学习(Reinforcement Learning)的基础概念、主流算法和实战步骤。从马尔可夫决策过程(MDP)到高级算法如PPO,文章旨在为读者提供一套全面的理论框架和实用工具。同时,我们还专门探讨了强化学习在多个领域,如游戏、金融、医疗

    2024年02月06日
    浏览(129)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包