
Tracking Airflow DAGs

How to use Databand's Airflow DAG tracking functionality.

πŸ“˜

Before You Begin

You must have Databand integrated with Airflow before you can perform the tasks described in this topic. To set up the integration, follow the Tracking Apache Airflow guide.

You can use the DAG tracking functionality for additional visibility into:

  • metadata from operators
  • task code
  • job execution engines (such as Redshift and BigQuery)

We track all operators and can capture runtime information from every execute() call of any Airflow operator. Everything that happens within the boundaries of the execute() function is tracked.

from airflow.models import BaseOperator

from dbnd import log_metric, task


class MyOperator(BaseOperator):
    def execute(self, context):
        # Everything called from execute() is tracked by Databand.
        my_func()
        my_task()


def my_func():
    # Metrics logged from a plain function are attributed to the operator run.
    log_metric("this is metric", "much wow!")


@task
def my_task():
    # A decorated function is tracked as a nested Databand task.
    log_metric("this inside task", "such fascinate!")
    log_metric("very dbnd!", "much wow!")

Some operators trigger "remote" execution, so a connection between the Airflow operator and the sub-process execution has to be established. Currently, passing the execution context in addition to regular tracking is automatically supported for the following operators:

  • EmrAddStepsOperator
  • DatabricksSubmitRunOperator
  • DataProcPySparkOperator
  • SparkSubmitOperator
  • BashOperator
  • SubDagOperator

The Databand team is constantly integrating new operators for sub-process metadata tracking. Contact us if you don't see your operator on the list.

Tracking Airflow DAGs with Airflow Monitor

To Enable Automatic Global Tracking of DAGs

You can automatically track all DAG operators and tasks without changing your code. To do this, install the dbnd-airflow-auto-tracking plugin.
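
For example, assuming you install Python packages with pip, the plugin can be added to your Airflow environment like this:

pip install dbnd-airflow-auto-tracking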

Controlling Tracked DAGs

If you do not want to track specific DAGs, operators, or functions, you can exclude them from automatic tracking by using the dont_track function (see the sketch after this list):

  • dont_track(dag)
  • dont_track(operator)
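
Here is a minimal sketch of both call forms (import paths shown for Airflow 1.x; the DAG and task names are hypothetical):

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from dbnd import dont_track

# Hypothetical DAG used only to illustrate the two call forms.
dag = DAG(dag_id="my_dag", start_date=airflow.utils.dates.days_ago(2))

step = PythonOperator(task_id="my_step", python_callable=lambda: None, dag=dag)

dont_track(dag)    # exclude the whole DAG from automatic tracking
# dont_track(step) # or exclude a single operator instead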

Alternatively, you can use the @dont_track decorator, as shown in the following example:

from dbnd import dont_track


@dont_track
def f():
    pass

In addition, you can control the list of synced DAGs from the Syncer Config page in the Databand UI.

To Track Specific DAGs

If you do not use automatic tracking and you want to track specific DAGs, operators, or functions, you can call the track_dag function in your DAG definition:

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from dbnd_airflow import track_dag  # assumption: track_dag import path per the dbnd-airflow package

args = {"start_date": airflow.utils.dates.days_ago(2)}
dag = DAG(dag_id="my_dag", default_args=args, schedule_interval=timedelta(days=1))


def task1():
    print("Hello World!")


PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

# Enable Databand tracking for every operator in this DAG.
track_dag(dag)

Architecture of Airflow Tracking by Databand

The Airflow integration works by syncing data from the Airflow database and combining it with metadata observed from tasks and other system levels while the DAGs execute.
There are two main components:

  • Databand's Syncer (Sync DAG).
  • Databand's Runtime Tracking.

Databand Syncer (Sync DAG or standalone monitor)

Databand's Syncer is in charge of bringing all information about the current state of execution into the Databand service. It can run as a standalone service or as a DAG that performs "sync" operations periodically. It has two sources of information:

  • Airflow database (main source) - used to gather state information and other metadata about the DAGs.
  • Airflow DAGBag, the "in-memory" representation of all DAGs. A DAGBag is a collection of DAGs loaded into memory by running the user code that defines them (the DAGBag is Airflow's official way of loading DAG info). In older Airflow versions, the Airflow database doesn't hold the full context of a DAG (for example, its structure), so Databand loads DAGs from disk into a DAGBag and syncs the DAG structure from there. While the DAGBag parses all DAGs in the DAGs folder, Databand currently sends only the relevant DAGs to the server (in your case, the DAGs defined by the filter).
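
To illustrate the DAGBag mechanism the Syncer relies on, here is a minimal sketch using Airflow's own DagBag API (the dag_folder path is hypothetical):

from airflow.models import DagBag

# Parse every DAG file under the given folder into an in-memory collection,
# the same mechanism the Syncer uses to recover DAG structure.
bag = DagBag(dag_folder="/path/to/dags")
print(bag.dag_ids)  # IDs of all DAGs found on disk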

Databand's Runtime Tracking

Our Runtime Tracking wraps the Airflow operator's execute() function and continuously reports runtime information to the Databand service. It tracks the operator's start and end time, user metrics emitted from the code, user exceptions, and a lot of additional information. A run will not appear in the UI until it has been synced by the Databand Syncer.

