9.1 Tailoring Pipeline Resources - NEONScience/NEON-IS-data-processing GitHub Wiki

What are pipeline resources?

For every job that runs in Pachyderm, at least one Kubernetes pod will spin up (more if parallelism > 1). Each pod manages the containers that are needed to run the pipeline. These are:

  • user container: runs the user code
  • storage container: executes file transfers & other Pachyderm operations

To effectively manage simultaneous processing workloads, Kubernetes needs to know how much CPU and memory the user and storage containers will need. These resource needs are specified in the pipeline specification.

If your pipeline is using more resources than requested, Kubernetes will probably kill the container at some point. This often results in the entire job restarting, which can lead to an endless restart loop and unhappiness all around. However, requesting more resources than needed is a waste of processing power and money, so please spend some time understanding the resources that the pipeline actually needs to reliably run and set them appropriately in the pipeline specification.

Setting user container resources

User container resources are the easiest to understand and set. The user container runs your code, so you may already have an idea of how much CPU and memory the code uses. If not, you can find out by looking in the GCP Monitoring interface for the pipeline during a job run.

View the monitoring output for a job that processed at least one datum, focusing on the following metrics:

  • Kubernetes Container - CPU usage
  • Kubernetes Container - Memory usage

An example of the memory usage that the prt_calibration_group_and_convert pipeline used during a large job is shown below:

Adjust the following settings in the interface as needed:

  • Metric
  • container_name
  • pipelineName

Delete any Group by settings toward the bottom of the interface so that you can see at least 2 lines in the plot. The Group by setting often gets automatically populated when changing metrics. If there is a single line in the plot, the Group by setting likely needs to be deleted.

The number of simultaneous lines in the graph will be 2 x the parallelism_spec set in the pipeline specification. You can ignore the lines with evictable in their names in the legend. Ensure you have zoomed in enough that the maximum values do not change with further zooming.

Use the peak observed CPU and memory values, plus a small buffer, to set the resource_requests spec in the pipeline specification. For example:

resource_requests:
  memory: 2.5G
  cpu: 6.3
resource_limits:
  memory: 4G
  cpu: 7

Set the resource_limits to higher values that you don't expect to hit if all is running well.
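The arithmetic behind these settings can be sketched in a few lines of Python. This is only an illustration: the function name suggest_requests, the 10% buffer, and the 1.5x limit factor are assumptions chosen for the example, not values prescribed by this guide. Pick a buffer and limit that suit the variability you see in the monitoring plots.

```python
import math


def suggest_requests(peak_cpu, peak_memory_gb, buffer_fraction=0.1, limit_factor=1.5):
    """Suggest request and limit values from observed peak usage.

    buffer_fraction and limit_factor are illustrative assumptions.
    """
    # Request = observed peak plus a small buffer, rounded to one decimal.
    request_cpu = round(peak_cpu * (1 + buffer_fraction), 1)
    request_mem = round(peak_memory_gb * (1 + buffer_fraction), 1)
    # Limit = a higher value that should not be hit during normal operation,
    # rounded up to a whole number.
    return {
        "resource_requests": {"memory": f"{request_mem}G", "cpu": request_cpu},
        "resource_limits": {
            "memory": f"{math.ceil(request_mem * limit_factor)}G",
            "cpu": math.ceil(request_cpu * limit_factor),
        },
    }


if __name__ == "__main__":
    # Example: a job that peaked at 5 CPU and 2 GB of memory.
    print(suggest_requests(peak_cpu=5.0, peak_memory_gb=2.0))
```

The returned dictionary mirrors the layout of the resource_requests and resource_limits fields, so the values can be copied into the pipeline specification by hand.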

A note about parallelization

The resource requests and limits specified in the pipeline specification are per container. Thus, these do not change if you increase the parallelization in Pachyderm via the parallelism_spec field, since multiple containers will spin up, each with the same resources. However, if your code uses internal parallelization, such as with the doParallel package in R, the resource needs per container will need to scale with the internal parallelization since internal parallelization occurs within the same container.
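As a rough illustration of this distinction, the sketch below separates the per-container request (which scales with internal parallelization) from the total footprint on the cluster (which scales with parallelism_spec). The function names are hypothetical, and the sketch assumes each internal worker uses about the same amount of CPU, which may not hold for your code.

```python
def per_container_cpu(cpu_per_internal_worker, internal_workers):
    """CPU to request for one container.

    Scales with internal parallelization (e.g. the number of doParallel
    workers), because all internal workers share the same container.
    """
    return cpu_per_internal_worker * internal_workers


def total_cluster_cpu(container_cpu, pachyderm_parallelism):
    """Total CPU across all containers.

    Pachyderm parallelism multiplies the number of containers, not the
    per-container request in the pipeline specification.
    """
    return container_cpu * pachyderm_parallelism


if __name__ == "__main__":
    request = per_container_cpu(cpu_per_internal_worker=1.5, internal_workers=4)
    print("cpu request per container:", request)
    print("total cpu with 3 parallel containers:", total_cluster_cpu(request, 3))
```

Only the first value belongs in the pipeline specification; the second is useful for judging how much of the cluster a job will occupy.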

Setting storage container resources

Storage container resources are more challenging to understand and set, but there are some guidelines and tools to help.

Understanding storage container memory needs

The following description is from Dale Georg at Pachyderm, edited for clarity and generalization:

1. First, we know that there is an in-memory chunk cache that caches downloaded chunks. The size of this cache is variable because it is defined in terms of the number of chunks that it caches rather than having a limit based on byte-size, and chunks themselves are variably sized. We do know that the maximum chunk size is 20 MiB and that the in-memory cache defaults to a max size of 100 chunks, so we therefore know that the in-memory cache can't exceed 2 GiB, but it's hard to predict in advance what the actual cache size might be for any particular job run. Fortunately, we have the benefit of hindsight. The in-memory cache is never emptied during the lifetime of the worker, so after the job is done running, the bulk of the memory being used by the storage container must be that in-memory cache.

2. Next, we know that there's a download buffer that buffers chunks being downloaded from the datums in the datum set. The maximum size of this buffer is 1 GiB. This buffer is cleared after the processing of each datum set, so the actual size used is driven by the size of the datum set. You've configured things to have 1 datum per datum set, so datum size and datum set size are equivalent, but that's not typically the case, so you have to bear that in mind.

3. Similar to the download buffer, there is also an upload buffer. This buffer is cleared after the processing of each datum set, so the actual size used is driven by the size of the output datum set.

4. Finally, in addition to the actual data commit, there is also the meta commit to be considered which (for reasons I don't fully understand) is the same size as the input+output data commits. This means that for each datum set, we need to double the memory requirements from points 2 and 3 above.

Taking all of this together, we can calculate that on average the memory required to process a datum set in this scenario should very roughly be:

(2 * input size) + (2 * output size) + X
where X is the size of the in-memory chunk cache (<= 2 GiB)
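The estimate above can be written as a small Python function. This is a minimal sketch: the function and constant names are made up for the example, and the cache bound is simply the 20 MiB maximum chunk size multiplied by the default of 100 cached chunks described in point 1 (2000 MiB, just under 2 GiB). Actual usage can differ noticeably from the estimate.

```python
MIB = 1024 ** 2

# Upper bound on the in-memory chunk cache, from point 1 above:
# 20 MiB maximum chunk size x 100 chunks by default.
MAX_CHUNK_BYTES = 20 * MIB
MAX_CACHED_CHUNKS = 100
MAX_CACHE_BYTES = MAX_CHUNK_BYTES * MAX_CACHED_CHUNKS


def storage_memory_estimate(input_bytes, output_bytes, cache_bytes=MAX_CACHE_BYTES):
    """Rough storage-container memory needed to process one datum set.

    Input and output sizes are doubled to account for the meta commit
    (point 4). cache_bytes is X, capped at the maximum cache size.
    """
    cache_bytes = min(cache_bytes, MAX_CACHE_BYTES)
    return 2 * input_bytes + 2 * output_bytes + cache_bytes


if __name__ == "__main__":
    # Example: 50 MiB of input and 10 MiB of output, worst-case cache.
    estimate = storage_memory_estimate(50 * MIB, 10 * MIB)
    print(f"{estimate / MIB:.0f} MiB")
```

Leaving cache_bytes at its default gives a worst-case figure; passing a smaller value gives an estimate for jobs that never fill the cache.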

Setting the resource request of the storage container

The equation above is a general guideline and can be used as a starting point for setting the memory request of the storage container. The actual memory needs are sometimes significantly higher or lower. Our data files are typically small (less than a few MB), so setting X to the average of the input size and output size is often successful.

The description above is also why we almost always set the datum set size to a single datum: the input size and output size components of the memory usage equation then refer to a single datum and are thus relatively predictable. In the pipeline spec, setting the datum set size looks like this:

datum_set_spec:
  number: 1

Note, however, that X (the in-memory cache) is currently the only component of the storage memory use that cannot be constrained by the size of the datum set. Rather, it grows with increasing job size up to 2 GiB, so if you expect to process many datums in a single job (i.e. a reprocessing run) and your input + output repository size exceeds 2 GiB, expect X to be 2 GiB. Pachyderm currently has a feature request in progress to reduce this.
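One way to combine the two rules of thumb for X (the average of the input and output sizes for routine jobs with small files, and the 2 GiB ceiling for large reprocessing jobs) is sketched below. The function name, its parameters, and the way the two cases are switched are assumptions made for illustration; this is not how Pachyderm itself sizes the cache.

```python
GIB = 1024 ** 3
CACHE_CAP_BYTES = 2 * GIB  # approximate ceiling on the in-memory cache


def estimate_cache_bytes(input_bytes, output_bytes, repo_bytes=None):
    """Pick a value of X for the storage memory equation.

    input_bytes, output_bytes: sizes for a single datum (datum set of 1).
    repo_bytes: combined input + output repository size that the job will
    process, if known. When it exceeds the cap, assume the cache fills.
    """
    if repo_bytes is not None and repo_bytes > CACHE_CAP_BYTES:
        return CACHE_CAP_BYTES
    # Small-file heuristic: average of the input and output sizes.
    return (input_bytes + output_bytes) // 2


if __name__ == "__main__":
    mib = 1024 ** 2
    print(estimate_cache_bytes(4 * mib, 2 * mib))                        # routine job
    print(estimate_cache_bytes(4 * mib, 2 * mib, repo_bytes=3 * GIB))    # large reprocess
```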

The CPU usage of the storage container is reliably less than 1 CPU unless the memory request is too low. Below are typical settings for the storage container resource request.

sidecar_resource_requests:
  memory: 3G
  cpu: 0.6

Pachyderm will automatically attempt to reclaim memory by running garbage collection on the storage container when the memory usage reaches 95% of the request. If the memory request is too low, the garbage collection will have a hard time keeping memory below the request and will run too frequently. This causes the CPU usage to rise and reach values well above 1. When you see this, increase the memory request. You'll also know if the memory request is too low if you see the job restart multiple times, which indicates that Kubernetes is killing and restarting the container, likely due to over-consumption of memory.
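The two warning signs described above can be expressed as a simple check. This is a hedged sketch: the function name and the thresholds (storage CPU above 1, two or more restarts) are assumptions chosen to match the description, and the inputs are values you would read off the monitoring interface yourself.

```python
def storage_memory_request_too_low(peak_storage_cpu, job_restarts,
                                   cpu_threshold=1.0, restart_threshold=2):
    """Return True if the storage memory request likely needs raising.

    peak_storage_cpu: highest CPU usage seen for the storage container.
    job_restarts: number of times the job restarted.
    Both thresholds are illustrative assumptions.
    """
    # Frequent garbage collection shows up as storage CPU above ~1.
    cpu_signal = peak_storage_cpu > cpu_threshold
    # Repeated restarts suggest Kubernetes is killing the container.
    restart_signal = job_restarts >= restart_threshold
    return cpu_signal or restart_signal


if __name__ == "__main__":
    print(storage_memory_request_too_low(peak_storage_cpu=0.4, job_restarts=0))
    print(storage_memory_request_too_low(peak_storage_cpu=2.3, job_restarts=0))
```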

If jobs are running well and the equation indicates a storage memory request in excess of 3 GB, try reducing the memory request and monitor the pipeline for restarts or high storage CPU usage.

Because of the generally stable memory consumption as a result of garbage collection, and because there are sometimes very brief spikes above the request, we do not set any limits in the sidecar_resource_limits spec.