Tracking Apache Spark

How to gain visibility into your code errors, metrics, and logging information in PySpark jobs.

If you are running jobs in PySpark, Databand can provide visibility into your code errors, metrics, and logging information, in the context of your broader pipelines or orchestration system.

πŸ“˜

If you are running Java or Scala Spark

Please refer to our page for tracking JVM applications

Just as with ordinary Python code, you can use Databand decorators and the logging API in your Spark jobs. Here is an example of a PySpark function with Databand decoration and metrics logging:

from operator import add
from pyspark.sql import SparkSession
import logging
import sys
from dbnd import log_metric, task

logger = logging.getLogger(__name__)

@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    log_metric("counts", len(output))
    logger.info(
        "Log message from EMR cluster"
    )
    spark.sparkContext.stop()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount <input_file> <output_file>")
        sys.exit(-1)
    calculate_counts(sys.argv[1], sys.argv[2])

This code example produces several artifacts that are reported to the DBND tracking system.
The first is the printed output from the following loop:

for (word, count) in output: print("%s: %i" % (word, count))

The second is the log_metric API call, which reports the word count. Finally, when you use Python logging facilities, such as the logger.info call that appears below log_metric, those log messages are reported as well.

Databand will correlate these tracking metrics from the Spark job with the associated pipeline in your orchestration system (i.e., the Airflow DAG), based on the context you provide when configuring Spark (see the steps below).

Configuring Spark

Perform the following steps to enable metadata reporting from Spark to Databand:

  1. Install the dbnd package on the Spark master and Spark workers by running pip install dbnd.
  2. Define the following environment variables:
  • DBND_HOME
  • DBND__CORE__DATABAND_URL
  • DBND__ENABLE__SPARK_CONTEXT_ENV=True

🚧

Note

If you are tracking jobs on an EMR or Dataproc cluster, the DBND__ENABLE__SPARK_CONTEXT_ENV and DBND__CORE__DATABAND_URL variables are already set in the bootstrap or initialization script.
You only need to set the DBND__TRACKING environment variable.

  3. Add the following Spark properties to your job submission command. Usually, these variables are set dynamically by an orchestrator such as Airflow before your task runs (a standalone sketch follows this list):
  • AIRFLOW_CTX_DAG_ID - The Airflow DAG ID to associate with a given job.
  • AIRFLOW_CTX_EXECUTION_DATE - The execution date to associate with a given run.
  • AIRFLOW_CTX_TASK_ID - The task ID to associate with a job.
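
For reference, here is a minimal sketch of setting these values manually in the driver process before dbnd initializes, for a standalone run. The URL, DAG, and task names are illustrative placeholders; in a real deployment, the Databand variables usually come from your cluster bootstrap script and the Airflow context variables are injected by the orchestrator:

import os

# Databand connection settings (illustrative placeholder URL).
os.environ["DBND__CORE__DATABAND_URL"] = "https://your-databand-host"
os.environ["DBND__ENABLE__SPARK_CONTEXT_ENV"] = "True"

# Airflow context -- normally set by Airflow before the task runs.
os.environ["AIRFLOW_CTX_DAG_ID"] = "word_count_dag"
os.environ["AIRFLOW_CTX_EXECUTION_DATE"] = "2021-01-01T00:00:00"
os.environ["AIRFLOW_CTX_TASK_ID"] = "calculate_counts"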

πŸ“˜

Additional Support

Contact our team ([email protected]) for details on the following supported Spark engines: Databricks and Qubole.

Collecting Spark Logs

Databand can send Spark logs to the metadata server. To enable this feature, set the DBND__LOG_SPARK environment variable to True.
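
A minimal sketch, assuming the variable is visible to the driver process before dbnd initializes (on EMR or Dataproc, you would typically export it in the bootstrap script instead):

import os

# Forward Spark logs to the Databand metadata server.
os.environ["DBND__LOG_SPARK"] = "True"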

Tracking Spark Metrics

Databand can collect and send Spark executor metrics, such as bytesRead/bytesWritten, using Spark listeners. To enable automatic listener injection into the Spark session and the collection of task metrics, set the DBND__SPARK__LISTENER_INJECT_ENABLED environment variable to True.
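
A minimal sketch, assuming the variable is set before the Spark session is created so the listener can be attached to it:

import os

# Ask Databand to inject its Spark listener and report executor task metrics.
os.environ["DBND__SPARK__LISTENER_INJECT_ENABLED"] = "True"

from pyspark.sql import SparkSession

# With tracking enabled, the listener is attached to this session automatically.
spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()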

Integrating with PyDeequ for Data Quality Metrics

Databand can collect and send PyDeequ metrics. To enable it, use DbndMetricsRepository as in the following example:

from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct
from pydeequ.repository import ResultKey

# "spark" and "lines" come from the example above; DbndMetricsRepository is part of Databand's Deequ integration.
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"name": "words"})
AnalysisRunner(spark).onData(lines).addAnalyzer(
    ApproxCountDistinct("value")
).useRepository(DbndMetricsRepository(spark)).saveOrAppendResult(result_key).run()

You should also add the Deequ and Databand jars to the Spark context, either by configuring the Spark session inside your application or by passing them in the spark-submit command:

import pydeequ

spark = (
    SparkSession.builder.config(
        "spark.jars.packages",
        "{},{}".format(
            pydeequ.deequ_maven_coord, "ai.databand:dbnd-api-deequ:0.37.2"
        ),
    )
    .config(
        "spark.jars.repositories",
        "https://dbnd-dev-maven-repository.s3.us-east-2.amazonaws.com",
    )
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .appName("PythonWordCount")
    .getOrCreate()
)
Alternatively, pass the packages in the spark-submit command:

spark-submit \
  --conf spark.jars.repositories="https://dbnd-dev-maven-repository.s3.us-east-2.amazonaws.com" \
  --conf spark.jars.packages="com.amazon.deequ:deequ:1.0.3,ai.databand:dbnd-api-deequ:0.37.2" \
  --conf spark.jars.excludes="net.sourceforge.f2j:arpack_combined_all"
