blob: 38defa5f362334c0ca039027c0a90692b0987505 [file] [log] [blame]
from typing import Any, Dict
import pandas as pd
import reusable_subdags
from hamilton.base import ResultMixin, SimplePythonGraphAdapter
from hamilton.driver import Driver
"""A pretty simple pipeline to demonstrate function reuse.
Note that this *also* demonstrates building a custom results builder!
This one does specific time-series methodology (upsampling...).
"""
class TimeSeriesJoinResultsBuilder(ResultMixin):
    """Results builder that joins time-series DAG outputs on a common frequency.

    Every requested output must be a ``pd.Series`` whose index pandas infers
    as ``datetime64``. Each series is resampled to the configured frequency and
    back-filled, and the results are combined column-wise into one DataFrame.
    """

    def __init__(self, upsample_frequency: str):
        """Initializes a results builder that does a time-series join.

        :param upsample_frequency: Frequency / offset alias passed to pandas
            ``Series.resample()`` (e.g. ``"D"`` for daily).
        """
        self.sampling_methodology = upsample_frequency

    def resample(self, time_series: pd.Series) -> pd.Series:
        """Resamples ``time_series`` to the configured frequency, back-filling.

        :param time_series: A series with a datetime index.
        :return: The resampled series. New slots are filled with the next
            available observation.
        """
        # TODO -- think through the right fill -- ffill()/bfill()/whatnot
        return time_series.resample(self.sampling_methodology).bfill()

    @staticmethod
    def is_time_series(series: Any) -> bool:
        """Returns True iff ``series`` is a ``pd.Series`` with a datetime64 index.

        :param series: Any DAG output value.
        :return: Whether the value can be joined by this builder.
        """
        return (
            isinstance(series, pd.Series)
            and series.index.inferred_type == "datetime64"
        )

    def build_result(self, **outputs: Any) -> pd.DataFrame:
        """Resamples every output and joins them into a single DataFrame.

        :param outputs: Mapping of output name to value. Every value must be a
            datetime-indexed ``pd.Series``.
        :return: A DataFrame with one column per output, indexed by the union
            of the resampled indexes.
        :raises ValueError: If any output is not a datetime-indexed series.
        """
        non_ts_output = [
            key for key, value in outputs.items() if not self.is_time_series(value)
        ]
        if non_ts_output:
            raise ValueError(
                f"All outputs from DAG must be time-series -- the following are not: {non_ts_output}"
            )
        resampled_results = {key: self.resample(value) for key, value in outputs.items()}
        # Aligning series that cover different date ranges introduces NaNs.
        # bfill fills the ones that come before a later observation. Gaps after
        # a series' last observation remain NaN.
        return pd.DataFrame(resampled_results).bfill()
# The results builder rejects any requested node that is not a datetime-indexed
# series, and joins the rest after resampling them to the "D" frequency.
_time_series_adapter = SimplePythonGraphAdapter(
    result_builder=TimeSeriesJoinResultsBuilder(upsample_frequency="D")
)
dr = Driver({}, reusable_subdags, adapter=_time_series_adapter)

# Requested node names follow the pattern unique_users_<granularity>_<region>.
_requested_outputs = [
    f"unique_users_{granularity}_{region}"
    for granularity in ("daily", "weekly", "monthly")
    for region in ("US", "CA")
]
result = dr.execute(_requested_outputs)
print(result)