Adds reuse_functions example
diff --git a/examples/reusing_functions/main.py b/examples/reusing_functions/main.py
new file mode 100644
index 0000000..e1b55b2
--- /dev/null
+++ b/examples/reusing_functions/main.py
@@ -0,0 +1,85 @@
+"""A pretty simple pipeline to demonstrate function reuse.
+Note that this *also* demonstrates building a custom results builder!
+This one does specific time-series methodology (upsampling...).
+"""
+
+from typing import Any, Dict
+
+import pandas as pd
+import reusable_subdags
+
+from hamilton.base import ResultMixin, SimplePythonGraphAdapter
+from hamilton.driver import Driver
+
+
+class TimeSeriesJoinResultsBuilder(ResultMixin):
+    """Joins time-series DAG outputs into a single DataFrame.
+
+    Every output is resampled to a common frequency, then the resampled series
+    are aligned on their datetime indices and the resulting gaps are backfilled.
+    """
+
+    def __init__(self, upsample_frequency: str):
+        """Initializes a results builder that does a time-series join.
+
+        :param upsample_frequency: Frequency string passed to pandas' ``Series.resample()``
+            (e.g. "D" for daily).
+        """
+        self.sampling_methodology = upsample_frequency
+
+    def resample(self, time_series: pd.Series) -> pd.Series:
+        """Resamples ``time_series`` to the configured frequency, backfilling new slots."""
+        return time_series.resample(
+            self.sampling_methodology
+        ).bfill()  # TODO -- think through the right fill -- ffill()/bfill()/whatnot
+
+    @staticmethod
+    def is_time_series(series: Any) -> bool:
+        """Returns True iff ``series`` is a pandas Series with a datetime index."""
+        return isinstance(series, pd.Series) and series.index.inferred_type == "datetime64"
+
+    def build_result(self, **outputs: Any) -> pd.DataFrame:
+        """Resamples every DAG output and joins them column-wise.
+
+        :param outputs: Output name -> computed value; every value must be a time series.
+        :return: One column per output, indexed by the resampled timestamps.
+        :raises ValueError: If any output is not a pandas Series with a datetime index.
+        """
+        non_ts_output = [
+            key
+            for key, value in outputs.items()
+            if not TimeSeriesJoinResultsBuilder.is_time_series(value)
+        ]
+        if non_ts_output:
+            raise ValueError(
+                "All outputs from DAG must be pandas Series with a datetime index. "
+                f"Non-time-series outputs: {non_ts_output}"
+            )
+        resampled_results: Dict[str, pd.Series] = {
+            key: self.resample(value) for key, value in outputs.items()
+        }
+        # The resampled series may cover different date ranges; aligning them in one
+        # DataFrame introduces NaNs, which are backfilled here.
+        return pd.DataFrame(resampled_results).bfill()
+
+
+dr = Driver(
+    {},
+    reusable_subdags,
+    adapter=SimplePythonGraphAdapter(
+        result_builder=TimeSeriesJoinResultsBuilder(upsample_frequency="D")
+    ),
+)
+
+result = dr.execute(
+    [
+        "unique_users_daily_US",
+        "unique_users_daily_CA",
+        "unique_users_weekly_US",
+        "unique_users_weekly_CA",
+        "unique_users_monthly_US",
+        "unique_users_monthly_CA",
+    ]
+)
+
+print(result)
diff --git a/examples/reusing_functions/reusable_subdags.py b/examples/reusing_functions/reusable_subdags.py
index 7764977..b74c627 100644
--- a/examples/reusing_functions/reusable_subdags.py
+++ b/examples/reusing_functions/reusable_subdags.py
@@ -1,70 +1,125 @@
 import pandas as pd
+import unique_users
 
-from hamilton.function_modifiers import extract_columns, reuse, reuse_functions, value
+from hamilton.function_modifiers import reuse, reuse_functions, value
 
 
-@extract_columns("timestamp", "user_id", "region")  # one of "US", "CA" (canada)
-def website_interactions(random_seed: int) -> pd.DataFrame:
+def website_interactions() -> pd.DataFrame:
     """Gives event-driven data with a series
 
     :return: Some mock event data.
     """
-    # TODO -- implement some random data
     data = [
         ("20220901-14:00:00", 1, "US"),
         ("20220901-18:30:00", 2, "US"),
         ("20220901-19:00:00", 1, "US"),
-        ("20220902-08:00:00", 1, "US"),
+        ("20220902-08:00:00", 3, "US"),
         ("20220903-16:00:00", 1, "US"),
-        ("20220907-13:00:00", 1, "US"),
+        ("20220907-13:00:00", 4, "US"),
         ("20220910-14:00:00", 1, "US"),
-        ("20220911-12:00:00", 1, "US"),
+        ("20220911-12:00:00", 3, "US"),
         ("20220914-11:00:00", 1, "US"),
-        ("20220915-07:30:00", 1, "US"),
+        ("20220915-07:30:00", 2, "US"),
         ("20220916-06:00:00", 1, "US"),
-        ("20220917-16:00:00", 1, "US"),
-        ("20220920-17:00:00", 1, "US"),
-        ("20220922-09:30:00", 1, "US"),
+        ("20220917-16:00:00", 2, "US"),
+        ("20220920-17:00:00", 5, "US"),
+        ("20220922-09:30:00", 2, "US"),
         ("20220922-10:00:00", 1, "US"),
-        ("20220924-07:00:00", 1, "US"),
+        ("20220924-07:00:00", 6, "US"),
         ("20220924-08:00:00", 1, "US"),
         ("20220925-21:00:00", 1, "US"),
-        ("20220926-15:30:00", 1, "US"),
+        ("20220926-15:30:00", 2, "US"),
+        ("20220901-14:00:00", 7, "CA"),
+        ("20220901-18:30:00", 8, "CA"),
+        ("20220901-19:00:00", 9, "CA"),
+        ("20220902-08:00:00", 7, "CA"),
+        ("20220903-16:00:00", 10, "CA"),
+        ("20220907-13:00:00", 9, "CA"),
+        ("20220910-14:00:00", 8, "CA"),
+        ("20220911-12:00:00", 11, "CA"),
+        ("20220914-11:00:00", 12, "CA"),
+        ("20220915-07:30:00", 7, "CA"),
+        ("20220916-06:00:00", 9, "CA"),
+        ("20220917-16:00:00", 10, "CA"),
+        ("20220920-17:00:00", 7, "CA"),
+        ("20220922-09:30:00", 11, "CA"),
+        ("20220922-10:00:00", 8, "CA"),
+        ("20220924-07:00:00", 9, "CA"),
+        ("20220924-08:00:00", 10, "CA"),
+        ("20220925-21:00:00", 13, "CA"),
+        ("20220926-15:30:00", 14, "CA"),
     ]
-    return pd.DataFrame(data)
-
-
-def _validate_grain(grain: str):
-    assert grain in ["day", "week"]
-
-
-def interactions_filtered(filtered_interactions: pd.DataFrame, region: str) -> pd.DataFrame:
-    pass
-
-
-def unique_users(filtered_interactions: pd.DataFrame, grain: str) -> pd.Series:
-    """Gives the number of shares traded by the frequency"""
-    assert grain in ["day", "week", "month"]
-    return ...
+    df = (
+        pd.DataFrame(data, columns=["timestamp", "user_id", "region"])
+        .set_index("timestamp")
+        .sort_index()
+    )
+    # Timestamps are strings like "20220901-14:00:00"; parse them with an explicit format.
+    df.index = pd.to_datetime(df.index, format="%Y%m%d-%H:%M:%S")
+    return df
 
 
 @reuse_functions(
-    with_inputs={"grain": value("day")},
-    namespace="daily_users",
-    outputs={"unique_users": "unique_users_daily"},
+    with_inputs={"grain": value("day"), "region": value("US")},
+    namespace="daily_users_US",
+    outputs={"unique_users": "unique_users_daily_US"},
     with_config={"region": "US"},
-    load_from=[unique_users, interactions_filtered],
+    load_from=[unique_users],
 )
-def daily_user_data_US() -> reuse.MultiOutput({"unique_users_daily_US": pd.Series}):
+def daily_user_data_US() -> reuse.MultiOutput(unique_users_daily_US=pd.Series):
     pass
 
 
 @reuse_functions(
-    with_inputs={"grain": value("day")},
-    namespace="daily_users",
-    outputs={"unique_users": "unique_users_daily"},
-    with_config={"region": "CA"},
-    load_from=[unique_users, interactions_filtered],
+    with_inputs={"grain": value("week"), "region": value("US")},
+    namespace="weekly_users_US",
+    outputs={"unique_users": "unique_users_weekly_US"},
+    with_config={"region": "US"},
+    load_from=[unique_users],
 )
-def daily_user_data_CA() -> reuse.MultiOutput({"unique_users_daily_CA": pd.Series}):
+def weekly_user_data_US() -> reuse.MultiOutput(unique_users_weekly_US=pd.Series):
+    pass
+
+
+@reuse_functions(
+    with_inputs={"grain": value("month"), "region": value("US")},
+    namespace="monthly_users_US",
+    outputs={"unique_users": "unique_users_monthly_US"},
+    with_config={"region": "US"},
+    load_from=[unique_users],
+)
+def monthly_user_data_US() -> reuse.MultiOutput(unique_users_monthly_US=pd.Series):
+    pass
+
+
+@reuse_functions(
+    with_inputs={"grain": value("day"), "region": value("CA")},
+    namespace="daily_users_CA",
+    outputs={"unique_users": "unique_users_daily_CA"},
+    with_config={"region": "CA"},
+    load_from=[unique_users],
+)
+def daily_user_data_CA() -> reuse.MultiOutput(unique_users_daily_CA=pd.Series):
+    pass
+
+
+@reuse_functions(
+    with_inputs={"grain": value("week"), "region": value("CA")},
+    namespace="weekly_users_CA",
+    outputs={"unique_users": "unique_users_weekly_CA"},
+    with_config={"region": "CA"},
+    load_from=[unique_users],
+)
+def weekly_user_data_CA() -> reuse.MultiOutput(unique_users_weekly_CA=pd.Series):
+    pass
+
+
+@reuse_functions(
+    with_inputs={"grain": value("month"), "region": value("CA")},
+    namespace="monthly_users_CA",
+    outputs={"unique_users": "unique_users_monthly_CA"},
+    with_config={"region": "CA"},
+    load_from=[unique_users],
+)
+def monthly_user_data_CA() -> reuse.MultiOutput(unique_users_monthly_CA=pd.Series):
     pass
diff --git a/examples/reusing_functions/unique_users.py b/examples/reusing_functions/unique_users.py
new file mode 100644
index 0000000..fe1a05b
--- /dev/null
+++ b/examples/reusing_functions/unique_users.py
@@ -0,0 +1,26 @@
+import pandas as pd
+
+_grain_mapping = {"day": "D", "week": "W", "month": "M"}
+
+
+def _validate_grain(grain: str):
+    """Raises a ValueError if ``grain`` is not a key of ``_grain_mapping``."""
+    if grain not in _grain_mapping:
+        raise ValueError(f"Unsupported grain: {grain!r}; expected one of {list(_grain_mapping)}.")
+
+
+def filtered_interactions(website_interactions: pd.DataFrame, region: str) -> pd.DataFrame:
+    """Keeps only the interactions whose ``region`` column equals ``region``."""
+    return website_interactions[website_interactions.region == region]
+
+
+def unique_users(filtered_interactions: pd.DataFrame, grain: str) -> pd.Series:
+    """Counts the distinct ``user_id`` values in each period of the given grain.
+
+    :param filtered_interactions: Interactions indexed by timestamp, with a ``user_id`` column.
+    :param grain: One of "day", "week" or "month".
+    :return: Number of unique users per period.
+    :raises ValueError: If ``grain`` is not supported.
+    """
+    _validate_grain(grain)
+    return filtered_interactions.resample(_grain_mapping[grain])["user_id"].nunique()