blob: 03462cb4a976d5cfc1e13d4e1559483c5c864d66 [file] [log] [blame]
"""
This module defines some feature transformation logic that we'd like to apply.
It then in addition, decorates the functions with @check_output, so that at runtime, we can
execute a validator against the output of the function. The default is to log a warning, unless `importance='fail'`
is specified. In which case, it will halt execution.
Note:
(1) to handle running on dask, ray, and spark, you will see some `@config.when*` decorators, which will
determine whether this function should exist or not depending on some configuration passed in.
(2) instead of @config.when* we could instead move these functions into specific independent modules, and then in
the driver choose which one to use for the DAG. For the purposes of this example, we decided one file is simpler.
(3) As of the 1.9.0 release, Hamilton, by default, does not handle data frame validation. Please use the Pandera
integration - see `examples/data_quality/pandera` for an example.
"""
import numpy as np
import pandas as pd
from hamilton.function_modifiers import check_output, config
@check_output(range=(20.0, 60.0), data_type=np.float64)
def age_mean(age: pd.Series) -> np.float64:
"""Average of age"""
return age.mean()
@check_output(range=(-120.0, 120.0), data_type=np.float64, allow_nans=False)
def age_zero_mean(age: pd.Series, age_mean: np.float64) -> pd.Series:
"""Zero mean of age"""
return age - age_mean
@check_output(range=(0.0, 40.0), data_type=np.float64)
def age_std_dev(age: pd.Series) -> np.float64:
"""Standard deviation of age."""
return age.std()
@check_output(range=(-4.0, 4.0), data_type=np.float64, allow_nans=False)
def age_zero_mean_unit_variance(age_zero_mean: pd.Series, age_std_dev: np.float64) -> pd.Series:
"""Zero mean unit variance value of age"""
return age_zero_mean / age_std_dev
@config.when_not_in(execution=["spark", "dask"])
def seasons_encoded__base(seasons: pd.Series) -> pd.DataFrame:
"""One hot encodes seasons into 4 dimensions:
1 - first season
2 - second season
3 - third season
4 - fourth season
"""
return pd.get_dummies(seasons, prefix="seasons")
@config.when_in(execution=["spark"])
def seasons_encoded__spark(seasons: pd.Series) -> pd.DataFrame:
"""One hot encodes seasons into 4 dimensions:
1 - first season
2 - second season
3 - third season
4 - fourth season
"""
import pyspark.pandas as ps
return ps.get_dummies(seasons, prefix="seasons")
@config.when_in(execution=["dask"])
def seasons_encoded__dask(seasons: pd.Series) -> pd.DataFrame:
"""One hot encodes seasons into 4 dimensions:
1 - first season
2 - second season
3 - third season
4 - fourth season
"""
import dask.dataframe as dd
categorized = seasons.astype(str).to_frame().categorize()
df = dd.get_dummies(categorized, prefix="seasons")
return df
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def seasons_1(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_1"""
return seasons_encoded["seasons_1"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def seasons_2(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_2"""
return seasons_encoded["seasons_2"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def seasons_3(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_3"""
return seasons_encoded["seasons_3"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def seasons_4(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_4"""
return seasons_encoded["seasons_4"]
@config.when_not_in(execution=["spark", "dask"])
def day_of_week_encoded__base(day_of_the_week: pd.Series) -> pd.DataFrame:
"""One hot encodes day of week into five dimensions -- Saturday & Sunday weren't present.
1 - Sunday, 2 - Monday, 3 - Tuesday, 4 - Wednesday, 5 - Thursday, 6 - Friday, 7 - Saturday.
"""
return pd.get_dummies(day_of_the_week, prefix="day_of_the_week")
@config.when_in(execution=["spark"])
def day_of_week_encoded__spark(day_of_the_week: pd.Series) -> pd.DataFrame:
"""One hot encodes day of week into five dimensions -- Saturday & Sunday weren't present.
1 - Sunday, 2 - Monday, 3 - Tuesday, 4 - Wednesday, 5 - Thursday, 6 - Friday, 7 - Saturday.
"""
import pyspark.pandas as ps
return ps.get_dummies(day_of_the_week, prefix="day_of_the_week")
@config.when_in(execution=["dask"])
def day_of_week_encoded__dask(day_of_the_week: pd.Series) -> pd.DataFrame:
"""One hot encodes day of week into five dimensions -- Saturday & Sunday weren't present.
1 - Sunday, 2 - Monday, 3 - Tuesday, 4 - Wednesday, 5 - Thursday, 6 - Friday, 7 - Saturday.
"""
import dask.dataframe as dd
categorized = day_of_the_week.astype(str).to_frame().categorize()
df = dd.get_dummies(categorized, prefix="day_of_the_week")
return df
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def day_of_the_week_2(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_2 column."""
return day_of_week_encoded["day_of_the_week_2"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def day_of_the_week_3(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_3 column."""
return day_of_week_encoded["day_of_the_week_3"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def day_of_the_week_4(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_4 column."""
return day_of_week_encoded["day_of_the_week_4"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def day_of_the_week_5(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_5 column."""
return day_of_week_encoded["day_of_the_week_5"]
@check_output(data_type=np.uint8, values_in=[0, 1], allow_nans=False)
def day_of_the_week_6(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_6 column."""
return day_of_week_encoded["day_of_the_week_6"]
@check_output(data_type=np.int64, values_in=[0, 1], allow_nans=False)
def has_children(son: pd.Series) -> pd.Series:
"""Single variable that says whether someone has any children or not."""
return (son > 0).astype(int)
@check_output(data_type=np.int64, values_in=[0, 1], allow_nans=False)
def has_pet(pet: pd.Series) -> pd.Series:
"""Single variable that says whether someone has any pets or not."""
return (pet > 0).astype(int)
@check_output(data_type=np.int64, values_in=[0, 1], allow_nans=False)
def is_summer(month_of_absence: pd.Series) -> pd.Series:
"""Is it summer in Brazil? i.e. months of December, January, February."""
return month_of_absence.isin([1, 2, 12]).astype(int)