业务场景:该接口为H5、小程序提供,用于用户提交信息,后台计算用户数据,进行审核。
根据用户提交的手机号计算用户数据,计算用户实时数据比较长,数据量大的3-5分钟,数据小的1分钟上下,移动端不需要实时返回用户计算数据,所以接口可以保存完用户基本信息,再推送至消息队列计算用户数据。
消费者的计算方法(farmApi.calcSubsidyAndHomeworkData(requestParam, requestAdmin);)
已经封装通用方法,消费者可以直接调用(可根据具体业务修改)
消费异常重试机制可参考
RocketMQ详解(12)——RocketMQ的重试机制_张申傲的博客-CSDN博客_rocketmq消息重试原理
RocketMQ 的消息丢失机制可参考
总结rocketmq消息丢失四种情况_相思比梦长的博客-CSDN博客_rocketmq消息丢失
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
rocketmq:
name-server: ip:9876
# 默认的消息组
producer:
group: springBootGroup
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
erbadagang-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
1. 生产者
import com.dpxdata.backend.db.domain.SystemAdmin;
import com.dpxdata.backend.risk.label.vo.LabelRequestParam;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: LiSaiHang
* @Date: 2022/10/24 11:17 上午
*/
@Component
public class FarmProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送消息的实例
public void sendMessage(String topic, String msg) {
rocketMQTemplate.convertAndSend(topic, msg);
}
public void farmInfo(LabelRequestParam param, boolean checkInfoFlag, String ipAttr, SystemAdmin admin, String topic) {
Map<String, Object> map = new HashMap<>();
map.put("param", param);
map.put("admin", admin);
rocketMQTemplate.convertAndSend(topic, map);
}
// 发送事务消息的实例
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException {
String[] tags = {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message<String> message = MessageBuilder.withPayload(msg).build();
String destination = topic + ":" + tags[i % tags.length];
TransactionSendResult sendResult =
rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}
}
2. 消费者(业务处理)
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.dpxdata.backend.db.domain.SystemAdmin;
import com.dpxdata.backend.farm.FarmWebUtil;
import com.dpxdata.backend.risk.label.vo.LabelRequestParam;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author: LiSaiHang
* @Date: 2022/10/24 11:17 上午
*/
@Component
@RocketMQMessageListener(consumerGroup = "springBootGroup", topic = "TestTopic")
@Slf4j
public class FarmConsumer implements RocketMQListener {
@Autowired
private FarmWebUtil farmApi;
@Override
public void onMessage(Object message) {
log.info("消费者-消费");
try {
// TODO 参数处理,可以通过map传入
Map<String, Object> map = JSONObject.parseObject(message.toString());
log.info(map.toString());
// LabelRequestParam param, boolean checkInfoFlag, String ipAttr, SystemAdmin admin, String topic
final JSONObject param = JSONObject.parseObject(map.get("param").toString());
final JSONObject admin = JSONObject.parseObject(map.get("admin").toString());
LabelRequestParam requestParam = BeanUtil.copyProperties(param, LabelRequestParam.class);
SystemAdmin requestAdmin = BeanUtil.copyProperties(admin, SystemAdmin.class);
// TODO 业务处理
farmApi.calcSubsidyAndHomeworkData(requestParam, requestAdmin);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. controller文章来源:https://www.toymoban.com/news/detail-679018.html
private final FarmProducer springBootProducer;
@ApiOperation(value = "提交查询", notes = "")
@GetMapping(value = "/submit")
public CommonResult<?> submit(LabelFarmVo param) {
log.info("{} 提交查询", param.toString());
// add by Lish 2022/08/26 添加批次id,一次查询一个批次
String batchId = UUID.randomUUID().toString().replace("-", "");
LabelRequestParam requestParam = null;
param.setBatchId(batchId);
// add by Lish 2022/08/26 添加批次id,一次查询一个批次
try {
// 校验信息
requestParam = BeanUtil.copyProperties(param, LabelRequestParam.class);
requestParam.setAttr(IpUtil.getIpAddr(request));
requestParam.setDeptId(param.getOperator());
// add by Lish 2022/05/31 如果plat 字段为空 默认 qr 线上、二维码
if (StringUtils.isBlank(param.getPlat())) {
requestParam.setPlat("qr");
} else {
// update by Lish 2022/06/24 处理 plat 携带#/
if (requestParam.getPlat().indexOf("#") > 0) {
requestParam.setPlat(requestParam.getPlat().substring(0, requestParam.getPlat().indexOf("#")));
}
// update by Lish 2022/06/24 处理 plat 携带#/
}
// add by Lish 2022/05/31 如果plat 字段为空 默认 qr 二维码
// TODO 保存临时数据
// update by Lish 2022/07/08 重复提交校验
this.repeatSubmitCheck(param, requestParam);
// 提交字段校验
farmRecordsService.farmInfoCheck(requestParam, true);
// update by Lish 2022/06/29 根据联系人二维码提交信息,使用该部门
// 设置默认的操作人,如果操作人为空,自动分配操作人
requestParam.setOperator(this.pushOperator(param));
// 根据联系人查询所在部门
final DpxdataIndexUnitDeptPerson person = personService.getById(requestParam.getOperator());
// update by Lish 202/08/29 更新分配人员的部门id
if (person != null) {
requestParam.setDeptId(person.getDeptId());
}
// 实例化用户token
SystemAdmin admin = null;//AdminUtils.getAdmin();
if (admin == null) {
log.info("H5提交-用户token认证失效,使用hlh5 token认证");
admin = new SystemAdmin();
admin.setUsername("hlh5");
admin.setUnit("");
admin.setMainId("");
}
final SystemAdmin finalAdmin = admin;
final StringBuffer status = new StringBuffer();
//TODO 推送消息队列生产者
springBootProducer.farmInfo(requestParam,admin,"TestTopic");
} catch (ApiException a) {
log.error(a.getMessage());
a.printStackTrace();
return CommonResult.failed(a.getMessage());
} catch (Exception e) {
e.printStackTrace();
return CommonResult.failed("提交失败,请联系运营人员!");
}
return CommonResult.success("提交成功");
}
4. 监听器事务配置文章来源地址https://www.toymoban.com/news/detail-679018.html
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author: LiSaiHang
* @Date: 2022/10/24 11:18 上午
*/
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId, msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = " + msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, msg);
String tags = message.getTags();
if (StringUtils.contains(tags, "TagA")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagB")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔要有点奇怪。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = " + originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS).toString();
if (StringUtils.contains(tags, "TagC")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (StringUtils.contains(tags, "TagD")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
/**
* @Author: LiSaiHang
* @Date: 2022/10/24 11:18 上午
*/
@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
到了这里,关于Springboot 整合RocketMQ的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!