You can use the logging API for Spark jobs with an approach similar to the one described in the Pandas DataFrame section.
Here is an example of a PySpark function decorated with Databand's task decorator and instrumented with metrics logging. The
call to log_metric reports a word count to Databand's tracking system.
import sys
from operator import add

from pyspark.sql import SparkSession

from dbnd import log_metric, task


@task
def calculate_counts(input_file, output_file):
    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    # Read the input file and count the occurrences of each word.
    lines = spark.read.text(input_file).rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )
    counts.saveAsTextFile(output_file)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    # Report the number of distinct words to Databand's tracking system.
    log_metric("counts", len(output))

    spark.sparkContext.stop()


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: wordcount <input_file> <output_file>")
        sys.exit(-1)

    calculate_counts(sys.argv[1], sys.argv[2])
Databand will correlate these tracking metrics from the Spark job with the associated Airflow DAG run, using the Airflow context environment variables described below.
The following environment variables should be defined in your Spark context. Usually, these variables are set dynamically by an orchestrator such as Airflow before the task is run; a minimal sketch of setting them by hand follows the list.
DBND_HOME - the Databand working folder
DBND__CORE__DATABAND_URL - the URL of the Databand server to report metrics and logs to
AIRFLOW_CTX_DAG_ID - the Airflow DAG ID to associate with the run
AIRFLOW_CTX_EXECUTION_DATE - the execution date to associate with the run
AIRFLOW_CTX_TASK_ID - the task ID to associate with the run
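If you want to test tracking outside of an orchestrator, a minimal sketch of setting these variables manually in the driver process is shown below. The server URL, DAG ID, execution date, and task ID values are placeholders chosen for illustration, not values from the example above; in a production deployment Airflow injects them for you.

import os

# Hypothetical values for local testing; in production an orchestrator
# such as Airflow sets these before the task runs.
os.environ["DBND_HOME"] = "/tmp/dbnd"
os.environ["DBND__CORE__DATABAND_URL"] = "http://localhost:8080"
os.environ["AIRFLOW_CTX_DAG_ID"] = "word_count_dag"
os.environ["AIRFLOW_CTX_EXECUTION_DATE"] = "2021-01-01T00:00:00+00:00"
os.environ["AIRFLOW_CTX_TASK_ID"] = "calculate_counts"

# The variables must be set before the dbnd-decorated task executes,
# e.g. before calling calculate_counts(...).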