blob: 6eae4d76b734226d32cfd827791558f0837b0f01 [file] [log] [blame]
from typing import Any, Dict
import pandas as pd
import reusable_subdags
from hamilton.base import ResultMixin, SimplePythonGraphAdapter
from hamilton.driver import Driver
"""A pretty simple pipeline to demonstrate function reuse.
Note that this *also* demonstrates building a custom results builder!
Why not use the standard one? Well, because time-series joining is weird.
In this case, we're running the subdag for different granularities (daily, weekly, and monthly),
and we want the functions that provide outputs for these granularities to yield one
row per datapoint. This means that the weekly series will have roughly 7x fewer data points than the daily
series. The monthly series will have fewer still than the weekly series, etc...
This results builder handles it by resampling them all to a specified granularity.
Specifically, it:
1. Up/down-samples all time-series to the granularity specified in the constructor
2. Joins them as normal (a full outer join)
Note that pandas also has capabilities for as-of joins, but those tend to be messy and tricky to work
with. Furthermore, as-of joins don't usually work with multiple series at once. Upsampling/downsampling
does the trick quite well and gives more control to the user.
Note that it might be nice to pass in an argument to say which data is the "spine" column,
allowing us to have a basis of data to join with. For now, however, this should be a useful piece
of code to help with time-series joining!
Furthermore, you could actually include upsampling *in* the DAG -- this has the added feature of
encoding an index/spine column, and could be run as the final step for each subdag. Doing so is left
as an exercise to the reader.
As an example, consider the following outputs:
1. monthly_unique_users_US
timestamp
2022-09-30 6
Freq: M, Name: user_id, dtype: int64
2. weekly_unique_users_US
2022-09-04 3
2022-09-11 3
2022-09-18 2
2022-09-25 4
2022-10-02 1
Freq: W-SUN, Name: user_id, dtype: int64
3. daily_unique_users_US
timestamp
2022-09-01 2
2022-09-02 1
2022-09-03 1
2022-09-04 0
...
2022-09-22 2
2022-09-23 0
2022-09-24 2
2022-09-25 1
2022-09-26 1
Freq: D, Name: user_id, dtype: int64
Joining these with upsample granularity of "D" would produce:
daily_unique_users_US weekly_unique_users_US monthly_unique_users_US
timestamp
2022-09-01 2.0 3.0 6.0
2022-09-02 1.0 3.0 6.0
2022-09-03 1.0 3.0 6.0
2022-09-04 0.0 3.0 6.0
2022-09-05 0.0 3.0 6.0
2022-09-06 0.0 3.0 6.0
...
2022-09-28 NaN 1.0 6.0
2022-09-29 NaN 1.0 6.0
2022-09-30 NaN 1.0 6.0
2022-10-01 NaN 1.0 NaN
2022-10-02 NaN 1.0 NaN
"""
class TimeSeriesJoinResultsBuilder(ResultMixin):
    """Result builder that joins time-series outputs of differing granularity.

    Every requested output is resampled to one common frequency, and the
    resampled series are then assembled into a single DataFrame with one
    column per output.
    """

    def __init__(self, upsample_frequency: str):
        """Initializes a results builder that does a time-series join.

        :param upsample_frequency: Frequency string passed to pandas'
            ``Series.resample()`` (e.g. ``"D"``). Every output is resampled to it.
        """
        self.sampling_methodology = upsample_frequency

    def resample(self, time_series: pd.Series) -> pd.Series:
        """Resamples a series to the configured frequency, back-filling new slots.

        :param time_series: Series to resample; ``build_result`` only passes
            series that satisfy ``is_time_series``.
        :return: The series at the configured frequency.
        """
        # TODO -- think through the right fill -- ffill()/bfill()/whatnot
        return time_series.resample(self.sampling_methodology).bfill()

    @staticmethod
    def is_time_series(series: Any) -> bool:
        """Tells whether a value is a pandas Series with a datetime64 index.

        :param series: Any object produced by the DAG.
        :return: True only for a ``pd.Series`` whose index has inferred type
            ``"datetime64"``.
        """
        if not isinstance(series, pd.Series):
            return False
        return series.index.inferred_type == "datetime64"

    def build_result(self, **outputs: Any) -> pd.DataFrame:
        """Joins all outputs into one DataFrame at the configured frequency.

        :param outputs: Output name -> value; every value must be a time-series.
        :return: DataFrame with one column per output, keyed by output name.
        :raises ValueError: If any output is not a time-series. The message
            lists the offending output names.
        """
        non_ts_output = [
            key
            for key, value in outputs.items()
            if not TimeSeriesJoinResultsBuilder.is_time_series(value)
        ]
        if non_ts_output:
            raise ValueError(
                f"All outputs from DAG must be time-series -- the following are not: {non_ts_output}"
            )
        resampled_results = {key: self.resample(value) for key, value in outputs.items()}
        # Series that do not cover every row of the combined index leave NaN
        # gaps in the frame; back-fill those from the next available value.
        return pd.DataFrame(resampled_results).bfill()
def main():
    """Runs the reusable-subdag example and prints the joined result.

    Builds a Driver over ``reusable_subdags`` with an empty config, using
    ``TimeSeriesJoinResultsBuilder`` at daily ("D") frequency to combine the
    daily, weekly and monthly unique-user outputs for US and CA.
    """
    requested_outputs = [
        "daily_unique_users_US",
        "daily_unique_users_CA",
        "weekly_unique_users_US",
        "weekly_unique_users_CA",
        "monthly_unique_users_US",
        "monthly_unique_users_CA",
    ]
    results_builder = TimeSeriesJoinResultsBuilder(upsample_frequency="D")
    graph_adapter = SimplePythonGraphAdapter(result_builder=results_builder)
    dr = Driver({}, reusable_subdags, adapter=graph_adapter)
    joined = dr.execute(requested_outputs)
    # To render the execution graph as a PNG instead, uncomment:
    # dr.visualize_execution(requested_outputs, "./reusable_subdags", {"format": "png"})
    print(joined)
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()