问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

大模型预训练数据处理方案详解

创作时间:
作者:
@小白创作中心

大模型预训练数据处理方案详解

引用
1
来源
1.
https://www.raygecao.cn/posts/data-processing/

数据质量是决定模型质量的关键因素,对海量训练数据进行高效清洗是AI工具链中至关重要的一环。本文将详细介绍大模型预训练数据处理的流程和相关技术方案,包括数据处理Pipeline的设计、批处理与流处理的对比、分布式数据处理的考虑因素,以及对Data-Juicer、Tekton和Pachyderm等开源解决方案的分析。

预训练与SFT是大模型训练的两个核心步骤,前者通常使用海量的非结构化数据进行无监督训练,后者使用已标注的指令数据进行微调。本文主要针对海量预训练数据的处理方案展开调研。

数据处理 Pipeline

预训练数据一般具备以下特点:

  • 来源广:一般来源于网络爬虫、B端提供数据集、开源数据集等。
  • 数据量大:通常是GB/TB级别的规模。
  • 数据格式多样:既包括pdf、word、文本等非结构化数据,又包括json、xml、csv等结构化数据。

考虑到上述数据特点,预训练数据处理通常包括文本解析、过滤、去重三个阶段。为了提升数据处理的灵活度和可扩展性,采用基于算子的Pipeline方式处理。每个阶段可能划分为很多个规则(又称算子),每类算子定义相同的输入输出,并可以灵活组合相连构成一条数据处理流水线。类比于传统流水线系统:

  • 每个算子是最小粒度的处理单元,类似于流水线系统中的Step。
  • 处理阶段是一组算子的逻辑集合,类似于流水线系统中的Stage/Task。
  • 完整的处理流程将各个算子串联起来,类似于流水线系统中的Pipeline。

批处理 vs 流处理

批处理与流处理是大数据中两个典型的处理方式。批处理通常处理固定的、有界的数据集,一般用于离线计算;而流处理通常处理不固定的、无界的数据,一般用于实时计算。对于预训练数据,批处理与流处理方案对比如下:

批处理

  • Data Collector是Data Processor的前驱任务,前者采集到的数据作为后者的输入。
  • Raw DataSet可以通过分片的方式被多个Data Processor进行处理。
  • 可以使用MapReduce、Spark等框架实现。


批处理方式

流处理

  • Data Collector与Data Processor可以并行进行,整体延时短。
  • 对于一些全局处理(去重)可能仍需要批处理。
  • 可以使用Flink、Spark Streaming等流处理框架实现。


流处理方式

考虑到预训练场景的高吞吐量等需求,我们采用离线的批处理计算方式。

分布式数据处理

考虑到预训练数据规模比较大,因此需要使用多机进行分布式处理以提升处理速度。分布式需要考虑以下几个问题:

  • 数据接入问题:每个节点都需要能够访问到训练数据,通常数据会存储在分布式存储系统上,考虑到B端交付场景,我们采用NFS存放完整数据。
  • 数据分片问题:每个节点都需要处理一部分数据,高效的数据划分需要考虑集群中各个节点的差异性。
  • 数据收集问题:有些操作需要在全局数据集上进行(比如全局去重),需要将各个处理后的分片收集起来整体处理。

考虑到上述问题,预训练数据的分布式处理流程如下:

开源解决方案

大数据处理多采用Apache提供的Hadoop、Spark等分布式计算框架,但这些框架比较重,不适合私有化交付。我们目前的私有化交付都是基于K8s容器平台,因此我们探索一些云原生相关的Pipeline开源方案。

Data-Juicer

Data-Juicer是由魔搭开源的LLM一站式数据处理系统,提供了大量数据处理、分析、可视化工具包。

Data-Juicer具备以下特点:

  • 通过配置定义pipeline,使用简单,便于进行版本管理。
  • 预置大量通用算子,分为Formatter、Mapper、Filter、Deduplicator、Selector五类,覆盖多种数据处理需求。
  • 框架使用Python实现,方便结合一些小模型进行处理。但环境依赖较为复杂,通过容器交付时镜像size过大。
  • 自定义算子流程较为简单。但其存在语言限制,仅支持在源码侧更改,交付成本较高。
  • 结合HuggingFace Datasets进行数据变换,基于内存映射与磁盘存储,使得小内存环境可以加载大量数据,且性能很好。
  • 框架提供了cache管理机制与checkpoint机制,大量减少重复计算。故障后可以在断点处继续处理,但代价是存储了大量中间数据。
  • 可以与Ray集群深度结合,支持分布式数据处理。

使用配置文件定义数据处理Pipeline示例如下:

# 全局参数
project_name: 'demo-dedup'
# 指定项目
dataset_path: '/path/to/dataset'
# 指定数据集的路径
np: 4
# 指定数据处理的并发度
open_tracer: true
# 是否打开 tracer 跟踪数据处理状态
export_path: '/path/to/output'
# 处理后的数据保存路径

# 算子配置
process:
- language_id_score_filter:
    # 语言过滤算子
    lang: en
    # 语言类型为英文
    min_score: 0.5
    # 评分阈值为 0.5
- document_minhash_deduplicator:
    # minHash 去重算子
    tokenization: 'character'
    # 分词方式为字符分词

此框架设计简单,专为LLM训练数据处理而生,十分适合我们的处理场景,但是其存在以下两个问题:

  • 私有化场景下扩展性较差,需更改源码并重新构建镜像使用。
  • 分布式方案需结合Ray框架,此框架存在ShadowRay漏洞,被禁止在私有化场景使用。

为解决这些问题,我们不得不寻找更通用的分布式Pipeline方案。

Tekton

Tekton是目前最火的云原生CI Pipeline系统,它引入了多种CRD灵活地定义Pipeline:

  • Task:包含若干顺序执行的Step,Task会在一个Pod内按顺序执行各个Step,一个Task内的Step可以共享数据。
  • TaskRun:运行Task的实例。
  • Pipeline:包含若干Task,并需声明Task的依赖关系,不存在依赖关系的Task可以并行运行。Pipeline可以复用Task。
  • PipelineRun:运行Pipeline的实例。


Tekton Pipeline 示例

定义pipeline的配置示例如下:

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: hello
spec:
  steps:
- name: echo
    image: alpine
    script: |
      #!/bin/sh
      echo "Hello World"

---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: goodbye
spec:
  params:
- name: username
    type: string
  steps:
- name: goodbye
    image: ubuntu
    script: |
      #!/bin/bash
      echo "Goodbye $(params.username)!"

---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: hello-goodbye
spec:
  params:
- name: username
    type: string
  tasks:
- name: hello
    taskRef:
      name: hello
- name: goodbye
    runAfter:
- hello
    taskRef:
      name: goodbye
    params:
- name: username
      value: $(params.username)

---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  name: hello-goodbye-run
spec:
  pipelineRef:
    name: hello-goodbye
  params:
- name: username
    value: "Tekton"

Tekton基于K8s,因此天生具备分布式能力,但其并非为数据处理而设计,因此需要在上层自行实现数据分片、数据聚合以及分布式场景中常见的故障转移、负载均衡等逻辑。

Pachyderm

Pachyderm是一款主打数据版本化和数据血统的云原生自动化数据处理平台,其具备以下特性:

  • 数据驱动Pipeline,数据变化时自动触发。
  • 不可变数据血统和git-liked版本化管理。
  • 自动化数据分片,基于K8s的自动扩容与并发处理。
  • 基于对象存储的自动化去重。
  • 支持在多种公有云上与本地部署。
  • 为数据版本与数据血统提供可视化界面。
  • 支持checkpoint、重试、故障转移等能力。

Pachyderm提出了以下核心概念:

  • Repo:Pachyderm管理的数据集,基于S3 + postgresSQL实现数据的版本控制。
  • Datums:数据集的分片,分片加入队列中供workers挑选处理.
  • Pipeline:使用transform容器处理某个repo中的数据,输出到另一repo,通过声明repo的关系形成DAG。
  • Job:版本化的K8s资源,用于描述一个Pipeline的执行。
  • Task Parallelism:一个Job将数据集拆分成若干片,被多个worker并行处理。
  • 版本化:Repo、Job均以类似git方式管理起来,便于溯源。

Pachyderm使用filepath glob对数据进行分片,简单灵活。

分片 vs 不分片
上图可以与我们分布式处理的流程图完美契合,分布式处理时进行分片,全局去重时不分片。

Pachyderm中DAG的定义并不向Tekton那么直观,它所定义的Pipeline是声明一个输入、输出与转换关系的配置,本质是一个算子的定义:

pipeline:
  name: transform
  # Pipeline 的名字,同时也是输出数据集的名字
  input:
    pfs:
      repo: data
      # 输入数据集为 data
      glob: "/*"
      # repo 下的每个文件(目录)是一个分片
  transform:
    image: my-transform-image:v1.0
    # 数据处理镜像
    cmd:
      # 数据处理的命令
- python
- "/my_transform_code.py"
- "--input"
- "/pfs/data/"
      # data 数据集挂载到 /pfs/data
- "--output"
- "/pfs/out/"
      # /pfs/out 下的内容组成了 transform 数据集

然而,Pachyderm存在以下两个问题:

  • 社区版限制Pipeline个数上限为16,任务并行度上限为8。
  • 无法对大文件进行分片。

方案选择

综合上述的调研,我们选择Pachyderm + Data-juicer的方案来实现数据处理系统,原因如下:

  • 部署方式基于K8s,私有化交付效率高。
  • Pachyderm提供了简易的、语言无关的算子定义方式,可扩展性强。
  • Pachyderm数据处理通用性强,可以将模型、配置、数据集放在Pachyderm上进行版本化管理,并将Pipeline应用到数据处理、预训练、SFT、推理等各个阶段。
  • Data-juicer预置大量了算子,开发成本低,避免大量离散的算子消耗Pipeline数量。

我们将Data-juicer Pipeline配置(process-config/dedup-config)和数据集(jsonl-data)上传到pachyderm中,利用Data-juicer进行处理,效果图如下:


Pachyderm + Data-juicer进行数据处理

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号