Execute Federated Learning Tasks With Spark in FATE - FederatedAI/KubeFATE GitHub Wiki

Overview

The previous document Use FATE Client to Build Jobs in Jupyter Notebook mentioned that FATE 1.5 LTS supports to use Spark as the underlying computing engine. This document will briefly introduce the implementation details to help users tune or troubleshoot errors in practical use.

Advantage of Using Distributed Computing Engines

An important component in FATE is "FATE Flow". It is responsible for the management and scheduling of user's job. As shown in the official document, "FATE Flow" has two working modes: standalone mode and cluster mode. In standalone mode, data storage and computing are executed locally in "FATE Flow" and thus cannot be effectively scaled, so it is mainly used for learning and testing. In cluster mode, data storage and compute are no longer executed locally but in a distributed cluster, and the cluster size can be scaled according to actual business needs.

FATE, by default, supports the use of "Eggroll" as its underlying computing and storage cluster, which has been able to meet the requirements of most federated learning application scenarios after continuous iteration and optimization. Eggroll itself is a relatively independent cluster, providing a unified entry and a set of APIs for external use. External applications offload the jobs to the Eggroll cluster for execution. Eggroll itself supports horizontal scaling, so users can adjust cluster size based on the actual scenario.

In FATE v1.5, to use Apache Spark as the computing engine is now officially supported. Spark is an in-memory compute engine with widespread industry recognition. Due to its simplicity, efficiency, mature cluster management tools and other features, Spark has been widely deployed and used in many companies. This is one of the main reasons why FATE supports it.

KubeFATE v1.5 supports the deployment of FATE On Spark. It can enable a Spark cluster in the form of container to provide compute services to FATE. For details, see the following section: Using Spark as the Compute Engine.

Firstly, let's simply analyze the structure of the service to see how it interacts with the different compute engines.

Introduction to the FATE Flow Structure

In FATE v1.5, the "FATE Flow" is abstracting storage, compute, federation, and other operations into different interfaces. An upstream application can access different runtimes with the same interface, thereby making it very easy to scale the support for other computing (such as Spark) or storage (HDFS, MySQL) services. Generally speaking, using FATE for federated learning jobs contains the following steps (provided that the networking has been completed):

  1. Call the interface provided by FATE Flow to upload the dataset for training
  2. Define the pipeline for the training job and call the FATE Flow interface to upload the job
  3. Continuously adjust the training parameters according to the training results and obtain the final model
  4. Upload the dataset for prediction through FATE Flow
  5. Define the prediction job and execute it through FATE Flow

In the above steps, besides job scheduling is requiring FATE Flow, other parts of the job such as storage and computing can be dispatched to other services.

As shown below, some interfaces are listed here in the "FATE Flow" box, including:

  • "Storage Interface", used to manage the dataset, such as uploading local data, delete uploaded data, etc.
  • "Storage Meta Interface", used to manage the metadata for the dataset.
  • "Computing Interface", used to execute the computing job.
  • "Federation Interface", used to transmit data between the participants.

fateflow_arch

The green square is the specific implementation of the interface, the dark gray square is the client used to interact with the remote service, and the blue square corresponds to other runtimes independent of the FATE Flow service. The computing interface for example, has a class that implements the interface is "Table". There are two Table types, one of them uses "rollpair" to interact with the "Eggroll" cluster; and the other uses "rdd" in "pyspark" to interact with the "spark" cluster.

Comparison of Compute Engines

As mentioned in the previous section, FATE Flow can use different computing, storage and other services through abstracted interfaces. However, due to dependencies, implementation and other reasons, there are certain constraints of the selection of these services, however, this can be divided into two categories:

  1. Using Eggroll as the compute engine
  2. Using Spark as the compute engine

When Eggroll is used as the compute engine, the overall architecture of FATE is as follows: arch_eggroll

There are three different types of nodes in the Eggroll cluster, namely Cluster Manager, Node Manager, and Rollsite. Cluster Manager is responsible for providing service entry and allocating resources, Node Manager is the node that actually executes the compute and storage, and Rollsite provides transmission services.

When Spark is used as the compute engine, the overall architecture of FATE is as follows: arch_spark

Spark itself is an in-memory compute framework that generally needs other services for persistent output. Therefore, HDFS is needed for data persistence. Federated transmissions are divided into two parts, respectively the synchronization of instructions (pipeline) and the synchronization of messages during training, which are handled by the nginx service and the rabbitmq service respectively.

Using Spark as the Compute Engine

Prerequisites

As mentioned in the previous subsection, using Spark as the compute engine is also dependent upon services such as Nginx, RabbitMQ, and HDFS. Complete service installation, deployment, and configuration can be accomplished in three ways.

  1. For cluster installation and deployment based on a bare machine, refer to FATE On Spark Deployment Guide.
  2. For cluster installation and deployment based on "docker-compose", refer to Using Docker Compose to Deploy FATE, provided that you set computing_backend to spark in the configuration file. During real deployment, HDFS, Spark and other services will be pulled up in the form of containers.
  3. For cluster deployment based on "kubernetes", refer to Kubernetes Deployment Scheme. When creating a cluster, you just use "cluster-spark.yaml" file and thus can create a FATE On Spark cluster based on k8s. You can also define the number of Spark nodes in this file.

Note: So far, the recommended versions of Spark, HDFS and RabbitMQ are 2.4, 2.7 and 3.6 respectively

For users who want to use an existing Spark cluster, in addition to deploying other dependent services, they also need to solve the problem of FATE's dependency on the python package. Specifically, they will need to execute the following commands for all Spark nodes that need to run the federated learning workload:

  1. Create a directory to place the files in
$ mkdir -p /data/projects
$ cd /data/projects
  1. Download and install "miniconda"
$ wget https://repo.anaconda.com/miniconda/Miniconda3-4.5.4-Linux-x86_64.sh
$ sh Miniconda3-4.5.4-Linux-x86_64.sh -b -p /data/projects/miniconda3
$ miniconda3/bin/pip install virtualenv
$ miniconda3/bin/virtualenv -p /data/projects/miniconda3/bin/python3.6 --no-wheel --no-setuptools --no-download /data/projects/python/venv
  1. Download FATE project code
$ git clone https://github.com/FederatedAI/FATE/tree/v1.5.0

// Add python dependency path
$ echo "export PYTHONPATH=/data/projects/fate/python" >> /data/projects/python/venv/bin/activate
$ echo "export PYTHONPATH=/data/projects/fate/python" >> $SPARK_HOME/spark-env.sh
  1. Enter the virtual environment of python
$ source /data/projects/python/venv/bin/activate
  1. Modify and download the python library
// Remove tensorflow and pytorch dependencies
$ sed -i -e '23,25d' ./requirements.txt
$ pip install setuptools-42.0.2-py2.py3-none-any.whl
$ pip install -r /data/projects/python/requirements.txt

By now, the dependency is installed, then when submitting a job in FATE, you need to configure spark.pyspark.python to specify the python environment to use.

Example

When a Spark cluster is ready, you can use it in FATE by setting backend to 1 in job definition so that the job will be eventually executed on the Spark cluster..

It should be noted that although the master of the Spark cluster can be specified in the job's configuration, HDFS, RabbitMQ, Nginx and other services need to be specified in the form of a configuration file before FATE Flow is enabled. Therefore, when the address of any of these services changes, you need to update the configuration file and restart the FATE Flow service.

When the FATE Flow service is enabled as per normal, you may use "toy_example" to verify the environment. Here are the steps:

  1. Modify the toy_example_config file as follows:
{
  "initiator": {
    "role": "guest",
    "party_id": 9999
  },
  "job_parameters": {
    "work_mode": 0,
    "backend": 1,
    "spark_run": {
      "executor-memory": "4G",
      "total-executor-cores": 4
    }
  },
  "role": {
    "guest": [
      9999
    ],
    "host": [
      9999
    ]
  },
  "role_parameters": {
    "guest": {
      "secure_add_example_0": {
        "seed": [
          123
        ]
      }
    },
    "host": {
      "secure_add_example_0": {
        "seed": [
          321
        ]
      }
    }
  },
  "algorithm_parameters": {
    "secure_add_example_0": {
      "partition": 4,
      "data_num": 1000
    }
  }
}

In which, spark_run defines the parameters provided to "spark-submit", so you can specify the address of the master through master, or specify the path of spark.pyspark.python through conf, and so on. A simple example is given below:

...
  "job_parameters": {
    "work_mode": 0,
    "backend": 0,
    "spark_run": {
      "master": "spark://127.0.0.1:7077"
      "conf": "spark.pyspark.python=/data/projects/python/venv/bin/python"
    },
  },
...

If the spark_run field is not set, the configuration in ${SPARK_HOME}/conf/spark-defaults.conf is read by default. For more spark parameters, refer to Spark Configuration.

  1. Submit the job and view the job's status: run the following command to submit the job and run "toy_example".
$ python run_toy_example.py -b 1 9999 9999 1

View fate_board fateboard_job

View Spark board spark_job

As you can see from the above output, FATE has successfully run the "toy_example" test through the Spark cluster.

Conclusion

This document mainly discusses the importance of distributed systems to FATE, compares the difference between the "Eggroll" and "Spark" engines supported by FATE, and finally elaborates how to use Spark to run jobs in FATE. Given the limited space, the introduction of FATE On Spark was brief, and users may wish to find out more about node resource allocation, parameters tuning, etc. that have not been covered here.