import importlib

import pyspark.pandas as ps
from pyspark.sql import SparkSession

from hamilton import base, driver, log_setup
from hamilton.plugins import h_spark

if __name__ == "__main__":
    log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
    spark = SparkSession.builder.getOrCreate()
    # spark.sparkContext.setLogLevel('info')
    # Hamilton combines series that originate from different frames, so
    # pandas-on-Spark must be allowed to operate across frames:
    ps.set_option("compute.ops_on_diff_frames", True)
    # Use a distributed default index so index creation doesn't bottleneck on a
    # single node -- worth verifying this actually takes effect for your workload:
    ps.set_option("compute.default_index_type", "distributed")
    module_names = [
        "data_loaders",  # functions to help load data
        "business_logic",  # where our important logic lives
    ]
    modules = [importlib.import_module(m) for m in module_names]
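    # For orientation: Hamilton modules are plain Python functions whose names are
    # the outputs they compute and whose parameter names declare their inputs.
    # A hedged sketch of the kind of function business_logic.py defines (the real
    # module is the source of truth; spend_per_signup is requested as an output below):
    #
    #     def spend_per_signup(spend: ps.Series, signups: ps.Series) -> ps.Series:
    #         """Spend per signup; Hamilton wires `spend` and `signups` in by name."""
    #         return spend / signups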
    initial_config_and_or_data = {  # load from actuals or wherever
        "signups_location": "some_path",
        "spend_location": "some_other_path",
        "base_df_location": "some_other_other_path",
    }
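    # The *_location values above feed the loaders in data_loaders.py. A hedged
    # sketch of such a loader (the function name, reader, and column here are
    # assumptions that simply mirror the config keys):
    #
    #     def spend(spend_location: str) -> ps.Series:
    #         return ps.read_csv(spend_location)["spend"]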
    # Proving to myself that koalas (pandas-on-Spark) works, by hand-running the
    # same transforms Hamilton orchestrates below. `initial_columns` here would be
    # a dict of input series, e.g. {'spend': ps.Series([...]), 'signups': ps.Series([...])}:
    # df_container = ps.DataFrame(initial_columns['spend'])  # assign spine column
    # print(df_container)
    # import my_functions
    # df_container['spend_per_signup'] = my_functions.spend_per_signup(df_container['spend'], initial_columns['signups'])
    # df_container['avg_3wk_spend'] = my_functions.avg_3wk_spend(df_container['spend'])
    # df_container['spend_zero_mean_unit_variance'] = my_functions.spend_zero_mean_unit_variance(
    #     my_functions.spend_zero_mean(df_container['spend'], my_functions.spend_mean(df_container['spend'])),
    #     my_functions.spend_std_dev(df_container['spend'])
    # )
    # df_container['signups'] = initial_columns['signups']
    # print(df_container)
"""
spend spend_per_signup avg_3wk_spend spend_zero_mean_unit_variance signups
0 10 10.000 NaN -1.064405 1
1 10 1.000 NaN -1.064405 10
2 20 0.400 13.333333 -0.483821 50
3 40 0.400 23.333333 0.677349 100
4 40 0.200 33.333333 0.677349 200
5 50 0.125 43.333333 1.257934 400
"""
    # Okay, now Hamilton does its thing:
    skga = h_spark.SparkKoalasGraphAdapter(
        spark_session=spark,
        # Return a plain pandas DataFrame on the driver; swap in the commented
        # builder below to keep the result as a pandas-on-Spark DataFrame instead.
        result_builder=base.PandasDataFrameResult(),
        # result_builder=h_spark.KoalasDataFrameResult(),
        spine_column="spend",  # the column the other outputs are joined against
    )
    dr = driver.Driver(
        initial_config_and_or_data, *modules, adapter=skga
    )  # can pass in multiple modules
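    # Before executing, you can introspect what the DAG is able to compute,
    # e.g. via the driver's list_available_variables():
    # print([var.name for var in dr.list_available_variables()])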
    # We need to specify what we want in the final dataframe:
    output_columns = [
        "spend",
        "signups",
        "avg_3wk_spend",
        "spend_per_signup",
        "spend_zero_mean_unit_variance",
    ]
    # Let's create the dataframe!
    df = dr.execute(output_columns)
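    # With PandasDataFrameResult the result lands on the driver as a plain pandas
    # DataFrame, so ordinary pandas checks apply -- e.g.:
    # assert set(output_columns).issubset(df.columns)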
    # To visualize, run `pip install "sf-hamilton[visualization]"` first, then uncomment:
    # dr.visualize_execution(output_columns, "./pandas_on_spark", {"format": "png"})
    # dr.display_all_functions('./my_full_dag.dot')
    print(type(df))
    print(df.to_string())
    spark.stop()
"""
spend signups avg_3wk_spend spend_per_signup spend_zero_mean_unit_variance
0 10 1 NaN 10.000 -1.064405
1 10 10 NaN 1.000 -1.064405
2 20 50 13.333333 0.400 -0.483821
3 40 100 23.333333 0.400 0.677349
4 40 200 33.333333 0.200 0.677349
5 50 400 43.333333 0.125 1.257934
"""