spring boot 集成rocketmq

这篇具有很好参考价值的文章主要介绍了spring boot 集成rocketmq。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

集成Spring Boot和RocketMQ

在现代的微服务架构中,消息队列已经成为一种常见的异步处理模式,它能解决服务间的同步调用、耦合度高、流量高峰等问题。RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛应用在各种业务场景。

本文将详细介绍如何在Spring Boot项目中集成RocketMQ,实现消息的生产和消费。

开发环境

  • JDK 1.8 或更高
  • RocketMQ 4.8.0 或更高
  • Spring Boot 2.3.1.RELEASE 或更高
  • Maven 3.0 或更高

RocketMQ服务器部署

首先,我们需要在本地或服务器上部署RocketMQ。具体的部署步骤可以参考RocketMQ官方文档。为了简化部署,我们可以使用Docker进行部署。

Spring Boot项目创建

我们使用Spring Initializr创建一个新的Spring Boot项目,选择Web、Lombok和RocketMQ Spring Boot Starter为项目依赖。

pom.xml示例:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

配置RocketMQ

application.properties文件中配置RocketMQ的服务器地址和其他相关参数。

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

在这里,rocketmq.name-server是RocketMQ服务器的地址,rocketmq.producer.group是生产者的组名。

消息生产者

接下来,我们创建一个消息生产者。在Spring Boot项目中,我们可以使用RocketMQTemplate来发送消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send")
    public String send(String message) {
        rocketMQTemplate.convertAndSend("test-topic", message);
        return "Message: '" + message + "' sent.";
    }
}

上述代码中,我们创建了一个RESTful接口/send,当接口被调用时,它将发送一个消息到test-topic主题。

消息消费者

接下来,我们创建一个消息消费者。在Spring Boot项目中,我们可以使用@RocketMQMessageListener注解来定义一个消息消费者。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class ConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}

上述代码中,我们定义了一个消息消费者,它将监听test-topic主题的消息,当有新的消息时,它将打印消息内容。

测试

至此,我们已经完成了Spring Boot集成RocketMQ的所有代码。接下来,我们就可以运行Spring Boot项目,并通过访问/send接口来发送消息,查看控制台的输出来验证消息消费者是否可以正常接收消息。

这就是Spring Boot集成RocketMQ的全过程。RocketMQ作为一款功能强大的消息中间件,不仅支持基本的消息生产和消费,还支持许多高级特性,如事务消息、顺序消息、延迟消息等。在实际的项目开发中,我们可以根据业务需求选择合适的消息模型,提高系统的可用性和可靠性。

事务消息

RocketMQ支持发送事务消息,也就是说,在发送消息的同时,我们可以执行本地的数据库操作,只有当本地的数据库操作成功时,消息才会真正被发送出去。

下面是一个发送事务消息的例子:

import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.*;

@RestController
public class TransactionProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/sendTransaction")
    public String sendTransaction(String message) {
        ExecutorService executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });

        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = rocketMQTemplate.createAndStartTransactionMQProducer("transaction-group",transactionListener,executor);

        producer.sendMessageInTransaction("test-topic", "TagA", message, null);

        return "Transaction Message: '" + message + "' sent.";
    }
}

在上述代码中,我们创建了一个TransactionMQProducer,并设置了一个TransactionListener来处理事务的提交和回滚。当发送事务消息时,我们需要调用sendMessageInTransaction方法。

顺序消息

RocketMQ支持发送顺序消息,也就是说,消息会按照发送的顺序被消费。

下面是一个发送顺序消息的例子:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;

@RestController
public class OrderlyProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/sendOrderly")
    public String sendOrderly(String message) {
        for (int i = 0; i < 100; i++) {
            rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload(message + i).build(), "hashkey");
        }
        return "Orderly Message: '" + message + "' sent.";
    }
}

在上述代码中,我们调用syncSendOrderly方法发送顺序消息。该方法的第三个参数是hashkey,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。

延迟消息

RocketMQ支持发送延迟消息,也就是说,消息不会立即被消费,而是会在指定的时间后被消费。

下面是一个发送延迟消息的例子:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging.support.MessageBuilder;

@RestController
public class DelayProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/sendDelay")
    public String sendDelay(String message) {
        rocketMQTemplate.syncSend("delay_topic", MessageBuilder.withPayload(message).build(), 1000, 4);
        return "Delay Message: '" + message + "' sent.";
    }
}

在上述代码中,我们调用syncSend方法发送延迟消息。该方法的第三个参数是延迟时间,第四个参数是延迟级别。

以上就是Spring Boot集成RocketMQ的详细步骤和示例代码,希望对大家有所帮助。文章来源地址https://www.toymoban.com/news/detail-623157.html

到了这里,关于spring boot 集成rocketmq的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spring Boot】Spring Boot 配置 Hikari 数据库连接池

    数据库连接池是一个提高程序与数据库的连接的优化,连接池它主要作用是提高性能、节省资源、控制连接数、连接管理等操作; 程序中的线程池与之同理,都是为了优化、提高性能。

    2024年02月11日
    浏览(51)
  • Spring Boot:数据库的整合

    在 Spring Boot :Web开发之视图模板技术的整合 文章中,介绍了 Spring Boot 整合视图模板技术。然而,仅仅整合视图模板技术可能还不能满足所有业务需求,因此还需要进一步扩展 Spring MVC 的默认配置。 简单补充说明 : 扩展 Spring Boot 中的 Spring MVC 默认配置通常涉及实现 WebMvcC

    2024年04月13日
    浏览(55)
  • Spring Boot对接Oracle数据库

    最近学习了Oracle数据库,那么如何使用Spring Boot和MyBatis Plus对接Oracle数据库呢? 这就有了这篇随记,具体流程如下 创建一个空的Maven工程,导入如下依赖: tips:这里碰到一个坑,我本机适用的Oracle数据库版本是11g XE,所以要使用的驱动为ojdbc5/ojdbc6,不然连接老会失败。 在配

    2024年02月09日
    浏览(45)
  • 【Spring Boot】通过AOP拦截Spring Boot日志并将其存入数据库

    在软件开发中,常常需要记录系统运行时的日志。日志记录有助于排查系统问题、优化系统性能、监控操作行为等。本文将介绍如何使用Spring Boot和AOP技术实现拦截系统日志并保存到数据库中的功能。 本文将通过以下步骤实现拦截系统日志并保存到数据库中的功能: 配置数据

    2024年02月10日
    浏览(43)
  • Spring Boot项目实现无数据库启动

    今天需要创建一个不连接数据库的Spring Boot工程, 结果一切配置好后项目却启动失败, 提示如下: 查询了一下资料, 发现原来Spring Boot启动时是默认是要连接数据库的, 这样一来只需要把数据库的一些配置排除掉就可以了: 重新启动, 项目成功运行了。另外, pom.xml文件中也不要引入

    2024年02月15日
    浏览(49)
  • Spring Boot如何访问不同的数据库

            在Spring Boot应用中连接多个数据库或数据源可以使用多种方式,下面介绍两种常用的方法: 1、使用Spring Boot官方支持的多数据源配置         spring boot提供了官方支持的多数据源配置,可以简单地配置和管理多个数据源。         需要在application.properties文件中

    2024年02月13日
    浏览(47)
  • Spring Boot MySQL数据库的使用

    目录 简介Spring Boot Spring Boot的优点 Spring Boot连接数据库 1.添加依赖 1.2开启连接数据库 1.2.1 如果没有开启数据库运行程序的时候会出现这样的报错这就是没有连接数据库,所以我们开启数据库即可使用。 1.2.2 我的名字是MySQL110所以一会用命令字符开启数据库的时候用的就是这

    2024年04月10日
    浏览(42)
  • 【Spring Boot】数据库持久层框架MyBatis — Spring Boot构建MyBatis应用程序

    Spring Boot是用于快速构建Spring应用程序的框架。MyBatis是一种Java持久化框架,可以帮助开发人员轻松地管理数据库。将Spring Boot与MyBatis结合使用可以使开发人员更容易地创建和管理数据库应用程序。 以下是使用Spring Boot构建MyBatis应用程序的步骤: 添加MyBatis依赖项:在项目的

    2024年02月10日
    浏览(53)
  • Spring Boot应用中如何动态指定数据库,实现不同用户不同数据库的场景

    当在 Spring Boot 应用程序中使用Spring Data JPA 进行数据库操作时,配置Schema名称是一种常见的做法。然而,在某些情况下,模式名称需要是动态的,可能会在应用程序运行时发生变化。比如:需要做数据隔离的SaaS应用。 所以,这篇博文将帮助您解决了在 Spring Boot 应用程序中如

    2024年04月26日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包