"""
This example shows you how to run a Hamilton DAG and farm each function out to Dask using
dask.delayed.
"""

import importlib
import logging
import sys

import pandas as pd

from hamilton import base, driver
from hamilton.plugins import h_dask

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    # Configure logging before emitting any log records, otherwise INFO messages are dropped.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    from dask.distributed import Client, LocalCluster

    # Set up a local cluster. By default this creates one worker per core.
    cluster = LocalCluster()
    client = Client(cluster)
    logger.info(client.cluster)

    # The business logic lives in its own module(s); swap out the list below to change
    # what gets computed without touching this driver script.
    module_names = ["business_logic"]  # or do `import business_logic` and pass it directly
    modules = [importlib.import_module(m) for m in module_names]

    # The adapter farms each Hamilton function out as a dask.delayed call (use_delayed=True)
    # and materializes the final result with a .compute() (compute_at_end=True).
    dga = h_dask.DaskGraphAdapter(
        client,
        base.PandasDataFrameResult(),
        visualize_kwargs={"filename": "run_with_delayed", "format": "png"},
        use_delayed=True,
        compute_at_end=True,
    )
    # visualize_kwargs makes Dask output its execution graph to run_with_delayed.png --
    # this requires sf-hamilton[visualization] to be installed.

    # Seed the DAG with inputs: these keys satisfy the `spend` and `signups`
    # dependencies declared by the business logic functions.
    initial_config_and_data = {
        "signups": pd.Series([1, 10, 50, 100, 200, 400]),
        "spend": pd.Series([10, 10, 20, 40, 40, 50]),
    }
    dr = driver.Driver(initial_config_and_data, *modules, adapter=dga)

    output_columns = [
        "spend",
        "signups",
        "avg_3wk_spend",
        "spend_per_signup",
        "spend_mean",
        "spend_zero_mean_unit_variance",
    ]
    # Hamilton resolves the dependency graph for the requested outputs, Dask executes
    # each function as a delayed task, and PandasDataFrameResult assembles a DataFrame.
    pandas_df = dr.execute(output_columns)
    logger.info(pandas_df.to_string())
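
    # For comparison, a sketch of running the same DAG locally without Dask (assumes
    # the default adapter, which also builds a pandas DataFrame):
    #
    #   dr_local = driver.Driver(initial_config_and_data, *modules)
    #   pandas_df_local = dr_local.execute(output_columns)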

    # To make the visualization helpers below work, `pip install "sf-hamilton[visualization]"`.
    # dr.visualize_execution(output_columns, './hello_world_dask', {"format": "png"})
    # dr.display_all_functions('./my_full_dag.dot')

    client.shutdown()