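"""Hamilton functions that load the M5 Forecasting - Accuracy Kaggle data
(calendar, sell prices, sales, and the sample submission) and assemble the base
training dataset."""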
import gc
import logging
import os
from typing import Dict
import pandas as pd
import utils
from hamilton.function_modifiers import config, extract_columns, extract_fields
logger = logging.getLogger(__name__)
def _load_csv(
path: str, name: str, verbose: bool = True, reduce_memory: bool = True
) -> pd.DataFrame:
"""Helper function to load csv files.
Given that loading from parquet is faster than csv, this function will also save a parquet version of the csv file.
:param path:
:param name:
:param verbose:
:param reduce_memory:
:return:
"""
parquet_path = path.replace(".csv", ".parquet")
if os.path.exists(parquet_path):
logger.info("Loading from parquet.")
data_frame = pd.read_parquet(parquet_path)
else:
data_frame = pd.read_csv(path)
data_frame.to_parquet(parquet_path)
if reduce_memory:
data_frame = utils.reduce_mem_usage(data_frame, name, verbose=verbose)
if verbose:
logger.info(
"{} has {} rows and {} columns".format(name, data_frame.shape[0], data_frame.shape[1])
)
return data_frame
def calendar(
calendar_path: str = "/kaggle/input/m5-forecasting-accuracy/calendar.csv",
) -> pd.DataFrame:
"""Loads the calendar data.
Performs some normalization on it.
:param calendar_path:
:return:
"""
_calendar = _load_csv(calendar_path, "calendar", True, True)
# drop some calendar features -- TBD whether this is the best place
_calendar.drop(["weekday", "wday", "month", "year"], inplace=True, axis=1)
nan_features = ["event_name_1", "event_type_1", "event_name_2", "event_type_2"]
    for feature in nan_features:
        _calendar[feature] = _calendar[feature].fillna("unknown")
_calendar["date"] = pd.to_datetime(_calendar["date"])
return _calendar
def sell_prices(
sell_prices_path: str = "/kaggle/input/m5-forecasting-accuracy/sell_prices.csv",
) -> pd.DataFrame:
"""Loads the sell prices data.
:param sell_prices_path:
:return:
"""
_sell_prices = _load_csv(sell_prices_path, "sell_prices", True, True)
return _sell_prices
def sales_train_validation(
sales_train_validation_path: str = "/kaggle/input/m5-forecasting-accuracy/sales_train_validation.csv",
) -> pd.DataFrame:
"""Loads the sales train validation data.
Makes the data into a shape we want to work with.
:param sales_train_validation_path:
:return:
"""
_sales_train_validation = _load_csv(
sales_train_validation_path, "sales_train_validation", True, reduce_memory=False
)
_sales_train_validation = pd.melt(
_sales_train_validation,
id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
var_name="day",
value_name="demand",
)
_sales_train_validation = utils.reduce_mem_usage(
_sales_train_validation, "sales_train_validation", verbose=True
)
logger.info(
"Melted sales train validation has {} rows and {} columns".format(
_sales_train_validation.shape[0], _sales_train_validation.shape[1]
)
)
_sales_train_validation["part"] = "train"
return _sales_train_validation
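# @extract_fields splits the dict returned below into three separate nodes
# (test1_base, test2_base, submission) that downstream functions can request by name.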
@extract_fields(
{"test1_base": pd.DataFrame, "test2_base": pd.DataFrame, "submission": pd.DataFrame}
)
def submission_loader(
submission_path: str = "/kaggle/input/m5-forecasting-accuracy/sample_submission.csv",
) -> Dict[str, pd.DataFrame]:
"""Loads the submission data.
The notebook I got this from did some weird splitting with test1 and test 2.
I'm not sure why they did that, but I replicated that logic here anyway.
:param submission_path:
:return: dict of dataframes
"""
submission = _load_csv(submission_path, "submission", True, reduce_memory=False)
    # separate test dataframes
test1_rows = [row for row in submission["id"] if "validation" in row]
test2_rows = [row for row in submission["id"] if "evaluation" in row]
test1 = submission[submission["id"].isin(test1_rows)]
test2 = submission[submission["id"].isin(test2_rows)]
    # change column names: test1 covers forecast days d_1914 through d_1941
    test1.columns = ["id"] + [f"d_{day}" for day in range(1914, 1942)]
    # test2 covers forecast days d_1942 through d_1969
    test2.columns = ["id"] + [f"d_{day}" for day in range(1942, 1970)]
return {"test1_base": test1, "test2_base": test2, "submission": submission}
def product(sales_train_validation: pd.DataFrame) -> pd.DataFrame:
"""Creates a "product" table from the sales train validation data.
:param sales_train_validation:
:return:
"""
# get product table
_product = sales_train_validation[
["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
].drop_duplicates()
return _product
def _merge_test_set_with_product(
base_df: pd.DataFrame, product: pd.DataFrame, part_value: str
) -> pd.DataFrame:
"""Helper function to merge the test set with the product table.
:param base_df:
:param product:
:param part_value:
:return:
"""
merged_df = base_df.merge(product, how="left", on="id")
merged_df = pd.melt(
merged_df,
id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
var_name="day",
value_name="demand",
)
merged_df["part"] = part_value
return merged_df
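# The two base_dataset__* implementations below are alternates: the Hamilton driver
# picks one based on the `load_test2` value in the config it is constructed with.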
@config.when(load_test2="True")
def base_dataset__full(
test1_base: pd.DataFrame,
test2_base: pd.DataFrame,
product: pd.DataFrame,
sales_train_validation: pd.DataFrame,
) -> pd.DataFrame:
"""Creates the base dataset we're building from.
This is only invoked if you're wanting test2 to be used as well.
:param test1_base:
:param test2_base:
:param product:
:param sales_train_validation:
:return:
"""
# create test1 and test2
test1 = _merge_test_set_with_product(test1_base, product, "test1")
test2 = _merge_test_set_with_product(test2_base, product, "test2")
data = pd.concat([sales_train_validation, test1, test2], axis=0)
return data
@config.when(load_test2="False")
def base_dataset__test1_only(
test1_base: pd.DataFrame, product: pd.DataFrame, sales_train_validation: pd.DataFrame
) -> pd.DataFrame:
"""Creates the base dataset we're building from.
This is the default one.
:param test1_base:
:param product:
:param sales_train_validation:
:return:
"""
# create test1 only
test1 = _merge_test_set_with_product(test1_base, product, "test1")
data = pd.concat([sales_train_validation, test1], axis=0)
return data
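# @extract_columns exposes each listed column of the dataframe returned below as its
# own node, so downstream feature functions can depend on individual columns.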
@extract_columns(
*[
"id",
"item_id",
"dept_id",
"cat_id",
"store_id",
"state_id",
"demand",
"part",
"date",
"wm_yr_wk",
"event_name_1",
"event_type_1",
"event_name_2",
"event_type_2",
"snap_CA",
"snap_TX",
"snap_WI",
"sell_price",
]
)
def base_training_data(
base_dataset: pd.DataFrame,
calendar: pd.DataFrame,
sell_prices: pd.DataFrame,
num_rows_to_skip: int = 55000000,
) -> pd.DataFrame:
"""Creates the base training data set.
:param base_dataset:
:param calendar:
:param sell_prices:
:param num_rows_to_skip: how many rows to skip. If zero, or negative, use all rows.
:return:
"""
    # the notebook crashes with the entire dataset, so skip the first rows (maybe use tensorflow, dask, or pyspark xD)
    subset = base_dataset.loc[num_rows_to_skip:]
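    # attach calendar features by matching each row's day (d_XXXX) against the calendar's d column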
subset = pd.merge(subset, calendar, how="left", left_on=["day"], right_on=["d"])
subset.drop(["d", "day"], inplace=True, axis=1)
# get the sell price data (this feature should be very important)
subset = subset.merge(sell_prices, on=["store_id", "item_id", "wm_yr_wk"], how="left")
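    # index by id so the columns extracted downstream stay aligned on the same identifier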
subset.index = subset.id
logger.info(
"Our final dataset to train has {} rows and {} columns".format(
subset.shape[0], subset.shape[1]
)
)
gc.collect()
return subset
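

if __name__ == "__main__":
    # A minimal usage sketch, not part of the dataflow itself: it shows how this module
    # could be handed to a Hamilton Driver. The config key load_test2 and the requested
    # column names come from the functions above; the default Kaggle paths are assumed
    # to exist, and running this on the full data may be memory-hungry.
    import sys

    from hamilton import driver

    this_module = sys.modules[__name__]
    dr = driver.Driver({"load_test2": "False"}, this_module)
    result = dr.execute(["id", "date", "demand", "sell_price"])
    logger.info("Example run produced %s rows and %s columns", result.shape[0], result.shape[1])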