java利用线程池带有返回值的方式,大体逻辑批量处理大量数据,启用线程池,处理完成后将所有的返回内容进行组装拼接
废话不多说开始看代码,重点敲黑板:
1.ThreadPoolExecutor 线程池创建
2.CountDownLatch 同步工具类,让主线程一直等待,直到子线程执行完后再执行
3.listret 用于接收多线程返回值
方式一
使用线程池
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(coresNumber * 2, coresNumber * 2 + 1, 1000, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
/*创建List用来接收多线程返回的内容,泛型里面可以自定义,String或者对象亦或者其他类型*/
List<Map<String, Object>> listret = new ArrayList<>();
// 同步工具类,让主线程一直等待,直到子线程执行完后再执行
CountDownLatch downLatch = new CountDownLatch(partition.size());
// 循环任务的List
for (List<String> stringList : partition) {
// 启用开启多个线程
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 开始调用具体业务代码
Map<String, Object> mapRet = pmpTargetPriceService.targetPriceThreadTask(stringList, initiateTaskType, userName);
listret.add(mapRet);
} catch (Exception e) {
logger.error("循环开启线多线程报错,调用下游系统出现错误,异常:" + e);
} finally {
// 业务逻辑处理完毕,计数器减一【当前线程处理任务完毕,线程释放进入线程池,等待处理下一个任务】
downLatch.countDown();
}
}
});
}
// 主线程需要等待子任务线程执行完,结果汇总之后,主线程继续往下执行
try {
downLatch.await();
} catch (Exception e) {
logger.error("等待超时", e);
throw new RuntimeException("系统处理超时,请稍后再试");
}
// 对返回组装的list进循环处理业务逻辑
for (Map<String, Object> esbResultPlm1 : listret) {
// 从Future对象上获取任务的返回值,并输出到控制台
// Map esbResultPlm1 = (Map) f.get();
// todo 对我返回的多个map进行拼接
if (esbResultPlm1.get("status").equals("fail")) {
failureNum = (int) esbResultPlm1.get("failureNum");
failureMsg.append(esbResultPlm1.get("msg"));
map.put("msg", failureMsg.toString());
failureNumCount += failureNum;
} else {
successNum = (int) esbResultPlm1.get("successNum");
successMsg.append(esbResultPlm1.get("msg"));
map.put("msg", successMsg.toString());
successNumCount += successNum;
}
}
方法一得到的结果如下,使用线程池我这里是核数乘以2是核心线程16,最大17,所以这里最多是16个线程,而且他是无序的随机分配的
方式二
重点不用线程池使用@Async注解,但是策略得有所调整,大体逻辑比如你待处理的数据有100条,你可以将这个List按10条为一个新的List,循环这个集合,在调用的实际方法上加@Async注解,从而实现多线程加快循环也是可以的
@Async注意点,加了该注解的方法不能再同一个类,否则无效,其次有可能存在启动过程@Async UnsatisfiedDependencyException导致 SpringBoot 无法启动问题解决,我这里是在报错的类里有注入service或者mapper的注解上加了@Lazy注解就可以
// 将要发送的集合按10个一组进行重新组装
List<List<String>> partition = Lists.partition(list, 10);
/*创建List用来接收多线程返回的内容,泛型里面可以自定义,String或者对象亦或者其他类型*/
List<Map<String, Object>> listret = new ArrayList<>();
// 循环任务的List
for (List<String> stringList : partition) {
// 开始调用具体业务代码
Map<String, Object> mapRet = pmpTargetPriceService.targetPriceThreadTask(stringList, initiateTaskType, userName);
listret.add(mapRet);
}
// 对返回组装的list进循环处理业务逻辑
for (Map<String, Object> esbResultPlm1 : listret) {
//对返回的内容进行业务处理
}
// 调用的方法,返回map
@Async
public Map<String, Object> targetPriceThreadTask(List<String> idList, String initiateTaskType, String userName) throws Exception {
//具体的逻辑代码
Map<String, Object> map = new HashMap();
return map;
}
方法二的执行结果,循环多少次就启动了多少个子线程,所以这里的想法是先将原生数组按自定义个进行分配,如有200个任务,分给20个人,每人10个大概就是这样的思路
文章来源:https://www.toymoban.com/news/detail-815540.html
文章来源地址https://www.toymoban.com/news/detail-815540.html
我的完整代码仅供参考,里面很多类都是我自己业务用到的,大家可以借鉴
public Map<String, Object> initiateTargetPriceTask(PmpTargetPriceDTO pmpTargetPriceDTO) throws Exception {
String userName = SecurityUtils.getUsername();
Map map = new HashMap();
List<String> list = Arrays.asList(pmpTargetPriceDTO.getIds());
// 将要发送的集合按10个一组进行重新组装
List<List<String>> partition = Lists.partition(list, 10);
// 创建一个线程池
// 获取CPU核数
int coresNumber = Runtime.getRuntime().availableProcessors();
System.out.println("获取CPU核数:" + coresNumber);
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(coresNumber * 2, coresNumber * 2 + 1, 1000, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
// 获取任务发起类型字段
String initiateTaskType = pmpTargetPriceDTO.getInitiateTaskType();
/*创建List用来接收多线程返回的内容,泛型里面可以自定义,String或者对象亦或者其他类型*/
List<Map<String, Object>> listret = new ArrayList<>();
// 同步工具类,让主线程一直等待,直到子线程执行完后再执行
CountDownLatch downLatch = new CountDownLatch(partition.size());
// 循环任务的List
for (List<String> stringList : partition) {
// 启用开启多个线程
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 开始调用具体业务代码
Map<String, Object> mapRet = pmpTargetPriceService.targetPriceThreadTask(stringList, initiateTaskType, userName);
listret.add(mapRet);
} catch (Exception e) {
logger.error("循环开启线多线程报错,调用下游系统出现错误,异常:" + e);
} finally {
// 业务逻辑处理完毕,计数器减一【当前线程处理任务完毕,线程释放进入线程池,等待处理下一个任务】
downLatch.countDown();
}
}
});
}
// 主线程需要等待子任务线程执行完,结果汇总之后,主线程继续往下执行
try {
downLatch.await();
} catch (Exception e) {
logger.error("等待超时", e);
throw new RuntimeException("系统处理超时,请稍后再试");
}
// 关闭线程池
executor.shutdown();
// 获取所有并发任务的运行结果
StringBuilder successMsg = new StringBuilder();
StringBuilder failureMsg = new StringBuilder();
int failureNum;
int successNum;
int failureNumCount = 0;
int successNumCount = 0;
for (Map<String, Object> esbResultPlm1 : listret) {
// 从Future对象上获取任务的返回值,并输出到控制台
// Map esbResultPlm1 = (Map) f.get();
// todo 对我返回的多个map进行拼接
if (esbResultPlm1.get("status").equals("fail")) {
failureNum = (int) esbResultPlm1.get("failureNum");
failureMsg.append(esbResultPlm1.get("msg"));
map.put("msg", failureMsg.toString());
failureNumCount += failureNum;
} else {
successNum = (int) esbResultPlm1.get("successNum");
successMsg.append(esbResultPlm1.get("msg"));
map.put("msg", successMsg.toString());
successNumCount += successNum;
}
}
// todo 对最终的结果进行组装
if (failureNumCount > 0) {
failureMsg.insert(0, "很抱歉,发起任务存在失败!共发起 " + list.size() + "条数据,其中有" + failureNumCount + " 条数据格式不正确,错误如下:");
map.put("status", "fail");
map.put("msg", failureMsg.toString());
} else {
successMsg.insert(0, "恭喜您,数据已全部发起成功!共 " + successNumCount + " 条");
map.put("status", "success");
map.put("msg", successMsg.toString());
}
return map;
}
// 调用的逻辑处理方法
public Map<String, Object> targetPriceThreadTask(List<String> idList, String initiateTaskType, String userName) throws Exception {
// 发起目标价任务
int successNum = 0;
int failureNum = 0;
StringBuilder successMsg = new StringBuilder();
StringBuilder failureMsg = new StringBuilder();
StringBuffer NoSubunitmaterialCode = new StringBuffer(); // 子组不存在的物料号合集
StringBuffer NoSubunit = new StringBuffer(); // 没有子组的子组号合集
Map<String, Object> map = new HashMap();
for (String id : idList) {
PmpTargetPrice pmpTargetPrice = pmpTargetPriceMapper.selectPmpTargetPriceById(id);
SysApiRequestLog sysApiRequestLog = new SysApiRequestLog();
sysApiRequestLog.setRequestMethod("手动发起目标价任务");
sysApiRequestLog.setRequestData("物料号:" + pmpTargetPrice.getMaterialCode());
//查询是否发起流程,
if (pmpTargetPrice.getIsFqlc().equals("1")) {
failureNum++;
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、此物料已经发起过流程,请核实!";
failureMsg.append(msg);
continue;
}
PmpTargetPriceProcess targetPriceProcess = new PmpTargetPriceProcess();
// 请求PLM接口
Map invokeGetPlm = invokeWebService.getInvokeGetPlm(pmpTargetPrice.getMaterialCode());
targetPriceProcess.setSouce("手动发起"); // 来源(手动发起)
targetPriceProcess.setTaskSponsor(userName); // 设置发起人
targetPriceProcess.setMaterialStatus("0"); // 状态
targetPriceProcess.setInitiateTaskType(initiateTaskType); // 设置手工发起任务类型
if (null != invokeGetPlm.get("number").toString()) {
targetPriceProcess.setMaterialCode(invokeGetPlm.get("number").toString()); // 物料编号
}
if (null != invokeGetPlm.get("name").toString()) {
targetPriceProcess.setMaterialName(invokeGetPlm.get("name").toString()); // 物料名称
}
if (null != invokeGetPlm.get("phase").toString()) {
targetPriceProcess.setStage(invokeGetPlm.get("phase").toString());//阶段
}
if (null != invokeGetPlm.get("version").toString()) {
targetPriceProcess.setVersionNo(invokeGetPlm.get("version").toString()); // 大版本
}
if (null != invokeGetPlm.get("state").toString()) {
targetPriceProcess.setMaterialStatus(invokeGetPlm.get("state").toString()); // 状态
}
// 请求BOM接口获取数据
Map materialCode = invokeWebService.getEsbBomMaterialInfo(pmpTargetPrice.getMaterialCode());
// 截取物料编码为子组号
String substring = pmpTargetPriceProcessService.getBlockCode(pmpTargetPrice.getMaterialCode());
PmpTargetRule targetRule = new PmpTargetRule();
PmpTargetRule pmpTargetRule;
String userCode = "";
// 判断bom是否有返回,有返回表示有路线,无返回表示无路线
if (null != materialCode) {
//根据物料编号截取的子组取目标规则表PMP_TARGET_RULE中查
targetRule.setSonGroup(substring);
targetRule.setIsRoute("1");
pmpTargetRule = pmpTargetRuleService.selectPmpTargetRuleBySonGroup(targetRule);
// 判断<pmpTargetRule>子组配置是否存在,如根据子组查询不存在则设置为特殊子组
if (!Optional.ofNullable(pmpTargetRule).isPresent()) {
targetRule.setSonGroup("特殊件无法获取");
targetRule.setIsRoute("1");
pmpTargetRule = pmpTargetRuleService.selectPmpTargetRuleBySonGroup(targetRule);
}
/**
* 1.一级制造、一级装配和采购制造均为空时生成任务;
* 2.一级制造、一级装配和采购制造均有值时生成任务;
* 3.一级制造、一级装配有值,采购制造为空时,不生成任务。但要判定一级制造和一级装配均不含CG时为自制件,
* 不生成任务,但返回PLM为S,提示“一级制造和一级装配均不含CG,为自制件,不生成任务”,
* 其他情况返回PLM为E并提示“非自制件,BOM采购制造路线待维护,请稍后发起定价任务”。
*/
String mfmrtg = ""; // 一级制造
String mfartg = ""; // 一级装配
if (StringUtils.isNotBlank(materialCode.get("MFMRTG").toString())
&& StringUtils.isNotBlank(materialCode.get("MFARTG").toString())
) {
mfmrtg = materialCode.get("MFMRTG").toString(); // 一级制造
mfartg = materialCode.get("MFARTG").toString(); // 一级装配
} else {
failureNum++;
logger.error("物料号:" + pmpTargetPrice.getMaterialCode() + "获取BOM信息,一级制造或一级装配为Null,发起任务失败!");
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、获取BOM信息,一级制造或一级装配为Null,发起任务失败! ";
failureMsg.append(msg);
sysApiRequestLog.setResponseStatus(EsbResult.RET_STATUS_ERROR);
sysApiRequestLog.setErrorLog("手动发起目标价任务,失败原因:根据物料号获取BOM信息,一级制造或一级装配为Null,发起任务失败");
apiRequestLogService.insertSysApiRequestLog(sysApiRequestLog);
continue;
}
// if (StringUtils.isNotBlank(materialCode.get("MFMRTG").toString())) {
// targetPriceProcess.setOneLevelMake(materialCode.get("MFMRTG").toString()); // 一级制造
// }
// if (StringUtils.isNotBlank(materialCode.get("MFARTG").toString())) {
// targetPriceProcess.setOneLevelAssembling(materialCode.get("MFARTG").toString()); // 一级装配
// }
targetPriceProcess.setOneLevelMake(mfmrtg); // 一级制造
targetPriceProcess.setOneLevelAssembling(mfartg); // 一级装配
if (StringUtils.isNotBlank(materialCode.get("CFMRTG").toString())) {
targetPriceProcess.setPurchaseMake(materialCode.get("CFMRTG").toString()); // 采购制造
//根据BOM接口返回采购制造,如果是PT为研究院,否则为财务部
if (targetPriceProcess.getPurchaseMake().equals("PT")) {
if (!StringUtils.isEmpty(pmpTargetRule.getYjyDirectorCode())) {
targetPriceProcess.setTaskPurchase(pmpTargetRule.getYjyDirectorCode()); // 指定填写目标价人:研究院
// 设置审核人
String[] splitYjyCode = pmpTargetRule.getYjyDirectorCode().split("/");
userCode = splitYjyCode[0];
} else {
failureNum++;
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、未获取到研究院相关人员,请核对! ";
failureMsg.append(msg);
logger.error("手动发起目标价任务-物料号-PT-YJY:" + pmpTargetPrice.getMaterialCode() + "设置目标价录入人时配置有误,路线为PT,根据子组查询但是研究院人员code是未维护发起任务失败!");
sysApiRequestLog.setResponseStatus(EsbResult.RET_STATUS_ERROR);
sysApiRequestLog.setErrorLog("手动发起目标价任务失败,失败原因:路线为PT,根据子组查询但是研究院人员code是未维护发起任务失败");
apiRequestLogService.insertSysApiRequestLog(sysApiRequestLog);
NoSubunitmaterialCode.append("物料号-PT-YJY:" + pmpTargetPrice.getMaterialCode() + "/");
NoSubunit.append("子组号-PT-YJY:" + substring + "/");
continue;
}
} else {
if (!StringUtils.isEmpty(pmpTargetRule.getCwDirectorCode())) {
targetPriceProcess.setTaskPurchase(pmpTargetRule.getCwDirectorCode()); // 指定填写目标价人:财务部
// 设置审核人
String[] splitCwCode = pmpTargetRule.getCwDirectorCode().split("/");
userCode = splitCwCode[0];
} else {
failureNum++;
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、未获取到财务部相关人员,请核对! ";
failureMsg.append(msg);
logger.error("手动发起目标价任务-物料号-非PT-CW:" + pmpTargetPrice.getMaterialCode() + "设置目标价录入人时配置有误,路线非PT,根据子组查询财务人员code是Null发起任务失败!");
sysApiRequestLog.setResponseStatus(EsbResult.RET_STATUS_ERROR);
sysApiRequestLog.setErrorLog("手动发起目标价任务,失败原因:根据子组查询财务人员code是Null发起任务失败");
apiRequestLogService.insertSysApiRequestLog(sysApiRequestLog);
NoSubunitmaterialCode.append("物料号-非PT-CW:" + pmpTargetPrice.getMaterialCode() + "/");
NoSubunit.append("子组号-非PT-CW:" + substring + "/");
continue;
}
}
} else {
if (!mfmrtg.equals("CG") || !mfartg.equals("CG")) {
logger.error("物料号:" + pmpTargetPrice.getMaterialCode() + "一级制造和一级装配均不含CG,为自制件,不生成任务!");
failureNum++;
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、一级制造和一级装配均不含CG,为自制件,不生成任务! ";
failureMsg.append(msg);
sysApiRequestLog.setResponseStatus(EsbResult.RET_STATUS_ERROR);
sysApiRequestLog.setErrorLog("手动发起目标价任务,失败原因:一级制造和一级装配均不含CG,为自制件,不生成任务!");
continue;
} else {
failureNum++;
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、BOM采购制造路线待维护,请稍后发起定价任务!";
failureMsg.append(msg);
sysApiRequestLog.setResponseStatus(EsbResult.RET_STATUS_ERROR);
sysApiRequestLog.setErrorLog("手动发起目标价任务,失败原因:BOM采购制造路线待维护,请稍后发起定价任务!");
continue;
}
}
} else {
// 无路线
targetRule.setSonGroup(substring);
targetRule.setIsRoute("0");
pmpTargetRule = pmpTargetRuleService.selectPmpTargetRuleBySonGroup(targetRule);
/**
* 其他特殊件,无法获取子组,
* PT或无路线由研究院郑宇处理,
* 非PT的由财务部苏战波和赵伟处理
*/
if (!Optional.ofNullable(pmpTargetRule).isPresent()) {
targetRule.setSonGroup("特殊件无法获取");
targetRule.setIsRoute("1");
pmpTargetRule = pmpTargetRuleService.selectPmpTargetRuleBySonGroup(targetRule);
}
if (null != pmpTargetRule && StringUtils.isNotBlank(pmpTargetRule.getYjyDirectorCode())) {
targetPriceProcess.setTaskPurchase(pmpTargetRule.getYjyDirectorCode()); // 指定填写目标价人:研究院
String[] splitYjyCode = pmpTargetRule.getYjyDirectorCode().split("/");
userCode = splitYjyCode[0];
} else {
failureNum++;
String msg = "<br/>物料号:" + pmpTargetPrice.getMaterialCode() + "、未获取到研究院相关人员(无路线),请核对! ";
failureMsg.append(msg);
continue;
}
}
// todo 判断任务是研究院或者财务,设置审核人
// 查询研究院,财务或者无路线任务办理人,并获取他们的部门编码,向上寻找审核人
if (!userCode.equals("")) {
SysUser sysUser = userService.selectUserByUserName(userCode);
if (StringUtils.isNotNull(sysUser)) {
// 截取部门code
String deptCode = "";
if (sysUser.getDeptId().length() > 9) {
deptCode = sysUser.getDeptId().substring(0, 9);
} else {
deptCode = sysUser.getDeptId();
}
//查询审核配置表,查到审核人,插入目标价
PmpTargetAuditConfig pmpTargetAuditConfig = pmpTargetAuditConfigService.selectPmpTargetAuditConfigByDeptCode(deptCode);
targetPriceProcess.setCheckName(pmpTargetAuditConfig.getAuditor());
}
}
//已发起流程
pmpTargetPrice.setIsFqlc("1");
targetPriceProcess.setStatus("0"); // 任务状态 0待定级发起
targetPriceProcess.setQuejiaType("目标价"); // 缺价类型默认目标价
//添加任务发起人
targetPriceProcess.setTaskSponsor(userName);
targetPriceProcess.setCreateBy(userName);
pmpTargetPriceMapper.updatePmpTargetPrice(pmpTargetPrice);
if (StringUtils.isEmpty(targetPriceProcess.getMaterialName())) {
targetPriceProcess.setMaterialName(pmpTargetPrice.getMaterialName());
}
/**
* 查看当前物料号在目标价任务表中是否存在,最后的检查
*/
PmpTargetPriceProcess priceProcess = new PmpTargetPriceProcess();
priceProcess.setMaterialCode(targetPriceProcess.getMaterialCode());
priceProcess.setStatus("0,1,2,4");
PmpTargetPriceProcess pmpTargetPriceProcess = targetPriceProcessService.selectPmpTargetPriceProcessByEntity(priceProcess);
if (null != pmpTargetPriceProcess) {
continue;
}
// 设置任务号
String newsNo = DateUtils.parseDateToStr("yyyyMMdd", new Date());
int count = targetPriceProcessService.getFindTaskCount();
String format = String.format("%05d", count);
// 任务编号 MBJRW+年月日+流水号
targetPriceProcess.setTaskNumber("MBJRW" + newsNo + format);
targetPriceProcessService.insertPmpTargetPriceProcess(targetPriceProcess);
successNum++;
}
if (failureNum > 0) {
// failureMsg.insert(0, "很抱歉,发起任务存在失败!共 " + failureNum + " 条数据格式不正确,错误如下:");
map.put("status", "fail");
map.put("msg", failureMsg.toString());
map.put("failureNum", failureNum);
} else {
// successMsg.insert(0, "恭喜您,数据已全部发起成功!共 " + successNum + " 条");
map.put("status", "success");
map.put("msg", successMsg.toString());
map.put("successNum", successNum);
}
logger.info("手动发起目标价任务结束!");
logger.info("手动发起目标价-物料号查找子组有误的物料统计:" + NoSubunitmaterialCode.toString());
logger.info("手动发起目标价-物料号查找子组有误的子组号统计:" + NoSubunit.toString());
return map;
}
总结方式一和方式二都能解决加快任务处理,处理时间都差不读多,大家可以挑选自己适合的方式,如有更好的方式或不对的点请指正,欢迎大家沟通交流,共同成长进步
到了这里,关于java多线程带返回值的方式方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!