挑战海量数据:基于Apache DolphinScheduler对千亿级数据应用实践

科技资讯 投稿 39900 0 评论

挑战海量数据:基于Apache DolphinScheduler对千亿级数据应用实践

GitHub:https://github.com/apache/dolphinscheduler

我们对于千亿级数据量的数据同步需求,进行分析和选型后,初灵科技最终决定使用DolphinScheduler进行任务调度,同时需要周期性调度 DataX、SparkSQL 等方式进行海量数据迁移。在日常大数据工作中,利用DolphinScheduler减少日常运维工作量。

讲师介绍

钟霈合

初灵科技 大数据开发工程师

演讲大纲:

  1. 海量数据处理

  2. 未来的规划

背景介绍

01 自研任务调度

涌现了很多像海豚调度这样非常优秀的任务调度系统,而我们的需求已经到了必须要引入新的任务调度系统程度,来保证技术的更新迭代。

02 需求分析

1、支持多租户的权限控制

如果没有多租户的权限控制的话,那整个集群使用起来都会非常的混乱。

2、上手简单,支持可视化任务管理

上手简单,因为我们团队内部在很多时候,开发会给到数仓/业务团队去使用,如果任务调度上手非常困难,如果需要进行大量的配置或者编写代码,相对成本就要高很多,相信在很多大数据团队都会存在这个需求,并且有些项目需要快速迭代,所以对于选型的工具必然是上手简单的

3、支持对任务及节点状态进行监控

服务器的监控,可以直接通过任务调度web页面去看,第二是任务调度的监控,针对任务是否成功、执行时间等相关数据和状态能够一目了然。

4、支持较为方便的重跑、补数

比如对于每15分钟或者每小时的数据任务,如果不能很好的支持重跑和补数的话,对我们影响还是比较大的。

5、支持高可用HA、弹性扩容、故障容错

6、支持时间参数

03 任务调度对比

Crontab

不支持多租户权限管理、平台管理、分发执行等功能,在我们公司中的应用是在一些特点服务器跑一些临时的脚本。

Rundeck

像Ansible之类的工具一样,Rundeck能够帮助开发和运维人员更好地管理各个节点。

分为企业版和免费版,免费版对于我们来说功能还是有点欠缺的。

Quartz

需要使用Java编程语言编写任务调度,这对于非研发团队而言,是无法去推广使用的。

xxl-job

其不依赖于大数据组件,而是依赖于MySQL,和海豚调度的依赖项是一样的。

Elastic-Job

设计理念是无中心化的,通过ZooKeeper的选举机制选举出主服务器,如果主服务器挂了,会重新选举新的主服务器。

Azkaban

AirFlow

需要使用Python进行DAG图的绘制,无法做到低代码任务调度。

Oozie

04 选择DolphinScheduler的理由

1、部署简单,Master、Worker各司其职,可线性扩展,不依赖于大数据集群

3、任务类型支持多,DAG图决定了可视化配置及可视化任务血缘

5、能够很好满足工作需求

大数据平台架构

数据流图

海量数据处理

01 数据需求

数据量:每天上千亿条

字段数:上百个字段,String类型居多

数据流程:在数据仓库中进行加工,加工完成的数据放入CK,应用直接查询CK的数据

存储周期:21天~60天

查询响应:对于部分字段需要秒级响应

02 数据同步选型

Sqoop

在DolphinScheduler上也集成了Sqoop的任务调度,但是对于从Hive到ClickHouse的需求,Sqoop是无法支持的。

Flink

其次需要编写程序,这对于后面的运维团队是不方便的。

Spark&SparkSQL

第一种是加工出来的数据不持久化存储,直接通过网络IO往ClickHouse里面去写,这一种方式对于服务器资源的开销是最小的,但是其风险也是最大的,因为加工出来的数据不落盘,在数据同步或者是ClickHouse存储中发现异常,就必须要进行重新加工,但是下面dws、dwd的数据是14天清理一次,所以不落盘这种方式就需要再进行考虑。

第二种方式是加工出来的数据放到Hive中,再使用SparkSQL进行同步,只是这种的话,需要耗费更多的Yarn资源量,所以在一期工程中,因为资源量的限制,我们并没有使用SparkSQL来作为数据同步方案,但是在二期工程中,得到了扩容的集群是完全足够的,我们就将数据加工和数据同步全部更换为了SparkSQL。

SeaTunnel

对于这个场景来说,SeaTunnel需要耗费Yarn资源。

DataX

03 ClickHouse优化

在搞定数据加工和数据同步架构之后,就需要进行ClickHouse的优化。

写入本地表

使用MergeTree表引擎家族

我们在CK中是使用的ReplicatedMergeTree作为数据表的本地表引擎,使用的ReplicatedReplacingMergeTree作为从MySQL迁移过来的数据字典的表引擎。

二级索引优化

在二级索引方面的话我们尝试过minmax、intHash64、halfMD5、farmHash64等,但是对于我们的数据而言的话,要么就是查询慢,要么就是入数据慢,后来改为了bloom_filter之后写入才平衡了。

小文件优化

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=256000000;

参数优化

parts_to_delay_insert:200000

此外还可以添加background_pool_size参数(我们没有用)。

Zookeeper优化

在不改动源码的情况下,我们做了如下的优化:

    调整MaxSessionTimeout参数,加大Zookeeper会话最大超时时间

  1. 在Zookeeper中将dataLogDir、dataDir目录分离

  2. 单独部署一套CK集群专供ClickHouse使用,磁盘选择超过1T,然后给的是SSD盘

04 海量数据处理架构

一期技术架构

二期架构1

二期架构2

05 数据同步操作

DataX技术原理

DataX在使用上比较简单,两部分一个Reader和一个Writer,在配置上面的话主要也是针对这两部分进行配置。

06 DataX在DS中的应用

export DATAX_HOME=${DATAX_HOME:-/opt/module/datax}

之后DataX可以有三种方式去使用。

第一种方式的使用“自定义模板”,然后在自定义模板中去编写DataX的json语句:

第二种方式是通过DS自带的选型,然后编写SQL去使用DataX,在DS中可以通过可视化界面配置的插件有_MySQL、PostgreSQL、ClickHouse、Oracle、SQLServer:_

第三种方式是在DS中建立shell任务,然后通过shell去调用部署在服务器上的DataX脚本,并且要把脚本放到DS的资源中心里面:

第一种方式对我们来说是最方便也是适配性最强的方式,第二种和第三种的话就要根据情况去使用了。

07 DataX的使用

一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是1MB/s,可以根据具体硬件情况设置这个byte速度或者record速度,一般设置byte速度。

record限速是300M/s,单个channel的record限速是10M/s。

{

但是channel并不是越大越好,过分大反而会影响服务器的性能,会经常的报GC,一报GC的话,性能就会下降。

根据刚才的调控,明显一个DataX任务中的channel数是增多了的,这就表示占用的内存也会增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。

所以我们需要根据配置调整JVM的参数。

$DATAX_HOME:/opt/beh/core/datax/pybin/datax.py --jvm="-Xms8G -Xms8G" -p"-Da=1"

最后一部分就是我们在使用的时候,发现即使对CK做了优化,还是会报parts过多的错误,经过排查,DataX的ClickHouse Writer是通过JDBC远程连接到ClickHouse数据库,然后利用ClickHouse暴露的insert接口将数据insert into到ClickHouse。根据ClickHouse特性,每一次的insert into都是一个parts,所以不能一条数据就insert一次,必须大批量的插入ClickHouse,这也是官方推荐的。

"batchSize": 100000,

应用场景

01 元数据备份

使用DS周期性备份Hive元数据、CDH元数据、HDP元数据、DS自己的元数据,并将其上传到HDFS中进行保存。

02 任务调度

Shell、SparkSQL、Spark、DataX、Flink等任务进行调度,目前的工作点主要是分为新加任务和老任务迁移。

03 甘特图

04 数据清理

未来的规划

1、从某一个任务调度系统往DS进行任务迁移的工具,半自动化,帮助推进DS的在调度领域的应用。

3、从定制化监控转变为插件式监控,从高代码到低代码的转变,时监控告警更加灵活,及早发现节点、工作流、数据库、任务等的问题。

5、集成API网关功能,对协议适配、服务管理、限流熔断、认证授权、接口请求等进行一站式操作。

最后非常欢迎大家加入 DolphinScheduler 大家庭,融入开源世界!

  • 将遇到的问题通过 GitHub 上 issue 的形式反馈出来。

  • 帮助完善文档。

  • 为代码添加注释。

  • 发表应用案例实践、调度流程分析或者与调度相关的技术文章。

欢迎加入贡献的队伍,加入开源从提交第一个 PR 开始。

  • 比如添加代码注释或找到带有 ”easy to fix” 标记或一些非常简单的 issue(拼写错误等 等等,先通过第一个简单的 PR 熟悉提交流程。

相信参与 DolphinScheduler,一定会让您从开源中受益!

编程笔记 » 挑战海量数据:基于Apache DolphinScheduler对千亿级数据应用实践

赞同 (81) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽