ReqMgr2 MicroService Transferor - dmwm/WMCore GitHub Wiki

This documentation describes the architecture, behaviour and APIs of the ReqMgr2 MicroService Transferor module, which is responsible for executing all the necessary input data placement before a workflow can get acquired by global workqueue and the underlying agents.

For the record, given that these abbreviations are mentioned several times in this document, here is their meaning:

  • GQE: global workqueue element
  • LQE: local workqueue element

Architecture proposal based on request status in ReqMgr2

As previously discussed, we decided not to rely on GQE information to create transfer requests, because it would be too heavy on CouchDB and thus would not perform well.

This ReqMgr2-based model, though, assumes that the MicroService has to parse the request spec and find out exactly which input datasets/blocks, pileup and parent data are needed. It also assumes there is another (micro)service monitoring those transfer requests and driving work acquisition.

Here is how we envision it to work:

  1. Unified assigns a workflow (request transition from assignment-approved to assigned)
  2. MS Transferor queries for requests in assigned and parses their spec to find out whether there is any input data that needs to be placed (contacting DBS to get the final list of blocks to be transferred). A given request might have the following input data types:
    • zero or one input primary dataset
    • zero or one parent dataset
    • zero to two pileup datasets
  3. With the overall list of data to replicate, create transfer requests based on:
    • SiteWhitelist used during the workflow assignment
    • campaign configuration
    • unified configuration
    • estimated amount of work
    • anything else from the "black box logic"
  4. For each workflow, we need to persist the transfer request IDs - and some extra information - in CouchDB. See this gist for details on the document schema: https://gist.github.com/amaltaro/72599f995b37a6e33566f3c749143154#file-transferor_format-json-L32
  5. Once all transfer requests have been successfully made, update the request status assigned -> staging
    • if there is nothing to be transferred (no input at all), then update the request status once again staging -> staged
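
The status transitions in the last step can be sketched as below. This is only an illustration of the rule described above, not the MSTransferor code; the function name `status_transitions` and its boolean argument are hypothetical.

```python
def status_transitions(has_input_data):
    """Return the ordered status changes applied to a request in "assigned".

    Hypothetical helper: it only encodes the rule described in the list above.
    """
    # Every request moves to "staging" once its transfer requests are made.
    transitions = [("assigned", "staging")]
    if not has_input_data:
        # Nothing to transfer, so the request can move on immediately.
        transitions.append(("staging", "staged"))
    return transitions


with_input = status_transitions(has_input_data=True)
without_input = status_transitions(has_input_data=False)
print(with_input)
print(without_input)
```

For a request with input data, the staging -> staged transition is left to MSMonitor, as described next.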

After this stage, we'd have to rely on the MSMonitor thread to monitor the status of the transfer requests and take actions on the requests when needed. For a full description of the MSMonitor component, please refer to: https://github.com/dmwm/WMCore/wiki/ReqMgr2-MicroService-Monitor

ReqMgr2 MicroService APIs

The MicroService is a data-service which provides a set of APIs to perform certain actions. Its general architecture is shown below:

[Figure: MicroServiceArchitecture]

In particular, the WMCore MicroService provides an interface to perform Unified actions, such as fetching requests from the ReqMgr2 data-services, obtaining the necessary information for data placement and placing requests of assigned workflows into the data placement system PhEDEx.

Available APIs

GET APIs

  • /ms-transferor/data/status provides basic information about MicroService. It returns the following information:
{"result": [
 {"microservice_version": "1.3.0.pre3", "microservice": "MSManager", "wmcore_version": "1.3.0.pre3", "status": "OK"}
]}
  • /ms-transferor/data/status?detail=True provides detailed information about requests in the MicroService. An example call and its response:
curl --cert $X509_USER_CERT --key $X509_USER_KEY -X GET -H "Content-type: application/json" "https://cmsweb-testbed.cern.ch/ms-transferor/data/status?detail=True"

{"result": [
 {"microservice_version": "1.3.0.pre3", "status": "OK", "execution_time": 46.331639, "failed_request_transition": 1, "start_time": "Wed, 22 Jan 2020 12:12:42 UTC", "num_datasets_subscribed": 0, "wmcore_version": "1.3.0.pre3", "nodes_out_of_space": ["T2_GR_Ioannina", "T2_CH_CERNBOX", "T2_UK_SGrid_Bristol", "T2_BR_SPRACE", "T2_CH_CERN", "T2_KR_KNU", "T2_DE_DESY", "T2_RU_SINP", "T2_TH_CUNSTDA", "T2_US_Caltech", "T1_IT_CNAF_Disk", "T2_RU_ITEP", "T2_RU_PNPI", "T2_US_Purdue", "T2_US_Wisconsin", "T2_US_UCSD", "T2_US_MIT", "T2_MY_UPM_BIRUNI", "T1_UK_RAL_Disk", "T1_US_FNAL_Disk"], "success_request_transition": 0, "error": "", "microservice": "MSManager", "total_num_requests": 1, "total_num_campaigns": 139, "num_blocks_subscribed": 0, "end_time": "Wed, 22 Jan 2020 12:13:28 UTC"}
]}
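
As an illustration of how a client might consume the detailed status response, here is a minimal sketch that parses a shortened copy of the JSON shown above. The helper name `summarize_status` and the choice of fields to report are assumptions made for this example; only the field names themselves come from the response.

```python
import json

# Shortened copy of the detailed status response shown above.
response_text = """
{"result": [
 {"microservice_version": "1.3.0.pre3", "status": "OK",
  "execution_time": 46.331639, "failed_request_transition": 1,
  "success_request_transition": 0, "total_num_requests": 1,
  "nodes_out_of_space": ["T2_CH_CERN", "T1_US_FNAL_Disk"],
  "error": "", "microservice": "MSManager"}
]}
"""


def summarize_status(text):
    """Extract a few headline numbers from a status response (hypothetical helper)."""
    report = json.loads(text)["result"][0]
    return {
        "healthy": report["status"] == "OK" and not report["error"],
        "failed": report["failed_request_transition"],
        "succeeded": report["success_request_transition"],
        "full_nodes": len(report["nodes_out_of_space"]),
    }


summary = summarize_status(response_text)
print(summary)
```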

POST APIs (NEEDS REVIEW!!!)

  • /ms-transferor/data allows sending a specific request to the MicroService

Post a request to process some state:

curl -X POST -H "Content-type: application/json" -d '{"request":{"process":"assignment-approved"}}' http://localhost:8822/ms-transferor/data

Obtain results about a specific workflow:

curl --cert $X509_USER_CERT --key $X509_USER_KEY -X POST -H "Content-type: application/json" -d '{"request":{"task":"amaltaro_StepChain_DupOutMod_Mar2019_Validation_190322_105219_7255"}}' https://cmsweb-testbed.cern.ch/ms-transferor/data

{"result": [
 {"amaltaro_StepChain_DupOutMod_Mar2019_Validation_190322_105219_7255": {"completed": 100}}
]}

Data placement model

This section describes how Workload Management (WM) works with data, which types of workflows and input data are handled in WM, and how to take the most advantage of smart data placement.

The WMCore Workload Management system has two work granularities:

  • The first is the chunk of work, which gets created by Global WorkQueue. Each input block corresponds to a chunk of the workflow that can get acquired by any WMAgent belonging to the same target team name.
    • The exception is a Monte Carlo from scratch, where there is no input data and the chunk of work corresponds to a specific number of jobs.
  • The second granularity level is the grid job itself, which is associated with a higher level workqueue element (or input block), where each workqueue element can spawn one to many grid jobs.

Input data classification

A workflow can have multiple different input data types, or no input at all. The possibilities are:

  1. no input data at all: which means the workflow starts from the event generation step;
  2. primary input data: it's the signal to be processed in the workflow and it's described through the InputDataset parameter in the workflow (which can be either at the top level workflow description or in the first task/step description); each workflow can have 0 or 1 primary input dataset.
  3. secondary input data: it corresponds to the pileup dataset to be used in the workflow and it's described normally through the MCPileup parameter, even though it can also be set via DataPileup (these parameters can be either at the top level workflow description, or inside any task/step dictionary); each workflow can have from 0 to a few pileup datasets (likely not beyond 2 pileups).
  4. parent input data: on top of the signal to be processed - which means the workflow has an input primary dataset - the workflow will process events from the parent dataset (the dataset which originated the input dataset). Such workflows are defined through the IncludeParents = true parameter (following the same InputDataset convention). There can't be a parent dataset if an input primary dataset is not defined. Each workflow has either 0 or 1 parent dataset.
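
The classification above can be sketched as a small function over a request dictionary. This is a simplified illustration rather than the RequestInfo implementation: it assumes that task/step dictionaries sit under keys starting with "Task" or "Step" (e.g. "Step1"), and the dataset names in the example are made up.

```python
def classify_input_data(request):
    """Collect the primary dataset, parent flag and pileup datasets of a request.

    Assumption: task/step dictionaries live under keys starting with
    "Task" or "Step" (e.g. "Task1", "Step2") and hold dict values.
    """
    # Look at the top level plus every nested task/step dictionary.
    sections = [request]
    for key, value in request.items():
        if key.startswith(("Task", "Step")) and isinstance(value, dict):
            sections.append(value)

    primary = None
    include_parents = False
    pileups = []
    for section in sections:
        primary = primary or section.get("InputDataset")
        include_parents = include_parents or bool(section.get("IncludeParents"))
        for param in ("MCPileup", "DataPileup"):
            name = section.get(param)
            if name and name not in pileups:
                pileups.append(name)

    # A parent dataset only makes sense when a primary dataset is defined.
    return {
        "primary": primary,
        "needs_parent": include_parents and primary is not None,
        "pileups": pileups,
    }


# Made-up request, for illustration only.
example_request = {
    "RequestType": "StepChain",
    "StepChain": 2,
    "Step1": {"InputDataset": "/Prim/Era-v1/AOD", "IncludeParents": True},
    "Step2": {"MCPileup": "/Pile/Era-v1/PREMIX"},
}
classified = classify_input_data(example_request)
print(classified)
```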

Basic rules for data placement

Given what has been described above, we should pursue block level data placement whenever possible, such that disk space is wisely used and more data can be staged (thus more workflows can get into running). Given that data distribution will follow a block model, we also need to know the total block size (in bytes) so that we can evenly distribute data among different disk resources (block sizes can vary from MB to TB, so we cannot blindly count the number of blocks in each subscription).
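
To illustrate why bytes rather than block counts should drive the distribution, here is a minimal sketch of a size-aware assignment. The greedy heuristic (largest block first, onto the least loaded RSE) is an assumption chosen for this example and is not Unified's logic; the block and RSE names are made up.

```python
def distribute_blocks(block_sizes, rses):
    """Assign blocks to RSEs so that the bytes per RSE stay roughly even.

    Greedy heuristic (an assumption, not Unified's logic): place the largest
    remaining block on the RSE that currently holds the fewest bytes.
    """
    placement = {rse: [] for rse in rses}
    load = {rse: 0 for rse in rses}
    by_size = sorted(block_sizes.items(), key=lambda item: item[1], reverse=True)
    for block, size in by_size:
        target = min(rses, key=lambda rse: load[rse])
        placement[target].append(block)
        load[target] += size
    return placement, load


TB = 10 ** 12
MB = 10 ** 6
blocks = {
    "/a/b/c#1": 4 * TB,
    "/a/b/c#2": 1 * TB,
    "/a/b/c#3": 1 * TB,
    "/a/b/c#4": 2 * TB,
    "/a/b/c#5": 5 * MB,
}
placement, load = distribute_blocks(blocks, ["RSE_A", "RSE_B"])
print(placement)
print(load)
```

In this example RSE_A receives two blocks and RSE_B three, yet both end up holding about 4 TB.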

Unified has extensive logic and monitoring (using data management monitoring sources) of the CMS disk storage, and one should read the Unified code to see the whole logic that got implemented. However, there are some basic rules that we can already describe here, such as:

  • premix pileup: there are usually only a few of these and they are very large secondary input datasets, commonly staged in full at the same location (in fact two copies are made, one in the US and the other in Europe). Given that such workflows have low I/O requirements, jobs are set to read data remotely and the workflow usually runs at many different sites. Usually there is no data placement to be performed, because the dataset is already available at a couple of sites. However, if needed, the whole dataset will get subscribed to a given location.
  • classical pileup: there are many such secondary input datasets, of varied data sizes. Given that such workflows require high I/O, jobs have to be sent where the data is located in order to decrease WAN bandwidth needs. Initially we will be working with dataset level subscriptions. However, if the dataset is too large, we might consider distributing a fraction of it at block level, but that option needs to be carefully analysed to make sure there is a good distribution of pileup events compared to the input signal. Further details can be checked in the Unified source code.
  • primary + pileup data: many workflows require both of these input data types. For such cases, we need to make sure that the primary/parent input blocks are made available at the same locations holding the pileup dataset in full. Exceptions apply for AAA flags though, where either the primary/parent or the pileup dataset could be read remotely.
  • primary data: this is the main input dataset to be processed and jobs can read it either locally or remotely. Its data placement is made at block level because there are usually many sites processing the same workflow (RelVals might be an exception), such that data and jobs get evenly distributed between sites (possibly normalised according to their CPUs/storage).
  • parent data: this is rarely used, but there are workflows that require both the input primary data and its parent. In such cases, we need to make sure that a given input block and ALL its parent blocks are placed at the same storage, otherwise work acquisition gets stuck and no jobs get created for it. In short, it follows the same block level subscription as the primary input.
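
The co-location requirement for parent data can be expressed as a simple check. The sketch below is an illustration only: the function `blocks_ready_at` and the `replicas` input (block name mapped to the RSEs holding a complete copy) are hypothetical, and the block names are made up.

```python
def blocks_ready_at(rse, parent_map, replicas):
    """List primary blocks that sit at rse together with all their parent blocks.

    parent_map: {"primary_block": ["parent blocks"]}
    replicas:   {"block": set of RSE names holding a complete copy} (hypothetical input)
    """
    ready = []
    for primary, parents in parent_map.items():
        needed = [primary] + list(parents)
        # Work can only be acquired if every needed block is at the same storage.
        if all(rse in replicas.get(block, set()) for block in needed):
            ready.append(primary)
    return ready


parent_map = {
    "/p/c/AOD#1": ["/p/c/RAW#a", "/p/c/RAW#b"],
    "/p/c/AOD#2": ["/p/c/RAW#c"],
}
replicas = {
    "/p/c/AOD#1": {"RSE_1"},
    "/p/c/RAW#a": {"RSE_1", "RSE_2"},
    "/p/c/RAW#b": {"RSE_1"},
    "/p/c/AOD#2": {"RSE_1"},
    "/p/c/RAW#c": {"RSE_2"},
}
ready = blocks_ready_at("RSE_1", parent_map, replicas)
print(ready)
```

Here the second primary block is not usable at RSE_1, because its parent block is only available at RSE_2.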

Input data placement logic with Rucio

This model assumes that workflows will only use data that belongs to the same Rucio account used by MSTransferor; data available under any other Rucio account will not be considered during the input data placement. The logic is roughly as follows:

  1. Using DBS, perform the data discovery (which blocks have to be processed by the workflow);
  2. OPTION-A) if possible, retrieve all the rules created for the container and all its blocks in a single query; OPTION-B) otherwise, retrieve all the rules created for each block (one call per block)
    1. skip blocks that are not needed by the workflow
    2. check if the rucio rule belongs to the account wmcore_transferor and if the RSE expression matches our SiteWhitelist
    3. if there is a rule, reuse it and persist this rule for that given workflow (thus, will be evaluated by MSMonitor in the near future)
    4. if there is NO rule, keep this block in the queue to have a rule created for it
    5. make rules for blocks in bulk, against the same RSE. In the future, we might consider making a single rule per block, such that the continuous data cleanup becomes easier and more efficient.
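
The rule reuse decision in the sub-steps above can be sketched as follows. This is a simplified illustration, not the MSTransferor code: it assumes each rule is a dictionary with "id", "name", "account" and "rse_expression" keys, and it treats an expression as matching when every RSE in its "|"-separated list is among the RSEs derived from the SiteWhitelist. Real RSE expressions can be richer than that.

```python
def split_blocks_by_rule(needed_blocks, rules, site_rses, account="wmcore_transferor"):
    """Split needed blocks into those with a reusable rule and those without.

    Assumptions: each rule is a dict with "id", "name", "account" and
    "rse_expression" keys, and an expression matches when every RSE in its
    "|"-separated list is in site_rses.
    """
    allowed = set(site_rses)
    reusable = {}
    for rule in rules:
        block = rule["name"]
        if block not in needed_blocks:
            continue  # block not needed by the workflow
        if rule["account"] != account:
            continue  # rule owned by another Rucio account
        if set(rule["rse_expression"].split("|")) <= allowed:
            # Keep the first matching rule found for this block.
            reusable.setdefault(block, rule["id"])
    # Blocks without a reusable rule are queued for rule creation.
    pending = [block for block in needed_blocks if block not in reusable]
    return reusable, pending


needed = ["/d/s/t#b1", "/d/s/t#b2", "/d/s/t#b3"]
rules = [
    {"id": "r1", "name": "/d/s/t#b1", "account": "wmcore_transferor", "rse_expression": "RSE_1"},
    {"id": "r2", "name": "/d/s/t#b2", "account": "other_account", "rse_expression": "RSE_1"},
    {"id": "r3", "name": "/d/s/t#b3", "account": "wmcore_transferor", "rse_expression": "RSE_9"},
    {"id": "r4", "name": "/d/s/t#b4", "account": "wmcore_transferor", "rse_expression": "RSE_1"},
]
reusable, pending = split_blocks_by_rule(needed, rules, ["RSE_1", "RSE_2"])
print(reusable)
print(pending)
```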

NOTE: pileup samples are placed as a whole at the same location, for now at least. That said, proceed as follows:

  1. retrieve all the rules for a rucio container
    1. check if the rucio rule belongs to the account wmcore_transferor and if the RSE expression matches our SiteWhitelist
    2. if there is a rule, reuse it and persist this rule for that given workflow (thus, will be evaluated by MSMonitor in the near future)
    3. if there is NO rule, keep this container in the queue such that a rule gets created
  2. if the workflow requires input and pileup containers, those need to target the same RSE (thus no RSE wildcard allowed)
  3. OPTION-A) if the workflow requires only input container - AND it's meant to process the whole container - then we could have a single rule with grouping=DATASET and an RSE expression covering all sites in the SiteWhitelist (e.g. RSE_1|RSE_2|RSE_3|etc etc); OPTION-B) if the workflow requires only input container - AND it's meant to process a subset of its blocks - we could either:
    1. make a single rule for a list of blocks and provide all the RSEs matching the SiteWhitelist
    2. make multiple rules - one rule for each block - if possible providing all the RSEs matching the SiteWhitelist
  4. if the workflow requires only pileup dataset, we can make a single rule for the rucio container providing all the RSEs matching the SiteWhitelist
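
As an illustration of the single-rule options above, the sketch below assembles the parameters of one rule covering every RSE in the SiteWhitelist. The function `build_rule_spec` and the dictionary it returns are hypothetical structures for this example; they are not a Rucio client call.

```python
def build_rule_spec(dids, rses, grouping="DATASET", copies=1):
    """Describe a single rule covering dids on any of the given RSEs.

    Hypothetical structure for illustration; it is not a Rucio client call.
    """
    if not rses:
        raise ValueError("at least one RSE is required")
    return {
        "dids": list(dids),
        # Alternatives are joined with "|", e.g. RSE_1|RSE_2|RSE_3
        "rse_expression": "|".join(rses),
        "grouping": grouping,
        "copies": copies,
    }


spec = build_rule_spec(["/Prim/Era-v1/AOD"], ["RSE_1", "RSE_2", "RSE_3"])
print(spec)
```

When input and pileup containers must target the same RSE, the same helper would be called with a single RSE name instead of the full list.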

Ideas for translation to Rucio

Premix pileup: Rucio can easily be made to have the concept of countries or regions. We'd need two rules to place premixed pileup: one rule to place a full copy at FNAL and one rule to place a copy in "Europe && Tier1". By specifying the grouping as "dataset" (CMS block), the Europe copy would be distributed over the European Tier1s. If certain Tier2s want to be hosts as well, you can easily add a bit of metadata to the sites, like "Reliable sites".

Classic pileup: Lots of options. For instance, placing 5 copies among 10 "reliable sites" would give each site about 1/2 of the pileup dataset and give you 10 sites where you could run workflows reading it. This can be done with a single rule. If you need to surge production using this dataset, make a rule requiring 8 copies. Then when you are done running, delete the 2nd rule and it will go back down to 5 copies.
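
The arithmetic behind the classic pileup example is straightforward. The sketch below assumes that blocks are spread evenly across the candidate sites, which is an idealisation; the function name is hypothetical.

```python
from fractions import Fraction


def share_per_site(copies, sites):
    """Average fraction of the dataset held by each site (assumes an even spread)."""
    return Fraction(copies, sites)


normal_share = share_per_site(5, 10)  # steady state: 5 copies over 10 sites
surge_share = share_per_site(8, 10)   # surge: a second rule requiring 8 copies
print(normal_share, surge_share)
```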

Primary data: This is a direct translation. A Rucio container distributed at the Rucio dataset (CMS block) granularity among the relevant sites or group of sites.

Parent data: This is tricky to get the right parents at the right sites. You could micromanage this on a block by block level with a rule per block. Or you just make sure the parent and child datasets are both present at the same site. You could even make a higher level container which contained the parent and the child and require that that be put completely at a site. That way Rucio could pick the site and by deleting one rule, you clean up.

About the RSE quota report

During every cycle of the MSTransferor, we collect storage information such as:

  • quota for a given group/account
  • data storage committed and used

That summary is recorded in the logs, e.g.:
2020-03-26 23:07:33,647:INFO:RSEQuotas: Summary of the current quotas (in TB - Terabytes):
2020-03-26 23:07:33,647:DEBUG:RSEQuotas:   T1_DE_KIT_Disk:		bytes_limit: 2676.00, bytes_used: 3167.57, bytes_remaining: -491.57, quota: 2676.00, quota_avail: -491.57
2020-03-26 23:07:33,648:DEBUG:RSEQuotas:   T1_ES_PIC_Disk:		bytes_limit: 882.00, bytes_used: 2161.76, bytes_remaining: -1279.76, quota: 882.00, quota_avail: -1279.76
...

where:

  • bytes_limit and quota represent the quota for a given group/account, i.e., it's the maximum amount of data that can be placed under that group/account at a specific site.
  • bytes_used represents how much data has already been subscribed and approved for a given group/account (thus, not necessarily all transferred over)
  • bytes_remaining represents how many bytes are free, taken from: quota - bytes_used, that's why it's sometimes negative.
  • quota_avail represents a fraction of the total group/account quota. This is a configurable parameter and it's currently set to 80%.

In short, quota_avail is the metric used to evaluate whether MSTransferor can or cannot put data in a given site.
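
The relation bytes_remaining = quota - bytes_used can be checked against the log excerpt above. The sketch below parses one of those lines; the regular expressions and the helper name `parse_quota_line` are assumptions tailored to the excerpt, not part of WMCore.

```python
import re

# One line copied from the log excerpt above (values are in TB).
LOG_LINE = ("2020-03-26 23:07:33,647:DEBUG:RSEQuotas:   T1_DE_KIT_Disk:\t\t"
            "bytes_limit: 2676.00, bytes_used: 3167.57, bytes_remaining: -491.57, "
            "quota: 2676.00, quota_avail: -491.57")


def parse_quota_line(line):
    """Return (rse_name, {metric: value_in_TB}) parsed from one summary line."""
    rse = re.search(r"RSEQuotas:\s+(\S+):", line).group(1)
    metrics = {key: float(value)
               for key, value in re.findall(r"(\w+): (-?\d+(?:\.\d+)?)", line)}
    return rse, metrics


rse, metrics = parse_quota_line(LOG_LINE)
# quota - bytes_used should reproduce bytes_remaining: 2676.00 - 3167.57 = -491.57
difference = metrics["quota"] - metrics["bytes_used"]
consistent = abs(difference - metrics["bytes_remaining"]) < 1e-6
print(rse, metrics, consistent)
```

The negative value simply means that more data has been subscribed under the account than its quota allows at that site.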

Injecting Unified campaigns into central CouchDB (plus test campaigns)

In order to properly test the MSTransferor and MSMonitor services, we need to inject the test campaigns (and the real campaigns available in the Unified database/MongoDB). Here is what we can do at this very moment, until we have a way to keep Unified campaigns synchronized in central CouchDB.

Using a snapshot of the Unified campaigns

Connect to a WMAgent box and run the following commands:

source apps/wmagent/etc/profile.d/init.sh 
wget -nv https://raw.githubusercontent.com/dmwm/WMCore/master/src/python/WMCore/MicroService/DataStructs/campaigns.json
wget -nv https://raw.githubusercontent.com/dmwm/WMCore/master/bin/adhoc-scripts/parseUnifiedCampaigns.py
python parseUnifiedCampaigns.py --fin=campaigns.json --url=https://YOUR_VM.cern.ch/reqmgr2 --verbose=10 --testcamp

This will source the agent environment, download a JSON snapshot of the campaigns, download the script that injects campaigns into CouchDB and finally execute that script. Note that you need to update the YOUR_VM parameter when executing that command. The option --testcamp will, in addition to the Unified campaigns, add some WMCore test campaigns that we commonly use in our daily tests.

Using current Unified campaigns

In this case, you need to log in to one of the Unified nodes and fetch an up-to-date copy of all the Unified campaigns. Here are the steps:

  1. Log in to vocms0274 as yourself and change to a working directory:
cd /data/admin  # or another directory that you like
  2. Download the parse script from WMCore github:
curl https://raw.githubusercontent.com/dmwm/WMCore/master/bin/adhoc-scripts/parseUnifiedCampaigns.py > parseUnifiedCampaigns.py
  3. Now get all the campaigns from the local MongoDB, parse them and make them use the WMCore schema. The --fout option specifies an output file to dump the campaign information in JSON format. Run the script as follows:
python parseUnifiedCampaigns.py --dburi=mongodb://localhost:27017 --dbname=unified --dbcoll=campaignsConfiguration --verbose=10 --fout=output.json
  4. Now you need to move the output file (in this example output.json) to a node with the WMCore environment or, to be more precise, with the ReqMgrAux library. Once there and with the file in pwd, you can source the WMCore environment and inject all the campaigns plus those hardcoded as test:
source apps/wmagent/etc/profile.d/init.sh 
python parseUnifiedCampaigns.py --fin=output.json --url=https://alancc7-cloud2.cern.ch/reqmgr2 --verbose=10 --testcamp

Note that you need to specify the instance into which you want to inject these campaigns; in my case it was alancc7-cloud2. In addition to that, you might need to add extra test campaigns to the source code (refer to the insertTestCampaigns method).

MSTransferor logic

[NOTE that this section is currently being adapted]

Here is the full logic for workflow input data placement, as it's currently implemented.

  1. Fetch workflows from ReqMgr2 in "assigned" status, then order them by RequestPriority
  2. Update cached data with: RSE quotas, pileup docs, unified configuration, CRIC PSN/PNN and campaign docs
  3. In slices of 100 requests, process all the remaining logic.
  4. In RequestInfo module, execute:
    1. setup Rucio token, unified configuration and pileup docs
    2. classify workflows according to their type and input data placement required
    3. getParentDatasets: parse workflows and if they ask for IncludeParents, find the parent dataset in DBS
    4. setParentDatasets: update parent dataset information and data campaign map
    5. setSecondaryData: update secondary names and their expected RSEs.
      • Also create a location intersection for multiple pileup workflows
    6. getInputDataBlocks: knowing the parent and primary dataset names, find all the block names and their sizes in Rucio
    7. setInputDataBlocks: with the rucio block names, apply the run/block/lumi filters.
      • _handleInputDataInfo: performed against DBS and discarding blocks not VALID. Files not valid are discarded as well.
      • Blocks that exist in DBS but not in Rucio are discarded as well.
    8. getParentChildBlocks: for workflows with parent dataset. Use the final primary blocks resolved in the step above and find the parent in DBS for each primary block to be processed in the workflow.
    9. setParentChildBlocks: update the Workflow object with {"primary_block": ["parent blocks"]}
      • Note that any parent blocks not available in Rucio or in DBS are discarded
  5. Back to MSTransferor: for each request that made it out of RequestInfo
  6. TODO Check if a campaign exists for the primary, parent and secondary datasets. If not, make an alert.
  7. Check if there is a pileup configuration (or expected RSEs) for the secondary datasets. If not, make an alert.
  8. Given the workflow configuration, set the final list of RSEs as:
    • map the site lists to PNN (RSEs)
    • using the Trust flags and pileup location - and the site -> RSEs above, define the expected intersection RSE list
    • apply the campaign SiteWhiteList/SiteBlackList as a further filter on the RSE list intersection
    • last but not least, filter any RSEs that do not have quota available
  9. checkDataLocation: check which primary/parent blocks are already available and/or within the finalRSE list
  10. makeTransferRequest:
    • if the workflow has no input data, just mark transfers as performed and advance the workflow to the next state
    • for data type=parent, skip data placement, which will be performed with the primary
    • for data type=secondary, skip data placement because it's done by MSPileup
    • TODO for RelVal, make data placement where the workflow is configured to (bypass campaign)
    • for non-RelVal, match the workflow final RSEs with RSEs that have available quota
      • if the final list of RSEs is empty, skip the workflow and retry in the next cycle
    • get a list of dids and their sizes, their grouping and number of copies, and make a Rucio rule
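
Steps 1, 3 and 8 of the logic above can be sketched as follows. This is a simplified illustration, not the MSTransferor implementation: the function names are hypothetical, the Trust flags are left out, requests are assumed to be ordered from highest to lowest RequestPriority, and empty pileup or campaign whitelist inputs are treated as "no restriction".

```python
def chunked(items, size=100):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def final_rses(site_rses, pileup_rses, campaign_whitelist, campaign_blacklist, rses_with_quota):
    """Narrow down candidate RSEs following step 8 above (Trust flags omitted).

    All arguments are plain collections of RSE names. Empty pileup or campaign
    whitelist inputs mean "no restriction", an assumption made for this sketch.
    """
    candidates = set(site_rses)
    if pileup_rses:
        candidates &= set(pileup_rses)         # intersect with pileup location
    if campaign_whitelist:
        candidates &= set(campaign_whitelist)  # campaign SiteWhiteList
    candidates -= set(campaign_blacklist)      # campaign SiteBlackList
    candidates &= set(rses_with_quota)         # drop RSEs without available quota
    return sorted(candidates)


# Made-up requests; highest priority first is an assumption.
requests = [
    {"RequestName": "wf_a", "RequestPriority": 100},
    {"RequestName": "wf_b", "RequestPriority": 900},
    {"RequestName": "wf_c", "RequestPriority": 500},
]
ordered = sorted(requests, key=lambda req: req["RequestPriority"], reverse=True)
slices = [[req["RequestName"] for req in chunk] for chunk in chunked(ordered, size=2)]
rses = final_rses(
    site_rses=["RSE_1", "RSE_2", "RSE_3", "RSE_4"],
    pileup_rses=["RSE_2", "RSE_3", "RSE_4"],
    campaign_whitelist=[],
    campaign_blacklist=["RSE_4"],
    rses_with_quota=["RSE_1", "RSE_3", "RSE_4"],
)
print(slices)
print(rses)
```

An empty result from `final_rses` corresponds to the case where the workflow is skipped and retried in the next cycle.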