Guzzle common services
- What are common services?
- Control Table
- Queryable Runtime audits
- Implementation guidelines for runtime audits
- Starting an instance
- Multi step instances
- Closing an instance
- Resilience and recoverability
- Extendability
- Terminologies
- Example of Job call
- Dynamic number of stages depending on the system
- Job groups
- Mapping of Job Groups to Context and ETL Stages
- Mapping of Jobs to Job group
- Dynamic Context columns
- Catch ups
- Rerunning FAILED jobs
- Job dependency within a stage
Common services act as the glue that connects multiple frameworks, routing jobs to the respective connectors/frameworks based on configuration and context. They always supply the system parameters, user parameters and environment parameters to the respective frameworks.
Common services also have the notion of context and are able to provide those contexts to any connecting framework in the form of arguments. A context contains details about the environment, the connections, and the current and previous state of the job/DB.
A control table needs to be maintained that provides a dashboard of the current status of the system. For every batch workflow created, an entry must be inserted into this table to track the progress of the batch. The common services framework may use this table to decide which layer to process next (a small sketch of this decision follows the example table below).
batch_id | system | location | business_date | overall_status | layer1_status | layer2_status |
---|---|---|---|---|---|---|
20180102110012 | Order | IN | 2018-01-01 | SUCCESS | SUCCESS | SUCCESS |
20180102120123 | HR | IN | 2018-01-01 | SUCCESS | SUCCESS | SUCCESS |
20180103120016 | Order | IN | 2018-01-02 | RUNNING | SUCCESS | RUNNING |
20180103181028 | HR | IN | 2018-01-02 | OPEN | SUCCESS | |
20180103190123 | Order | SG | 2018-01-01 | RUNNING | RUNNING |
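As an illustration of that decision, here is a minimal Scala sketch. The `ControlTableRow` model and the `nextLayerToProcess` helper are assumptions made for this page, not part of an existing framework:

```scala
// Hypothetical model of one control-table row; layer statuses are kept in processing order.
case class ControlTableRow(
  batchId: String,
  system: String,
  location: String,
  businessDate: String,
  overallStatus: String,
  layerStatuses: Seq[(String, Option[String])] // (layer name, status)
)

// Picks the first layer that has not yet completed successfully, if any.
def nextLayerToProcess(row: ControlTableRow): Option[String] =
  row.layerStatuses.collectFirst {
    case (layer, status) if !status.contains("SUCCESS") => layer
  }

// e.g. for the HR batch above: layer1 is SUCCESS, layer2 has not started yet
val hrBatch = ControlTableRow("20180103181028", "HR", "IN", "2018-01-02", "OPEN",
  Seq("layer1" -> Some("SUCCESS"), "layer2" -> None))
nextLayerToProcess(hrBatch) // Some("layer2")
```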
All the frameworks will log to a file via log4j, but they should also return the key runtime metrics to common services as hash maps or any other appropriate data structure. Common services should log these in a place where they are easily queryable, preferably a NoSQL database.
The key metrics that help in debugging performance issues and identifying resource utilization should be logged into HBase tables (HBase is available in almost all Hadoop distributions).
The runtime audits may contain the start and end times of a work unit, stats such as resource utilization (CPU seconds, number of YARN containers used, number of tasks in the mapper/reducer stages, I/O stats), the number of records inserted into the target, and info such as the YARN application id, which is useful for locating the server logs of that particular instance.
The stats such as resource utilization mentioned here apply only to Spark and Hive jobs; they will not be populated for other entries such as stage and batch. Also, a metric may be named differently in Spark and Hive, but it should be parked under a common column. This is not a full list of metrics, only a representative one: any stat that is useful for analyzing performance and resource utilization should be captured here. For example, Hive logs present I/O bytes, shuffle bytes, mapper bytes, etc., which should be captured; if similar metrics are returned by Spark they should also be captured.
instance_id | system | location | business_date | work_unit | work_unit_name | parent_id | parent_unit | status | start_time | end_time | no_of_tasks | cpu_milliseconds | records_out |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
20180102110012 | Order | IN | 2018-01-01 | batch | batch | | | OPEN | 2018-01-02 11:00:12 | | | | |
20180102110123 | Order | IN | 2018-01-01 | stage | layer1 | 20180102110012 | batch | OPEN | 2018-01-02 11:01:23 | ||||
20180102110140 | Order | IN | 2018-01-01 | job_instance | customer | 20180102110123 | layer1 | SUCCESS | 2018-01-02 11:01:40 | 2018-01-02 11:03:15 | 5 | 21501 | 100001222 |
20180102110321 | Order | IN | 2018-01-01 | job_instance | customer_po | 20180102110123 | layer1 | SUCCESS | 2018-01-02 11:03:21 | 2018-01-02 11:04:15 | 2 | 10055 | 302225054 |
20180102110419 | Order | IN | 2018-01-01 | job_instance | invoices | 20180102110123 | layer1 | RUNNING | 2018-01-02 11:04:19 |
Note: Depending on your screen size, not all of the columns in the above table may be visible. Below is the list of columns present (a sketch of a matching record model follows the list):
- instance_id
- system
- location
- business_date
- work_unit
- work_unit_name
- parent_id
- parent_unit
- status
- start_time
- end_time
- message
- yarn_application_id
- yarn_queue
- dag_name (applicable to Hive jobs; in the case of Spark it could be something else)
- no_of_tasks (number of containers used by the job over its lifetime)
- cpu_milliseconds
- records_out
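For reference, these columns could be modelled roughly as follows. This is a sketch only; the case class name and which fields are optional are assumptions:

```scala
// Hypothetical model of a runtime-audit record. Fields that are empty for
// batch/stage entries, or while a work unit is still running, are Options.
case class RuntimeAuditRecord(
  instanceId: String,
  system: String,
  location: String,
  businessDate: String,
  workUnit: String,                    // batch | stage | job_instance
  workUnitName: String,
  parentId: Option[String],
  parentUnit: Option[String],
  status: String,                      // OPEN | RUNNING | SUCCESS | FAILED | WARNING
  startTime: String,
  endTime: Option[String],
  message: Option[String],
  yarnApplicationId: Option[String],
  yarnQueue: Option[String],
  dagName: Option[String],             // applicable for Hive jobs
  noOfTasks: Option[Long],
  cpuMilliseconds: Option[Long],
  recordsOut: Option[Long]
)
```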
Below is a list of implementation guidelines that could be followed by the ingestion and data processing frameworks:
Common services provides details such as the instance_id and other contexts such as system, location, etc. With this information the invoked framework should insert an entry in the runtime audit table before the beginning of any job. For example, we had an API like the one below in our first-generation framework.
def start_instance(instance_id: String, context_cols: Map[String, String]): Unit = {
  // stamp the start time and mark the instance as RUNNING before any work begins
  val start_ts = get_current_timestamp()
  val status = "RUNNING"
  // upsert a row keyed by instance_id, carrying the context columns, status and start_ts
  hbase.upsert(...)
}
As you can see, the above would create an entry in the runtime audit table with the status "RUNNING" (meaning the job has started) and stamp the start time.
Also note that whenever an instance is started, the necessary JVM shutdown hooks need to be configured (described briefly under the section Resilience and recoverability).
Some job instances can have multiple steps. For example, in data processing a merge operation can have three steps: dedup on the source table, populating a temp table with the merge result, and exchanging the temp table partition with the main table. Each of these steps creates a new DAG in Hive and can have its own resource utilization.
In cases like the above, three entries can be created in the runtime audit table, each logging its own runtime stats and metrics. In place of the instance id, a logical string can be appended to the instance id passed by common services, e.g. "2018030101010101-dedup", "2018030101010101-temp", "2018030101010101-exchangepartition".
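A trivial helper along these lines could derive the step-level ids (the function name is an assumption):

```scala
// Hypothetical helper: derive step-level instance ids for a multi-step job instance.
def stepInstanceIds(instanceId: String, steps: Seq[String]): Seq[String] =
  steps.map(step => s"$instanceId-$step")

stepInstanceIds("2018030101010101", Seq("dedup", "temp", "exchangepartition"))
// Seq("2018030101010101-dedup", "2018030101010101-temp", "2018030101010101-exchangepartition")
```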
When a particular instance is completed, it should be marked as closed by populating the end-time timestamp; the status should be updated accordingly (SUCCESS or FAILED), and messages if any (in case of failure, a failure-related message) and the runtime stats should also be captured and inserted. In case of any exceptions from the application itself or from Spark/Hive, the necessary try/catch blocks should be in place and the application should exit only after updating the runtime audit table.
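Mirroring `start_instance` above, closing an instance could look roughly like the sketch below. The `AuditStore` abstraction and the parameter names are assumptions, standing in for the HBase upsert used by the first-generation framework:

```scala
import java.time.Instant

// Hypothetical store abstraction standing in for the HBase table used above.
trait AuditStore {
  def upsert(instanceId: String, columns: Map[String, String]): Unit
}

// Stamp the end time, final status, optional failure message and the runtime
// stats returned by the framework, then update the existing runtime-audit row.
def closeInstance(store: AuditStore,
                  instanceId: String,
                  status: String,                   // SUCCESS or FAILED
                  message: Option[String],
                  runtimeStats: Map[String, Long]): Unit = {
  val columns = Map("status" -> status, "end_time" -> Instant.now().toString) ++
    message.map("message" -> _) ++
    runtimeStats.map { case (k, v) => k -> v.toString }
  store.upsert(instanceId, columns)
}
```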
The framework should be able to handle abnormal failures/crashes of the other frameworks and of the edge node as well. Exceptions unhandled by the other frameworks shouldn't crash common services, since they run in the same JVM instance; instead the status in the control tables should be updated and the process should exit gracefully with defined exit codes. There is also the possibility that the job is abnormally killed from the edge node (kill command); this should be handled appropriately with JVM shutdown hooks.
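As a hedged illustration of those shutdown hooks (the `markInstanceFailed` callback is an assumption), a hook can close the runtime-audit entry before the JVM exits:

```scala
// Hypothetical sketch: register a JVM shutdown hook so that an abnormal kill from
// the edge node still updates the runtime-audit entry before the process exits.
def registerShutdownHook(instanceId: String, markInstanceFailed: String => Unit): Unit = {
  val hook = new Thread(() => {
    // Runs on normal exit and on SIGTERM (a plain `kill`); it does NOT run on `kill -9`.
    // A real implementation would first check whether the instance was already closed.
    markInstanceFailed(instanceId)
  })
  Runtime.getRuntime.addShutdownHook(hook)
}
```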
Also, after a failure, if that particular stage job is rerun, the FAILED batch should be processed again from the start. Most scenarios do not need to resume from the point of failure and the entire stage can be reprocessed; this is the simplistic approach. The feature to resume processing from the point of failure can be made optional, given that the full scan of the runtime audit table might become expensive over time.
This framework should be made extendable for any extra ad hoc requirements from the customer. One example use case: the customer may ask for an automatically generated email to be sent at the end of the day with the status of the system; for this we may need abstractions over the connections to the control tables that we can leverage.
- batch: Describes one entire workflow. For example, sourcing of Order system data, for India, for business date Jan 1st 2018 is called one batch.
- ETL stage/layers: Our pattern for processing a batch is by layers. For example, the 1st layer may connect to the source DB and put data into the staging area, or it could be an ingestion job which reads files and ingests the data into the HDFS staging area.
- job group: A job group is a set of individual jobs (ingestion jobs, transform logic) grouped together. One or more job groups will be tagged to an ETL stage.
- data load job: the definition of a single job (ingestion, SQLs, etc.); a single unit which can be executed by the respective framework.
- work unit: a batch/stage/data load job may be generically called a work unit.
- status descriptions:
status | description |
---|---|
OPEN | No in-progress jobs running, but the overall batch is not fully completed |
RUNNING | Job is in progress |
SUCCESS | Job has completed successfully |
FAILED | Failure in a job / job group as a whole |
WARNING | Some jobs have completed successfully, some have failed |
Below is an example of how a job would be triggered via common services. A command-line call would look like this:
`./common_services.sh --location IN --system Order --stage layer1`
Below are the activities the common services framework is expected to perform (a simplified sketch of this flow is shown after the tables below):
- Validate the input parameters passed (valid location and system, and whether the given stage is applicable to that system)
- Read the business date of the last batch / in-progress job (if applicable) for the system/location from the control table. In the example below it would be `2018-01-02`.
batch_id | system | location | business_date | overall_status | layer1_status | layer2_status |
---|---|---|---|---|---|---|
20180102110012 | Order | IN | 2018-01-01 | SUCCESS | SUCCESS | SUCCESS |
20180103120016 | Order | IN | 2018-01-02 | RUNNING | SUCCESS | RUNNING |
- If the given layer is the 1st layer of that system, we should check the watermarks from the source to see whether data for subsequent business dates is available; if yes, we should create a new entry in the control and runtime audit tables. The watermark from the source could be an externally located table, a separate file, or an argument passed to common services itself. Let's say the source watermark indicates that the date available from the source is `2018-01-03`.
batch_id | system | location | business_date | overall_status | layer1_status | layer2_status |
---|---|---|---|---|---|---|
20180102110012 | Order | IN | 2018-01-01 | SUCCESS | SUCCESS | SUCCESS |
20180103120016 | Order | IN | 2018-01-02 | RUNNING | SUCCESS | RUNNING |
20180104030016 | Order | IN | 2018-01-03 | OPEN |
An entry will also be made into the runtime audit table:
instance_id | system | location | business_date | work_unit | work_unit_name | parent_id | parent_unit | status | start_time | end_time | no_of_tasks | cpu_milliseconds | records_out |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
20180102110012 | Order | IN | 2018-01-01 | batch | batch | | | OPEN | 2018-01-02 11:00:12 | | | | |
20180102110123 | Order | IN | 2018-01-01 | stage | layer1 | 20180102110012 | batch | OPEN | 2018-01-02 11:01:23 | ||||
20180102110140 | Order | IN | 2018-01-01 | job_instance | customer | 20180102110123 | layer1 | SUCCESS | 2018-01-02 11:01:40 | 2018-01-02 11:03:15 | 5 | 21501 | 100001222 |
20180102110321 | Order | IN | 2018-01-01 | job_instance | customer_po | 20180102110123 | layer1 | SUCCESS | 2018-01-02 11:03:21 | 2018-01-02 11:04:15 | 2 | 10055 | 302225054 |
20180102110419 | Order | IN | 2018-01-01 | job_instance | invoices | 20180102110123 | layer1 | RUNNING | 2018-01-02 11:04:19 | ||||
20180104030016 | Order | IN | 2018-01-03 | batch | batch | | | OPEN | 2018-01-04 03:00:16 | | | | |
20180104030114 | Order | IN | 2018-01-03 | stage | layer1 | 20180104030016 | batch | OPEN | 2018-01-04 03:01:14 |
- Then common services should start the layer1 jobs. First it would figure out the job groups associated with `layer1` for the `Order` system. This would return the list of individual jobs and their job types. Common services would loop through them and invoke each individual job by calling the respective framework.
- The frameworks (ingestion/data load) are responsible for creating entries in the runtime audit table.
instance_id | system | location | business_date | work_unit | work_unit_name | parent_id | parent_unit | status | start_time | end_time | no_of_tasks | cpu_milliseconds | records_out |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
20180102110012 | Order | IN | 2018-01-01 | batch | batch | | | OPEN | 2018-01-02 11:00:12 | | | | |
20180102110123 | Order | IN | 2018-01-01 | stage | layer1 | 20180102110012 | batch | OPEN | 2018-01-02 11:01:23 | ||||
20180102110140 | Order | IN | 2018-01-01 | job_instance | customer | 20180102110123 | layer1 | SUCCESS | 2018-01-02 11:01:40 | 2018-01-02 11:03:15 | 5 | 21501 | 100001222 |
20180102110321 | Order | IN | 2018-01-01 | job_instance | customer_po | 20180102110123 | layer1 | SUCCESS | 2018-01-02 11:03:21 | 2018-01-02 11:04:15 | 2 | 10055 | 302225054 |
20180102110419 | Order | IN | 2018-01-01 | job_instance | invoices | 20180102110123 | layer1 | RUNNING | 2018-01-02 11:04:19 | ||||
20180104030016 | Order | IN | 2018-01-03 | batch | batch | | | OPEN | 2018-01-04 03:00:16 | | | | |
20180104030114 | Order | IN | 2018-01-03 | stage | layer1 | 20180104030016 | batch | OPEN | 2018-01-04 03:01:14 | ||||
20180104030224 | Order | IN | 2018-01-03 | job_instance | customer | 20180104030114 | layer1 | OPEN | 2018-01-04 03:02:24 |
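Putting the steps above together, the flow could be sketched as follows. This is only an illustration: every abstract member stands in for a config lookup, a control-table read or a framework call, and none of these names come from an existing implementation.

```scala
import java.time.LocalDate

// Hedged sketch of the flow described above; all abstract members are assumptions.
trait CommonServicesFlow {
  def isValidSystem(system: String): Boolean
  def isValidLocation(location: String): Boolean
  def stageApplicable(system: String, stage: String): Boolean
  def isFirstLayer(system: String, stage: String): Boolean
  def readLastBusinessDate(system: String, location: String): LocalDate
  def readSourceWatermark(system: String, location: String): LocalDate
  def openNewBatch(system: String, location: String, businessDate: LocalDate): Unit
  def jobGroupsFor(system: String, stage: String): Seq[String]
  def jobsIn(group: String): Seq[String]
  def invokeFramework(job: String, system: String, location: String, stage: String): Unit

  def runStage(system: String, location: String, stage: String): Unit = {
    // 1. Validate the input parameters
    require(isValidSystem(system) && isValidLocation(location), s"invalid $system / $location")
    require(stageApplicable(system, stage), s"stage $stage is not applicable to $system")

    // 2. Read the business date of the last / in-progress batch from the control table
    val lastBusinessDate = readLastBusinessDate(system, location)

    // 3. For the first layer, compare with the source watermark and open a new batch
    //    (control-table and runtime-audit entries) when a newer business date is available
    if (isFirstLayer(system, stage)) {
      val watermark = readSourceWatermark(system, location)
      if (watermark.isAfter(lastBusinessDate)) openNewBatch(system, location, watermark)
    }

    // 4. Resolve the job groups mapped to this system/stage and delegate each job
    //    to its framework (ingestion, data load, ...), which audits its own run
    for {
      group <- jobGroupsFor(system, stage)
      job   <- jobsIn(group)
    } invokeFramework(job, system, location, stage)
  }
}
```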
The framework should allow a different number of stages for different systems. When all the stages for a batch are completed, the overall status in the control table should be updated to SUCCESS (a sketch of this roll-up follows the example). The stages applicable to the different systems can be captured in the config YAML file. An example would be as follows.
valid_systems: [ORDER, HR, SALES, INVENTORY]
valid_stages: [STG, FND, SNAPSHOT, PLP, OUTPUT]
system_stage_mapping:
  - system: ORDER
    stages: [STG, FND, SNAPSHOT]
  - system: HR
    stages: [STG, FND]
  - system: INVENTORY
    stages: [STG, FND, SNAPSHOT, PLP, OUTPUT]
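The other half of this rule, rolling the per-stage statuses up into the batch's overall status, could look like the sketch below. The FAILED/RUNNING/OPEN fallback ordering is an assumption based on the status table earlier on this page:

```scala
// Hypothetical roll-up of per-stage statuses into the control table's overall_status.
def overallStatus(configuredStages: Seq[String], stageStatus: Map[String, String]): String =
  if (configuredStages.forall(s => stageStatus.get(s).contains("SUCCESS"))) "SUCCESS"
  else if (stageStatus.valuesIterator.contains("FAILED")) "FAILED"
  else if (stageStatus.valuesIterator.contains("RUNNING")) "RUNNING"
  else "OPEN"

overallStatus(Seq("STG", "FND"), Map("STG" -> "SUCCESS", "FND" -> "RUNNING")) // "RUNNING"
overallStatus(Seq("STG", "FND"), Map("STG" -> "SUCCESS", "FND" -> "SUCCESS")) // "SUCCESS"
```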
Jobs (ingestion and data processing job instances) can be grouped together into what is called a job group. These job groups can then be linked to a system and stage. When a stage is invoked for a given system, the framework should read the list of applicable job groups and loop through the individual job instances.
job_group_mapping:
  - system: ORDER
    stage: STG
    job_group: [STG_ORDER_JOB_GROUP]
  - system: ORDER
    stage: FND
    job_group: [FND_COMMON_JOB_GROUP]
  - system: INVENTORY
    stage: FND
    job_group: [FND_COMMON_JOB_GROUP, FND_INVENTORY_JOB_GROUP]
One can also specify the job group for a specific sub-system, location, sub-location, custom1 and custom2. The framework should look for the most precise mapping, following the order system, sub-system, location, sub-location, custom1 and custom2.
job_group_mapping:
  - system: ORDER
    stage: STG
    job_group: [STG_ORDER_JOB_GROUP]
  - sub-system: INVOICING
    stage: STG
    job_group: [JOB_GROUP_X]
  - system: ORDER
    sub-system: SHIPMENT
    stage: STG
    job_group: [JOB_GROUP_Y]
  - system: INVENTORY
    stage: FND
    job_group: [FND_COMMON_JOB_GROUP, FND_INVENTORY_JOB_GROUP]
In the above example, if a job is invoked for system=ORDER, sub-system=INVOICING and stage=STG, then STG_ORDER_JOB_GROUP is invoked (no entry matches both system=ORDER and sub-system=INVOICING, so the system-level ORDER mapping applies).
If a job is invoked for system=ORDER, sub-system=SHIPMENT and stage=STG, then JOB_GROUP_Y is invoked, as it is the more precise match.
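A possible way to implement this lookup is sketched below. The data model and the scoring are assumptions: an entry matches when every context key it sets agrees with the job's context, and among the matching entries the one that pins down the highest-precedence keys wins.

```scala
// Hypothetical job-group mapping entry: any subset of the context keys may be set.
case class JobGroupMapping(context: Map[String, String], stage: String, jobGroups: Seq[String])

// Context keys in their documented precedence order.
val precedence = Seq("system", "sub-system", "location", "sub-location", "custom1", "custom2")

// Treat the keys an entry sets as bits of a number, most significant bit first, so
// system + sub-system beats system alone, which beats a sub-system-only entry.
def score(m: JobGroupMapping): Int =
  precedence.foldLeft(0)((acc, key) => (acc << 1) | (if (m.context.contains(key)) 1 else 0))

def resolveJobGroups(jobContext: Map[String, String],
                     stage: String,
                     mappings: Seq[JobGroupMapping]): Seq[String] = {
  val candidates = mappings.filter { m =>
    m.stage == stage && m.context.forall { case (k, v) => jobContext.get(k).contains(v) }
  }
  if (candidates.isEmpty) Nil else candidates.maxBy(score).jobGroups
}

val mappings = Seq(
  JobGroupMapping(Map("system" -> "ORDER"), "STG", Seq("STG_ORDER_JOB_GROUP")),
  JobGroupMapping(Map("sub-system" -> "INVOICING"), "STG", Seq("JOB_GROUP_X")),
  JobGroupMapping(Map("system" -> "ORDER", "sub-system" -> "SHIPMENT"), "STG", Seq("JOB_GROUP_Y"))
)
resolveJobGroups(Map("system" -> "ORDER", "sub-system" -> "INVOICING"), "STG", mappings) // STG_ORDER_JOB_GROUP
resolveJobGroups(Map("system" -> "ORDER", "sub-system" -> "SHIPMENT"), "STG", mappings)  // JOB_GROUP_Y
```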
The mapping of jobs to a job group could be represented as below.
The job-instance-to-job-group mapping can be placed in a separate YAML file; everything else defined on this page can be kept in one single file.
job_groups:
  - name: STG_ORDER_JOB_GROUP
    jobs:
      - STG_ORDER_JOB1
      - STG_ORDER_JOB2
      - STG_ORDER_JOB3
  - name: FND_COMMON_JOB_GROUP
    jobs:
      - FND_JOB1
      - FND_JOB2
      - FND_JOB3
  - name: FND_INVENTORY_JOB_GROUP
    jobs:
      - FND_INVENTORY_JOB1
      - FND_INVENTORY_JOB2
      - FND_INVENTORY_JOB3
This would be a good-to-have feature, so an evaluation should be made of how hard it is to implement in an easily configurable way.
System and location are the global context columns in which a job is triggered. In some cases there may be a requirement for jobs to be triggered with additional contexts as well. The context columns may be highly business-specific terms, so the names of these contexts vary between projects; that is why it is important to define these names dynamically.
For example, there may be a context called frequency. Based on the frequency specified, batches should be spawned in the control table accordingly. If the frequency is specified as daily, one batch per business day should be created for that system/location; if monthly, one batch per month should be spawned. Here, jobs from the scheduler can be called daily. The framework should read the watermarks from the source system and the EDW, infer whether a new business date is available (next day or next month) and then spawn a batch. If no new business date is found, the job may skip creating a new batch and exit without doing anything.
Another example would be business_module. A system/location can have different business modules attached to it. Based on the module, a different set of jobs would be called.
It is important to note that these additional context columns are optional for any given system. System A might have frequency and business_module, System B might have just business_module, and System C might not have any additional context (apart from the default system/country). A possible way to declare them is sketched below.
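Purely as an illustration, mirroring the YAML config examples earlier on this page (none of these keys exist today; the structure is an assumption), the optional context columns could be declared per system along these lines:

```yaml
# Hypothetical sketch of per-system context-column declarations
context_columns:
  - system: SYSTEM_A
    columns: [frequency, business_module]
  - system: SYSTEM_B
    columns: [business_module]
  # SYSTEM_C declares nothing beyond the default system/location context
```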
- There should be a separate stage in the run to initialize the batches. This should ideally create the batches based on the from/to and other parameters provided, after validating them against the JSON.
- What are catch-ups?
  There may be cases where the EDW lags far behind the source system. In these cases, when jobs are triggered, the framework should create one batch for each business date and loop through the batches one by one, sourcing them into the EDW.
- Why are catch-ups needed?
  Given the dynamics involved in handling multiple systems, a lag between the source systems and the EDW is unavoidable due to upstream issues, cluster issues, environment issues, planned downtimes, etc. In most cases it is necessary to build point-in-time snapshots for a given source/location.
- How and where does the looping happen?
  If a lag is detected, multiple entries are created in the control table at the time of creating the batches. When a job for a given stage is invoked, it loops through all batches, always beginning with the earliest unprocessed business date.
Some stages are always run together; we may call them stage pairs for lack of a better term. These stage pairs will always have a single job. A common example is foundation and snapshot: when the foundation is invoked, snapshots have to be taken immediately before proceeding to the next business date, in the case of catch-ups. In these scenarios the catch-up looping varies slightly, as it completes two stages and then proceeds to the next business date.
The system should support capping the number of time periods/batches to clear when there is a backlog, instead of clearing all of them. This can be specified in the respective JSON file. A sketch of the catch-up loop follows.
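A hedged sketch of that loop (the names and the shape of the arguments are assumptions):

```scala
import java.time.LocalDate

// Process unprocessed business dates oldest-first, completing the whole stage
// (or stage pair, e.g. foundation + snapshot) for one date before moving on,
// optionally capped by a backlog limit read from the config.
def catchUp(unprocessedDates: Seq[LocalDate],
            stagePair: Seq[String],                 // e.g. Seq("FND", "SNAPSHOT")
            maxBatches: Option[Int],                // optional backlog cap from the JSON config
            runStageFor: (String, LocalDate) => Unit): Unit = {
  val toProcess = unprocessedDates.sortBy(_.toEpochDay)
    .take(maxBatches.getOrElse(unprocessedDates.size))
  for {
    businessDate <- toProcess   // earliest unprocessed business date first
    stage        <- stagePair   // finish the full stage pair before the next date
  } runStageFor(stage, businessDate)
}
```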
Jobs will fail during processing for a variety of reasons, including application issues, cluster issues, infra issues, etc. In this case, when the job is resubmitted (jobs are always invoked for the whole stage, not for a particular data processing id), the FAILED stage should be reprocessed.
The default behavior can be to reprocess the configs from the beginning, meaning that if there are 10 data processing ids for a given stage and the job failed at the 5th step, all data processing jobs should be processed again from the 1st when the stage is restarted.
An optional (not mandatory) config can be provided to specify that the jobs need to be processed again only from the point of failure, i.e., resuming from the 5th job in the above example, as sketched below.
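For illustration, the selection of jobs to rerun could be as simple as this sketch (the names and the status map are assumptions):

```scala
// By default every job of the FAILED stage is reprocessed; with the optional
// resume flag enabled, jobs that already succeeded in the failed run are skipped.
def jobsToRerun(allJobsInOrder: Seq[String],
                lastRunStatus: Map[String, String],   // job -> status from the runtime audits
                resumeFromFailure: Boolean): Seq[String] =
  if (!resumeFromFailure) allJobsInOrder
  else allJobsInOrder.filterNot(job => lastRunStatus.get(job).contains("SUCCESS"))
```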
There are multiple ways to specify job dependencies. The simplest is to specify the dependency in terms of a job sequence number.
The other way is to specify a parent task (or group of parent tasks) that needs to be completed before starting the given job. A DAG can be constructed after reading through all the processing config ids for the given stage, and the DAG can then be traversed to meet the dependency requirements. Since a DAG is constructed to express the dependencies, it gives the flexibility to have parallel streams when processing the jobs within a stage.
Thus common services should be able to run parallel streams within a stage. The degree of parallelism can be made a parameter.
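A hedged sketch of such DAG-based execution is below. It schedules the jobs in waves (everything whose parents are done runs in parallel, bounded by the parallelism degree); a fuller implementation could launch each job as soon as its own parents finish. All names here are assumptions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A job within a stage, with the parent jobs that must complete before it starts.
case class JobNode(id: String, parents: Set[String])

def runStageJobs(jobs: Seq[JobNode], parallelism: Int, runJob: String => Unit): Unit = {
  // Bounded thread pool = the configurable degree of parallelism (pool shutdown omitted for brevity)
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(parallelism))

  var completed = Set.empty[String]
  var remaining = jobs
  while (remaining.nonEmpty) {
    // Jobs whose parents have all completed can run now, in parallel
    val (ready, blocked) = remaining.partition(_.parents.subsetOf(completed))
    require(ready.nonEmpty, s"cyclic or unsatisfiable dependencies: ${blocked.map(_.id)}")
    Await.result(Future.traverse(ready)(job => Future(runJob(job.id))), Duration.Inf)
    completed ++= ready.map(_.id)
    remaining = blocked
  }
}
```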