基于chunjun纯钧的增量数据同步问题排查[博客园-实习小生]

科技资讯 投稿 7300 0 评论

基于chunjun纯钧的增量数据同步问题排查[博客园-实习小生]

基于chunjun纯钧的增量数据同步

chunjun的官网文档对增量同步已经做出了一定的说明

纯钧官方
根据文档我编写了一个SQL脚本

create table `source` (
        `sfzh` STRING COMMENT '公民身份号码',
        `xm` STRING COMMENT '姓名',
        `xb` STRING COMMENT '性别',
        `xbdm` STRING COMMENT '性别代码',
        `jzdz` STRING COMMENT '居住地址',
        `fzrq` DATE COMMENT '发证日期',
        `dsc_biz_record_id` STRING COMMENT '唯一自增序列号'
 with (
        'connector' = 'mysql-x',
        'url' = 'jdbc:mysql://192.168.14.236:3306/zxk?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
        'table-name' = 'ods_gsq_yjrcjzzxx',
        'username' = 'root',
        'password' = 'taotao0226.?',
        'scan.fetch-size' = '1024',
        'scan.increment.column' = 'fzrq',
        --'scan.increment.column-type' = 'date',
        'scan.start-location' = '1659974400000'
;

create table `sink` (
        `sfzh` STRING COMMENT '公民身份号码',
        `xm` STRING COMMENT '姓名',
        `xb` STRING COMMENT '性别',
        `xbdm` STRING COMMENT '性别代码',
        `jzdz` STRING COMMENT '居住地址',
        `fzrq` DATE COMMENT '发证日期',
        `dsc_biz_record_id` STRING COMMENT '唯一自增序列号',
        PRIMARY KEY (`dsc_biz_record_id` NOT ENFORCED
 with (
        'connector' = 'stream-x'
;

然后提交任务的时候发现已经记录了start-locationstart-location的指标信息了,但是并没有上报到Prometheus!

在本地调试源码解决问题的大致过程

/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
protected transient CustomReporter customReporter;

该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法

    @Override
    public void openInputFormat( throws IOException {
        Map<String, String> vars = getRuntimeContext(.getMetricGroup(.getAllVariables(;
        if (vars != null {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName";
            jobId = vars.get(Metrics.JOB_NAME;
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX;
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter(;
        if (useCustomReporter( {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(, makeTaskFailedWhenReportFailed(;
            customReporter.open(;
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode(;
        }

        startTime = System.currentTimeMillis(;
    }

通过排查useCustomReporter方法得知 jdbcConf.getInitReporter(是false,而在JdbcConfig类里面这个对象默认是true

 /** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
    @Override
    protected boolean useCustomReporter( {
        return jdbcConf.isIncrement( && jdbcConf.getInitReporter(;
    }

    /** 增量同步或者间隔轮询时,是否初始化外部存储 */
    protected Boolean initReporter = true;

经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式
尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了
那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests

后续问题

public void openInputFormat( throws IOException {
        Map<String, String> vars = getRuntimeContext(.getMetricGroup(.getAllVariables(;
        if (vars != null {
            jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName";
            jobId = vars.get(Metrics.JOB_NAME;
            indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX;
        }

        LOG.info("是否使用自定义报告 {}", useCustomReporter(;
        if (useCustomReporter( {

            customReporter =
                    DataSyncFactoryUtil.discoverMetric(
                            config, getRuntimeContext(, makeTaskFailedWhenReportFailed(;
            customReporter.open(;
            LOG.info("customReporter 的hashcode is {}", customReporter.hashCode(;
        }

        startTime = System.currentTimeMillis(;
    }

    public static CustomReporter discoverMetric(
            ChunJunCommonConf commonConf,
            RuntimeContext context,
            boolean makeTaskFailedWhenReportFailed {
        try {
            String pluginName = commonConf.getMetricPluginName(;
            // 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
            String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric;
            MetricParam metricParam =
                    new MetricParam(
                            context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps(;

            ClassLoader classLoader = Thread.currentThread(.getContextClassLoader(;
            Class<?> clazz = classLoader.loadClass(pluginClassName;
            Constructor<?> constructor = clazz.getConstructor(MetricParam.class;

            return (CustomReporter constructor.newInstance(metricParam;
        } catch (Exception e {
            throw new ChunJunRuntimeException(e;
        }
    }

在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!

编程笔记 » 基于chunjun纯钧的增量数据同步问题排查[博客园-实习小生]

赞同 (32) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽