Tracking Airflow DAGs

How to use Databand's Airflow DAG tracking functionality.

You can use the DAG tracking functionality to gain additional visibility into:

  • metadata from operators
  • task code
  • job execution engines (such as Redshift and BigQuery)

You can enable this functionality in one of two ways:

  • install the auto-tracking plugin via pip, or
  • use the track_dag function, which wraps the operators within a DAG to provide additional tracking features.

Supported Operators

The Databand team is constantly integrating new operators for metadata tracking. The following operators are currently supported:

  • EmrAddStepsOperator
  • DatabricksSubmitRunOperator
  • DataProcPySparkOperator
  • SparkSubmitOperator
  • PythonOperator
  • BashOperator
  • SubDagOperator
  • Custom operators (see the example below)

In a custom operator, you can report metadata from the execute method by calling dbnd functions directly or by calling functions decorated with @task:

from airflow.models import BaseOperator
from dbnd import log_metric, task

class MyOperator(BaseOperator):
    def execute(self, context):
        my_func()
        my_task()

def my_func():
    # Reported as a metric on the tracked Airflow task.
    log_metric("this is metric", "much wow!")

@task
def my_task():
    # Decorated with @task, so it appears as its own task in Databand.
    log_metric("this inside task", "such fascinate!")
    log_metric("very dbnd!", "much wow!")

Tracking Airflow DAGs with Airflow Monitor

Before You Begin

You must have Databand integrated with Airflow before you can perform the tasks described in this topic. For this, you need to have the dbnd-airflow plugin installed.

To Enable Automatic Global Tracking of DAGs

You can automatically track all DAG operators and tasks without changing the code. For this, you need to install the dbnd-airflow-auto-tracking plugin.
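
For contrast with the explicit track_dag approach described below, here is a minimal sketch of a completely ordinary DAG. With the dbnd-airflow-auto-tracking plugin installed, its operators and tasks are tracked automatically, without any dbnd-specific code in the DAG file. The dag_id and function names are illustrative only.

from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # airflow.operators.python in Airflow 2.x
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="plain_dag",
    default_args={"start_date": days_ago(2)},
    schedule_interval=timedelta(days=1),
)

def say_hello():
    print("Hello World!")

PythonOperator(task_id="say_hello", python_callable=say_hello, dag=dag)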

If you do not want to track specific DAGs, operators, or functions, you can exclude them from automatic tracking by using the dont_track function:

  • dont_track(dag)
  • dont_track(operator)

Alternatively, you can use the @dont_track decorator, as shown in the following example:

@dont_track
def f():
  pass
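
If you prefer the function form, the following is a minimal sketch of excluding a single operator or a whole DAG from automatic tracking. The import path for dont_track is an assumption and may differ between dbnd versions; the DAG and operator names are illustrative only.

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # airflow.operators.bash in Airflow 2.x
from airflow.utils.dates import days_ago

from dbnd import dont_track  # assumed import path; adjust to your dbnd version

dag = DAG(
    dag_id="my_untracked_dag",
    default_args={"start_date": days_ago(2)},
    schedule_interval=timedelta(days=1),
)

skip_me = BashOperator(task_id="skip_me", bash_command="echo skipped", dag=dag)

# Exclude only this operator from automatic tracking...
dont_track(skip_me)

# ...or exclude the entire DAG.
dont_track(dag)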

To Track Specific DAGs

If you do not use automatic tracking and want to track specific DAGs, operators, or functions, you can use the track_dag function in your DAG definition.

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # airflow.operators.python in Airflow 2.x

from dbnd_airflow import track_dag  # provided by the dbnd-airflow plugin; import path may vary by version

args = {"start_date": airflow.utils.dates.days_ago(2)}
dag = DAG(dag_id="my_dag", default_args=args, schedule_interval=timedelta(days=1))

def task1():
    print("Hello World!")

PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

# Wrap the DAG so Databand tracks all of its operators and tasks.
track_dag(dag)

Deprecation of __execute tasks

All the information that was previously reported on the <task_name>__execute task and its sub-tasks (such as metrics and logs) is now reported on the <task_name> task itself, eliminating the extra entity. As a result, __execute tasks and the separation into a root Run and sub-Runs per Airflow task are deprecated.

This change is applied automatically for most tasks. As a result, the tracking workflow becomes more consistent: all metrics are reported to their proper task, without the need to create an __execute subtask and further subtasks as children of <task_name>__execute.

This deprecation may cause the following known issues:

  • The number of tasks in the Databand UI might differ from the number in Airflow if @task was used on some functions.
  • Alerts defined on <task_name>__execute tasks will not work as is; you need to redefine them on the <task_name> task.

Note that your existing Runs will not be changed automatically and will still appear with __execute.

