paddle进程间tensor传输设计文档 paddle.multiprocessing - PaddlePaddle/Paddle GitHub Wiki

paddle.multiprocessing 模块新增设计方案

  • 此为paddle.multiprocessing设计方案,相关实现已经由此PR实现:#37302
  • 剩余未完成的功能较多,因此开放此文档,提供给外部开发者使用,希望外部开发者贡献完善。
  • 此文档为2023飞桨黑客马拉松活动使用。

一、概要

1、相关背景

本文档,主要设计新增paddle.multiprocessing模块。通过自定义Tensor序列化、反序列化方式,使用共享内存、cudaIpc等技术,实现paddle Tensor在进程间快速传输、共享。

2、功能目标

实现 paddle.multiprocessing模块,可在多进程间,方便快捷的传输Tensor。

目标:

  1. 功能支持: 动态图下,支持paddle.Tensor, paddle.ParamBase类型Tensor,在进程间传输,传输后Tensor属性与原来一致。(此处为基于LoDTensor的Tensor类型)支持CPU/GPU。
  2. 全局引用计数: 对共享的Tensor,支持全局引用计数。
  3. 多平台支持: 支持windows/mac平台,支持ROCM设备

难点:

  • 共享Tensor生命周期管理multiprocessing 进程间Tensor传输,是典型的生产者消费者场景。需要跨进程实现tensor的引用计数,确保tensor正确析构。

3、方案要点

  1. 速度、功能符合预期: 在linux平台上,支持CPU/GPU传输Tensor,传输后的功能正常,可正常计算、修改。
  2. 不发生显存泄露: 支持初步的共享Tensor生命周期管理,尽量不发生显存,内存泄露。

二、竞品对照

pytorch 现状

竞品技术分析

竞品pytorch对进程间Tensor传输的支持,实现较早,目前整体的支持较为完备。 对于cpu Tensor进程间传输,支持linux、windows、mac平台。gpu Tensor支持cuda、rocm设备。具体支持情况的见下表。

torch.multiprocessing 设备支持列表

设备 平台 方案 补充
Host/CPU linux file_descriptor 、 file_system file_descriptor 内存泄露风险小。打开文件获得句柄后,立即删除文件。多进程传输句柄。句柄关闭后文件系统释放存储。
Host/CPU win32 file_system file_system 存储为文件形式。多进程传输文件名。生命周期结束后需要删除文件。
Host/CPU mac file_system 同上
Device/GPU CUDA(linux) share_cuda 使用cudaIpcMemHandle传输
Device/GPU ROCM(linux) share_cuda 使用hipIpcMemHandle传输

从代码结构上分析,下面是简要的代码模块介绍:

torch multiprocessing架构图

torch.multiprocessing 主要分为python层和C++层。

其中,python层主要有通过自定义ForkingPickler函数,改写Tensor相关类型的变量的序列化函数。在reduction.py 中定义了多种reduce、rebuild方法。

python层API调用StorageSharing.cpp中c++ api, 实现Tensor到序列化、反序列化。并且还支持了进程间Tensor引用计数(主要使用了RefcountedMapAllocator)。

此外cuda Tensor的引用计数支持较为复杂,这里实现了单独的CudaIPCTypes来支持。主要原理是将引用计数的变量,写入到共享内存中,通过将引用计数的变量与Cuda IPC发送的变量绑定的方式(c10::detail::UniqueVoidPtr),实现了全局引用计数。

三、设计思路与实现方案

1、主体设计思路与折衷

整体全貌

飞桨框架整体架构图

主要增加Tensor存储这一部分的。

Tensor方面:当前计划只支持 LoDTensor,能够满足绝大部分动态图场景使用需求。 此外,对于共享的Tensor, 修改了部分 allocator 代码,以存储共享的Tensor变量。

设备支持方面:目前支持CPUPlace, CUDAPlace, 支持CUDAPinnedPlace(注:共享后应变为CPUPlace,PinnedPlace与share的状态不共存)。

cpu部分在mmap_allocator,新增了RefCountMemoryMapAllocation, 支持共享显存分配,读取,修改,写入,全局计数。gpu部分实现了单独的cuda_ipc_allocator模块,可以将传输后的cuda ipc handle转化为Tensor holder的allocation。

注:

  1. 这里的mmap_allocatorcuda_ipc_allocator 目前主要在pybind的对外接口暴露的Tensor序列化/反序列化接口中使用。没有在其他地方使用。

主体设计具体描述

整体设计主要参考pytorch的设计架构。我们在python层和c++层新增了部分序列化代码。paddle multiprocessing代码架构示意图如下:

paddle multiprocessing架构图

python层:包装multiprocessing,自定义进程间pickle函数

python层主要新增了paddle.multiprocessing文件夹,主要做了两件事情:

  1. 原始的multiprocessing做了封装,import原始multiprocessing的所有方法。
  2. 添加了针对Paddle Tensor类型数据的自定义进程间pickle协议函数

下图为整体Paddle Tensor进程间序列化的全部流程图。Tensor(VarBase) 先reduce为LoDTensor,然后LoDTensor在根据Tensor所在的设备(cpu/gpu)调用相应的c++函数,reduce成为handle文件。将handle传递给其他进程后,其他进程相应的反序列化函数即可重新构造为Tensor。

image

c++ 层: Tensor序列化接口

C++ 层主要分为pybind.cc中间的接口函数、进程间 Tensor 存储 allocation代码两部分。

pybind.cc主要定义的参数,返回值如下:
def("_share_cuda", [](LoDTensor self) {
  // ...
  return py::make_tuple(_handle,
     (py::size_t) offset_bytes,
     data_size, type_idx,
     vectorize(self.dims()),
     self.lod(), device_id);
}
.def("_new_shared_cuda", [](py::tuple t) {
  LoDTensor tensor;
  // ... 返回解析后的Tensor 
  return tensor;
}
.def("_share_filename", [](LoDTensor &self) {
  // ...
  return py::make_tuple(mmap_writer_allocation->ipc_name(),
                      mmap_writer_allocation->size(),
                      type_idx, vectorize(self.dims()), self.lod());
}
.def("_new_shared_filename", [](py::tuple t) {
  LoDTensor tensor;
  // ... 返回解析后的Tensor 
  return tensor;
}
序列化函数,首先将Tensor属性与数据存储位置分离。将数据转化成内存、显存的 handle(一般为str类型)。并且将handle和其他Tensor的数据类型、维度等数据,以元组的形式返回到python层。

反序列化函数,新建了一个空的LoDTensor, 将handle对应的数据取出来,变成 Allocation 类型,并且将空LoDTensor的 数据重置为 Allocation。 最后设置其他 Tensor 属性,返回给用户即可。

c++ 层:共享Tensor存储

cpu部分在mmap_allocator.h中,新增了MemoryMapAllocation, RefCountMemoryMapAllocation, 支持共享显存分配,读取,修改,写入,全局计数。

gpu部分实现了单独的cuda_ipc_allocator模块,可以将传输后的cuda ipc handle转化为Tensor holder的CudaIpcAllocation。同时提供了GetIpcBasePtr接口,支持ipc handle到数据指针的转换。

主体设计选型考量

python实现方案基本参考竞品设计思路,与torch基本一致。 c++层方面:

  1. cpu Tensor传输支持了file_system方案。与paddle之前的实现设计保持一致。
  2. gpu Tensor传输支持cudaIpcMemHandle方案。与竞品一致。

2、关键技术点/子模块设计与实现方案

paddle cpu Tensor 共享方案 (linux)

根据前面对于竞品的介绍,cpu Tensor进程间共享,竞品主要提供了两种方案: 方案一: file_system

// linux
int fd = shm_open(ipc_name.c_str(), flags, 0600);
// 进程间传递 ipc_name,子进程打开文件

优点:

  • 较为简单,进程间传递文件名即可( 此处的 ipc name)。

缺点:

  • Tensor以文件形式留存在文件系统内,进程异常退出后,可能发生文件残留。

方案二: file_descriptor

// open filename first
 if((fd = shm_open(filename_.c_str(), flags, (mode_t)0600)) == -1) {
   TORCH_CHECK(false, "error");
 }
// 保存此处的 fd,用于进程间传输。
// delete file     
if (shm_unlink(filename_.c_str()) == -1) {
  TORCH_CHECK(false, "error");
}

优点:

  • 传递文件描述符,比较小概率产生内存泄露问题。

缺点:

  • 只能支持linux操作系统。

建议:目前paddle侧已有的初步开发与file_system 方案类似,将此方案改进为支持paddle.multiprocessing的成本比较低。后期,可以进一步支持file_descriptor方案。

paddle cuda Tensor 进程间共享方案 (linux)

方案: CUDA Ipc 相关 API 可以支持进程间显存共享。 目前只有此方法支持。

使用API cudaIpcGetMemHandle 可以获取cudaIpcMemHandle_t, 将cudaIpcMemHandle_t转化为string,传递给子进程。子进程获取handle之后,使用 cudaIpcOpenMemHandle API 打开,获取对应的显存地址,即可访问父进程的显存地址空间,实现进程间显存共享。

// 获取进程间显存的 handle
__host__​cudaError_t cudaIpcGetMemHandle ( cudaIpcMemHandle_t* handle, void* devPtr )
// -> 进程间传输handle
// 使用handle打开显存地址
__host__​cudaError_t cudaIpcOpenMemHandle ( void** devPtr, cudaIpcMemHandle_t handle, unsigned int  flags )

注:

  1. IPC 功能仅限于,在Linux 操作系统上支持统一寻址的设备。
  2. cudaIpcGetMemHandle 功能需要使用CudaMalloc的原始地址。

进程间传输Tensor生命周期管理

生产者、消费者场景,生命周期管理比较复杂。

方案一用户使用event变量自己维护,生产者、消费者状态

## producer
# send Tensors, do something
send(Tensors)
event.wait()
event.clear()

## consumer
# receive Tensors and use them
Tensors = receive()
use_Tensors()
event.set()
del Tensors

建议使用 multiprocessing.Event() 进行进程间传输握手。

  • 对于cpu Tensor生命周期:

    1. 目前已经初步实现了全局计数方案。但通过 event 变量传输握手,更安全,不容易发生内存泄露。
  • 对于gpu Tensor生命周期:当前阶段,用户必须自己保证:生产者消费者状态

  1. 生产者:用户需要保证消费者消费前,Tensor不被析构。
  2. 消费者,析构时候,close IpcMemoryHandle 即可,不会造成文件残留。

总体来看,由于gpu ipc传递方式不会显式存在文件残留,只需要用户自己维护生产者消费者状态。传输完成后,原始Tensor按照生产者的引用计数管理即可。造成显存泄露的风险较小。

方案二: 全局计数方案:实现共享Tensor全局引用计数。用户不需要显式进行,进程间同步

## producer
# send Tensors, 
send(Tensors)
...

## consumer
x = queue.get()
# do somethings with x
del x

在共享内存总开辟额外空间,实现全局引用计数。 消费者接收后,增加了Tensor的引用计数,生产者引用计数减少为0后,可删除文件。 实现方案:基本实现原理参考竞品,开辟共享内存中的额外空间存放计数信息。

  • 对于cpu Tensor
    • 【已实现】 实现RefCountMemoryMapAllocation,在分配给Tensor分配存储数据空间时,额外增加一些字节,存储引用计数。所有IPC tensor,在建立时,引用计数+1,析构后-1,最后引用计数为0,删除共享内存中的文件。
  • 对于gpu Tensor
    • 参考竞品,计划实现allocation/utils/cuda_ipc_helper.h,支持CudaIPCSentDataCudaIPCRefcountedFiles等功能,将ipc 传输后的Tensor与CudaIPCSentData使用UniqueVoidPtr绑定。全局引用计数。

总的来讲,event消息维护方案对用户编写代码有一定要求,全局计数方案更易用。

建议:可以先合入初步版本,支持方案一使用(支持 cpu/gpu tensor),后续完善方案二(暂时支持 cpu tensor),完善全局引用计数支持。

需要自测的项目有:

  • 不同设备,cpu、gpu传输
  • Tensor异步进程读写检验
  • 不同Tensor,Paddle.Tensor, Paddle.ParamBase
  • Tensor属性,stop gradient,lod
  • 内存泄露检测(多次传输)

四、示例

应用分析:

paddle.multiprocessing模块的主要场景有,数据传输、参数共享两种。

主要应用点有:

  • 传输:DataLoader:多进程造数据,将Tensor传输到主进程。(N->1)造数据慢,训练快。
  • 共享:异步多进程预测、训练:主进程传输后,主进程和子进程各自独立进行预测、训练。

Tensor 简单传递、修改示例

import paddle
import paddle.multiprocessing as mp

paddle.set_device("cpu")

def fill_Tensor(queue, event):
    data = queue.get()
    data[:] = 5
    event.set()

Tensor = paddle.zeros([5, 5], dtype="float32")
queue = mp.Queue()
event = mp.Event()
queue.put(Tensor)

process = ctx.Process(target=fill_Tensor, args=(queue, event))
process.daemon = True
process.start()
event.wait(30)
print(Tensor)
# Tensor(shape=[5, 5], dtype=float32, place=CPUPlace, stop_gradient=True,
#        [[5., 5., 5., 5., 5.],
#         [5., 5., 5., 5., 5.],
#         [5., 5., 5., 5., 5.],
#         [5., 5., 5., 5., 5.],
#         [5., 5., 5., 5., 5.]])
process.join(4)

简单cpu预测示例

多个进程使用了相同地址的参数

import paddle.multiprocessing as mp
paddle.set_device("cpu")
from model import MyModel

def evaluate(model):
    # Construct data_loader, etc.
    model.eval()
    for data, labels in data_loader:
        predict = loss_fn(model(data))
        calculate_acc(predict, labels)
    
if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    model.share_memory() # 拷贝到共享内存
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=evaluate, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
⚠️ **GitHub.com Fallback** ⚠️