
Task Outputs

On task outputs in Databand.

When DBND runs are executed, output data, metadata, and return values are persisted into the file system automatically.

  • The task return value is persisted based on the object type.
  • In the examples below, the prepare_data task returns a DataFrame object, which is persisted under the default output name result.
  • By default, DataFrames are serialized into csv files, and strings into txt files.
  • Every output is persisted to a unique location based on the task signature.

You can change the name of a task result by using the result property:

from dbnd import task
from pandas import DataFrame

@task(result="data")
def prepare_data(data: DataFrame):
    return data

You can specify output type as part of the task definition:

from dbnd import task, output
from pandas import DataFrame

@task(result=output.txt)
def prepare_data(data: DataFrame):
    return data

This would cause the data object to be persisted as txt instead of the default format for DataFrames.

It is also possible to set the output path programmatically for each task in the pipeline:

from dbnd import task, pipeline

@task()
def prepare_data() -> str:
    return "some data"


@task()
def process_data(input_text: str):
    print(input_text)


@pipeline()
def some_pipeline():
    prepared = prepare_data(result='custom_path.txt')
    process_data(prepared)

Alternatively, the output path can be set via command-line arguments. A value passed in the call (prepare_data(result='custom_path.txt')) takes precedence over the command line, so leave it out of the call when you want the command-line value to apply:

from dbnd import pipeline

@pipeline()
def some_pipeline():
    prepared = prepare_data()
    process_data(prepared)

dbnd run demo.some_pipeline --task-version=now --set prepare_data.result=custom_path.txt

If you want direct control over a task output, you can declare a function parameter as an output and write to it directly:

from dbnd import task, output
from targets.types import Path

@task(custom_output=output[Path])
def prepare_data(data_path: Path, custom_output: Path) -> str:
    with open(str(custom_output), "w") as fp:
        fp.write("my data successfully written")
    return str(data_path)

This task has one input, data_path, and two outputs, result and custom_output. It can be run with:

dbnd run prepare_data --set data_path=<some file>

You can define how your custom object is serialized into a string (see Custom Value Type) or into a custom format (see Custom Parameter HDF5).

You can also define the exact way data is written into a file system - see the example described in Orchestration Examples.

Persisting Outputs

By default, the path template is defined in $DBND_LIB/databand-core.cfg as:

[output]
path_task = {root}/{task_env}/date={task_target_date}/{task_name}{task_class_version}_{task_signature}/{output_name}{output_ext}

By default, {root} is set to $DBND_HOME/data and {task_env} is equal to dev. The exact path of each task output is shown in the logs and the console. For example, this is a path generated by Databand: "/data/dev/2021-07-02/prepare_data/prepare_data_6236128077/result.csv"
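To make the template concrete, here is a minimal sketch of how such a format string expands. It uses plain Python str.format rather than DBND's own rendering code, and every field value is a hypothetical stand-in, so the exact layout DBND produces may differ.

```python
# Default template copied from the [output] section shown above.
PATH_TASK = (
    "{root}/{task_env}/date={task_target_date}/"
    "{task_name}{task_class_version}_{task_signature}/"
    "{output_name}{output_ext}"
)


def render_output_path(template: str, **fields: str) -> str:
    """Fill a path template using ordinary str.format substitution."""
    return template.format(**fields)


# All values below are hypothetical and only illustrate the substitution.
example = render_output_path(
    PATH_TASK,
    root="/data",
    task_env="dev",
    task_target_date="2021-07-02",
    task_name="prepare_data",
    task_class_version="",
    task_signature="6236128077",
    output_name="result",
    output_ext=".csv",
)
print(example)
```

Because the task signature is part of the path, two runs with different signatures never write to the same location.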

Your environment configuration defines where outputs are written. For example, when working in research, you may persist outputs locally using smaller data. When running in production, you may write to a cloud environment using terabytes of data.

Each task run will store its output and metadata in the environment file system by default. The default local environment root is set to $DBND_HOME/data.

For example, running the prepare_data task on a data set from the 4th of August 2018 with the default local environment configuration would produce the following artifacts:

  • $DBND_HOME/dbnd/data/dev/date=2018-08-04 directory, created if it doesn't yet exist
  • PrepareData_9574807509 directory under $DBND_HOME/dbnd/data/dev/date=2018-08-04 directory (the numerical value after the task name is a task signature)
  • The task output (i.e., the prepared data) would be stored within the PrepareData_9574807509 directory
  • An additional meta-directory under PrepareData_9574807509. DBND will use this directory to store run info metadata including metrics, git commit and other artifacts.

[output] Configuration Section Parameter Reference

  • path_task - Set the default path for every Task.
  • path_prod_immutable_task - Set the format of the path to be used by Production Immutable tasks.
  • hdf_format - Determine default format to save DataFrame to hdf.
  • deploy_id - Set deploy prefix to use for remote deployments.

Controlling the Structure of the Output Path

DBND will generate a unique output location for every output parameter based on task name, version, and environment configuration. You can affect that by changing:

  1. The output path format string for all tasks, via the [output] path_task configuration value.
  2. The _conf__base_output_path_fmt property of the Task class.
  3. (For specific tasks) The task property task_output_path_format.
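When overriding the format string in configuration, it can help to check the placeholders before running anything. The sketch below is an illustration only: the configuration text is hypothetical, and the set of known placeholders is just the one visible in the default path_task template in this guide (DBND may accept others).

```python
import configparser
from string import Formatter

# Placeholders that appear in the default path_task template in this guide.
# Assumption: DBND may accept more; this set is only what the guide shows.
KNOWN_FIELDS = {
    "root", "task_env", "task_target_date", "task_name",
    "task_class_version", "task_signature", "output_name", "output_ext",
}

# Hypothetical project configuration that overrides the default layout.
CONFIG_TEXT = """
[output]
path_task = {root}/{task_env}/{task_name}/{task_target_date}/{task_signature}/{output_name}{output_ext}
"""


def unknown_placeholders(template: str) -> set:
    """Return placeholder names in the template that are not in KNOWN_FIELDS."""
    used = {name for _, name, _, _ in Formatter().parse(template) if name}
    return used - KNOWN_FIELDS


# interpolation=None keeps configparser from interpreting the value itself.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG_TEXT)
template = parser["output"]["path_task"]
print(unknown_placeholders(template))  # empty when every placeholder is known
```

Keeping {task_signature} in a custom template preserves the one-location-per-signature property described above.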

Overwriting Data Targets

You can overwrite a target to be able to write to the same data source continuously.

For example, on a regular basis, you run a pipeline that consumes yesterday's data stored in the same path (for example, s3://some-bucket/customers/data.csv). In this scenario, you need to create the input data and write it to the same bucket every day.

As an alternative to this manual approach, you can create another task that runs every day. It collects all customer data and then writes the output to s3://some-bucket/customers/data.csv.

To define this second task, we need to define an overwrite target. See the following example:

from dbnd import task, output, parameter
from pandas import DataFrame

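# Placeholder value (assumption): the original snippet does not define this constant.
DEFAULT_OUTPUT_HDFS_PATH = "s3://some-bucket/customers/data.csv"

@task(result=output(default=DEFAULT_OUTPUT_HDFS_PATH).overwrite.csv[DataFrame])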
def prepare_data(data=parameter[DataFrame]):
    return data

Temporary Outputs

Currently, by default, Databand saves your in-memory objects to the file system, meaning all outputs will be persisted indefinitely.

The upside of that is that if you try to run X -> Y, and Y crashes, you can resume with the previously built X.
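The resume behaviour can be pictured with a small stand-alone sketch. It is not DBND code: the signature function, the run_or_reuse helper, and the directory layout are all hypothetical, and only show why a persisted output keyed by signature lets a second run skip the work.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def signature(task_name: str, params: dict) -> str:
    """Hypothetical signature: a short hash of the task name and parameters."""
    payload = json.dumps({"task": task_name, "params": params}, sort_keys=True)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()[:10]


def run_or_reuse(root: Path, task_name: str, params: dict, compute) -> tuple:
    """Return (text, reused); compute(**params) runs only if no output exists."""
    target = root / f"{task_name}_{signature(task_name, params)}" / "result.txt"
    if target.exists():
        return target.read_text(), True
    target.parent.mkdir(parents=True, exist_ok=True)
    text = compute(**params)
    target.write_text(text)
    return text, False


calls = []


def build_x(n: int) -> str:
    calls.append(n)  # record each real execution
    return "x" * n


with tempfile.TemporaryDirectory() as tmp:
    first = run_or_reuse(Path(tmp), "X", {"n": 3}, build_x)
    # A later run, for example after Y crashed, finds the stored output of X.
    second = run_or_reuse(Path(tmp), "X", {"n": 3}, build_x)
```

Here build_x executes once; the second call reads the stored result instead.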

The downside is that you will accumulate a lot of intermediate results on your file system. A useful pattern is to put these files in a dedicated directory and have a periodic garbage-collection job clean it up.
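One possible shape for such a clean-up job is sketched below. This is not a DBND feature: collect_garbage is a hypothetical helper built on the standard library, and the age threshold is an arbitrary choice.

```python
import os
import tempfile
import time
from pathlib import Path


def collect_garbage(directory: Path, max_age_days: float) -> list:
    """Delete files under directory older than max_age_days; return removed paths."""
    cutoff = time.time() - max_age_days * 86400
    removed = []
    # sorted() materializes the listing before any file is deleted.
    for path in sorted(directory.rglob("*")):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path)
    return removed


# Demo in a throwaway directory: one file is backdated by 30 days.
with tempfile.TemporaryDirectory() as tmp:
    stale = Path(tmp) / "stale_result.csv"
    fresh = Path(tmp) / "fresh_result.csv"
    stale.write_text("old")
    fresh.write_text("new")
    month_ago = time.time() - 30 * 86400
    os.utime(stale, (month_ago, month_ago))
    removed_names = [p.name for p in collect_garbage(Path(tmp), max_age_days=7)]
    fresh_survived = fresh.exists()
```

A job like this could be scheduled with cron or any scheduler; it only looks at modification times, so it should point at a directory that holds nothing but disposable intermediate results.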

Task.task_in_memory_outputs

Set this property to true to store task outputs in memory.

Example:

calculate_alpha(task_in_memory_outputs=True)

Task Output Names

DBND tasks can also return a complex object such as a tuple. You can use the result parameter to name the elements in a tuple. This way, downstream tasks and UI can work with meaningful names.

Example:

from dbnd import task
from pandas import DataFrame
from typing import Tuple

@task(result="training_set,real_data")
def prepare_data(data: DataFrame) -> Tuple[DataFrame, DataFrame]:
    data["new_column"] = 5
    return data, data

Each task in the following example returns two DataFrames. Together, the tasks show the different syntaxes available for naming these outputs:

from dbnd import task
from pandas import DataFrame
from typing import NamedTuple


@task(result=("training_set", "real_data"))
def prepare_data(p: int = 1) -> (DataFrame, DataFrame):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


@task(result="training_set,real_data")
def prepare_data(p: int = 1) -> (DataFrame, DataFrame):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


@task
def prepare_data(p: int) -> (DataFrame, DataFrame):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


# The field names of the NamedTuple become the output names.
Outputs = NamedTuple("Outputs", [("training_set", DataFrame), ("real_data", DataFrame)])

@task
def prepare_data(p: int) -> Outputs:
    return Outputs(
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


@task
def prepare_data(p: int) -> NamedTuple(
    "Outputs", fields=[("training_set", DataFrame), ("real_data", DataFrame)]
):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )
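Conceptually, each of these syntaxes pairs the elements of the returned tuple with a list of names. The sketch below illustrates that pairing with a hypothetical name_outputs helper; it is not how DBND is implemented, and it uses plain strings instead of DataFrames to stay dependency-free.

```python
from typing import Any, Dict, NamedTuple, Sequence, Tuple, Union


def name_outputs(
    result: Union[str, Sequence[str]], values: Tuple[Any, ...]
) -> Dict[str, Any]:
    """Pair each element of a returned tuple with its declared output name."""
    if isinstance(result, str):
        names = [name.strip() for name in result.split(",")]
    else:
        names = list(result)
    if len(names) != len(values):
        raise ValueError(f"{len(names)} names declared for {len(values)} values")
    return dict(zip(names, values))


class Outputs(NamedTuple):
    training_set: str
    real_data: str


# Comma-separated string, tuple of names, and NamedTuple fields all agree.
by_string = name_outputs("training_set,real_data", ("train", "real"))
by_tuple = name_outputs(("training_set", "real_data"), ("train", "real"))
by_namedtuple = name_outputs(Outputs._fields, Outputs("train", "real"))
```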

Task Output Extension

This table describes DBND's mapping from file formats to extensions:

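| Name | Input File Extension | Output File Extension |
| --- | --- | --- |
| csv | csv | csv |
| txt | txt, py | txt |
| hdf5 | h5, hdf5 | h5 |
| json | json, hjson | json |
| table | table | table |
| feather | feather | feather |
| numpy | npy, numpy | npy |
| excel | excel | excel |
| parquet | parquet | parquet |
| html | html | html |
| pickle | pickle | pickle |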

A file named myfile.csv.gz would be parsed as a gz-compressed csv file. The following compression types are supported: gz, bz2, zip, xz, snappy.
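The extension rule can be illustrated with a short sketch. The lookup tables are copied from this section, while parse_target_name is a hypothetical helper and not DBND's actual parser.

```python
from typing import Optional, Tuple

# Input extensions from the format table above, mapped to format names.
FORMAT_BY_EXTENSION = {
    "csv": "csv", "txt": "txt", "py": "txt", "h5": "hdf5", "hdf5": "hdf5",
    "json": "json", "hjson": "json", "table": "table", "feather": "feather",
    "npy": "numpy", "numpy": "numpy", "excel": "excel", "parquet": "parquet",
    "html": "html", "pickle": "pickle",
}
COMPRESSIONS = {"gz", "bz2", "zip", "xz", "snappy"}


def parse_target_name(filename: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (format, compression) inferred from the trailing extensions."""
    parts = filename.lower().split(".")[1:]  # drop the base name
    # A trailing compression extension is peeled off first.
    compression = parts.pop() if parts and parts[-1] in COMPRESSIONS else None
    file_format = FORMAT_BY_EXTENSION.get(parts[-1]) if parts else None
    return file_format, compression


print(parse_target_name("myfile.csv.gz"))
```

Unknown or missing extensions yield None for the corresponding element rather than raising an error; that is a choice made for this sketch only.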

This table summarizes how various in-memory objects are marshalled into file formats:

| Python Type | Configuration Key | Default Marshalling | Additional Marshallers |
| --- | --- | --- | --- |
| Pandas DataFrame | pandas_dataframe | csv | csv, table, json, html, pickle, parquet, feather, hdf5 |
| Dict[DataFrame] | pandas_df_dict | hdf5 | |
| Numpy Arrays | numpy_ndarray | numpy | numpy, pickle |
| str | str | txt | |
| List[str] | list_str | txt (as lines) | |
| object | object | pickle | pickle, json |
| List[object] | list | pickle | |

To change the marshalling behavior, use a config file:

[output]
pandas_dataframe = hdf5

This would cause DBND to store the DataFrame in hdf5 format.
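The effect of such an override can be sketched as a lookup with a fallback. The dictionaries below restate the marshalling table; resolve_format is a hypothetical helper, and the validation step is an assumption of this sketch rather than documented DBND behaviour.

```python
# Defaults and alternatives copied from the marshalling table above.
DEFAULT_MARSHALLING = {
    "pandas_dataframe": "csv",
    "pandas_df_dict": "hdf5",
    "numpy_ndarray": "numpy",
    "str": "txt",
    "list_str": "txt",
    "object": "pickle",
    "list": "pickle",
}
ADDITIONAL_MARSHALLERS = {
    "pandas_dataframe": {
        "csv", "table", "json", "html", "pickle", "parquet", "feather", "hdf5",
    },
    "numpy_ndarray": {"numpy", "pickle"},
    "object": {"pickle", "json"},
}


def resolve_format(config_key: str, overrides: dict) -> str:
    """Use the [output] override if it is a listed marshaller, else the default."""
    default = DEFAULT_MARSHALLING[config_key]
    chosen = overrides.get(config_key, default)
    allowed = ADDITIONAL_MARSHALLERS.get(config_key, set()) | {default}
    if chosen not in allowed:
        raise ValueError(f"{chosen!r} is not a listed marshaller for {config_key}")
    return chosen


output_section = {"pandas_dataframe": "hdf5"}  # mirrors the config file above
print(resolve_format("pandas_dataframe", output_section))
```

Keys that are absent from the override section keep their default format.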

Output Serialization Options

If you want to save a result without a header, you can use the following option:

from dbnd import task, parameter
from pandas import DataFrame
from targets.target_config import FileFormat

@task(result=parameter.csv.save_options(FileFormat.csv, header=False))
def prepare_data(data: DataFrame) -> DataFrame:
    data["new_column"] = 5
    return data

This way, options for to_csv can be provided.

Note that save_options requires the target file format to be stated explicitly, which is why csv appears twice in the example: once in parameter.csv and once in FileFormat.csv.