SpringBoot集成Apache RocketMQ详解

这篇具有很好参考价值的文章主要介绍了SpringBoot集成Apache RocketMQ详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


SpringBoot集成Apache RocketMQ详解,# Spring Boot 知识集锦,# RocketMQ从入门到精通,java-rocketmq,spring boot,apache

0. 前言

上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下
如果已经安装了RocketMQ 学习环境可以略过此章节《【实践篇(一)】RocketMQ入门之学习环境搭建》
本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证 在SpringBoot应用中展示如何使用Apache RocketMQ的生产者(Producer)进行消息发送。
这段代码实现了以下类型的消息发送:
使用Apache RocketMQ 官方的依赖库 RocketMQTemplate,实现同步、异步等消息。

  1. 同步消息:使用syncSend方法,生产者会等待消息服务器回复确认后才会继续发送下一条消息。

  2. 异步消息:使用asyncSend方法,生产者发送消息后不等待服务器回复,直接发送下一条消息。

  3. 单向消息:使用sendOneWay方法,生产者只负责发送消息,不等待服务器回复,也不关注发送结果。

  4. 顺序消息:使用sendOrderly方法,按照消息的发送顺序进行消费(First-In-First-Out)。

  5. 延迟消息:使用sendDelayed方法,消息被发送后,不会立即被消费,等待特定的延迟时间后,才能被消费。

  6. 批量消息:使用sendBatch方法,一次发送多条消息,可以有效提高发送的吞吐量。

关于RocketMQ消息的消息模型介绍和使用,我专门写了一篇博客,搭建可以了解
《RocketMQ 消息传递模型》https://blog.csdn.net/wangshuai6707/article/details/132863088

1. Spring Boot 集成Apache RocketMQ详细步骤

1.1.添加依赖

在SpringBoot项目的pom.xml文件中添加RocketMQ的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.15</version>
		<relativePath/>
	</parent>
	<groupId>com.icepip.project</groupId>
	<artifactId>springboot-icepip-rocketMQ-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>springboot-icepip-rocketMQ-example</name>
	<description>Spring boot 集成rocketMQ 示例</description>
	<properties>
		<java.version>8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.0.4</version>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

1.2.配置RocketMQ

application.properties文件中配置RocketMQ的相关信息:

rocketmq.name-server=你的RocketMQ服务IP:9876
rocketmq.producer.group=my-producer
# 刚开始未配置 导致超时报错
rocketmq.producer.sendMessageTimeout=10000

1.3.创建消息生产者(Producer)

package com.icepip.project.mqtt.controller;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
/**
 *  SpringBoot集成Apache RocketMQ详解
 * @author 冰点
 * @version 1.0.0
 * @date 2023/9/9 17:02
 */

@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    /**
     * 同步发送消息到指定主题
     * @param message
     * @return
     */
    @GetMapping("/syncSend")
    public String syncSend(String message) {
        // 同步发送消息到指定主题
        rocketMQTemplate.syncSend("test-topic", message);
        return "Sync message: " + message + " sent";
    }
    /**
     * 异步发送消息到指定主题
     * @param message
     * @return
     */
    @GetMapping("/asyncSend")
    public String asyncSend(String message) {
        // 异步发送消息到指定主题
        rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Async message sent successfully, result: " + sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.err.println("Failed to send async message: " + throwable.getMessage());
            }
        }, 3000, 3); // 3000 ms timeout, delay level 3

        return "Async message: " + message + " sent";
    }

    /**
     * 发送单向消息到指定主题,无需等待Broker的确认
     * @param message
     * @return
     */
    @GetMapping("/sendOneWay")
    public String sendOneWay(String message) {
        // 发送单向消息到指定主题,无需等待Broker的确认
        rocketMQTemplate.sendOneWay("test-topic", message);
        return "OneWay message: " + message + " sent";
    }

    // 发送顺序消息
    @GetMapping("/sendOrderly")
    public String sendOrderly(String message) {
        // 发送顺序消息到指定主题
        rocketMQTemplate.syncSendOrderly("test-topic", message, "order");
        return "Orderly message: " + message + " sent";
    }

    // 发送延迟消息
    @GetMapping("/sendDelayed")
    public String sendDelayed(String message) {
        // 发送延迟消息到指定主题,延迟级别为3
        rocketMQTemplate.syncSend("test-topic", MessageBuilder.withPayload(message).build(), 1000, 3);
        return "Delayed message: " + message + " sent";
    }

    // 发送批量消息
    @GetMapping("/sendBatch")
    public String sendBatch() {
        List<String> messages = new ArrayList<>();
        messages.add("message1");
        messages.add("message2");
        // 批量发送消息到指定主题
        rocketMQTemplate.syncSend("test-topic", messages);
        return "Batch messages sent";
    }
}

1.4.创建消息消费者(Consumer)

package com.icepip.project.mqtt.handler;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * 定义一个消费者,监听test-topic主题的消息
 * @author 冰点
 * @version 1.0.0
 * @date 2023/9/9 16:29
 */

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class MyConsumer implements RocketMQListener<String>{

    // 当收到消息时,该方法将被调用
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: "+ message);
    }
}

2. 测试验证

SpringBoot集成Apache RocketMQ详解,# Spring Boot 知识集锦,# RocketMQ从入门到精通,java-rocketmq,spring boot,apache
SpringBoot集成Apache RocketMQ详解,# Spring Boot 知识集锦,# RocketMQ从入门到精通,java-rocketmq,spring boot,apache

3. 常见报错

  1. See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6386]ms, Topic: test-topic, BrokersSent: [698f11314447, 698f11314447, 698f11314447]
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.8:10911> failed
解决办法,修改Broker的IP为宿主机IP
进容器修改配置文件,修改完启动服务 。启动之前先kill 掉容器里原来的Broker。
nohup sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/broker.conf &
SpringBoot集成Apache RocketMQ详解,# Spring Boot 知识集锦,# RocketMQ从入门到精通,java-rocketmq,spring boot,apache

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples文章来源地址https://www.toymoban.com/news/detail-732993.html

到了这里,关于SpringBoot集成Apache RocketMQ详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot集成Kafka详解

    Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。 1. 添加依赖 首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。 2. 配置Kafka 接下来,

    2024年02月09日
    浏览(41)
  • spring boot集成Elasticsearch-SpringBoot(25)

      搜索引擎(search engine )通常意义上是指:根据特定策略,运用特定的爬虫程序从互联网上搜集信息,然后对信息进行处理后,为用户提供检索服务,将检索到的相关信息展示给用户的系统。   而我们讲解的是捜索的索引和检索,不涉及爬虫程序的内容爬取。大部分公司

    2023年04月09日
    浏览(113)
  • spring boot集成neo4j实现简单的知识图谱

    随着社交、电商、金融、零售、物联网等行业的快速发展,现实社会织起了了一张庞大而复杂的关系网,传统数据库很难处理关系运算。大数据行业需要处理的数据之间的关系随数据量呈几何级数增长,急需一种支持海量复杂数据关系运算的数据库,图数据库应运而生。 世界

    2024年03月12日
    浏览(55)
  • 【SpringBoot3】Spring Boot 3.0 集成 Redis 缓存

    Redis缓存是一个开源的使用ANSIC语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。它主要用于作为数据库、缓存和消息中间件,以快速读写和丰富的数据结构支持而著称。 在应用程序和数据库之间,Redis缓存作为一个中间层起着关键

    2024年02月21日
    浏览(56)
  • Spring Boot 3 集成 Druid 连接池详解

    在现代的Java应用中,使用一个高效可靠的数据源是至关重要的。Druid连接池作为一款强大的数据库连接池,提供了丰富的监控和管理功能,成为很多Java项目的首选。本文将详细介绍如何在Spring Boot 3项目中配置数据源,集成Druid连接池,以实现更高效的数据库连接管理。 Spri

    2024年02月21日
    浏览(48)
  • Apache RocketMQ之集成RocketMQ_MQTT 安装部署协议

    Apache RocketMQ 安装说明 安装步骤 参考快速开始 https://rocketmq.apache.org/zh/docs/quickStart/01quickstart 安装可视化rocketmq_dashboard下载地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/03Dashboard/ 安装rocketmq_mqtt https://rocketmq.apache.org/zh/docs/4.x/mqtt/01RocketMQMQTTOverview broker.conf配置文件中添加参数,开

    2024年02月13日
    浏览(33)
  • Spring Boot进阶(48):【实战教程】SpringBoot集成WebSocket轻松实现实时消息推送

            WebSocket是一种新型的通信协议,它可以在客户端与服务器端之间实现双向通信,具有低延迟、高效性等特点,适用于实时通信场景。在SpringBoot应用中,集成WebSocket可以方便地实现实时通信功能,如即时聊天、实时数据传输等。         本文将介绍如何在Sprin

    2024年02月09日
    浏览(61)
  • Spring Boot进阶(55):SpringBoot之集成MongoDB及实战使用 | 超级详细,建议收藏

            随着大数据时代的到来,数据存储和处理变得越来越重要。而MongoDB作为一种非关系型数据库,具有高效的数据存储和处理能力,被越来越多地应用于各种领域。尤其在Web应用开发中,SpringBoot框架已经成为了主流选择之一。在这篇文章中,我们将探讨如何将MongoD

    2024年02月17日
    浏览(48)
  • spring boot +springboot集成es7.9.1+canal同步到es

    未经许可,请勿转载。 其实大部分的代码是来源于 参考资料来源 的 主要代码实现 ,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看 主要代码实现 ,结合我的来使用。 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没

    2023年04月11日
    浏览(83)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包