SpringBoot3集成RocketMq

这篇具有很好参考价值的文章主要介绍了SpringBoot3集成RocketMq。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

标签:RocketMq5.Dashboard;

一、简介

RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景;

二、环境部署

1、编译打包

1、下载5.0版本源码包
rocketmq-all-5.0.0-source-release.zip

2、解压后进入目录,编译打包
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

SpringBoot3集成RocketMq

2、修改配置

在distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runserver.sh

SpringBoot3集成RocketMq

distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/runbroker.sh

SpringBoot3集成RocketMq

3、服务启动

1、该目录下
distribution/target/rocketmq-5.0.0/rocketmq-5.0.0/bin/

2、启动NameServer
sh mqnamesrv

输出日志
The Name Server boot success. serializeType=JSON

3、启动Broker+Proxy
sh mqbroker -n localhost:9876 --enable-proxy

输出日志
rocketmq-proxy startup successfully

4、关闭服务
sh mqshutdown namesrv
Send shutdown request to mqnamesrv(18636) OK

sh mqshutdown broker
Send shutdown request to mqbroker with proxy enable OK(18647)

4、控制台安装

1、下载master源码包
rocketmq-dashboard-master

2、解压后进入目录,编译打包
mvn clean package -Dmaven.test.skip=true

3、启动服务
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

4、输出日志
INFO main - Tomcat started on port(s): 8080 (http) with context path ''

5、访问服务:localhost:8080

SpringBoot3集成RocketMq

三、工程搭建

1、工程结构

SpringBoot3集成RocketMq

2、依赖管理

rocketmq-starter组件中,实际上依赖的是rocketmq-client组件的5.0版本,由于两个新版框架间的兼容问题,需要添加相关配置解决该问题;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-starter.version}</version>
</dependency>

3、配置文件

配置RocketMq服务地址,消息生产者和消费者;

rocketmq:
  name-server: 127.0.0.1:9876
  # 生产者
  producer:
    group: boot_group_1
    # 消息发送超时时间
    send-message-timeout: 3000
    # 消息最大长度4M
    max-message-size: 4096
    # 消息发送失败重试次数
    retry-times-when-send-failed: 3
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
  # 消费者
  consumer:
    group: boot_group_1
    # 每次提取的最大消息数
    pull-batch-size: 5

4、配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

@Configuration
public class RocketMqConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMsgTimeout;

    @Value("${rocketmq.producer.max-message-size}")
    private Integer maxMessageSize;

    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed ;

    @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
    private Integer retryTimesWhenSendAsyncFailed ;

    @Bean
    public RocketMQTemplate rocketMqTemplate(){
        RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
        rocketMqTemplate.setProducer(defaultMqProducer());
        return rocketMqTemplate;
    }

    @Bean
    public DefaultMQProducer defaultMqProducer() {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.setNamesrvAddr(this.nameServer);
        producer.setProducerGroup(this.producerGroup);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
        return producer;
    }
}

四、基础用法

1、消息生产

编写一个生产者接口类,分别使用RocketMQTemplateDefaultMQProducer实现消息发送的功能,然后可以通过Dashboard控制面板查看消息详情;

@RestController
public class ProducerWeb {
    private static final Logger log = LoggerFactory.getLogger(ProducerWeb.class);

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    @GetMapping("/send/msg1")
    public String sendMsg1 (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(1,"boot_mq_msg"));
            // 发送消息
            rocketMqTemplate.convertAndSend("boot-mq-topic",msgBody);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK" ;
    }

    @Autowired
    private DefaultMQProducer defaultMqProducer ;

    @GetMapping("/send/msg2")
    public String sendMsg2 (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(2,"boot_mq_msg"));
            // 构建消息对象
            Message message = new Message();
            message.setTopic("boot-mq-topic");
            message.setTags("boot-mq-tag");
            message.setKeys("boot-mq-key");
            message.setBody(msgBody.getBytes());
            // 发送消息,打印日志
            SendResult sendResult = defaultMqProducer.send(message);
            log.info("msgId:{},sendStatus:{}",sendResult.getMsgId(),sendResult.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消费

编写消息监听类,实现RocketMQListener接口,通过RocketMQMessageListener注解控制监听的具体信息;

@Service
@RocketMQMessageListener(consumerGroup = "boot_group_1",topic = "boot-mq-topic")
public class ConsumerListener implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Override
    public void onMessage(String message) {
        log.info("\n=====\n message:{} \n=====\n",message);
    }
}

SpringBoot3集成RocketMq文章来源地址https://www.toymoban.com/news/detail-652504.html

五、参考源码

文档仓库:
https://gitee.com/cicadasmile/butte-java-note

源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent

到了这里,关于SpringBoot3集成RocketMq的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot3集成ElasticSearch

    目录 一、简介 二、环境搭建 1、下载安装包 2、服务启动 三、工程搭建 1、工程结构 2、依赖管理 3、配置文件 四、基础用法 1、实体类 2、初始化索引 3、仓储接口 4、查询语法 五、参考源码 标签:ElasticSearch8.Kibana8; Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎

    2024年02月12日
    浏览(27)
  • SpringBoot3集成Quartz

    目录 一、简介 二、工程搭建 1、工程结构 2、依赖管理 3、数据库 4、配置文件 三、Quartz用法 1、初始化加载 2、新增任务 3、更新任务 4、暂停任务 5、恢复任务 6、执行一次 7、删除任务 8、任务执行 四、参考源码 标签:Quartz.Job.Scheduler; Quartz由Java编写的功能丰富的开源作业

    2024年02月13日
    浏览(25)
  • SpringBoot3数据库集成

    标签:Jdbc.Druid.Mybatis.Plus; 项目工程中,集成数据库实现对数据的增晒改查管理,是最基础的能力,而对于这个功能的实现,其组件选型也非常丰富; 通过如下几个组件来实现数据库的整合; Druid连接池 :阿里开源的数据库连接池,并且提供 SQL 执行的监控能力; MybatisPlu

    2024年02月13日
    浏览(33)
  • Java21 + SpringBoot3集成WebSocket

    近日心血来潮想做一个开源项目,目标是做一款可以适配多端、功能完备的模板工程,包含后台管理系统和前台系统,开发者基于此项目进行裁剪和扩展来完成自己的功能开发。 本项目为前后端分离开发,后端基于 Java21 和 SpringBoot3 开发,前端提供了vue、angular、react、uniap

    2024年01月23日
    浏览(45)
  • Elasticsearch 搜索测试与集成Springboot3

    Elasticsearch是专门做 搜索 的,它非常擅长以下方面的问题 Elasticsearch对模糊搜索非常擅长(搜索速度很快) 从Elasticsearch搜索到的数据可以根据 评分 过滤掉大部分的,只要返回评分高的给用户就好了(原生就支持排序) 没有那么准确的也能搜出相关的结果(能匹配有相

    2024年01月22日
    浏览(34)
  • 【springboot3.x 记录】解决 springboot3 集成 mybatis-plus 报 sqlSession 异常

    2022-12-30,作者最新发布了 3.5.3.1 版本,不需要使用快照版本了 ========================= springboot3 已经发布正式版,第一时间尝鲜看看如何,但是在集成 mybatis-plus 最新版 3.5.2 的时候发现提示异常。 看来 springboot3 在注入这块做了调整,但目前 mybatis-plus 并没有适配到。 于是翻查

    2024年02月13日
    浏览(37)
  • springboot3 集成mybatis 和通用mapper

    xml版本查看:https://www.cnblogs.com/binz/p/6564490.html springboot3.x以前的版本查看 https://www.cnblogs.com/binz/p/17421063.html springboot3.x查看  https://www.cnblogs.com/binz/p/17654403.html 1、pom引用 !-- openapi、 swagger3、knife4j配置,适用boot3 -- !-- https://doc.xiaominfo.com -- dependency groupId com.github.xiaoymin/ groupI

    2024年02月11日
    浏览(34)
  • SpringBoot3集成Kafka优雅实现信息消费发送

           首先,你的JDK是否已经是8+了呢?        其次,你是否已经用上SpringBoot3了呢?        最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。        这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据

    2024年02月02日
    浏览(34)
  • SpringBoot3.1.7集成Kafka和Kafka安装

    我们在很多系统开发都需要用到消息中间件,目前来说Kafka凭借其优秀的性能,使得它的使用率已经是名列前茅了,所以今天我们将它应用到我们的系统 在使用一个中间件一定要考虑版本的兼容性,否则后面会遇到很多问题,首先我们打开Spring的官网:Spring for Apache Kafka Spr

    2024年01月23日
    浏览(29)
  • 【SpringBoot3】Spring Boot 3.0 集成 Redis 缓存

    Redis缓存是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。它主要用于作为数据库、缓存和消息中间件,以快速读写和丰富的数据结构支持而著称。 在应用程序和数据库之间,Redis缓存作为一个中间层起着关键

    2024年02月21日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包