[自研开源] MyData 数据集成的任务流程 v0.7.1

这篇具有很好参考价值的文章主要介绍了[自研开源] MyData 数据集成的任务流程 v0.7.1。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

开源地址:https://gitee.com/LIEN321/mydata-blade
详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0
部署文档:用 Docker 部署 MyData v0.7.1
使用手册:MyData 使用手册v0.7.1
交流Q群:430089673

MyData后端结构

MyData的后端由3个子服务组成,分别是管理服务任务服务业务数据服务

  • 管理服务:通过项目、数据标准、应用API、环境的管理 配置出同步业务数据的任务;
  • 任务服务:根据配置的任务 定时调用应用API和数据服务 实现业务数据的传输和存储;
  • 数据服务:封装业务数据的隔离机制和读写操作;

依赖的组件:

  • MySQL:存储管理数据;
  • Redis:缓存管理数据和任务;
  • MongoDB;存储业务数据;

下图从数据流角度 展示3个子服务的关联:
[自研开源] MyData 数据集成的任务流程 v0.7.1,MyData,开源,数据仓库
注:开源版本采用单体SpringBoot;

任务服务

配置任务

任务主要包括:项目环境、数据标准、应用API、任务类型、字段映射、任务周期;

  • 项目环境:确定应用API的统一前缀地址;
  • 数据标准:明确集成的业务数据的数据结构;
  • 应用API: 业务数据的传输通道;
  • 任务类型:明确数据的传输方向,提供数据表示从应用API读取业务员数据、消费数据表示向应用API发送业务数据;
  • 字段映射:配置接口响应结构中 与标准数据字段的映射关系;
  • 任务周期:定期执行任务的时间间隔,格式为cron表达式;
    [自研开源] MyData 数据集成的任务流程 v0.7.1,MyData,开源,数据仓库

任务流程

数据集成的任务执行流程如下图:
[自研开源] MyData 数据集成的任务流程 v0.7.1,MyData,开源,数据仓库文章来源地址https://www.toymoban.com/news/detail-839641.html

  1. 任务服务启动时(即MyData系统启动),查询所有运行状态的任务;
public class JobExecutor implements ApplicationRunner {
    ...
    
    @Override
    public void run(ApplicationArguments args) {
        // 移除已有缓存
        jobCache.removeAll();
    
        // 查询已启动的任务
        List<Task> tasks = taskService.listRunningTasks();
        log.info("tasks.size() = " + tasks.size());
        if (CollUtil.isNotEmpty(tasks)) {
            tasks.forEach(this::startTask);
        }
    }

    ...
}
  1. 根据任务的cron表达式,计算任务的下次执行时间;
/**
 * 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间
 *
 * @param taskInfo 定时任务
 */
private void calculateNextRunTime(TaskInfo taskInfo) {
    Assert.notNull(taskInfo);
    Assert.notEmpty(taskInfo.getTaskPeriod());

    Date date = taskInfo.getStartTime();
    if (taskInfo.getFailCount() > 0) {
        date = taskInfo.getNextRunTime();
    }

    CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod());
    Date nextRunTime = cronExpression.getNextValidTimeAfter(date);
    taskInfo.setNextRunTime(nextRunTime);
}
  1. 计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入redis缓存;
/**
 * 缓存任务
 *
 * @param taskInfo 任务对象
 * @throws IllegalArgumentException 缓存时长无效
 */
public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException {
    // 计算任务缓存有效时长
    long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND);
    if (expire <= 0) {
        throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}"
                , DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN)
                , DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN)));
    }

    redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo);
    redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire);
    taskInfo.appendLog("任务存入redis,缓存时长 {} 秒", expire);
}
  1. 通过监听redis的key失效事件,获得待执行的任务;
public class RedisKeyExpiredListener implements MessageListener {

    private final JobExecutor jobExecutor;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) {
            String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length());
            jobExecutor.notify(taskId);
        }
    }
}
  1. 将任务加入待执行的线程池,随后即可执行
/**
 * 任务存入执行队列
 *
 * @param taskInfo 任务
 */
private void executeJob(TaskInfo taskInfo) {
    taskInfo.appendLog("任务存入执行队列");
    Runnable runnable = new JobThread(taskInfo);
    getThreadPoolExecutor().execute(runnable);
}
  1. 根据任务类型分别执行提供数据消费数据流程;
    1. 提供数据
      1. 调用应用API,获取json格式数据;
      2. 根据任务中字段映射 解析json为业务数据Map集合;
      3. 调用数据服务 将业务数据存入MongoDB;
case MdConstant.DATA_PRODUCER:
    // 调用api 获取json
    String json = ApiUtil.read(taskInfo);
    // 将json按字段映射 解析为业务数据
    jobDataService.parseData(taskInfo, json);
    // 根据条件过滤数据
    jobDataFilterService.doFilter(taskInfo);
    // 保存业务数据
    jobDataService.saveTaskData(taskInfo);
    // 更新环境变量
    jobVarService.saveVarValue(taskInfo, json);

    break;
  1. 消费数据
    1. 根据任务所选数据标准,查询业务数据;
    2. 再根据字段映射,将业务数据 转为指定的json对象集合;
    3. 调用应用API,传输json数据;
case MdConstant.DATA_CONSUMER:
    List<BizDataFilter> filters = taskInfo.getDataFilters();
    if (CollUtil.isNotEmpty(filters)) {
        // 解析过滤条件值中的 自定义字符串
        parseFilterValue(filters);
        // 排除值为null的条件
        filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
    }
    // 根据过滤条件 查询数据
    String dataCode = taskInfo.getDataCode();
    if (StrUtil.isNotEmpty(dataCode)) {
        List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
        taskInfo.setConsumeDataList(dataList);
        // 根据字段映射转换为api参数
        jobDataService.convertData(taskInfo);
    }
    // 调用api传输数据
    ApiUtil.write(taskInfo);
    break;
  1. 保存任务执行日志;

到了这里,关于[自研开源] MyData 数据集成的任务流程 v0.7.1的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • OpenPCDet系列 | 7.1 KITTI数据集测试流程predicted_boxes预测

    测试流程的结构图如下所示: generate_predicted_boxes函数 一开始的数据传入为: 首先对于各类预测的特征图进行重新reshape处理,将anchor那一维度进行拼接操作,比如:(16, 248, 216, 42) - (16, 321408, 7)。但是这里需要注意。 特征预测的box信息是基于anchor的一个偏移,也就是编码后的

    2024年02月06日
    浏览(38)
  • (7)(7.1) 使用航点和事件规划任务

    文章目录 前言 6.1.1 设置Home位置 6.1.2 视频:制作并保存多路点任务 6.1.3 视频:加载已保存的多航点任务 6.1.4 使用说明 6.1.5 提示 6.1.6 自动网格 6.1.7 任务指令 6.1.8 任务结束 6.1.9 任务重置 6.1.10 MIS_OPTIONS 6.1.11 任务再出发 6.1.12 任务大小 6.1.13 如何预取已存储的任务地图 6.1.14 偶

    2024年02月12日
    浏览(26)
  • 开源、易集成的Web可视化工具(流程图、组态、SCADA、大屏)

    乐吾乐 Meta2d.js 是一个完全从零自主研发的国产开源 Web 绘图工具。底层使用 canvas,可以自定义扩展丰富等动效。通过引擎 + 图形库+中间件的架构模式能够方便快速的集成到各种前端项目,实现专属自己的可视化平台。 Meta2d.js 集实时数据展示、动态交互、数据管理等一体,

    2024年02月05日
    浏览(46)
  • 云原生开源项目CNSI(Narrows)v0.4发布

    云原生安全开源项目 CNSI(项目代号:Narrows)发布了 0.4 版本,新增了 “cnsi-scanner-trivy” 的组件,帮助用户更容易的进行工作负载的漏洞扫描。使用该组件也可以非常方便的进行镜像内软件包漏洞、错误配置和 license 信息的扫描。除此之外,该版本引入了 Redis DB 用来存储扫

    2024年02月12日
    浏览(38)
  • harbor v1.7.1镜像仓库无法访问,并提示502 Bad Gateway

    在巡检rancher平台时发现有一个服务运行报错了,查看该服务容器事件时提示连接不到harbor镜像仓库。 发现无法访问时,第一时间是通过浏览器去访问harbor仓库是否能正常访问: http://harbor.jx.shu.com 发现无法访问,然后登入到对应的harbor服务器上去查看harbor服务是否正常。 通

    2024年02月19日
    浏览(37)
  • Mistral 7B v0.2 基础模型开源,大模型微调实践来了

    Mistral AI在3月24日突然发布并开源了 Mistral 7B v0.2模型,有如下几个特点: 和上一代Mistral v0.1版本相比, 上下文窗口长度从8k提升到32k ,上下文窗口(context window)是指语言模型在进行预测或生成文本时,所考虑的前一个token或文本片段的大小范围。随着上下文窗口长度的增加

    2024年04月26日
    浏览(37)
  • 数据仓库与数据集成架构:数据仓库与数据仓库规范与标准的制定与应用

    数据仓库是一种用于存储和管理大量结构化数据的系统,它的主要目的是为了支持数据分析和报告。数据仓库通常包括一个或多个数据源,这些数据源可以是来自不同的系统或来自不同的数据库。数据仓库的设计和实现需要考虑到数据的质量、一致性、可用性和安全性等方面

    2024年04月09日
    浏览(44)
  • 开源和自研——机器人

    MPC技术:封闭性非常高。没有开源方案可抄。 因为开源,不需要从0构建。 这也是前两年,国外一开源华为就遥遥领先。 射频芯片/射频天线:技术封闭。华为虽然做通信,但却没有攻破。 鸿蒙:基于AOSP(Android Open Source Project(Android 开源项目)) 国内自动驾驶系统2018年后

    2024年02月12日
    浏览(43)
  • 【入驻流程】自研系统申请奇门appkey

    奇门介绍 “奇门”是阿里巴巴提出的一套“系统与系统之间对接”的解决方案,致力于解决新零售场景下不同系统间通信协作中的相关问题,对于行业的标准场景的对接(如ERP-WMS、ERP-POS等)制定标准化的协议,收敛适配成本,服务商的系统只需一次对接,即可适配所有合作

    2024年02月06日
    浏览(46)
  • 基于 STM32自研多任务+SpringBoot+Vue 农业大棚智能调光系统

    工作以后常常容易感到疲于奔命,即使在周末也没有得到高质量的休息。打工人/学生党如何过周末?你有哪些延长周末和下班时间的好方法吗?- 方法就是多积累,多发博客,将感悟全写出来!!,接下来我给大家展示一个课程设计:源码和硬件端代码就不与展示了,在该博

    2024年02月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包