平台架构:分布式训练能力 加速 - tencentmusic/cube-studio GitHub Wiki

分布式集群

要做分布式处理/训练,首先要有分布式集群,在数据处理/训练前,先根据用户的需求创建需要的分布式集群,例如:

  • tf分布式训练集群,
  • pytorch分布式训练集群,
  • spark分布式数据处理集群,
  • ray分布式超参搜索集群,
  • mpi分布式训练集群,
  • horovod分布式训练集群,
  • nni分布式超参搜索集群,
  • mxnet分布式集群
  • volcano分布式集群
  • kaldi分布式语音训练集群 ...

以及在此衍生出来的分布式的数据下载,hdfs拉取,cos上传下载,视频采帧,音频抽取,分布式的训练,例如推荐场景的din算法,ComiRec算法,MMoE算法,DeepFM算法,youtube dnn算法,ple模型,ESMM模型,双塔模型,音视频的wenet,containAI等算法的分布式训练。

要实现这些分布式数据处理/训练,在云原生中定义对应的crd,然后部署相应的operator,来解析我们提交的crd yaml。例如下面的我们要实现一个tf的ps分布式训练集群的结构。

174096957-f76ba623-4d3b-45a1-a7a5-979862e43f5c

实现自动创建这样的分布式集群,需要四个层面的支持。

1、配置分布式训练集群的部署(CRD)

用户的每次任务使用的集群的框架和集群的规模都是不一样的,需要一个简单的方式能够描述用户本次所需要的集群。在k8s中通过crd来实现这样的集群的定义,可以理解为一个yaml文件。

2、控制器部署分布式训练集群(operator)

有了crd定义的这样一个集群。还需要一个operator,来监听、解析、创建、监控yaml文件中定义的集群。对于没法直接搬移到k8s之上的分布式,要开发对应的crd+operator,例如nni的超参搜索在frameworkcontroller框架上做开发,horovod分布式训练在mpi的分布式上做开发,kaldi分布式在volcano分布式上做开发,tf/pytorch/mxnet/ray官方都有对应的k8s分布式解决方案,可以直接使用对应的方式加入到k8s中来。

3、代码识别分布式角色(有状态):

在k8s中分布式集群创建好了以后,对于有状态的分布式集群,需要用户的代码能识别自己是哪个角色。如果单独为每个角色都单独写一份代码是很不友好的。所以用户代码需要按照分布式策略识别自己在集群中的角色和身份,进而加入到集群,协调工作内容。对于无状态集群,代码只需要接收自己要处理的任务,处理返回即可。在k8s中,大部分的场景是通过环境变量来识别身份的。

4、训练框架支持分布式(协议和策略):

当每个pod都进入各自的工作岗位以后,还需要该用户所使用的算法框架必须支持分布式的训练模式,支持分布式集群通信的协议和协商的方法,以及不同角色的节点之间的分工。这样才能协调一致工作,不然就要用户自己手动进行分工内容的逻辑编写。

在k8s中角色(pod)之间的区别主要通过环境变量或者名称来识别。例如role:ps role:worker,通信的方式主要通过域名:端口,或者挂载相同的分布式存储,使用文件来沟通。

通过上面的方法,我们就可以实现一个分布式任务在集群中的创建运行。

选对分布式框架

k8s中分布式框架按照角色形式主要分为下面几种:

image

1、对于无状态的框架,优势在于任务worker可以横向扩展,任务的完成没有数据倾斜的问题。在集群启动前并不定义worker要做哪些数据的处理任务,运行起来以后才开始决定处理哪些任务,比如kafka的消费者,比如ray的分布式。这类分布式任务不会因为某个worker而严重拖慢整体任务的进度。但是这类分布式任务一定层度上受限于任务分发机制。比如kafka,需要将任务发送到队列,ray只能传输可序列化的函数,无法传递大内存变量。比较适合于无状态的任务函数,每次处理之间不存在依赖或者不依赖基础对象。

2、对于有序分布式任务,就是提前告诉每个worker在集群中的角色和位置,但是并不定义他们之间的通信逻辑。比如volcano分布式,kaldi分布式训练,这类任务可以在数据处理前先定义好每个worker要处理的任务,这种就不需要任务分发器,但是这样用户的任务容易存在数据倾斜的问题,比如某个worker所要处理的数据因为某些原因被拖慢,那么整个pipeline的结束时间会因为这一个worker而拖慢,例如volcano-job。也可以在运行中有专门的角色worker来分发任务,例如kaldi-job,这种任务如果官方没有分发器的话需要自己写一个专门的分发器。

3、还有一种分角色有序分布式任务,在上面的基础上,同时定义好了角色之间的通信逻辑,这种一般是因为除了做分工还要做协同。比如ring all reduce的分布式训练方案。这样每个worker不仅要知道自己是什么角色(ps-worker),还要知道访问其他worker的方法,这种分布式一般来源于特定分布式任务的框架,例如tf/pytorch。

对于同一个分布式框架,也有多种分布式的方式,比如对于tf分布式,有ps模式和ring all reduce的方式。在单机模型可承受的情况下,ring all reduce的效率一般比ps的方式效率要高,对于单机无法承受模型大小的情况下,选择ps模式进行模型并行优化。同时还有cpu/gpu方式的选择,稀疏embedding大模型和稠密大模型的选择等。

image

分布式worker的均衡度

有了分布式的能力,在机器学习平台中cube加入了通过拖拉拽的能力制作分布式ml任务流的能力,每个task可能都是一个分布式任务,也可能是单机的任务。不同的用户会有很多分布式的任务流。

image

如果不做均衡,分布式任务很容易无法更好的利用多机的算力,会更早的受到机器的性能瓶颈,进而影响任务运行的效率。为了更好的利用多机的能力,加快整体任务流的运行效率,平台在三个层次做优化。

亲和度+反亲和度

一个分布式任务的不同worker

在一个分布式框架启动了一个分布式集群进行训练时,这个worker应该根据实际需求确定尽量分布在哪里,就像impala尽量分布在hdfs datanode所在的节点上。因为分布式集群中除了分工协作,还有沟通的过程,相当于既独立又合作。所以分布式的方式要能利于这两种操作。例如在tf ring allreduce的方式进行分布式训练时,不同worker节点之间需要通信传递参数,所以通信的时延也是不可忽略的,对于在gpu上训练时,如果训练中其他过程中耗时很小(io、gpu计算等耗时小),那么通信的时延性能影响就会比较突出。这个时候我们可能更偏向于将gpu的任务分布在相同的机器上。如果是cpu的训练任务,前向计算的时长比较大,通信时延可以忽略,我们为了能更多的利用cpu资源可能会选择cpu的任务分布在不同的机器上。

同一个pipeline,不同的task

在同一个pipeline下,我们有时候会并行很多task,他们并不是完全串行的,比如我们同时7个hdfs数据拉取的任务,分别拉取最近7天的数据,并且后面用这7天的数据,同时跑3个分布式的多机多卡训练。在k8s中资源内存cpu等资源是相互隔离的,但是对于部分资源无法做到很好的隔离,比如机器的网络io,磁盘io,三方平台的黑白名单限制限速等。对于这种作用相同的任务,我们倾向于将同一个pipeline下不同的task区分的不同的机器上。如果某个task是一个分布式任务,那么task下面的work再由task launcher自行决定如何调度。

kube scheduler打分策略

不同的pipeline可能归属于不同的用户,可能会运行在同一个资源组。 在k8s中pod 调度依赖于算法和策略,而且策略部分主要由过滤和打分两个阶段。就像推荐中的召回和排序一样。过滤会将满足必备条件的机器筛选出来,打分阶段则选择其中最适合部署当前pod的机器。

image

过滤段主要是

  • PodFitsHostPorts:检查 Pod 请求的端口(网络协议类型)在节点上是否可用。
  • PodFitsHost:检查 Pod 是否通过主机名指定了 Node。
  • PodFitsResources:检查节点的空闲资源(例如,CPU和内存)是否满足 Pod 的要求。
  • MatchNodeSelector:检查 Pod 的节点选择算符 和节点的标签是否匹配。
  • NoVolumeZoneConflict:给定该存储的故障区域限制, 评估 Pod 请求的卷在节点上是否可用。
  • NoDiskConflict:根据 Pod 请求的卷是否在节点上已经挂载,评估 Pod 和节点是否匹配。
  • MaxCSIVolumeCount:决定附加CSI卷的数量,判断是否超过配置的限制。
  • PodToleratesNodeTaints:检查 Pod 的容忍是否能容忍节点的污点。
  • CheckVolumeBinding:基于 Pod 的卷请求,评估 Pod 是否适合节点,这里的卷包括绑定的和未绑定的PVCs 都适用

打分段主要包含如下的接口:

  • SelectorSpreadPriority:属于同一Service、StatefulSet、ReplocaSet或的 Pod,跨主机部署。
  • InterPodAffinityPriority:实现了Pod减亲和性和反亲和性的优先级。
  • LeastRequestedPriority:偏向最少请求资源的节点。 换句话说,节点上的 Pod 越多,使用的资源就越多,此策略给出的排名就越低。
  • MostRequestedPriority:支持最多请求资源的节点。 该策略将 Pod 调度到整体工作负载所需的最少的一组节点上。
  • RequestedToCapacityRatioPriority:使用默认的打分方法模型,创建基于 ResourceAllocationPriority 的 requestedToCapacity。
  • BalancedResourceAllocation:偏向平衡资源使用的节点。
  • NodePreferAvoidPodsPriority:根据节点的注解 scheduler.alpha.kubernetes.io/preferAvoidPods 对节点进行优先级排序。 你可以使用它来暗示两个不同的 Pod 不应在同一节点上运行。
  • NodeAffinityPriority:根据节点亲和中 - PreferredDuringSchedulingIgnoredDuringExecution 字段对节点进行优先级排序。 你可以在将Pod分配给节点中了解更多。
  • TaintTolerationPriority:根据节点上无法忍受的污点数量,给所有节点进行优先级排序。 此策略会根据排序结果调整节点的等级。
  • ImageLocalityPriority:偏向已在本地缓存 Pod 所需容器镜像的节点。
  • ServiceSpreadingPriority:对于给定的 Service,此策略旨在确保该 Service 关联的 - Pod 在不同的节点上运行。 它偏向把 Pod 调度到没有该服务的节点。 整体来看,Service 对于单个节点故障变得更具弹性。
  • EqualPriority:给予所有节点相等的权重。
  • EvenPodsSpreadPriority:实现了Pod拓扑扩展约束的优先级排序

其中在RequestedToCapacityRatioPriority打分阶段,k8s默认的方式是在之前运行成功过的机器上调度任务,这个策略不太适用于分布式训练,我们是需要在资源申请率(注意不是申请量)最小的机器上调度新的pod。所以我们可以创建修正默认的调度打分策略。配置文件可以参考:https://github.com/tencentmusic/cube-studio/blob/master/install/kubernetes/scheduler-policy-config.json

gang调度

在一个算力资源池中运行多个分布式任务时,会存在资源死锁的问题。例如下图所示的3台4卡的gpu机器,如果每个分布式任务有4个worker,每个worker占1张卡,那么最多能同时启动三个这个的分布式任务,但是如果同时启动了4个这样的分布式任务,就有可能存在每个分布式任务只成功启动了三个worker,而另一个worker因为占不到卡而启动不了的情况。进而所有的分布式任务都在等待新的卡释放出来。这种情况经常出现在剩余资源不足够多的场景下。

image

应对这种情况,cube加入了kube-batch工具,使用gang调度算法,配置新的scheduler组件。为分布式任务配置批量调度的能力,在调度队列中,先计算剩余资源是否满足所有worker的算力要求,完全满足并且全部占用以后,整个集群才算调度成功。

同时引入volcano实现批调度的同时,加入优先级调度的能力。在scheduler层面统筹批调度的能力。

image

分布式任务单worker的性能加速

通过分布式框架是为了能使用更多的算力帮助用户尽快完成任务,但是能尽快完成的前提也是分布式任务中每个worker也能尽快完成自己所属的任务。

对于均匀式的任务分配

即每个worker所做的工作时长基本一致,那么就需要每个worker尽快完成自己所属的数据处理工作,并尽快完成不同worker之间的交流,尽快进入下一个迭代。

对于独立的单worker任务加速,我们可以像单机优化方案一样,使用多进程多线程的并发方案,或者使用ray,异步io等并发方案,就是让单worker申请的资源能尽可能的使用起来。虽然用户申请的cpu/内存资源如果没有使用起来会被cube的智能算法自动调整到合理的范围,但这些方案只是提高了整体利用率,并没有加速任务的运行。所以一定程度上还是需要用户加速单worker的运行。

当然还有尽快完成每个worker之间的协商,例如ring allreduce之间的通信时长。对于不同worker之间的通信,在k8s中均是通过内部服务域名解析,在pod之间进行通信,主要优化的点是在内核方面优化在容器tcp通信时的效率。使用更高版本的内核。从linux3升级到linux4后,压测pod之间的通信效率1.3Gi升级到18Gi。主机带宽25Gi。还有nvlink等解决方案。

对于非均匀式任务分配

尽量减少数据倾斜,或者对未完成任务进行重分配,重新均衡。这种方式的不好是需要用户手动进行再均衡,优势是用户可以自由决定角色的定义。比如用户自行决定rank=0的worker进行最后结果数据的合并操作。

gpu利用率提高

gpu相对于内存和cpu的算法比较特殊,内存和cpu可以通过算法进行智能修改,可以通过worker数量的增大替代单worker性能不高的问题。但是对于gpu因为在训练中是独占的形式(可以使用vgpu的方式,但是并不推荐在训练中使用vgpu,而是将vgpu的能力应用到推理中),所以gpu使用率不高的话非常影响训练任务的效率。

因为gpu的计算能力比cpu强很多,所以之前在cpu下不凸显的问题在gpu下都变的非常明显。

gpu利用率监控

通过监控按钮,可以进入查看任务运行的资源使用率,对于资源使用超标,可以手动配置增加资源。

image

其中gpu由于是整卡占用,需要调整任务的部分参数和代码,提高gpu显存占用率和gpu使用率

image

注意:很多框架对gpu的显存占用默认是独占,所以在容器内通过nvidia-smi等命令查看,容易看到gpu显存占用100%。

gpu利用率低的原因

主要思想是cpu操作慢,进而阻塞了gpu的计算。

image

常见的 CPU 计算操作如下:

  • 数据加载
  • 数据预处理
  • 模型保存
  • loss 计算
  • 评估指标计算
  • 日志打印
  • 指标上报
  • 进度上报

代码层面gpu利用率优化

1、数据加载相关

  • 存储计算不在同一个城市:数据导入到集群存储
  • 磁盘io性能太差:对于临时数据可以将内存映射为磁盘
  • 小文件太多,频繁io:合并为大文件处理
  • 未启用多进程并行读取数据:pytorch提高num_workers,tf配置num_parallel_calls/num_parallel_reads
  • 未启用提前加载机制来实现 CPU 和 GPU 的并行:pytorch配置prefetch_factor,tf配置Dataset.prefetch()
  • 未设置共享内存 pin_memory:设置为true
  • 每次送入gpu的_size太少:模型固定后,调整 batch_size,尽量增大显存的利用率。然后再调节num_workers提高gpu利用率

2、数据预处理相关

  • 数据处理和训练耦合在一起:将数据处理和训练分成两个task,训练中需要的配置之类的全部提前加载到内存,让gpu只做训练任务。
  • 使用Nvidia DALI,在gpu中做数据处理

3、频繁IO操作

  • 模型保存太频繁:减少保存模型(checkpoint)的频率
  • tensorboard文件保存太频繁
  • 日志打印太频繁,频繁cpu/gpu切换:不要打印训练迭代中个人日志
  • 存储io性能太低
  • 对于频繁使用的文件(库文件,lib等)放入了性能不高的存储中

vgpu+共享gpu

常规我们会先采用gpu利用率提高的方法,把申请的gpu尽量占满,但是这部分的优化一定层度上是代码层面的优化,需要用户的配合。但是有时候用户的代码无法做到很完美。这时候就需要架构层面来解决利用率的问题。

1、使用vgpu的形式。

现在有很多种方案使用vgpu,将一张卡隔离成多张卡,但是在训练中使用vgpu有个调度问题,如下图所示,分布式任务t1大量占用1/3卡,造成大量算力碎片化,进而对于正常的任务t2,无法找到满足的机器1卡进行调度。这是因为k8s首先还是在机器上,pod的算力申请上限无法突破单机。另外一个问题,就算通过零散化算力只是为了提高gpu的利用率,并没有加速任务的运行。这种vgpu更适合在推理阶段使用,因为推理阶段输入的流量不是训练开发者决定的,而是c端用户决定的。

image

虚拟化gpu以及虚拟内存+gpu利用率的提高在推理端的方案,在推理文章中有介绍。

2、共享gpu

所以我们可以采用共享gpu的方式,来提高gpu利用率,同时加速任务的训练进度。在常规的多机多卡训练中,gpu的占用是独享的,已ring all reduce为例,每个worker独享一张gpu卡。

image

但是并不是每个开发者都能将gpu卡的利用率使用的非常高。比如用户的代码使用gpu训练,gpu利用率10%,对于这种情况,一般开发者会采用使用更多的gpu卡来加速训练,这样gpu的浪费情况就更加严重。在架构层面我们调整用户可以配置共享同一个gpu的进程数目,以此来让在相同gpu卡上,跑出更多分布式worker的效果。直到机器的某个资源达到屏蔽。比如3个进程共享一张卡,一台机器4张卡,同时12个进程,此时cpu使用率已经接近100%,那么就要开始从其他的地方进一步加速了。

image

这种方式重置了每个pod的角色,而是角色直接传递到进程。rank和world_sized都被重置。

任务资源配置动态调整

用户在运行任务的时候也不知道任务的资源该设置为多少。除了监控,通知用户任务的资源使用情况,还需要能够智能的调整用户的配置,因为cube平台是统筹算力的,一个资源池里面会有很多任务在同时运行。

image

但是对于某些特殊的任务,也支持模板里面提前设置好固定资源配置TASK_RESOURCE_MEM/TASK_RESOURCE_CPU/TASK_RESOURCE_GPU,屏蔽用户的配置。同时对于某些不合理的资源动态修正,也允许在模板中通过环境变量NO_RESOURCE_CHECK屏蔽动态资源配置。

在动态资源配置中仅对内存、cpu进行修正,对gpu资源,采用的仍然是上面gpu利用率提升的方法。因为大部分场景,cpu上的任务可以通过减少每个worker的cpu数目,增加worker数量来提升下来,但是在gpu训练场景,我们采用独占方式,所以无法缩减配置。

资源池动态均衡

分布式集群的加速能力与可以占用的资源几乎可以成线性关系。所以即使保障分布式任务可以有资源能调度起来,也就变的异常重要,尤其在多集群,多资源组的平台下。平台在联邦控制器下,多个集群之间不进行均衡,但是在一个集群下的多个项目组下进行均衡配置。受限保障每个项目组的最低算力,保障基本任务的运行,当有临时性任务发起时,就会从公共池中将算力加入到指定项目组。运行结束后释放资源会公共池

image

待补充:

分布式worker日志收集

分布式任务中共享内存

分布式io性能

小文件/大文件场景

分布式参数通信

超参搜索+分布式训练

分布式中中的代码问题:超参不会设置(自带超参搜索的训练),gpu利用率不高,资源配置不会

分布式python ray

分布式tfjob

如果单机准分布式

分布式pytorchjob

怎么代码改造

分布式中前置环境脚本处理

分布式horovod/mpi

volcano任务