Tracking Spark Applications

Databand provides a variety of options for tracking Spark applications. Because there are many ways to deploy a Spark application, choose the integration that best suits your setup. Currently, Databand supports Python and Scala/Java Spark applications. Databand also collects Spark-specific metadata, such as job metrics and Spark execution logs.

Tracking PySpark

The most basic form of tracking requires adding the dbnd package to your application and wrapping your functions with the @task decorator:

import logging
import sys
from operator import add

from dbnd import log_metric, task
from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)

@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    log_metric("counts", len(output))
    logger.info("Log message from EMR cluster")
    spark.sparkContext.stop()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount  ")
        sys.exit(-1)
    calculate_counts(sys.argv[1], sys.argv[2])
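
Beyond log_metric, the dbnd package also provides log_dataframe for capturing dataset-level metadata such as schema and statistics. The snippet below is a minimal sketch: the file path and application name are hypothetical, and the available histogram/stats flags may vary between dbnd versions:

from dbnd import log_dataframe, log_metric, task
from pyspark.sql import SparkSession

@task
def profile_daily_data(input_file):
    spark = SparkSession.builder.appName("ProfileDailyData").getOrCreate()
    df = spark.read.option("header", "true").csv(input_file)
    # Report the DataFrame's schema and statistics to Databand;
    # with_histograms additionally computes per-column histograms.
    log_dataframe("daily_data", df, with_histograms=True)
    log_metric("row_count", df.count())
    spark.sparkContext.stop()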

For more details and advanced configuration, see the Tracking PySpark guide.

Tracking Scala/Java Spark applications

To get insights into your Scala/Java application, include the dbnd-client JAR in your application and use DbndLogger.logMetric, DbndLogger.logDatasetOperation, and other logging methods:

package ai.databand.demo.service311

import ai.databand.annotations.Task
import ai.databand.log.DbndLogger
import ai.databand.schema.DatasetOperationType.READ
import org.apache.spark.sql.{DataFrame, SparkSession}

object GenerateReports {

  @Task
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("example")
      .getOrCreate

    // Read the input dataset and report the read operation to Databand
    val path = "/data/daily_data"
    val df: DataFrame = spark.read.option("header", "true").csv(path)
    DbndLogger.logDatasetOperation(path, READ, df)

    // Report a custom metric
    DbndLogger.logMetric("rows", df.count())

    spark.stop()
  }
}

For the full guide, see Tracking Spark (Scala/Java).

Track your data quality with the Deequ/PyDeequ library

Deequ is a data quality library that provides a handy DSL for "unit-testing" your data. Databand provides a dedicated integration with Deequ and will track all Deequ metrics. For more details, see the guide: Tracking with Deequ.
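
As a quick illustration, here is a minimal PyDeequ sketch that checks a DataFrame for completeness and uniqueness. The input path and column name are hypothetical, and the wiring that forwards the resulting metrics to Databand is described in the guide:

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataQuality").getOrCreate()
df = spark.read.option("header", "true").csv("/data/daily_data")

# A hypothetical constraint set: customer_id must be non-null and unique
check = (
    Check(spark, CheckLevel.Error, "daily_data quality")
    .isComplete("customer_id")
    .isUnique("customer_id")
)

result = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(check)
    .run()
)
print(result.status)  # "Success" when all constraints hold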

Deployment-specific Guides

Databand also supports advanced tracking for EMR, Databricks and Dataproc jobs.

