GCP Dataproc

How to configure GCP Dataproc to report metadata to DBND.

  1. Add the following commands to your cluster initialization script:
#!/bin/bash -x

set -e
# Install DBND with Spark support (quotes prevent shell glob expansion of the extras)
python -m pip install "databand[spark]"
# Point DBND at your Databand tracking service and enable Spark context tracking
echo "export DBND__CORE__DATABAND_URL=http://<databand_host>:8081" | tee -a /usr/lib/spark/conf/spark-env.sh
echo "export DBND_HOME=$(pwd)" | tee -a /usr/lib/spark/conf/spark-env.sh
echo "export DBND__ENABLE__SPARK_CONTEXT_ENV=True" | tee -a /usr/lib/spark/conf/spark-env.sh
echo "export DBND__NO_TABLES=True" | tee -a /usr/lib/spark/conf/spark-env.sh
  2. When submitting a PySpark job, add the Airflow context to the job properties:
{
  "projectId": "dbnd-demo",
  "job": {
    "placement": {
      "clusterName": "<your_cluster_name>"
    },
    "statusHistory": [],
    "reference": {
      "jobId": "job-id123",
      "projectId": "dbnd-demo"
    },
    "pysparkJob": {
        "mainPythonFileUri": "gs://<main python script path>",
      "properties": {
        "spark.env.AIRFLOW_CTX_DAG_ID": "mydag",
        "spark.env.AIRFLOW_CTX_EXECUTION_DATE": "2020-01-01",
        "spark.env.AIRFLOW_CTX_TASK_ID": "mytask"
      }
    }
  }
}
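Alternatively, the same Airflow context properties can be passed when submitting the job from the command line. A minimal sketch using gcloud, with the same placeholders as above:

gcloud dataproc jobs submit pyspark gs://<main python script path> \
    --cluster=<your_cluster_name> \
    --region=<region> \
    --properties=spark.env.AIRFLOW_CTX_DAG_ID=mydag,spark.env.AIRFLOW_CTX_EXECUTION_DATE=2020-01-01,spark.env.AIRFLOW_CTX_TASK_ID=mytask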
  3. To report logs from Dataproc back to the DBND tracking service, set an additional environment variable, DBND__OVERRIDE_AIRFLOW_LOG_SYSTEM_FOR_TRACKING, to True, as shown below.
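One way to do this is to append the variable to spark-env.sh in the same initialization script, following the pattern from step 1:

echo "export DBND__OVERRIDE_AIRFLOW_LOG_SYSTEM_FOR_TRACKING=True" | tee -a /usr/lib/spark/conf/spark-env.sh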

🚧

Note

The existing logging configuration is overridden when you add the DBND__OVERRIDE_AIRFLOW_LOG_SYSTEM_FOR_TRACKING variable.

Here is an example of an Airflow DAG that invokes a Dataproc job:

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

# args, SCRIPT_NAME, and INPUT_FILE are assumed to be defined earlier in the DAG file
gcp_dataproc_dag = DAG(dag_id="dataproc_example_dag", default_args=args, schedule_interval="0 */6 * * *")

with gcp_dataproc_dag:
    output_file = "gs://dbnd-dev-playground/output/aggregate_out_1_{{ds}}.csv"

    t1 = DataProcPySparkOperator(
        task_id="my_task_id",
        job_name="{{dag.dag_id}}__{{task.task_id}}_{{ds_nodash}}",
        cluster_name="cluster123",
        main=SCRIPT_NAME,
        arguments=[INPUT_FILE, output_file],
        # Pass the Airflow context to Spark so DBND can link the run to this task
        dataproc_pyspark_properties={
            "spark.env.AIRFLOW_CTX_DAG_ID": "{{dag.dag_id}}",
            "spark.env.AIRFLOW_CTX_EXECUTION_DATE": "{{ds}}",
            "spark.env.AIRFLOW_CTX_TASK_ID": "{{task.task_id}}",
        },
    )
