Custom Marshaller

Write data in a custom file format, or reuse a different known format (for example, adding support for marshalling SizedMessage into a joblib object with the z file extension).

You can extend the existing Marshaller with new formats.

import pyspark
from dbnd_spark.spark_targets import SparkMarshaller
from targets.marshalling import register_marshaller

# Register a SparkMarshaller configured with fmt="avro" for pyspark
# DataFrames under the "avro" format key.
register_marshaller(
    pyspark.sql.DataFrame,
    "avro",
    SparkMarshaller(fmt="avro"),
)

This example shows how to write data in a custom file format or to reuse an existing known format. Here, we want to add support for marshalling SizedMessage into a joblib object with the z file extension.

We need to register:

  1. The file extension.
  2. The Marshaller that we wrote.
The following example adds another format for serializing/deserializing the example SizedMessage object:
import logging

import joblib

from dbnd import output, task
from targets.marshalling import register_marshaller
from targets.marshalling.marshaller import Marshaller
from targets.target_config import TargetConfig, register_file_extension
from targets.values import ValueType, register_value_type

logger = logging.getLogger(__name__)

class SizedMessage:
    """Example value object pairing a message with a size."""

    def __init__(self, msg, size):
        """Store ``msg`` and ``size`` unchanged on the instance."""
        self.msg, self.size = msg, size

# This defines how SizedMessage is parsed and serialised into strings
class MessageValueType(ValueType):
    """Convert ``SizedMessage`` objects to and from a ``"<msg>><size>"`` string."""

    # Type this value type is declared for (presumably read by the
    # ValueType machinery -- confirm against the ValueType base class).
    type = SizedMessage

    def parse_from_str(self, x):
        """Rebuild a ``SizedMessage`` from a string produced by ``to_str``.

        Raises:
            IndexError: if ``x`` contains no ">" separator.
            ValueError: if the part after the last ">" is not an integer.
        """
        # Split on the LAST ">" only, so a message that itself contains ">"
        # is not truncated.
        parts = x.rsplit(">", 1)
        # ``to_str`` writes the size with ``str()``; convert it back to int so
        # the round trip does not turn the size into a string.
        return SizedMessage(parts[0], int(parts[1]))

    def to_str(self, x):
        """Serialise ``x`` as its message, a ">" separator, then its size."""
        return x.msg + ">" + str(x.size)

# Register the value type with the value type registry.
# NOTE(review): the call below was missing although the comment announced it
# and ``register_value_type`` is imported; confirm it expects an instance.
register_value_type(MessageValueType())

# 1. create file extension
z_file_ext = register_file_extension("z")

class JoblibSizedMessageMarshaller(Marshaller):
    """Marshaller that persists a value with joblib and loads it back."""

    # NOTE(review): both methods assume ``target.open(...)`` is a context
    # manager yielding a file object whose ``.name`` is a path joblib can
    # read/write -- confirm against the target API.

    def target_to_value(self, target, **kwargs):
        """Load and return the object joblib stored at ``target``."""
        with target.open() as fp:
            from_file = joblib.load(fp.name)
            return from_file

    def value_to_target(self, value, target, **kwargs):
        """Dump ``value`` with joblib to the file behind ``target``."""
        with target.open("w") as fp:
            joblib.dump(value, fp.name)

# 2. map the SizedMessage type and the "z" file extension to the joblib marshaller
register_marshaller(
    SizedMessage,
    z_file_ext,
    JoblibSizedMessageMarshaller(),
)

def dump_as_joblib():
    # type: ()-> SizedMessage
    """Create the example ``SizedMessage`` (text plus a size of 10)."""
    # NOTE(review): ``task``, ``output`` and ``TargetConfig`` are imported in
    # this file but unused; this function may originally have carried a
    # ``@task(...)`` decorator -- confirm.
    example = SizedMessage("example message \n", 10)
    return example

def load_as_joblib(sized_message: SizedMessage):
    """Return ``sized_message.msg`` multiplied by ``sized_message.size``.

    For a string message and an integer size this is the message text
    repeated ``size`` times.
    """
    text = sized_message.msg
    count = sized_message.size
    return text * count

What’s Next
Did this page help you?