Integrating with Apache Airflow

A guide to enabling tracking for Apache Airflow DAGs

Setting up Airflow Integration

To fully integrate Databand with your Airflow environment:

  1. Install the dbnd-airflow-auto-tracking Python package, which provides runtime tracking, on your Airflow cluster.
  2. Install the Airflow Syncer DAG:
    • Create the databand_airflow_monitor DAG in Airflow: add a new file named databand_airflow_monitor.py, containing the DAG definition below, to your project's DAGs.
    • Deploy the new DAG and enable it in the Airflow UI.
# databand_airflow_monitor.py
from airflow_monitor.monitor_as_dag import get_monitor_dag
# This DAG is used by Databand to monitor your Airflow installation.
dag = get_monitor_dag()
  3. Configure a new Airflow Syncer in the Databand application:
    • Click "Integrations" in the left-side menu.
    • Click "Add Syncer".

To find this information on Airflow environments (on-prem and managed), refer to our reference guides below:

Using Airflow Connection to Configure Airflow Monitor and SDK

To configure the Airflow monitor and SDK, create an Airflow connection with the ID dbnd_config and provide the configuration parameters in its JSON field, as shown below. You can use the JSON string that the Apache Airflow Syncer wizard generates automatically. This configuration becomes part of the configuration used by Databand; see SDK Configuration for other ways to configure it. Databand settings provided through an Airflow connection override those from the environment and project.cfg.

The JSON contains the following fields:

Mandatory Parameters
Replace the values in <...> with the following information:

Optional Parameters

  • <dag_ids> - list of specific DAG IDs that you want to track
  • <number_of_airflow_log_head_bytes> - number of bytes to collect from DAG log heads
  • <number_of_airflow_log_tail_bytes> - number of bytes to collect from DAG log tails

To add a new configuration:

  1. Go to the Connections page in the Airflow UI.
  2. Add a new connection:
    1. conn_id: dbnd_config
    2. conn_type: HTTP
    3. Extra: Databand configuration formatted as JSON
    For example:
{
  "core": {
    "databand_url": "http://localhost:8080"
  }
}
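
If you prefer to generate the Extra value rather than type it by hand, a minimal sketch along these lines may help. It builds only the core.databand_url key shown above; the helper name build_dbnd_config_extra is hypothetical and not part of any Databand package, and any further keys you add should follow the Configuration documentation.

```python
import json


def build_dbnd_config_extra(databand_url: str) -> str:
    """Return a JSON string suitable for the Extra field of dbnd_config.

    Hypothetical helper for illustration only; it is not a Databand API.
    """
    config = {"core": {"databand_url": databand_url}}
    # json.dumps guarantees the result is syntactically valid JSON.
    return json.dumps(config, indent=2)


extra = build_dbnd_config_extra("http://localhost:8080")
print(extra)
```

Generating the string this way avoids quoting and bracket mistakes that are easy to make when editing JSON in a form field.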

Edit dbnd_config

The JSON object contains the minimum parameters required to establish communication between Airflow and Databand.

In general, the JSON keys map to the parameters found in databand-core.cfg. See our Configuration documentation on how you can extend this object with additional parameters.

However, you should NOT modify the values of the following fields manually:

  • dag_ids
  • number_of_airflow_log_head_bytes
  • number_of_airflow_log_tail_bytes
  • track_source_code

These fields get their values automatically from the values you provide in the syncer dialog.
Even when you edit an existing syncer, the Databand monitor syncs them to dbnd_config in Airflow automatically.

Controlling Tracked DAGs

By default, all Airflow DAGs are synced. To restrict syncing, provide an explicit, comma-separated list of the DAG IDs to monitor on the Syncer Edit page of the Apache Airflow Syncer configuration.
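
The filter semantics described above (no list means every DAG, otherwise only the listed DAG IDs) can be illustrated with a short sketch. This is not Databand's implementation: the function names are hypothetical, and trimming whitespace around each ID is an assumption made for the example.

```python
from typing import Optional, Set


def parse_dag_id_filter(raw: str) -> Optional[Set[str]]:
    """Parse a comma-separated list of DAG IDs.

    Returns None when the list is empty, meaning "no filter, sync all DAGs".
    Whitespace trimming is an assumption of this sketch.
    """
    ids = {part.strip() for part in raw.split(",") if part.strip()}
    return ids or None


def is_synced(dag_id: str, dag_filter: Optional[Set[str]]) -> bool:
    """A DAG is synced when there is no filter or its ID is in the filter."""
    return dag_filter is None or dag_id in dag_filter


dag_filter = parse_dag_id_filter("etl_daily, reporting")
print(is_synced("etl_daily", dag_filter))             # True
print(is_synced("scratch", dag_filter))               # False
print(is_synced("scratch", parse_dag_id_filter("")))  # True
```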

If you do not want to track specific DAGs, operators, or functions, you can exclude them from automatic tracking by using the dont_track function:

  • dont_track(dag)
  • dont_track(operator)

Alternatively, you can use the @dont_track decorator, as shown in the following example:

from dbnd import dont_track

@dont_track
def f():
  pass

To Track Specific DAGs

If you don't want to use automatic tracking, you can install the dbnd-airflow package instead of dbnd-airflow-auto-tracking.
Then, for each DAG you want to track, call the track_dag function in your DAG definition:

from dbnd_airflow import track_dag

track_dag(dag)

Databand Monitor DAG Memory Guard

The monitor DAG automatically limits the amount of memory it can consume while running. The default limit is 8GB. If the monitor consumes more memory than the guard allows, for any reason, it stops itself automatically.

To change the limit, add the guard_memory parameter to the get_monitor_dag function and set it to the maximum number of bytes the monitor can consume. For example, the following call limits memory consumption to 5GB:

from airflow_monitor.monitor_as_dag import get_monitor_dag

dag = get_monitor_dag(guard_memory=5 * 1024 * 1024 * 1024)
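
Because guard_memory is expressed in bytes, a small conversion helper can make the intended limit easier to read and review. The sketch below is illustrative only; gib_to_bytes is a hypothetical name, and it uses the same 1024-based arithmetic as the example above.

```python
BYTES_PER_GIB = 1024 ** 3  # same unit as 1024 * 1024 * 1024 above


def gib_to_bytes(gib: float) -> int:
    """Convert a size in GB (1024-based) to a whole number of bytes."""
    if gib <= 0:
        raise ValueError("memory limit must be positive")
    return int(gib * BYTES_PER_GIB)


guard_memory = gib_to_bytes(5)
print(guard_memory)  # 5368709120
```

The resulting value can then be passed as get_monitor_dag(guard_memory=guard_memory).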

The main source of memory consumption in the Databand monitor DAG is the Airflow DagBag, which holds the in-memory representation of all DAGs. A DagBag is a collection of DAGs loaded into memory by running the user code that defines them; it is Airflow's official way of loading DAG information. In older Airflow versions, the Airflow database does not hold the full context of a DAG (for example, the DAG structure), so Databand loads the DAGs from disk into a DagBag and syncs the DAG structure. Although the DagBag parses every DAG in the DAGs folder, Databand currently sends only the relevant DAGs to the server, that is, the DAGs selected by your filter.

