torque_amp: An experiment in external Galaxy scheduling
2022
One of the biggest challenges with a system like AMP is the variability of the MGMs that we use to process the user data:
- Some MGMs are single threaded but take a long time
- Some MGMs are multi-threaded and can bog down the machine (Gentle is an example of this)
- Cloud-based MGMs use very little in the way of local resources but can take hours or more to run
- HMGMs also use very few local resources, but they can take days or weeks to finish
By default, Galaxy is naive when it comes to scheduling local processes: there are a fixed number of threads that can each run a single MGM. When faced with multi-threaded MGMs this breaks down, because each processing thread could start an MGM that alone would use all of the CPU resources. A potential solution is to reduce the number of processing threads that Galaxy uses to run MGMs to avoid bogging down the machine, but that would reduce overall throughput when single-threaded MGMs are waiting to run. Having a static set of threads for a dynamic set of MGMs means that no matter what is chosen, it will always be sub-optimal for some workloads.
For HMGMs, our fork of Galaxy was modified to include a job runner that handles these non-local, long-running processes. While the solution does work, it has a major drawback: it touches deep within the Galaxy codebase, and because it isn't generic enough to push upstream, it is inherently fragile and means extra work porting it whenever we rebase our fork. Recently we modified it to use the LWLW protocol (I don't remember what Ying used for that acronym, sorry!), which handles both HMGM and cloud-based MGMs.
Unfortunately, to take advantage of this, Galaxy has to be set up to know which job runners these specific tools should be assigned to -- which makes adding new MGMs difficult because there's no way to convey that information to the installer/configuration. Similarly, there isn't a way for the MGMs to tell the scheduler how many resources they'll need.
Lastly, with HMGMs and cloud-based MGMs, there's the likelihood of hundreds of outstanding tasks waiting for completion.
Since Galaxy was designed to run in an HPC environment, there are job runners for multiple queuing systems built into it: Slurm, Torque, PBS, etc. These job runners behave differently than the local runner because the threads that Galaxy allocates are used only for checking the status of submitted jobs, rather than doing any work on their own. Additionally, the depth of the job queue is essentially unbounded, which means having hundreds of jobs in flight is normal operation rather than the exception.
For this experiment, I'm looking at Galaxy's interface to the Torque job queuing system. There are three commands that Galaxy will call based on its needs:
| Command | Purpose |
| --- | --- |
| qsub <script> | Submit a job to the queuing system. The script is a wrapper that sets up the environment for a single Galaxy tool script. It prints a jobid to stdout that is read by Galaxy. |
| qdel <jobid> | Delete a job that's in the queue. Only called when a job is removed by the user. |
| qstat -x | Get the queue system's status (i.e. job list) in an XML format. The format includes the jobid and status. Galaxy calls this a lot when there are pending tasks...once a second. |
Theoretically, any software that implements these three commands would have full control of MGM scheduling without any changes needed in Galaxy!
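To make the idea concrete, here's a minimal sketch of what such a drop-in could look like, with the queue kept as files on disk. The QUEUE_DIR path, the YAML layout, and the exact XML shape are my own assumptions for illustration, not a description of the attached prototype:
# Hypothetical sketch of the three entry points a drop-in scheduler has to cover.
import pathlib
import uuid

QUEUE_DIR = pathlib.Path("/var/lib/torque_amp/queue")   # assumed queue location

def qsub(script: str) -> None:
    jobid = str(uuid.uuid4())
    (QUEUE_DIR / f"{jobid}.yaml").write_text(f"script: {script}\nstatus: Q\n")
    print(jobid)                        # Galaxy reads the jobid from stdout

def qdel(jobid: str) -> None:
    (QUEUE_DIR / f"{jobid}.yaml").unlink(missing_ok=True)

def qstat() -> None:
    # Report every queued job in a Torque-ish XML shape: jobid plus status.
    print("<Data>")
    for jobfile in QUEUE_DIR.glob("*.yaml"):
        print(f"  <Job><Job_Id>{jobfile.stem}</Job_Id><job_state>Q</job_state></Job>")
    print("</Data>")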
Below is an example run of the torque_amp scheduler prototype.
I'm testing this on a local Galaxy installation on my laptop, which, disturbingly, has more CPUs than the server. I've hardcoded a limit of 4 CPUs available in the scheduler for the demonstration, but in reality it would use however many are available to the process.
I modified Galaxy's job_conf.xml to use Torque:
<?xml version="1.0"?>
<job_conf>
    <plugins workers="1">
        <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner"/>
    </plugins>
    <destinations default="local_torque">
        <destination id="local_torque" runner="cli">
            <param id="shell_plugin">LocalShell</param>
            <param id="job_plugin">Torque</param>
        </destination>
    </destinations>
</job_conf>
The workers value is set to 1, which is the number of threads that check the queuing system, not how many concurrent tasks can be run.
The Galaxy workflow is this:
AWS_Transcribe is a cloud-based MGM and it uses the LWLW protocol; the rest run synchronously. The AWS Transcript to WebVTT and AWS Comprehend tools can run concurrently since they each depend only on a single input that is provided at the same time.
I started 6 instances of the test workload within a few seconds of each other to test the concurrency.
The first few nodes finished quickly, each sequentially within their workflow, never exceeding four jobs running concurrently. When AWS Transcribe came up, the queue looked like this:
There are 6 jobs running even though I limited the CPU count to 4 -- that's because async jobs have a CPU load of 0, so we're basically free to run as many as we want.
When the transcribe finished, the AWS Comprehend and AWS Transcript to WebVTT MGMs, which are synchronous, obeyed the 4-CPU limit. Galaxy scheduled both of these nodes from the workflow because, from its perspective, there was nothing stopping them from running.
After about 20 minutes all six workflow invocations had completed without error.
The torque_amp prototype described here was written in a couple of days and consists of around 850 lines of moderately-commented Python. It has only one non-stdlib dependency: PyYAML, which is also required by the amp_bootstrap, so it doesn't need anything more than the standard install would provide. As long as it is in the PATH before any system-installed Torque (or PBS-compliant) job scheduler, Galaxy will happily use it for job scheduling.
I have attached the source of my prototype to this page.
I'm using a queue directory that holds YAML files that contain information about the submitted jobs. The process's standard error and standard out are accumulated here for the asynchronous jobs because they are run repeatedly and all of the logs need to be kept.
amp_params:
  aproto: lwlw
  async: false
  cpus: 1
exit_time: -1
galaxy_params:
  GALAXY_LIB: /home/bdwheele/work_projects/AMP/demo_installation/galaxy/lib
  GALAXY_VIRTUAL_ENV: /home/bdwheele/work_projects/AMP/demo_installation/galaxy/.venv
is_galaxy: true
name: g213_ner_to_csv_bdwheele_iu_edu
pid: 63885
queue_time: 1653586588.224252
retries: 0
return_code: null
script: /home/bdwheele/work_projects/AMP/demo_installation/galaxy/database/jobs_directory/000/213/tool_script.sh
start_time: -1
status: Q
stderr: ''
stderr_file: /home/bdwheele/work_projects/AMP/demo_installation/galaxy/database/jobs_directory/000/213/galaxy_213.e
stdout: ''
stdout_file: /home/bdwheele/work_projects/AMP/demo_installation/galaxy/database/jobs_directory/000/213/galaxy_213.o
to_delete: false
Since this is all file-based and multiple independent processes will be reading and writing this data, the data has to be synchronized.
The metadata for the jobs is synchronized by advisory file locking (using POSIX's flock(h, LOCK_EX)), which causes the process to block until it can acquire the exclusive lock. A deletion sentinel is added to the data to avoid writing to a file that has already been deleted by another process but is still open by others. At lock release time, the data for the job is written to disk. This is the equivalent of SQL's exclusive row-level locking. All of this lock management is wrapped in functions so it can be used as part of a Python context manager:
with QueuedJob(jobid) as job:      # wait for an exclusive lock for that job metadata
    job.parse_script(args.script)  # the data is updated by this method
    job.status(QueuedJob.QUEUED)   # and this one
# the lock gets released here and the data written to the disk
print(job.jobid)                   # we still have a copy of the data.
Or it can be managed directly with the code:
job = QueuedJob(jobid) # read the data
job.release_file() # but immediately release the lock for others
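As a rough illustration of how that wrapper can be built on flock and PyYAML (the real QueuedJob in the attached prototype has more fields, the deletion sentinel, and better error handling; the queue_dir default here is an assumption):
import fcntl
import yaml

class QueuedJob:
    """Minimal sketch: exclusive, blocking access to one job's YAML metadata."""

    def __init__(self, jobid, queue_dir="/var/lib/torque_amp/queue"):  # assumed path
        self.jobid = jobid
        self.fh = open(f"{queue_dir}/{jobid}.yaml", "r+")
        fcntl.flock(self.fh, fcntl.LOCK_EX)        # block until we own the job
        self.data = yaml.safe_load(self.fh) or {}

    def release_file(self):
        # Write the (possibly updated) data back, then let other processes in.
        self.fh.seek(0)
        self.fh.truncate()
        yaml.safe_dump(self.data, self.fh)
        self.fh.flush()
        fcntl.flock(self.fh, fcntl.LOCK_UN)
        self.fh.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.release_file()                        # the in-memory copy stays usable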
Process-level locking was also implemented to guarantee a single instance of a program executing. That happens in two cases: when a specific job is executing (or polling for results) and for the job processor itself. These are managed by open(file, O_CREAT | O_EXCL) with an atexit hook which deletes the lock file at program termination (regardless of how it terminated, short of a Python interpreter crash). So when a program needs to be the only one running (aka The Highlander), it can call the process_lock function:
if not process_lock(job.job_lockfile()):
    logging.debug(f"Someone else is working on job {jobid}")
    return
All processes that are started while the lockfile exists will log the message and stop.
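Here's a sketch of what that process-level lock can look like (writing the PID into the lock file is my own addition for debugging, not necessarily what the prototype does):
import atexit
import os

def process_lock(lockfile: str) -> bool:
    """Return True if we created the lock file and are therefore The Highlander."""
    try:
        # O_CREAT | O_EXCL makes creation atomic: it fails if the file already exists.
        fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False                                  # someone else holds the lock
    os.write(fd, str(os.getpid()).encode())           # handy when hunting stale locks
    os.close(fd)
    # Remove the lock file however the program terminates (short of an interpreter crash).
    atexit.register(lambda: os.path.exists(lockfile) and os.unlink(lockfile))
    return True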
When Galaxy submits a job to a scheduling system it uses a job script which sets up directories and environment variables and then invokes a tool_script.sh to actually run the tool/MGM. The tool_script.sh is the same one it generates for local execution. When the job script is submitted, we parse it and find the information we need for execution:
- Lines starting with #PBS contain options for where the caller (in this case Galaxy) expects the scheduler's stdout and stderr, as well as the name of the job. These are standard options.
- If there's a line in the job script which contains a reference to the galaxy environment setup AND there's a tool_script.sh in the same directory, we can do extra things for Galaxy (see below)
- Lines starting with GALAXY_VIRTUAL_ENV and GALAXY_LIB are harvested for environment variables to be used later
If it is a Galaxy-supplied job script, we look for more details in the tool_script.sh and set up the base environment (a sketch of this parsing step follows the list):
- The script to be run is changed to the tool_script.sh rather than the submitted job script
- The directory structure Galaxy is expecting is created
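A hedged sketch of that parsing step, reusing the field names from the YAML example above (the actual prototype may split this work differently):
import os

def parse_job_script(script_path: str) -> dict:
    """Sketch: pull #PBS directives and GALAXY_* variables out of a submitted job script."""
    info = {"script": script_path, "galaxy_params": {}, "is_galaxy": False}
    with open(script_path) as fh:
        for line in fh:
            line = line.strip()
            if line.startswith("#PBS"):
                # Standard PBS directives: -o/-e for the output files, -N for the job name.
                opts = line.split()[1:]
                for flag, key in (("-o", "stdout_file"), ("-e", "stderr_file"), ("-N", "name")):
                    if flag in opts:
                        info[key] = opts[opts.index(flag) + 1]
            elif line.startswith(("GALAXY_VIRTUAL_ENV", "GALAXY_LIB")):
                key, _, value = line.partition("=")
                info["galaxy_params"][key] = value.strip().strip('"')
    # If this looks like a Galaxy job and a tool_script.sh sits next to it,
    # run the tool script directly instead of the submitted wrapper.
    tool_script = os.path.join(os.path.dirname(script_path), "tool_script.sh")
    if info["galaxy_params"] and os.path.exists(tool_script):
        info["is_galaxy"] = True
        info["script"] = tool_script
    return info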
Whichever script the scheduler is going to run is scanned for a #AMP_REQ(...) chunk which provides key-value pairs for scheduler hinting. These are the current settings:
| Key | Default | Purpose |
| --- | --- | --- |
| cpus | 1 | How many CPUs (or threads) the job will likely need to perform adequately |
| async | false | Whether or not this job should be scheduled asynchronously (the cpus value is set to 0 if this is true) |
| aproto | lwlw | What protocol to use for async job management. Right now the values 'lwlw' and 'full' are supported |
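For illustration, parsing those hints could look something like this (the defaults mirror the table; the parsing details are assumptions):
import re

AMP_REQ_DEFAULTS = {"cpus": 1, "async": False, "aproto": "lwlw"}

def read_amp_req(script_path: str) -> dict:
    """Sketch: pull the #AMP_REQ(...) key-value pairs out of whichever script will run."""
    hints = dict(AMP_REQ_DEFAULTS)
    with open(script_path) as fh:
        match = re.search(r"#AMP_REQ\(([^)]*)\)", fh.read())
    if match:
        for pair in match.group(1).split(","):
            if "=" not in pair:
                continue
            key, value = (p.strip() for p in pair.split("=", 1))
            if key == "cpus":
                hints[key] = int(value)
            elif key == "async":
                hints[key] = value.lower() == "true"
            else:
                hints[key] = value
    if hints["async"]:
        hints["cpus"] = 0        # async jobs don't count against the CPU limit
    return hints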
While a non-Galaxy job could supply the #AMP_REQ data easily by including it in the submitted job script, for our use, how does this information get from the MGM developer to the job scheduler? By placing this text at the end of the <command> block in the tool XML file! Galaxy grabs the contents of that block and embeds it into the tool_script.sh which it generates when applying all of the parameters. However, since the newlines are stripped in the command block, the #AMP_REQ() text has to be at the END of the command or it will effectively comment out the whole command.
So, for the aws_transcribe tool to use LWLW instead of running synchronously, the command in the XML was modified to this:
<command detect_errors="exit_code"><![CDATA[
'$__tool_directory__/mgm_python.sif' '$__tool_directory__/aws_transcribe.py' '$input_audio' '$aws_transcript' --audio_format '$audio_format' --bucket '$s3_bucket' --directory '$s3_directory' --lwlw &&
'$__tool_directory__/aws_transcript_to_amp_transcript.py' '$input_audio' '$aws_transcript' '$amp_transcript' '$amp_diarization'
#AMP_REQ(async=true)
]]></command>
Using this method allows the MGM developer to inform the scheduling system how to handle this particular MGM, rather than requiring modification of the Galaxy configuration. In addition, other resource requirements (like # of CPUs) can be passed along as well.
Once jobs get submitted, how do they get started? All of the commands (qsub, qdel, qstat) need to return fairly quickly, so jobs cannot be processed while the commands are running. Also, this system isn't cron-based, nor is it a daemon. The trick is the nature of how Galaxy uses the queuing systems: Galaxy sends a qstat every second to see how things are going with the jobs it has submitted. After sending back the current state of the jobs, a background process is started to handle the current jobs and the qstat program exits. If the job processing program is already running, the new one exits, since there should only be one program managing the queue. When it has processed all of the jobs, it exits and removes the lock. The next time someone asks for the status of the jobs, the job processor is started again and the cycle continues.
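A sketch of that trigger, assuming a processor lock like the one above (print_queue_status and the --process-jobs flag are placeholders of my own, not the prototype's actual interface):
import subprocess
import sys

def print_queue_status():
    """Placeholder for the qstat -x XML output described earlier."""
    print("<Data></Data>")

def handle_qstat():
    print_queue_status()                 # answer Galaxy right away, then...
    # ...fire and forget: the child takes the processor lock, works through the
    # queue, removes the lock, and exits.  If a processor is already running,
    # the child sees the existing lock and exits immediately.
    subprocess.Popen([sys.executable, __file__, "--process-jobs"],
                     start_new_session=True)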
When jobs are processed, they are handled in this fashion (a rough sketch of this loop follows the list):
- All of the completed jobs have their results committed to the correct locations and are removed from the system
- Jobs that are exiting and are async with the full protocol are invoked one more time for cleanup, and then moved to completed
- Any jobs that are running are counted toward the allocated CPU count
- Jobs that are running and are async/lwlw are invoked. If the return code isn't 255, the job is moved to the completed state
- Jobs that are running and are async/full are invoked in polling mode. If the return code isn't 255, the job is moved to the exiting state
- Jobs that are queued while there are unallocated CPUs are invoked and moved to running, and their CPUs are marked as allocated. Synchronous jobs will automatically move to completed when they finish.
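A condensed sketch of one pass of that loop, using Torque-style single-letter states (Q, R, E, C) and stand-in helpers; the real prototype's bookkeeping is more involved:
AVAILABLE_CPUS = 4          # hard-coded for the demo, as noted above

def invoke(job, cleanup=False):
    """Stand-in: run or poll the job's script and return its exit code."""
    return 0

def commit_results(job):
    """Stand-in: put stdout/stderr and outputs where the caller expects them."""

def process_jobs_once(jobs):
    # Completed jobs: push results back and forget them.
    for job in [j for j in jobs if j["status"] == "C"]:
        commit_results(job)
        jobs.remove(job)

    used_cpus = sum(j["cpus"] for j in jobs if j["status"] == "R")

    for job in jobs:
        if job["status"] == "E" and job["aproto"] == "full":
            invoke(job, cleanup=True)            # one last call to release remote resources
            job["status"] = "C"
        elif job["status"] == "R" and job["async"]:
            rc = invoke(job)                     # poll the async MGM
            if rc != 255:                        # 255 means "still working"
                job["status"] = "C" if job["aproto"] == "lwlw" else "E"
        elif job["status"] == "Q" and used_cpus + job["cpus"] <= AVAILABLE_CPUS:
            invoke(job)                          # synchronous jobs move to C when they finish
            job["status"] = "R"
            used_cpus += job["cpus"]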
For synchronous jobs it's possible to use the Galaxy-supplied job script as-is. However, this won't work for async jobs: the first time the script is run, the return code and output are sent back to Galaxy, which registers it as a failure.
That's why, during the script parsing phase, the GALAXY_* variables were pulled from the job script and a set of directories was created. When it is time to invoke the tool_script.sh, those variables are used (along with others) and the Galaxy venv is set up to provide the same environment that the tool_script.sh is expecting. If something other than 255 is returned, the files that Galaxy is expecting are created (including the accumulation of stdout/stderr from the different invocations) and a Galaxy-supplied "metadata/set.py" script is run to create metadata for the generated files.
This is really the only semi-fragile part of this process.
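Everything in the sketch below is a simplified assumption about the attached prototype; it just shows the shape of the step: rebuild Galaxy's environment, run tool_script.sh, accumulate output, and only hand the results back once the job is actually done.
import os
import subprocess

def invoke_galaxy_tool(job):
    # Rebuild the environment the tool_script.sh expects (GALAXY_LIB, GALAXY_VIRTUAL_ENV, ...).
    env = dict(os.environ, **job["galaxy_params"])
    activate = os.path.join(env["GALAXY_VIRTUAL_ENV"], "bin", "activate")
    proc = subprocess.run(["bash", "-c", f". '{activate}' && bash '{job['script']}'"],
                          env=env, capture_output=True, text=True)
    # Accumulate output across invocations; async MGMs are run over and over.
    job["stdout"] += proc.stdout
    job["stderr"] += proc.stderr
    if proc.returncode != 255:                       # the job is actually finished
        with open(job["stdout_file"], "w") as out:
            out.write(job["stdout"])
        with open(job["stderr_file"], "w") as err:
            err.write(job["stderr"])
        # The prototype also runs Galaxy's metadata/set.py here to register the outputs.
    return proc.returncode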
The LWLW async protocol works well in many instances, but one place it falls short is cleanup. If the job is removed from Galaxy's queue, there's no way for the MGM to clean up remote resources. For a production system this is a must, so the protocol will need to be updated for this case. I slapped together an "async/full" protocol which would handle it, but it probably needs to be improved.
It's possible to add other hints, so we could do things like automatically kill a job that has run beyond a certain wall clock time, hold a job when the load average is high, or whatever.
Attachments:
- image2022-5-26_12-51-41.png (image/png)
- image2022-5-26_12-59-57.png (image/png)
- image2022-5-26_13-2-23.png (image/png)
- torque_amp.tar (application/x-tar)