【RocketMQ入门-安装部署与Java API测试】

这篇具有很好参考价值的文章主要介绍了【RocketMQ入门-安装部署与Java API测试】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、环境说明

  1. 虚拟机VWMare:安装centos7.6操作系统
  2. 源码包:rocketmq-all-5.1.3-source-release.zip
  3. 单master部署,在一台虚拟机上安装部署name server和proxy以及broker
  4. 流程图:
    【RocketMQ入门-安装部署与Java API测试】,RocketMQ,java-rocketmq,rocketmq,java

二、安装部署

  1. 源码包安装需要事先安装部署maven,下载apache-maven-3.6.3-bin.tar.gz安装包,然后解压并配置环境变量,如下命令:

    tar -zvxf apache-maven-3.6.3-bin.tar.gz -C /training/
    

    配置环境变量(此处是用root安装),编辑:vi ~/.bash_profile,在文件末尾添加如下内容:

    #maven
    export MVN_HOME=/training/apache-maven-3.6.3
    export PATH=$MVN_HOME/bin:$PATH
    

    执行:source ~/.bash_profile 使环境生效。

  2. 进入/training/apache-maven-3.6.3/conf目录下,配置maven的仓库为阿里云和华为云仓库,执行如下命令:

    cd /training/apache-maven-3.6.3/conf/
    mv settings.xml settings.xml.backup
    vi settings.xml
    

    在打开的settings.xml中,粘贴如下内容即可:

    <?xml version="1.0" encoding="utf-8"?>
    <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="         http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
        <mirrors>
    		<mirror>
    				<id>aliyunmaven</id>
    				<mirrorOf>*</mirrorOf>
    				<name>阿里云公共仓库</name>
    				<url>https://maven.aliyun.com/repository/public</url>
    		</mirror>
    		 <mirror>
    				<id>huaweicloud</id>
    				<mirrorOf>central</mirrorOf>
    				<name>huaweicloud maven</name>
    				<url>https://mirrors.huaweicloud.com/repository/maven/</url>
    		</mirror>
    
        </mirrors>
        <profiles>
            <profile>
    
    			<repositories>
    					<repository>
    					  <id>central</id>
    					  <url>https://maven.aliyun.com/repository/central</url>
    					  <releases>
    							<enabled>true</enabled>
    					  </releases>
    					  <snapshots>
    							<enabled>true</enabled>
    					  </snapshots>
    					</repository>
    			</repositories>
            </profile>
        </profiles>
    </settings>
    
  3. 由于CentOS7.6最小模式安装没有unzip命令,需要事先安装,执行如下命令安装:

    yum install unzip -y
    
  4. 解压源码包rocketmq-all-5.1.3-source-release.zip,进入到解压后的目录下,然后编译安装,执行如下命令:

    unzip rocketmq-all-5.1.3-source-release.zip
    cd rocketmq-all-5.1.3-source-release/
    mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
    
  5. 第5步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动NameServer,执行如下命令:

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqnamesrv &
    
  6. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/namesrv.log
    

    会看到如下内容,说明已经正常启动了

    The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
    或者执行jps命令查看是否已经有了NameServer进程:NamesrvStartup,如有说明ok

  7. 第5、6步骤正确后,进入到 /rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3目录下,然后启动Broker和Proxy,执行如下命令:注意:NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考其他教程。

    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
    
  8. 验证NameServer是否启动成功,执行如下命令:

    tail -f ~/logs/rocketmqlogs/proxy.log
    

    会看到如下内容,说明已经正常启动了

    The broker[broker-a, 192.168.36.132:10911] boot success. serializeType=JSON and name server is localhost:9876
    或者执行jps命令查看是否已经有了:ProxyStartup 进程,如有说明ok

三、Java API 编写Producer和Consumer进行测试

  1. 上述正常启动NameServer和Broker及Proxy后,首先需要创建名为TestTopic的Topic,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
    
    查看新创建的Topic,验证是否已经创建好,执行:
    sh bin/mqadmin topicList -n localhost:9876
    
    结果如下:
    【RocketMQ入门-安装部署与Java API测试】,RocketMQ,java-rocketmq,rocketmq,java
  2. 创建消费者组,执行如下命令:
    cd  /root/rocketmq-all-5.1.3-source-release/distribution/target/rocketmq-5.1.3/rocketmq-5.1.3
    sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
    
    执行命令无任何错误即说明已经创建成功。
  3. 在Idea中创建Maven工程,添加rocketmq依赖,添加如下依赖到pom.xml中:
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <rocketmq-client-java-version>5.0.5</rocketmq-client-java-version>
        <slf4j.version>1.7.25</slf4j.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>${rocketmq-client-java-version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>
    
  4. 编写ProducerTest生产者,代码如下:
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    public class ProducerTest {
        private static final Logger logger = LoggerFactory.getLogger(ProducerTest.class);
    
        public static void main(String[] args) throws Exception {
            testMain();
        }
    
        public static void testMain() throws ClientException, IOException {
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoint = "192.168.36.132:8081";
            // 消息发送的目标Topic名称,需要提前创建。
            // 执行:sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
            String topic = "TestTopic";
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            // 初始化Producer时需要设置通信配置以及预绑定的Topic。
            Producer producer = provider.newProducerBuilder()
                    .setTopics(topic)
                    .setClientConfiguration(configuration)
                    .build();
            int temp = 0;
            while (true) {
                String msg = "第 " + temp + " 条消息,我喜欢rocketmq!!";
                temp++;
                // 普通消息发送。
                Message message = provider.newMessageBuilder()
                        .setTopic(topic)
                        // 设置消息索引键,可根据关键字精确查找某条消息。
                        .setKeys("messageKey")
                        // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                        .setTag("messageTag")
                        // 消息体。
                        .setBody(msg.getBytes())
                        .build();
                try {
                    // 发送消息,需要关注发送结果,并捕获失败等异常。
                    SendReceipt sendReceipt = producer.send(message);
                    Thread.sleep(1000);
                    logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
                } catch (Exception e) {
                    logger.error("Failed to send message", e);
                }
            }
            // producer.close();
        }
    }
    
  5. 编写CommonUtils工具类,用于将ByteBuffer转成String,代码如下:
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    
    public class CommonUtils {
    
        public static void main(String[] args) {
            System.out.println("Hello world!");
        }
    
        public static String decodeKey(ByteBuffer bytes) {
            Charset charset = StandardCharsets.UTF_8;
            return charset.decode(bytes).toString();
        }
    
        
        public static byte[] decodeValue(ByteBuffer bytes) {
            int len = bytes.limit() - bytes.position();
            byte[] bytes1 = new byte[len];
            bytes.get(bytes1);
            return bytes1;
        }
    
        
        public static ByteBuffer encodeKey(String key) {
            return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
        }
    
        public static ByteBuffer encodeValue(byte[] value) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(value.length);
            byteBuffer.clear();
            byteBuffer.get(value, 0, value.length);
            return byteBuffer;
        }
    }
    
  6. 编写ConsumerTest生产者,代码如下:
    import java.util.Collections;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    import org.rocketmq.producer.CommonUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class PushConsumerTest {
        private static final Logger logger = LoggerFactory.getLogger(PushConsumerTest.class);
    
        private PushConsumerTest() {
        }
    
        public static void main(String[] args) throws Exception {
            final ClientServiceProvider provider = ClientServiceProvider.loadService();
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoints = "192.168.36.132:8081";
            ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                    .setEndpoints(endpoints)
                    .build();
            // 订阅消息的过滤规则,表示订阅所有Tag的消息。
            String tag = "*";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            // 为消费者指定所属的消费者分组,Group需要提前创建。
            // 执行:sh bin/sh mqadmin updateSubGroup -g testgroup -c DefaultCluster -n localhost:9876
            String consumerGroup = "testgroup";
            // 指定需要订阅哪个目标Topic,Topic需要提前创建。
            String topic = "TestTopic";
            // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
            PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                    .setClientConfiguration(clientConfiguration)
                    // 设置消费者分组。
                    .setConsumerGroup(consumerGroup)
                    // 设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                    // 设置消费监听器。
                    .setMessageListener(messageView -> {
                        // 处理消息并返回消费结果。
                        logger.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(), CommonUtils.decodeKey(messageView.getBody()));
                        return ConsumeResult.SUCCESS;
                    })
                    .build();
            Thread.sleep(Long.MAX_VALUE);
            // 如果不需要再使用 PushConsumer,可关闭该实例。
            // pushConsumer.close();
        }
    }
    
  7. 为了能查看到控制台日志输入,需要在resources目录下新建log4j.properties、log4j2.properties,具体内容如下:
    log4j.properties内容:
    log4j.rootLogger=INFO,console
    
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    
    log4j2.properties内容:
    name = PropertiesConfig
    property.filename = target/logs
    
    #appenders = console, file
    #配置值是appender的类型,并不是具体appender实例的name
    appenders = rolling
    
    appender.rolling.type = RollingFile
    appender.rolling.name = RollingLogFile
    appender.rolling.fileName=${filename}/automationlogs.log
    appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 5
     
    rootLogger.level = INFO,console
    rootLogger.appenderRef.rolling.ref = RollingLogFile
    
  8. 到此,完成了所有准备工作了,整个工程如下所示:
    【RocketMQ入门-安装部署与Java API测试】,RocketMQ,java-rocketmq,rocketmq,java
  9. 运行ProducerTest程序进行消息的发送,控制台中会看到如下内容:
    【RocketMQ入门-安装部署与Java API测试】,RocketMQ,java-rocketmq,rocketmq,java
  10. 运行ConsumerTest程序接收消息,控制台中会看到如下内容:
    【RocketMQ入门-安装部署与Java API测试】,RocketMQ,java-rocketmq,rocketmq,java

四、小结

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们也通过编写Java程序进行简单的消息收发。如本文对您有帮助,麻烦您动动发财的手指点个赞~~~~~,谢谢您的阅读!!!文章来源地址https://www.toymoban.com/news/detail-638671.html

到了这里,关于【RocketMQ入门-安装部署与Java API测试】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RocketMQ 入门实战(2)--安装

    本文主要介绍 RocketMQ 的安装部署,文中所使用到的软件版本:RocketMQ 5.1.3、CentOS 7.9.2009。 Apache RocketMQ 部署架构上主要分为四部分: A、生产者 Producer 发布消息的角色。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。

    2024年02月10日
    浏览(33)
  • Apache RocketMQ之集成RocketMQ_MQTT 安装部署协议

    Apache RocketMQ 安装说明 安装步骤 参考快速开始 https://rocketmq.apache.org/zh/docs/quickStart/01quickstart 安装可视化rocketmq_dashboard下载地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/03Dashboard/ 安装rocketmq_mqtt https://rocketmq.apache.org/zh/docs/4.x/mqtt/01RocketMQMQTTOverview broker.conf配置文件中添加参数,开

    2024年02月13日
    浏览(33)
  • 常用环境部署(七)——Docker安装RocketMQ

    (1)拉取镜像 (2)创建一个数据目录 即创建一个namesrv数据存储路径 (3)构建namesrv容器  (4)参数说明 参数 说明 -d 以守护进程的方式启动 - -restart=always docker重启时候容器自动重启 - -name rmqnamesrv 把容器的名字设置为rmqnamesrv -p 9876:9876 把容器内的端口9876挂载到宿主机98

    2023年04月14日
    浏览(46)
  • RocketMQ:一个纯java的开源消息中间件--开发测试环境搭建

    一、简介     RocketMQ的前身是Metaq,当 Metaq 3.0发布时,产品名称改为  RocketMQ     MetaQ2.x版本由于依赖了alibaba公司内部其他系统,对于公司外部用户使用不够友好,推荐使用3.0版本。      项目地址:  https://github.com/alibaba/RocketMQ

    2024年02月11日
    浏览(55)
  • RocketMQ的windos/linux/docker超详细安装及简单入门!

    本文若有不当之处欢迎提出pr/issue 主要内容: 初识MQ RocketMQ简介 RocketMQ安装 RocketMQ快速入门 SpringBoot集成RocketMQ 最后 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有

    2024年02月03日
    浏览(38)
  • API 渗透测试从入门到精通系列文章(下)

    导语:在本系列文章的前面一部分我们从使用 Postman 开始,创建了集合和请求,并通过 Burp Suite 设置为了 Postman 的代理,这样我们就可以使用 Burp 的模糊测试和请求篡改的功能。 在本系列文章的前面一部分我们从使用 Postman 开始,创建了集合和请求,并通过 Burp Suite 设置为了

    2024年02月03日
    浏览(55)
  • Postman Newman API 自动化测试快速入门

    Newman 是一款专为 Postman 打造的命令行工具,旨在通过自动运行 Postman 集合和环境,实现 API 测试的自动化。它使得开发者无需打开 Postman 图形界面,即可直接在命令行中执行测试用例。 使用 Newman 进行 API 测试,可以带来诸多好处: 快速反馈 :每当代码发生变更,开发者都可

    2024年04月14日
    浏览(80)
  • [入门一]C# webApi创建、与发布、部署、api调用

    一. 创建 web api项目 1.1、项目创建 MVC架构的话,它会有view-model-control三层,在web api中它的前端和后端是分离的,所以只在项目中存在model-control两层 1.2、修改路由 打开App_Start文件夹下,WebApiConfig.cs , 修改路由,加上{action}/ ,这样就可以在api接口中通过接口函数名,来导向我

    2024年02月05日
    浏览(43)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(48)
  • 软件测试 -- Selenium常用API全面解答(java)

    写在前面 // 如果文章有问题的地方, 欢迎评论区或者私信指正 目录 什么是Selenium 一个简单的用例  元素定位 id定位 xpath定位   name定位 tag name 定位和class name 定位 操作元素 click  send_keys submit text getAttribute 添加等待 显示等待 隐式等待  显示等待和隐式等待的特点 显示等待

    2024年03月28日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包