GitHub:https://github.com/apache/dolphinscheduler
我们对于千亿级数据量的数据同步需求,进行分析和选型后,初灵科技最终决定使用DolphinScheduler进行任务调度,同时需要周期性调度 DataX、SparkSQL 等方式进行海量数据迁移。在日常大数据工作中,利用DolphinScheduler减少日常运维工作量。
讲师介绍
钟霈合
初灵科技 大数据开发工程师
演讲大纲:
海量数据处理
未来的规划
背景介绍
01 自研任务调度
涌现了很多像海豚调度这样非常优秀的任务调度系统,而我们的需求已经到了必须要引入新的任务调度系统程度,来保证技术的更新迭代。
02 需求分析
1、支持多租户的权限控制
如果没有多租户的权限控制的话,那整个集群使用起来都会非常的混乱。
2、上手简单,支持可视化任务管理
上手简单,因为我们团队内部在很多时候,开发会给到数仓/业务团队去使用,如果任务调度上手非常困难,如果需要进行大量的配置或者编写代码,相对成本就要高很多,相信在很多大数据团队都会存在这个需求,并且有些项目需要快速迭代,所以对于选型的工具必然是上手简单的。
3、支持对任务及节点状态进行监控
服务器的监控,可以直接通过任务调度web页面去看,第二是任务调度的监控,针对任务是否成功、执行时间等相关数据和状态能够一目了然。
4、支持较为方便的重跑、补数
比如对于每15分钟或者每小时的数据任务,如果不能很好的支持重跑和补数的话,对我们影响还是比较大的。
5、支持高可用HA、弹性扩容、故障容错
6、支持时间参数
03 任务调度对比
Crontab
不支持多租户权限管理、平台管理、分发执行等功能,在我们公司中的应用是在一些特点服务器跑一些临时的脚本。
Rundeck
像Ansible之类的工具一样,Rundeck能够帮助开发和运维人员更好地管理各个节点。
分为企业版和免费版,免费版对于我们来说功能还是有点欠缺的。
Quartz
需要使用Java编程语言编写任务调度,这对于非研发团队而言,是无法去推广使用的。
xxl-job
其不依赖于大数据组件,而是依赖于MySQL,和海豚调度的依赖项是一样的。
Elastic-Job
设计理念是无中心化的,通过ZooKeeper的选举机制选举出主服务器,如果主服务器挂了,会重新选举新的主服务器。
Azkaban
AirFlow
需要使用Python进行DAG图的绘制,无法做到低代码任务调度。
Oozie
04 选择DolphinScheduler的理由
1、部署简单,Master、Worker各司其职,可线性扩展,不依赖于大数据集群
3、任务类型支持多,DAG图决定了可视化配置及可视化任务血缘
5、能够很好满足工作需求
大数据平台架构
数据流图
海量数据处理
01 数据需求
数据量:每天上千亿条
字段数:上百个字段,String类型居多
数据流程:在数据仓库中进行加工,加工完成的数据放入CK,应用直接查询CK的数据
存储周期:21天~60天
查询响应:对于部分字段需要秒级响应
02 数据同步选型
Sqoop
在DolphinScheduler上也集成了Sqoop的任务调度,但是对于从Hive到ClickHouse的需求,Sqoop是无法支持的。
Flink
其次需要编写程序,这对于后面的运维团队是不方便的。
Spark&SparkSQL
第一种是加工出来的数据不持久化存储,直接通过网络IO往ClickHouse里面去写,这一种方式对于服务器资源的开销是最小的,但是其风险也是最大的,因为加工出来的数据不落盘,在数据同步或者是ClickHouse存储中发现异常,就必须要进行重新加工,但是下面dws、dwd的数据是14天清理一次,所以不落盘这种方式就需要再进行考虑。
第二种方式是加工出来的数据放到Hive中,再使用SparkSQL进行同步,只是这种的话,需要耗费更多的Yarn资源量,所以在一期工程中,因为资源量的限制,我们并没有使用SparkSQL来作为数据同步方案,但是在二期工程中,得到了扩容的集群是完全足够的,我们就将数据加工和数据同步全部更换为了SparkSQL。
SeaTunnel
对于这个场景来说,SeaTunnel需要耗费Yarn资源。
DataX
03 ClickHouse优化
在搞定数据加工和数据同步架构之后,就需要进行ClickHouse的优化。
写入本地表
使用MergeTree表引擎家族
我们在CK中是使用的ReplicatedMergeTree作为数据表的本地表引擎,使用的ReplicatedReplacingMergeTree作为从MySQL迁移过来的数据字典的表引擎。
二级索引优化
在二级索引方面的话我们尝试过minmax、intHash64、halfMD5、farmHash64等,但是对于我们的数据而言的话,要么就是查询慢,要么就是入数据慢,后来改为了bloom_filter之后写入才平衡了。
小文件优化
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=256000000;
参数优化
parts_to_delay_insert:200000
此外还可以添加background_pool_size参数(我们没有用)。
Zookeeper优化
在不改动源码的情况下,我们做了如下的优化:
在Zookeeper中将dataLogDir、dataDir目录分离
单独部署一套CK集群专供ClickHouse使用,磁盘选择超过1T,然后给的是SSD盘
调整MaxSessionTimeout参数,加大Zookeeper会话最大超时时间
04 海量数据处理架构
一期技术架构
二期架构1
二期架构2
05 数据同步操作
DataX技术原理
DataX在使用上比较简单,两部分一个Reader和一个Writer,在配置上面的话主要也是针对这两部分进行配置。
06 DataX在DS中的应用
export DATAX_HOME=${DATAX_HOME:-/opt/module/datax}
之后DataX可以有三种方式去使用。
第一种方式的使用“自定义模板”,然后在自定义模板中去编写DataX的json语句:
第二种方式是通过DS自带的选型,然后编写SQL去使用DataX,在DS中可以通过可视化界面配置的插件有_MySQL、PostgreSQL、ClickHouse、Oracle、SQLServer:_
第三种方式是在DS中建立shell任务,然后通过shell去调用部署在服务器上的DataX脚本,并且要把脚本放到DS的资源中心里面:
第一种方式对我们来说是最方便也是适配性最强的方式,第二种和第三种的话就要根据情况去使用了。
07 DataX的使用
一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是1MB/s,可以根据具体硬件情况设置这个byte速度或者record速度,一般设置byte速度。
record限速是300M/s,单个channel的record限速是10M/s。
{
但是channel并不是越大越好,过分大反而会影响服务器的性能,会经常的报GC,一报GC的话,性能就会下降。
根据刚才的调控,明显一个DataX任务中的channel数是增多了的,这就表示占用的内存也会增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。
所以我们需要根据配置调整JVM的参数。
$DATAX_HOME:/opt/beh/core/datax/pybin/datax.py --jvm="-Xms8G -Xms8G" -p"-Da=1"
最后一部分就是我们在使用的时候,发现即使对CK做了优化,还是会报parts过多的错误,经过排查,DataX的ClickHouse Writer是通过JDBC远程连接到ClickHouse数据库,然后利用ClickHouse暴露的insert接口将数据insert into到ClickHouse。根据ClickHouse特性,每一次的insert into都是一个parts,所以不能一条数据就insert一次,必须大批量的插入ClickHouse,这也是官方推荐的。
"batchSize": 100000,
应用场景
01 元数据备份
使用DS周期性备份Hive元数据、CDH元数据、HDP元数据、DS自己的元数据,并将其上传到HDFS中进行保存。
02 任务调度
Shell、SparkSQL、Spark、DataX、Flink等任务进行调度,目前的工作点主要是分为新加任务和老任务迁移。
03 甘特图
04 数据清理
未来的规划
1、从某一个任务调度系统往DS进行任务迁移的工具,半自动化,帮助推进DS的在调度领域的应用。
3、从定制化监控转变为插件式监控,从高代码到低代码的转变,时监控告警更加灵活,及早发现节点、工作流、数据库、任务等的问题。
5、集成API网关功能,对协议适配、服务管理、限流熔断、认证授权、接口请求等进行一站式操作。
最后非常欢迎大家加入 DolphinScheduler 大家庭,融入开源世界!
将遇到的问题通过 GitHub 上 issue 的形式反馈出来。
帮助完善文档。
为代码添加注释。
发表应用案例实践、调度流程分析或者与调度相关的技术文章。
欢迎加入贡献的队伍,加入开源从提交第一个 PR 开始。
比如添加代码注释或找到带有 ”easy to fix” 标记或一些非常简单的 issue(拼写错误等 等等,先通过第一个简单的 PR 熟悉提交流程。
相信参与 DolphinScheduler,一定会让您从开源中受益!