blob: 9e37aef19805ed51f769ea8008aba93bc103d32d [file] [log] [blame]
# -----------------------------------------------------------------------
# Implementation of ARIMA model
#
# Currently, we implement CSS method
# -----------------------------------------------------------------------
from utilities.math_utils import compute_dot
import plpy
from utilities.utilities import _assert
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import table_exists
from utilities.utilities import __mad_version
from utilities.utilities import add_postfix
# ----------------------------------------------------------------------
version_wrapper = __mad_version()
madvec = version_wrapper.select_vecfunc()
def arima_forecast(schema_madlib, model_table, output_table, steps_ahead):
""" Main function to forecast timeseries data using a trained ARIMA model
@brief ARIMA forecast is used to predict future values for a time series
data using an ARIMA model
Args:
@param model_table: str, Name of the table containing the ARIMA model
@param output_table: str, Name of the table to output forecast values
@param steps_ahead: type, Number of steps to forecast
Returns:
None
"""
_validate_params(schema_madlib, model_table, output_table, steps_ahead)
old_messages = plpy.execute("""SELECT setting
FROM pg_settings
WHERE name = 'client_min_messages'"""
)[0]["setting"]
plpy.execute('SET client_min_messages TO warning')
# get all parameters from table
model_meta_params = _extract_meta_params_from_tbl(add_postfix(model_table, "_summary"))
# use madvec to handle array behavior in GPDB 4.1
non_seasonal_orders = map(int,
madvec(model_meta_params['non_seasonal_orders']))
mean, arparams, maparams = _extract_params_from_tbl(
model_table,
model_meta_params["include_mean"],
non_seasonal_orders)
# residuals and data are in chronological order.
# To get dot product of the form:
# ar[0]*data[-1] + ar[1]*data[-2] + ...,
# we reverse the params list to easily perform the dot product
rev_ar_params = list(reversed(arparams))
rev_ma_params = list(reversed(maparams))
p, d, q = non_seasonal_orders
# get last 'p + d' data items (for endogenous variable) and
# last 'q' residual values
orig_data, residual = _extract_end_data_residuals(
model_meta_params["input_table"],
add_postfix(model_table, "_residual"),
model_meta_params["timestamp_col"],
model_meta_params["timeseries_col"],
non_seasonal_orders)
# mean center and difference the original source data since ARIMA
# model is trained on transformed data
mean_centered_data = [i - mean for i in orig_data]
diff_data = _calculate_diff(mean_centered_data, d)
# at this point diff_data should have 'p' elements (after the differencing)
diff_data.extend([0] * steps_ahead) # store future values in diff_data
i = (0, -1)[q == 0] # if q == 0, AR loop should start from 0 (== i+1)
for i in range(min(q, steps_ahead)):
# AR and MA forecasting
diff_data[i + p] = (compute_dot(rev_ar_params, diff_data[i:i+p]) +
compute_dot(rev_ma_params[:q-i], residual[i:]))
for i in range(i+1, steps_ahead):
# only AR forecasting
diff_data[i + p] = compute_dot(rev_ar_params, diff_data[i: i+p])
# reverse the differencing operation and add the mean back to get
# actual forecast
forecast = [i + mean for i in _calculate_rev_diff(diff_data[p:], mean_centered_data[-d:], d)]
forecast_str = ','.join(map(str, forecast))
# create output table
plpy.execute("""CREATE TABLE {output}
AS SELECT generate_series(1, {end}) AS steps_ahead,
unnest(ARRAY[{forecast_str}]::FLOAT8[])
AS forecast_value
""".format(output=output_table,
end=steps_ahead,
forecast_str=forecast_str))
plpy.execute('SET client_min_messages TO ' + old_messages)
# -------------------------------------------------------------------------
def _validate_params(schema_madlib, model_table, output_table, steps_ahead):
""" Validate all argument values to the ARIMA forecast function
Args:
@param model_table: str, Name of the table containing the ARIMA model parameters
@param output_table: str, Name of the table to output forecast values
@param steps_ahead: int, Number of steps to forecast
Returns:
None
Raises:
plpy.error, if any of the arguments are invalid
"""
if model_table is not None and model_table.strip():
_assert(table_exists(model_table),
"ARIMA error: Model table '{0}' does not exist!".
format(model_table))
else:
plpy.error("ARIMA error: Invalid model table parameter")
# validate if summary table exists and has all columns
summary_table_name = add_postfix(model_table, "_summary")
_assert(table_exists(summary_table_name),
"ARIMA error: Model summary table '{0}' does not exist!".
format(summary_table_name))
meta_param_columns = ["input_table", "timestamp_col",
"timeseries_col", "include_mean",
"non_seasonal_orders"]
_assert(columns_exist_in_table(summary_table_name,
meta_param_columns,
schema_madlib),
"ARIMA error: Table '{0}' missing at least one column from {1}!".
format(summary_table_name, str(meta_param_columns)))
## validate if model table has the parameter columns
model_meta_params = _extract_meta_params_from_tbl(summary_table_name)
# use 'madvec' to extract an array from the table to ensure correct
# behavior for GPDB 4.1
p, d, q = map(int, madvec(model_meta_params["non_seasonal_orders"]))
if p:
_assert(columns_exist_in_table(model_table, ["ar_params"],
schema_madlib),
"ARIMA error: Table {0} missing ar_params column!".
format(model_table))
if q:
_assert(columns_exist_in_table(model_table, ["ma_params"],
schema_madlib),
"ARIMA error: Table {0} missing ma_params column!".
format(model_table))
if d == 0 and model_meta_params["include_mean"]:
_assert(columns_exist_in_table(model_table, ["mean"], schema_madlib),
"ARIMA error: Table {0} missing mean column!".
format(model_table))
# validate if output table can be created
if output_table is not None and output_table.strip() != '':
_assert(not table_exists(output_table, only_first_schema=True),
"ARIMA error: Output table {0} already exists!".
format(str(output_table)))
else:
plpy.error("ARIMA error: Invalid output table name")
# steps_ahead has to be a valid positive integer
_assert(steps_ahead > 0 and isinstance(steps_ahead, int),
"ARIMA error: Invalid steps_ahead parameter!")
_assert(steps_ahead <= 1e9,
"ARIMA error: Maximum number of steps that can be forecasted is "
"{0}!".format(1e9))
# -------------------------------------------------------------------------
def _extract_params_from_tbl(model_table, include_mean, model_orders):
""" Extract the ARIMA parameters from the model table
@brief ARIMA model is stored in a table created from the train function. This
function extracts the parameters and returns as multiple lists of parameters.
The function assumes that tables 'model_table' and 'model_table_summary'
exist with the appropriate columns.
Args:
@param model_table_name: str, Name of the table containing the model
@param include_mean: bool, Does the model contain the mean value?
Returns:
Tuple. (ar_params=list(), ma_params=list(), mean=float)
The tuple contains the AR parameters, MA parameters, and the mean value
(if present in the model table).
"""
model_params = plpy.execute("""SELECT * from {0}""".
format(model_table))[0]
if include_mean and model_orders[1] == 0:
# mean value is defined only for d == 0
mean = model_params["mean"]
else:
mean = 0
if model_orders[0] > 0:
ar_params = map(float, madvec(model_params["ar_params"]))
else:
ar_params = []
if model_orders[2] > 0:
ma_params = map(float, madvec(model_params["ma_params"]))
else:
ma_params = []
return (mean, ar_params, ma_params)
# -------------------------------------------------------------------------
def _extract_meta_params_from_tbl(summary_table_name):
""" Extract the meta-parameters used to train the ARIMA model
Args:
@param arg: str, Name of the summary table
Returns:
Dictionary. A dict of meta parameters that includes the following keys:
* input_table: str, Name of the original source table
* timestamp_col: str, Name of the timestamp column
* timeseries_col: str, Name of the timeseries column
* include_mean: bool, indicates if mean was included in original table
* non_seasonal_orders: list, Array of integers with length of 3,
containing orders for AR, I and MA
"""
# the function currently only returns the values in first row of table
# this needs to be extended for grouping
model_meta_params = plpy.execute("""SELECT * from {0}""".
format(summary_table_name))[0]
return model_meta_params
# -------------------------------------------------------------------------
def _extract_end_data_residuals(input_table, residual_table,
timestamp_col,
timeseries_col, order):
""" Extract the data and residual values for the final few timeseries entries.
@brief Extract the residual values from the end to use for the MA part and
the data values from the end to use for AR part of the ARIMA forecasting
Args:
@param input_table: str, Name of the source data table
@param residual_table: str, Name of the residual table
@param timestamp_col: str, Name of the column containing timestamp (index) values
@param order: tuple/list, Orders of the ARIMA model (p, d, q)
Returns:
data_values: List. Array of floats that represents the last 'p + d' data values.
residual: List. Array of floats that represents the last 'q' residual values.
"""
residual_values = plpy.execute("""
SELECT residual
FROM (
SELECT *
FROM {table}
ORDER BY {timestamp} DESC
LIMIT {q}) q1
ORDER BY {timestamp} ASC""".format(table=residual_table,
timestamp=timestamp_col,
q=order[2]))
residual_values = [val["residual"] for val in residual_values]
data_values = plpy.execute("""
SELECT {value} as data
FROM (
SELECT *
FROM {table}
ORDER BY {timestamp} DESC
LIMIT {pd}) q1
ORDER BY {timestamp} ASC""". format(table=input_table,
timestamp=timestamp_col,
value=timeseries_col,
pd=(order[0]+order[1])))
data_values = [val["data"] for val in data_values]
return data_values, residual_values
# -------------------------------------------------------------------------
def _calculate_diff(data, diff_order=0):
""" Calculate the n-th order discrete difference.
@brief Calculate the n-th order discrete difference.
Args:
@param data: list, Input vector to compute differences
@param diff_order: int, The order of the differencing operator (Default=0)
Returns:
List. The nth order differences. The size of the list will be smaller
than the original data.
"""
if not diff_order:
return data
output = data
while diff_order > 0:
# Compute actual differencing as an iterative process
output = [output[i] - output[i-1] for i in range(1, len(output))]
diff_order -= 1
return output
# -------------------------------------------------------------------------
def _calculate_rev_diff(diff_data, orig_data_seed_values=None, diff_order=0):
""" Calculate the original data from n-th order discrete difference.
@brief Calculate the n-th order discrete difference.
Args:
@param diff_data: list[float], The nth order difference of the original data
@param orig_data_seed_values: list[float], 'n' values from the original
data that preceedes the original data
vectors to be used for seeding.
If None, then seed values
are assumed to be zero
@param diff_order: int, The order of the differencing operator (Default=0)
Returns:
List. The original data corresponding to the differences
"""
if not diff_order:
return diff_data
# diff_data is smaller than the actual data that produced the diff
# hence we need some seed/initial values to construct the original data vector
intermed_initial_vals = [0]*diff_order
if orig_data_seed_values:
# if seed values are not provided then all initial values are 0
curr_diff = orig_data_seed_values
intermed_initial_vals[0] = curr_diff[-1]
for each_level in range(1, min(diff_order, len(orig_data_seed_values))):
curr_diff = _calculate_diff(curr_diff, 1)
intermed_initial_vals[each_level] = curr_diff[-1]
output = diff_data
n_elements = len(diff_data)
while diff_order > 0:
curr_output = [0] * n_elements
curr_output[0] = output[0] + intermed_initial_vals[diff_order-1]
for i in range(1, len(output)):
# this loop is same as computing cumulative sum
curr_output[i] = output[i] + curr_output[i-1]
output = curr_output
diff_order -= 1
return output # Compute actual differencing
# -------------------------------------------------------------------------
def arima_forecast_help_message(schema_madlib, message, **kwargs):
""" Display a help message about ARIMA forecast function
Args:
@param schema_madlib: String. Schema where MADlib is installed
@param message: String. Which help message to display?
Returns:
String. A detailed help message
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
Forecasting function for ARIMA modeling
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT {schema_madlib}.arima_forecast
(
model_table TEXT, -- Model table name returned by ARIMA training function
output_table TEXT, -- Output table to store forecast values
steps_ahead INTEGER -- Number of steps to forecast ahead
)
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The following table is created:
output_table: Table containing the ARIMA model forecast with the following columns
- group_by_cols -- Grouping col value
-- (if grouping parameter provided during model training)
- steps_ahead INTEGER -- Time step ahead
- forecast_value FLOAT8 -- Forecast of the time step
"""
else:
return """
ARIMA forecast function
--------------------------------------
For an overview on usage, run:
SELECT {schema_madlib}.arima_forecast('usage');
""".format(schema_madlib=schema_madlib)
# -----------------------------------------------------------------------
# ------------------------------------------------------------------------------
# -- Unit tests for python only functions---------------------------------------
import unittest
class TestForecast(unittest.TestCase):
"""
The import statements will have to be commented to execute the unittest
since the modules cannot be imported when run standalone.
"""
def setUp(self):
self.time_data1 = range(10)
self.time_data2 = [i**2 for i in range(10)]
self.time_data3 = [i**3 for i in range(10)]
self.time_data4 = [i**4 for i in range(10)]
self.diff_data1 = _calculate_diff(self.time_data1, 1)
self.diff_data2 = _calculate_diff(self.time_data2, 2)
self.diff_data3 = _calculate_diff(self.time_data3, 3)
self.diff_data4 = _calculate_diff(self.time_data4, 4)
self.diff_data4_3 = _calculate_diff(self.time_data4, 3)
def tearDown(self):
pass
def test_diffs(self):
self.assertEqual([1]*9, _calculate_diff(self.time_data1, 1))
self.assertEqual([2]*8, _calculate_diff(self.time_data2, 2))
self.assertEqual([6, 12, 18, 24, 30, 36, 42, 48],
_calculate_diff(self.time_data3, 2))
self.assertEqual([6]*7, _calculate_diff(self.time_data3, 3))
self.assertEqual([24]*6, _calculate_diff(self.time_data4, 4))
self.assertEqual([36, 60, 84, 108, 132, 156, 180],
_calculate_diff(self.time_data4, 3))
def test_rev_diffs(self):
self.assertEqual(self.time_data1[1:],
_calculate_rev_diff(self.diff_data1,
self.time_data1[0], 1))
self.assertEqual(self.time_data2[2:],
_calculate_rev_diff(self.diff_data2,
self.time_data2[0:2], 2))
self.assertEqual(self.time_data3[3:],
_calculate_rev_diff(self.diff_data3,
self.time_data3[0:3], 3))
self.assertEqual(self.time_data4[4:],
_calculate_rev_diff(self.diff_data4,
self.time_data4[0:4], 4))
self.assertEqual(self.time_data4[3:],
_calculate_rev_diff(self.diff_data4_3,
self.time_data4[0:3], 3))
if __name__ == '__main__':
unittest.main()