Task Outputs
On task outputs in Databand.
When DBND runs are executed, output data, metadata, and return values are persisted into the file system automatically.
- The task return value is persisted based on the object type.
- The `prepare_data` task returns a DataFrame object, which is serialized into a pickle and is given the default name of `result`.
- By default, DataFrames are serialized into `csv` files, and strings into `txt` files.
- Every output is persisted at a unique location based on the task signature.
You can change the name of a task result by using the `result` property:
```python
from dbnd import task
from pandas import DataFrame


@task(result="data")
def prepare_data(data: DataFrame):
    return data
```
You can specify output type as part of the task definition:
```python
from dbnd import task, output
from pandas import DataFrame


@task(result=output.txt)
def prepare_data(data: DataFrame):
    return data
```
This would cause the `data` object to be persisted as `txt` instead of pickle.
It is also possible to set output path programmatically for each task in the pipeline:
```python
from dbnd import task, pipeline


@task()
def prepare_data() -> str:
    return "some data"


@task()
def process_data(input_text: str):
    print(input_text)


@pipeline()
def some_pipeline():
    prepared = prepare_data(result='custom_path.txt')
    process_data(prepared)
```
Alternatively, the output path can be set via command-line arguments. A call argument (`prepare_data(result='custom_path.txt')`) takes precedence over the command line, so it should be left out when the path is set from the CLI:
```python
from dbnd import pipeline

# prepare_data and process_data are the tasks defined in the previous example
@pipeline()
def some_pipeline():
    prepared = prepare_data()
    process_data(prepared)
```

```bash
dbnd run demo.some_pipeline --task-version=now --set prepare_data.result=custom_path.txt
```
If you want direct control over a task output, you can define an output parameter and write to it directly:
```python
from dbnd import task, output
from targets.types import Path


@task(custom_output=output[Path])
def prepare_data(data_path: Path, custom_output: Path) -> str:
    with open(str(custom_output), "w") as fp:
        fp.write("my data successfully written")
    return str(data_path)
```
This task has one input (`data_path`) and two outputs (`result` and `custom_output`). This task can be run with:
```bash
dbnd run prepare_data --set data_path=<some file>
```
You can define how a custom object is serialized into a string (see Custom Value Type) or into a custom format (see Custom Parameter HDF5). You can also define the exact way data is written to the file system; see the example described in Orchestration Examples.
Persisting Outputs
By default, a path template is defined in `$DBND_LIB/databand-core.cfg` as:

```ini
[output]
path_task = {root}/{task_env}/date={task_target_date}/{task_name}{task_class_version}_{task_signature}/{output_name}{output_ext}
```
By default, `{root}` is set to `$DBND_HOME/data` and `{task_env}` is equal to `dev`. The exact path of each task output is available in the logs and the console. For example, this is a path generated by Databand: `/data/dev/2021-07-02/prepare_data/prepare_data_6236128077/result.csv`
Your environment configuration defines where outputs are written. For example, when working in research, you may persist outputs locally with smaller data sets, while in production you may write terabytes of data to a cloud environment.
Each task run will store its output and metadata in the environment file system by default. The default `local` environment root is set to `$DBND_HOME/data`.
For example, running the `prepare_data` task on the same data set, with a task target date of August 4, 2018, and the default `local` environment configuration would produce the following artifacts:
- A `$DBND_HOME/dbnd/data/dev/date=2018-08-04` directory, if such a directory doesn't exist yet
- A `PrepareData_9574807509` directory under the `$DBND_HOME/dbnd/data/dev/date=2018-08-04` directory (the numerical value after the task name is the task signature)
- The task output (i.e., the prepared data) stored within the `PrepareData_9574807509` directory
- An additional meta-directory under `PrepareData_9574807509`. DBND uses this directory to store run metadata, including metrics, git commit, and other artifacts.
[output] Configuration Section Parameter Reference
- `path_task` - Set the default path for every task.
- `path_prod_immutable_task` - Set the format of the path to be used by production immutable tasks.
- `hdf_format` - Determine the default format to save a DataFrame to hdf.
- `deploy_id` - Set the deploy prefix to use for remote deployments.
Controlling the Structure of the Output Path
DBND will generate a unique output location for every output parameter based on task name, version, and environment configuration. You can affect that by changing:
- The output path format string for all tasks, via the `[output] path_root` configuration value (see the sketch after this list).
- The `_conf__base_output_path_fmt` property of the Task class.
- (For specific tasks) the task property `task_output_path_format`.
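For example, the path format for all tasks could be overridden in a configuration file. This is an illustrative sketch only: the bucket path is hypothetical, and the placeholders are copied from the default `path_task` template shown above:

```ini
[output]
# illustrative: route all task outputs under a custom cloud prefix
path_root = s3://my-bucket/dbnd/{task_env}/date={task_target_date}/{task_name}{task_class_version}_{task_signature}/{output_name}{output_ext}
```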
Overwriting Data Targets
You can overwrite a target to be able to write to the same data source continuously.
For example, on a regular basis, you run a pipeline that consumes yesterday's data stored in the same path (for example, `s3://some-bucket/customers/data.csv`). In this scenario, you need to create the input data and write it to the same bucket every day.
As an alternative to this manual approach, you can create another task that runs every day. It collects all customer data and then writes the output to `s3://some-bucket/customers/data.csv`.
To define this second task, you need to define an overwrite target. See the following example:
```python
from dbnd import task, output, parameter
from pandas import DataFrame

# DEFAULT_OUTPUT_HDFS_PATH is a path string defined elsewhere in your project
@task(result=output(default=DEFAULT_OUTPUT_HDFS_PATH).overwrite.csv[DataFrame])
def prepare_data(data=parameter[DataFrame]):
    return data
```
Temporary Outputs
Currently, by default, Databand saves your in-memory objects to the file system, meaning all outputs will be persisted indefinitely.
The upside is that if you run X -> Y and Y crashes, you can resume from the previously built X.
The downside is that you will have a lot of intermediate results on your file system. A useful pattern is to put these files in a special directory and have some kind of periodical garbage collection clean it up.
`Task.task_in_memory_outputs` - Set this property to `True` to store task outputs in memory.
Example:

```python
calculate_alpha(task_in_memory_outputs=True)
```
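A minimal sketch of this call-time form inside a pipeline (the `calculate_alpha` and `report_alpha` tasks here are illustrative; the intermediate result is kept in memory rather than persisted):

```python
from dbnd import task, pipeline


@task()
def calculate_alpha() -> float:
    return 0.5


@task()
def report_alpha(alpha: float) -> str:
    return "alpha=%s" % alpha


@pipeline()
def alpha_pipeline():
    # keep the intermediate output in memory instead of writing it to the file system
    alpha = calculate_alpha(task_in_memory_outputs=True)
    return report_alpha(alpha)
```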
Task Output Names
DBND tasks can also return a complex object, such as a tuple. You can use the `result` parameter to name the elements of the tuple. This way, downstream tasks and the UI can work with meaningful names.
Example:
```python
from dbnd import task
from pandas import DataFrame
from typing import Tuple


@task(result="training_set,real_data")
def prepare_data(data: DataFrame) -> Tuple[DataFrame, DataFrame]:
    data["new_column"] = 5
    return data, data
```
All tasks in the following example return two `DataFrame` objects. The tasks show various syntaxes for naming these outputs:
```python
from dbnd import task
from pandas import DataFrame
from typing import NamedTuple


@task(result=("training_set", "real_data"))
def prepare_data(p: int = 1) -> (DataFrame, DataFrame):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


@task(result="training_set,real_data")
def prepare_data(p: int = 1) -> (DataFrame, DataFrame):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


@task
def prepare_data(p: int) -> (DataFrame, DataFrame):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


Outputs = NamedTuple("outputs", training_set=DataFrame, real_data=DataFrame)


@task
def prepare_data(p: int) -> Outputs:
    return Outputs(
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )


@task
def prepare_data(p: int) -> NamedTuple(
    "Outputs", fields=[("training_set", DataFrame), ("real_data", DataFrame)]
):
    return (
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
        DataFrame(data=[[p, 1]], columns=["c1", "c2"]),
    )
```
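Once outputs are named, downstream tasks can consume them individually. The following is a sketch under the assumption that one of the `prepare_data` variants above is in scope; the `train_model` task is hypothetical:

```python
from dbnd import task, pipeline
from pandas import DataFrame


@task()
def train_model(training_set: DataFrame) -> str:
    return "model trained on %d rows" % len(training_set)


@pipeline()
def prepare_and_train():
    # the named results can be unpacked and wired into downstream tasks
    training_set, real_data = prepare_data(p=1)
    return train_model(training_set)
```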
Task Output Extension
This table describes DBND's mapping from file formats to extensions:
| Name | Input File Extension | Output File Extension |
|---|---|---|
| csv | csv | csv |
| txt | txt, py | txt |
| hdf5 | h5, hdf5 | h5 |
| json | json, hjson | json |
| table | table | table |
| feather | feather | feather |
| numpy | npy, numpy | npy |
| excel | excel | excel |
| parquet | parquet | parquet |
| html | html | html |
| pickle | pickle | pickle |
A file named `myfile.csv.gz` would be parsed as a `gz`-compressed `csv` file. The following compression types are supported: `gz`, `bz2`, `zip`, `xz`, `snappy`.
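For example, if the same extension handling is applied to an output path, combining it with the call-time `result` override shown earlier might look like the sketch below (assuming the `prepare_data` and `process_data` tasks from the earlier pipeline example):

```python
from dbnd import pipeline


@pipeline()
def some_pipeline():
    # "custom_path.csv.gz" would be resolved as a gz-compressed csv target
    prepared = prepare_data(result='custom_path.csv.gz')
    process_data(prepared)
```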
This table summarizes how various in-memory objects are marshalled into file formats:
| Python Type | Configuration Key | Default Marshalling | Additional Marshallers |
|---|---|---|---|
| Pandas DataFrame | pandas_dataframe | csv | csv, table, json, html, pickle, parquet, feather, hdf5 |
| Dict[DataFrame] | pandas_df_dict | hdf5 | |
| Numpy Arrays | numpy_ndarray | numpy | numpy, pickle |
| str | str | txt | |
| List[str] | list_str | txt (as lines) | |
| object | object | pickle | pickle, json |
| List[object] | list | pickle | |
To change the marshalling behavior, use a config file:
```ini
[output]
pandas_dataframe = hdf5
```
This would cause DBND to store the DataFrame in `hdf5` format.
Output Serialization Options
If you want to save a result without a header, you can use the following option:
```python
from dbnd import task, parameter
from pandas import DataFrame
from targets.target_config import FileFormat


@task(result=parameter.csv.save_options(FileFormat.csv, header=False))
def prepare_data(data: DataFrame) -> DataFrame:
    data["new_column"] = 5
    return data
```
In this way, `to_csv` options can be provided. Notice that along with `save_options`, you must also specify the actual file format you want; this is why `.csv` appears in both places (`parameter.csv` and `FileFormat.csv`).
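The same mechanism can pass other `to_csv` keyword arguments. The sketch below assumes additional keyword arguments are forwarded to `DataFrame.to_csv` unchanged; the `index` and `sep` values are illustrative:

```python
from dbnd import task, parameter
from pandas import DataFrame
from targets.target_config import FileFormat


# illustrative: write without the index column and with a ';' separator
@task(result=parameter.csv.save_options(FileFormat.csv, index=False, sep=";"))
def prepare_data(data: DataFrame) -> DataFrame:
    data["new_column"] = 5
    return data
```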