TL;DR Hamilton now has a full pyspark integration. This enables you to build modular pyspark applications by declaring each transformation as a Hamilton function. The new with_columns decorator enables you to specify a series of individual column operations on a dataframe, written as pandas UDFs, python UDFs, or functions of pyspark dataframes. Together with vanilla Hamilton, this new integration can help you break complex pyspark code into a series of self-contained, unit-testable functions.
A spark application written with Hamilton that represents multiple joins (in blue), a set of map operations (in green) and a set of join/filters (in yellow). This uses Hamilton’s visualization features (with a little extra annotation). See TPC-H query 8 for motivation.
Apache Spark (and its python API, pyspark) is an open-source library for building out highly scalable data transformations. At its core is the notion of the RDD (resilient distributed dataset), which represents a lazily evaluated, partitioned, in-memory dataset that stores the information needed to recreate the data if any of the servers computing it fail. The pyspark library gives data practitioners a dataframe-centric API to interact with this in python, enabling them to specify computation and scale up to the resources they have available. Since its introduction in 2014, Spark has taken off and is now the de facto way to perform computations on large (multi-GB to multi-TB) datasets.
Just like any other data transformation code, spark applications can be difficult to maintain and manage, and often devolve into spaghetti code over time. Specifically, we've observed the following problems with pyspark code:
As this is a README inside the Hamilton repository, we assume some basic familiarity. That said, here's a quick primer:
Hamilton is an open-source Python framework for writing data transformations. One writes Python functions in a declarative style, which Hamilton parses into nodes in a graph based on their names, arguments, and type annotations. The simple rule is akin to that of pytest fixtures -- the name of a parameter points to another node (function) in the graph, and the name of the function defines a referenceable node. You can request specific outputs, and Hamilton will execute the required nodes (specified by your functions) to produce them.
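As a quick illustration, a minimal (hypothetical) module might look like the following -- the names here are made up purely for this example:

```python
# my_functions.py -- a toy illustration of vanilla Hamilton
import pandas as pd


def signups(raw_data: pd.DataFrame) -> pd.Series:
    """Node 'signups' -- depends on a node (or input) named 'raw_data'."""
    return raw_data["signups"]


def avg_signups(signups: pd.Series) -> float:
    """Node 'avg_signups' -- depends on the 'signups' node above."""
    return float(signups.mean())
```

Requesting avg_signups from a Hamilton driver executes signups first, then avg_signups.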
You can try Hamilton out in your browser at tryhamilton.dev.
Breaking your spark application into Hamilton functions with pyspark dataframes as inputs and outputs gets you most of the way towards more modular/documented code. That said, it falls flat in a critical area -- column-level lineage/transformation simplicity. For a complex series of map operations, spark represents all transformations on a single dataframe in a linear chain, repeatedly calling withColumn/select to create columns. For dataframe APIs that manage indices, Hamilton improves this experience by encouraging the user to pull apart column-level transformations and join them back together later. With columns that share cardinality, this is generally an efficient approach.
Spark, however, has no notion of indices. Data is partitioned across a cluster, and once a set of columns is selected it has the potential to be reshuffled. Thus, the two options one previously had for integrating with pyspark both have disadvantages:
The idea is to break your transformations into sections that form one of two shapes.
For the first case, we just use the pyspark dataframe API. You define functions that, when put through Hamilton, act as a pipe. For example:
Case 1: load, join, and filter data (aggregations not shown):
```python
import pyspark.sql as ps

from hamilton.function_modifiers import load_from, source


@load_from.csv(path="data_1.csv", inject_="raw_data_1", spark=source("spark"))
@load_from.parquet(path="data_2.parquet", inject_="raw_data_2")
def all_initial_data_unfiltered(
    raw_data_1: ps.DataFrame, raw_data_2: ps.DataFrame
) -> ps.DataFrame:
    """Combines the two loaded dataframes"""
    return _custom_join(raw_data_1, raw_data_2)


def all_initial_data(all_initial_data_unfiltered: ps.DataFrame) -> ps.DataFrame:
    """Filters the combined dataframe"""
    return _custom_filter(all_initial_data_unfiltered, "some_column > 0")
```
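The aggregations mentioned above would just be more dataframe-in/dataframe-out functions in the same style. A hypothetical sketch (daily_totals, date, and some_column are made-up names for illustration):

```python
import pyspark.sql as ps
import pyspark.sql.functions as F


def daily_totals(all_initial_data: ps.DataFrame) -> ps.DataFrame:
    """Hypothetical aggregation -- groups the filtered data and sums a column."""
    return all_initial_data.groupBy("date").agg(F.sum("some_column").alias("total"))
```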
Case 2: define columnar operations in a DAG:
```python
# map_transforms.py
import pandas as pd


def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return _some_transform(column_1_from_dataframe)


def column_4(column_2_from_dataframe: pd.Series) -> pd.Series:
    return _some_other_transform(column_2_from_dataframe)


def column_5(column_3: pd.Series, column_4: pd.Series) -> pd.Series:
    return _yet_another_transform(column_3, column_4)
```
Finally, we combine them together with a call to with_columns:
```python
import pyspark.sql as ps

from hamilton.plugins.h_spark import with_columns

import map_transforms  # file defined above


@with_columns(
    map_transforms,
    columns_to_pass=["column_1_from_dataframe", "column_2_from_dataframe"],
)
def final_result(all_initial_data: ps.DataFrame) -> ps.DataFrame:
    """Gives the final result. This decorator will apply the transformations in the order
    specified in the DAG. Then, the final_result function is called, with the result of
    the transformations passed in."""
    return _process(all_initial_data)
```
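To run this, you build a driver from the module as you would with any other Hamilton dataflow. A rough sketch, assuming the dataframe-level functions above live in a module called dataflow (a hypothetical name here) and that a SparkSession is passed in as the spark input expected by the load_from decorator above:

```python
import pyspark.sql as ps

from hamilton import driver

import dataflow  # the dataframe-level functions above; map_transforms is pulled in by the decorator

spark = ps.SparkSession.builder.master("local[1]").getOrCreate()
dr = driver.Builder().with_modules(dataflow).build()
result = dr.execute(["final_result"], inputs={"spark": spark})
```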
Contained within the load_from functions/modules (the first argument(s) to with_columns) is a set of transformations that specify a DAG. These transformations can take multiple forms -- they can use vanilla pyspark operations, pandas UDFs, or standard python UDFs. See the documentation for specific examples.
The easiest way to think about this is that the with_columns decorator "linearizes" the DAG. It turns a DAG of Hamilton functions into a topologically sorted chain of operations, repeatedly appending the resulting columns to the initial dataframe.
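Conceptually, the example above executes something like the following. This is illustrative pseudocode (the *_udf names are stand-ins for the adapted functions), not the actual generated code:

```python
# Roughly what the linearized plan looks like, if the pandas functions
# above were adapted to pandas UDFs by hand:
df = all_initial_data
df = df.withColumn("column_3", column_3_udf(df["column_1_from_dataframe"]))
df = df.withColumn("column_4", column_4_udf(df["column_2_from_dataframe"]))
df = df.withColumn("column_5", column_5_udf(df["column_3"], df["column_4"]))
final_df = final_result(df)
```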
@with_columns takes in the following parameters (see the docstring for more info):

- load_from -- a list of functions/modules in which to find the functions to load the DAG from, similar to @subdag.
- columns_to_pass -- the dependencies specified from the initial dataframe, injected in. Not compatible with pass_dataframe_as; note that you must use exactly one of this or pass_dataframe_as.
- pass_dataframe_as -- the name of the parameter through which to inject the initial dataframe into the subdag. If this is provided, this must be the only pyspark dataframe dependency in the subdag that is not also another node (column) in the subdag.
- select -- a list of columns to select from the UDF group. If not specified, all will be selected.
- namespace -- the namespace of the nodes generated by this; defaults to the name of the decorated function.

Note that the dependency that forms the core dataframe will always be the first parameter to the function. Therefore, the first parameter must be a pyspark dataframe and share the name of an upstream node that returns a pyspark dataframe.
You have two options for how to present/read the initial dataframe. Each corresponds to a with_columns parameter. You can use:

1. columns_to_pass to constrain the columns that must exist in the initial dataframe, which you refer to in your functions. In the example above, the functions can refer to the columns column_1_from_dataframe and column_2_from_dataframe, but those names cannot also be defined by the subdag.
2. pass_dataframe_as to pass the dataframe you're transforming in as a specific parameter name to the subdag. This allows you to handle the extraction yourself -- use this if you want to redefine columns in the dataframe/preserve the same names.
```python
# map_transforms.py
import pandas as pd
import pyspark.sql as ps


def column_1_from_dataframe(input_dataframe: ps.DataFrame) -> ps.Column:
    return input_dataframe.column_1_from_dataframe


def column_2_from_dataframe(input_dataframe: ps.DataFrame) -> ps.Column:
    return input_dataframe.column_2_from_dataframe


def column_3(column_1_from_dataframe: pd.Series) -> pd.Series:
    return _some_transform(column_1_from_dataframe)


def column_4(column_2_from_dataframe: pd.Series) -> pd.Series:
    return _some_other_transform(column_2_from_dataframe)


def column_5(column_3: pd.Series, column_4: pd.Series) -> pd.Series:
    return _yet_another_transform(column_3, column_4)
```
```python
import pyspark.sql as ps

from hamilton.plugins.h_spark import with_columns

import map_transforms  # file defined above


@with_columns(
    map_transforms,  # load all the functions we defined above
    pass_dataframe_as="input_dataframe",  # the upstream dataframe, referred to by downstream nodes, will have this parameter name
)
def final_result(all_initial_data: ps.DataFrame) -> ps.DataFrame:
    """Gives the final result. This decorator will apply the transformations in the order
    specified in the DAG. Then, the final_result function is called, with the result of
    the transformations passed in."""
    return _process(all_initial_data)
```
Approach (2) requires that the functions reading directly from the dataframe take in pyspark dataframes and return pyspark dataframes or columns. If you want to stay entirely in pandas for the with_columns group, you should use approach (1).
There are four supported types of transforms that can compose the group of DAG transformations:
These are functions of series:
```python
import pandas as pd

from hamilton import htypes


def foo(bar: pd.Series, baz: pd.Series) -> htypes.column[pd.Series, int]:
    return bar + 1
```
The rules are the same as vanilla hamilton -- the parameter name determines the upstream dependencies and the function name determines the output column name.
Note that, due to the type-specification requirements of pyspark, these have to return a "typed" (Annotated[...]) series, specified by htypes.column. These are adapted to form pyspark-friendly pandas UDFs.
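Under the hood this is roughly equivalent to registering the function as a series-to-series pandas UDF yourself. A sketch of what that manual adaptation looks like (not the framework's actual code; df is assumed to be a pyspark dataframe with bar and baz columns):

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf


def foo(bar: pd.Series, baz: pd.Series) -> pd.Series:
    return bar + 1


foo_udf = pandas_udf(foo, "long")  # the int annotation becomes the spark return type
df = df.withColumn("foo", foo_udf(df["bar"], df["baz"]))
```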
These are functions of python primitives:
```python
def foo(bar: int, baz: int) -> int:
    return bar + 1
```
These are adapted to standard pyspark UDFs.
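Again, this is roughly what you would write by hand with pyspark's udf helper. A sketch of the manual equivalent (not the framework's actual code; df is assumed to be a pyspark dataframe with bar and baz columns):

```python
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType


def foo(bar: int, baz: int) -> int:
    return bar + 1


foo_udf = F.udf(foo, IntegerType())  # the int annotation maps to IntegerType
df = df.withColumn("foo", foo_udf(df["bar"], df["baz"]))
```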
These are functions that take in a single pyspark dataframe and output a pyspark column.
```python
def foo(bar: ps.DataFrame) -> ps.Column:
    return bar["bar"] + 1
```
Note that these have two forms:

1. The parameter name refers to the upstream column you need (as in the example above).
2. The function is decorated with the @h_spark.require_columns decorator, to specify which columns you want to use:

```python
import pyspark.sql as ps

from hamilton.plugins import h_spark


@h_spark.require_columns("bar", "baz")
def foo(bar_baz: ps.DataFrame) -> ps.Column:
    return bar_baz["bar"] + 1
```
In this case we are only allowed a single dataframe dependency, and the parameter name does not matter. The columns specified are injected into the dataframe, allowing you to depend on multiple upstream columns.
This is the ultimate power-user case, where you can manipulate the dataframe in any way you want. Note that this and the column flavor are an "out", meaning that they are a way to jump back to the pyspark world without having to break up your map functions for, say, a windowed aggregation. You can easily shoot yourself in the foot here. This should only be used if you strongly feel the need to inject a map-like (index-preserving, but not row-wise) operation into the DAG, and the df -> column flavor is not sufficient (and if you find yourself using this a lot, please reach out -- we'd love to hear your use-case).
This has the exact same rules as the column flavor, except that the return type is a dataframe.
```python
import pyspark.sql as ps

from hamilton.plugins import h_spark


@h_spark.require_columns("bar", "baz")
def foo(df: ps.DataFrame) -> ps.DataFrame:
    return df.withColumn("bar", df["bar"] + 1)
```
Note that this is isomorphic to the column flavor, except that you (not the framework) are responsible for calling withColumn.
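As an example of where this flavor earns its keep, consider a windowed, index-preserving operation that is awkward to express column-by-column. The column names below are hypothetical:

```python
import pyspark.sql as ps
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from hamilton.plugins import h_spark


@h_spark.require_columns("user_id", "event_time", "amount")
def amount_cumulative(df: ps.DataFrame) -> ps.DataFrame:
    """Running total per user -- map-like (one row out per row in), but not row-wise."""
    w = Window.partitionBy("user_id").orderBy("event_time")
    return df.withColumn("amount_cumulative", F.sum("amount").over(w))
```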
We have implemented the hamilton hello_world example in run.py and the map_transforms.py/dataflow.py files so you can compare. You can run run.py:
```bash
python run.py
```
and check out the interactive example in the notebook.ipynb file.
We have also implemented three of the TPC-H query functions to demonstrate a more real-world set of queries:
See the README for more details on how to run these.
The with_columns decorator does the following:
Thus the graph continually assigns to a single (immutable) dataframe, tracking the result, and still displays the DAG shape that was presented by the code. Column-level lineage is preserved as dependencies and easy to read from the code, while it executes as a normal set of spark operations.
Pyspark is not the only way to scale up your computation. Hamilton supports pandas-on-spark as well. You can use pandas-on-spark with the KoalaGraphAdapter -- see Pandas on Spark for reference. Some people prefer vanilla spark, some like pandas-on-spark; we support both. Hamilton also supports executing map-based pandas UDFs in pyspark, in case you want simple parallelism. See pyspark_udfs for reference.
Hamilton has integrations with other scaling libraries as well -- it all depends on your use-case:
A few interesting directions:
- collect()/cache() through the DAG