# -----------------------------------------------------------------------
# Implementation of ARIMA model
#
# Currently, only the CSS (conditional sum of squares) method is implemented
# -----------------------------------------------------------------------
import math
import random
import time

import plpy

from utilities.utilities import (__mad_version, _array_to_string, _assert,
                                 add_postfix, preprocess_keyvalue_params,
                                 unique_string)
from utilities.validate_args import columns_exist_in_table, table_exists
# ----------------------------------------------------------------------
version_wrapper = __mad_version()
madvec = version_wrapper.select_vecfunc()
# ----------------------------------------------------------------------
# -- Help messages -----------------------------------------------------
def arima_train_help_message (schema_madlib, message=None, **kwargs):
"""
Given a help string, provide usage information
Args:
schema_madlib: String. Schema where MADlib is installed
message: String. Which help message to display
Returns:
String. A detailed help message
"""
if message is not None and \
message.lower() in ("usage", "help", "?"):
return """
ARIMA modeling for time series analysis
-----------------------------------------------------------------------
USAGE
-----------------------------------------------------------------------
SELECT madlib.arima_train
(
input_table TEXT, -- Source table name
output_table TEXT, -- Output table prefix to create model,
-- summary and residual table for the ARIMA model
-- Tables created are:
-- output_table
-- <output_table>_summary
-- <output_table>_residual
timestamp_col TEXT, -- Index column (e.g. timestamp) that orders the time series
timeseries_col TEXT, -- Column that contains the values of the time series
group_by_cols TEXT, -- Comma-separated list of grouping columns
-- (Optional, Default: NULL)
-- Currently disabled - Any non-NULL value is ignored.
include_mean BOOLEAN, -- Include constant mean in model?
-- (Optional, Default: False)
            non_seasonal_orders INTEGER[], -- Array of non-seasonal AR, I, and MA orders
-- (Optional, Default: ARRAY[1,1,1])
optimizer_params TEXT -- Control parameters for optimizer
)
        The optimizer_params argument is a comma-separated string of
        name=value pairs with the format
        'max_iter=..., e1=..., e2=..., e3=..., tau=..., hessian_delta=...,
         chunk_size=..., param_init=...'
        The parameters can be given in any order and, if a parameter is
        missing, its default value is used.
        If 'optimizer_params' is not provided, all parameters take the
        following default values:
            max_iter = 100, tau = 1e-3,
            e1 = 1e-15, e2 = 1e-15, e3 = 1e-15,
            hessian_delta = 1e-4, chunk_size = 10000,
            param_init = 'zero'.
See user doc for details about each parameter.
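        -----------------------------------------------------------------------
        EXAMPLE
        -----------------------------------------------------------------------
        An illustrative call (a sketch only: the table 'ts_data' and its
        columns 'tstamp' and 'value' are hypothetical names):
        SELECT madlib.arima_train('ts_data',      -- input table
                                  'ts_model',     -- output table prefix
                                  'tstamp',       -- timestamp column
                                  'value',        -- time series column
                                  NULL,           -- grouping (disabled)
                                  TRUE,           -- include mean
                                  ARRAY[1,1,1],   -- non-seasonal orders (p,d,q)
                                  'max_iter=50, tau=1e-2');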
-----------------------------------------------------------------------
OUTPUT
-----------------------------------------------------------------------
The following tables are created:
1. output_table: Table containing the ARIMA model
Contains following columns:
            - mean: Mean value of the model (only if 'include_mean' is True)
            - mean_std_error: Standard error for the mean
            - ar_params: Autoregressive (AR) parameters of the ARIMA model
            - ar_std_errors: Standard errors for the AR parameters
            - ma_params: Moving average (MA) parameters of the ARIMA model
            - ma_std_errors: Standard errors for the MA parameters
2. <output_table>_summary: Table containing descriptive statistics of the ARIMA model
Contains following columns:
- input_table: Table name with the source data
- timestamp_col: Column name in source table that contains the timestamp index to data
- timeseries_col: Column name in source table that contains the data values
- non_seasonal_orders: Orders of the non-seasonal ARIMA model
- include_mean: True if intercept was included in ARIMA model
- residual_variance: Variance of the residuals
- log_likelihood: Log likelihood value (when using MLE)
            - iter_num: Number of iterations run by the optimizer
- exec_time: Total time taken to train the model
3. <output_table>_residual: Table containing the residuals for each data point in 'input_table'
Contains following columns:
- <timestamp_col>: Same as the 'timestamp_col' parameter
(all indices from source table included except the first
'd' elements, where 'd' is the differencing order value
from 'non_seasonal_orders')
- residual: Residual value for each data point
"""
else:
return """
        Given a time series, the ARIMA model is a tool for understanding and,
        perhaps, predicting future values in the series. The model consists of
        three parts: an autoregressive (AR) part, a moving average (MA) part,
        and an integrated (I) part, where an initial differencing step can be
        applied to remove any non-stationarity in the signal. The model is
        generally referred to as an ARIMA(p,d,q) model, where the non-negative
        integers p, d, and q are the orders of the autoregressive, integrated,
        and moving average parts of the model, respectively.
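        In backshift-operator notation, an ARIMA(p,d,q) process X_t driven by
        white noise Z_t can be written as
            (1 - phi_1*B - ... - phi_p*B^p) (1 - B)^d X_t
                = (1 + theta_1*B + ... + theta_q*B^q) Z_t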
-------
For an overview on usage, run:
        SELECT {schema_madlib}.arima_train('usage');
-------
""".format(schema_madlib=schema_madlib)
# ----------------------------------------------------------------------
def arima_train (schema_madlib, input_table, output_table,
timestamp_col, timeseries_col, grouping_cols,
include_mean, non_seasonal_orders, optimizer_params):
""" The main Python function to train the ARIMA models
@brief Estimate the ARIMA(p,d,q) model coefficients for a given time
series. Grouping support is not implemented at this point.
Args:
@param schema_madlib - string, Schema of MADlib
@param input_table - string, A table that contains a time series
@param output_table - string, Output table name prefix
@param timestamp_col - string, the order id column of time series
@param timeseries_col - string, Column for time series
        @param grouping_cols - string, placeholder for grouping
                               support, default value is None
        @param include_mean - boolean, whether to include a mean term,
                              default value is False. Note that the mean
                              is not the same as an intercept term
        @param non_seasonal_orders - array of 3 integers, the orders
                                     for AR, I and MA, default value
                                     is [1,1,1]
        @param optimizer_params - string, optimizer control parameters
    Returns:
        None. The estimated mean value (mu), AR coefficients (phi) and
        MA coefficients (theta) are written to the output tables:
        * <output_table> - contains the estimated coefficients
        * <output_table>_summary - contains diagnostic info
        * <output_table>_residual - contains the residuals
"""
old_msg_level = plpy.execute("""
select setting from pg_settings
where name='client_min_messages'
""")[0]['setting']
plpy.execute("set client_min_messages to error")
t0 = time.time()
input_name = input_table
if non_seasonal_orders:
non_seasonal_orders = madvec(non_seasonal_orders)
        try:
            non_seasonal_orders = [int(x) for x in non_seasonal_orders]
        except (TypeError, ValueError):
            plpy.error("ARIMA error: non_seasonal_orders are not integers")
# Validate the input parameters
_validate_params(schema_madlib, input_table, output_table,
timestamp_col, timeseries_col, non_seasonal_orders,
include_mean, grouping_cols)
# Fix the optimizer control parameters.
# Default values are set in _extract_params(...)
(max_iter, e1, e2, e3, tau, hessian_delta,
chunk_size, param_init) = _extract_params(schema_madlib, optimizer_params)
# deal with I-order
d = non_seasonal_orders[1]
if d > 0:
# mean value is always 0 for ARIMA models with differencing
include_mean = False
# use the differenced time series
(input_table, dist_tbl) = _diff(schema_madlib, input_table, timestamp_col,
timeseries_col, non_seasonal_orders[1],
chunk_size)
# set initial values
p = non_seasonal_orders[0]
q = non_seasonal_orders[2]
# Re-distribute the data, so that each segment has consecutive rows.
# Also add an auxiliary array column which contains p previous time
# series values.
# tbl_ts is the time series table we are going to use. For convenience,
# we fix the column names: tid, distid and tvals for time, distribution
# id and an array of current value and p previous values.
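    # For example, with chunk_size=10000 and p=2, chunk 1 holds rows
    # 1..10000, and each later chunk carries the last 2 values of the
    # previous chunk at the front of its tvals array, so every chunk can
    # be processed without reading its neighbors.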
# if the data has been differenced, we only need to do adjustment
if d > 0:
# note that if p == 0 the input_table will be returned
tbl_ts = _adjust_diff_data(schema_madlib, input_table, p)
else:
(tbl_ts, dist_tbl) = _redist_data(schema_madlib, input_table, timestamp_col,
timeseries_col, p, chunk_size)
    # initial values of phi and theta
if param_init == 'random':
# random values in [-0.05, 0.05]
phi = [(random.random() - 0.5) * 0.1 for i in range(p)]
theta = [(random.random() - 0.5) * 0.1 for i in range(q)]
else:
# fix to zero
phi = [0] * p
theta = [0] * q
if include_mean:
mean = plpy.execute("select avg({timeseries_col}) as avg"
" from {input_table}".format(
timeseries_col=timeseries_col,
input_table=input_table))[0]['avg']
else:
mean = None
    # J^T*J, J^T*Z and |Z|^2 can be computed in one parallel scan of tbl_ts
# using an aggregate function.
# What we need in Python are just g and |Z|^2. We also
# need the initial value of u (see design doc for their meanings). We
# can load these values into Python since they are small.
# We also need each group's initial values for the next iteration,
# and a small table that stores J^T*J and g=J^T*Z for computing
# delta with different u's
tbl_grp_init = None
tbl_jj_g = None
g, z2, u, tbl_grp_init, tbl_jj_g = \
_compute_onestep(schema_madlib, tbl_ts, p, q, phi, theta,
mean, tbl_grp_init, tbl_jj_g, 0, 1)
u = tau * u
# update phi, theta and mean etc in iterations
stop = _vec_modulus(g) <= e1
v = 2 # initial value for step
k = 0
# previous iteration number
prev_iter = 1
iter_count = 1
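    # Levenberg-Marquardt outer loop (see the design doc): propose a step
    # delta under the current damping u; accept it only when the gain ratio
    # rho = (drop in |Z|^2) / (predicted drop) is positive, shrinking u on
    # success and inflating it (u *= v, v *= 2) on failure.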
while not stop and k <= max_iter:
k += 1
rho = 0
while not stop and rho <= 0:
delta = _lm_delta(schema_madlib, tbl_jj_g, u, prev_iter)
if mean is None:
params = phi + theta
else:
params = phi + theta + [mean]
if _vec_modulus(delta) <= e2 * _vec_modulus(params):
stop = True
else:
phi_new = [phi[i] + delta[i] for i in range(p)]
theta_new = [theta[i] + delta[i+p] for i in range(q)]
if mean is not None:
mean_new = mean + delta[p+q]
else:
mean_new = None
iter_count += 1
(g_new, z2_new, u_useless, tbl_grp_init,
tbl_jj_g) = _compute_onestep(schema_madlib, tbl_ts,
p, q, phi_new, theta_new,
mean_new, tbl_grp_init,
tbl_jj_g,
prev_iter, iter_count)
rho = ((z2 - z2_new) /
_vec_inner_prod(delta,
[u*a+b for a,b in zip(delta, g)]))
if rho > 0:
prev_iter = iter_count
phi = phi_new
theta = theta_new
mean = mean_new
g = g_new
z2 = z2_new
stop = _vec_modulus(g) <= e1 or z2 <= e3
v = 2
u = u * max(1./3, 1. - pow((2*rho - 1),3))
else:
v = 2 * v
u = u * v
# In the end, we need to compute and output the residuals to a table
_create_resid_tbl(schema_madlib, tbl_ts, p, non_seasonal_orders[1],
q, phi, theta, mean, add_postfix(output_table, "_residual"),
timestamp_col, tbl_grp_init, iter_count, dist_tbl)
std_errs, resid_var, loglik = _compute_statistics(schema_madlib,
tbl_ts, p, q, phi,
theta, mean, hessian_delta)
# clean up intermediate tables
_drop_tbl(tbl_ts)
_drop_tbl(tbl_jj_g)
_drop_tbl(tbl_grp_init)
if non_seasonal_orders[1] > 0:
_drop_tbl(input_table)
dt = time.time() - t0
_output_result(p, q, phi, theta, mean, std_errs,
output_table)
_output_summary(input_name, timestamp_col, timeseries_col,
non_seasonal_orders, include_mean, resid_var,
loglik, dt, k, add_postfix(output_table, "_summary"))
plpy.execute("set client_min_messages to " + old_msg_level)
return None
# ----------------------------------------------------------------------
def _output_summary (input_name, timestamp_col, timeseries_col,
non_seasonal_orders, include_mean, resid_var,
loglik, dt, iter_num, output_table):
""" Output the summary table
Args:
@param input_name - string, the original input table name
@param timestamp_col - string, column name of time stamps
@param timeseries_col - string, column name of time series
@param non_seasonal_orders - array of integers, [p,d,q]
@param include_mean - boolean, include mean or not
@param resid_var - double, the residual variance
@param loglik - double, log-likelihood
        @param dt - double, total execution time in seconds
        @param iter_num - integer, number of iterations run
        @param output_table - string, the output summary table name
"""
plpy.execute(
"""
create table {output_table} as
select
'{input_name}' as input_table,
'{timestamp_col}' as timestamp_col,
'{timeseries_col}' as timeseries_col,
'{non_seasonal_orders}'::integer[] as non_seasonal_orders,
{include_mean}::boolean as include_mean,
{resid_var}::double precision as residual_variance,
{loglik}::double precision as log_likelihood,
{iter_num}::integer as iter_num,
{dt:.2f}::double precision as \"exec_time (s)\";
""".format(input_name=input_name, timestamp_col=timestamp_col,
timeseries_col=timeseries_col, resid_var=resid_var,
include_mean=include_mean, loglik=loglik,
dt=dt, output_table=output_table, iter_num=iter_num,
non_seasonal_orders=_array_to_string(non_seasonal_orders)))
return None
# ----------------------------------------------------------------------
def _output_result (p, q, phi, theta, mean, std_errs, output_table):
""" Output the statistics summary into a table
Args:
@param p, q - integer, AR and MA orders
@param phi, theta - array of doubles, AR and MA coef
@param mean - double, mean value. None if include_mean is False
@param std_errs - array of doubles, the standard errors for all coef
@param output_table - string, the result table to be created
"""
# create other output tables
param_str = []
param_val = []
if p > 0:
param_str += ["ar_params double precision[]",
"ar_std_errors double precision[]"]
param_val += ["'" + _array_to_string(phi) + "'::double precision[]",
"'" + _array_to_string(std_errs[0:p]) + "'::double precision[]"]
if q > 0:
param_str += ["ma_params double precision[]",
"ma_std_errors double precision[]"]
param_val += ["'" + _array_to_string(theta) + "'::double precision[]",
"'" + _array_to_string(std_errs[p:(p+q)]) + "'::double precision[]"]
if mean is not None:
param_str += ["mean double precision",
"mean_std_error double precision"]
param_val += [str(mean), str(std_errs[p+q])]
param_str = ", ".join(param_str)
param_val = ", ".join(param_val)
plpy.execute(
"""
create table {output_table} ({param_str});
""".format(output_table=output_table, param_str=param_str))
plpy.execute(
"""
insert into {output_table} values ({param_val});
""".format(output_table=output_table, param_val=param_val))
return None
# -----------------------------------------------------------------------
def _validate_params (schema_madlib, input_table, output_table, timestamp_col,
timeseries_col, non_seasonal_orders, include_mean,
grouping_cols=None):
""" Validate the input parameters for arima_train
Args:
@param schema_madlib: str, Name of the schema where MADlib is installed
@param input_table: str, Name of the source table
@param output_table: str, Name of the output table
@param timestamp_col: str, Name of the timestamp (index) column
        @param timeseries_col: str, Name of the timeseries (data) column
@param non_seasonal_orders: list[int], Array of 3 integers indicating order of ARIMA
@param include_mean: bool, indicates if mean value should be included in model
        @param grouping_cols: str, Comma-separated names of the grouping columns
Returns:
None
Throws:
plpy.error if any argument is invalid
"""
_assert(input_table is not None and table_exists(input_table),
"ARIMA error: Source data table does not exist!")
# output tables are created from the 'output_table' parameter name
if output_table:
output_table_names = [add_postfix(output_table, i) for i in ('', '_summary', '_residual')]
for each_table in output_table_names:
_assert(not table_exists(each_table, only_first_schema=True),
"ARIMA error: Output table {0}"
" already exists!".format(str(each_table)))
else:
plpy.error("ARIMA error: Invalid output table prefix!")
# timestamp and timeseries column names
_assert(columns_exist_in_table(input_table, [timestamp_col, timeseries_col],
schema_madlib),
"ARIMA error: {1} columns do not exist in {0}!"
.format(input_table,
str([timestamp_col, timeseries_col])))
_assert(non_seasonal_orders is not None, "ARIMA error: non_seasonal_orders should not be NULL")
    _assert(len(non_seasonal_orders) == 3,
            "ARIMA error: 3 integers not supplied for non_seasonal_orders")
    _assert(all(isinstance(i, int) for i in non_seasonal_orders),
            "ARIMA error: Non-seasonal orders must be integers!")
_assert(all(i >= 0 for i in non_seasonal_orders),
"ARIMA error: Non-seasonal orders must be non-negative!")
_assert(any(i != 0 for i in non_seasonal_orders),
"ARIMA error: Non-seasonal orders must not all be zeros!")
    _assert(all(i < 20 for i in non_seasonal_orders),
            "ARIMA error: Non-seasonal orders must be less than 20!")
_assert(non_seasonal_orders[0] != 0 or non_seasonal_orders[2] != 0,
"ARIMA error: AR order and MA order cannot both be zeros!")
    _assert(isinstance(include_mean, bool),
            "ARIMA error: include_mean is not a Boolean!")
    if grouping_cols:
        _assert(columns_exist_in_table(input_table, grouping_cols, schema_madlib),
                "ARIMA error: Some of the grouping columns "
                "do not exist in {0}!".format(input_table))
# -----------------------------------------------------------------------
def _extract_params (schema_madlib, optimizer_params):
""" Extract optimizer control parameters or set the default values
@brief optimizer_params is a string with the format of
'max_iter=..., e1=..., e2=..., e3=..., tau=...'. The order
does not matter. If a parameter is missing, then the default
value for it is used. If optimizer_params is None or '',
then all default values are used. If the parameter specified
is none of 'max_iter', 'e1', 'e2', 'e3', 'tau', 'hessian_delta',
'chunk_size', and 'param_init' then an error is raised. This function
also validates the values of these parameters.
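        Example (illustrating the parsing, with values grounded in the
        defaults above):
            _extract_params(schema_madlib, 'max_iter=50, tau=1e-2')
            returns (50, 1e-15, 1e-15, 1e-15, 0.01, 0.0001, 10000, 'zero'),
            i.e. (max_iter, e1, e2, e3, tau, hessian_delta, chunk_size,
            param_init), with unspecified parameters at their defaults.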
Raises:
* ARIMA error - If the parameter is unsupported or the value is
not valid.
"""
allowed_params = set(["max_iter", "tau", "e1", "e2", "e3",
"hessian_delta", "chunk_size", "param_init"])
name_value = dict(max_iter=100, tau=1e-3, e1=1e-15, e2=1e-15, e3=1e-15,
hessian_delta=1e-4, chunk_size=10000, param_init='zero')
if optimizer_params is None or len(optimizer_params) == 0:
return (name_value['max_iter'], name_value['e1'], name_value['e2'],
name_value['e3'], name_value['tau'],
name_value['hessian_delta'], name_value['chunk_size'],
name_value['param_init'])
    param_types = dict(max_iter=int, chunk_size=int, tau=float, e1=float,
                       e2=float, e3=float, hessian_delta=float)
    for s in preprocess_keyvalue_params(optimizer_params):
        items = s.split("=")
        if len(items) != 2:
            plpy.error("ARIMA error: Optimizer parameter list "
                       "has incorrect format!")
        param_name = items[0].strip(" \"").lower()
        param_value = items[1].strip(" \"").lower()
        if param_name not in allowed_params:
            plpy.error(
                """
                ARIMA error: {param_name} is not a valid parameter name.
                Run:
                    SELECT {schema_madlib}.arima_train('usage');
                to see the allowed parameters.
                """.format(param_name=param_name,
                           schema_madlib=schema_madlib))
        if param_name == "param_init":
            name_value["param_init"] = param_value
        else:
            try:
                name_value[param_name] = param_types[param_name](param_value)
            except ValueError:
                plpy.error("ARIMA error: {0} must be {1} value!".format(
                    param_name,
                    "an integer" if param_types[param_name] is int
                    else "a double"))
if name_value["max_iter"] <= 0:
plpy.error("ARIMA error: max_iter must be positive!")
if name_value["tau"] <= 0:
plpy.error("ARIMA error: tau must be positive!")
if name_value["e1"] <= 0:
plpy.error("ARIMA error: e1 must be positive!")
if name_value["e2"] <= 0:
plpy.error("ARIMA error: e2 must be positive!")
if name_value["e3"] <= 0:
plpy.error("ARIMA error: e3 must be positive!")
if name_value["hessian_delta"] <= 0:
plpy.error("ARIMA error: hessian_delta must be positive!")
if name_value["chunk_size"] < 10000:
plpy.error("ARIMA error: chunk_size must be at least 10000!")
if name_value["param_init"] not in ('zero', 'random'):
plpy.error("ARIMA error: param_init must be 'zero' or 'random'!")
return (name_value['max_iter'], name_value['e1'], name_value['e2'],
name_value['e3'], name_value['tau'], name_value['hessian_delta'],
name_value['chunk_size'], name_value['param_init'])
# -----------------------------------------------------------------------
def _redist_data(schema_madlib, input_table, timestamp_col,
timeseries_col, d, chunk_size=10000):
""" Re-distribute data for parallel differencing
@brief Redistribute the data by distributing the consecutive records
to same segment, and also insert some redundant records for
parallel differencing. This is only for data differencing,
and a separate re-distribution function is needed for the
actual computation.
Args:
@param schema_madlib - str, MADlib schema name
@param input_table - str, A table that contains a time series
        @param timestamp_col - str, the column that orders the time series
        @param timeseries_col - str, the column with the time series values
        @param d - int, order of differencing
        @param chunk_size - int, number of consecutive records that go into
                            the same chunk
    Returns:
        * output_table - str, name of the table that contains the
                         redistributed, chunk-aggregated data
        * dist_table - str, name of the row-wise distribution table (used
                       later to map tids back to the original timestamps)
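        Example (a sketch): with chunk_size=10000 and d=1, rows 1..10000 get
        distid 1 and rows 10001..20000 get distid 2; the row with tid 10000
        is re-inserted into chunk 2 (with origin_tid NULL) so that chunk 2
        can be differenced without reading chunk 1.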
"""
dist_table = unique_string() + "redist__"
# Redistribute the data
# <tid, tval, distid>
plpy.execute(
"""
create temp table {dist_table} as
select
((tid - 1) / {chunk_size})::integer + 1 as distid,
tid, origin_tid, tval
from (
select
row_number() over (order by {timestamp_col}) as tid,
{timestamp_col} as origin_tid,
{timeseries_col} as tval
from
{input_table}
) s
m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)')
""".format(dist_table=dist_table, chunk_size=chunk_size,
timestamp_col=timestamp_col,
timeseries_col=timeseries_col, input_table=input_table))
# Insert redundant rows
# For each chunk except for the first one,
# insert the last d rows from the previous chunk
# These redundant rows have origin_tid value NULL
if (d > 0):
plpy.execute(
"""
insert into {dist_table}
select
o.distid + 1, tid, NULL, tval
from (
select
distid, max(tid) maxid
from
{dist_table}
where
distid <> (select max(distid) from {dist_table})
group by distid
) q1, {dist_table} o
where
q1.distid = o.distid and
q1.maxid - o.tid < {d}
""".format(dist_table=dist_table, d=d))
output_table = unique_string() + "redist_agg__"
    # Aggregate each chunk into an array.
    # Note that each aggregated chunk except for the first one
    # has additional p or d elements, and tid is 1 for the first
    # chunk and the actual starting tid for the other chunks
    # - the corresponding tval is the pth (0-based) element in
    # tvals.
    # Note that both distid and tid start from 1.
plpy.execute(
"""
create temp table {output_table} as
select
distid,
(distid - 1) * {chunk_size} + 1 as tid,
array_agg(tval order by tid) as tvals
from
{dist_table}
group by
distid
m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)')
""".format(schema_madlib=schema_madlib, output_table=output_table, dist_table=dist_table,
chunk_size=chunk_size))
return (output_table, dist_table)
# -----------------------------------------------------------------------
def _diff (schema_madlib, input_table, timestamp_col, timeseries_col, d,
chunk_size=10000):
""" Compute the time series differencing
@brief For a time series ts, compute (1-B)^d ts, where d is the diff
order.
Args:
@param schema_madlib - str, MADlib schema name
@param input_table - str, A table that contains a time series
        @param timestamp_col - str, the column that orders the time series
        @param timeseries_col - str, the column with the time series values
        @param d - int, order of differencing, i.e. the I in ARIMA
    Returns:
        * output_table - str, name of the table that contains the
                         differenced data
        * dist_tbl - str, name of the intermediate distribution table
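        For example, with d = 2 each value of the differenced series is
            (1 - B)^2 ts_t = ts_t - 2*ts_{t-1} + ts_{t-2}
        and the result is shorter than the input by d elements.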
"""
# Redistribute the data for parallel preprocessing
(redist_table, dist_tbl) = _redist_data(
schema_madlib, input_table, timestamp_col, timeseries_col, d,
chunk_size)
output_table = unique_string() + 'diff_tbl__'
    # Apply the d-th order differencing to each chunk in parallel
plpy.execute(
"""
create temp table {output_table} as
select
distid::integer,
case when tid = 1 then 1::integer
else (tid - {d})::integer
end as tid,
tvals
from
(
select
distid, tid,
{schema_madlib}.__arima_diff(tvals, {d}) tvals
from
{input_table}
) q
m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)')
""".format(schema_madlib=schema_madlib, d=d,
input_table=redist_table,
output_table=output_table))
# clean up the intermediate table
_drop_tbl(redist_table)
return (output_table, dist_tbl)
# -----------------------------------------------------------------------
def _adjust_diff_data(schema_madlib, diff_table, p):
"""
Add the last p values of the previous chunk
to each chunk except for the first chunk
"""
if p < 1:
return diff_table
output_table = unique_string() + 'adjust_diff_tbl__'
plpy.execute(
"""
create temp table {output_table} as
select
t1.distid::integer, t1.tid::integer,
{schema_madlib}.__arima_adjust(t1.distid, t1.tvals, t2.tvals, {p}) as tvals
from
{diff_table} as t1, {diff_table} as t2
where
t1.distid = t2.distid + 1 or
(t1.distid = 1 and t2.distid = 1)
m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)')
""".format(output_table=output_table, diff_table=diff_table,
p=p, schema_madlib=schema_madlib))
_drop_tbl(diff_table)
return output_table
# -----------------------------------------------------------------------
def _create_resid_tbl (schema_madlib, tbl_ts, p, d, q, phi,
theta, mean, tbl_resid, timestamp_col,
tbl_grp_init, iter_count, dist_tbl):
""" Compute the residuals (noise) for ARIMA model
@brief Compute the residuals and store them in a table. The table
name is given by the user.
Args:
        @param schema_madlib - str, MADlib schema
        @param tbl_ts - str, time series table name
        @param p - int, AR order
        @param d - int, differencing order
        @param q - int, MA order
        @param phi - double array, AR coefficients in current iteration
        @param theta - double array, MA coefficients in current iteration
        @param mean - double, mean value in current iteration. None if
                      include_mean is False.
        @param tbl_resid - str, name of the residual table for output
        @param timestamp_col - str, name of the index column for the
                               residual table
        @param tbl_grp_init - str, name of the table that stores the
                              initial Z values for each group, taken from
                              the previous iteration
        @param iter_count - int, final iteration count
        @param dist_tbl - str, distribution table that maps tids back to
                          the original timestamps; dropped at the end of
                          this function
Returns:
Returns nothing, but creates a table to store the residual values.
The table name is in tbl_resid.
"""
mean = 0.0 if mean is None else mean
phi_str = "NULL"
if p != 0:
phi_str = "array[" + ",".join(str(i) for i in phi) + "]"
theta_str = "NULL"
if q != 0:
theta_str = "array[" + ",".join(str(i) for i in theta) + "]"
# Joining on tid is really slow on large data
# (more than 100 times slower).
# This is why we use join on distid, which gives almost the
# same performance as that of the original code.
plpy.execute(
"""
create table {tbl_resid} as
select
unnest(origin_tid) as {timestamp_col},
unnest(residuals) as residual
from (
select
s1.distid,
{schema_madlib}.__arima_residual(
s1.distid, s1.tvals, {p}, {d}, {q},
{phi}::double precision[],
{theta}::double precision[], {mean},
s2.prez
) as residuals
from
{tbl_ts} s1, {tbl_grp_init} s2
where
((s1.distid = s2.distid + 1) or
(s1.distid = 1 and s2.distid = 1)) and
s2.iter = {iter_count}
) s1, (
select
distid,
array_agg(origin_tid order by origin_tid) as origin_tid
from {dist_tbl}
where origin_tid is not NULL
group by distid
) s2
where s1.distid = s2.distid
""".format(tbl_resid=tbl_resid, timestamp_col=timestamp_col,
schema_madlib=schema_madlib,
phi=phi_str, theta=theta_str,
mean=mean, p=p, d=d, q=q,
tbl_ts=tbl_ts, tbl_grp_init=tbl_grp_init,
iter_count=iter_count, dist_tbl=dist_tbl))
_drop_tbl(dist_tbl)
return None
# ----------------------------------------------------------------------
def _compute_onestep (schema_madlib, tbl_ts, p, q, phi, theta, mean,
tbl_grp_init=None, tbl_jj_g=None, prev_iter=0,
iter_count=0):
""" Call an aggregate function to compute quantities in an
iteration
    @brief J^T*J, J^T*Z and |Z|^2 can be computed in one parallel scan
        of tbl_ts.
Args:
@param schema_madlib - string, MADlib schema name
@param tbl_ts - string, name of the re-distributed data table
@param p - integer, AR order
@param q - integer, MA order
@param phi - array of doubles, current estimate for AR
coefficients
@param theta - array of doubles, current estimate for MA
coefficients
@param mean - double, current estimate for mean value of
time series. If it is None, then it is not
estimated.
@param tbl_grp_init - string, name of the table that stores
the inital Z values for each group. The
Z values are from the previous iteration.
This table will be dropped at the end of
this function.
    Returns:
        * g - array of doubles, the vector J^T * Z
        * Z_sq - double, |Z|^2
        * u - double, updated value for u, used in changing the step size
        * tbl_grp_init - string, a table updated with the newly computed
            noise values.
            It has columns: distid, prez, prej, jj, jz, z2, iter. Each row
            of prez is an array of the q noise values preceding the group;
            prej contains the group's initial J values from the previous
            iteration; iter is the iteration number.
            Each iteration inserts new rows into this table.
        * tbl_jj_g - string, a table storing J^T*J and g = J^T*Z for each
            iteration. Each iteration inserts a new row into this table.
    @note tbl_grp_init and tbl_jj_g are both None when this function is
        called for the first time. Otherwise, neither of them is None. On
        the first call, tbl_grp_init is initialized with zeros.
        tbl_grp_init is needed only when q != 0.
"""
# If mean value is not going to be estimated
if mean is None:
mean = "NULL"
phi_str = "NULL"
if p != 0:
phi_str = "array[" + ",".join(str(i) for i in phi) + "]"
theta_str = "NULL"
if q != 0:
theta_str = "array[" + ",".join(str(i) for i in theta) + "]"
    # If this is the first time this function is called
if tbl_grp_init is None:
# initialize the previous noise values with 0
tbl_grp_init = unique_string() + "init__"
# How many groups
ngrps = plpy.execute("""
select max(distid) as ngrps from {0}
""".format(tbl_ts))[0]['ngrps']
        n_params = p + q if mean == "NULL" else p + q + 1
        # initial previous-iteration values: use 0
        arrz_str = "NULL"
        arrj_str = "NULL"
        if q != 0:
            arrz_str = "array[" + ",".join(['0'] * q) + "]"
            arrj_str = "array[" + ",".join(['0'] * (q * n_params)) + "]"
        arrjj_str = "array[" + ",".join(['0'] * (n_params * n_params)) + "]"
        arrjz_str = "array[" + ",".join(['0'] * n_params) + "]"
        # Besides distid and prez, this table has extra columns that
        # store J^T*J, J^T*Z and Z^2 for each group.
        # These column names are jj, jz, and z2.
        # Each group uses the previous group's final prez and prej from
        # the previous iteration.
        # ** The first group's prez and prej are always 0. **
plpy.execute(
"""
create temp table {tbl_grp_init} as
select
i as distid,
{arrz}::double precision[] as prez,
{arrj}::double precision[] as prej,
{arrjj}::double precision[] as jj,
{arrjz}::double precision[] as jz,
0.0::double precision as z2, 0 as iter
from generate_series(1,{ngrps}) i
m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid, iter)');
alter table {tbl_grp_init} add primary key (distid, iter);
""".format(tbl_grp_init=tbl_grp_init, arrz=arrz_str,
arrj=arrj_str, ngrps=ngrps, arrjj=arrjj_str,
arrjz=arrjz_str))
if tbl_jj_g is None:
tbl_jj_g = unique_string() + "jj_g__"
plpy.execute(
"""
create temp table {tbl_jj_g} (
jj double precision[],
jz double precision[],
z2 double precision,
u double precision,
iter integer
)
m4_ifdef(`__POSTGRESQL__', `', `distributed by (iter)');
alter table {tbl_jj_g} add primary key (iter);
""".format(tbl_jj_g=tbl_jj_g))
plpy.execute(
"""
insert into {tbl_grp_init}
select
distid, (f).prez, (f).prej, (f).jj, (f).jz, (f).z2,
{iter_count}
from (
select
{schema_madlib}.__arima_lm(
s1.distid, tvals, {p}, {q},
{phi}::double precision[],
{theta}::double precision[],
{mean}, prez, prej
) as f,
s1.distid
            from {tbl_ts} s1 left join {tbl_grp_init} s2
                on (s1.distid = s2.distid + 1 and s2.iter = {prev_iter})
) s
""".format(schema_madlib=schema_madlib, tbl_ts=tbl_ts,
tbl_grp_init=tbl_grp_init, p=p, q=q,
phi=phi_str, mean=mean,
theta=theta_str,
iter_count=iter_count,
prev_iter=prev_iter))
# Need to sum J^T*J, J^T*Z and Z^2 over all groups to get
# the result.
plpy.execute(
"""
insert into {tbl_jj_g}
select (f).jj, (f).jz, (f).z2, (f).u, {iter_count}
from (
select
{schema_madlib}.__arima_lm_result_agg(
jj, jz, z2
) as f
from {tbl_grp_init}
where iter = {iter_count}
) s
""".format(schema_madlib=schema_madlib, tbl_jj_g=tbl_jj_g,
tbl_grp_init=tbl_grp_init, iter_count=iter_count))
res = plpy.execute("select jz, z2, u from {tbl_jj_g} where"
" iter = {iter_count}".format(tbl_jj_g=tbl_jj_g,
iter_count=iter_count))[0]
    return (madvec(res['jz'], text=False),
            res['z2'], res['u'], tbl_grp_init, tbl_jj_g)
# ----------------------------------------------------------------------
def _vec_modulus (vec):
""" Compute the modulus of a vector
@param vec - array of doubles, a vector
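        Example:
            >>> _vec_modulus([3.0, 4.0])
            5.0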
"""
return math.sqrt(sum(v*v for v in vec))
# ----------------------------------------------------------------------
def _vec_inner_prod (vec1, vec2):
""" Compute the inner product of two vectors
@brief Since this function is only used internally, we do not check
whether the two vectors have the same length
@param vec1, vec2 - arrays of doubles
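        Example:
            >>> _vec_inner_prod([1.0, 2.0], [3.0, 4.0])
            11.0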
"""
return sum(a*b for a, b in zip(vec1, vec2))
# ----------------------------------------------------------------------
def _lm_delta (schema_madlib, tbl_jj_g, u, iter_count):
""" Compute the vector delta
Args:
@param schema_madlib - string, MADlib schema
@param tbl_jj_g - string, a table containing J^T*J and J^T*Z. It
has only one row, two columns
@param u - double, the current step size
Returns:
* delta - array of doubles, the change of coefficients
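    @note This is the damped step of Levenberg-Marquardt: in the standard
        scheme, delta solves the damped normal equations
            (J^T*J + u*I) * delta = J^T*Z.
        The exact computation here is delegated to the __arima_lm_delta
        aggregate (see the design doc).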
"""
return madvec(plpy.execute(
"""
select
{schema_madlib}.__arima_lm_delta(
jj, jz, {u}
) as delta
from {tbl_jj_g}
where iter = {iter_count}
""".format(schema_madlib=schema_madlib,
u=u, iter_count=iter_count,
tbl_jj_g=tbl_jj_g))[0]['delta'],
        text=False)
# ----------------------------------------------------------------------
def _drop_tbl (tbl):
""" Drop a table if exists
@param tbl - string, the table name
"""
plpy.execute("drop table if exists {0}".format(tbl))
# ----------------------------------------------------------------------
def _compute_statistics (schema_madlib, tbl_ts, p, q, phi, theta,
mean, delta):
""" Compute residual variance, log-likelihood, standard errors
@brief Compute z^2 for different (phi, theta, mean) and numerically
differentiate to compute Hessian. Residual variance and
log-likelihood are computed using Eqs. (1.2.13, 1.2.14).
Args:
@param schema_madlib - MADlib schema name
        @param tbl_ts - time series table name; it has columns distid, tid
                        and tvals, where tvals holds the p previous values
                        plus the current value, ordered by time
@param p - integer, AR order
@param q - integer, MA order
@param phi - array of doubles, AR coefficients
@param theta - array of doubles, MA coefficients
@param mean - double, mean value if include_mean is True, None if
False
@param delta - double, the small step size used to compute Hessian
Returns:
* std_errs - array of doubles, the standard errors of the coefficients
* resid_var - double, the residual variance
* loglik - double, log-likelihood
"""
if mean is None:
mean = "NULL"
res = plpy.execute(
"""
select (f).*
from (
select
{schema_madlib}.__arima_lm_stat_agg(
distid, tvals, {p}, {q},
'{phi}'::double precision[],
'{theta}'::double precision[],
{mean},
{delta}
order by tid
) as f
from {tbl_ts}
) s
""".format(schema_madlib=schema_madlib, p=p, q=q,
phi=_array_to_string(phi), mean=mean,
theta=_array_to_string(theta), delta=delta,
tbl_ts=tbl_ts))[0]
return (madvec(res['std_errs'], text=False),
res['resid_var'], res['loglik'])
# ----------------------------------------------------------------------