阿里云RockMQ与SpringBoot的整合

这篇具有很好参考价值的文章主要介绍了阿里云RockMQ与SpringBoot的整合。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

开源版本Rocket和商业版本的RocketMQ有些不同,研究的是商业版本的RocketMQ,阿里云的官方文档,感觉有点乱。看不咋明白,网上虽然有教程,大都还是有点缺少,有时候会突然跳了步骤,抹去了一些细节。

前置步骤

阿里云MQ开通及子Access账号的权限的生成

阿里云MQ开通

开通阿里云MQ(现在叫阿里云RocketMQ)百度的教程够用,不多记录,需要的参考该地址http://mtw.so/5Q5nHp,进行开通。PS:页面由于开发人员一直在更新,教程的页面不一定和现有页面完全一样,所以不要死脑筋。

子Access账号

阿里云可以为账号,创建两个字段,用于你身份的验证,下图中可以进入申请子账户

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

跳出提示,选择开始使用子用户AccessKey

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击 创建用户

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击确定,会要你验证手机,输入验证码即可

创建完以后会给你两个字段的值,一个是AccessKey IDAccessKey Secret一定要及时妥善保存,虽然可以重新创建

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

ps:这里别忘了给账户赋予MQ的权限,不然无法进行消息的订阅和发送

如何设置权限?

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击添加权限,添加以下权限

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

Topic和Group的创建(在阿里云控制台页面进行)

首先创建实例,点击创建实例

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击确定

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

按提示创建Group和Topic 即可,然后将Group和Topic的名称,填入到application.properties对应字段中

nameSrvAddr的获取,在创建好Group和Topic后,从这进入到接入点的获取页面

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

接入点有两个,分别对应了不同的接入方式。TCP和HTTP,我这里用的TCP协议的接入方式

这里只能获取到公网的接入地址,没有内网

 

开始开发

SpringBoot整合阿里云RocketMQ(普通消息为例)

Maven工程

POM文件依赖文章来源地址https://www.toymoban.com/news/detail-553336.html

<dependencies>
        <!--主要用来写WEB接口,这里用来测试MQ的生产者-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--阿里云ons,方便的接入到云服务-->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.4.Final</version>
        </dependency>
        <!--神器,这里主要用来输出日志@Slf4j-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <!--测试用,主要是目的是让功能带着spring容器中进行测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
 </dependencies>

application.properties


#启动测试之前请替换如下 XXX 为您的配置,从阿里云MQ里获取,具体获取方式,看下前置步骤
rocketmq.accessKey=xxx
rocketmq.secretKey=xxx
rocketmq.nameSrvAddr=xxx
 
rocketmq.topic=TpMQTest
rocketmq.groupId=GID_MQTEST
rocketmq.tag=*
 
rocketmq.orderTopic=XXX
rocketmq.orderGroupId=XXX
rocketmq.orderTag=*

配置类,用于读取application.properties中相应字段的值

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
 
import java.util.Properties;
 
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {
 
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;
 
    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
 
    public String getAccessKey() {
        return accessKey;
    }
 
    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }
 
    public String getSecretKey() {
        return secretKey;
    }
 
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }
 
    public String getNameSrvAddr() {
        return nameSrvAddr;
    }
 
    public void setNameSrvAddr(String nameSrvAddr) {
        this.nameSrvAddr = nameSrvAddr;
    }
 
    public String getTopic() {
        return topic;
    }
 
    public void setTopic(String topic) {
        this.topic = topic;
    }
 
    public String getGroupId() {
        return groupId;
    }
 
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
 
    public String getTag() {
        return tag;
    }
 
    public void setTag(String tag) {
        this.tag = tag;
    }
 
    public String getOrderTopic() {
        return orderTopic;
    }
 
    public void setOrderTopic(String orderTopic) {
        this.orderTopic = orderTopic;
    }
 
    public String getOrderGroupId() {
        return orderGroupId;
    }
 
    public void setOrderGroupId(String orderGroupId) {
        this.orderGroupId = orderGroupId;
    }
 
    public String getOrderTag() {
        return orderTag;
    }
 
    public void setOrderTag(String orderTag) {
        this.orderTag = orderTag;
    }
}

消费者的注册类

消费者的build,主要目的是将配置文件里的配置设置到ConsumerBean中,使其在Spring启动时,一同启动。

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.aliyun.openservices.springboot.example.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
public class ConsumerClient {
 
    @Autowired
    private MqConfig mqConfig;
 
    @Autowired
    private DemoMessageListener messageListener;
 
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getTopic());
        subscription.setExpression(mqConfig.getTag());
        subscriptionTable.put(subscription, messageListener);
        //订阅多个topic如上面设置
 
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
 
}

注册完成以后,开启监听,在消息队列有消息时就会进行消费 @Component这个注解,阿里云官方的Demo,并没有出现,导致一直消费者消费不到消息。后来加上以后就能正常消费消息了

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
@Component
@Slf4j
public class DemoMessageListener implements MessageListener {
 
    @Override
    public Action consume(Message message, ConsumeContext context) {
 
        log.info("Receive: " + message);
        try {
            //do something..
            //Action.CommitMessag 进行消息的确认
            return Action.CommitMessage;
        } catch (Exception e) {
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

生产者注册类

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.springboot.example.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class ProducerClient {
 
    @Autowired
    private MqConfig mqConfig;
 
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}

生产者生产消息工具类

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.springboot.example.config.MqConfig;
import org.springframework.stereotype.Component;
 
/**
 * @description: <h1>RocketMessageProducer rocketMQ消息生产者</h1>
 * @author: LiRen
 **/
@Component
public class RocketMessageProducer {
 
    private static ProducerBean producer;
    private static MqConfig mqConfig;
 
    public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) {
        this.producer = producer;
        this.mqConfig = mqConfig;
    }
 
    /**
     * @Description: <h2>生产 普通 消息</h2>
     * @author: LiRen
     */
    public  static void producerMsg(String tag, String key, String body) {
        Message msg = new Message(mqConfig.getTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            System.out.println(time
                    + " Send mq message success.Topic is:" + msg.getTopic()
                    + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()
                    + " msgId is:" + sendResult.getMessageId());
        } catch (ONSClientException e) {
            e.printStackTrace();
            System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic());
            // TODO 发送失败
        }
    }
 
}

WEB接口,测试Controller类

/**
 * ClassName: ProducerController <br/>
 * Description: <br/>
 * date: 2021/11/3 11:05<br/>
 *
 * @author Hesion<br />
 * @version
 * @since JDK 1.8
 */
 
import com.aliyun.openservices.springboot.example.normal.RocketMessageProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * 测试生产者
 * @author: hesion
 * @create: 2021-11-03 11:05
 **/
@RestController
public class ProducerController {
    /**
     * rocketmq demo
     */
    @RequestMapping(value = {"/useRocketMQ"}, method = RequestMethod.GET)
    public String useRocketMQ() {
 
        RocketMessageProducer.producerMsg("RocketProdTagTest","RocketProdKeyTest","RocketProdBodyTest");
        return "请求成功!";
    }
}

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

好了 代码就放在下面了

Gitee代码地址

前言:

开源版本Rocket和商业版本的RocketMQ有些不同,研究的是商业版本的RocketMQ,阿里云的官方文档,感觉有点乱。看不咋明白,网上虽然有教程,大都还是有点缺少,有时候会突然跳了步骤,抹去了一些细节。

前置步骤

阿里云MQ开通及子Access账号的权限的生成

阿里云MQ开通

开通阿里云MQ(现在叫阿里云RocketMQ)百度的教程够用,不多记录,需要的参考该地址http://mtw.so/5Q5nHp,进行开通。PS:页面由于开发人员一直在更新,教程的页面不一定和现有页面完全一样,所以不要死脑筋。

子Access账号

阿里云可以为账号,创建两个字段,用于你身份的验证,下图中可以进入申请子账户

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

跳出提示,选择开始使用子用户AccessKey

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击 创建用户

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击确定,会要你验证手机,输入验证码即可

创建完以后会给你两个字段的值,一个是AccessKey IDAccessKey Secret一定要及时妥善保存,虽然可以重新创建

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

ps:这里别忘了给账户赋予MQ的权限,不然无法进行消息的订阅和发送

如何设置权限?

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击添加权限,添加以下权限

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

Topic和Group的创建(在阿里云控制台页面进行)

首先创建实例,点击创建实例

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

点击确定

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

按提示创建Group和Topic 即可,然后将Group和Topic的名称,填入到application.properties对应字段中

nameSrvAddr的获取,在创建好Group和Topic后,从这进入到接入点的获取页面

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

阿里云RockMQ与SpringBoot的整合,MQ,阿里云,rockMQ

接入点有两个,分别对应了不同的接入方式。TCP和HTTP,我这里用的TCP协议的接入方式

这里只能获取到公网的接入地址,没有内网

 

开始开发

SpringBoot整合阿里云RocketMQ(普通消息为例)

Maven工程

POM文件依赖

到了这里,关于阿里云RockMQ与SpringBoot的整合的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot项目整合阿里云短信业务(非常详细)

    详细介绍SpringBoot整合阿里云短信服务的每一步过程,同时会将验证码存放到Redis中并设置过期时间,尽量保证实战的同时也让没做过的好兄弟也能实现发短信的功能~ 首先,你需要注册一个阿里云账号(如果还没有),然后在控制台中创建Access Key。这个Access Key将用于通过AP

    2024年04月27日
    浏览(34)
  • springboot整合阿里云oss实现文件上传

    通过阿里云oss进行文件上传,首先需要开通相关的服务,这边就不在具体说明,不懂的可以百度看一下。 阿里云oss有几个关键的参数,这也是后续通过java进行上传所需要的参数,分别是endpoint(域结点)、AccessKey ID(秘钥id)、AccessKey secret(秘钥)、bucket name(bucket域名)。  通过这几

    2024年01月25日
    浏览(48)
  • SpringBoot 整合 MongoDB 连接 阿里云MongoDB

      注:spring-boot-starter-data-mongodb 2.7.5;jdk 1.8  阿里云MongoDB是 副本集实例的 在网上查找了一番,大多数都是教连接本地mongodb或者linux上的mongodb 阿里云上有java版连接教程,但它不是SpringBoot方法配置的,是手动写死的很不方便。 通过程序代码连接MongoDB副本集实例 下面进行配置

    2024年02月14日
    浏览(48)
  • springboot + vue 整合 阿里云 视频点播 功能

    1.1、找到视频点播 1.2、进入管理控制台 1.2、开通服务 1.3、选择“按使用流量计费”,开通服务 1.4、开通后,进入管理控制台 1.5、上传音 / 视频 1.6、启用存储地址 1.7、已启用 1.8、选择上传的音频,开始上传 1.9、上传成功 1.10、分类管理 1.11、视频转码 1.12、再上传一个视频

    2024年02月02日
    浏览(37)
  • SpringBoot整合阿里云Oss实现文件图片上传

    目录 1. 阿里云Oss注册使用 2. 项目中使用 2.1 引入依赖以及插件 2.2 编写配置文件application.properties 2.3 创建常量类,获取配置信息  2.4 serviceImpl中实现逻辑            

    2024年02月08日
    浏览(68)
  • 【案例实战】SpringBoot整合阿里云文件上传OSS

    1.需求背景 C端业务用户头像上传 海量图片音频、视频存储 用户行为日志存储 (1)阿里云OSS介绍 对象存储OSS(Object Storage Service)是阿里云提供的海量、安全、低成本、高持久的云存储服务。其数据设计持久性不低于99.9999999999%(12个9),服务设计可用性不低于99.995%。 OSS具

    2024年02月06日
    浏览(56)
  • SpringBoot整合阿里云OSS文件存储解决方案

    🧑‍💻作者名称:DaenCode 🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。 😎人生感悟:尝尽人生百味,方知世间冷暖。 📖所属专栏:SpringBoot实战 以下是专栏部分内容,更多内容请前往专栏查看! 标题 一文带你学会使用SpringBoot+Avue实现短信通知功能

    2024年02月08日
    浏览(39)
  • 全网最详细SpringBoot、SpringCloud整合阿里云短信服务

    https://www.aliyun.com/

    2024年02月02日
    浏览(52)
  • SpringBoot整合阿里云短信服务详细过程(保证初学者也能实现)

    网上关于实操性的文章普遍大部分都记录不全,要么只记录重点部分,对于刚学习的小伙伴来说看起来是比较困难的 所以,基于这一点。 该文章会详细介绍使用SpringBoot整合阿里云短信服务的每一步过程,同时会将验证码存放到Redis中并设置过期时间 , 尽量保证实战的同时也

    2023年04月15日
    浏览(40)
  • 最近项目上需要发送短信整理了一篇文章 SpringBoot整合阿里云发送短信

    阿里云短信服务网址:阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台 第一步:申请签名(一般申请时长在1-2小时之间)特别注意:场景说明不要乱填以免申请不通过  第二步:申请短信模板(一般申请时长在1-2小时之间)特别注意:场景说明不要乱填以免申请不

    2024年02月06日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包