Custom Value Type
How to extend DBND and create value types.
Custom value types are used as parameters for tasks. They can be parsed from configuration files or the command line and then used in code.
Supporting advanced DataFrames
You can enable support for other dataframe types by implementing a new DataFrameValueType and registering it with dbnd.
For example, you might want to log a Koalas dataframe. Koalas implements the pandas API on top of Spark.
In order to support Koalas, copy the code snippet below and make sure it runs when your system starts up:
import databricks.koalas as ks
from targets.values import DataFrameValueType, register_value_type
class KoalasValueType(DataFrameValueType):
    """Value type for Koalas dataframes.

    Builds on the DataFrameValueType base class and only overrides the
    handled type and its string name.
    """

    # Koalas dataframe class associated with this value type.
    type = ks.DataFrame
    # String name given to this value type.
    type_str = "KoalasDataFrame"


# Register an instance so the new value type is known to dbnd.
register_value_type(KoalasValueType())
Note that Koalas implements the pandas API, so the base class DataFrameValueType can be used.
To Define a Custom Value Type
from targets.values import ValueType, register_value_type
# This is a custom object we want to use as a parameter in tasks
class MyCustomObject(object):
    """Custom object that wraps a single value, used as a task parameter."""

    def __init__(self, custom):
        """Store ``custom`` unchanged on the instance as ``self.custom``."""
        self.custom = custom
# This defines how MyCustomObject is parsed from and serialized into strings
class _MyCustomObjectValueType(ValueType):
    """Value type that converts MyCustomObject to and from strings."""

    # The object class this value type is defined for.
    type = MyCustomObject

    def parse_from_str(self, x):
        """Build a MyCustomObject whose ``custom`` attribute is the string ``x``."""
        return MyCustomObject(x)

    def to_str(self, x):
        """Return the ``custom`` attribute of ``x`` as its string form."""
        return x.custom


# This registers value type with value type registry
register_value_type(_MyCustomObjectValueType())
To Use a Custom Value Type in a Task Class Definition
from typing import List, Set, Dict
from dbnd import PythonTask, parameter, output
class TaskWithCustomValue(PythonTask):
    """Task whose parameters are declared with the value type class explicitly."""

    # Single custom object, declared via the value type class.
    custom_value = parameter.type(_MyCustomObjectValueType)
    # List parameter whose sub type is the custom value type.
    list_of_customs = parameter.sub_type(_MyCustomObjectValueType)[List]

    report = output[str]

    def run(self):
        """Check the parameter types and write the concatenated values to ``report``.

        Reads ``list_of_customs[1]``, so the list needs at least two items.
        """
        assert isinstance(self.custom_value, MyCustomObject)
        assert isinstance(self.list_of_customs[1], MyCustomObject)
        self.report = self.custom_value.custom + self.list_of_customs[1].custom
class TaskWithCustomValueInline(PythonTask):
    """Task whose parameters are declared with MyCustomObject directly."""

    # works only after registration!
    custom_value = parameter[MyCustomObject]
    custom_value_with_default = parameter.value(MyCustomObject("1"))
    list_of_customs = parameter.sub_type(MyCustomObject)[List]
    set_of_customs = parameter.sub_type(MyCustomObject)[Set]
    dict_of_customs = parameter.sub_type(MyCustomObject)[Dict]

    report = output[str]

    def run(self):
        """Check the parameter types and write an underscore-joined report.

        The report joins the ``custom`` attribute of ``custom_value``,
        ``custom_value_with_default``, the first item of ``list_of_customs``
        and the ``"a"`` entry of ``dict_of_customs``.
        """
        assert isinstance(self.custom_value, MyCustomObject)
        assert isinstance(self.custom_value_with_default, MyCustomObject)
        self.report = "_".join(
            [
                c.custom
                for c in [
                    self.custom_value,
                    self.custom_value_with_default,
                    self.list_of_customs[0],
                    self.dict_of_customs["a"],
                ]
            ]
        )
Updated 10 months ago