大模型预训练数据处理方案详解
大模型预训练数据处理方案详解
数据质量是决定模型质量的关键因素,对海量训练数据进行高效清洗是AI工具链中至关重要的一环。本文将详细介绍大模型预训练数据处理的流程和相关技术方案,包括数据处理Pipeline的设计、批处理与流处理的对比、分布式数据处理的考虑因素,以及对Data-Juicer、Tekton和Pachyderm等开源解决方案的分析。
预训练与SFT是大模型训练的两个核心步骤,前者通常使用海量的非结构化数据进行无监督训练,后者使用已标注的指令数据进行微调。本文主要针对海量预训练数据的处理方案展开调研。
数据处理 Pipeline
预训练数据一般具备以下特点:
- 来源广:一般来源于网络爬虫、B端提供数据集、开源数据集等。
- 数据量大:通常是GB/TB级别的规模。
- 数据格式多样:既包括pdf、word、文本等非结构化数据,又包括json、xml、csv等结构化数据。
考虑到上述数据特点,预训练数据处理通常包括文本解析、过滤、去重三个阶段。为了提升数据处理的灵活度和可扩展性,采用基于算子的Pipeline方式处理。每个阶段可能划分为很多个规则(又称算子),每类算子定义相同的输入输出,并可以灵活组合相连构成一条数据处理流水线。类比于传统流水线系统:
- 每个算子是最小粒度的处理单元,类似于流水线系统中的Step。
- 处理阶段是一组算子的逻辑集合,类似于流水线系统中的Stage/Task。
- 完整的处理流程将各个算子串联起来,类似于流水线系统中的Pipeline。
批处理 vs 流处理
批处理与流处理是大数据中两个典型的处理方式。批处理通常处理固定的、有界的数据集,一般用于离线计算;而流处理通常处理不固定的、无界的数据,一般用于实时计算。对于预训练数据,批处理与流处理方案对比如下:
批处理
- Data Collector是Data Processor的前驱任务,前者采集到的数据作为后者的输入。
- Raw DataSet可以通过分片的方式被多个Data Processor进行处理。
- 可以使用MapReduce、Spark等框架实现。
批处理方式
流处理
- Data Collector与Data Processor可以并行进行,整体延时短。
- 对于一些全局处理(去重)可能仍需要批处理。
- 可以使用Flink、Spark Streaming等流处理框架实现。
流处理方式
考虑到预训练场景的高吞吐量等需求,我们采用离线的批处理计算方式。
分布式数据处理
考虑到预训练数据规模比较大,因此需要使用多机进行分布式处理以提升处理速度。分布式需要考虑以下几个问题:
- 数据接入问题:每个节点都需要能够访问到训练数据,通常数据会存储在分布式存储系统上,考虑到B端交付场景,我们采用NFS存放完整数据。
- 数据分片问题:每个节点都需要处理一部分数据,高效的数据划分需要考虑集群中各个节点的差异性。
- 数据收集问题:有些操作需要在全局数据集上进行(比如全局去重),需要将各个处理后的分片收集起来整体处理。
考虑到上述问题,预训练数据的分布式处理流程如下:
开源解决方案
大数据处理多采用Apache提供的Hadoop、Spark等分布式计算框架,但这些框架比较重,不适合私有化交付。我们目前的私有化交付都是基于K8s容器平台,因此我们探索一些云原生相关的Pipeline开源方案。
Data-Juicer
Data-Juicer是由魔搭开源的LLM一站式数据处理系统,提供了大量数据处理、分析、可视化工具包。
Data-Juicer具备以下特点:
- 通过配置定义pipeline,使用简单,便于进行版本管理。
- 预置大量通用算子,分为Formatter、Mapper、Filter、Deduplicator、Selector五类,覆盖多种数据处理需求。
- 框架使用Python实现,方便结合一些小模型进行处理。但环境依赖较为复杂,通过容器交付时镜像size过大。
- 自定义算子流程较为简单。但其存在语言限制,仅支持在源码侧更改,交付成本较高。
- 结合HuggingFace Datasets进行数据变换,基于内存映射与磁盘存储,使得小内存环境可以加载大量数据,且性能很好。
- 框架提供了cache管理机制与checkpoint机制,大量减少重复计算。故障后可以在断点处继续处理,但代价是存储了大量中间数据。
- 可以与Ray集群深度结合,支持分布式数据处理。
使用配置文件定义数据处理Pipeline示例如下:
# 全局参数
project_name: 'demo-dedup'
# 指定项目
dataset_path: '/path/to/dataset'
# 指定数据集的路径
np: 4
# 指定数据处理的并发度
open_tracer: true
# 是否打开 tracer 跟踪数据处理状态
export_path: '/path/to/output'
# 处理后的数据保存路径
# 算子配置
process:
- language_id_score_filter:
# 语言过滤算子
lang: en
# 语言类型为英文
min_score: 0.5
# 评分阈值为 0.5
- document_minhash_deduplicator:
# minHash 去重算子
tokenization: 'character'
# 分词方式为字符分词
此框架设计简单,专为LLM训练数据处理而生,十分适合我们的处理场景,但是其存在以下两个问题:
- 私有化场景下扩展性较差,需更改源码并重新构建镜像使用。
- 分布式方案需结合Ray框架,此框架存在ShadowRay漏洞,被禁止在私有化场景使用。
为解决这些问题,我们不得不寻找更通用的分布式Pipeline方案。
Tekton
Tekton是目前最火的云原生CI Pipeline系统,它引入了多种CRD灵活地定义Pipeline:
- Task:包含若干顺序执行的Step,Task会在一个Pod内按顺序执行各个Step,一个Task内的Step可以共享数据。
- TaskRun:运行Task的实例。
- Pipeline:包含若干Task,并需声明Task的依赖关系,不存在依赖关系的Task可以并行运行。Pipeline可以复用Task。
- PipelineRun:运行Pipeline的实例。
Tekton Pipeline 示例
定义pipeline的配置示例如下:
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: hello
spec:
steps:
- name: echo
image: alpine
script: |
#!/bin/sh
echo "Hello World"
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: goodbye
spec:
params:
- name: username
type: string
steps:
- name: goodbye
image: ubuntu
script: |
#!/bin/bash
echo "Goodbye $(params.username)!"
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: hello-goodbye
spec:
params:
- name: username
type: string
tasks:
- name: hello
taskRef:
name: hello
- name: goodbye
runAfter:
- hello
taskRef:
name: goodbye
params:
- name: username
value: $(params.username)
---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: hello-goodbye-run
spec:
pipelineRef:
name: hello-goodbye
params:
- name: username
value: "Tekton"
Tekton基于K8s,因此天生具备分布式能力,但其并非为数据处理而设计,因此需要在上层自行实现数据分片、数据聚合以及分布式场景中常见的故障转移、负载均衡等逻辑。
Pachyderm
Pachyderm是一款主打数据版本化和数据血统的云原生自动化数据处理平台,其具备以下特性:
- 数据驱动Pipeline,数据变化时自动触发。
- 不可变数据血统和git-liked版本化管理。
- 自动化数据分片,基于K8s的自动扩容与并发处理。
- 基于对象存储的自动化去重。
- 支持在多种公有云上与本地部署。
- 为数据版本与数据血统提供可视化界面。
- 支持checkpoint、重试、故障转移等能力。
Pachyderm提出了以下核心概念:
- Repo:Pachyderm管理的数据集,基于S3 + postgresSQL实现数据的版本控制。
- Datums:数据集的分片,分片加入队列中供workers挑选处理.
- Pipeline:使用transform容器处理某个repo中的数据,输出到另一repo,通过声明repo的关系形成DAG。
- Job:版本化的K8s资源,用于描述一个Pipeline的执行。
- Task Parallelism:一个Job将数据集拆分成若干片,被多个worker并行处理。
- 版本化:Repo、Job均以类似git方式管理起来,便于溯源。
Pachyderm使用filepath glob对数据进行分片,简单灵活。
分片 vs 不分片
上图可以与我们分布式处理的流程图完美契合,分布式处理时进行分片,全局去重时不分片。
Pachyderm中DAG的定义并不向Tekton那么直观,它所定义的Pipeline是声明一个输入、输出与转换关系的配置,本质是一个算子的定义:
pipeline:
name: transform
# Pipeline 的名字,同时也是输出数据集的名字
input:
pfs:
repo: data
# 输入数据集为 data
glob: "/*"
# repo 下的每个文件(目录)是一个分片
transform:
image: my-transform-image:v1.0
# 数据处理镜像
cmd:
# 数据处理的命令
- python
- "/my_transform_code.py"
- "--input"
- "/pfs/data/"
# data 数据集挂载到 /pfs/data
- "--output"
- "/pfs/out/"
# /pfs/out 下的内容组成了 transform 数据集
然而,Pachyderm存在以下两个问题:
- 社区版限制Pipeline个数上限为16,任务并行度上限为8。
- 无法对大文件进行分片。
方案选择
综合上述的调研,我们选择Pachyderm + Data-juicer的方案来实现数据处理系统,原因如下:
- 部署方式基于K8s,私有化交付效率高。
- Pachyderm提供了简易的、语言无关的算子定义方式,可扩展性强。
- Pachyderm数据处理通用性强,可以将模型、配置、数据集放在Pachyderm上进行版本化管理,并将Pipeline应用到数据处理、预训练、SFT、推理等各个阶段。
- Data-juicer预置大量了算子,开发成本低,避免大量离散的算子消耗Pipeline数量。
我们将Data-juicer Pipeline配置(process-config/dedup-config)和数据集(jsonl-data)上传到pachyderm中,利用Data-juicer进行处理,效果图如下:
Pachyderm + Data-juicer进行数据处理