blob: 9e37aef19805ed51f769ea8008aba93bc103d32d [file] [log] [blame]
# -----------------------------------------------------------------------
# Implementation of ARIMA model
# Currently, we implement CSS method
# -----------------------------------------------------------------------
from utilities.math_utils import compute_dot
import plpy
from utilities.utilities import _assert
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.utilities import __mad_version
from utilities.utilities import add_postfix
# ----------------------------------------------------------------------
version_wrapper = __mad_version()
madvec = version_wrapper.select_vecfunc()
def arima_forecast(schema_madlib, model_table, output_table, steps_ahead):
""" Main function to forecast timeseries data using a trained ARIMA model
@brief ARIMA forecast is used to predict future values for a time series
data using an ARIMA model
@param model_table: str, Name of the table containing the ARIMA model
@param output_table: str, Name of the table to output forecast values
@param steps_ahead: type, Number of steps to forecast
_validate_params(schema_madlib, model_table, output_table, steps_ahead)
old_messages = plpy.execute("""SELECT setting
FROM pg_settings
WHERE name = 'client_min_messages'"""
plpy.execute('SET client_min_messages TO warning')
# get all parameters from table
model_meta_params = _extract_meta_params_from_tbl(add_postfix(model_table, "_summary"))
# use madvec to handle array behavior in GPDB 4.1
non_seasonal_orders = map(int,
mean, arparams, maparams = _extract_params_from_tbl(
# residuals and data are in chronological order.
# To get dot product of the form:
# ar[0]*data[-1] + ar[1]*data[-2] + ...,
# we reverse the params list to easily perform the dot product
rev_ar_params = list(reversed(arparams))
rev_ma_params = list(reversed(maparams))
p, d, q = non_seasonal_orders
# get last 'p + d' data items (for endogenous variable) and
# last 'q' residual values
orig_data, residual = _extract_end_data_residuals(
add_postfix(model_table, "_residual"),
# mean center and difference the original source data since ARIMA
# model is trained on transformed data
mean_centered_data = [i - mean for i in orig_data]
diff_data = _calculate_diff(mean_centered_data, d)
# at this point diff_data should have 'p' elements (after the differencing)
diff_data.extend([0] * steps_ahead) # store future values in diff_data
i = (0, -1)[q == 0] # if q == 0, AR loop should start from 0 (== i+1)
for i in range(min(q, steps_ahead)):
# AR and MA forecasting
diff_data[i + p] = (compute_dot(rev_ar_params, diff_data[i:i+p]) +
compute_dot(rev_ma_params[:q-i], residual[i:]))
for i in range(i+1, steps_ahead):
# only AR forecasting
diff_data[i + p] = compute_dot(rev_ar_params, diff_data[i: i+p])
# reverse the differencing operation and add the mean back to get
# actual forecast
forecast = [i + mean for i in _calculate_rev_diff(diff_data[p:], mean_centered_data[-d:], d)]
forecast_str = ','.join(map(str, forecast))
# create output table
plpy.execute("""CREATE TABLE {output}
AS SELECT generate_series(1, {end}) AS steps_ahead,
AS forecast_value
plpy.execute('SET client_min_messages TO ' + old_messages)
# -------------------------------------------------------------------------
def _validate_params(schema_madlib, model_table, output_table, steps_ahead):
""" Validate all argument values to the ARIMA forecast function
@param model_table: str, Name of the table containing the ARIMA model parameters
@param output_table: str, Name of the table to output forecast values
@param steps_ahead: int, Number of steps to forecast
plpy.error, if any of the arguments are invalid
if model_table is not None and model_table.strip():
"ARIMA error: Model table '{0}' does not exist!".
plpy.error("ARIMA error: Invalid model table parameter")
# validate if summary table exists and has all columns
summary_table_name = add_postfix(model_table, "_summary")
"ARIMA error: Model summary table '{0}' does not exist!".
meta_param_columns = ["input_table", "timestamp_col",
"timeseries_col", "include_mean",
"ARIMA error: Table '{0}' missing at least one column from {1}!".
format(summary_table_name, str(meta_param_columns)))
## validate if model table has the parameter columns
model_meta_params = _extract_meta_params_from_tbl(summary_table_name)
# use 'madvec' to extract an array from the table to ensure correct
# behavior for GPDB 4.1
p, d, q = map(int, madvec(model_meta_params["non_seasonal_orders"]))
if p:
_assert(columns_exist_in_table(model_table, ["ar_params"],
"ARIMA error: Table {0} missing ar_params column!".
if q:
_assert(columns_exist_in_table(model_table, ["ma_params"],
"ARIMA error: Table {0} missing ma_params column!".
if d == 0 and model_meta_params["include_mean"]:
_assert(columns_exist_in_table(model_table, ["mean"], schema_madlib),
"ARIMA error: Table {0} missing mean column!".
# validate if output table can be created
if output_table is not None and output_table.strip() != '':
_assert(not table_exists(output_table, only_first_schema=True),
"ARIMA error: Output table {0} already exists!".
plpy.error("ARIMA error: Invalid output table name")
# steps_ahead has to be a valid positive integer
_assert(steps_ahead > 0 and isinstance(steps_ahead, int),
"ARIMA error: Invalid steps_ahead parameter!")
_assert(steps_ahead <= 1e9,
"ARIMA error: Maximum number of steps that can be forecasted is "
# -------------------------------------------------------------------------
def _extract_params_from_tbl(model_table, include_mean, model_orders):
""" Extract the ARIMA parameters from the model table
@brief ARIMA model is stored in a table created from the train function. This
function extracts the parameters and returns as multiple lists of parameters.
The function assumes that tables 'model_table' and 'model_table_summary'
exist with the appropriate columns.
@param model_table_name: str, Name of the table containing the model
@param include_mean: bool, Does the model contain the mean value?
Tuple. (ar_params=list(), ma_params=list(), mean=float)
The tuple contains the AR parameters, MA parameters, and the mean value
(if present in the model table).
model_params = plpy.execute("""SELECT * from {0}""".
if include_mean and model_orders[1] == 0:
# mean value is defined only for d == 0
mean = model_params["mean"]
mean = 0
if model_orders[0] > 0:
ar_params = map(float, madvec(model_params["ar_params"]))
ar_params = []
if model_orders[2] > 0:
ma_params = map(float, madvec(model_params["ma_params"]))
ma_params = []
return (mean, ar_params, ma_params)
# -------------------------------------------------------------------------
def _extract_meta_params_from_tbl(summary_table_name):
""" Extract the meta-parameters used to train the ARIMA model
@param arg: str, Name of the summary table
Dictionary. A dict of meta parameters that includes the following keys:
* input_table: str, Name of the original source table
* timestamp_col: str, Name of the timestamp column
* timeseries_col: str, Name of the timeseries column
* include_mean: bool, indicates if mean was included in original table
* non_seasonal_orders: list, Array of integers with length of 3,
containing orders for AR, I and MA
# the function currently only returns the values in first row of table
# this needs to be extended for grouping
model_meta_params = plpy.execute("""SELECT * from {0}""".
return model_meta_params
# -------------------------------------------------------------------------
def _extract_end_data_residuals(input_table, residual_table,
timeseries_col, order):
""" Extract the data and residual values for the final few timeseries entries.
@brief Extract the residual values from the end to use for the MA part and
the data values from the end to use for AR part of the ARIMA forecasting
@param input_table: str, Name of the source data table
@param residual_table: str, Name of the residual table
@param timestamp_col: str, Name of the column containing timestamp (index) values
@param order: tuple/list, Orders of the ARIMA model (p, d, q)
data_values: List. Array of floats that represents the last 'p + d' data values.
residual: List. Array of floats that represents the last 'q' residual values.
residual_values = plpy.execute("""
SELECT residual
FROM {table}
ORDER BY {timestamp} DESC
LIMIT {q}) q1
ORDER BY {timestamp} ASC""".format(table=residual_table,
residual_values = [val["residual"] for val in residual_values]
data_values = plpy.execute("""
SELECT {value} as data
FROM {table}
ORDER BY {timestamp} DESC
LIMIT {pd}) q1
ORDER BY {timestamp} ASC""". format(table=input_table,
data_values = [val["data"] for val in data_values]
return data_values, residual_values
# -------------------------------------------------------------------------
def _calculate_diff(data, diff_order=0):
""" Calculate the n-th order discrete difference.
@brief Calculate the n-th order discrete difference.
@param data: list, Input vector to compute differences
@param diff_order: int, The order of the differencing operator (Default=0)
List. The nth order differences. The size of the list will be smaller
than the original data.
if not diff_order:
return data
output = data
while diff_order > 0:
# Compute actual differencing as an iterative process
output = [output[i] - output[i-1] for i in range(1, len(output))]
diff_order -= 1
return output
# -------------------------------------------------------------------------
def _calculate_rev_diff(diff_data, orig_data_seed_values=None, diff_order=0):
""" Calculate the original data from n-th order discrete difference.
@brief Calculate the n-th order discrete difference.
@param diff_data: list[float], The nth order difference of the original data
@param orig_data_seed_values: list[float], 'n' values from the original
data that preceedes the original data
vectors to be used for seeding.
If None, then seed values
are assumed to be zero
@param diff_order: int, The order of the differencing operator (Default=0)
List. The original data corresponding to the differences
if not diff_order:
return diff_data
# diff_data is smaller than the actual data that produced the diff
# hence we need some seed/initial values to construct the original data vector
intermed_initial_vals = [0]*diff_order
if orig_data_seed_values:
# if seed values are not provided then all initial values are 0
curr_diff = orig_data_seed_values
intermed_initial_vals[0] = curr_diff[-1]
for each_level in range(1, min(diff_order, len(orig_data_seed_values))):
curr_diff = _calculate_diff(curr_diff, 1)
intermed_initial_vals[each_level] = curr_diff[-1]
output = diff_data
n_elements = len(diff_data)
while diff_order > 0:
curr_output = [0] * n_elements
curr_output[0] = output[0] + intermed_initial_vals[diff_order-1]
for i in range(1, len(output)):
# this loop is same as computing cumulative sum
curr_output[i] = output[i] + curr_output[i-1]
output = curr_output
diff_order -= 1
return output # Compute actual differencing
# -------------------------------------------------------------------------
def arima_forecast_help_message(schema_madlib, message, **kwargs):
""" Display a help message about ARIMA forecast function
@param schema_madlib: String. Schema where MADlib is installed
@param message: String. Which help message to display?
String. A detailed help message
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
Forecasting function for ARIMA modeling
SELECT {schema_madlib}.arima_forecast
model_table TEXT, -- Model table name returned by ARIMA training function
output_table TEXT, -- Output table to store forecast values
steps_ahead INTEGER -- Number of steps to forecast ahead
The following table is created:
output_table: Table containing the ARIMA model forecast with the following columns
- group_by_cols -- Grouping col value
-- (if grouping parameter provided during model training)
- steps_ahead INTEGER -- Time step ahead
- forecast_value FLOAT8 -- Forecast of the time step
return """
ARIMA forecast function
For an overview on usage, run:
SELECT {schema_madlib}.arima_forecast('usage');
# -----------------------------------------------------------------------
# ------------------------------------------------------------------------------
# -- Unit tests for python only functions---------------------------------------
import unittest
class TestForecast(unittest.TestCase):
The import statements will have to be commented to execute the unittest
since the modules cannot be imported when run standalone.
def setUp(self):
self.time_data1 = range(10)
self.time_data2 = [i**2 for i in range(10)]
self.time_data3 = [i**3 for i in range(10)]
self.time_data4 = [i**4 for i in range(10)]
self.diff_data1 = _calculate_diff(self.time_data1, 1)
self.diff_data2 = _calculate_diff(self.time_data2, 2)
self.diff_data3 = _calculate_diff(self.time_data3, 3)
self.diff_data4 = _calculate_diff(self.time_data4, 4)
self.diff_data4_3 = _calculate_diff(self.time_data4, 3)
def tearDown(self):
def test_diffs(self):
self.assertEqual([1]*9, _calculate_diff(self.time_data1, 1))
self.assertEqual([2]*8, _calculate_diff(self.time_data2, 2))
self.assertEqual([6, 12, 18, 24, 30, 36, 42, 48],
_calculate_diff(self.time_data3, 2))
self.assertEqual([6]*7, _calculate_diff(self.time_data3, 3))
self.assertEqual([24]*6, _calculate_diff(self.time_data4, 4))
self.assertEqual([36, 60, 84, 108, 132, 156, 180],
_calculate_diff(self.time_data4, 3))
def test_rev_diffs(self):
self.time_data1[0], 1))
self.time_data2[0:2], 2))
self.time_data3[0:3], 3))
self.time_data4[0:4], 4))
self.time_data4[0:3], 3))
if __name__ == '__main__':