| # ----------------------------------------------------------------------- |
| # Implementation of ARIMA model |
| # |
| # Currently, we implement CSS method |
| # ----------------------------------------------------------------------- |
| |
| from utilities.math_utils import compute_dot |
| import plpy |
| from utilities.utilities import _assert |
| from utilities.validate_args import columns_exist_in_table |
| from utilities.validate_args import table_exists |
| from utilities.utilities import __mad_version |
| from utilities.utilities import add_postfix |
| |
| # ---------------------------------------------------------------------- |
| |
| version_wrapper = __mad_version() |
| madvec = version_wrapper.select_vecfunc() |
| |
| |
| def arima_forecast(schema_madlib, model_table, output_table, steps_ahead): |
| """ Main function to forecast timeseries data using a trained ARIMA model |
| |
| @brief ARIMA forecast is used to predict future values for a time series |
| data using an ARIMA model |
| |
| Args: |
| @param model_table: str, Name of the table containing the ARIMA model |
| @param output_table: str, Name of the table to output forecast values |
| @param steps_ahead: type, Number of steps to forecast |
| |
| Returns: |
| None |
| """ |
| _validate_params(schema_madlib, model_table, output_table, steps_ahead) |
| old_messages = plpy.execute("""SELECT setting |
| FROM pg_settings |
| WHERE name = 'client_min_messages'""" |
| )[0]["setting"] |
| plpy.execute('SET client_min_messages TO warning') |
| |
| # get all parameters from table |
| model_meta_params = _extract_meta_params_from_tbl(add_postfix(model_table, "_summary")) |
| |
| # use madvec to handle array behavior in GPDB 4.1 |
| non_seasonal_orders = map(int, |
| madvec(model_meta_params['non_seasonal_orders'])) |
| mean, arparams, maparams = _extract_params_from_tbl( |
| model_table, |
| model_meta_params["include_mean"], |
| non_seasonal_orders) |
| # residuals and data are in chronological order. |
| # To get dot product of the form: |
| # ar[0]*data[-1] + ar[1]*data[-2] + ..., |
| # we reverse the params list to easily perform the dot product |
| rev_ar_params = list(reversed(arparams)) |
| rev_ma_params = list(reversed(maparams)) |
| p, d, q = non_seasonal_orders |
| |
| # get last 'p + d' data items (for endogenous variable) and |
| # last 'q' residual values |
| orig_data, residual = _extract_end_data_residuals( |
| model_meta_params["input_table"], |
| add_postfix(model_table, "_residual"), |
| model_meta_params["timestamp_col"], |
| model_meta_params["timeseries_col"], |
| non_seasonal_orders) |
| # mean center and difference the original source data since ARIMA |
| # model is trained on transformed data |
| mean_centered_data = [i - mean for i in orig_data] |
| diff_data = _calculate_diff(mean_centered_data, d) |
| # at this point diff_data should have 'p' elements (after the differencing) |
| diff_data.extend([0] * steps_ahead) # store future values in diff_data |
| i = (0, -1)[q == 0] # if q == 0, AR loop should start from 0 (== i+1) |
| for i in range(min(q, steps_ahead)): |
| # AR and MA forecasting |
| diff_data[i + p] = (compute_dot(rev_ar_params, diff_data[i:i+p]) + |
| compute_dot(rev_ma_params[:q-i], residual[i:])) |
| for i in range(i+1, steps_ahead): |
| # only AR forecasting |
| diff_data[i + p] = compute_dot(rev_ar_params, diff_data[i: i+p]) |
| # reverse the differencing operation and add the mean back to get |
| # actual forecast |
| forecast = [i + mean for i in _calculate_rev_diff(diff_data[p:], mean_centered_data[-d:], d)] |
| forecast_str = ','.join(map(str, forecast)) |
| # create output table |
| plpy.execute("""CREATE TABLE {output} |
| AS SELECT generate_series(1, {end}) AS steps_ahead, |
| unnest(ARRAY[{forecast_str}]::FLOAT8[]) |
| AS forecast_value |
| """.format(output=output_table, |
| end=steps_ahead, |
| forecast_str=forecast_str)) |
| plpy.execute('SET client_min_messages TO ' + old_messages) |
| # ------------------------------------------------------------------------- |
| |
| |
| def _validate_params(schema_madlib, model_table, output_table, steps_ahead): |
| """ Validate all argument values to the ARIMA forecast function |
| |
| Args: |
| @param model_table: str, Name of the table containing the ARIMA model parameters |
| @param output_table: str, Name of the table to output forecast values |
| @param steps_ahead: int, Number of steps to forecast |
| |
| Returns: |
| None |
| |
| Raises: |
| plpy.error, if any of the arguments are invalid |
| """ |
| if model_table is not None and model_table.strip(): |
| _assert(table_exists(model_table), |
| "ARIMA error: Model table '{0}' does not exist!". |
| format(model_table)) |
| else: |
| plpy.error("ARIMA error: Invalid model table parameter") |
| |
| # validate if summary table exists and has all columns |
| summary_table_name = add_postfix(model_table, "_summary") |
| _assert(table_exists(summary_table_name), |
| "ARIMA error: Model summary table '{0}' does not exist!". |
| format(summary_table_name)) |
| meta_param_columns = ["input_table", "timestamp_col", |
| "timeseries_col", "include_mean", |
| "non_seasonal_orders"] |
| _assert(columns_exist_in_table(summary_table_name, |
| meta_param_columns, |
| schema_madlib), |
| "ARIMA error: Table '{0}' missing at least one column from {1}!". |
| format(summary_table_name, str(meta_param_columns))) |
| |
| ## validate if model table has the parameter columns |
| model_meta_params = _extract_meta_params_from_tbl(summary_table_name) |
| # use 'madvec' to extract an array from the table to ensure correct |
| # behavior for GPDB 4.1 |
| p, d, q = map(int, madvec(model_meta_params["non_seasonal_orders"])) |
| if p: |
| _assert(columns_exist_in_table(model_table, ["ar_params"], |
| schema_madlib), |
| "ARIMA error: Table {0} missing ar_params column!". |
| format(model_table)) |
| if q: |
| _assert(columns_exist_in_table(model_table, ["ma_params"], |
| schema_madlib), |
| "ARIMA error: Table {0} missing ma_params column!". |
| format(model_table)) |
| if d == 0 and model_meta_params["include_mean"]: |
| _assert(columns_exist_in_table(model_table, ["mean"], schema_madlib), |
| "ARIMA error: Table {0} missing mean column!". |
| format(model_table)) |
| |
| # validate if output table can be created |
| if output_table is not None and output_table.strip() != '': |
| _assert(not table_exists(output_table, only_first_schema=True), |
| "ARIMA error: Output table {0} already exists!". |
| format(str(output_table))) |
| else: |
| plpy.error("ARIMA error: Invalid output table name") |
| |
| # steps_ahead has to be a valid positive integer |
| _assert(steps_ahead > 0 and isinstance(steps_ahead, int), |
| "ARIMA error: Invalid steps_ahead parameter!") |
| _assert(steps_ahead <= 1e9, |
| "ARIMA error: Maximum number of steps that can be forecasted is " |
| "{0}!".format(1e9)) |
| # ------------------------------------------------------------------------- |
| |
| |
| def _extract_params_from_tbl(model_table, include_mean, model_orders): |
| """ Extract the ARIMA parameters from the model table |
| |
| @brief ARIMA model is stored in a table created from the train function. This |
| function extracts the parameters and returns as multiple lists of parameters. |
| The function assumes that tables 'model_table' and 'model_table_summary' |
| exist with the appropriate columns. |
| |
| Args: |
| @param model_table_name: str, Name of the table containing the model |
| @param include_mean: bool, Does the model contain the mean value? |
| |
| Returns: |
| Tuple. (ar_params=list(), ma_params=list(), mean=float) |
| The tuple contains the AR parameters, MA parameters, and the mean value |
| (if present in the model table). |
| """ |
| model_params = plpy.execute("""SELECT * from {0}""". |
| format(model_table))[0] |
| if include_mean and model_orders[1] == 0: |
| # mean value is defined only for d == 0 |
| mean = model_params["mean"] |
| else: |
| mean = 0 |
| if model_orders[0] > 0: |
| ar_params = map(float, madvec(model_params["ar_params"])) |
| else: |
| ar_params = [] |
| if model_orders[2] > 0: |
| ma_params = map(float, madvec(model_params["ma_params"])) |
| else: |
| ma_params = [] |
| return (mean, ar_params, ma_params) |
| # ------------------------------------------------------------------------- |
| |
| |
| def _extract_meta_params_from_tbl(summary_table_name): |
| """ Extract the meta-parameters used to train the ARIMA model |
| |
| Args: |
| @param arg: str, Name of the summary table |
| |
| Returns: |
| Dictionary. A dict of meta parameters that includes the following keys: |
| * input_table: str, Name of the original source table |
| * timestamp_col: str, Name of the timestamp column |
| * timeseries_col: str, Name of the timeseries column |
| * include_mean: bool, indicates if mean was included in original table |
| * non_seasonal_orders: list, Array of integers with length of 3, |
| containing orders for AR, I and MA |
| """ |
| # the function currently only returns the values in first row of table |
| # this needs to be extended for grouping |
| model_meta_params = plpy.execute("""SELECT * from {0}""". |
| format(summary_table_name))[0] |
| return model_meta_params |
| # ------------------------------------------------------------------------- |
| |
| |
| def _extract_end_data_residuals(input_table, residual_table, |
| timestamp_col, |
| timeseries_col, order): |
| """ Extract the data and residual values for the final few timeseries entries. |
| |
| @brief Extract the residual values from the end to use for the MA part and |
| the data values from the end to use for AR part of the ARIMA forecasting |
| Args: |
| @param input_table: str, Name of the source data table |
| @param residual_table: str, Name of the residual table |
| @param timestamp_col: str, Name of the column containing timestamp (index) values |
| @param order: tuple/list, Orders of the ARIMA model (p, d, q) |
| |
| Returns: |
| data_values: List. Array of floats that represents the last 'p + d' data values. |
| residual: List. Array of floats that represents the last 'q' residual values. |
| |
| """ |
| residual_values = plpy.execute(""" |
| SELECT residual |
| FROM ( |
| SELECT * |
| FROM {table} |
| ORDER BY {timestamp} DESC |
| LIMIT {q}) q1 |
| ORDER BY {timestamp} ASC""".format(table=residual_table, |
| timestamp=timestamp_col, |
| q=order[2])) |
| residual_values = [val["residual"] for val in residual_values] |
| data_values = plpy.execute(""" |
| SELECT {value} as data |
| FROM ( |
| SELECT * |
| FROM {table} |
| ORDER BY {timestamp} DESC |
| LIMIT {pd}) q1 |
| ORDER BY {timestamp} ASC""". format(table=input_table, |
| timestamp=timestamp_col, |
| value=timeseries_col, |
| pd=(order[0]+order[1]))) |
| data_values = [val["data"] for val in data_values] |
| return data_values, residual_values |
| # ------------------------------------------------------------------------- |
| |
| |
| def _calculate_diff(data, diff_order=0): |
| """ Calculate the n-th order discrete difference. |
| |
| @brief Calculate the n-th order discrete difference. |
| Args: |
| @param data: list, Input vector to compute differences |
| @param diff_order: int, The order of the differencing operator (Default=0) |
| |
| Returns: |
| List. The nth order differences. The size of the list will be smaller |
| than the original data. |
| """ |
| if not diff_order: |
| return data |
| output = data |
| while diff_order > 0: |
| # Compute actual differencing as an iterative process |
| output = [output[i] - output[i-1] for i in range(1, len(output))] |
| diff_order -= 1 |
| return output |
| # ------------------------------------------------------------------------- |
| |
| |
| def _calculate_rev_diff(diff_data, orig_data_seed_values=None, diff_order=0): |
| """ Calculate the original data from n-th order discrete difference. |
| |
| @brief Calculate the n-th order discrete difference. |
| Args: |
| @param diff_data: list[float], The nth order difference of the original data |
| @param orig_data_seed_values: list[float], 'n' values from the original |
| data that preceedes the original data |
| vectors to be used for seeding. |
| If None, then seed values |
| are assumed to be zero |
| @param diff_order: int, The order of the differencing operator (Default=0) |
| |
| Returns: |
| List. The original data corresponding to the differences |
| """ |
| if not diff_order: |
| return diff_data |
| # diff_data is smaller than the actual data that produced the diff |
| # hence we need some seed/initial values to construct the original data vector |
| intermed_initial_vals = [0]*diff_order |
| if orig_data_seed_values: |
| # if seed values are not provided then all initial values are 0 |
| curr_diff = orig_data_seed_values |
| intermed_initial_vals[0] = curr_diff[-1] |
| for each_level in range(1, min(diff_order, len(orig_data_seed_values))): |
| curr_diff = _calculate_diff(curr_diff, 1) |
| intermed_initial_vals[each_level] = curr_diff[-1] |
| |
| output = diff_data |
| n_elements = len(diff_data) |
| while diff_order > 0: |
| curr_output = [0] * n_elements |
| curr_output[0] = output[0] + intermed_initial_vals[diff_order-1] |
| for i in range(1, len(output)): |
| # this loop is same as computing cumulative sum |
| curr_output[i] = output[i] + curr_output[i-1] |
| output = curr_output |
| diff_order -= 1 |
| return output # Compute actual differencing |
| # ------------------------------------------------------------------------- |
| |
| |
| def arima_forecast_help_message(schema_madlib, message, **kwargs): |
| """ Display a help message about ARIMA forecast function |
| |
| Args: |
| @param schema_madlib: String. Schema where MADlib is installed |
| @param message: String. Which help message to display? |
| |
| Returns: |
| String. A detailed help message |
| """ |
| if message is not None and \ |
| message.lower() in ("usage", "help", "?"): |
| return """ |
| Forecasting function for ARIMA modeling |
| ----------------------------------------------------------------------- |
| USAGE |
| ----------------------------------------------------------------------- |
| SELECT {schema_madlib}.arima_forecast |
| ( |
| model_table TEXT, -- Model table name returned by ARIMA training function |
| output_table TEXT, -- Output table to store forecast values |
| steps_ahead INTEGER -- Number of steps to forecast ahead |
| ) |
| ----------------------------------------------------------------------- |
| OUTPUT |
| ----------------------------------------------------------------------- |
| The following table is created: |
| output_table: Table containing the ARIMA model forecast with the following columns |
| - group_by_cols -- Grouping col value |
| -- (if grouping parameter provided during model training) |
| - steps_ahead INTEGER -- Time step ahead |
| - forecast_value FLOAT8 -- Forecast of the time step |
| """ |
| else: |
| return """ |
| ARIMA forecast function |
| -------------------------------------- |
| For an overview on usage, run: |
| SELECT {schema_madlib}.arima_forecast('usage'); |
| """.format(schema_madlib=schema_madlib) |
| # ----------------------------------------------------------------------- |
| |
| |
| # ------------------------------------------------------------------------------ |
| # -- Unit tests for python only functions--------------------------------------- |
| import unittest |
| |
| |
| class TestForecast(unittest.TestCase): |
| """ |
| The import statements will have to be commented to execute the unittest |
| since the modules cannot be imported when run standalone. |
| """ |
| def setUp(self): |
| self.time_data1 = range(10) |
| self.time_data2 = [i**2 for i in range(10)] |
| self.time_data3 = [i**3 for i in range(10)] |
| self.time_data4 = [i**4 for i in range(10)] |
| |
| self.diff_data1 = _calculate_diff(self.time_data1, 1) |
| self.diff_data2 = _calculate_diff(self.time_data2, 2) |
| self.diff_data3 = _calculate_diff(self.time_data3, 3) |
| self.diff_data4 = _calculate_diff(self.time_data4, 4) |
| self.diff_data4_3 = _calculate_diff(self.time_data4, 3) |
| |
| def tearDown(self): |
| pass |
| |
| def test_diffs(self): |
| self.assertEqual([1]*9, _calculate_diff(self.time_data1, 1)) |
| self.assertEqual([2]*8, _calculate_diff(self.time_data2, 2)) |
| self.assertEqual([6, 12, 18, 24, 30, 36, 42, 48], |
| _calculate_diff(self.time_data3, 2)) |
| self.assertEqual([6]*7, _calculate_diff(self.time_data3, 3)) |
| self.assertEqual([24]*6, _calculate_diff(self.time_data4, 4)) |
| self.assertEqual([36, 60, 84, 108, 132, 156, 180], |
| _calculate_diff(self.time_data4, 3)) |
| |
| def test_rev_diffs(self): |
| self.assertEqual(self.time_data1[1:], |
| _calculate_rev_diff(self.diff_data1, |
| self.time_data1[0], 1)) |
| self.assertEqual(self.time_data2[2:], |
| _calculate_rev_diff(self.diff_data2, |
| self.time_data2[0:2], 2)) |
| self.assertEqual(self.time_data3[3:], |
| _calculate_rev_diff(self.diff_data3, |
| self.time_data3[0:3], 3)) |
| self.assertEqual(self.time_data4[4:], |
| _calculate_rev_diff(self.diff_data4, |
| self.time_data4[0:4], 4)) |
| self.assertEqual(self.time_data4[3:], |
| _calculate_rev_diff(self.diff_data4_3, |
| self.time_data4[0:3], 3)) |
| |
| if __name__ == '__main__': |
| unittest.main() |