Tracking with Deequ

Deequ is a library for measuring data quality built on top of Spark. This guide describes how to use Deequ together with Databand in PySpark and Spark applications.

Integrating PySpark with PyDeequ for Data Quality Metrics

Databand can collect and send PyDeequ metrics. To enable this, use DbndDeequMetricsRepository as in the following example:

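from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct
from pydeequ.repository import ResultKey

# `spark` is the active SparkSession and `lines` is the DataFrame to analyze.
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"name": "words"})
AnalysisRunner(spark).onData(lines).addAnalyzer(
    ApproxCountDistinct("value")
).useRepository(DbndMetricsRepository(spark)).saveOrAppendResult(result_key).run()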

You should also add the Deequ and Databand jars to the Spark context, either by configuring the Spark session inside the application:

import pydeequ
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.config(
        "spark.jars.packages",
        "{},{}".format(
            pydeequ.deequ_maven_coord, "ai.databand:dbnd-api-deequ:0.xx.x"
        ),
    )
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .appName("PythonWordCount")
    .getOrCreate()
)

or by passing the jars in the spark-submit command:

spark-submit \
  --conf spark.jars.packages="com.amazon.deequ:deequ:x.x.x-spark-x.x,ai.databand:dbnd-api-deequ:0.xx.x" \
  --conf spark.jars.excludes="net.sourceforge.f2j:arpack_combined_all"
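
When the job is launched from a Python script or a scheduler, the same two settings can be assembled programmatically. The following is a minimal sketch and not part of Databand or PyDeequ: the helper name build_spark_submit_args and its parameters are hypothetical, and the version strings are supplied by the caller rather than assumed here.

```python
from typing import List

# Maven coordinate excluded in the spark-submit example above.
F2J_EXCLUDE = "net.sourceforge.f2j:arpack_combined_all"


def build_spark_submit_args(deequ_version: str, dbnd_version: str, app_file: str) -> List[str]:
    """Return a spark-submit argument list that adds the Deequ and Databand jars.

    Hypothetical helper: the versions are passed in by the caller and never guessed.
    """
    packages = ",".join(
        [
            f"com.amazon.deequ:deequ:{deequ_version}",
            f"ai.databand:dbnd-api-deequ:{dbnd_version}",
        ]
    )
    return [
        "spark-submit",
        "--conf", f"spark.jars.packages={packages}",
        "--conf", f"spark.jars.excludes={F2J_EXCLUDE}",
        app_file,
    ]
```

The resulting list can be passed to subprocess.run(..., check=True). Keeping the versions as arguments makes it easier to keep the Deequ version aligned with the Spark version of the target cluster.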

Integrating Spark with Deequ for Data Quality Metrics

Databand provides the ability to capture any metrics produced during Deequ profiling. Histograms generated by Deequ during profiling are also reported to Databand.

A prerequisite for using Deequ is adding deequ and dbnd-api-deequ to your project with the build tool you use.

sbt (build.sbt):

libraryDependencies ++= Seq(
  "com.amazon.deequ" % "deequ" % "x.x.x-spark-x.x",
  "ai.databand" % "dbnd-api-deequ" % "0.xx.x"
)

Maven (pom.xml):

<dependencies>
  <dependency>
    <groupId>com.amazon.deequ</groupId>
    <artifactId>deequ</artifactId>
    <version>x.x.x-spark-x.x</version>
  </dependency>
  <dependency>
    <groupId>ai.databand</groupId>
    <artifactId>dbnd-api-deequ</artifactId>
    <version>0.xx.x</version>
  </dependency>
</dependencies>

Gradle (build.gradle):

dependencies {
  implementation 'com.amazon.deequ:deequ:x.x.x-spark-x.x'
  implementation 'ai.databand:dbnd-api-deequ:0.xx.x'
}

A Note on Scala/Spark Compatibility

The Databand library is Scala/Spark-agnostic and can be used with any combination of Scala and Spark versions. The Deequ version, however, must be chosen to match the Scala and Spark versions you run. Refer to the Deequ documentation and select the exact version from its list of available versions.

Databand uses a custom MetricsRepository and a custom result key, DbndResultKey. You need to add both to your code explicitly:

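import ai.databand.deequ.DbndMetricsRepository
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.profiles.ColumnProfilerRunner
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository
import org.apache.spark.sql.{Dataset, Row}
// DbndResultKey and @Task also come from the Databand JVM SDK;
// import them from the packages your SDK version provides.

@Task
protected def dedupRecords(data: Dataset[Row], keyColumns: Array[String]): Dataset[Row] = {
    val dedupedData = data.dropDuplicates(keyColumns)
    // custom metrics repository wrapping an in-memory repository
    val metricsRepo = new DbndMetricsRepository(new InMemoryMetricsRepository)
    // capture dataset verification results
    VerificationSuite()
        .onData(dedupedData)
        .addCheck(
            Check(CheckLevel.Error, "Dedup testing")
                .isUnique("name")
                .isUnique("id")
                .isComplete("name")
                .isComplete("id")
                .isPositive("score"))
        .useRepository(metricsRepo)
        .saveOrAppendResult(new DbndResultKey("dedupedData"))
        .run()
    // use the same metrics repository to capture dataset profiling results
    ColumnProfilerRunner()
        .onData(dedupedData)
        .useRepository(metricsRepo)
        .saveOrAppendResult(new DbndResultKey("dedupedData"))
        .run()
    // return the deduplicated data, as declared in the method signature
    dedupedData
}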

If you already use a metrics repository, you can wrap it in Databand's repository, as in new DbndMetricsRepository(new InMemoryMetricsRepository). Databand first submits the metrics to the wrapped repository and then to the Databand tracker.
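
The wrapping order described above can be illustrated with a small, library-free sketch. It is written in Python purely for illustration and is not Databand's implementation: the classes InMemoryRepository and ForwardingRepository, the events list, and the metric name used in the usage note are all hypothetical stand-ins.

```python
from typing import Dict, List, Tuple


class InMemoryRepository:
    """Stand-in for an existing metrics repository (hypothetical)."""

    def __init__(self) -> None:
        self.saved: Dict[str, Dict[str, float]] = {}

    def save(self, key: str, metrics: Dict[str, float]) -> None:
        # Store a copy so later changes by the caller do not leak in.
        self.saved[key] = dict(metrics)


class ForwardingRepository:
    """Wraps another repository and then reports each result to a tracker."""

    def __init__(self, wrapped: InMemoryRepository, events: List[Tuple[str, str]]) -> None:
        self.wrapped = wrapped
        self.events = events  # records the order of operations for inspection

    def save(self, key: str, metrics: Dict[str, float]) -> None:
        # 1. Persist to the wrapped repository first.
        self.wrapped.save(key, metrics)
        self.events.append(("wrapped", key))
        # 2. Then report to the tracker (only recorded here, no network call).
        self.events.append(("tracker", key))
```

Calling ForwardingRepository(InMemoryRepository(), events).save("dedupedData", {"Completeness.name": 1.0}) stores the metrics in the inner repository and leaves events with the wrapped step ahead of the tracker step, so an existing repository keeps receiving its data unchanged.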

To distinguish metric keys, use the dedicated DbndResultKey. We recommend giving your checks and profiles names that let you clearly tell them apart in Databand's monitoring UI.

