Spark Integration

How to create Spark tasks in Databand.

Apache Spark is a unified analytics engine for large-scale data processing. In this section, we show how to configure Spark integration and implement Spark tasks with DBND.

πŸ“˜

Please make sure you have configured DBND to work with your spark cluster

Spark Configuration

Creating Spark Tasks

DBND supports the following ways to write Spark logic:

  • Inline Spark tasks - a function that takes Spark DataFrame(s) as input and returns Spark DataFrame(s) as output
  • PySpark tasks - a task that takes an arbitrary PySpark script and wraps it into a DBND pipeline
  • JVM tasks - a task that takes an arbitrary Java/Scala Spark job and wraps it into a DBND pipeline

Inline Spark Task

The simplest way to write a Spark task with DBND is to:

  • Write a function that gets a Spark DataFrame, transforms it using Spark APIs and returns a transformed Spark DataFrame
  • Annotate this function using DBND's @spark_task decorator

The example below shows a word count Spark function wrapped with a DBND decorator.

import pyspark.sql as spark
from dbnd_spark.spark import get_spark_session, spark_task

@spark_task
def prepare_data_spark(text: spark.DataFrame) -> spark.DataFrame:
    from operator import add

    # Split each line of the input into words and count the occurrences of each word
    lines = text.rdd.map(lambda r: r[0])
    counts = (
        lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    )

    # Convert the (word, count) RDD back into a DataFrame, so the task
    # returns the declared spark.DataFrame type
    return get_spark_session().createDataFrame(counts, ["word", "count"])

This task will run on a Spark cluster (the local Spark installation by default) and return a DataFrame with the word counts.

Inline Spark tasks provide access to the active Spark session through the get_spark_session() function. Spark DataFrames are supported both as input parameters and as function outputs.

To run this task on DBND:

dbnd run prepare_data_spark --set text=<some file>
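
The session returned by get_spark_session() can also be used directly inside a decorated function, for example to run Spark SQL over a task input. The sketch below is illustrative only; the top_words function and its query are not part of the DBND examples:

import pyspark.sql as spark
from dbnd_spark.spark import get_spark_session, spark_task

@spark_task
def top_words(counts: spark.DataFrame) -> spark.DataFrame:
    # Expose the input DataFrame to Spark SQL and query it through the active session
    counts.createOrReplaceTempView("word_counts")
    return get_spark_session().sql(
        "SELECT word, `count` FROM word_counts ORDER BY `count` DESC LIMIT 10"
    )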

PySpark Task

Another common option is to run an existing PySpark script through DBND. To do this, subclass PySparkTask, point python_script at the script, and define its input/output parameters:

from dbnd import output, parameter
from dbnd_spark import PySparkTask

class PrepareDataPySparkTask(PySparkTask):
    text = parameter.data
    counters = output

    # Path to the PySpark script that will be submitted to the cluster
    python_script = spark_folder("pyspark/word_count.py")

    def application_args(self):
        # Command-line arguments passed to the script
        return [self.text, self.counters]

In the example above, python_script points to the PySpark script, while application_args returns the list of command-line arguments passed to that script.
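
For reference, the script referenced by python_script might look like the following. This is only a sketch of a typical word-count script that reads its input and output paths from sys.argv, in the order produced by application_args; the actual word_count.py in the examples may differ:

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    input_path, output_path = sys.argv[1], sys.argv[2]

    spark = SparkSession.builder.appName("word_count").getOrCreate()

    # Read the input text, count word occurrences, and write the result
    lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
    counts.toDF(["word", "count"]).write.csv(output_path)

    spark.stop()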

JVM Spark Task

To run a Spark job implemented in Java/Scala, provide the path to the JAR and the name of the main class to be used by spark-submit:

from dbnd import output, parameter
from dbnd_spark import SparkConfig, SparkTask

class PrepareDataSparkTask(SparkTask):
    text = parameter.data
    counters = output

    # Main class inside the application JAR, executed by spark-submit
    main_class = "ai.databand.examples.WordCount"
    defaults = {
        SparkConfig.main_jar: "${DBND_HOME}/databand_examples/tool_spark/spark_jvm/target/ai.databand.examples-1.0-SNAPSHOT.jar"
    }

    def application_args(self):
        # Command-line arguments passed to the main class
        return [self.text, self.counters]

In the example above, main_class points to the main Java/Scala class, while application_args returns the list of command-line arguments passed to it.
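
As with the inline example, class-based tasks can be launched through dbnd run. The exact task reference depends on how your project is configured for task discovery, so the command below is only a sketch that assumes PrepareDataSparkTask is importable by DBND:

dbnd run PrepareDataSparkTask --set text=<some file>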

Pipelines with Spark Tasks

Spark tasks are no different from regular tasks when you use them inside a @pipeline function.
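
For example, the inline task defined above can be wired into a pipeline just like any other task. The sketch below is illustrative; word_count_pipeline is a hypothetical name, and prepare_data_spark is the decorated function from the inline example:

from dbnd import pipeline

@pipeline
def word_count_pipeline(text):
    # The Spark task is called like a regular Python function; DBND wires its
    # output into the pipeline graph like any other task output.
    return prepare_data_spark(text=text)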

