rocketMQ消息队列简介及其实例

这篇具有很好参考价值的文章主要介绍了rocketMQ消息队列简介及其实例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 RocketMQ优点:

单机吞吐量:十万级

可用性:非常高,分布式架构

消息可靠性:经过参数优化配置,消息可以做到0丢失

功能支持:MQ功能较为完善,还是分布式的,扩展性好

支持10亿级别的消息堆积,不会因为堆积导致性能下降

缺点:兼容性差点

一、RocketMQ 核心的四大组件:

Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。

Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。

Broker:主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。

做集群的时候只需要保证配置文件中nameserver的地址指向相同;且brokerid=0为master,>0为slave即可;

NameServer:类似Zookeeper,是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。

二、rocketmq基本工作流程:

1、先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker在启动的时候会注册自己配置的Topic信息到NameServer集群的每一台机器中。即每一个NameServer均有该broker的Topic路由配置信息,并向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo;NameServer 也会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。

2、这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线会根据配置文件中的NameServer 地址自动连接一个NameServer ;每 30s 会从连接的 NameServer 获取 Topic 和 Broker 的映射关系存在本地内存中,从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。

3、Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。

可以理解为如下:

name server:注册中心

broker:消息处理

procucer:生成消息

consumer:消费消息

每个组件都可以部署成集群模式进行水平扩展。
消息由topic区分消息类型(一级分类):如订单消息,物流消息等
tag为二级分类
message queue为消息类型下的消息队列。
用于并行发送和接受消息。

四、基础
分布式事务:
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。

事务消息:
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。

半事务消息:
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。

本地事务状态:
Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认令。

// 描述本地事务执行状态 public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}

RocketMQ中的消息回查设置:
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

transactionTimeout=20,指定TM在 20 秒内应将最终确认状态发送给TC,否则引发消息回查。默认为 60 秒
transactionCheckMax=5,指定最多回查 5 次,超过后将丢弃消息并记录错误日志。默认 15 次。
transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为 10 秒。默认为 60 秒。
五、Topic与Broker的关系:

  • Borker中有一个或多个Topic
  • Topic中有一个或多个MessageQueue

Topic可以自动创建和手动创建;

1、手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面(可视化控制台)创建Topic。

方法:DefaultMQProducer producer = rocketMQTemplate.getProducer();

producer.createTopic(String key, String newTopic, int queueNum, int topicSysFlag)

key:这个参数是系统已经存在的一个topic的名称,新建的topic会跟它在相同的broker上创建;key或者是broker的名称也行
newTopic:新建的topic的名称
queueNum:指定topic中queue的数量
topicSysFlag:topic的标记位设置,没有特殊要求就填0就可以了。可选值在TopicSysFlag中定义

根据源码可以分析出大致创建分为如下几步:

第1步,根据提供的key代表的topic去获取该topic所在的broker的路由,如果想在所有broker创建,一般使用DefaultTopic,因为这个topic是在所有broker上都存在的。
第2步,轮询所有的broker,在master上创建topic,中间有一个broker失败,则中止创建,返回失败。因为master和slave的配置数据也会自动同步,所以只需要在master上创建。
第3,4步,设置参数
第5步,调用MQClientAPIImpl接口创建,失败会重试4次。

2、自动创建就是在broker.conf中设置了autoCreateTopicEnable =true,都在设置false;

TBW102 是啥用的?

TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认(TBW102)topic。

就是一个接受自动创建topic的 Broker上的topic, 启动会把这个默认Topic(主题)的Broker登记到 NameServer,这样当 Producer 发送新 Topic 的消息时候会根据"TBW102 "这个topic得知哪个 Broker 可以自动创建主题,然后发往那个 Broker。

而 Broker 接受到这个消息的时候发现没找到对应的主题,但是它接受创建新主题,这样就会创建对应的 Topic 路由信息。

假设此时发送方还在连续快速的发送消息,那 NameServer 上其实还没有关于这个 Topic 的路由信息,所以有机会让别的允许自动创建的 Broker 也创建对应的 Topic 路由信息,这样集群里的 Broker 就能接受这个 Topic 的信息,达到负载均衡的目的,但也有个别 Broker 可能,没收到。

如果发送方这一次发了之后 30s 内一个都不发,之前的那个 Broker 随着心跳把这个路由信息更新到 NameServer 了,那么之后发送该 Topic 消息的 Producer 从 NameServer 只能得知该 Topic 消息只能发往之前的那台 Broker ,这就不均衡了,如果这个新主题消息很多,那台 Broker 负载就很高了。

所以不建议线上开启允许自动创建主题,即 autoCreateTopicEnable 参数。

Tags的使用

tag(标签): 标签可以被认为是对topic的进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。区分相同topic下不同种类的消息。生产到哪个topic的哪个tag下,消费者也是从topic的哪个tag进行消费,即实现消息的过滤。

建议一个应用一个 Topic,利用 tages 来标记不同业务,因为 tages 设置比较灵活,且一个应用一个 Topic 很清晰,能直观的辨别。

Keys的使用

如果有消息业务上的唯一标识,请填写到 keys 字段中,方便日后的定位查找。

queue(队列): queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue,默认自动创建是4个,手动创建是8个

rocketMQ消息队列简介及其实例

 

六、下面以windows服务器为例演示使用rocketmq如下:

1、下载rocketmq的安装包:https://rocketmq.apache.org/zh/download

2、下载rocketmq仪表盘(也就是可视化操作界面,是一个完整的java项目可以用idea运行)

3、修改conf/broker.conf配置在末尾添加如下配置(IP使用自己的),并保存。

brokerIP1=192.168.31.199

namesrvAddr=192.168.31.199:9876

4、配置ROCKET_HOME环境变量,路径使用下载路径;path中配置%ROCKET_HOME%\bin即可

5、启动Namesrv

在rocketmq文件的bin目录下,进入cmd使用如下命令:start mqnamesrv.cmd

关闭mqshutdown namesrv

6、启动Broker:start mqbroker.cmd -n 127.0.0.1:9876  autoCreateTopicEnable=true  (也就是说,producer使用RocketMQTemplate发送的消息,就算Booker上的topic之前不存在,rocket也会帮我们创建好)

关闭mqshutdown broker

7、将仪表盘项目导入idea,然后打开application.properties文件修改rocket.config.namesrvAddr=localhost:9876;

8、启动仪表盘项目:浏览器输入http://localhost:8080/#/即可看到可视化界面;

9、java代码创建生产者和消费者:

创建普通springboot项目,添加依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>

10、修改配置文件

# 应用名称
spring:
application:
name: rocket-producer
# 应用服务 WEB 访问端口
server:
port: 8002
rocketmq:
name-server: localhost:9876
producer:
group: my-group

11、创建测试代码

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class SendMessage {
@Resource
private RocketMQTemplate rocketMQTemplate;


@Scheduled(fixedRate = 5000)
public void run(){
//发送消息

DefaultMQProducer producer = rocketMQTemplate.getProducer();
// key值为brokerName手动创建topic
//producer.createTopic("USER-20210820NG","99999",4);
// key值为已经存在的topic,根据此topic寻找到对应的broker在其上创建新的topic
producer.createTopic("1414","6666",4);
rocketMQTemplate.convertAndSend("6666", "Hello, World!");

}
}

12、创建消费者项目(同上)

消费端测试代码:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
class MyConsumer1 implements RocketMQListener<String> {

/**
*需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功
*/

@Override
public void onMessage(String s) {
System.out.println(s);
}
}文章来源地址https://www.toymoban.com/news/detail-482453.html

到了这里,关于rocketMQ消息队列简介及其实例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列中间件 MetaQ/RocketMQ

    推荐电子书:云原生架构白皮书 2022版-藏经阁-阿里云开发者社区 (aliyun.com) 简介—— 消息队列中间件 MetaQ/RocketMQ 中间件 MetaQ 是一种基于队列模型的消息中间件,MetaQ 据说最早是受 Kafka 的影响开发的,第一版的名字 \\\"metamorphosis\\\",是奥地利作家卡夫卡的名作——《变形记》。

    2024年02月14日
    浏览(50)
  • 分布式消息队列RocketMQ概念详解

    目录 1.MQ概述 1.1 RocketMQ简介 1.2 MQ用途 1.3 常见MQ产品 2.RocketMQ 基本概念 2.1 消息 2.2 主题 2.3 标签 2.4 队列  2.5 Producer 2.6 Consumer 2.7 NameServer 2.8 Broker 2.9 RocketMQ 工作流程   RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息

    2024年02月03日
    浏览(61)
  • rocketMq消息队列详细使用与实践整合spring

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月13日
    浏览(39)
  • 【面试需了解之消息队列】RocketMQ、kafka、RabbitMQ概述

    消息队列说明:RocketMQ、kafka、RabbitMQ概述及关键概念 概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量控制等问题。实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件 作用 异构系统消息传递:上游系统

    2024年02月10日
    浏览(81)
  • RabbitMQ与RocketMQ:消息队列的两大强者对比

    在现代分布式系统中,消息队列已成为不可或缺的一部分,它们帮助我们在不同的服务之间实现异步通信、解耦和流量削峰。在众多消息队列中间件中,RabbitMQ和RocketMQ是两个备受瞩目的选项。本文将对它们进行深入对比,帮助大家根据实际需求选择合适的消息队列中间件。

    2024年04月28日
    浏览(33)
  • mq 消息队列 mqtt emqx ActiveMQ RabbitMQ RocketMQ

    十几年前,淘宝的notify,借鉴ActiveMQ。京东的ActiveMQ集群几百台,后面改成JMQ。 Linkedin的kafka,因为是scala,国内很多人不熟。淘宝的人把kafka用java写了一遍,取名metaq,后来再改名RocketMQ。 总的来说,三大原因,语言、潮流、生态。 MQ这种东西,当你的消息量不大的时候,用啥

    2024年02月12日
    浏览(49)
  • 解析RocketMQ:高性能分布式消息队列的原理与应用

    什么是消息队列 消息队列是一种消息传递机制,用于在应用程序和系统之间传递消息,实现解耦和异步通信。它通过将消息发送到一个中间代理(消息队列),然后由消费者从该队列中获取消息并处理。 RocketMQ简介 RocketMQ是阿里巴巴开源的一款高性能分布式消息队列系统。它

    2024年02月14日
    浏览(50)
  • 【深入浅出RocketMQ原理及实战】「消息队列架构分析」帮你梳理RocketMQ或Kafka的选择理由以及二者PK

    前提背景 大家都知道,市面上有许多开源的MQ,例如,RocketMQ、Kafka、RabbitMQ等等,现在Pulsar也开始发光,今天我们谈谈笔者最常用的RocketMQ和Kafka,想必大家早就知道二者之间的特点以及区别,但是在实际场景中,二者的选取有可能会范迷惑,那么今天笔者就带领大家分析一下

    2024年02月19日
    浏览(53)
  • 消息队列黄金三剑客:RabbitMQ、RocketMQ和Kafka全面对决,谁是最佳选择?

    1.RabbitMQ: 适用于易用性和灵活性要求较高的场景 异步任务处理:RabbitMQ提供可靠的消息传递机制,适用于处理异步任务,例如将耗时的任务放入消息队列中,然后由消费者异步处理,提高系统的响应速度和可伸缩性。 解耦系统组件:通过使用RabbitMQ作为消息中间件,不同的

    2024年02月14日
    浏览(36)
  • RocketMQ on openEuler 提供高性能消息队列的稳定性解决方案

    RocketMQ on openEuler,是一种将 RocketMQ 消息中间件通过容器化的方式部署在 openEuler 操作系统上运行,借助 openEuler 系统对于 OS 缓存回收效率增强的内核特性,提升消息中间件在面向超大规模高并发、高吞吐量、低延迟场景下稳定性和可靠性的软件解决方案。 移动云 RocketMQ 消息

    2024年02月11日
    浏览(50)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包