基于chunjun纯钧的增量数据同步
chunjun的官网文档对增量同步已经做出了一定的说明
纯钧官方
根据文档我编写了一个SQL脚本
create table `source` (
`sfzh` STRING COMMENT '公民身份号码',
`xm` STRING COMMENT '姓名',
`xb` STRING COMMENT '性别',
`xbdm` STRING COMMENT '性别代码',
`jzdz` STRING COMMENT '居住地址',
`fzrq` DATE COMMENT '发证日期',
`dsc_biz_record_id` STRING COMMENT '唯一自增序列号'
with (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.14.236:3306/zxk?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true',
'table-name' = 'ods_gsq_yjrcjzzxx',
'username' = 'root',
'password' = 'taotao0226.?',
'scan.fetch-size' = '1024',
'scan.increment.column' = 'fzrq',
--'scan.increment.column-type' = 'date',
'scan.start-location' = '1659974400000'
;
create table `sink` (
`sfzh` STRING COMMENT '公民身份号码',
`xm` STRING COMMENT '姓名',
`xb` STRING COMMENT '性别',
`xbdm` STRING COMMENT '性别代码',
`jzdz` STRING COMMENT '居住地址',
`fzrq` DATE COMMENT '发证日期',
`dsc_biz_record_id` STRING COMMENT '唯一自增序列号',
PRIMARY KEY (`dsc_biz_record_id` NOT ENFORCED
with (
'connector' = 'stream-x'
;
然后提交任务的时候发现已经记录了start-location 和 start-location的指标信息了,但是并没有上报到Prometheus!
在本地调试源码解决问题的大致过程
/** 自定义的prometheus reporter,用于提交startLocation和endLocation指标 */
protected transient CustomReporter customReporter;
该变量是用来提交增量信息的对象,flink任务在开始的时候会执行一下方法
@Override
public void openInputFormat( throws IOException {
Map<String, String> vars = getRuntimeContext(.getMetricGroup(.getAllVariables(;
if (vars != null {
jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName";
jobId = vars.get(Metrics.JOB_NAME;
indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX;
}
LOG.info("是否使用自定义报告 {}", useCustomReporter(;
if (useCustomReporter( {
customReporter =
DataSyncFactoryUtil.discoverMetric(
config, getRuntimeContext(, makeTaskFailedWhenReportFailed(;
customReporter.open(;
LOG.info("customReporter 的hashcode is {}", customReporter.hashCode(;
}
startTime = System.currentTimeMillis(;
}
通过排查useCustomReporter方法得知 jdbcConf.getInitReporter(是false,而在JdbcConfig类里面这个对象默认是true
/** 使用自定义的指标输出器把增量指标打到普罗米修斯 */
@Override
protected boolean useCustomReporter( {
return jdbcConf.isIncrement( && jdbcConf.getInitReporter(;
}
/** 增量同步或者间隔轮询时,是否初始化外部存储 */
protected Boolean initReporter = true;
经过查找 initReporter 属性的set方法调用,找到了下面的问题
在类 com.dtstack.chunjun.connector.jdbc.source.JdbcDynamicTableSource 中有个地方说暂时不支持SQL的方式
尝试一下将false修改为true,然后在本地进行测试,测试的时候将pushgateway的host和port写到代码里面,执行任务发现pushgateway里面已经有数据了
那么可以开始打包了,由于改了源代码,所以要先格式化代码 mvn spotless:apply 再打包 mvn clean package -DskipTests
后续问题
public void openInputFormat( throws IOException {
Map<String, String> vars = getRuntimeContext(.getMetricGroup(.getAllVariables(;
if (vars != null {
jobName = vars.getOrDefault(Metrics.JOB_NAME, "defaultJobName";
jobId = vars.get(Metrics.JOB_NAME;
indexOfSubTask = Integer.parseInt(vars.get(Metrics.SUBTASK_INDEX;
}
LOG.info("是否使用自定义报告 {}", useCustomReporter(;
if (useCustomReporter( {
customReporter =
DataSyncFactoryUtil.discoverMetric(
config, getRuntimeContext(, makeTaskFailedWhenReportFailed(;
customReporter.open(;
LOG.info("customReporter 的hashcode is {}", customReporter.hashCode(;
}
startTime = System.currentTimeMillis(;
}
public static CustomReporter discoverMetric(
ChunJunCommonConf commonConf,
RuntimeContext context,
boolean makeTaskFailedWhenReportFailed {
try {
String pluginName = commonConf.getMetricPluginName(;
// 这里获取到了类的全限定名 com.dtstack.chunjun.metrics.prometheus.PrometheusReport
String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.metric;
MetricParam metricParam =
new MetricParam(
context, makeTaskFailedWhenReportFailed, commonConf.getMetricProps(;
ClassLoader classLoader = Thread.currentThread(.getContextClassLoader(;
Class<?> clazz = classLoader.loadClass(pluginClassName;
Constructor<?> constructor = clazz.getConstructor(MetricParam.class;
return (CustomReporter constructor.newInstance(metricParam;
} catch (Exception e {
throw new ChunJunRuntimeException(e;
}
}
在本地的时候这里加载类的时候是没问题的,但是在线上的时候出现了了找不到类的异常,猜测是相关的jar没有加载到flink jvm进程里面,所以将项目里面的 chunjun-metrics-prometheus.jar 放到了flink的lib目录下,再次启动任务 问题得以解决!