目录
1、背景
2、问题排查
3、问题解决
1、背景
质检任务是异步执行,正常情况下任务状态扭转是 等待中》运行中》成功(失败)。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的kafka消息,正常情况下状态是顺序下发。
升级部署某个项目,生产环境突然出现很多任务,一直是运行中状态。
2、问题排查
(1)怀疑大数据平台,任务没有正常执行完成,所以任务一直是运行中
1、在yarn平台以及数据库中,都没有发现正在运行中的质量sql任务
2、排查质量平台服务器日志(kafka消息打印接收消息日志很必要,出问题利于排查),发现这个某个sqlId,正常返回了kafka消息,包括运行中、成功、失败等消息。
通过上面的排查,大数据平台没问题,正常执行了任务,正常按顺序给质量平台发了kafka消息
(2)排查质量平台处理kafka消息逻辑
kafka按顺序返回了状态,质量平台没按顺序消费,看质量平台代码如下。
@Slf4j
@Component("dsjAdapterListen")
public class DsjAdapterListen {
@Autowired
ZyslCljgService zyslCljgService;
/**
* kafka消费消息,需要配置kafka
*/
@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dquality}", topics =
{"status_dquality_" + CommonConstant.ZYJH_CHANNEL})
public void topicConsumer(String message) {
StatusInfo statusInfo = JSON.parseObject(message, StatusInfo.class);
log.warn("==============>KafkaListener:start作业实例结果处理,sqlId:{},zyslId:{},状态:{}", statusInfo.getSqlId(),
statusInfo.getTaskId(), statusInfo.getStatus());
zyslCljgService.procZyslJobData(statusInfo);
}
}
@Slf4j
@Component("zyslCljgServiceImpl")
public class ZyslCljgServiceImpl implements ZyslCljgService {
@Override
@Async("zyslClThreadPool")
public void procZyslJobData(StatusInfo statusInfo) {
try {
String zyslId = statusInfo.getTaskId();
String sqlId = statusInfo.getSqlId();
-dosomething();
//运行中任务把检核状态更新成运行中
if (StatusEnum.RUNNING.getCode().equals(statusInfo.getStatus().getCode())) {
if (ZyslYxztEnum.WAITING_SUBMIT.getBm().equals(oldGxZysl.getYxzt())
|| ZyslYxztEnum.WAITING.getBm().equals(oldGxZysl.getYxzt())) {
//运行中
oldGxZysl.setYxzt(ZyslYxztEnum.RUNNING.getBm());
gxZyslMapper.updateById(oldGxZysl);
}
//运行中状态
oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.RUNNING.getBm());
gxZyrzMapper.updateById(oldGxZyrzUpdate);
return;
}
if (StatusEnum.FAILED.getCode().equals(statusInfo.getStatus().getCode())) {
log.error("大数据job错误,执行任务失败:{},zyslid:{},参数:{}", statusInfo.getMessage(), zyslId,
JSON.toJSONString(statusInfo));
//处理失败
procFail(statusInfo, oldGxZyrzUpdate);
}
if (StatusEnum.FINISH.getCode().equals(statusInfo.getStatus().getCode())) {
//正常sql和异常sql都执行完成
oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.SUCCESS.getBm());
}
// 先更新状态,后处理事件
dosomething2();
} catch (Exception e) {
log.error("大数据作业实例结果处理报错:{}", e.getMessage());
}
}
public void dosomething2(GxZyrz gxZyrz, SsZyxx zyxx) {
if (Constants.CODE_SUCCESS.equals(gxZyrz.getYxzt())) {
//成功状态查询es、发邮件等
dosomething3();
} else if (Constants.CODE_FAILED.equals(gxZyrz.getYxzt())) {
gxZyrz.setYcs(0L);
gxZyrz.setZyl(0L);
gxZyrz.setSfgj("N");
}
gxZyrzMapper.updateById(gxZyrz);
}
}
(1)kafka这个topic只有一个分区,数据质量服务器kafka消息是设置了消费者组,即使是高可用,现场部署多台服务器,也会只有一台服务器会消费这个topic的消息数据,所有排除是因为部署了多台服务器的原因。
(2)kafka消息被多线程异步处理了
1、如果任务sql执行成功,kafka返回运行中、执行成功消息
线程1 - 处理待运行任务
线程2- 处理成功状态
通过代码分析,线程1更新任务之前会更新另外一张表状态,假如gxZyslMapper.updateById是2秒时间, 线程2更新状态之前会dosomething3()查询es、查询数据表、发邮件,肯定超过5秒,然后更新sql任务状态。
结论: 如果sql是执行成功,这种情况,应该不会出现线程2先把任务状态更新成成功,然后线程1把状态更新是运行中。
2、如果任务sql执行失败,kafka返回运行中、执行失败消息
线程1 - 处理待运行任务
线程2- 处理失败状态
通过代码分析,线程1更新任务之前会更新另外一张表状态,假如update是2秒时间, 线程2直接更新sql任务状态。
结论: 如果sql是失败成功,这种情况,如果运行中、运行失败状态消息时间建个在2秒内,应该会出现线程2先把任务状态更新成失败,然后线程1把状态更新是运行中。
代码分析之后,带着结论去现场验证,发现确实是失败状态任务状态被逆写了。
按这个结论,按理来讲部署的所有现场都会出现问题,为什么只有这个现场有问题呢?
大数据那边升级了代码,以前执行失败的任务,运行中和运行失败,他们发消息间隔至少耗时在5秒,改了逻辑之后直接失败的任务,发信息间隔在2秒内。这就验证了这个问题文章来源:https://www.toymoban.com/news/detail-861882.html
3、问题解决
1、运行中状态,先更新sqlId对应的任务状态,然后更新别的数据表状态;2、更新运行中的状态不直接更新,带着状态更新 update zyrz set yxzt = '2' where id = 'xxx' and yxzt not in('0','1')文章来源地址https://www.toymoban.com/news/detail-861882.html
到了这里,关于高可用环境kafka消息未按顺序消费问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!