"""
This modules shows how to use @check_output with Pandera (https://pandera.readthedocs.io/).
It's purpose is to define some feature transformation logic that we'd like to apply.
For runtime data quality checks to happen, we decorate the functions with @check_output. This executes a validator
against the output of the function. The default is to log a warning if validation fails, unless `importance='fail'`
is specified. In which case, it will halt execution.
Note:
(1) The functions written here scale to running on dask and ray. For spark see `feature_logic_spark.py`.
(2) Pandera only supports dataframes and series checks. Scalars outputs have to use the standard Hamilton validators.
(3) If you aren't familiar with Pandera we invite you to look at
https://pandera.readthedocs.io/en/stable/reference/generated/pandera.checks.Check.html# specifically the "Methods"
section. That will show you what other checks Pandera comes with.
(4) If you require dataframe validation - see the examples here.
"""
import numpy as np
import pandas as pd
import pandera as pa
from hamilton.function_modifiers import check_output, config
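# Illustrative only (not used by any node below): a minimal sketch of a few other built-in
# Pandera checks from the "Methods" section referenced in the module docstring. The schema
# name and bounds here are made up purely for demonstration.
_example_other_checks_schema = pa.SeriesSchema(
    float,
    checks=[
        pa.Check.greater_than_or_equal_to(0.0),
        pa.Check.less_than(150.0),
        pa.Check.notin([-1.0]),
    ],
    nullable=False,
)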
# Pandera doesn't operate over single (scalar) values, so we use the standard Hamilton validators here.
@check_output(range=(20.0, 60.0), data_type=np.float64)
def age_mean(age: pd.Series) -> np.float64:
"""Average of age"""
return age.mean()
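# A minimal sketch (hypothetical node, not part of the original feature set): the same scalar
# check as `age_mean`, but with `importance="fail"` so a failed validation halts execution
# instead of only logging a warning.
@check_output(range=(20.0, 60.0), data_type=np.float64, importance="fail")
def age_mean_strict(age: pd.Series) -> np.float64:
    """Average of age; halts the run if validation fails."""
    return age.mean()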
age_zero_mean_schema = pa.SeriesSchema(
float,
checks=[
pa.Check.in_range(-120.0, 120.0),
],
nullable=False,
)
@check_output(schema=age_zero_mean_schema)
def age_zero_mean(age: pd.Series, age_mean: np.float64) -> pd.Series:
"""Zero mean of age"""
return age - age_mean
age_std_dev_schema = pa.SeriesSchema(
float,
checks=[
pa.Check.in_range(0.0, 40.0),
],
)
# Pandera doesn't operate over single (scalar) values, so we use the standard Hamilton validators here.
@check_output(range=(0.0, 40.0), data_type=np.float64)
def age_std_dev(age: pd.Series) -> np.float64:
"""Standard deviation of age."""
return age.std()
age_zero_mean_unit_variance_schema = pa.SeriesSchema(
float,
checks=[
pa.Check.in_range(-4.0, 4.0),
],
nullable=False,
)
@check_output(schema=age_zero_mean_unit_variance_schema)
def age_zero_mean_unit_variance(age_zero_mean: pd.Series, age_std_dev: np.float64) -> pd.Series:
"""Zero mean unit variance value of age"""
return age_zero_mean / age_std_dev
seasons_encoded_schema = pa.DataFrameSchema(
{
"seasons_1": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"seasons_2": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"seasons_3": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"seasons_4": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
},
strict=True,
)
@check_output(schema=seasons_encoded_schema)
@config.when_not_in(execution=["dask"])
def seasons_encoded__base(seasons: pd.Series) -> pd.DataFrame:
"""One hot encodes seasons into 4 dimensions:
1 - first season
2 - second season
3 - third season
4 - fourth season
"""
return pd.get_dummies(seasons, prefix="seasons")
@check_output(schema=seasons_encoded_schema)
@config.when_in(execution=["dask"])
def seasons_encoded__dask(seasons: pd.Series) -> pd.DataFrame:
"""One hot encodes seasons into 4 dimensions:
1 - first season
2 - second season
3 - third season
4 - fourth season
"""
import dask.dataframe as dd
categorized = seasons.astype(str).to_frame().categorize()
df = dd.get_dummies(categorized, prefix="seasons")
return df
seasons_schema = pa.SeriesSchema(
np.uint8,
checks=[
pa.Check.isin([0, 1]),
],
nullable=False,
)
@check_output(schema=seasons_schema)
def seasons_1(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_1"""
return seasons_encoded["seasons_1"]
@check_output(schema=seasons_schema)
def seasons_2(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_2"""
return seasons_encoded["seasons_2"]
@check_output(schema=seasons_schema)
def seasons_3(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_3"""
return seasons_encoded["seasons_3"]
@check_output(schema=seasons_schema)
def seasons_4(seasons_encoded: pd.DataFrame) -> pd.Series:
"""Returns column seasons_4"""
return seasons_encoded["seasons_4"]
day_of_week_encoded_schema = pa.DataFrameSchema(
{
"day_of_the_week_2": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"day_of_the_week_3": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"day_of_the_week_4": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"day_of_the_week_5": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
"day_of_the_week_6": pa.Column(np.uint8, checks=[pa.Check.isin([0, 1])], nullable=False),
},
strict=True,
)
@check_output(schema=day_of_week_encoded_schema)
@config.when_not_in(execution=["dask"])
def day_of_week_encoded__base(day_of_the_week: pd.Series) -> pd.DataFrame:
"""One hot encodes day of week into five dimensions -- Saturday & Sunday weren't present.
1 - Sunday, 2 - Monday, 3 - Tuesday, 4 - Wednesday, 5 - Thursday, 6 - Friday, 7 - Saturday.
"""
return pd.get_dummies(day_of_the_week, prefix="day_of_the_week")
@check_output(schema=day_of_week_encoded_schema)
@config.when_in(execution=["dask"])
def day_of_week_encoded__dask(day_of_the_week: pd.Series) -> pd.DataFrame:
"""One hot encodes day of week into five dimensions -- Saturday & Sunday weren't present.
1 - Sunday, 2 - Monday, 3 - Tuesday, 4 - Wednesday, 5 - Thursday, 6 - Friday, 7 - Saturday.
"""
import dask.dataframe as dd
categorized = day_of_the_week.astype(str).to_frame().categorize()
df = dd.get_dummies(categorized, prefix="day_of_the_week")
return df
day_of_week_schema = pa.SeriesSchema(
np.uint8,
checks=[
pa.Check.isin([0, 1]),
],
nullable=False,
)
@check_output(schema=day_of_week_schema)
def day_of_the_week_2(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_2 column."""
return day_of_week_encoded["day_of_the_week_2"]
@check_output(schema=day_of_week_schema)
def day_of_the_week_3(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_3 column."""
return day_of_week_encoded["day_of_the_week_3"]
@check_output(schema=day_of_week_schema)
def day_of_the_week_4(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_4 column."""
return day_of_week_encoded["day_of_the_week_4"]
@check_output(schema=day_of_week_schema)
def day_of_the_week_5(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_5 column."""
return day_of_week_encoded["day_of_the_week_5"]
@check_output(schema=day_of_week_schema)
def day_of_the_week_6(day_of_week_encoded: pd.DataFrame) -> pd.Series:
"""Pulls out the day_of_the_week_6 column."""
return day_of_week_encoded["day_of_the_week_6"]
has_children_schema = pa.SeriesSchema(
int,
checks=[
pa.Check.isin([0, 1]),
],
nullable=False,
)
@check_output(schema=has_children_schema)
def has_children(son: pd.Series) -> pd.Series:
"""Single variable that says whether someone has any children or not."""
return (son > 0).astype(int)
has_pet_schema = pa.SeriesSchema(
int,
checks=[
pa.Check.isin([0, 1]),
],
nullable=False,
)
@check_output(schema=has_pet_schema)
def has_pet(pet: pd.Series) -> pd.Series:
"""Single variable that says whether someone has any pets or not."""
return (pet > 0).astype(int)
is_summer_schema = pa.SeriesSchema(
int,
checks=[
pa.Check.isin([0, 1]),
],
nullable=False,
)
@check_output(schema=is_summer_schema)
def is_summer(month_of_absence: pd.Series) -> pd.Series:
"""Is it summer in Brazil? i.e. months of December, January, February."""
return month_of_absence.isin([1, 2, 12]).astype(int)
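if __name__ == "__main__":
    # A minimal sketch of how this module might be wired up with the Hamilton driver.
    # The toy input values below are assumptions for illustration only; any "execution"
    # config value other than "dask" selects the pandas implementations above.
    import sys

    from hamilton import driver

    inputs = {
        "age": pd.Series([33.0, 41.0, 27.0]),
        "seasons": pd.Series([1, 2, 4]),
        "day_of_the_week": pd.Series([2, 3, 6]),
        "son": pd.Series([0, 2, 1]),
        "pet": pd.Series([1, 0, 0]),
        "month_of_absence": pd.Series([1, 7, 12]),
    }
    # Use this module itself as the source of Hamilton functions.
    dr = driver.Driver({"execution": "pandas"}, sys.modules[__name__])
    df = dr.execute(
        ["age_zero_mean_unit_variance", "has_children", "has_pet", "is_summer"],
        inputs=inputs,
    )
    print(df)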