"""
This module defines data loading logic that we'd like to apply.
Note:
(1) to handle running on dask, ray, and spark, you will see some `@config.when*` decorators, which will
determine whether this function should exist or not depending on some configuration passed in.
(2) instead of @config.when* we could instead move these functions into specific independent modules, and then in
the driver choose which one to use for the DAG. For the purposes of this example, we decided one file is simpler.
"""

from typing import List

import pandas as pd

from hamilton.function_modifiers import config, extract_columns
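
# The names below are the sanitized versions of the raw CSV headers (see _sanitize_columns);
# @extract_columns(*data_columns) exposes each of them as its own node in the DAG.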
data_columns = [
    "id",
    "reason_for_absence",
    "month_of_absence",
    "day_of_the_week",
    "seasons",
    "transportation_expense",
    "distance_from_residence_to_work",
    "service_time",
    "age",
    "work_load_average_per_day",
    "hit_target",
    "disciplinary_failure",
    "education",
    "son",
    "social_drinker",
    "social_smoker",
    "pet",
    "weight",
    "height",
    "body_mass_index",
    "absenteeism_time_in_hours",
]


def _sanitize_columns(df_columns: List[str]) -> List[str]:
    """Renames columns to be valid Hamilton names -- and lower cases them.

    :param df_columns: the current column names.
    :return: sanitized column names that work with Hamilton.
    """
    return [c.strip().replace("/", "_per_").replace(" ", "_").lower() for c in df_columns]


@config.when_not_in(execution=["dask", "spark"])
@extract_columns(*data_columns)
def raw_data__base(location: str) -> pd.DataFrame:
    """Extracts the raw data, renames the columns to be valid python variable names, and assigns an index.

    :param location: the location to load from.
    :return: pandas dataframe.
    """
    df = pd.read_csv(location, sep=";")
    # rename columns to be valid hamilton names -- and lower case them
    df.columns = _sanitize_columns(df.columns)
    # create proper index -- ID-Month-Day
    index = (
        df["id"].astype(str)
        + "-"
        + df["month_of_absence"].astype(str)
        + "-"
        + df["day_of_the_week"].astype(str)
    )
    df.index = index
    return df
@config.when(execution="dask")
@extract_columns(*data_columns)
def raw_data__dask(location: str, block_size: str = "10KB") -> pd.DataFrame:
"""Extracts the raw data, renames the columns to be valid python variable names, and assigns an index.
:param location: the location to load from
:param block_size: size at which to partition the data. 10KB here is only to ensure our small data set is partitioned.
:return: dask dataframe
"""
    from dask import dataframe

    df = dataframe.read_csv(location, sep=";", blocksize=block_size)
    # rename columns to be valid hamilton names -- and lower case them
    df.columns = _sanitize_columns(df.columns)
    # create proper index -- ID-Month-Day
    df.index = (
        df["id"].astype(str)
        + "-"
        + df["month_of_absence"].astype(str)
        + "-"
        + df["day_of_the_week"].astype(str)
    )
    return df
@config.when(execution="spark")
@extract_columns("index_col", *data_columns)
def raw_data__spark(location: str) -> pd.DataFrame:
"""Extracts the raw data, renames the columns to be valid python variable names, and assigns an index.
:param location: the location to load from
:param number_partitions: number of partitions to partition the data for dask.
:return: dask dataframe
"""
df = pd.read_csv(location, sep=";")
# rename columns to be valid hamilton names -- and lower case it
df.columns = _sanitize_columns(df.columns)
# create proper index -- ID-Month-Day;
index = (
df["id"].astype(np.str)
+ "-"
+ df["month_of_absence"].astype(np.str)
+ "-"
+ df["day_of_the_week"].astype(np.str)
)
df.index = index
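    # "index_col" is also extracted above; our assumption is that the spark graph adapter joins the
    # extracted columns on this explicit column rather than relying on the dataframe index.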
df["index_col"] = df.index
# this is a quick way to do this
import pyspark.pandas as ps
ps_df = ps.from_pandas(df)
return ps_df


if __name__ == "__main__":
    # quick sanity check that loading works when running this file directly.
    d = raw_data__base("Absenteeism_at_work.csv")
    print(d.to_string())