基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】

这篇具有很好参考价值的文章主要介绍了基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

基于chunjun纯钧的增量数据同步

目前我司的大数据平台使用的是flink技术栈,底层的连接器插件使用的是国产的chunjun插件,在使用chunjun的过程中也遇到了很多问题,本次记录下在SQL模式的情况下怎么支持增量的数据同步

chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方
根据文档我编写了一个SQL脚本

create table `source` (
        `sfzh` STRING COMMENT '',
        `xm` STRING COMMENT '',
        `xb` STRING COMMENT '',
        `xbdm` STRING COMMENT '',
        `jzdz` STRING COMMENT '',
        `fzrq` DATE COMMENT '',
        `dsc_biz_record_id` STRING COMMENT ''
) with (
        'connector' = 'mysql-x',
        'url' = 'jdbc:mysql://:/?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
        'table-name' = '',
        'username' = '',
        'password' = '',
        'scan.fetch-size' = '1024',
        'scan.increment.column' = 'fzrq',
        --'scan.increment.column-type' = 'date',
        'scan.start-location' = '1659974400000'
);

create table `sink` (
        `sfzh` STRING COMMENT '',
        `xm` STRING COMMENT '',
        `xb` STRING COMMENT '',
        `xbdm` STRING COMMENT '',
        `jzdz` STRING COMMENT '',
        `fzrq` DATE COMMENT '',
        `dsc_biz_record_id` STRING COMMENT '',
        PRIMARY KEY (`dsc_biz_record_id`) NOT ENFORCED
) with (
        'connector' = 'stream-x'
);

然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!

在本地调试源码解决问题的大致过程

在类 com.dtstack.chunjun.source.format.BaseRichInputFormat中有一个成员变量

/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
protected transient CustomReporter customReporter;

该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

    @Override
    public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

通过排查useCustomReporter方法得知 jdbcConf.getInitReporter()是false,而在JdbcConfig类里面这个对象默认是true

 /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
    @Override
    protected boolean useCustomReporter() {
        return jdbcConf.isIncrement() && jdbcConf.getInitReporter();
    }

    /** 增量同步或者间隔轮询时,是否初始化外部存储 */
    protected Boolean initReporter = true;

经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式
基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】
尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了
基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】
那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

后续问题

打包到虚拟机进行测试,我使用的是yarn-per-job模式,提交任务后发现报找不到Prometheus报告类的异常,通过异常信息发现在前面提到的方法里有classloader

public void openInputFormat() throws IOException {
        Map<String, String> vars = getRuntimeContext().getMetricGroup().getAllVariables();
        if (vars != null) {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName");
            jobId = vars.get(Metrics.JOB_NAME);
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX));
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter());
        if (useCustomReporter()) {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(), makeTaskFailedWhenReportFailed());
            customReporter.open();
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode());
        }

        startTime = System.currentTimeMillis();
    }

    public static CustomReporter discoverMetric(
            ChunJunCommonConf commonConf,
            RuntimeContext context,
            boolean makeTaskFailedWhenReportFailed) {
        try {
            String pluginName = commonConf.getMetricPluginName();
            // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric);
            MetricParam metricParam =
                    new MetricParam(
                            context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps());

            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            Class<?> clazz = classLoader.loadClass(pluginClassName);
            Constructor<?> constructor = clazz.getConstructor(MetricParam.class);

            return (CustomReporter) constructor.newInstance(metricParam);
        } catch (Exception e) {
            throw new ChunJunRuntimeException(e);
        }
    }

在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!文章来源地址https://www.toymoban.com/news/detail-411423.html

到了这里,关于基于chunjun纯钧的增量数据同步问题排查【博客园-实习小生】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Canal与Flink实现数据实时增量同步(一),计算机毕设源码要提交吗

    配置修改 修改conf/example/instance.properties,修改内容如下: canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = kms-1.apache.com:3306 #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.mq.topic

    2024年04月12日
    浏览(43)
  • 【kafka】JDBC connector进行表数据增量同步过程中的源表与目标表时间不一致问题解决...

    〇、参考资料 时间不一致,差了8个小时 (1)source (2)sink 即sink和source都加  \\\"db.timezone\\\": \\\"Asia/Shanghai\\\", 并需要保持一直

    2024年02月11日
    浏览(26)
  • Elasticsearch实战-数据同步(解决es数据增量同步)

    之前测试的数据都是一次从mysql导入到es,随着时间的推移,每天都有可能发生增删改查,不可能每次都全量同步,所以需要考虑增量同步问题。 缺点: 耦合性高,服务之间会相互影响 依赖消息队列的可靠性 启动:端口8099

    2024年02月11日
    浏览(61)
  • Redis主从架构、数据同步原理、全量同步、增量同步

    大家好,我是哪吒。 2023年再不会Redis,就要被淘汰了 图解Redis,谈谈Redis的持久化,RDB快照与AOF日志 Redis单线程还是多线程?IO多路复用原理 Redis集群的最大槽数为什么是16384个? Redis缓存穿透、击穿、雪崩到底是个啥?7张图告诉你 Redis分布式锁的实现方式 Redis分布式缓存、

    2024年02月07日
    浏览(56)
  • flinkcdc同步完全量数据就不同步增量数据了

    使用flinkcdc同步mysql数据,使用的是全量采集模型 startupOptions(StartupOptions.earliest()) 全量阶段同步完成之后,发现并不开始同步增量数据,原因有以下两个: 1.mysql中对应的数据库没有开启binlog 在/etc/my.cnf配置文件中,在[ mysqld ]添加以下内容 然后重启数据库 ,执行命令 和chec

    2024年02月11日
    浏览(23)
  • Maxwell - 增量数据同步工具

            今天来学习一个新的大数据小工具 Maxwell ,它和 Sqoop 很像。Sqoop主要用于在 Hadoop (比如 HDFS、Hive、HBase 等)和关系型数据库之间进行数据的批量导入和导出,而 Maxwell 则主要用于监控数据库的变化(通过监控 binlog ),并将变化的数据以JSON格式发布到消息队列(一

    2024年02月20日
    浏览(29)
  • 【大数据精讲】全量同步与CDC增量同步方案对比

    目录 背景 名词解释 问题与挑战 FlinkCDC DataX 工作原理 调度流程 五、DataX 3.0六大核心优势 性能优化 CDC        CDC又称变更数据捕获(Change Data Capture),开启cdc的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。CDC通过捕获进程将变更数据捕获到变更表中

    2024年01月24日
    浏览(33)
  • ELK增量同步数据【MySql->ES】

            1.  linux,已经搭建好的logstash+es+kibana【系列版本7.0X】,es 的plugs中安装ik分词器 ES版本:  Logstash版本:  (以上部署,都是运维同事搞的,我不会部署,同事给力) 1、在Logstash安装目录下【/usr/share/logstash】,新建XX.sh,内容如下: 2. 在Logstash安装目录下【/usr/shar

    2024年02月11日
    浏览(33)
  • DBSyncer安装_配置postgresql和mysql_sqlserver_oracel全量增量同步---数据全量增量同步之DBSyncer001

         国内做开源的大神做的,用了一下还可以,就是不能和Phoenix这种操作hbase等数据库一起用, 这个是官网,下载安装非常简单,官网也有中文详细说明. 直接下载安装包: 然后解压到某个地方,主要要用unzip dbsyncer.zip -d /opt/module这样解压 解压后直接启动就可以了    解压以后进入

    2024年02月09日
    浏览(42)
  • Oracle通过函数调用dblink同步表数据方案(全量/增量)

    创建对应的包,以方便触发调用 触发同步任务: SELECT yjb.pkg_scene_job.F_SYNC_DRUG_STOCK() AS a FROM dual WHERE 1=0; 没有结果行时是不会触发的,以下方式可触发: SELECT yjb.pkg_scene_job.F_SYNC_DRUG_STOCK() AS a FROM dual; PS:一定是使用(调用)到 触发函数yjb.pkg_scene_job.F_SYNC_DRUG_STOCK(),才可完成触

    2024年02月16日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包