workflow for job creation and submission - dmwm/WMCore GitHub Wiki

Overview: How is a request pulled down to an agent?

This quick reference page describes how a request is propagated through the Global Queue, the Local Queue and the WMAgent.

Brief summary of the queueing system in WMAgent.

There are several layers of queues in the WMAgent system (ReqMgr2, GlobalQueue, LocalQueue, WMBS, JobSubmitter Cache and GlideIn (global pool)). Certain conditions need to be met to pull down the work (jobs) at each layer.

  1. ReqMgr2 to GlobalWorkQueue (GQ) transition:

    • When a request is assigned (Unified makes this transition once the data has been transferred to the desired sites), the request is pulled down to the GQ in the form of WorkQueue Elements (WQEs) - block-level units, each containing multiple potential jobs.
  2. GlobalQueue (GQ) to LocalQueue (LQ) transition:

    • WQEs are pulled down from the GQ depending on the priority of the requests, the site thresholds and the number of active agents. If the jobs allocated to a given site exceed the threshold, the LQ won't pull down the WQE from the GQ - details of these conditions are explained below.
  3. LocalQueue (LQ) to WMBS transition:

    • All the WQEs from the LQ are divided into "Jobs" and stored in WMBS by the WorkQueueManager and JobCreator. (No specific conditions are checked apart from sanity checks - input file location, etc.)
  4. WMBS to JobSubmitter Cache transition:

    • Jobs are cached in the JobSubmitter's in-memory cache up to a certain limit (30K jobs), and the cache is refreshed periodically (~20 min). Translating WMBS jobs into condor (or other types of) jobs is quite expensive, which is why the in-memory cache is kept.
  5. JobSubmitter Cache to GlideIn transition:

    • Jobs are submitted from the JobSubmitter cache to the GlideIn global pool depending on the current pending and running jobs, the site thresholds and the priority of the jobs. To control the different types of jobs (processing/production, merge, cleanup, log collect), a threshold is maintained per job type within the WMAgents. (A minimal sketch of these checks follows this list.)
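
To make these hand-off conditions concrete, below is a minimal, hypothetical sketch (not actual WMCore code) of the checks involved; the constant values and the function/parameter names are assumptions for illustration only.

```python
# Hypothetical sketch of the gating checks at the hand-offs; all names and
# numbers here are illustrative, not the actual WMCore implementation.

CACHE_LIMIT = 30000              # JobSubmitter keeps at most ~30K jobs in memory
CACHE_REFRESH_SECONDS = 20 * 60  # the cache is refreshed roughly every 20 minutes


def can_pull_gq_to_lq(site, pending_jobs, site_thresholds):
    """GQ -> LQ: the LQ only pulls a WQE if the target site is below its threshold."""
    return pending_jobs.get(site, 0) < site_thresholds.get(site, 0)


def can_submit_to_glidein(job_type, jobs_by_type, type_thresholds):
    """JobSubmitter Cache -> GlideIn: a separate threshold is kept per job type
    (processing/production, merge, cleanup, logcollect)."""
    return jobs_by_type.get(job_type, 0) < type_thresholds.get(job_type, 0)
```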

Handling priority and system limits.

There are two places where higher-priority jobs can get stuck behind too many lower-priority jobs (GQ -> LQ and JobSubmitter Cache -> GlideIn). If we simply let the higher-priority jobs bypass these conditions, there is a possibility of overloading the system.

  1. Higher-priority jobs will be pulled down from GQ to LQ regardless of the threshold.

    • However, no jobs will be pulled down if the schedd limit is reached (MAX_JOBS_PER_OWNER * some factor).
  2. JobSubmitter still submits higher-priority jobs even when the current jobs already exceed the threshold (see the sketch after this list).

    • JobSubmitter will push higher-priority jobs past the site threshold limit, by up to 20% (configurable) of the current threshold. However, no jobs will be submitted if the schedd limit is reached (MAX_JOBS_PER_OWNER * some factor).
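
As a rough illustration of the bypass rule above, here is a hedged sketch; the safety factor, the helper name may_submit and its parameters are hypothetical, and only the 20% overflow and the schedd limit come from the description above.

```python
# Hypothetical sketch of the priority bypass; only the 20% overflow and the
# schedd limit (MAX_JOBS_PER_OWNER * some factor) come from the text above.

SCHEDD_FACTOR = 0.9        # illustrative value for "MAX_JOBS_PER_OWNER * some factor"
PRIORITY_OVERFLOW = 0.2    # higher-priority jobs may exceed the site threshold by 20%


def may_submit(job_priority, pending_priority, site_pending, site_threshold,
               schedd_jobs, max_jobs_per_owner):
    # Hard stop: never exceed the schedd limit, regardless of priority.
    if schedd_jobs >= max_jobs_per_owner * SCHEDD_FACTOR:
        return False
    # Normal case: stay within the site threshold.
    if site_pending < site_threshold:
        return True
    # Bypass: jobs with higher priority than the already pending work may go
    # up to 20% (configurable) over the current threshold.
    if job_priority > pending_priority:
        return site_pending < site_threshold * (1 + PRIORITY_OVERFLOW)
    return False
```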

Still to do:

  1. Remove the distinction between the Production and Processing thresholds.

  2. Find the optimal site threshold for each agent, one that keeps enough jobs in the system while also balancing jobs between WMAgents in various scenarios. (WMAgents can be added and removed at any time.)

Base Information

There are two kinds of status mentioned in this document (request status is different from WorkQueue element status).

Request status (starting with a lower-case letter) is the aggregation of the WQE (WorkQueue element) statuses (which start with an upper-case letter).

The DBs mentioned here are all CouchDB databases (reqmgr_workload_cache, workqueue, workqueue_inbox).

Each workqueue (GQ and LQ) contains two databases (workqueue_inbox and workqueue).

The site thresholds are obtained from SSB. (Code is here)

The pending threshold is defined as a percentage of the available slots. (Code is here)
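
As a back-of-the-envelope illustration of "percentage of available slots" (the real formula lives in the linked code, and the 50% value below is just an example):

```python
def pending_threshold(total_slots, pending_percent):
    """Illustrative only: pending threshold as a percentage of a site's slots."""
    return int(total_slots * pending_percent / 100.0)


# e.g. a site with 1000 slots and a 50% setting would allow 500 pending jobs
print(pending_threshold(1000, 50))   # -> 500
```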

From ReqMgr to GQ

calling queueNewRequest()

  1. GQ pulls "assigned" requests from the reqmgr db (the reqmgr_workload_cache CouchDB).
  2. Populates the workqueue_inbox db with the request. (The WQE in workqueue_inbox is in "Negotiating" status.)
  3. Splits the request by blocks from workqueue_inbox and inserts them into the workqueue db. (These WQEs are in "Available" status.)
  4. Updates the WQE in the workqueue_inbox db to "Acquired" status and, in turn, changes the request status in the reqmgr db to "acquired".

There is no specific restriction for populating the GQ. If the GQ is not populated, most likely the team is not set for the request. After the GQ is populated, the request status should change to "acquired", and the GQE status is "Acquired" in workqueue_inbox and "Available" in workqueue.
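
A simplified, hypothetical sketch of steps 2-4 (the selection of "assigned" requests in step 1 is left to the caller); in WMCore this is done by queueWork() / processInboundWork(), and plain dicts stand in for the CouchDB databases here.

```python
# Hypothetical sketch of steps 2-4; plain dicts stand in for the CouchDB
# databases (reqmgr_workload_cache, workqueue_inbox, workqueue).

def queue_new_request(request, workqueue_inbox, workqueue, reqmgr_db):
    name = request["name"]
    # step 2: populate workqueue_inbox with the request ("Negotiating" status)
    workqueue_inbox[name] = {"Status": "Negotiating"}
    # step 3: split the request by block and insert into workqueue ("Available")
    for block in request["blocks"]:
        workqueue[(name, block)] = {"Status": "Available"}
    # step 4: mark the inbox element "Acquired" and the request "acquired"
    workqueue_inbox[name]["Status"] = "Acquired"
    reqmgr_db[name] = "acquired"


# Usage with dicts standing in for the Couch databases:
inbox, wq, reqmgr = {}, {}, {}
queue_new_request({"name": "req1", "blocks": ["blockA", "blockB"]}, inbox, wq, reqmgr)
```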

code reference

steps 1-4 done by queueWork()

step 3 done by processInboundWork() - called in queueWork()

From GQ to LQ

calling pullWork() (steps 1-4) / processWork() (step 5)

  1. Gets the resource information from the wmbs resource db.
    • slots are counted as pending + running slots for each site (running jobs are not subtracted)
    • drain sites are excluded
  2. Pulls available elements from the GQ and matches them with the resources.
    • randomly select a site from the sites returned in step 1, then
    • check the site whitelist/blacklist against the input block location and pile-up data location (also the xrootd option)
    • sort the WQEs by priority and check whether the given site has empty slots (counting only jobs with higher or equal priority to the WQE). This means higher-priority WQEs get matched first; for equal priority, the request created first matches first.
  3. If there is a match, updates the GQE with the LQ address and sets the GQE status to "Negotiating".
  4. Replication is set up to copy the data from the GQ (GQEs with an LQ address are replicated to the LQ workqueue_inbox). This means every LQE in workqueue_inbox has an exactly identical element (including the id) in the GQ workqueue db, but # of GQEs > # of LQEs.
  5. Splits the work from the LQ workqueue_inbox into workqueue (setting the WQE status in workqueue to "Available"). Elements in workqueue_inbox and workqueue are identical except for the status.

At this point, the GQE status in both workqueue_inbox and workqueue has changed to "Acquired", and the LQE status in workqueue_inbox and workqueue is "Acquired" and "Available" respectively.
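
A hedged sketch of the matching in step 2 (priority first, earliest-created first within equal priority, random choice among eligible sites); the element fields and the function name are assumptions, not the pullWork()/availableWork() code.

```python
# Hypothetical sketch of the step-2 matching; field names are illustrative.
import random


def match_available_elements(elements, free_slots):
    """elements: dicts with 'Priority', 'CreationTime', 'Sites', 'Jobs';
    free_slots: dict of site -> free slot count."""
    matched = []
    # higher priority first; ties broken by earlier creation time
    ordered = sorted(elements, key=lambda e: (-e["Priority"], e["CreationTime"]))
    for element in ordered:
        # keep only sites (from the resource query) that can still host the element
        candidates = [s for s in element["Sites"] if free_slots.get(s, 0) >= element["Jobs"]]
        if not candidates:
            continue
        site = random.choice(candidates)      # random site selection
        free_slots[site] -= element["Jobs"]
        matched.append((element, site))
    return matched


# Usage with two toy elements and two sites:
elements = [{"Priority": 90, "CreationTime": 1, "Sites": ["T2_A"], "Jobs": 50},
            {"Priority": 99, "CreationTime": 2, "Sites": ["T2_A", "T2_B"], "Jobs": 80}]
print(match_available_elements(elements, {"T2_A": 100, "T2_B": 100}))
```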

code reference

steps 1-3 done by pullWork()

step 1 done by freeSlots()

step 2 done by availableWork()

step 4 is set by replication in AgentStatusWatcher

step 5 done by processInboundWork() - called in queueWork()

From LQ to WMBS

calling getWork()

  1. Gets the resource information from the wmbs resource db.
    • slots are counted as pending + running slots for each site, but running jobs are subtracted
    • drain sites are included
  2. Pulls available elements from the LQ and matches them with the resources.
    • randomly select a site from the sites returned in step 1, then
    • check the site whitelist/blacklist against the input block location and pile-up data location (also the xrootd option)
    • sort the WQEs by priority and check whether the given site has empty slots (counting only jobs with higher or equal priority to the WQE). This means higher-priority WQEs get matched first; for equal priority, the request created first matches first.
  3. Creates the wmbs subscription.

At this point the LQE status in the workqueue db is set to "Running" and propagated to all the GQ dbs (workqueue and workqueue_inbox). To do: couldn't find the code doing this yet.
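
To contrast the slot accounting in this pull with the GQ -> LQ one, here is a hedged illustration; the dict keys used below are assumed names, not the wmbs resource schema.

```python
# Illustrative contrast of the slot accounting in the two pulls; the keys
# (pending_slots, running_slots, running_jobs, drain) are assumed names.

def free_slots_gq_to_lq(site):
    """GQ -> LQ: pending + running slots, running jobs NOT subtracted,
    drain sites excluded."""
    if site["drain"]:
        return 0
    return site["pending_slots"] + site["running_slots"]


def free_slots_lq_to_wmbs(site):
    """LQ -> WMBS: pending + running slots minus currently running jobs,
    drain sites still included."""
    return site["pending_slots"] + site["running_slots"] - site["running_jobs"]
```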

code reference

steps 1-3 done by getWork()

Input data (block) location update

Both GQE and LQE locations are updated periodically, by a cron job and by the WorkQueueManager respectively, using the PhEDEx block replica call. (Only the locations of "Available" WQEs are updated.) So if data is copied to a new location after the request is assigned, the locations will still be updated for "Available" WQEs.
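
A minimal, assumed sketch of this periodic location refresh (only "Available" elements are touched); the locator argument is a placeholder for the real block-replica lookup, not the actual PhEDEx client API.

```python
# Assumed sketch of the periodic block-location refresh; "locator" is a
# placeholder for the real block-replica (PhEDEx) lookup.

def refresh_locations(elements, locator):
    """Update site lists only for elements still in 'Available' status."""
    for element in elements:
        if element["Status"] != "Available":
            continue  # Acquired/Running elements keep their recorded locations
        element["Sites"] = locator(element["Block"])  # current block replicas


# Usage with a stub locator standing in for the replica lookup:
elements = [{"Status": "Available", "Block": "blockA", "Sites": []},
            {"Status": "Running", "Block": "blockB", "Sites": ["T1_X"]}]
refresh_locations(elements, lambda block: ["T2_Y", "T2_Z"])
```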

Dataflow diagram

Dataflow Diagram