Overhead testing - casangi/RADPS GitHub Wiki
The purpose of this page is to document the results of investigations into the scaling properties of the workflow manager software frameworks Prefect and Airflow, which will inform the down-selection in the eventual architecture decision records. The investigations currently comprise two "Epic" tickets which aim to quantify the task coordination and data transmission aspects of the two workflow manager frameworks. The parent issues are further subdivided into issues tracking the individual investigations into each framework. In addition to these stories, there are some generic overhead tests run using another task scheduling framework, dask.distributed.
Scheduler
Both workflow management frameworks under consideration -- Prefect and Airflow -- consist of separate components: one that schedules tasks for execution, and one that tracks the state of the operations comprising a given workflow in a backing database. Each of these elements presents a distinct scaling constraint on the system that we want to evaluate (and ideally, minimize).
Dask
See https://github.com/casangi/RADPS/issues/34#issuecomment-2718641819 for details.
Prefect
Prefect is a flexible tool that allows concurrent task submission in a variety of ways: almost any Python function can be submitted to the scheduler for execution. During the development of the example pipeline, intra-stage parallelism was demonstrated both via flow deployment concurrency and via task runner submission. The test scripts developed in these investigations make use of both options.
Both scripts are stored in the RADPS/prefect_workflow directory. The deployment script is called scheduler_deploy.py and the test script is called scheduler_overhead.py.
As in the dask example, to isolate the measurements as much as possible to the framework components themselves the test submissions consist of empty sleep functions, with actual wall clock time uniformly distributed between some minimum and maximum fraction of a second (in this case [0.001, 0.1]).
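The payload itself needs nothing framework-specific; a minimal sketch in plain Python (the function name here is illustrative, not necessarily the one used in the actual test scripts):

```python
import random
import time

def sleep_task(min_s: float = 0.001, max_s: float = 0.1) -> float:
    """Empty payload used to isolate framework overhead: sleep for a
    uniformly distributed duration in [min_s, max_s] and return it."""
    duration = random.uniform(min_s, max_s)
    time.sleep(duration)
    return duration
```

Because the payload does essentially no work, any wall-clock time beyond the accumulated sleeps can be attributed to the scheduler and state database.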
Initializing and running tests
First, make sure to turn off results persistence - otherwise, the benchmarks will use cached results after the first run!
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT='false'
Then, create a locally hosted Prefect server, and switch the default SQLite database to PostgreSQL running in a Docker container. This avoids locking errors and generally improves stability and performance.
prefect server start &
docker run -d --name prefect-postgres -v prefectdb:/var/lib/postgresql/data -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=yourTopSecretPassword -e POSTGRES_DB=prefect postgres:latest
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/prefect"
Optionally, if runs of the sub-flow test case are desired, configure the global logging to suppress frequent messages and start the deployment as a background process:
prefect config set PREFECT_LOGGING_LEVEL=ERROR
python prefect_workflow/scheduler_deploy.py &
Finally call the test script:
python prefect_workflow/scheduler_overhead.py
The parameters of the tests (number of tasks, task runners used, etc.) are configurable (as are other settings, e.g. log level), but by default are set in the test scripts themselves. The results are written to artifacts accessible via the Prefect UI, as well as a csv file stored in the working directory for plotting, analysis, and posterity.
Debugging
If results are not as expected, check your prefect configuration settings using
prefect config view
It should only contain something like the following:
PREFECT_PROFILE='local'
PREFECT_API_URL='http://127.0.0.1:4200/api' (from profile)
PREFECT_RESULTS_PERSIST_BY_DEFAULT='false' (from profile)
PREFECT_API_DATABASE_CONNECTION_URL=(local database info)
Results
Task submission
In the task submission case, preliminary results indicate effectively linear scaling in both the duration of the parent flow (T_workflow in the dask example) and the duration of the accumulated sleeps (T_sum_task_times in the dask example). Beyond 2000 concurrent tasks in a single flow, the dashboard stopped automatically rendering the execution stream for that flow run; beyond 8000 tasks, even manual graph rendering struggled to complete.
| # Tasks | ATM's m3 Pro 12CPU 36GB | Tak's Intel 4CPU(HT) 16GB | Kristin's m3 Pro 36GB | cvpost node |
|---|---|---|---|---|
| 1000 | 8 | 41 | 7 | |
| 2000 | 15 | 81 | 14 | |
| 4000 | 31 | 166 | 28 | |
| 8000 | 60 | 373 | 55 | |
| 16000 | 119 | 961 | 116 | |
| 32000 | 240 | 1792 | 230 | |
| 64000 | 483 | 4080 | 443 | |
| 100000 | 757 | 6840 | | |
Note: all measurements in the table above are in seconds, rounded to the nearest whole second.
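As a quick sanity check on the linear-scaling claim, dividing each duration in the M3 Pro column of the table by its task count gives a roughly constant per-task cost:

```python
# Wall-clock seconds for N concurrent sleep tasks on ATM's M3 Pro (from the table above)
m3_pro = {1000: 8, 2000: 15, 4000: 31, 8000: 60,
          16000: 119, 32000: 240, 64000: 483, 100000: 757}

# If scaling is linear, milliseconds per task should be roughly constant
per_task_ms = {n: 1000 * t / n for n, t in m3_pro.items()}
for n, ms in per_task_ms.items():
    print(f"{n:>6} tasks: {ms:.2f} ms/task")
```

The per-task cost stays in a narrow band around 7.5 ms across two orders of magnitude of task counts, consistent with linear scaling of the scheduler overhead on that machine.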
Flow submission
In the flow submission case, initial results indicate that the duration of both the parent flow and the accumulated sleeps grows faster than linearly at higher flow counts, and that this method scales much less performantly than the task submission case. Optimizations (such as adjusting the values of PREFECT_RUNNER_PROCESS_LIMIT, PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED, PREFECT_API_ENABLE_HTTP2, PREFECT_API_MAX_CONNECTIONS, etc.) are configurable but have not yet been conclusively demonstrated to improve performance. Imposing concurrency limits (e.g., tuning the scheduler occupancy by modifying the slot decay) has likewise not significantly improved performance in these tests.
It may be necessary to set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true (via prefect config set) to avoid prefect.exceptions.MissingResult errors in the test scripts.
| Flows | ATM's m3 Pro 12CPU 36GB | Tak's Intel 4CPU(HT) 16GB | Kristin's setup | cvpost node |
|---|---|---|---|---|
| 1000 | 89 | | | |
| 2000 | 181 | | | |
| 4000 | 380 | | | |
| 8000 | 960 | | | |
| 16000 | 2951 | | | |
Note: all measurements in the table above are in seconds, rounded to the nearest whole second.
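Successive ratios of the durations in the table make the scaling behaviour concrete (a ratio of 2.0 between consecutive rows would indicate linear scaling with flow count):

```python
# Parent-flow durations (seconds) from ATM's M3 Pro column of the table above
flows = {1000: 89, 2000: 181, 4000: 380, 8000: 960, 16000: 2951}

counts = sorted(flows)
for a, b in zip(counts, counts[1:]):
    # Each doubling of the flow count should at best double the duration
    print(f"{a:>5} -> {b:>5} flows: duration grew {flows[b] / flows[a]:.2f}x")
```

The ratio climbs from roughly 2.0x at low counts to over 3.0x per doubling by 16000 flows, so per-flow cost itself grows with concurrency in this mode.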
Open questions
- Some results of the test script execution in different environments require further analysis to completely describe.
- Better visualizations of the results are still being developed.
- The change in scheduler overhead as a function of task duration (i.e., increasing the sleep time across different orders of magnitude) has not yet been systematically measured.
- The difference between the time at which the scheduler reports that the parent flow has completed and the total runtime of the individual sub-tasks or sub-flows (i.e., T_workflow vs. T_sum_task_times) needs more precise characterization.
- When both test modes are run back to back, the flow submission test case shows (in the dashboard and the logs) that the coordination of scheduler requests against the state database may be introducing substantial overhead on the completion of individual sub-flows:
03:02:30.877 | WARNING | prefect.server.services.cancellationcleanup - CancellationCleanup took 1154.489155 seconds to run, which is longer than its loop interval of 20.0 seconds.
CVPOST018
Setup Conda/Mamba environment
mamba create -n zinc python=3.13
mamba activate zinc
mamba install prefect matplotlib prefect-sqlalchemy psycopg2-binary postgresql asyncpg
Create a folder for Postgres (the database will also reside here)
mkdir -p /.lustre/cv/users/jsteeb/postgres_data
initdb -D /.lustre/cv/users/jsteeb/postgres_data
Note: Don't use the cvfiler system as the database grows quickly (disk quota exceeded errors).
Modify /.lustre/cv/users/jsteeb/postgres_data/postgresql.conf
listen_addresses = 'localhost'
port = 5432
max_connections = 1000
shared_buffers = 2GB
work_mem = 64MB
maintenance_work_mem = 512MB
effective_cache_size = 6GB
I suspect tuning this for our use case is going to be important.
Start Postgres and create database
pg_ctl -D /.lustre/cv/users/jsteeb/postgres_data -l /.lustre/cv/users/jsteeb/postgres_logfile start
createdb prefect_db
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://$(whoami)@localhost:5432/prefect_db"
Start prefect in screen
screen -R prefect
prefect server start
Useful screen commands:
ctrl + a, then d # detach from the screen
screen -ls # list all available screens
screen -r prefect # reconnect to the screen
If you want to modify the Postgres config:
ctrl + c # kill prefect
dropdb prefect_db # optionally delete the database if it is getting large
killall postgres # there should be a better way to stop it (e.g. pg_ctl stop)
pg_ctl -D /.lustre/cv/users/jsteeb/postgres_data -l /.lustre/cv/users/jsteeb/postgres_logfile start
createdb prefect_db
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://$(whoami)@localhost:5432/prefect_db"
prefect server start
Using cvpost018 (16 cores), a Postgres database, and the ThreadPoolTaskRunner:
As task duration increases, the overhead decreases (matching the trend observed with Dask). In this initial attempt the overhead is higher than Dask's: for example, for max_sleep=1.0 the Prefect overhead is ~20% versus ~5% for Dask (note that the Dask tests were completed on an nmpost node).
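The precise definition of "overhead" behind these percentages is not spelled out on this page; one plausible definition, sketched below with hypothetical numbers, treats overhead as the fraction of wall-clock time not explained by the sleeps themselves under perfect load balancing:

```python
def overhead_fraction(t_workflow: float, t_sum_task_times: float, n_workers: int) -> float:
    """Fraction of workflow wall-clock time not accounted for by the sleep
    payloads, assuming the sleeps divide evenly across n_workers (a simplification)."""
    t_ideal = t_sum_task_times / n_workers  # best-case runtime with perfect balancing
    return (t_workflow - t_ideal) / t_workflow

# Hypothetical numbers: 1000 tasks averaging 0.5 s each, run on 16 cores
print(overhead_fraction(t_workflow=39.0, t_sum_task_times=500.0, n_workers=16))
```

Whatever definition the scripts actually use, fixing it explicitly would make the ~20% vs. ~5% comparison reproducible across machines.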
Tests should be rerun for Dask and the other task runners.
There is also some strange behaviour related to the database: even when a workflow has completed (return values are available), not all of the tasks have been written to the database.
Continued testing (sprint 6)
After modifications to the test scripts to implement the decisions from https://github.com/casangi/RADPS/issues/36#issuecomment-2773233437, the task tests will be re-run against the version of scheduler_overhead.py that gets merged into the main branch. Results are aggregated in a csv file tracked in the RADPS repository:
RADPS/prefect_workflow/timing_results.csv
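A minimal sketch of loading that file for analysis (the column names below are placeholders; check the header row of timing_results.csv for the actual schema):

```python
import csv

def load_timings(path: str) -> list[dict]:
    """Read timing rows from a results csv. The column names 'n_tasks' and
    't_workflow' are illustrative assumptions, not the confirmed schema."""
    with open(path, newline="") as f:
        return [
            {"n_tasks": int(row["n_tasks"]), "t_workflow": float(row["t_workflow"])}
            for row in csv.DictReader(f)
        ]
```

Keeping the results in a tracked csv means each re-run of the benchmarks can be diffed in the repository history alongside the script changes that produced it.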
Plotted results from running the scheduler overhead test on kberry's laptop:
Zoomed-in to the flow results here since this is run on such a different size scale:
Results from running with min_time 0.01, max_time=0.1:
Combined:
Here are the combined results of tests on three different workstations, covering three orders of magnitude of sleep time distributions ([0.001,0.01], [0.01,0.1], [0.03, 0.3], [0.1,1.0]):
(Preliminary) Conclusions
Further testing and analysis are warranted, but it is possible to start forming general conclusions about "best practices" for scaling with Prefect, even under the limited conditions tested thus far:
- Modifications to the default Prefect server configuration are required to run workflows that don't crash at the level of ~thousand-way concurrency.
- The scaling limits associated with flow/sub-flow concurrency are encountered much sooner (~hundreds of flows) than task concurrency (~tens of thousands of tasks).
- Scheduler and database overhead associated with large numbers of concurrent tasks is substantial, especially when compared to the Dask case (without interfacing via Prefect integration).
- Large numbers of tasks or flows whose individual runtimes are shorter than ~1 second do not seem well-suited to scaling with this framework.
- Database deployment and tuning become areas of concern even when running repeated benchmarks on individual developer machines.
Examples of the difference in sleep task runtimes when result caching is enabled:
compared to the same two benchmarks run with result caching disabled: