Configuration Extensions - CoxAutomotiveDataSolutions/waimak GitHub Wiki
Configuration extensions are additional behaviours that can be configured on a flow by setting Spark configuration values. There are currently three configuration extensions available in Waimak:
- `cacheasparquet` and `sparkcache` extensions, used for caching labels without calling the explicit `cacheAsParquet` or `sparkCache` actions on a label
- `deequ` extension, used for setting Deequ monitoring and alerts on a label without using the explicit Deequ actions
Extensions are enabled by setting the spark.waimak.dataflow.extensions property. For example, to enable both the cacheasparquet and deequ extensions you would set the following:
spark.waimak.dataflow.extensions=cacheasparquet,deequ
Each extension then requires its own configuration.
Cache extensions
The cacheasparquet and sparkcache extensions provide a simple way of requesting Waimak to cache specific or all labels on the flow. Underneath, Waimak will call the cacheAsParquet or sparkCache actions explicitly just before a flow is executed.
These extensions are useful if you would like to test caching various labels for performance purposes without having to change your flow and recompile.
The extensions let you cache specific labels:
spark.waimak.dataflow.extensions.cacheasparquet.cacheLabels=table1,table2
spark.waimak.dataflow.extensions.sparkcache.cacheLabels=table1,table2
Or cache all labels on the flow (use carefully!):
spark.waimak.dataflow.extensions.cacheasparquet.cacheAll=true
spark.waimak.dataflow.extensions.sparkcache.cacheAll=true
By default, a cacheAsParquet or a sparkCache action will only cache a label if it is used as input to more than one action (see spark.waimak.dataflow.cacheOnlyReusedLabels parameter in Configuration Parameters).
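If you want to cache labels even when they are not reused (for example, to benchmark caching behaviour), this reuse check can be switched off. A minimal sketch, assuming a placeholder label `table1`:

```
spark.waimak.dataflow.cacheOnlyReusedLabels=false
spark.waimak.dataflow.extensions=cacheasparquet
spark.waimak.dataflow.extensions.cacheasparquet.cacheLabels=table1
```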
Putting it all together, to cache labels table1 and table2 using cacheAsParquet you would configure the following:
spark.waimak.dataflow.extensions=cacheasparquet
spark.waimak.dataflow.extensions.cacheasparquet.cacheLabels=table1,table2
or, to cache labels table1 and table2 using sparkCache:
spark.waimak.dataflow.extensions=sparkcache
spark.waimak.dataflow.extensions.sparkcache.cacheLabels=table1,table2
You can mix and match the sparkcache and cacheasparquet extensions, but if you specify a label to be cached by both, only one of the two caching mechanisms will be applied to it.
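For example, a sketch of enabling both extensions on the same flow, with placeholder label names:

```
spark.waimak.dataflow.extensions=cacheasparquet,sparkcache
spark.waimak.dataflow.extensions.cacheasparquet.cacheLabels=table1
spark.waimak.dataflow.extensions.sparkcache.cacheLabels=table2
```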
Deequ extension
The Deequ configuration extension provides a set of convenience configurations for adding data quality monitoring to labels on a Waimak flow using Deequ actions underneath. The extension provides only a subset of the functionality available through the Deequ actions directly, but abstracts away much of the complexity required when using the Deequ actions.
If you require more complex functionality than the extension provides then you should use the Deequ actions directly.
Overview
Before reading this page, it might be useful to read the Data Quality page.
The Deequ extension can be enabled using the following configuration value: spark.waimak.dataflow.extensions=deequ. To configure the extension you must configure both the alerters you wish to trigger when data quality checks fail and the checks themselves.
Alerters
Alerters are invoked when a data quality check fails. They can be configured to alert at both warning and critical thresholds depending on how a check is configured.
In the configuration extension, multiple alerters can be configured to alert at different or overlapping thresholds. The alerters are invoked in the order they are configured. For example, you can have warning alerts go only via email, but have critical alerts go via email and then throw an exception. Each check receives the same alerter configuration; you must use the Deequ actions directly if your use case requires each data check to have a different alerter configuration.
Three prepackaged alerters are provided by default; they are loaded via ServiceLoader, so you can also provide your own:
- Exception alerter simply throws an exception in the application when invoked
- Slack alerter will send a message to a Slack channel using a supplied token
- Email alerter will send a message to one or more email addresses using the supplied SMTP configuration
You configure which alerters to use and the threshold each alerter should alert at in the following way:
spark.waimak.dataquality.alerters=email,exception
spark.waimak.dataquality.alerters.email.alertOn=warning,critical
spark.waimak.dataquality.alerters.exception.alertOn=critical
Each alerter may also need additional configuration.
Exception alerter
This alerter simply throws an exception with the check message when triggered and requires no additional configuration.
For example, to throw an exception when critical alerts occur you would configure the following:
spark.waimak.dataquality.alerters=exception
spark.waimak.dataquality.alerters.exception.alertOn=critical
Slack alerter
This alerter sends an alert message to a Slack channel using a configured webhook token (everything after https://hooks.slack.com/services/).
You can set the token using the spark.waimak.dataquality.alerters.slack.token configuration parameter.
For example, to alert at warning level to Slack you would configure the following:
spark.waimak.dataquality.alerters=slack
spark.waimak.dataquality.alerters.slack.alertOn=warning
spark.waimak.dataquality.alerters.slack.token=*************
Email alerter
The email alerter will send an alert to one or more email addresses via an SMTP server, and takes the following configuration settings:
| Parameter | Default | Required | Purpose |
|---|---|---|---|
| `spark.waimak.dataquality.alerters.email.to` | Empty | No | Comma-separated list of 'to' destination addresses |
| `spark.waimak.dataquality.alerters.email.cc` | Empty | No | Comma-separated list of 'cc' destination addresses |
| `spark.waimak.dataquality.alerters.email.bcc` | Empty | No | Comma-separated list of 'bcc' destination addresses |
| `spark.waimak.dataquality.alerters.email.from` | None | No | From address in email message |
| `spark.waimak.dataquality.alerters.email.host` | N/A | Yes | Hostname/address of email server |
| `spark.waimak.dataquality.alerters.email.port` | 25 | No | Port of email server |
| `spark.waimak.dataquality.alerters.email.auth` | False | No | Whether to use authentication to the email server |
| `spark.waimak.dataquality.alerters.email.starttls` | True | No | Whether to enable STARTTLS when communicating with the email server |
| `spark.waimak.dataquality.alerters.email.ssl` | False | No | Whether to enable SSL when communicating with the email server |
| `spark.waimak.dataquality.alerters.email.user` | None | No | Username to use if authentication is enabled |
| `spark.waimak.dataquality.alerters.email.pass` | None | No | Password to use if authentication is enabled |
| `spark.waimak.dataquality.alerters.email.debug` | False | No | Whether to enable debugging on the session object. Useful for debugging email connection issues |
For example, to send critical-level alerts via the SMTP server smtp.example.com to [email protected] you would configure the following:
spark.waimak.dataquality.alerters=email
spark.waimak.dataquality.alerters.email.alertOn=critical
[email protected]
spark.waimak.dataquality.alerters.email.host=smtp.example.com
Custom alerter
It is possible to define your own alerter. See Data Quality#Custom alerter for how to do this.
Checks
Checks are used to configure which data quality checks are actually performed against labels. The checks on each label are configured separately, and each label can have multiple checks.
To add checks to labels, you first configure the list of labels to run checks against:
spark.waimak.dataquality.deequ.labelsToMonitor=label1,label2
Then configure the list of checks to run against each label:
spark.waimak.dataquality.deequ.labels.label1.checks=completenessCheck
spark.waimak.dataquality.deequ.labels.label2.checks=completenessCheck,uniquenessCheck
Then configure each check for each label:
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.columns=col1
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.warningThreshold=0.8
spark.waimak.dataquality.deequ.labels.label2.completenessCheck.columns=col2
spark.waimak.dataquality.deequ.labels.label2.completenessCheck.warningThreshold=0.9
spark.waimak.dataquality.deequ.labels.label2.uniquenessCheck.columns=col1
There are currently four built-in checks you can use via the configuration extension:
- Completeness Check for monitoring null values in records
- Uniqueness Check for monitoring the uniqueness of values in a column or set of columns
- Latest Timestamp Check for monitoring the latest timestamp in a column
- Generic SQL Check for building checks from any valid Spark SQL expression
These pre-built checks only scratch the surface of the functionality available in Deequ, and you can leverage the full power of Deequ by using the Deequ actions directly.
Completeness Check
The completeness check allows you to verify that a column or columns meet a given threshold of completeness (or non-nullness).
The check takes the following configuration:
| Parameter | Default | Required | Purpose |
|---|---|---|---|
| `spark.waimak.dataquality.deequ.labels.${labelName}.completenessCheck.columns` | N/A | Yes | List of columns to run the check on for this label |
| `spark.waimak.dataquality.deequ.labels.${labelName}.completenessCheck.warningThreshold` | None | No | A threshold fraction (e.g. 0.8) of completeness under which a warning alert will be generated |
| `spark.waimak.dataquality.deequ.labels.${labelName}.completenessCheck.criticalThreshold` | None | No | A threshold fraction (e.g. 0.6) of completeness under which a critical alert will be generated |
You must use the Deequ actions directly if you need to set different thresholds for columns within a label.
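Putting the parameters together, a sketch that warns below 90% completeness and raises a critical alert below 70% (the label and column names are placeholders):

```
spark.waimak.dataquality.deequ.labelsToMonitor=label1
spark.waimak.dataquality.deequ.labels.label1.checks=completenessCheck
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.columns=col1,col2
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.warningThreshold=0.9
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.criticalThreshold=0.7
```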
Uniqueness Check
The uniqueness check allows you to verify that a column or set of combined columns provide a unique value. You can also set a threshold of uniqueness that must be satisfied.
The check takes the following configuration:
| Parameter | Default | Required | Purpose |
|---|---|---|---|
| `spark.waimak.dataquality.deequ.labels.${labelName}.uniquenessCheck.columns` | N/A | Yes | List of columns to consider when testing uniqueness. If multiple columns are given, the uniqueness of the combination of those columns is tested |
| `spark.waimak.dataquality.deequ.labels.${labelName}.uniquenessCheck.warningThreshold` | 1.0 | No | A threshold fraction (e.g. 0.8) of uniqueness under which a warning alert will be generated |
| `spark.waimak.dataquality.deequ.labels.${labelName}.uniquenessCheck.criticalThreshold` | None | No | A threshold fraction (e.g. 0.6) of uniqueness under which a critical alert will be generated |
You must use the Deequ actions directly if you need to set multiple uniqueness checks for a given label.
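For example, a sketch that checks the combination of two columns is at least 99% unique before warning (label and column names are placeholders):

```
spark.waimak.dataquality.deequ.labelsToMonitor=label1
spark.waimak.dataquality.deequ.labels.label1.checks=uniquenessCheck
spark.waimak.dataquality.deequ.labels.label1.uniquenessCheck.columns=col1,col2
spark.waimak.dataquality.deequ.labels.label1.uniquenessCheck.warningThreshold=0.99
```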
Latest Timestamp Check
The timestamp check allows you to verify that the most recent value in a timestamp column is within a configurable number of hours of now.
The check takes the following configuration:
| Parameter | Default | Required | Purpose |
|---|---|---|---|
| `spark.waimak.dataquality.deequ.labels.${labelName}.recentTimestampCheck.column` | N/A | Yes | The timestamp column used in the check |
| `spark.waimak.dataquality.deequ.labels.${labelName}.recentTimestampCheck.hoursToLookBack` | 6 | No | The integer number of hours before now within which the latest timestamp in the column must fall |
| `spark.waimak.dataquality.deequ.labels.${labelName}.recentTimestampCheck.alertLevel` | "warning" | No | The alert level raised if the check fails |
| `spark.waimak.dataquality.deequ.labels.${labelName}.recentTimestampCheck.nowOverride` | Current date time | No | An optional override for the value of 'now', which by default is the current time on the machine. Must be of format yyyy-[m]m-[d]d hh:mm:ss[.f...] |
You must use the Deequ actions directly if you need to set multiple timestamp checks for a given label.
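For example, a sketch that raises a critical alert if the latest value in a timestamp column is more than 24 hours old (the label and column names are placeholders):

```
spark.waimak.dataquality.deequ.labelsToMonitor=label1
spark.waimak.dataquality.deequ.labels.label1.checks=recentTimestampCheck
spark.waimak.dataquality.deequ.labels.label1.recentTimestampCheck.column=last_updated
spark.waimak.dataquality.deequ.labels.label1.recentTimestampCheck.hoursToLookBack=24
spark.waimak.dataquality.deequ.labels.label1.recentTimestampCheck.alertLevel=critical
```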
Generic SQL Check
The generic SQL check allows you to write generic assertions in Spark SQL syntax. If a given assertion fails (returns false) then an alert is generated.
| Parameter | Default | Required | Purpose |
|---|---|---|---|
| `spark.waimak.dataquality.deequ.labels.${labelName}.genericSQLCheck.warningChecks` | None | No | The list of assertions that will generate alerts at warning threshold if any fail |
| `spark.waimak.dataquality.deequ.labels.${labelName}.genericSQLCheck.criticalChecks` | None | No | The list of assertions that will generate alerts at critical threshold if any fail |
For example, to assert that all values within the column my_column are greater than 5 you would do the following:
spark.waimak.dataquality.deequ.labels.label1.genericSQLCheck.warningChecks=my_column > 5
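A sketch combining warning- and critical-level SQL assertions on one label; the label and column names are placeholders, and the expressions are ordinary Spark SQL boolean expressions:

```
spark.waimak.dataquality.deequ.labelsToMonitor=label1
spark.waimak.dataquality.deequ.labels.label1.checks=genericSQLCheck
spark.waimak.dataquality.deequ.labels.label1.genericSQLCheck.warningChecks=my_column > 5
spark.waimak.dataquality.deequ.labels.label1.genericSQLCheck.criticalChecks=my_column IS NOT NULL
```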
Metrics storage
You can also request that Deequ store the metrics from successful runs in a metrics repository. This is useful if you need a historic view of metric values over time.
To do this, you can set the following parameter:
spark.waimak.dataquality.deequ.metricsStoragePath=hdfs:///path/to/folder
The path must be a valid File System URI, and the folder must only be used to store metrics for this specific flow.
Example
Putting this all together, the configuration for a set of checks may look like:
spark.waimak.dataflow.extensions=deequ
spark.waimak.dataquality.alerters=slack,exception
spark.waimak.dataquality.alerters.slack.alertOn=warning,critical
spark.waimak.dataquality.alerters.exception.alertOn=critical
spark.waimak.dataquality.alerters.slack.token=***********
spark.waimak.dataquality.deequ.labelsToMonitor=label1,label2
spark.waimak.dataquality.deequ.labels.label1.checks=completenessCheck,recentTimestampCheck
spark.waimak.dataquality.deequ.labels.label2.checks=uniquenessCheck,genericSQLCheck
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.columns=col1
spark.waimak.dataquality.deequ.labels.label1.completenessCheck.warningThreshold=0.8
spark.waimak.dataquality.deequ.labels.label1.recentTimestampCheck.column=col2
spark.waimak.dataquality.deequ.labels.label1.recentTimestampCheck.hoursToLookBack=12
spark.waimak.dataquality.deequ.labels.label2.uniquenessCheck.columns=col1
spark.waimak.dataquality.deequ.labels.label2.genericSQLCheck.criticalChecks=col3 != 'bad value'