2.13日学习打卡
一.RocketMQ之Java Class
DefaultMQProducer类
概述DefaultMQProducer
类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。
其可以通过无参构造方法快速创建一个生产者,通过getter/setter
方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway
的发送方式,这些发送方式均支持批量发送。
方法
属性 | 内容 |
---|---|
DefaultMQProducerImpl defaultMQProducerImpl; | 生产者内部默认实现类 |
String producerGroup; | Producer组名, 默认值为DEFAULT_PRODUCER。多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
String createTopicKey; | 自动创建测试的topic名称, 默认值为TBW102;在发送消息时,自动创建服务器不存在的topic,需要指定Key。broker必须开启isAutoCreateTopicEnable |
int defaultTopicQueueNums; | 创建默认topic的queue数量。默认4 |
int sendMsgTimeout; | 发送消息超时时间,默认值10000,单位毫秒 |
int compressMsgBodyOverHowmuch; | 消息体压缩阈值,默认为4k(Consumer收到消息会自动解压缩) |
int retryTimesWhenSendFailed; | 同步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2 |
int retryTimesWhenSendAsyncFailed; | 异步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2 |
boolean retryAnotherBrokerWhenNotStoreOK; | 声明发送失败时,下次是否投递给其他Broker,默认false |
int maxMessageSize; | 最大消息大小。默认4M; 客户端限制的消息大小,超过报错,同时服务端也会限制 |
TraceDispatcher traceDispatcher | 消息追踪器,定义了异步传输数据接口。使用rcpHook来追踪消息 |
DefaultMQPushConsumer类
概述DefaultMQPushConsumer
类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。
方法
字段 | 内容 |
---|---|
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; | 消费者实现类,所有的功能都委托给DefaultMQPushConsumerImpl来实现 |
String consumerGroup; | 消费者组名,必须设置,参数默认值是:DEFAULT_CONSUMER (需要注意的是,多个消费者如果具有同样的组名,那么这些消费者必须只消费同一个topic) |
MessageModel messageModel; | 消费的方式,支持以下两种 1、集群消费 2、广播消费。BROADCASTING 广播模式,即所有的消费者可以消费同样的消息CLUSTERING 集群模式,即所有的消费者平均来消费一组消息 |
ConsumeFromWhere consumeFromWhere; | 消费者从那个位置消费,分别为:CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 ;CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费(以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始) |
AllocateMessageQueueStrategy allocateMessageQueueStrategy; | 消息分配策略,用于集群模式下,消息平均分配给所有客户端;默认实现为AllocateMessageQueueAveragely |
Map<String, String> subscription; | topic对应的订阅tag |
MessageListener messageListener; | 消息监听器 ,处理消息的业务就在监听里面。目前支持的监听模式包括:MessageListenerConcurrently,对应的处理逻辑类是MessageListener messageListener ;ConsumeMessageConcurrentlyService MessageListenerOrderly 对应的处理逻辑类是ConsumeMessageOrderlyService;两者使用不同的ACK机制。RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。上面两个不同的监听模式使用的ACK机制是不一样的。 |
OffsetStore offsetStore; | offset存储实现,分为本地存储或远程存储 。集群消费:从远程Broker获取。广播消费:从本地文件获取。 |
DefaultMQPushConsumer类
重要字段
int consumeThreadMin = 20 线程池自动调整
int consumeThreadMax = 64 线程池自动调整
long adjustThreadPoolNumsThreshold = 100000
int consumeConcurrentlyMaxSpan = 2000
单队列并行消费最大跨度,用于流控
int pullThresholdForQueue = 1000
一个queue最大消费的消息个数,用于流控
long pullInterval = 0 检查拉取消息的间隔时间,由于是长轮询,所以为 0,但是如果应用为了流控,
也可以设置大于 0 的值,单位毫秒,取值范围: [0, 65535]
consumeMessageBatchMaxSize = 1 并发消费时,一次消费消息的数量,默认为1,
假如修改为50,此时若有100条消息,那么会创建两个线程,每个线程分配50条消息。
换句话说,批量消费最大消息条数,取值范围: [1, 1024]。默认是1
pullBatchSize = 32 消费者去broker拉取消息时,一次拉取多少条。取值范围: [1, 1024]。默认是32 。可选配置
boolean postSubscriptionWhenPull = false
boolean unitMode = false
重要方法
subscribe(String topic, String subExpression)
订阅某个topic,subExpression传*为订阅该topic所有消息
registerMessageListener(MessageListenerConcurrently messageListener)
注册消息回调,如果需要顺序消费,需要注册MessageListenerOrderly的实现
start 启动消息消费
Message类
含义
Producer发送的消息定义为Message类
位置
org.apache.rocketmq.common.message
字段定义
如图
字段详解
topic
Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的
消息。
通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。
flag
网络通信层标记。
body
Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。
transactionId
RocketMQ 4.3.0引入的事务消息相关的事务编号。
properties
该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。
RocketMQ预定义了一组内置属性,除了内置属性之外,
还可以设置任意自定义属性。当然属性的数量也是有限的,
消息序列化之后的大小不能超过预设的最大消息大小。
系统内置属性定义于org.apache.rocketmq.common.message.MessageConst (如图)
对于一些关键属性,Message类提供了一组set接口来进行设置,
class Message {
public void setTags(String tags) {...}
public void setKeys(Collection<String> keys) {...}
public void setDelayTimeLevel(int level) {...}
public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
public void setBuyerId(String buyerId) {...}
}
这几个set接口对应的作用分别为为,
属性 接口 用途
MessageConst.PROPERTY_TAGS setTags 在消费消息时可以通过tag进行消
息过滤判定
MessageConst.PROPERTY_KEYS setKeys 可以设置业务相关标识,用于消费
处理判定,或消息追踪查询
MessageConst.PROPERTY_DELAY_TIME_LEVEL setDelayTimeLevel
消息延迟处理级别,不同级别对应不同延迟时间
MessageConst.PROPERTY_WAIT_STORE_MSG_OK setWaitStoreMsgOK
在同步刷盘情况下是否需要等待数
据落地才认为消息发送成功
`MessageConst.PROPERTY_BUYER_ID setBuyerId 没有在代码中找到使用的地方,所
以暂不明白其用处
这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存
盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个
动态字段更为方便,自然兼容。
MessageExt类
含义
对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,
还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等
等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,
字段
字段 用途
queueId 记录MessageQueue编号,消息会被发送到Topic下的MessageQueue
storeSize 记录消息在Broker存盘大小
queueOffset 记录在ConsumeQueue中的偏移
sysFlag 记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
bornTimestamp 消息创建时间,在Producer发送消息时设置
storeHost 记录存储该消息的Broker地址
msgId 消息Id
commitLogOffset 记录在Broker中存储便宜
bodyCRC 消息内容CRC校验值
reconsumeTimes 消息重试消费次数
preparedTransactionOffset 事务详细相关字段
注意
Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,
在消息发送时由Producer生成创建。
上面的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成
的,其构成为(如图)
这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能
获取到该值。RocketMQ也提供了相关命令,
命令 实现类 描述
queryMsgById QueryMsgByIdSubCommand 根据MsgId查询消息
二.RocketMQ 消费幂
消费过程幂等
RocketMQ
无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段
,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符
,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
消费速度慢的处理方式
提高消费并行度
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
批量方式消费
批量方式消费可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 参数值,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
跳过非重要消息
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//队列偏移量
long offset = msgs.get(0).getQueueOffset();
//最大偏移量
String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 消息堆积情况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
优化每条消息消费过程
举例如下,原来某条消息的消费过程如图中左侧的流程,优化后变成右侧
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。
消费打印日志
如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。
三.RocketMQ 集群服务
集群特点
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。 每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有 NameServer。
Producer与NameServer集群
中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与NameServer集群
中的其中一个节点(随机选择)建立长连接,定期从 NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
单master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
启动 NameServer
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动 Broker
### 启动Broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### 验证Broker是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...
多master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
启动NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...
多master多Slave模式-异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样
缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
启动NameServer
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker集群
### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
多Master多Slave模式-同步双写
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
启动NameServer
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker集群
### 在机器A,启动第一个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP
为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
以上Broker与Slave配对是通过指定相同的BrokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。另外一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量
四.RocketMQ消息消费
集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
/**
* 集群消费消息(默认)
* ClusterConsumer 类,用于从 RocketMQ 集群消费消息
*/
public class ClusterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 使用指定的消费者组名来实例化 DefaultMQPushConsumer。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 指定 NameServer 地址。
consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
consumer.setMessageModel(MessageModel.CLUSTERING); // 设置消息模式为集群消费模式(默认)。
// 订阅一个或多个要消费的主题。
consumer.subscribe("TopicTest", "*");
// 注册回调函数,以便在从代理获取的消息到达时执行。
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。
});
// 启动消费者实例。
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
- MessageModel.CLUSTERING 设置消费模式为集群消费模式,默认情况下 RocketMQ 使用集群消费模式。
- consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
- consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
- consumer.start() 启动了消费者实例,开始消费消息。
广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
/**
* 广播消费消息(默认)
* BroadcastingConsumer 类,用于从 RocketMQ 广播消费消息
*/
public class BroadcastingConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 使用指定的消费者组名来实例化 DefaultMQPushConsumer。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 指定 NameServer 地址。
consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
consumer.setMessageModel(MessageModel.BROADCASTING); // 设置消息模式为广播消费模式(默认)。
// 订阅一个或多个要消费的主题。
consumer.subscribe("TopicTest", "*");
// 注册回调函数,以便在从代理获取的消息到达时执行。
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 标记消息已成功消费。
});
// 启动消费者实例。
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- CONSUMER_GROUP 和 NAME_SERVER_ADDRESS 是从 Contant 类中获取的常量,用于指定消费者组和 NameServer 地址。
- MessageModel.BROADCASTING 设置消费模式为广播消费模式,默认情况下 RocketMQ 使用广播消费模式。
- consumer.subscribe(“TopicTest”, ““) 订阅了名为 “TopicTest” 的主题,”” 表示消费该主题下的所有消息。
- consumer.registerMessageListener(…) 注册了一个消息监听器,用于接收从代理获取的消息,并处理这些消息。
- consumer.start() 启动了消费者实例,开始消费消息。
这种模式下消费者只能收到启动后发送MQ中的消息。
消息消费时的权衡
集群模式:
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。文章来源:https://www.toymoban.com/news/detail-831826.html
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!文章来源地址https://www.toymoban.com/news/detail-831826.html
到了这里,关于2.13日学习打卡----初学RocketMQ(四)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!