
Tracking PySpark

How to gain visibility into your code errors, metrics, and logging information in PySpark jobs.

If you are running jobs in PySpark, Databand can provide visibility into your code errors, metrics, and logging information, in the context of your broader pipelines or orchestration system.

Just as with regular Python code, you can use Databand decorators and the logging API in your Spark jobs. Here is an example of a PySpark function with a Databand decorator and metric logging:

import logging
import sys
from operator import add

from pyspark.sql import SparkSession

from dbnd import log_metric, task

logger = logging.getLogger(__name__)


@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    # Count word occurrences in the input file
    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    # Report a metric and a log message to Databand
    log_metric("counts", len(output))
    logger.info("Log message from EMR cluster")
    spark.sparkContext.stop()


if __name__ == "__main__":
    calculate_counts(sys.argv[1], sys.argv[2])

In this code example, several artifacts will be reported to the DBND tracking system.

The first one is the output of the following Python snippet:

for (word, count) in output: print("%s: %i" % (word, count))

The second is the count reported by Databand's log_metric API. When you use Python's logging facilities, for example the logger.info call directly below log_metric in the code, those log messages are reported as well.

Databand will correlate the tracking metrics from the Spark job with the associated pipeline in your orchestration system (for example, an Airflow DAG), based on how you have defined your pipelines.

Tracking Dataframes

You can use the dataset logging API to track Spark DataFrames as described in Tracking Datasets.
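
For example, here is a minimal sketch using the dataset_op_logger context manager from the dbnd package; the path below is a placeholder, and you should verify the exact names and parameters against the Tracking Datasets page:

from pyspark.sql import SparkSession

from dbnd import dataset_op_logger

spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

# Read a DataFrame and report the read operation (schema, preview) to Databand.
# "s3://bucket/words.txt" is a placeholder path.
with dataset_op_logger("s3://bucket/words.txt", "read") as op_logger:
    df = spark.read.text("s3://bucket/words.txt")
    op_logger.set(data=df)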

Spark Logs

To report logs from Dataproc back to the DBND tracking service, set an additional environment variable, DBND__OVERRIDE_AIRFLOW_LOG_SYSTEM_FOR_TRACKING, to True.

Databand can also send Spark logs to the metadata server. To enable this feature, set the DBND__LOG_SPARK environment variable to True.
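
As a rough illustration, the variables can be set in the driver process before dbnd initializes; on managed clusters (EMR, Dataproc, etc.) you would typically set them through the cluster's or job's environment configuration instead:

import os

# Enable sending Spark logs to the Databand metadata server
os.environ["DBND__LOG_SPARK"] = "True"
# Needed when reporting logs from Dataproc back to the DBND tracking service
os.environ["DBND__OVERRIDE_AIRFLOW_LOG_SYSTEM_FOR_TRACKING"] = "True"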

Integrating with PyDeequ for Data Quality Metrics

Databand can collect and send PyDeequ metrics.

Follow the PyDeequ installation guide: https://pydeequ.readthedocs.io/en/latest/README.html#installation

Make sure that the DBND JVM client, including the ai.databand:dbnd-api-deequ package, is part of your Spark application. See Installing JVM DBND Library and Agent.
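
For example, one way to pull in both the Deequ jars and the Databand Deequ package is through the Spark session configuration. This is a sketch: the dbnd-api-deequ version below is a placeholder, and your cluster may already provide some of these jars.

from pyspark.sql import SparkSession

import pydeequ

# Replace with the dbnd-api-deequ version matching your Databand installation
DBND_DEEQU_PACKAGE = "ai.databand:dbnd-api-deequ:REPLACE_WITH_VERSION"

spark = (
    SparkSession.builder.appName("PyDeequWithDataband")
    # Deequ jars as recommended by the PyDeequ installation guide,
    # plus Databand's Deequ integration package
    .config("spark.jars.packages", pydeequ.deequ_maven_coord + "," + DBND_DEEQU_PACKAGE)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)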

DBND Python Deequ Metrics Repository

To connect Databand to Deequ, use DbndMetricsRepository as in the following example. For more details, see the Deequ Repository Documentation:

from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct
from pydeequ.repository import ResultKey

# DbndMetricsRepository ships with Databand's Deequ integration
# (the ai.databand:dbnd-api-deequ package referenced above)
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"name": "words"})
analysis_runner = AnalysisRunner(spark).onData(lines)  # `lines` is a Spark DataFrame

# add your Deequ analyzers, for example:
# .addAnalyzer(ApproxCountDistinct("value"))

analysis_runner.useRepository(DbndMetricsRepository(spark)).saveOrAppendResult(result_key).run()

πŸ“˜

If you are running Scala or Java Spark

Please refer to our page for Tracking Spark (Scala/Java).
