Here we have a version of the Hamilton hello world example showing how you can take some Hamilton functions and then easily run them as pyspark UDFs.
pip install "sf-hamilton[pyspark, visualization]" for the right dependencies to run this example.
* `pandas_udfs.py` contains UDF definitions that Hamilton will know how to run as `pandas_udf`s on pyspark.
* `vanilla_udfs.py` contains UDF definitions that Hamilton will know how to run as row-based UDFs on pyspark.
* `run.py` contains the spark driver and Hamilton driver code to run everything.

```bash
python run.py pandas   # to run using pandas_udfs
python run.py vanilla  # to run using vanilla_udfs
```
You should get the following as output:
```
+-----+-------+------------------+----------------+-----------------------------+---+---+--------------+
|spend|signups|     avg_3wk_spend|spend_per_signup|spend_zero_mean_unit_variance|foo|bar|augmented_mean|
+-----+-------+------------------+----------------+-----------------------------+---+---+--------------+
|   10|      1|              10.0|            10.0|                   -1.0644054|1.0|2.0|           3.0|
|   10|     10|              10.0|             1.0|                   -1.0644054|1.0|2.0|           3.0|
|   20|     50|13.333333333333334|             0.4|                  -0.48382062|1.0|2.0|           3.0|
|   40|    100|23.333333333333332|             0.4|                    0.6773489|1.0|2.0|           3.0|
|   40|    200|33.333333333333336|             0.2|                    0.6773489|1.0|2.0|           3.0|
|   50|    400|43.333333333333336|           0.125|                    1.2579336|1.0|2.0|           3.0|
+-----+-------+------------------+----------------+-----------------------------+---+---+--------------+
```
Hamilton will also show you an execution visualization of the DAG it ran.

Note: `foo`, `bar`, and `augmented_mean` do not depend on any inputs in the dataframe.
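To give a flavor of the two UDF styles, here is a minimal sketch of what the function definitions look like. `spend_per_signup` mirrors a column in the output above; the row-based variant is renamed here only so both styles fit in one snippet. See the actual modules for the full set of definitions.

```python
import pandas as pd


# pandas_udfs.py style: pd.Series in, pd.Series out -> Hamilton runs it as a pandas_udf.
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to spend."""
    return spend / signups


# vanilla_udfs.py style: scalars in, scalar out -> Hamilton runs it as a row-based UDF.
def spend_per_signup_row(spend: float, signups: float) -> float:
    """The cost per signup in relation to spend; row-based variant."""
    return spend / signups
```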
You need to use the `PySparkUDFGraphAdapter` for this to work. This is because Hamilton needs to know how to augment operations for pyspark. With this adapter:

* When you call `dr.execute(..., inputs=INPUTS)`, the `INPUTS` dictionary contains a mapping of column name to pyspark dataframe. Hamilton will then know to use that single dataframe to pull from and append columns to, to coordinate UDF execution.
* Your functions need to be map based. If they're not, then you'll need to manually manage that outside of Hamilton.

Some caveats to be aware of:

* `PySparkUDFGraphAdapter` currently only supports map based functions.
* The only `pandas_udf` signature currently supported is functions that take `pd.Series` as inputs and return a `pd.Series` as output.
* `@check_output` annotations are not currently supported for pyspark UDFs at this time. But we're working on it - ping us in slack (or via issues) if you need this feature!
* We could, under the hood, translate basic vanilla python UDF functions to use the `pandas_udf` route. This could be enabled via a variable passed to the `PySparkUDFGraphAdapter`, via an annotation on the function, or both. Let us know if this would be useful to you!
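Putting the pieces together, here is a hedged sketch of driver code along the lines of `run.py`. The import location of `PySparkUDFGraphAdapter` and the requested output columns are assumptions - check `run.py` for the real wiring.

```python
import pandas as pd
from pyspark.sql import SparkSession

from hamilton import driver
from hamilton.experimental import h_spark  # location may differ by Hamilton version

import pandas_udfs  # the UDF module from this example

spark = SparkSession.builder.master("local[1]").getOrCreate()
ps_df = spark.createDataFrame(
    pd.DataFrame({"spend": [10, 10, 20, 40, 40, 50],
                  "signups": [1, 10, 50, 100, 200, 400]})
)

# The adapter tells Hamilton to run each function as a pyspark UDF and to
# append requested outputs as columns to the dataframe.
adapter = h_spark.PySparkUDFGraphAdapter()
dr = driver.Driver({}, pandas_udfs, adapter=adapter)

# The INPUTS contract described above: map each column name to the *same*
# pyspark dataframe so Hamilton knows what to pull from and append to.
execute_inputs = {col: ps_df for col in ps_df.columns}
outputs = ["spend_per_signup"]  # illustrative; request whatever columns you need
result = dr.execute(outputs, inputs=execute_inputs)  # returns a pyspark dataframe
result.show()

# With the `visualization` extra installed you can also render the execution DAG:
dr.visualize_execution(outputs, "./my_spark_udf.dot", {}, inputs=execute_inputs)
```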
Want non-map functions, e.g. aggregations, supported? (1) Let us know what you need. (2) Implementation is then a matter of (a) getting the API right, and (b) making sure it fits with the Hamilton way of thinking.
On automatically translating vanilla python UDFs to `pandas_udf`s (the last caveat above): we just need to determine what a good API for this would be. We're open to suggestions!
We could also support other dataframe operations, like joins, etc. The main issue is creating a good API for this - we're open to suggestions!
Could you skip UDFs entirely? Yes - with Hamilton you could write functions that define a named flow that operates entirely over pyspark dataframes. However, you lose a lot of the flexibility of Hamilton doing things that way. We're open to suggestions, or API changes, to make this approach easier; a sketch of what such a flow could look like follows.
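As a minimal sketch (not part of this example - the function, column, and input names are hypothetical), such a flow is just Hamilton functions that take and return pyspark dataframes:

```python
import pyspark.sql as ps
from pyspark.sql import functions as F


def spend_per_signup_df(raw_df: ps.DataFrame) -> ps.DataFrame:
    """Appends a spend_per_signup column to the incoming dataframe."""
    return raw_df.withColumn("spend_per_signup", F.col("spend") / F.col("signups"))


def high_spend_df(spend_per_signup_df: ps.DataFrame) -> ps.DataFrame:
    """Keeps only rows at or above a spend threshold."""
    return spend_per_signup_df.filter(F.col("spend") >= 40)


# To execute, use the plain Hamilton driver (no UDF adapter) with a result
# builder that returns objects as-is rather than building a pandas dataframe:
#
#   from hamilton import base, driver
#   dr = driver.Driver({}, this_module,
#                      adapter=base.SimplePythonGraphAdapter(base.DictResult()))
#   dr.execute(["high_spend_df"], inputs={"raw_df": my_pyspark_df})
```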