Springboot 整合RocketMQ

这篇具有很好参考价值的文章主要介绍了Springboot 整合RocketMQ。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 业务场景:该接口为H5、小程序提供,用于用户提交信息,后台计算用户数据,进行审核。

根据用户提交的手机号计算用户数据,计算用户实时数据比较长,数据量大的3-5分钟,数据小的1分钟上下,移动端不需要实时返回用户计算数据,所以接口可以保存完用户基本信息,再推送至消息队列计算用户数据。

消费者的计算方法(farmApi.calcSubsidyAndHomeworkData(requestParam, requestAdmin);)

已经封装通用方法,消费者可以直接调用(可根据具体业务修改)

消费异常重试机制可参考

RocketMQ详解(12)——RocketMQ的重试机制_张申傲的博客-CSDN博客_rocketmq消息重试原理

RocketMQ 的消息丢失机制可参考

总结rocketmq消息丢失四种情况_相思比梦长的博客-CSDN博客_rocketmq消息丢失

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
rocketmq:
  name-server: ip:9876
  # 默认的消息组
  producer:
    group: springBootGroup
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      erbadagang-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

1. 生产者

import com.dpxdata.backend.db.domain.SystemAdmin;
import com.dpxdata.backend.risk.label.vo.LabelRequestParam;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: LiSaiHang
 * @Date: 2022/10/24 11:17 上午
 */
@Component
public class FarmProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 发送消息的实例
    public void sendMessage(String topic, String msg) {
        rocketMQTemplate.convertAndSend(topic, msg);
    }


    public void farmInfo(LabelRequestParam param, boolean checkInfoFlag, String ipAttr, SystemAdmin admin, String topic) {
        Map<String, Object> map = new HashMap<>();
        map.put("param", param);
        map.put("admin", admin);
        rocketMQTemplate.convertAndSend(topic, map);
    }

    // 发送事务消息的实例
    public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
        String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination = topic + ":" + tags[i % tags.length];

            TransactionSendResult sendResult =
                    rocketMQTemplate.sendMessageInTransaction(destination, message, destination);

            System.out.printf("%s%n", sendResult);
            Thread.sleep(10);
        }
    }
}

2. 消费者(业务处理)

import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.dpxdata.backend.db.domain.SystemAdmin;
import com.dpxdata.backend.farm.FarmWebUtil;
import com.dpxdata.backend.risk.label.vo.LabelRequestParam;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author: LiSaiHang
 * @Date: 2022/10/24 11:17 上午
 */
@Component
@RocketMQMessageListener(consumerGroup = "springBootGroup", topic = "TestTopic")
@Slf4j
public class FarmConsumer implements RocketMQListener {

    @Autowired
    private FarmWebUtil farmApi;

    @Override
    public void onMessage(Object message) {
        log.info("消费者-消费");
        try {
            // TODO 参数处理,可以通过map传入
            Map<String, Object> map  = JSONObject.parseObject(message.toString());
            log.info(map.toString());
            // LabelRequestParam param, boolean checkInfoFlag, String ipAttr, SystemAdmin admin, String topic
            final JSONObject param = JSONObject.parseObject(map.get("param").toString());
            final JSONObject admin = JSONObject.parseObject(map.get("admin").toString());
            LabelRequestParam requestParam = BeanUtil.copyProperties(param, LabelRequestParam.class);
            SystemAdmin requestAdmin = BeanUtil.copyProperties(admin, SystemAdmin.class);
            // TODO 业务处理
            farmApi.calcSubsidyAndHomeworkData(requestParam, requestAdmin);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

3. controller

private final FarmProducer springBootProducer;

    @ApiOperation(value = "提交查询", notes = "")
    @GetMapping(value = "/submit")
    public CommonResult<?> submit(LabelFarmVo param) {
        log.info("{} 提交查询", param.toString());
        // add by Lish 2022/08/26 添加批次id,一次查询一个批次
        String batchId = UUID.randomUUID().toString().replace("-", "");
        LabelRequestParam requestParam = null;
        param.setBatchId(batchId);
        // add by Lish 2022/08/26 添加批次id,一次查询一个批次
        try {
            // 校验信息
            requestParam = BeanUtil.copyProperties(param, LabelRequestParam.class);
            requestParam.setAttr(IpUtil.getIpAddr(request));
            requestParam.setDeptId(param.getOperator());

            // add by Lish 2022/05/31 如果plat 字段为空 默认 qr 线上、二维码
            if (StringUtils.isBlank(param.getPlat())) {
                requestParam.setPlat("qr");
            } else {
                // update by Lish 2022/06/24 处理 plat 携带#/
                if (requestParam.getPlat().indexOf("#") > 0) {
                    requestParam.setPlat(requestParam.getPlat().substring(0, requestParam.getPlat().indexOf("#")));
                }
                // update by Lish 2022/06/24 处理 plat 携带#/
            }
            // add by Lish 2022/05/31 如果plat 字段为空 默认 qr 二维码

            // TODO 保存临时数据
            
            // update by Lish 2022/07/08 重复提交校验
            this.repeatSubmitCheck(param, requestParam);

            // 提交字段校验
            farmRecordsService.farmInfoCheck(requestParam, true);
       

            // update by Lish 2022/06/29 根据联系人二维码提交信息,使用该部门
            // 设置默认的操作人,如果操作人为空,自动分配操作人
            requestParam.setOperator(this.pushOperator(param));

            // 根据联系人查询所在部门
            final DpxdataIndexUnitDeptPerson person = personService.getById(requestParam.getOperator());
            // update by Lish 202/08/29 更新分配人员的部门id
            if (person != null) {
                requestParam.setDeptId(person.getDeptId());
            }
            
            // 实例化用户token
            SystemAdmin admin = null;//AdminUtils.getAdmin();
            if (admin == null) {
                log.info("H5提交-用户token认证失效,使用hlh5 token认证");
                admin = new SystemAdmin();
                admin.setUsername("hlh5");
                admin.setUnit("");
                admin.setMainId("");
            }
            final SystemAdmin finalAdmin = admin;
            final StringBuffer status = new StringBuffer();
            
            //TODO 推送消息队列生产者
            springBootProducer.farmInfo(requestParam,admin,"TestTopic");
        } catch (ApiException a) {
            log.error(a.getMessage());
            a.printStackTrace();
            return CommonResult.failed(a.getMessage());
        } catch (Exception e) {
            e.printStackTrace();
            return CommonResult.failed("提交失败,请联系运营人员!");
        }
        return CommonResult.success("提交成功");
    }

4. 监听器事务配置文章来源地址https://www.toymoban.com/news/detail-679018.html

import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: LiSaiHang
 * @Date: 2022/10/24 11:18 上午
 */
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
        String destination = arg.toString();
        localTrans.put(transId, msg);
        //这个msg的实现类是GenericMessage,里面实现了toString方法
        //在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
        //而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
        System.out.println("executeLocalTransaction msg = " + msg);
        //转成RocketMQ的Message对象
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
        String tags = message.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            return RocketMQLocalTransactionState.COMMIT;
        } else if (StringUtils.contains(tags, "TagB")) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    //延迟检查的时间间隔要有点奇怪。
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID).toString();
        Message originalMessage = localTrans.get(transId);
        //这里能够获取到自定义的transaction_id属性
        System.out.println("checkLocalTransaction msg = " + originalMessage);
        //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
//        String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
        String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS).toString();
        if (StringUtils.contains(tags, "TagC")) {
            return RocketMQLocalTransactionState.COMMIT;
        } else if (StringUtils.contains(tags, "TagD")) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

/**
 * @Author: LiSaiHang
 * @Date: 2022/10/24 11:18 上午
 */
@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}

到了这里,关于Springboot 整合RocketMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 1024程序员节带你玩转图片Exif信息获取之JavaScript

    目录 一、前言 二、背景 三、Exif.js          1、Exif.js 简介 2、Exif.js 引入 四、多场景展示数据获取 1、原始图片直接获取  2、base64 编码文件加载  3、文件上传的方式加载  五、总结        1024是2的十次方,二进制计数的基本计量单位之一。1G=1024M,而1G与1级谐音,也有一

    2024年02月20日
    浏览(38)
  • 1024程序员节特辑 | Spring Boot实战 之 MongoDB分片或复制集操作

    Spring实战系列文章: Spring实战 | Spring AOP核心秘笈之葵花宝典 Spring实战 | Spring IOC不能说的秘密? 国庆中秋特辑系列文章: 国庆中秋特辑(八)Spring Boot项目如何使用JPA 国庆中秋特辑(七)Java软件工程师常见20道编程面试题 国庆中秋特辑(六)大学生常见30道宝藏编程面试题

    2024年02月08日
    浏览(60)
  • 1024程序员狂欢节 | IT前沿技术、人工智能、数据挖掘、网络空间安全技术

    一年一度的1024程序员狂欢节又到啦!成为更卓越的自己,坚持阅读和学习,别给自己留遗憾,行动起来吧! 那么,都有哪些好书值得入手呢?小编为大家整理了前沿技术、人工智能、集成电路科学与芯片技术、新一代信息与通信技术、网络空间安全技术,四大热点领域近期

    2024年02月06日
    浏览(55)
  • 1024程序员节特辑 | ELK+ 用户画像构建个性化推荐引擎,智能实现“千人千面”

    专栏集锦,大佬们可以收藏以备不时之需 Spring Cloud实战专栏:https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏:https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏:https://blog.csdn.net/superdangbo/category_9271502.html tensorflow专栏:https://blog.csdn.net/superdangbo/category_869

    2024年02月07日
    浏览(61)
  • 1024程序员节?我们整点AI绘图玩玩吧,一文教你配置stable-diffusion

    需提前准备:一台高性能的电脑(尤其是显存)、python、Git、梯子。 其实Github上有很多关于Stable diffusion的库,综合对比之后,我选取的是比较全面的AUTOMATIC1111这个,源码链接:Stable-diffusion(Github) 找到安装那块的教程,此教程以windows为例。 ps:如果你电脑上已经有了pyt

    2024年01月16日
    浏览(58)
  • 1024程序员节特辑 | 解密Spring Cloud Hystrix熔断提高系统的可用性和容错能力

    专栏集锦,大佬们可以收藏以备不时之需 Spring Cloud实战专栏:https://blog.csdn.net/superdangbo/category_9270827.html Python 实战专栏:https://blog.csdn.net/superdangbo/category_9271194.html Logback 详解专栏:https://blog.csdn.net/superdangbo/category_9271502.html tensorflow专栏:https://blog.csdn.net/superdangbo/category_869

    2024年02月08日
    浏览(40)
  • PHP框架开发实践 | 1024 程序员节:通过index.php找到对应的controller是如何实现的

    🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌,CSDN博客专家,阿里云社区专家博主,2023年6月CSDN上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 🏆本文已收录于PHP专栏:PHP进阶实战教程。 🎉欢迎 👍点赞✍评论⭐收藏

    2024年02月08日
    浏览(57)
  • SpringBoot整合RocketMQ

    目录 0.前提条件 1.简易消息操作 1.1生产消息服务整合MQ 1.2创建消费者服务 2.各种不同类型的消息发送集合 2.1消息生产者(mqproductservice服务) (1)配置文件中配置rocket配置(application-dev.yml) (2)提供不同类型的接口下发不同类型的消息 2.2消息消费者(mqconsumerservice服务)

    2024年02月07日
    浏览(25)
  • Springboot 整合RocketMQ

     业务场景:该接口为H5、小程序提供,用于用户提交信息,后台计算用户数据,进行审核。 根据用户提交的手机号计算用户数据,计算用户实时数据比较长,数据量大的3-5分钟,数据小的1分钟上下,移动端不需要实时返回用户计算数据,所以接口可以保存完用户基本信息,

    2024年02月11日
    浏览(20)
  • 黑马程序员rocketmq第二章

    maven工程springboot-rocketmq-producer application.properties 测试类 springboot-rocketmq-consumer application.properties zookeeper集群搭建 1.在/usr/soft/zookeeper-cluster下存放zookeeper-3.4.6.tar.gz 2.解压:tar -zvxf zookeeper-3.4.6.tar.gz 3./usr/soft/zookeeper-cluster/zookeeper-3.4.6/conf 下重命名 zoo_sample.cfg为zoo.cfg mv zoo_sample.cf

    2023年04月26日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包