RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ

这篇具有很好参考价值的文章主要介绍了RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

添加 RocketMQ 依赖

  1. 在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:

    @rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

  2. 在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    

消费者 Consumer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:
  name-server: 192.168.68.121:9876     # rocketMq的nameServer地址

创建监听器

创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component@RocketMQMessageListener,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:

@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"消息内容:"+msg);
    }

}

@RocketMQMessageListener 注解参数如下:

参数 描述
topic 消费者订阅的主题
consumerGroup 消费者组
consumeMode 消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY
messageModel 消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING
selectorType 过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92
selectorExpression 过滤消息的表达式:Tag | SQL92【`tag1
maxReconsumeTimes 消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。
delayLevelWhenNextConsume 并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组)

消息过滤

Tag 过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

编写并启动消费者项目订阅 tagTopic 主题:

@Component
@RocketMQMessageListener(topic = "tagTopic",
        consumerGroup = "boot-mq-group-consumer",
        selectorType = SelectorType.TAG,
        selectorExpression = "java")
public class MQMsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送一个带 Tag 的同步消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/tag")
    public String sendSyncMessage() {
        SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");
        return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId();
    }
    
}

运行项目,访问接口:http://localhost:8080/send/tag

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看 RocketMQ 控制台,可以看到消息带有 java tag:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看消费者项目的 IDEA 控制台:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

生产者 Producer

YAML 配置

在 SpringBoot 项目的 yml 配置文件中添加以下配置:

rocketmq:
  name-server: 192.168.68.121:9876     # rocketMq的nameServer地址
  producer:
    group: boot-mq-group-producer # 生产者组名

注:生产者需要标注生产者组名,否则会报异常:'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.

发送同步消息

编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/sync/{msg}")
    public String sendSyncMessage(@PathVariable String msg){
        SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);
        return "发送状态:"+result.getSendStatus()+"<br>消息id:"+result.getMsgId();
    }

}

运行项目,访问接口:http://localhost:8080/send/sync/同步消息

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

发送异步消息

不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。

编写 Controller,使用 RocketMQTemplate 的 asyncSend() 方法发送异步消息,并使用回调接口打印发送的结果:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/async/{msg}")
    public String sendAsyncMessage(@PathVariable String msg) {
        rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息发送成功");
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("异步消息发送失败");
            }
        });
        System.out.println("异步消息已发送完成");
        return "发送异步消息";
    }
  
}

运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

发送单向消息

编写 Controller,使用 RocketMQTemplate 的 sendOneWay() 方法发送单向消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/oneWay/{msg}")
    public String sendOneWayMessage(@PathVariable String msg) {
        rocketMQTemplate.sendOneWay("oneWayTopic",msg);
        return "单向消息发送成功";
    }

}

运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

发送延迟消息

编写并启动消费者项目订阅 delayTopic 主题:

@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/delay/{msg}")
    public String sendDelayMessage(@PathVariable String msg) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);
        return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId()+"<br>消息发送时间:"+new Date();
    }

}

运行项目,访问接口:http://localhost:8080/send/delay/延迟消息

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

发送顺序消息

编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:

public class Order {
    //订单号
    private String orderId;
    //订单名称
    private String orderName;
    //订单的流程顺序
    private String seq;
}

编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:

@Component
@RocketMQMessageListener(topic = "orderlyTopic",
        consumerGroup="boot-mq-group-consumer",
        consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        System.out.println("消费者:"+message);
    }

}

编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly() 方法发送同步顺序消息:

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/orderly")
    public String sendOrderlyMessage() {
        List<Order> orders = Arrays.asList(
                new Order(UUID.randomUUID().toString(), "下订单", "1"),
                new Order(UUID.randomUUID().toString(), "发短信", "1"),
                new Order(UUID.randomUUID().toString(), "物流", "1"),
                new Order(UUID.randomUUID().toString(), "签收", "1"),

                new Order(UUID.randomUUID().toString(), "下订单", "2"),
                new Order(UUID.randomUUID().toString(), "发短信", "2"),
                new Order(UUID.randomUUID().toString(), "物流", "2"),
                new Order(UUID.randomUUID().toString(), "签收", "2")
        );
        //控制流程:下订单->发短信->物流->签收
        //将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
        orders.forEach(order -> {
            rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());
        });
        return "发送成功";
    }

}

运行项目,访问接口:http:localhost:8080/send/orderly

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

发送批量消息

编写并启动消费者项目订阅 batchOrderly 主题:

@Component
@RocketMQMessageListener(topic = "batchOrderly",
        consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<Order> {

    @Override
    public void onMessage(Order message) {
        System.out.println(Thread.currentThread().getName()+":"+message);
    }
  
}

编写生产者 Controller,将消息打包成 Collection<Message> msgs 传入 syncSend() 方法中发送:

@RestController
public class ProducerController {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  @GetMapping("/send/batch")
  public String sendOrderlyMessage() {

    List<Message> messages = Arrays.asList(
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
      MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build()
    );
    return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();
    
  }

}

运行项目,访问接口:http:localhost:8080/send/batch

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看消费者项目的 IDEA 控制台,多个线程并发进行消费:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

发送集合消息

编写并启动消费者项目订阅 listTopic 主题:

@Component
@RocketMQMessageListener(topic = "listTopic",
        consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<List<Order>> {

    @Override
    public void onMessage(List<Order> orders) {
        orders.forEach(o -> {
            System.out.println(Thread.currentThread().getName()+":"+o);
        });
    }

}

编写生产者 Controller,将集合传入 syncSend() 方法中发送:

@RestController
public class ProducerController {

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  @GetMapping("/send/list")
  public String sendOrderlyMessage() {

    List<Order> orders = Arrays.asList(
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1"),
      new Order(UUID.randomUUID().toString(), "下订单", "1")
    );
    rocketMQTemplate.syncSend("listTopic",orders);
    return "发送成功";
  }

}

运行项目,访问接口:http:localhost:8080/send/list

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看 RocketMQ 控制台,可以看到队列中一条消息:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring

查看消费者项目的 IDEA 控制台,进行消费:

@rocketmqmessagelistener,RocketMQ,spring boot,rocketmq,java,spring文章来源地址https://www.toymoban.com/news/detail-769428.html

到了这里,关于RocketMQ 学习教程——(二)SpringBoot 集成 RocketMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Azure机器学习 - 使用与Azure集成的Visual Studio Code实战教程

    本文介绍如何启动远程连接到 Azure 机器学习计算实例的 Visual Studio Code。 借助 Azure 机器学习资源的强大功能,使用 VS Code 作为集成开发环境 (IDE)。 在VS Code中将计算实例设置为远程 Jupyter Notebook 服务器。 关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研

    2024年02月07日
    浏览(50)
  • Elasticsearch 学习+SpringBoot实战教程(三)

    需要学习基础的可参照这两文章 Elasticsearch 学习+SpringBoot实战教程(一) Elasticsearch 学习+SpringBoot实战教程(一)_桂亭亭的博客-CSDN博客 Elasticsearch 学习+SpringBoot实战教程(二)   Elasticsearch 学习+SpringBoot实战教程(二)_桂亭亭的博客-CSDN博客 前言: 经过了前面2课的学习我们

    2023年04月09日
    浏览(25)
  • Elasticsearch 学习+SpringBoot实战教程(二)

    目录 URI Search 指定字段查询 泛查询 Term Query 和 Phrase Query 布尔,范围查询 URI Search 好处 Request Body Search 查询 分页  排序 返回指定字段 脚本字段 Response 使用RestHighLevelClient的方式对文档搜索 精确查询 分页查询 字符匹配AND精准查询 ​编辑字符匹配OR精准查询 模糊查询 Elasti

    2023年04月09日
    浏览(19)
  • 【RocketMQ】SpringBoot集成RocketMQ

    SpringBoot集成RocketMQ 首先依旧是引入依赖 然后就可以编写发送不同类型消息的代码了

    2024年02月12日
    浏览(23)
  • SpringBoot 集成 RocketMQ

    🎈 作者: Linux猿 🎈 简介: CSDN博客专家🏆,华为云享专家🏆,Linux、C/C++、云计算、物联网、面试、刷题、算法尽管咨询我,关注我,有问题私聊! 🎈 欢迎小伙伴们点赞👍、收藏⭐、留言💬 目录 一、安装 RocketMQ 和 RocketMQ Dashboard 二、编写代码运行     本篇文章主要记

    2024年02月09日
    浏览(25)
  • Springboot 集成 RocketMq(入门)

    Linux 安装 RocketMq-CSDN博客 Springboot 集成 RocketMQ(进阶-消息)-CSDN博客

    2024年02月05日
    浏览(23)
  • SpringBoot3集成RocketMq

    标签:RocketMq5.Dashboard; RocketMQ因其架构简单、业务功能丰富、具备极强可扩展性等特点被广泛应用,比如金融业务、互联网、大数据、物联网等领域的业务场景; 在 rocketmq-starter 组件中,实际上依赖的是 rocketmq-client 组件的 5.0 版本,由于两个新版框架间的兼容问题,需要添

    2024年02月12日
    浏览(37)
  • 一个简单的增删改查Spring boot项目教程(完整过程,附代码)(从搭建数据库到实现增删改查功能),Springboot学习,Springboot项目,

    这里将会介绍怎么去搭建一个简单增删改查的Springboot项目,认真看完我相信你一定能够学会,并且附有完整代码; 首先要进行增删改查肯定是要有供操作的数据库; 这里我是用的SQLyog来搭建的,随便用什么都可以,只要能确保给项目一个配套的数据库就行; 打开IDEA,创建

    2024年02月15日
    浏览(47)
  • 动力节点RocketMQ笔记第三章RocketMQ集成SpringBoot

    22.1.1 创建项目,完整的pom.xml 22.1.2 修改配置文件application.yml 22.1.3 我们在测试类里面测试发送消息 往powernode主题里面发送一个简单的字符串消息 运行后查看控制台 22.1.4 查看rocketMq的控制台 查看消息细节

    2024年02月04日
    浏览(29)
  • SpringBoot集成Apache RocketMQ详解

    上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下 如果已经安装了RocketMQ 学习环境可以略过此章节 《【实践篇(一)】RocketMQ入门之学习环境搭建》 本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证

    2024年02月07日
    浏览(21)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包