PySpark DataFrame

Tracking Metrics with the Logging API in Spark.

You can use the logging API in Spark jobs with an approach similar to the one described in Pandas DataFrame.

Here is an example of a PySpark function with Databand metrics logging (and the @task decorator). The log_metric call near the end of the function reports the word count to Databand's tracking system.

import sys
from operator import add
from pyspark.sql import SparkSession
from dbnd import log_metric, task


@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
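    # Split each line into words and count the occurrences of each word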
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    log_metric("counts", len(output))
    spark.sparkContext.stop()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount  ")
        sys.exit(-1)
    calculate_counts(sys.argv[1], sys.argv[2])

Databand correlates these tracking metrics from the Spark job with the associated Airflow DAG run, based on the Airflow context that is passed to the Spark job.

The following environment variables should be defined in your Spark context. Usually, these variables are set dynamically by an orchestrator like Airflow before the task is run; a sketch of passing them to spark-submit by hand follows the list.

  • DBND_HOME - Databand work folder
  • DBND__CORE__DATABAND_URL - a server URL to report metrics and logs to
  • AIRFLOW_CTX_DAG_ID - Airflow DAG ID to associate with a run
  • AIRFLOW_CTX_EXECUTION_DATE - execution date to associate with a run
  • AIRFLOW_CTX_TASK_ID - task ID to associate with a run
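
If you are not running under a live Airflow worker, you can set these variables yourself before submitting the job. The following is a minimal, illustrative sketch (not an official Databand integration) of a small Python launcher that passes them to spark-submit; the folder, URL, DAG ID, task ID, script name, and file arguments are all placeholder values.

import os
import subprocess

# Sketch only: pass the Databand and Airflow context to spark-submit.
# Every value below is a placeholder; in practice an orchestrator such as
# Airflow sets these variables for you at runtime.
env = dict(os.environ)
env.update(
    {
        "DBND_HOME": "/opt/databand",                               # Databand work folder
        "DBND__CORE__DATABAND_URL": "http://databand-web:8080",     # tracking server URL
        "AIRFLOW_CTX_DAG_ID": "word_count_dag",                     # DAG to associate with the run
        "AIRFLOW_CTX_EXECUTION_DATE": "2021-01-01T00:00:00+00:00",  # execution date of the run
        "AIRFLOW_CTX_TASK_ID": "calculate_counts",                  # task ID to associate with the run
    }
)

# Submit the word-count script in client mode so the driver inherits this environment
subprocess.run(
    ["spark-submit", "word_count.py", "input.txt", "output_dir"],
    env=env,
    check=True,
)

Because the spark-submit process inherits this environment, the dbnd calls in calculate_counts can pick up the Airflow context and associate their metrics with the given DAG run.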
