任务流 - tencentmusic/cube-studio GitHub Wiki

任务流编排

平台有很多封装好的模板,比如数据导入、数据预处理、各种机器学习算法,可以将需要的模板按照计划执行的顺序拖拽(长按左侧任务模板拖拽,到指定位置松开)到面板中,按顺序连接起来。

输入图片说明

点击“运行”,可验证任务流的执行顺序是否与编排的、期望的顺序一致。

输入图片说明

首页展示

如果想在首页展示编排好的任务流,可以在“模型训练”-“任务流”-“更多”-“修改”中,找到扩展参数,如

{
    "demo": "true",
    "img": "/static/assets/images/pipeline/yolo.jpg"
}

设置“demo”为True则可在首页展示,必须配“img”参数,配一张图片才可以。

输入图片说明

任务配置

模板拖拽到面板区后,就形成了任务节点。点击任务节点,可以对任务进行配置,具体的配置方法和每个模板的设计有关,需要参照对应模板的使用模板。

输入图片说明

任务还支持配置跳过、重试等,方便在执行过程中可根据情况调整适应。

输入图片说明

CPU、GPU申请

任务的配置支持CPU、GPU配置,可以配置任务需要的内存资源、CPU、GPU资源等,任务运行时,会自动根据申请的资源及资源状况进行分配。

输入图片说明

GPU支持多种卡型,比如填写1(V100)表示申请1张V100的GPU卡,平台目前支持不同GPU卡型(如T4/V100/A100等),还支持国产的GPU,如海光的DCU、华为的NPU。

任务对CPU/GPU等资源的占用,支持独占GPU、共享GPU和VGPU等卡型占用方式,以满足不同任务对资源需求的场景。如1、2表示独占,0.1(T4)表示VGPU,-1表示共享。

单任务运行、调试

单个任务节点可以独立进行调试。点击Debug是进入命令行进行调试,点击Run是直接运行模板的py文件,log可以查看调试日志,clear清除调试任务。

输入图片说明

通过notebook我们可以进行代码的编辑,通过镜像调试,我们可以构建自己的镜像。然后就可以编排pipeline了。pipeline运行前需要独立运行调试单个task,所以cube添加单task独立调试的功能。

输入图片说明

每个task都可以独立的debug和独立的运行。这样就不用每次都运行整体pipelie才能进行调试。并且对于分布式任务的task,cube在模板中借用stern实现了多个容器log聚合查看。这样就不用每个worker的日志都单独去看了。

在这里插入图片描述

单个任务为分布式任务:

往往某个任务需要是分布式任务,所以需要启动分布式任务,云原生分布式任务需要k8s 对应的operator。分布式算法的模板的实现方法,主要在模板中实现launcher,创建对应分布式任务的job yaml,提交到k8s,对应的job operator创建每个worker的pod和service,然后运行业务代码。launcher端启动stern组件,监听该分布式任务的所有pod的日志,进行输出,这样就可以在当前任务中查看到所有worker的日志。

输入图片说明

并且每个任务启动后都可以看到相应的资源情况,这样可以边调试边配置资源的申请量,在gpu上也可以及时知道是否还有优化的空间,目前任务的主要瓶颈在哪里。这样能更加直观的确定优化的方向。

在这里插入图片描述

自定义镜像调试

有一个比较特殊的模板,模板分类“基础命令”下面的“自定义镜像”模板。其他任务节点的调试,都是为了调试模板对应的py文件,而自定义镜像是为了提供一个可视化面板,可以调试用户封装的镜像,避免有部分用户不是很熟悉镜像的调试。

任务其他参数配置

如果想修改其他任务配置,比如任务挂载,启动命令,启动目录,可以在按照图中的方法修改

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

任务流中监听端口

部分特殊的模板需要再运行期间监听端口,比如spark任务提交,任务作为spark driver端需要监听端口。需要在此类任务的任务模板的扩展中添加扩展参数:HostNetwork=True

在这里插入图片描述

则任务运行时会自动占用占用指派的端口,端口会以环境变量PORT1,PORT2的形式告诉代码,代码自行决定如何使用。

在这里插入图片描述

任务流配置

任务流也可以进行配置,主要是配置定时调度、项目组、任务流运行状态的监控及报警等。

输入图片说明

1、调度类型:触发任务流运行的方式,crontab为定时触发,once为手动触发。设置为定时触发的任务,也可以同时被手动触发,相互不影响 2、调度周期:定时触发的任务的定时周期 3、监控状态:当任务流达到哪些状态时会触发报警推送消息 4、报警人:当达到想要监控的状态时,发送消息给哪些用户 5、补录起点:对于定时任务,每个定时周期触发时间点都需要设置启动任务,这个定时周期的最开始计算的时间起点。默认是设置为定时周期或修改定时周期的时间点。 6、过往依赖:定时任务,每次实例运行时是否检查上一次的任务流是否完成,若完成才可以调度新一轮周期的任务流实例 7、最大激活运行数:同时可以运行的任务流实例数目 8、任务并行数:同一个任务流实例中,同时可以运行的任务数目 9、全局环境变量:为任务流中每个任务都会添加的环境变量,同时也可以以参数的形式配置为任务参数。

另有一个值得一提的,就是任务流的补录配置,可以设置任务流回溯到哪个时间节点开始,定时补录数据。比如我要计算过去2个月内,每一天往前360天的用户行为数据,就可以通过这个功能来实现。

日志查看

可以查看任务实例的在线日志、离线日志、资源使用等。在线日志保留到任务结束,任务结束后的离线日志保留一个月。

输入图片说明

任务流变量输入输出

1、文件中转(推荐)

在前一个任务中将输出文件存储到分布式存储,在后续的任务中读取这个分布式存储文件,实现任务间内容的传递

2、配置变量传递(不推荐)

在上一个任务中将短文本内容直接输出容器的/output文件内,在下一个任务的参数中直接把这个文件的内容当成字符串配置为参数。读取的方法为{{$task_name.output}}

比如:

在这里插入图片描述

在这里插入图片描述

全局变量

往往我们想要在一次任务流运行实例的过程中,每个任务的中间数据,总是在同一个目录下,但是每次任务流实例又是不同的目录,那么我们往往需要一个全局变量,在每次运行时不一样,在同一次任务流实例的不同任务间又是相同的。这个时候可以使用全局变量。

可以在pipeline中定义全局变量。在任务参数中引用这个变量。

在这里插入图片描述

在任务重可以直接引用这里定义的变量。引用方式{{YYYYMMDD}}

在这里插入图片描述

模板变量

往往任务参数配置中,我们还想定义一些其他的变量,现在cube-studio里面可以以模板变量的形式 {{}} 使用下列python变量

creator=pipeline.created_by.username                字符串:pipeline的创建者
datetime=datetime,                                  包 
runner=g.user.username                              字符串:实例的运行者
uuid = uuid                                         包
pipeline_id=pipeline.id                             字符串
pipeline_name=pipeline.name                         字符串: pipeline的名称
cluster_name=pipeline.project.cluster['NAME']       字符串,当前所属k8s集群
execution_date=eta.strftime('%Y-%m-%d %H:%M:%S')    字符串, 实例的周期时间

使用示例

{{uuid.uuid4().hex[:4]}}
{{datetime.datetime.now().strftime("%Y%m%d-%H%M%S.%f")}}
{{pipeline_id}}
{{pipeline_name}}
{{cluster_name}}
{{runner}}
{{execution_date}}

系统环境变量

cube-studio会自动将任务流/任务的信息以环境变量的形式传递给任务

KFJ_TASK_ID        任务的id
KFJ_TASK_NAME      任务的名称
KFJ_TASK_NODE_SELECTOR    任务的机器选择器
KFJ_TASK_VOLUME_MOUNT     任务的挂载
KFJ_TASK_IMAGES           任务使用的镜像
KFJ_TASK_RESOURCE_CPU     任务占用的cpu资源
KFJ_TASK_RESOURCE_MEMORY  任务占用的内存
KFJ_TASK_RESOURCE_GPU     任务占用的gpu资源
KFJ_TASK_RESOURCE_RDMA    任务占用的rdma资源
KFJ_TASK_PROJECT_NAME     任务所属的项目组名称
GPU_RESOURCE_NAME         任务占用的gpu所对应的k8s资源名
RDMA_RESOURCE_NAME        任务占用的rdma所对应的k8s资源名
USERNAME                  用户名

workflow(任务流调度实例)

通过pipeline的“调度实例”按钮,可以查看历史调度实例的启动时间、耗时、运行状态等。

输入图片说明

再点击调度实例后面的“日志”链接,可以查看任务流的workflow。

输入图片说明

点击workflow中的单个任务节点,还可以查看任务的在线日志、可视化结果、pod信息等。

输入图片说明

定时调度

点击任务流名称旁边的锯齿状按钮,可调出任务流配置界面,可以通过设置调度类型和调度周期配置任务流的定时调度。

调度类型:once表示仅运行一次,crontab表示周期运行,crontab配置保存5分钟后才生效;
调度周期:周期任务的时间设定 * * * * * 表示为 minute hour day month week
过往依赖:是否依赖上一次运行的该任务流结果(只在定时调度有效)
有效实例数目:定时调度最新实例限制数目,0表示不限制

输入图片说明

定时调度的记录,可以点击“定时记录”按钮查看,定时调度记录将如下图所示。

输入图片说明

结果可视化

构建任务模板时,在代码中按照规范格式在容器/metrics中输出可视化结果,包括文字、图片和echart源码,实现在web界面上的可视化展示。比如以下模板中,设置在/metrics中输出训练集、验证集的ROC曲线图,以及决策树的结构图。

/metric.json文件格式为

[
  {
    "metric_type":"image",
    "describe":"指标描述"
    "image":"图片地址"
  },
  {
    "metric_type":"text",
    "describe":"指标描述"
    "text":"文本内容"
  },
  {
     ...或者是echart的option源码
  }
]

输入图片说明

通过以上的设置,在运行该模板时,就可以在对应的任务节点中查看结果图。

输入图片说明

pipeline导入导出

选中需要导出的任务流,点击批量导出,可将任务流导出为json文件。

输入图片说明

输入图片说明

如果平台需要重新部署,或者需要将pipeline迁移到新部署的集群,将导出的json文件再导入新的平台即可。

导出导入json格式说明

{
    "pipeline英文名": {
        "pipeline": {
            "name": "pipeline英文名",
            "describe": "pipeline描述",
            "project": "public",
            "parameter": {},
            "dag_json": {
                "任务1英文名": {
                    "upstream": []
                },
                "任务2英文名": {
                    "upstream": ["任务1英文名"]
                }
            },
            "global_env": "AA=aa",
            "expand": [
                {
                    "id": "任务1英文名",
                    "type": "dataSet",
                    "position": {
                        "x": 任务1_x坐标,
                        "y": 任务1_y坐标
                    },
                    "data": {
                        "info": {
                            "describe": "任务1描述"
                        },
                        "name": "任务1英文名",
                        "label": "任务1中文名"
                    }
                },
                {
                    "id": "任务2英文名",
                    "type": "dataSet",
                    "position": {
                        "x": 任务2_x坐标,
                        "y": 任务2_y坐标
                    },
                    "data": {
                        "info": {
                            "describe": "任务2描述"
                        },
                        "name": "任务2英文名",
                        "label": "任务2中文名"
                    }
                },
                {
                    "source": "任务1",
                    "arrowHeadType": "arrow",
                    "target": "任务2",
                    "id": "任务1英文名-任务2英文名"
                }
            ]
        },
        "tasks": [
            {
                "job_templete": "任务1模板名",
                "name": "任务1英文名",
                "label": "任务1中文名",
                "volume_mount": "kubeflow-user-workspace(pvc):/mnt,kubeflow-archives(pvc):/archives",
                "resource_memory": "2G",
                "resource_cpu": "2",
                "resource_gpu": "0",
                "resource_rdma": "0",
                "args": {
                    "images": "任务1对应的每个参数取值",
                    "workdir": "任务1对应的每个参数取值",
                    "command": "任务1对应的每个参数取值"
                }
            },
            {
                "job_templete": "任务1模板名",
                "name": "任务2英文名",
                "label": "任务1中文名",
                "volume_mount": "kubeflow-user-workspace(pvc):/mnt,kubeflow-archives(pvc):/archives",
                "resource_memory": "2G",
                "resource_cpu": "2",
                "resource_gpu": "0",
                "resource_rdma": "0",
                "args": {
                    "images": "任务1对应的每个参数取值",
                    "workdir": "任务1对应的每个参数取值",
                    "command": "任务1对应的每个参数取值"
                }
            }
        ]
    }
}