资源节省超 50%!作业帮 Spark 全面替换 Hive 的技术实践
资源节省超 50%!作业帮 Spark 全面替换 Hive 的技术实践
作业帮在大数据计算引擎方面进行了重要的技术升级,从传统的Hive迁移到了Spark SQL。这一技术实践不仅带来了显著的资源节省,还提升了系统的稳定性和性能。本文详细介绍了这一技术升级的背景、架构设计、迁移过程和优化措施,以及最终取得的收益。
历史背景
作业帮历史数据计算引擎主要依赖 Apache Hive 2.3.7,主要用于数仓建设、即席查询、算法特征分析、实验效果统计等方面。虽然 Hive 在数据管理和计算方面有自己的优势,但随着湖技术、云原生、引擎向量化等技术发展,以及业务对成本敏感程度的变化,Hive 逐渐暴露出一些局限性,主要体现在引擎生态、资源利用效率和系统稳定性方面。
Spark 作为计算引擎基本已经成为行业大数据标配。能力上不仅有 SQL,还有 ML、Steaming、GraphX,以及对各种编程语言的支持。在 Catalyst 优化器、内存计算等资源利用效率方面明显优于 Hive。同时官方支持除 Yarn 部署模式外还支持 K8S,为在离线资源峰谷互补提供支持。Spark 作为事实上的标配,也吸引了很多外围开源项目的深度适配,较 Hive 的简单支持在性能、功能方面差异很大,例如 Apache Iceberg、Apache Kyuubi。
Hive 由 hive metastore 和 hiveserver2 两部分组成,metastore 主要解决 Hive 表数据管理,也是目前行业的主流方案,hiveserver2 主要用于接收 SQL 任务请求转为物理执行计划提交给 Yarn 等资源执行。随着湖相关技术的不断成熟,Databricks、Snowflake、公有云 EMR 等商业组织积极推广。Databricks 收购 Iceberg 背后公司 Tabular 后,Unity Catalog、Apache Gravitino 等元数据技术更是被提到了新的高度,用于统一解决 Data + AI 的数据管理,Hive metastore 也将会被逐渐替代。而 hiveserver2 抛开 Hive 计算引擎本身的没落,自身稳定性不足也比较突出,尤其在资源隔离、限流控制等方面。
在行业技术趋势以及业务需求上考虑,我们期望通过计算引擎迭代,实现规模性降本、整体架构可长期演进,系统性提高服务稳定性。
总体架构
结合作业帮当前 Hive 应用情况,Spark SQL 替换 Hive 在技术架构上核心需要考虑 SQL 提交、认证和鉴权、以及数据开发平台适配。
SQL 提交开源解决方案主要有 Apache Kyuubi、Spark Thrift Server。STS 是 Spark 主导的 SQL 解析和优化器,兼容了 HiveServer2 的 Thrift 通信协议,但是 Thrift Server 和 Driver 在同一个进程内,所有 SQL 都由这个 Driver 负责编译、执行,此阶段资源消耗高,高并发情况容易造成服务节点异常。Kyuubi 则更加灵活,兼容了 STS 服务能力,Kyuubi Server 基于 ZooKeeper 实现服务发现机制保障高可用,针对 ETL、即系查询等不同应用场景设计灵活、可选的 Engine 共享级别。具体对比如下图 (引自 Apache Kyuubi 微信公众号)
认证和鉴权部分在平台(调度 / 即系等系统)和客户端(beeline)访问处理逻辑并不统一,解决起来会稍微麻烦些。为了兼顾项目效率我们采用了与 HiveServer2 类似的鉴权逻辑。如下图简单举例说明整体流程。
Beeline 客户端访问时,通过 Kyuubi 实现信息透传 Spark App,二次开发 Hive Metastore 实现认证,结合 Ranger Hive Metastore Plugin 实现鉴权。相对来说,平台实现会简单些,系统直接和公司内部 IPS 系统对接实现认证。服务入口获取相应 SQL 并解析,在通过 AuthServer 权限服务进行权限校验,较 Client 方式执行速度更快,不需要等待资源创建 Kyuubi Engine。认证鉴权完成后会以超级用户继续走后续流程。本质上账号体系是两套,为了实现一次授权双向可用,用户名必须统一。
迁移和优化
任务迁移
整体迁移大概分为两个阶段,一是部门内部流量数仓由 Hive 迁移到 Spark SQL + Iceberg,同时沉淀平台能力。这部分主要为提供基础能力,人工执行迁移动作,规模性验证收益效果。此处不多介绍,可参考 https://www.infoq.cn/article/SXZhixk65MjUfEecrhGu。二是全面推广 Spark 引擎,规模性迁移快速拿到收益。
开始考虑仍然以业务人工自主迁移的方式进行,历史存量 Hive 任务有 1W+,牵扯多个组织角色和团队,少量团队结合当前成本压力意愿度较高,但从全局视角看节奏远低预期。整体上可投入精力有限,新引擎学习、迁移流程复杂(SQL 转换、双跑对数)、遇到问题后迁移意愿降低是主要影响因素。
为尽快拿到收益,以工具化迁移替代人工,大概流程如下图。首先通过调用离线调度平台接口获取 Hive SQL,替换 SQL 中时间等变量占位符,形成可执行 Hive SQL。解析可执行 Hive SQL 基于规则转换为 Spark SQL,同时在测试库下创建目标表,并替换输出结果到测试表,再做 explain 校验。资源低峰期执行 Spark SQL 任务,对比 Hive 产出数据结果(对比方式见下文)。对比收集 Yarn Application 中 Aggregate Resource Allocation 资源消耗指标。数据准确且资源收益为正,持续双跑一段时间无异常后切换为 Spark 引擎,并关注迁移任务接下来一周异常情况,迁移首天出现异常可自动回滚。整个过程当出现超预期问题时,人工分析具体原因迭代转换规则、优化集群和任务参数。经过几轮迭代,可迁移任务覆盖率逐步提高。
迁移工具流程图
基础类型对数 sum(hash(xxx)) 即可,复杂类型需要做些特殊处理,例如
array类型:sum(hash(array_sort(array())))
map类型:sum(hash(array_sort(map_entries(map()))))
array类型:sum(hash(array_sort(flatten(transform(array(map()), x -> map_entries(x))))))
在控制增量任务时,用户视角主要关注以下情况,一是平台功能是否对齐;二是 SQL 语法是否存在差异;三是是否稳定;四是新引擎的收益。在平台功能上,主要是即席查询、例行调度、表 / 任务血缘、语法 / 语义检测、自定义 UDF、Holo/ES 外表查询等方面,随着存量任务迁移能力已补充。SQL 语法差异上,虽说两个引擎语法差异不大,但是明显感觉 Spark SQL 3.3.2 较 Hive 2.7 语法校验上更严谨,在应用上还是会有小的差别,例如 map 类型不支持 group by、distinct、join key;数据类型自动转换、空值做 map key 等异常情况下的输出表现存在差异等等。针对这部分主要以培训宣贯、整理用户使用手册的方式解决。稳定性问题在存量任务双跑阶段基本已经覆盖了绝大头,主要通过调整相关参数解决。切换 Spark 后的收益已有论证,不存在争议。总体上,为避免用户反馈太过强烈,平台能力具备后并没有急于做任务的增量控制,仍然以迁移存量任务和宣贯优势为主。当存量任务覆盖率占绝对优势后,修改默认计算引擎逐步关闭 Hive 入口。
稳定和性能
- 内存控制不合理导致任务异常
在 Spark 应用中,OOM 问题一直比较突出,随着自身内存管理的不断发展和迭代已有明显改善,但在一些场景中仍然有这种风险。默认情况下,Executor 数据和计算过程所用内存资源主要在 JVM 管理的 On-Heap Unified Memory Pool 中,而 JVM 内对象通常是其原始数据的 2-5 倍,这种高内存占用主要因为封装为 JVM Object 时一些额外开销。加持数据内容、存储格式和文件存储压缩比差异,想要准确预测一个任务需要多少内存资源比较合理是件很难的事情。
为了兼顾迁移进度、稳定性、性能多方面因素,我们提供了两版参数。一是集群级别默认参数,兼顾绝大多数任务性能和稳定性。二是针对稳定性要求较高,允许一定性能损失的参数。以上参数工具化迁移阶段基本覆盖。部分异常 case 整理如下
Broadcast Join 时,广播数据阈值是通过 spark.sql.autoBroadcastJoinThreshold=10M 控制的,当压缩比较高时,需要申请较多的 On-Heap Storage Memory 内存,会导致等待资源时间长或者 OOM 问题,需要禁用或调低阈值才能解决。
执行窗口函数、开启向量化读时,缓存在内存中数据条数默认都为 4096 行,单行记录过大时会导致 OOM,适当调低阈值、避免扫描非必要的大字段可解决。
Kyuubi 提交 Spark SQL 任务时,Driver 会额外加载一些类、启动一些线程,默认的 Spark spark.driver.memoryOverhead 内存 10% 或 384M 总体偏小,导致任务 Driver 使用内存超限被 yarn kill,利用 jmcd 简单分析 Driver 内存消耗,这部分资源使用相对固定的,适当增加后解决。
针对 Spark 内存管理划分整理如下图,结合 Spark UI Metric 可用于辅助分析内存情况。
- 大规模并发提交 Spark 任务优化
离线任务中有很多用户设置整点调度,同时任务实例数也比较多。Kyuubi 是通过线程数量来控制提交任务并发的,如果线程数较低,会产生 Yarn 资源空闲但是任务提交限流情况。如果线程数调高,高并发时瞬时产生大量 SparkSubmit 进程,消耗 Kyuubi 节点 Cpu、Mem 资源,Kyuubi 服务有稳定性风险。我们在 Kyuubi engine 启动后释放 startupProcessSemaphore 可以解除并发限制,Kyuubi 节点资源利用率较之前提高 70%,Yarn 资源也可以打满。相关 PR 地址 https://github.com/apache/kyuubi/pull/6463
- 结果集返回慢和乱序问题
在部分场景中(例如算法特征数据同步到 GPU 节点、业务数仓结果同步到 OLAP 引擎用于 B 端系统),对任务稳定性、结果集拉取速度比较敏感。Kyuubi 在处理结果集返回有多种方式,但针对我们的场景使用每种方式都有些缺陷。首先是大结果集返回时,因为 Kyuubi 默认使用的方式是 resultDF.collect(),这样会把所有的数据拉取到 Driver 的内存中再返回给 kyuubi Server,数据量大容易发生 OOM 问题。之后测试串行拉取 kyuubi.engine.spark.operation.incremental.collect=true 解决了内存问题,但是拉取速度上会比较慢,尤其是遇到 scan + filter 这种简单查询时。最后 kyuubi.operation.result.saveToFile.enabled=true,结果集大于 minSize 后会将结果存储到 hdfs/cos 等文件系统,但是会触发 Kyuubi 的小文件优化,用户 order by 后获取的结果集仍然乱序,最后优化 Repartition 小文件合并判断逻辑后问题解决。
- 复杂类型开启向量化读,性能显著提升
日志数据在 Hive 表存储中占比非常高,为了保障日志打点的灵活性表设计一般都会包含嵌套类型(StructType、ArrayType、MapType),数仓在构建 DWD、DWS 层表时也会延续这种设计方式。而针对这种嵌套类型查询时会因为嵌套解析、未做 projection pushdown、逐行操作等原因导致资源消耗较多。Spark 在 3.4 版本默认开启嵌套类型向量化读,而我们引用的是 3.3.2 主要因测试覆盖度问题默认未开启。结合我们批量工具迁移逻辑及定向任务性能测试判断无风险有收益。开启向量化后效果非常显著,详细见下图。
SQL
spark.sql.orc.enableNestedColumnVectorizedReader=true
spark.sql.parquet.enableNestedColumnVectorizedReader=true
- 默认 jvm GC 并行度高导致 CPU 负载高
历史情况,大数据离线主要采用腾讯云定制高 CPU 核数和内存的大规格机型,高峰期 cpu idle 几乎为 0。明显感受处理数据吞吐能力下降,偶尔出现因 CPU 高导致的节点卡死、网络超时等异常情况,与腾讯 EMR 团队配合针对这种情况定向分析。节点高峰时定位消耗 cpu 较多的 yarn container,利用 async-profiler 分析 cpu 占用,发现瓶颈在 jvm gc。利用 jstack 分析 jvm gc 线程情况,发现相关线程数和 cpu 核数有关,有小几百个线程。一个节点 192c,跑 100 多个 container,jvm 线程总共约小几万个,大批量数据处理情况下,内存频繁分配和释放 GC 表现更明显。调整参数 ParallelGCThreads=8 限制 jvm gc 并行度,cpu 使用率变化情况如图。
https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/g1_gc_tuning.html#g1_gc_tuning
总体收益
总体上看,从平台能力建设 -> 内部少量任务灰度沉淀平台能力、验证整体效果 -> 存量任务规模化迁移 -> 关闭 Hive 增量入口,通过平台和工具化手段完成,整个迁移过程对业务影响很小。Spark 任务覆盖量从 0 到 1.5W,占例行任务约 80%,资源节省约 54%(同逻辑、同数据),收益超预期。同时未来 Spark on K8S 弹在线资源、基于 Kyuubi 做计算网关收口等技术演进提供了基础。