blob: 54f4f42eafb05658123408a5f6427b64ed0151fc [file] [log] [blame]
"""A module that has a bunch of features.
This enables us to test out all capabilities in a smoke test, which is
especially helpful for adding complex graphadapters.
This aims to include the following features:
1. @config.when() ✅
2. @parameterize (arguments for both value and source) ✅
3. @extract_columns ✅
4. @extract_fields ✅
5. @does ✅
6. @tag ✅
7. @check_output (with default args) ✅
Note this will also have a simple API to work with:
1. Required inputs = start_date/end_date, 'YYYYMMDD'
2. Required config = {'region' : 'US'/'CA'}
3. Four outputs, all pd.Series:
raw_acquisition_cost
pessimistic_net_acquisition_cost
neutral_net_acquisition_cost
optimistic_net_acquisition_cost
"""
from typing import Dict
import numpy as np
import pandas as pd
from hamilton.function_modifiers import (
check_output,
config,
does,
extract_columns,
extract_fields,
parameterize,
source,
tag,
value,
)
def start_date(date_range: Dict[str, str]) -> str:
return date_range["start_date"]
def end_date(date_range: Dict[str, str]) -> str:
return date_range["end_date"]
def _identity(**kwargs: int) -> int:
"""The API for this is suboptimal but this will serve for testing."""
return list(kwargs.values())[0]
@extract_fields(fields={"us_churn": int, "ca_churn": dict})
def weekly_churn() -> Dict[str, int]:
"""Weekly churn for both regions. A little contrived so we can use extract_fields"""
return {"us_churn": 3, "ca_churn": 2}
@config.when(region="US")
@does(_identity)
def weekly_churn_base_rate__US(us_churn: int) -> int:
pass
@config.when(region="CA")
@does(_identity)
def weekly_churn_base_rate__CA(ca_churn: int) -> int:
pass
@config.when_not(pandas_on_spark=True)
@extract_columns("spend", "signups", "weeks")
def spend_and_signups_source__pd(start_date: str, end_date: str) -> pd.DataFrame:
"""Yields `spend`, `signups`, and 'weeks' (the spine) in a single dataframe.
All of this is obviously dummy data.
"""
index = pd.DatetimeIndex(pd.date_range(start=start_date, end=end_date, freq="7d"))
df_out = pd.DataFrame(data=dict(weeks=index, counts=list(range(len(index)))))
df_out["spend"] = df_out.apply(lambda row: 100 + min((row.counts + 1) * 5, 250), axis=1)
df_out["signups"] = df_out.apply(lambda row: 20 * min(row.counts + 1, 100), axis=1)
return df_out[["spend", "signups", "weeks"]]
@config.when(pandas_on_spark=True)
@extract_columns("spend", "signups", "weeks")
def spend_and_signups_source__ps(start_date: str, end_date: str) -> pd.DataFrame:
"""Yields `spend`, `signups`, and 'weeks' (the spine) in a single dataframe.
All of this is obviously dummy data.
"""
# This is just me being lazy -- need to dig into pandas on spark to figure out the best way to instantiate it
# But for now I can just re-use the code above.
pd_df = spend_and_signups_source__pd(start_date=start_date, end_date=end_date)
import pyspark.pandas as ps
return ps.DataFrame(pd_df)
@check_output(
importance="fail",
range=(-1000, 10000), # Could be negative but never below 100
data_type=np.float64,
)
@parameterize(
net_signups_pessimistic={"optimism_adjustment": value(0.5)},
net_signups_neutral={"optimism_adjustment": value(1.0)},
net_signups_optimistic={"optimism_adjustment": value(2)},
)
def net_signups(
signups: pd.Series, weekly_churn_base_rate: int, optimism_adjustment: float
) -> pd.Series:
return signups - weekly_churn_base_rate * 1 / optimism_adjustment
@parameterize(
raw_acquisition_cost={"signups_source": source("signups")},
pessimistic_net_acquisition_cost={"signups_source": source("net_signups_pessimistic")},
neutral_net_acquisition_cost={"signups_source": source("net_signups_neutral")},
optimistic_net_acquisition_cost={"signups_source": source("net_signups_optimistic")},
)
@tag(final_output="true")
def acquisition_cost(signups_source: pd.Series, spend: pd.Series) -> pd.Series:
return spend / signups_source
def _prefix_dict_values(prefix: str, dict_to_prefix: Dict[str, str]):
"""Smoke screen tests to ensure we can work with complex types.
Contrived relationship to the theme...
:param prefix: Some string
:param dict_to_prefix: Some dict of string to string
:return: Dict b with a concated to keys
"""
return {k: prefix + v for k, v in dict_to_prefix.items()}
def date_prefix() -> str:
return "date_"
@does(_prefix_dict_values, prefix="date_prefix", dict_to_prefix="date_range")
def prefixed_date_info(date_prefix: str, date_range: Dict[str, str]) -> Dict[str, str]:
"""Smoke screen tests to ensure we can work with complex types."""
def series_with_start_date_end_date(
weeks: pd.Series, prefixed_date_info: Dict[str, str]
) -> pd.Series:
"""Smoke screen tests to ensure we can work with complex types."""
return weeks.apply(
lambda x: prefixed_date_info["start_date"] + "_" + prefixed_date_info["end_date"]
)