Data Quality - CoxAutomotiveDataSolutions/waimak GitHub Wiki

This page describes the capability of Waimak to monitor and alert on the data quality of your flows.

The general data quality functionality is described in Data Quality Core; however, data quality is best monitored using the Deequ functionality, which builds on the core functionality. The Deequ actions allow you to use the full capability of Amazon Deequ in Waimak.

Data Quality Core

The core data quality functionality in Waimak provides:

  • implementations of alerters that handle alerts when a check fails
  • a generic way of adding data quality checks onto Waimak labels on a flow

Alerters are used throughout the data quality functionality in Waimak (e.g. in Deequ) and are constructed in the same way whether you are using Deequ actions or generic data quality actions.

For data quality checks, you should aim to use the Deequ checks where possible, as they require much less work to implement and are highly optimised. It is possible to write data quality checks without using Deequ; this is briefly shown in the Generic Actions section.

Alerters

Alerters are used by checks to handle any alerts generated by a check. Each check can have multiple alerters defined; alerts will be sent to each alerter in the order the alerters were added to the check, and each alerter can define the set of alert importances for which it will trigger.

Currently there are three built-in alerters, but it is possible to define your own.

All built-in alerters take an optional list of alert importances when created, which restricts the importance levels the alerter will alert on. If the list is empty or unspecified, every importance level will be alerted on.

You create the list of alert importances in the following way:

import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._

// All available levels, you can choose the ones you need
val levels = List(Critical, Warning, Good, Information)

Exception alerter

An exception alerter will throw an exception when a data quality check fails and triggers an alert.

It is defined as follows:

/**
  * Handles alerts by causing an exception to be thrown.
  *
  * @param alertOn If specified, the list of alert importance levels to alert on. If unspecified or empty, every level
  *                will be alerted on.
  */
case class ExceptionQualityAlert(alertOn: List[AlertImportance] = List.empty) extends DataQualityAlertHandler

The exception alerter takes no configuration besides the list of alert importances.

For example, to create an exception alerter that only alerts on critical checks you would do the following:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._

val exception = ExceptionQualityAlert(List(Critical))

Slack alerter

The Slack alerter sends an alert message to a Slack channel using a configured webhook token (everything after https://hooks.slack.com/services/).

It is defined in the following way:

/**
  * Sends alerts to a Slack channel
  *
  * @param token   the Slack token for the channel
  * @param alertOn If specified, the list of alert importance levels to alert on. If unspecified or empty, every level
  *                will be alerted on.
  */
case class SlackQualityAlert(token: String, alertOn: List[AlertImportance] = List.empty) extends DataQualityAlertHandler

The Slack alerter takes a Slack webhook token and an optional list of alert importances.

For example, to create a Slack alerter that will alert at both warning and critical levels you would do the following:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._

val slack = SlackQualityAlert("****slacktoken****", List(Warning, Critical))

Email alerter

The email alerter sends an alert to one or more email addresses when an alert is triggered.

It is defined in the following way:

/**
  * Sends alerts via email
  *
  * @param settings the email settings to use
  * @param alertOn  If specified, the list of alert importance levels to alert on. If unspecified or empty, every level
  *                 will be alerted on.
  */
case class EmailQualityAlert(settings: EmailSettings, alertOn: List[AlertImportance] = List.empty) extends DataQualityAlertHandler

It takes an EmailSettings object and an optional list of alert importances. EmailSettings is a trait, and an implementation for SMTP email servers is defined in the following way:

/**
  * Email settings used to configure an [[EmailQualityAlert]] for SMTP emails
  *
  * @param to                (Optional) comma-separated list of 'to' destination addresses
  * @param cc                (Optional) comma-separated list of 'cc' destination addresses
  * @param bcc               (Optional) comma-separated list of 'bcc' destination addresses
  * @param from              (Optional) from address in email message
  * @param host              (Mandatory) hostname/address of email server
  * @param port              (Optional) port of email server, default 25
  * @param auth              (Optional) whether to use authentication to email server, default false
  * @param starttls          (Optional) whether to enable starttls when communicating with email server, default true
  * @param ssl               (Optional) whether to force connections only over SSL, default false
  * @param user              (Optional) username to use if authentication enabled
  * @param pass              (Optional) password to use if authentication enabled
  * @param connectionTimeout (Optional) Socket connection timeout in MS, default 1000ms
  * @param timeout           (Optional) Socket I/O connection timeout in MS, default 1000ms
  * @param debug             (Optional) Whether to enable debugging on the session object, default true
  */
case class SMTPEmailSettings(to: List[String] = List.empty,
                             cc: List[String] = List.empty,
                             bcc: List[String] = List.empty,
                             from: Option[String] = None,
                             host: String,
                             port: Int = 25,
                             auth: Boolean = false,
                             starttls: Boolean = true,
                             ssl: Boolean = false,
                             user: Option[String] = None,
                             pass: Option[String] = None,
                             connectionTimeout: Int = 1000,
                             timeout: Int = 1000,
                             debug: Boolean = true) extends EmailSettings

For example, to configure an email alerter to alert at critical level only and send alerts via the email server smtp.example.com to test@example.com, you would configure the following:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._

val emailSettings = SMTPEmailSettings(to = List("test@example.com"), host = "smtp.example.com")
val email = EmailQualityAlert(emailSettings, List(Critical))

Custom alerter

A custom alerter can be created by extending the DataQualityAlertHandler trait:

package com.coxautodata.waimak.dataflow.spark.dataquality
trait DataQualityAlertHandler {

  /**
    * The alert importance levels for which this handler should alert.
    * If this is empty, the handler will alert for all importance levels.
    *
    * @return the list of alert importance levels
    */
  def alertOn: List[AlertImportance]

  /**
    * Handle the given data quality alert
    *
    * @param alert the data quality alert
    * @return Success() if the alert was successfully handled, Failure otherwise. If the handler intends for an exception
    *         to be thrown, a Failure should be returned containing the exception. This will be thrown once all non-exception
    *         alerts have been handled.
    */
  def handleAlert(alert: DataQualityAlert): Try[Unit]
}
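
For illustration, a minimal custom alerter that writes alerts to standard error might look like the following. This is a hedged sketch: the `LogQualityAlert` name is hypothetical and not part of Waimak, and since the fields of `DataQualityAlert` are not shown above, the alert is printed via its `toString`.

```scala
import scala.util.Try

import com.coxautodata.waimak.dataflow.spark.dataquality.{AlertImportance, DataQualityAlert, DataQualityAlertHandler}

// Hypothetical alerter: prints every matching alert to stderr instead of
// throwing or sending it anywhere. Illustrative only, not part of Waimak.
case class LogQualityAlert(alertOn: List[AlertImportance] = List.empty) extends DataQualityAlertHandler {

  override def handleAlert(alert: DataQualityAlert): Try[Unit] = Try {
    System.err.println(alert.toString)
  }
}
```

An instance can then be passed to any check in the same way as the built-in alerters.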

It is also possible to make your custom alerter available to the Deequ Configuration Extension, but this is more involved: you must create a service extending the DataQualityAlertHandlerService trait that will be loaded by the ServiceLoader (i.e. your service must have a no-argument constructor and be referenced in META-INF/services/com.coxautodata.waimak.dataflow.spark.dataquality.DataQualityAlertHandlerService).

Generic Actions

As mentioned above, it is possible to use the core data quality features directly, but we advise using the Deequ module where possible as it offers a large number of built-in features and a highly optimised execution engine.

To add a data quality check, you would use the addDataQualityCheck action (the Deequ actions use this underneath):

    /**
      * Add a data quality check for the given label
      *
      * @param label         the label to perform the check on
      * @param check         the data quality check to perform
      * @param alertHandler  the alert handler to use for handling alerts for this check
      * @param alertHandlers additional alert handlers to use
      * @tparam CheckType the type of the data quality check
      */
    def addDataQualityCheck[CheckType <: DataQualityCheck[CheckType]](label: String
                                                                      , check: CheckType
                                                                      , alertHandler: DataQualityAlertHandler
                                                                      , alertHandlers: DataQualityAlertHandler*): SparkDataFlow

This action takes a type parameter CheckType (which is F-bounded with DataQualityCheck), a label to check, a check of type CheckType to perform, and one or more alerters/alert handlers.

A check of type CheckType can be created by extending the following trait:

package com.coxautodata.waimak.dataflow.spark.dataquality
trait DataQualityCheck[Self <: DataQualityCheck[Self]]

The implementation of this trait is out of the scope of this wiki, and you should see existing examples on how to do this.

There is an existing DatasetChecks check that does not use Deequ and will allow you to write a check using any Dataset transformation (this will perform significantly worse than a similar check written using Deequ):

package com.coxautodata.waimak.dataflow.spark.dataquality

case class DatasetChecks(checks: Seq[DatasetCheck[_]]) extends DataQualityCheck[DatasetChecks]

trait DatasetCheck[T] {
  def getAlerts(label: String, data: Dataset[_]): Seq[DataQualityAlert]
}


class SimpleDatasetCheck[T](metric: Dataset[_] => Dataset[T]
                            , alert: (Dataset[T], String) => Seq[DataQualityAlert]) extends DatasetCheck[T]
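
Based on the signatures above, a sketch of a simple non-Deequ check might look like the following. The row-count logic is an illustrative assumption rather than an example from the Waimak codebase, and it assumes `DataQualityAlert` is constructed from a message and an importance.

```scala
import org.apache.spark.sql.{Dataset, Encoders}
import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._

// Hypothetical check: raise a critical alert when the labelled Dataset is empty
val nonEmptyCheck = new SimpleDatasetCheck[Long](
  ds => ds.sparkSession.createDataset(Seq(ds.count()))(Encoders.scalaLong),
  (counts, label) =>
    if (counts.head() == 0L) Seq(DataQualityAlert(s"Label $label was empty", Critical))
    else Seq.empty
)

flow.addDataQualityCheck("table1", DatasetChecks(Seq(nonEmptyCheck)), ExceptionQualityAlert())
```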

Deequ

The Deequ module in Waimak provides a set of actions allowing you to use Amazon Deequ checks on labels in Waimak flows. It provides you the full capability of Deequ coupled with the flow layout of Waimak.

There is a simplified configuration-driven Deequ extension providing a subset of pre-configured Deequ checks that can be found on the Configuration Extensions#Deequ page.

The Deequ actions take alerters to handle alerts, and their configuration is described in the Alerters section of this page.

Finally, the Deequ dependency in Waimak is marked as provided, so you must supply the Deequ dependency yourself when using Deequ in your project.

Deequ Actions

This section explains the Deequ actions available to use within Waimak. It is divided into actions that add checks and actions that specify a metrics repository for Deequ to use.

This wiki does not explain how to write checks in Deequ; consult the Deequ documentation for how to capture your data quality assertions as Deequ checks.

Add Checks

There are three Deequ actions to add checks:

  • addDeequCheck for adding one or more Check objects
  • addDeequValidation for adding any VerificationRunBuilder state change (used by addDeequCheck)
  • addDeequValidationWithMetrics for adding any VerificationRunBuilderWithRepository state change (for adding checks that require metrics/adding anomaly checks)

All checks added onto labels are additive, meaning that you can add multiple checks to the same label across multiple actions and all checks will be run.

Note: Deequ only has two levels of alert importance: Warning and Error. These correspond to the Waimak data quality Warning and Critical alert importances respectively.

addDeequCheck:

This action allows you to add Deequ Check objects onto a label.

It is defined as:

    /**
      * Adds Deequ Checks for a label
      *
      * @param label         the label to perform the validation for
      * @param check         first Deequ Check to perform
      * @param checks        additional Deequ checks to perform
      * @param alertHandler  the alert handler to use for handling alerts for this check
      * @param alertHandlers additional alert handlers to use
      */
    def addDeequCheck(label: String
                      , check: Check
                      , checks: Check*)
                     (alertHandler: DataQualityAlertHandler,
                      alertHandlers: DataQualityAlertHandler*): SparkDataFlow

The action takes a label to add checks on, one or more checks and one or more alert handlers.

For example, to add a completeness and uniqueness check onto label table1 and alert to an exception alerter you would do the following:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._
import com.coxautodata.waimak.dataflow.spark.dataquality.deequ._
import com.amazon.deequ.checks.{Check, CheckLevel}

val exception = ExceptionQualityAlert(List(Critical))
flow
   .addDeequCheck("table1", 
        Check(CheckLevel.Error, "error_checks")
          .hasCompleteness("col1", completeness => completeness >= 0.6 && completeness < 0.8, Some("More than 40% of col1 values were null."))
          .isUnique("col2", Some("col2 was not unique"))
    )(exception)

addDeequValidation:

This action allows you to add any transformation to the VerificationRunBuilder state. The previous action uses this action by calling VerificationRunBuilder.addChecks(..) but this action also gives you access to other Deequ transformations such as addRequiredAnalyzers.

It is defined as follows:

    /**
      * Add Deequ validation for a given label (https://github.com/awslabs/deequ)
      *
      * @param label         the label to perform the validation for
      * @param checks        the Deequ validation to perform e.g.
      *                      `_.addCheck(
      *                      Check(CheckLevel.Error, "unit testing my data")
      *                      .hasSize(_ == 5) // we expect 5 rows
      *                      .isComplete("id") // should never be NULL
      *                      )`
      * @param alertHandler  the alert handler to use for handling alerts for this check
      * @param alertHandlers additional alert handlers to use
      */
    def addDeequValidation(label: String
                           , checks: VerificationRunBuilder => VerificationRunBuilder
                           , alertHandler: DataQualityAlertHandler,
                           alertHandlers: DataQualityAlertHandler*): SparkDataFlow

The action takes a label to add checks on, a function that changes the state and one or more alert handlers.

For example, the previous example using addDeequCheck could be rewritten as:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._
import com.coxautodata.waimak.dataflow.spark.dataquality.deequ._
import com.amazon.deequ.checks.{Check, CheckLevel}

val exception = ExceptionQualityAlert(List(Critical))
flow
   .addDeequValidation("table1",
        _.addCheck(
            Check(CheckLevel.Error, "error_checks")
              .hasCompleteness("col1", completeness => completeness >= 0.6 && completeness < 0.8, Some("More than 40% of col1 values were null."))
              .isUnique("col2", Some("col2 was not unique"))
        ),
       exception
    )

addDeequValidationWithMetrics:

This action allows you to add any state change on the VerificationRunBuilderWithRepository object to a label. Most commonly, it is used to add Deequ anomaly checks.

To use this action, you must have already specified a metrics repository (see next section).

It is defined in the following way:

    /**
      * Adds Deequ validation which uses a metrics repository (see Deequ docs on metrics repositories for more information)
      * N.B in order for this to work, you must set a metrics repository using `setDeequMetricsRepository` or `setDeequStorageLayerMetricsRepository`
      *
      * @param label             the label to perform the validation for
      * @param checksWithMetrics the Deequ validation to perform e.g.
      *                          `_.addAnomalyCheck(RateOfChangeStrategy(maxRateDecrease = Some(0.2)), Completeness("col1")))`
      * @param alertHandler      the alert handler to use for handling alerts for this check
      * @param alertHandlers     additional alert handlers to use
      */
    def addDeequValidationWithMetrics(label: String
                                      , checksWithMetrics: VerificationRunBuilderWithRepository => VerificationRunBuilderWithRepository
                                      , alertHandler: DataQualityAlertHandler,
                                      alertHandlers: DataQualityAlertHandler*): SparkDataFlow

The action takes a label to add checks on, a function that changes the state and one or more alert handlers.

For example, to add a rate of change check to label table1 to ensure the completeness of column col1 does not decrease by more than 20% over subsequent runs you would do the following:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._
import com.coxautodata.waimak.dataflow.spark.dataquality.deequ._
import com.amazon.deequ.analyzers.Completeness
import com.amazon.deequ.anomalydetection.RateOfChangeStrategy

val exception = ExceptionQualityAlert()
flow
   .addDeequValidationWithMetrics("table1",
        _.addAnomalyCheck(RateOfChangeStrategy(maxRateDecrease = Some(0.2)), Completeness("col1")),
      exception
    )

Set Metrics Repository

Deequ metrics repositories are used to optionally store all successful metrics between runs, and are required when using anomaly checks via the addDeequValidationWithMetrics action.

There are two actions for setting a metrics repository:

  • setDeequStorageLayerMetricsRepository for setting a metrics repository that uses the Waimak storage layer
  • setDeequMetricsRepository for setting a metrics repository that uses any com.amazon.deequ.repository.MetricsRepository implementation

setDeequStorageLayerMetricsRepository:

This action will add a metrics repository to the flow that uses the Waimak storage layer underneath. This approach is recommended as the storage layer will deal with appending data and file compaction.

The action is defined in the following way:

    /** Sets the Deequ metrics repository to use for Deequ checks using metrics, specified using `addDeequValidationWithMetrics`.
      * The metrics repository will be a [[StorageLayerMetricsRepository]]
      *
      * @param storageBasePath the base path to use for the [[StorageLayerMetricsRepository]]
      * @param appendDateTime  the date time to be used as a key in the metrics repository (defaults to now)
      */
    def setDeequStorageLayerMetricsRepository(storageBasePath: String, appendDateTime: ZonedDateTime = ZonedDateTime.now()): SparkDataFlow

It takes a path in which all metrics will be written, and an optional date/time object used as a key for identifying when metrics were produced. Ensure the provided path is only used to contain metrics for this flow.

For example, to add a metrics repository using a storage layer at path /output/metrics/ you would do the following:

import com.coxautodata.waimak.dataflow.spark.dataquality.deequ._

flow
    .setDeequStorageLayerMetricsRepository("/output/metrics/")

setDeequMetricsRepository:

This action allows you to set any implementation of com.amazon.deequ.repository.MetricsRepository as a metrics repository. This function is useful if you need to use an alternative metrics repository provided by Deequ.

The action is defined in the following way:

    /**
      * Sets the Deequ metrics repository to use for Deequ checks using metrics, specified using `addDeequValidationWithMetrics`
      *
      * @param builder        Deequ metrics repository builder
      * @param appendDateTime the date time to be used as a key in the metrics repository (defaults to now)
      */
    def setDeequMetricsRepository(builder: DeequMetricsRepositoryBuilder, appendDateTime: ZonedDateTime = ZonedDateTime.now()): SparkDataFlow

where DeequMetricsRepositoryBuilder is the type alias:

  /**
    * A type alias used when setting a metrics repository for Deequ through Waimak actions.
    * Type goes from label to metrics repository.
    */
  type DeequMetricsRepositoryBuilder = String => MetricsRepository

The action takes a function that when given a label returns a MetricsRepository instance, and an optional date/time object used as a key for identifying when metrics were produced.

For example, to use the Deequ FileSystemMetricsRepository to store metrics as JSON files, one file per label, at a filesystem path, you would do the following:

import com.coxautodata.waimak.dataflow.spark.dataquality.deequ._
import com.amazon.deequ.repository.fs.FileSystemMetricsRepository

flow
    .setDeequMetricsRepository(label => new FileSystemMetricsRepository(flow.flowContext.spark, s"/output/metrics/$label.json"))

Example

Putting everything together, a complete set of checks may look like:

import com.coxautodata.waimak.dataflow.spark.dataquality._
import com.coxautodata.waimak.dataflow.spark.dataquality.AlertImportance._
import com.coxautodata.waimak.dataflow.spark.dataquality.deequ._
import com.amazon.deequ.analyzers.Completeness
import com.amazon.deequ.anomalydetection.RateOfChangeStrategy
import com.amazon.deequ.checks.{Check, CheckLevel}

val exception = ExceptionQualityAlert(List(Critical))
flow
   .setDeequStorageLayerMetricsRepository("/output/metrics/")
   .addDeequCheck("table1", 
        Check(CheckLevel.Error, "error_checks")
          .hasCompleteness("col1", completeness => completeness >= 0.6 && completeness < 0.8, Some("More than 40% of col1 values were null."))
          .isUnique("col2", Some("col2 was not unique"))
    )(exception)
   .addDeequValidationWithMetrics("table1",
        _.addAnomalyCheck(RateOfChangeStrategy(maxRateDecrease = Some(0.2)), Completeness("col1")),
      exception
    )