RocketMQ mqadmin java springboot python 调用笔记

这篇具有很好参考价值的文章主要介绍了RocketMQ mqadmin java springboot python 调用笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

命令

mqadmin命令列表

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:
   updateTopic               Update or create topic
   deleteTopic               Delete topic from broker and NameServer.
   updateSubGroup            Update or create subscription group
   setConsumeMode            Set consume message mode. pull/pop etc.
   deleteSubGroup            Delete subscription group from broker.
   updateBrokerConfig        Update broker's config
   updateTopicPerm           Update topic perm
   topicRoute                Examine topic route info
   topicStatus               Examine topic Status info
   topicClusterList          Get cluster info for topic
   addBroker                 Add a broker to specified container
   removeBroker              Remove a broker from specified container
   resetMasterFlushOffset    Reset master flush offset in slave
   brokerStatus              Fetch broker runtime status data
   queryMsgById              Query Message by Id
   queryMsgByKey             Query Message by Key
   queryMsgByUniqueKey       Query Message by Unique key
   queryMsgByOffset          Query Message by offset
   queryMsgTraceById         Query a message trace
   printMsg                  Print Message Detail
   printMsgByQueue           Print Message Detail
   sendMsgStatus             Send msg to broker.
   brokerConsumeStats        Fetch broker consume stats data
   producerConnection        Query producer's socket connection and client version
   consumerConnection        Query consumer's socket connection, client version and subscription
   consumerProgress          Query consumers's progress, speed
   consumerStatus            Query consumer's internal data structure
   cloneGroupOffset          Clone offset from other group.
   producer                  Query producer's instances, connection, status, etc.
   clusterList               List cluster infos
   topicList                 Fetch all topic list from name server
   updateKvConfig            Create or update KV config.
   deleteKvConfig            Delete KV config.
   wipeWritePerm             Wipe write perm of broker in all name server you defined in the -n param
   addWritePerm              Add write perm of broker in all name server you defined in the -n param
   resetOffsetByTime         Reset consumer offset by timestamp(without client restart).
   skipAccumulatedMessage    Skip all messages that are accumulated (not consumed) currently
   updateOrderConf           Create or update or delete order conf
   cleanExpiredCQ            Clean expired ConsumeQueue on broker.
   deleteExpiredCommitLog    Delete expired CommitLog files
   cleanUnusedTopic          Clean unused topic on broker.
   startMonitoring           Start Monitoring
   statsAll                  Topic and Consumer tps stats
   allocateMQ                Allocate MQ
   checkMsgSendRT            Check message send response time
   clusterRT                 List All clusters Message Send RT
   getNamesrvConfig          Get configs of name server.
   updateNamesrvConfig       Update configs of name server.
   getBrokerConfig           Get broker config by cluster or special broker
   getConsumerConfig         Get consumer config by subscription group name
   queryCq                   Query cq command.
   sendMessage               Send a message
   consumeMessage            Consume message
   updateAclConfig           Update acl config yaml file in broker
   deleteAclConfig           Delete Acl Config Account in broker
   clusterAclConfigVersion   List all of acl config version information in cluster
   updateGlobalWhiteAddr     Update global white address for acl Config File in broker
   getAclConfig              List all of acl config information in cluster
   updateStaticTopic         Update or create static topic, which has fixed number of queues
   remappingStaticTopic      Update or create static topic, which has fixed number of queues
   exportMetadata            Export metadata
   exportConfigs             Export configs
   exportMetrics             Export metrics
   haStatus                  Fetch ha runtime status data
   getSyncStateSet           Fetch syncStateSet for target brokers
   getBrokerEpoch            Fetch broker epoch entries
   getControllerMetaData     Get controller cluster's metadata
   getControllerConfig       Get controller config.
   updateControllerConfig    Update controller config.
   electMaster               Re-elect the specified broker as master
   cleanBrokerMetadata       Clean metadata of broker on controller
   dumpCompactionLog         parse compaction log to message
   getColdDataFlowCtrInfo    get cold data flow ctr info
   updateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group config
   removeColdDataFlowCtrGroupConfig remove consumer from cold ctr config
   setCommitLogReadAheadMode set read ahead mode for all commitlog files

topicList

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList  -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer

statsAll

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll  -n localhost:9876 
#Topic                                                            #Consumer Group                                                  #Accumulation      #InTPS     #OutTPS   #InMsg24Hour  #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC                                          CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
stringRequestTopic                                                stringRequestConsumer                                                       1        0.00        0.00              0              0
TRANS_CHECK_MAX_TIME_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
BenchmarkTest                                                                                                                                 0        0.00                          0    NO_CONSUMER
string-topic                                                      string_consumer                                                           106        0.00        0.00              0              0
string-topic                                                      string_consumer_newns                                                      63        0.00        0.00              0              0
TBW102                                                                                                                                        0        0.00                          0    NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster                                                                                                             0        0.00                          0    NO_CONSUMER
SELF_TEST_TOPIC                                                                                                                               0        0.00                          0    NO_CONSUMER
SCHEDULE_TOPIC_XXXX                                                                                                                           0        0.00                          0    NO_CONSUMER
DefaultCluster_REPLY_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23                                                                                                    0        0.00                          0    NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC                                       CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name                                           252        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name_4                                           0        0.00        0.00              0              0
localhost.localdomain                                                                                                                         0        0.00                          0    NO_CONSUMER
order-paid-topic                                                  order-paid-consumer                                                         1        0.00        0.00              0              0
user-topic                                                        user_consumer                                                               2        0.00        0.00              0              0
message-ext-topic                                                 rocketmq-consume-demo-message-ext-consumer                                  2        0.00        0.00              0              0
OFFSET_MOVED_EVENT                                                                                                                            0        0.00                          0    NO_CONSUMER
yeqiang-MS-7B23                                                                                                                               0        0.00                          0    NO_CONSUMER
DefaultCluster                                                                                                                                0        0.00                          0    NO_CONSUMER
spring-transaction-topic                                          string_trans_consumer                                                      15        0.00        0.00              0              0
bytesRequestTopic                                                 bytesRequestConsumer                                                        0        0.00        0.00              0              0

topicStatus

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     35                      2023-08-25 16:21:35,786
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

Python 生产者:producer.py

from rocketmq.client import Producer, Message

groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
#     accessKey,  # 角色密钥
#     secretKey,  # 角色名称
#     ''
# )
# 启动生产者
producer.start()

# 组装消息   topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')

# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()

运行

yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ 

mqadmin查询topic状态

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     36                      2023-08-28 09:03:35,722
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186
yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute  -n localhost:9876 -t string-topic
{
	"brokerDatas":[
		{
			"brokerAddrs":{0:"10.47.76.67:10911"
			},
			"brokerName":"yeqiang-MS-7B23",
			"cluster":"DefaultCluster",
			"enableActingMaster":false
		}
	],
	"filterServerTable":{},
	"queueDatas":[
		{
			"brokerName":"yeqiang-MS-7B23",
			"perm":6,
			"readQueueNums":8,
			"topicSysFlag":0,
			"writeQueueNums":8
		}
	]
}

图形工具rocketmq-dashborad

https://github.com/apache/rocketmq-dashboard

自行编译

mvn clean package -Dmaven.test.skip=true

启动

java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar

RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

 RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

 

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36

RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

 RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

 consoumer.py

import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调

groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"


def callback(msg):
    # 模拟业务
    print('Received message. messageId: ', msg.id, ' body: ', msg.body)
    # 消费成功回复CONSUME_SUCCESS
    return ConsumeStatus.CONSUME_SUCCESS
    # 消费成功回复消息状态
    # return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
#     accessKey,	 # 角色密钥
#     secretKey,   # 角色名称
#     ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
    time.sleep(3600)
# 资源释放
consumer.shutdown()

启动python消费者

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py
 [Consumer] Waiting for messages.
Received message. messageId:  7F0001012DF4226307248D16C3250000  body:  b'This is a new message.'

RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

可以看到my-group1已被消费 

再启动一个consumer.py,产生一次消息

RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。

Java生产一个消息:

training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com

 

RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

 RocketMQ mqadmin java springboot python 调用笔记,java,python,java,rocketmq,python

 

python rocketmq依赖

Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub

python完整程序

python-rocketmq-demo: python3 rocketmq5 的一个例子文章来源地址https://www.toymoban.com/news/detail-680217.html

到了这里,关于RocketMQ mqadmin java springboot python 调用笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

    在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖: 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖: YAML 配置 在 SpringBoot 项目的 yml 配置文件中添加以下配置: 创建监听器 创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解: @Component 、 @RocketMQMe

    2024年02月03日
    浏览(43)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(45)
  • SpringBoot整合RocketMQ

    目录 0.前提条件 1.简易消息操作 1.1生产消息服务整合MQ 1.2创建消费者服务 2.各种不同类型的消息发送集合 2.1消息生产者(mqproductservice服务) (1)配置文件中配置rocket配置(application-dev.yml) (2)提供不同类型的接口下发不同类型的消息 2.2消息消费者(mqconsumerservice服务)

    2024年02月07日
    浏览(32)
  • Springboot 整合RocketMQ

     业务场景:该接口为H5、小程序提供,用于用户提交信息,后台计算用户数据,进行审核。 根据用户提交的手机号计算用户数据,计算用户实时数据比较长,数据量大的3-5分钟,数据小的1分钟上下,移动端不需要实时返回用户计算数据,所以接口可以保存完用户基本信息,

    2024年02月11日
    浏览(33)
  • SpringBoot 集成 RocketMQ

    🎈 作者: Linux猿 🎈 简介: CSDN博客专家🏆,华为云享专家🏆,Linux、C/C++、云计算、物联网、面试、刷题、算法尽管咨询我,关注我,有问题私聊! 🎈 欢迎小伙伴们点赞👍、收藏⭐、留言💬 目录 一、安装 RocketMQ 和 RocketMQ Dashboard 二、编写代码运行     本篇文章主要记

    2024年02月09日
    浏览(35)
  • Springboot 集成 RocketMq(入门)

    Linux 安装 RocketMq-CSDN博客 Springboot 集成 RocketMQ(进阶-消息)-CSDN博客

    2024年02月05日
    浏览(36)
  • 动力节点最新RocketMq笔记第一章RocketMQ基本操作

    MQ====Message Queue 官网: http://rocketmq.apache.org/ RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的 分布式消息系统 ,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方

    2024年02月07日
    浏览(49)
  • SpringBoot3集成RocketMq

    标签:RocketMq5.Dashboard; RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景; 在 rocketmq-starter 组件中,实际上依赖的是 rocketmq-client 组件的 5.0 版本,由于两个新版框架间的兼容问题,需要添

    2024年02月12日
    浏览(47)
  • SpringBoot集成Apache RocketMQ详解

    上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下 如果已经安装了RocketMQ 学习环境可以略过此章节 《【实践篇(一)】RocketMQ入门之学习环境搭建》 本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证

    2024年02月07日
    浏览(36)
  • RocketMQ集成Springboot --Chapter5

    生产者,由于springboot没有专门对mq进行tag标记的方法,只是在topic:后面加上,所以只需 rocketMQTemplate.sendOneWay(“tagFilterBoot:TagA”,msg1);标记即可 生产者代码如下 消费者在注解处添加selectorExpression = \\\"TagA || TagC\\\"表达式选项即可。 消费者代码如下 生产者 setHeader就是设置属性 生产

    2024年02月16日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包