01前言
由于数据治理层面可以分多个层面且内容繁多(包括模型合规、数据质量、数据安全、计算/存储资源、数据价值等治理内容),因此需要单独拆分为6个模块单独去阐述其中内容。
02问题出现
在做计算治理之前(2022.12)我们团队盘点了下当前计算资源存在的几个问题:
(2)200w+的小文件:当前任务存在未合并小文件、任务Reduce数量过多、上游数据源接入(尤其是API数据接入)会造成过多小文件出现,小文件过多会开启更多数据读取,执行会浪费大量的资源,严重影响性能;
(4)线上无效DQC(数据质量监控)&监控配置资源过小:存在部分历史任务没下线表及DQC场景,每日都在空跑无意义DQC浪费资源,同时DQC资源过少导致DQC需要运行过长时间;
(6)任务缺少调优参数&部分任务仍然使用MapReduce/Spark2计算引擎:任务缺少调优参数导致资源不能适配及动态调整,甚至线上仍有早期配置MapReduce/Spark2计算引擎导致运行效率较低。
03思考与行动
3.1 治理前的思考:
经过与团队多次脑暴对当前治理优先级/改动成本大小/难度做了一个排序,我们先选择从简单的参数调优&任务引擎切换开始->小文件治理->DQC治理->高消耗任务治理->调度安排->下线无用模型及沉淀指标到其他数据资产,同时在初期我们完成各类元数据接入搭建治理看板以及团队治理产出统计数据模型,并通过网易数帆提供的数据治理平台解决具体细节问题。
数据治理平台截图
3.2 治理行动:
(1)大部分任务切换至Spark3计算引擎&补充任务调优参数
AQE解释:Spark 社区在 DAG Scheduler 中,新增了一个 API 在支持提交单个 Map 阶段,以及在运行时修改 shuffle 分区数等等,而这些就是 AQE,在 Spark 运行时,每当一个 Shuffle、Map 阶段进行完毕,AQE 就会统计这个阶段的信息,并且基于规则进行动态调整并修正还未执行的任务逻辑计算与物理计划(在条件运行的情况下),使得 Spark 程序在接下来的运行过程中得到优化。
(2)小文件治理
当前小文件处理:
1 set hive.exec.dynamic.partition.mode=nonstrict;
2 insert overwrite table xxx.xxx partition (ds
3 select column
4 ,ds
5 from xxx.xxx
对于分区较少或未分区的表采用重建表,补数据方法回刷。
- 使用Spark3引擎,自动合并小文件
- 减少Reduce的数量(可以使用参数进行控制
- 用Distribute By Rand控制分区中数据量
- 添加合并小文件参数
- 将数据源抽取后的表做一个任务(本质也是回刷分区合并小文件任务)去处理小文件保障从数据源开始小文件不向下游流去
(3)DQC治理
DQC资源:由于之前DQC配置资源为集群默认参数,效率极低导致所有DQC运行时长均超过10min,从而使得整体任务链路运行时长过久,调整Driver内存为2048M,Executor个数为2,Executor内存为4096M
(4)高消耗任务调优
- 关联表过多,需拆分
- 关联时一对多,数据膨胀
- 资源配置过多,运行时资源严重浪费,需要将配置调小(包括Driver内存、Executor个数、Executor内存)
- 代码结尾添加Distribute By Rand(),用来控制Map输出结果的分发
- 查询中列和行未裁剪、分区未限定、Where条件未限定
- SQL中Distinct切换为Group by(Distinct会被hive翻译成一个全局唯一Reduce任务来做去重操作,Group by则会被hive翻译成分组聚合运算,会有多个Reduce任务并行处理,每个Reduce对收到的一部分数据组,进行每组聚合(去重))
- 关联后计算切换为子查询计算好后再关联
- 使用Map Join(Map Join会把小表全部读入内存中,在Map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在Map是进行了Join操作,省去了Reduce运行的效率也会高很多)可用参数代替
(5)任务调度合理优化
- 找到所有表的输出输入点即启始ODS与末尾ADS
- 划分其中核心表/非核心表,及对应任务开始时间与结束时间
- 按照梳理内容把非核心的任务穿插在当前集群资源非高峰时期(2点前与5点后),同时把核心任务调度提前,保障CDM层任务及时产出
- 对实践后内容再度调优,达到资源最大利用率
(6)烟囱任务下沉&无用任务下线
04治理效果
(1)Hive与Spark2任务升级Spark3.1,总计升级任务137个,升级任务后总体任务执行效率提升43%,cpu资源消耗降低41%,内存资源消耗降低46%
(3)下线无效DQC任务总计50+,修改DQC配置资源降低运行时长,由原来10min优化至3min内
(5)调度重新分配后2-5点资源使用率由90+%降低至50+%,保障日用资源趋势图无大突刺波动
05小结
计算资源治理是一件长久之事,并不能因为资源紧张才去治理,而要将计算治理常态化,可通过周/月资源扫描内容及时推送给每个同学,并为之打分,让每个任务都有源可循,有方法可优化。
参数内容
Hive:
(1)set hive.auto.convert.join = true; (是否自动转化成Map Join)
(3)set hive.groupby.skewindata=true; (用于控制负载均衡,当数据出现倾斜时,如果该变量设置为true,那么Hive会自动进行负载均衡)
(5)set mapreduce.map.memory.mb=4096; (设置Map内存大小,解决Memory占用过大/小)
(7)set hive.exec.dynamic.partition.mode=nonstrict;(动态分区开启)
Spark:
(2)set spark.sql.adaptive.enabled=true;(是否开启调整Partition功能,如果开启,spark.sql.shuffle.partitions设置的Partition可能会被合并到一个Reducer里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个Executor的性能,还能缓解小文件问题)
(4)set spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes=2048M;(Spark小文件合并)
作者简介: 语兴,网易数据开发工程师。
免费试用网易数据治理产品