| # ----------------------------------------------------------------------- |
| # Implementation of ARIMA model |
| # |
| # Currently, we implement CSS method |
| # ----------------------------------------------------------------------- |
| |
| from utilities.utilities import unique_string |
| import plpy |
| import random |
| import math |
| import time |
| from utilities.utilities import _assert |
| from utilities.validate_args import columns_exist_in_table |
| from utilities.validate_args import table_exists |
| from utilities.utilities import __mad_version |
| from utilities.utilities import _array_to_string |
| from utilities.utilities import preprocess_keyvalue_params |
| from utilities.utilities import add_postfix |
| # ---------------------------------------------------------------------- |
| |
| version_wrapper = __mad_version() |
| madvec = version_wrapper.select_vecfunc() |
| |
| # ---------------------------------------------------------------------- |
| # -- Help messages ----------------------------------------------------- |
| |
| def arima_train_help_message (schema_madlib, message=None, **kwargs): |
| """ |
| Given a help string, provide usage information |
| Args: |
| schema_madlib: String. Schema where MADlib is installed |
| message: String. Which help message to display |
| |
| Returns: |
| String. A detailed help message |
| """ |
| if message is not None and \ |
| message.lower() in ("usage", "help", "?"): |
| return """ |
| ARIMA modeling for time series analysis |
| ----------------------------------------------------------------------- |
| USAGE |
| ----------------------------------------------------------------------- |
| SELECT madlib.arima_train |
| ( |
| input_table TEXT, -- Source table name |
| output_table TEXT, -- Output table prefix to create model, |
| -- summary and residual table for the ARIMA model |
| -- Tables created are: |
| -- output_table |
| -- <output_table>_summary |
| -- <output_table>_residual |
| timestamp_col TEXT, -- Index column (e.g. timestamp) that orders the time series |
| timeseries_col TEXT, -- Column that contains the values of the time series |
| group_by_cols TEXT, -- Comma-separated list of grouping columns |
| -- (Optional, Default: NULL) |
| -- Currently disabled - Any non-NULL value is ignored. |
| include_mean BOOLEAN, -- Include constant mean in model? |
| -- (Optional, Default: False) |
| non_seasonal_orders INTEGER[] -- Array of non-seasonal AR, I, and MA orders |
| -- (Optional, Default: ARRAY[1,1,1]) |
| optimizer_params TEXT -- Control parameters for optimizer |
| ) |
| |
| The optimizer_params is a string with the format of |
| 'max_iter=..., e1=..., e2=..., e3=..., tau=..., hessian_delta=... |
| The parameters can be in any order and, if a parameter is missing, |
| the default value is used for that parameter. |
| If 'optimizer_params' is not provided, then all parameters have the |
| following default values: |
| max_iter = 100, tau = 1e-3, |
| e1 = 1e-15, e2 = 1e-15, e3 = 1e-15, |
| hessian_delta=1e-4. |
| See user doc for details about each parameter. |
| |
| ----------------------------------------------------------------------- |
| OUTPUT |
| ----------------------------------------------------------------------- |
| The following tables are created: |
| 1. output_table: Table containing the ARIMA model |
| Contains following columns: |
| - mean: Model intercept (only if 'include_mean' is True) |
| mean_std_error: Standard errors for intercept |
| - ar_params: Auto-regressions parameters of the ARIMA model. |
| ar_std_errors: Standard errors for AR parameters |
| - ma_params: Moving average parameters of the ARIMA model. |
| ma_std_errors: Standard errors for MA parameters |
| 2. <output_table>_summary: Table containing descriptive statistics of the ARIMA model |
| Contains following columns: |
| - input_table: Table name with the source data |
| - timestamp_col: Column name in source table that contains the timestamp index to data |
| - timeseries_col: Column name in source table that contains the data values |
| - non_seasonal_orders: Orders of the non-seasonal ARIMA model |
| - include_mean: True if intercept was included in ARIMA model |
| - residual_variance: Variance of the residuals |
| - log_likelihood: Log likelihood value (when using MLE) |
| - iter_num: How many iterations |
| - exec_time: Total time taken to train the model |
| 3. <output_table>_residual: Table containing the residuals for each data point in 'input_table' |
| Contains following columns: |
| - <timestamp_col>: Same as the 'timestamp_col' parameter |
| (all indices from source table included except the first |
| 'd' elements, where 'd' is the differencing order value |
| from 'non_seasonal_orders') |
| - residual: Residual value for each data point |
| """ |
| else: |
| return """ |
| Given a time series, the ARIMA model is a tool for understanding and, |
| perhaps, predicting future values in this series. The model |
| consists of three parts, an autoregressive (AR) part, a moving average (MA) |
| part, and integrated (I) part where an initial differencing step can be applied |
| to remove any non-stationarity in the signal. The model is generally referred to |
| as an ARIMA(p,d,q) model where parameters p, d, and q are non-negative integers |
| that refer to the order of the autoregressive, integrated, and moving average |
| parts of the model respectively. |
| ------- |
| For an overview on usage, run: |
| SELECT madlib.arima_train('usage'); |
| ------- |
| """.format(schema_madlib=schema_madlib) |
| |
| # ---------------------------------------------------------------------- |
| |
| def arima_train (schema_madlib, input_table, output_table, |
| timestamp_col, timeseries_col, grouping_cols, |
| include_mean, non_seasonal_orders, optimizer_params): |
| """ The main Python function to train the ARIMA models |
| |
| @brief Estimate the ARIMA(p,d,q) model coefficients for a given time |
| series. Grouping support is not implemented at this point. |
| |
| Args: |
| @param schema_madlib - string, Schema of MADlib |
| @param input_table - string, A table that contains a time series |
| @param output_table - string, Output table name prefix |
| @param timestamp_col - string, the order id column of time series |
| @param timeseries_col - string, Column for time series |
| @param grouping_cols - string, Place holder for support of |
| grouping, default value is None |
| @param include_mean - boolean, whether to include the mean value, |
| default value is False. This should not be |
| called "intercept" |
| @param non_seasonal_orders - array of integers with length of 3, |
| orders for AR, I and MA, default |
| value is [1,1,1] |
| |
| Returns: |
| This function returns a list that contains (mu, phi, theta), |
| where |
| * mu - double, estimated mean value of the time series |
| * phi - array of doubles, estimated AR coefficients |
| * theta - array of doubles, estimated MA coefficients |
| |
| This function also creates three output tables: |
| * <output_table> - contains the above estimated values |
| * <output_table>_summary - contains diagnostic info |
| * <output_table>_perf - contains performance info |
| * <output_table>_residual - contains the residuals |
| """ |
| old_msg_level = plpy.execute(""" |
| select setting from pg_settings |
| where name='client_min_messages' |
| """)[0]['setting'] |
| plpy.execute("set client_min_messages to error") |
| |
| t0 = time.time() |
| input_name = input_table |
| |
| if non_seasonal_orders: |
| non_seasonal_orders = madvec(non_seasonal_orders) |
| try: |
| non_seasonal_orders = map(int, non_seasonal_orders) |
| except: |
| plpy.error("ARIMA error: non_seasonal_orders are not integers") |
| |
| # Validate the input parameters |
| _validate_params(schema_madlib, input_table, output_table, |
| timestamp_col, timeseries_col, non_seasonal_orders, |
| include_mean, grouping_cols) |
| |
| # Fix the optimizer control parameters. |
| # Default values are set in _extract_params(...) |
| (max_iter, e1, e2, e3, tau, hessian_delta, |
| chunk_size, param_init) = _extract_params(schema_madlib, optimizer_params) |
| |
| # deal with I-order |
| d = non_seasonal_orders[1] |
| if d > 0: |
| # mean value is always 0 for ARIMA models with differencing |
| include_mean = False |
| # use the differenced time series |
| (input_table, dist_tbl) = _diff(schema_madlib, input_table, timestamp_col, |
| timeseries_col, non_seasonal_orders[1], |
| chunk_size) |
| |
| # set initial values |
| p = non_seasonal_orders[0] |
| q = non_seasonal_orders[2] |
| |
| # Re-distribute the data, so that each segment has consecutive rows. |
| # Also add an auxiliary array column which contains p previous time |
| # series values. |
| # tbl_ts is the time series table we are going to use. For convenience, |
| # we fix the column names: tid, distid and tvals for time, distribution |
| # id and an array of current value and p previous values. |
| # if the data has been differenced, we only need to do adjustment |
| if d > 0: |
| # note that if p == 0 the input_table will be returned |
| tbl_ts = _adjust_diff_data(schema_madlib, input_table, p) |
| else: |
| (tbl_ts, dist_tbl) = _redist_data(schema_madlib, input_table, timestamp_col, |
| timeseries_col, p, chunk_size) |
| |
| # inital values of phi and theta |
| if param_init == 'random': |
| # random values in [-0.05, 0.05] |
| phi = [(random.random() - 0.5) * 0.1 for i in range(p)] |
| theta = [(random.random() - 0.5) * 0.1 for i in range(q)] |
| else: |
| # fix to zero |
| phi = [0] * p |
| theta = [0] * q |
| |
| if include_mean: |
| mean = plpy.execute("select avg({timeseries_col}) as avg" |
| " from {input_table}".format( |
| timeseries_col=timeseries_col, |
| input_table=input_table))[0]['avg'] |
| else: |
| mean = None |
| |
| # J^T*J, J^T*Z and |Z|^2 can be compted in one parallel scan of tbl_ts |
| # using an aggregate function. |
| # What we need in Python are just g and |Z|^2. We also |
| # need the initial value of u (see design doc for their meanings). We |
| # can load these values into Python since they are small. |
| # We also need each group's initial values for the next iteration, |
| # and a small table that stores J^T*J and g=J^T*Z for computing |
| # delta with different u's |
| tbl_grp_init = None |
| tbl_jj_g = None |
| g, z2, u, tbl_grp_init, tbl_jj_g = \ |
| _compute_onestep(schema_madlib, tbl_ts, p, q, phi, theta, |
| mean, tbl_grp_init, tbl_jj_g, 0, 1) |
| u = tau * u |
| |
| # update phi, theta and mean etc in iterations |
| stop = _vec_modulus(g) <= e1 |
| |
| v = 2 # initial value for step |
| k = 0 |
| |
| # previous iteration number |
| prev_iter = 1 |
| iter_count = 1 |
| |
| while not stop and k <= max_iter: |
| k += 1 |
| rho = 0 |
| while not stop and rho <= 0: |
| delta = _lm_delta(schema_madlib, tbl_jj_g, u, prev_iter) |
| |
| if mean is None: |
| params = phi + theta |
| else: |
| params = phi + theta + [mean] |
| |
| |
| if _vec_modulus(delta) <= e2 * _vec_modulus(params): |
| stop = True |
| else: |
| phi_new = [phi[i] + delta[i] for i in range(p)] |
| theta_new = [theta[i] + delta[i+p] for i in range(q)] |
| if mean is not None: |
| mean_new = mean + delta[p+q] |
| else: |
| mean_new = None |
| |
| iter_count += 1 |
| (g_new, z2_new, u_useless, tbl_grp_init, |
| tbl_jj_g) = _compute_onestep(schema_madlib, tbl_ts, |
| p, q, phi_new, theta_new, |
| mean_new, tbl_grp_init, |
| tbl_jj_g, |
| prev_iter, iter_count) |
| |
| rho = ((z2 - z2_new) / |
| _vec_inner_prod(delta, |
| [u*a+b for a,b in zip(delta, g)])) |
| if rho > 0: |
| prev_iter = iter_count |
| phi = phi_new |
| theta = theta_new |
| mean = mean_new |
| g = g_new |
| z2 = z2_new |
| stop = _vec_modulus(g) <= e1 or z2 <= e3 |
| v = 2 |
| u = u * max(1./3, 1. - pow((2*rho - 1),3)) |
| else: |
| v = 2 * v |
| u = u * v |
| |
| # In the end, we need to compute and output the residuals to a table |
| _create_resid_tbl(schema_madlib, tbl_ts, p, non_seasonal_orders[1], |
| q, phi, theta, mean, add_postfix(output_table, "_residual"), |
| timestamp_col, tbl_grp_init, iter_count, dist_tbl) |
| |
| std_errs, resid_var, loglik = _compute_statistics(schema_madlib, |
| tbl_ts, p, q, phi, |
| theta, mean, hessian_delta) |
| # clean up intermediate tables |
| _drop_tbl(tbl_ts) |
| _drop_tbl(tbl_jj_g) |
| _drop_tbl(tbl_grp_init) |
| if non_seasonal_orders[1] > 0: |
| _drop_tbl(input_table) |
| |
| dt = time.time() - t0 |
| _output_result(p, q, phi, theta, mean, std_errs, |
| output_table) |
| _output_summary(input_name, timestamp_col, timeseries_col, |
| non_seasonal_orders, include_mean, resid_var, |
| loglik, dt, k, add_postfix(output_table, "_summary")) |
| |
| plpy.execute("set client_min_messages to " + old_msg_level) |
| return None |
| |
| # ---------------------------------------------------------------------- |
| |
| def _output_summary (input_name, timestamp_col, timeseries_col, |
| non_seasonal_orders, include_mean, resid_var, |
| loglik, dt, iter_num, output_table): |
| """ Output the summary table |
| |
| Args: |
| @param input_name - string, the original input table name |
| @param timestamp_col - string, column name of time stamps |
| @param timeseries_col - string, column name of time series |
| @param non_seasonal_orders - array of integers, [p,d,q] |
| @param include_mean - boolean, include mean or not |
| @param resid_var - double, the residual variance |
| @param loglik - double, log-likelihood |
| @param dt - double, how many ms time that was used in ARIMA |
| @param output_table - string, the output summary table name |
| """ |
| plpy.execute( |
| """ |
| create table {output_table} as |
| select |
| '{input_name}' as input_table, |
| '{timestamp_col}' as timestamp_col, |
| '{timeseries_col}' as timeseries_col, |
| '{non_seasonal_orders}'::integer[] as non_seasonal_orders, |
| {include_mean}::boolean as include_mean, |
| {resid_var}::double precision as residual_variance, |
| {loglik}::double precision as log_likelihood, |
| {iter_num}::integer as iter_num, |
| {dt:.2f}::double precision as \"exec_time (s)\"; |
| """.format(input_name=input_name, timestamp_col=timestamp_col, |
| timeseries_col=timeseries_col, resid_var=resid_var, |
| include_mean=include_mean, loglik=loglik, |
| dt=dt, output_table=output_table, iter_num=iter_num, |
| non_seasonal_orders=_array_to_string(non_seasonal_orders))) |
| return None |
| |
| # ---------------------------------------------------------------------- |
| |
| def _output_result (p, q, phi, theta, mean, std_errs, output_table): |
| """ Output the statistics summary into a table |
| |
| Args: |
| @param p, q - integer, AR and MA orders |
| @param phi, theta - array of doubles, AR and MA coef |
| @param mean - double, mean value. None if include_mean is False |
| @param std_errs - array of doubles, the standard errors for all coef |
| @param output_table - string, the result table to be created |
| """ |
| # create other output tables |
| param_str = [] |
| param_val = [] |
| if p > 0: |
| param_str += ["ar_params double precision[]", |
| "ar_std_errors double precision[]"] |
| param_val += ["'" + _array_to_string(phi) + "'::double precision[]", |
| "'" + _array_to_string(std_errs[0:p]) + "'::double precision[]"] |
| |
| if q > 0: |
| param_str += ["ma_params double precision[]", |
| "ma_std_errors double precision[]"] |
| param_val += ["'" + _array_to_string(theta) + "'::double precision[]", |
| "'" + _array_to_string(std_errs[p:(p+q)]) + "'::double precision[]"] |
| if mean is not None: |
| param_str += ["mean double precision", |
| "mean_std_error double precision"] |
| param_val += [str(mean), str(std_errs[p+q])] |
| |
| param_str = ", ".join(param_str) |
| param_val = ", ".join(param_val) |
| |
| plpy.execute( |
| """ |
| create table {output_table} ({param_str}); |
| """.format(output_table=output_table, param_str=param_str)) |
| |
| plpy.execute( |
| """ |
| insert into {output_table} values ({param_val}); |
| """.format(output_table=output_table, param_val=param_val)) |
| |
| return None |
| |
| # ----------------------------------------------------------------------- |
| |
| def _validate_params (schema_madlib, input_table, output_table, timestamp_col, |
| timeseries_col, non_seasonal_orders, include_mean, |
| grouping_cols=None): |
| """ Validate the input parameters for arima_train |
| |
| Args: |
| @param schema_madlib: str, Name of the schema where MADlib is installed |
| @param input_table: str, Name of the source table |
| @param output_table: str, Name of the output table |
| @param timestamp_col: str, Name of the timestamp (index) column |
| @param timeseries_col: str, Name of the timeserirs (data) column |
| @param non_seasonal_orders: list[int], Array of 3 integers indicating order of ARIMA |
| @param include_mean: bool, indicates if mean value should be included in model |
| @param grouping_cols: list[str], Name of the grouping columns |
| @param optimizer_params: list[str], Optimizer-specific params |
| |
| Returns: |
| None |
| |
| Throws: |
| plpy.error if any argument is invalid |
| """ |
| _assert(input_table is not None and table_exists(input_table), |
| "ARIMA error: Source data table does not exist!") |
| |
| # output tables are created from the 'output_table' parameter name |
| if output_table: |
| output_table_names = [add_postfix(output_table, i) for i in ('', '_summary', '_residual')] |
| for each_table in output_table_names: |
| _assert(not table_exists(each_table, only_first_schema=True), |
| "ARIMA error: Output table {0}" |
| " already exists!".format(str(each_table))) |
| else: |
| plpy.error("ARIMA error: Invalid output table prefix!") |
| # timestamp and timeseries column names |
| _assert(columns_exist_in_table(input_table, [timestamp_col, timeseries_col], |
| schema_madlib), |
| "ARIMA error: {1} columns do not exist in {0}!" |
| .format(input_table, |
| str([timestamp_col, timeseries_col]))) |
| |
| _assert(non_seasonal_orders is not None, "ARIMA error: non_seasonal_orders should not be NULL") |
| _assert(len(non_seasonal_orders)==3, "ARIMA error: 3 integers not supplied " |
| "for non_seasonal_orders") |
| _assert(all(isinstance(i, int) for i in non_seasonal_orders), |
| "ARIMA error: Non-seasonal orders must be non-negative integers! ") |
| _assert(all(i >= 0 for i in non_seasonal_orders), |
| "ARIMA error: Non-seasonal orders must be non-negative!") |
| _assert(any(i != 0 for i in non_seasonal_orders), |
| "ARIMA error: Non-seasonal orders must not all be zeros!") |
| _assert(all(i < 20 for i in non_seasonal_orders), |
| "ARIMA error: Non-seasonal orders should be less than 20! ") |
| _assert(non_seasonal_orders[0] != 0 or non_seasonal_orders[2] != 0, |
| "ARIMA error: AR order and MA order cannot both be zeros!") |
| _assert(isinstance(include_mean, bool), |
| "ARIMA error: include_mean is not a Boolean! ") |
| if grouping_cols: |
| _assert(columns_exist_in_table(input_table, grouping_cols, schema_madlib), |
| "ARIMA error: Some of the grouping " |
| " columns do not exist in {0}!". |
| format(input_table)) |
| # ----------------------------------------------------------------------- |
| |
| def _extract_params (schema_madlib, optimizer_params): |
| """ Extract optimizer control parameters or set the default values |
| |
| @brief optimizer_params is a string with the format of |
| 'max_iter=..., e1=..., e2=..., e3=..., tau=...'. The order |
| does not matter. If a parameter is missing, then the default |
| value for it is used. If optimizer_params is None or '', |
| then all default values are used. If the parameter specified |
| is none of 'max_iter', 'e1', 'e2', 'e3', 'tau', 'hessian_delta', |
| 'chunk_size', and 'param_init' then an error is raised. This function |
| also validates the values of these parameters. |
| |
| Raises: |
| * ARIMA error - If the parameter is unsupported or the value is |
| not valid. |
| """ |
| allowed_params = set(["max_iter", "tau", "e1", "e2", "e3", |
| "hessian_delta", "chunk_size", "param_init"]) |
| name_value = dict(max_iter=100, tau=1e-3, e1=1e-15, e2=1e-15, e3=1e-15, |
| hessian_delta=1e-4, chunk_size=10000, param_init='zero') |
| |
| if optimizer_params is None or len(optimizer_params) == 0: |
| return (name_value['max_iter'], name_value['e1'], name_value['e2'], |
| name_value['e3'], name_value['tau'], |
| name_value['hessian_delta'], name_value['chunk_size'], |
| name_value['param_init']) |
| |
| for s in preprocess_keyvalue_params(optimizer_params): |
| items = s.split("=") |
| if (len(items) != 2): |
| plpy.error("ARIMA error: Optimizer parameter list has incorrect format!") |
| param_name = items[0].strip(" \"").lower() |
| param_value = items[1].strip(" \"").lower() |
| |
| if param_name not in allowed_params: |
| plpy.error( |
| """ |
| ARIMA error: {param_name} is not a valida parameter name. |
| Run: |
| SELECT {schema_madlib}.arima_train('usage'); |
| to see the allowed parameters. |
| """.format(param_name=param_name, |
| schema_madlib=schema_madlib)) |
| |
| if param_name == "max_iter": |
| try: |
| name_value["max_iter"] = int(param_value) |
| except: |
| plpy.error("ARIMA error: max_iter must be an integer number!") |
| |
| if param_name == "tau": |
| try: |
| name_value["tau"] = float(param_value) |
| except: |
| plpy.error("ARIMA error: tau must be a double value!") |
| |
| if param_name == "e1": |
| try: |
| name_value["e1"] = float(param_value) |
| except: |
| plpy.error("ARIMA error: e1 must be a double value!") |
| |
| if param_name == "e2": |
| try: |
| name_value["e2"] = float(param_value) |
| except: |
| plpy.error("ARIMA error: e2 must be a double value!") |
| |
| if param_name == "e3": |
| try: |
| name_value["e3"] = float(param_value) |
| except: |
| plpy.error("ARIMA error: e3 must be a double value!") |
| |
| if param_name == "hessian_delta": |
| try: |
| name_value["hessian_delta"] = float(param_value) |
| except: |
| plpy.error("ARIMA error: hessian_delta must be a double value!") |
| |
| if param_name == "chunk_size": |
| try: |
| name_value["chunk_size"] = int(param_value) |
| except: |
| plpy.error("ARIMA error: chunk_size must be an integer value!") |
| |
| if param_name == "param_init": |
| name_value["param_init"] = param_value |
| |
| if name_value["max_iter"] <= 0: |
| plpy.error("ARIMA error: max_iter must be positive!") |
| |
| if name_value["tau"] <= 0: |
| plpy.error("ARIMA error: tau must be positive!") |
| |
| if name_value["e1"] <= 0: |
| plpy.error("ARIMA error: e1 must be positive!") |
| |
| if name_value["e2"] <= 0: |
| plpy.error("ARIMA error: e2 must be positive!") |
| |
| if name_value["e3"] <= 0: |
| plpy.error("ARIMA error: e3 must be positive!") |
| |
| if name_value["hessian_delta"] <= 0: |
| plpy.error("ARIMA error: hessian_delta must be positive!") |
| |
| if name_value["chunk_size"] < 10000: |
| plpy.error("ARIMA error: chunk_size must be at least 10000!") |
| |
| if name_value["param_init"] not in ('zero', 'random'): |
| plpy.error("ARIMA error: param_init must be 'zero' or 'random'!") |
| |
| return (name_value['max_iter'], name_value['e1'], name_value['e2'], |
| name_value['e3'], name_value['tau'], name_value['hessian_delta'], |
| name_value['chunk_size'], name_value['param_init']) |
| |
| # ----------------------------------------------------------------------- |
| |
| def _redist_data(schema_madlib, input_table, timestamp_col, |
| timeseries_col, d, chunk_size=10000): |
| """ Re-distribute data for parallel differencing |
| |
| @brief Redistribute the data by distributing the consecutive records |
| to same segment, and also insert some redundant records for |
| parallel differencing. This is only for data differencing, |
| and a separate re-distribution function is needed for the |
| actual computation. |
| |
| Args: |
| @param schema_madlib - str, MADlib schema name |
| @param input_table - str, A table that contains a time series |
| @param timestamp_col - str, the order of time series |
| @param timeseries_col - str, the values of time series |
| @param d - int, order of differencing |
| @param length - int, length of consecutive records that goes to the |
| same chunk |
| |
| Returns: |
| * output_tables - list of strs, name of the table that contains the |
| redistributed data |
| """ |
| dist_table = unique_string() + "redist__" |
| |
| # Redistribute the data |
| # <tid, tval, distid> |
| plpy.execute( |
| """ |
| create temp table {dist_table} as |
| select |
| ((tid - 1) / {chunk_size})::integer + 1 as distid, |
| tid, origin_tid, tval |
| from ( |
| select |
| row_number() over (order by {timestamp_col}) as tid, |
| {timestamp_col} as origin_tid, |
| {timeseries_col} as tval |
| from |
| {input_table} |
| ) s |
| m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)') |
| """.format(dist_table=dist_table, chunk_size=chunk_size, |
| timestamp_col=timestamp_col, |
| timeseries_col=timeseries_col, input_table=input_table)) |
| |
| # Insert redundant rows |
| # For each chunk except for the first one, |
| # insert the last d rows from the previous chunk |
| # These redundant rows have origin_tid value NULL |
| if (d > 0): |
| plpy.execute( |
| """ |
| insert into {dist_table} |
| select |
| o.distid + 1, tid, NULL, tval |
| from ( |
| select |
| distid, max(tid) maxid |
| from |
| {dist_table} |
| where |
| distid <> (select max(distid) from {dist_table}) |
| group by distid |
| ) q1, {dist_table} o |
| where |
| q1.distid = o.distid and |
| q1.maxid - o.tid < {d} |
| """.format(dist_table=dist_table, d=d)) |
| |
| output_table = unique_string() + "redist_agg__" |
| # Aggregate the chunk into an array |
| # Note that each aggregated chunk except for the first one |
| # has additonal p or d elements and tid is 1 for the first |
| # chunk and is the actual starting tid for the other chunks |
| # - the corresponding tval is the pth (0 based) element in |
| # tvals |
| # Note that both distid and tid starts from 1 |
| plpy.execute( |
| """ |
| create temp table {output_table} as |
| select |
| distid, |
| (distid - 1) * {chunk_size} + 1 as tid, |
| array_agg(tval order by tid) as tvals |
| from |
| {dist_table} |
| group by |
| distid |
| m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)') |
| """.format(schema_madlib=schema_madlib, output_table=output_table, dist_table=dist_table, |
| chunk_size=chunk_size)) |
| |
| return (output_table, dist_table) |
| |
| # ----------------------------------------------------------------------- |
| |
| def _diff (schema_madlib, input_table, timestamp_col, timeseries_col, d, |
| chunk_size=10000): |
| """ Compute the time series differencing |
| |
| @brief For a time series ts, compute (1-B)^d ts, where d is the diff |
| order. |
| |
| Args: |
| @param schema_madlib - str, MADlib schema name |
| @param input_table - str, A table that contains a time series |
| @param timestamp_col - str, the order of time series |
| @param timeseries_col - str, the values of time series |
| @param d - int, order of differencing i.e. I in ARIMA |
| |
| Returns: |
| * output_table - str, name of the table that contains the |
| differenced data |
| """ |
| # Redistribute the data for parallel preprocessing |
| (redist_table, dist_tbl) = _redist_data( |
| schema_madlib, input_table, timestamp_col, timeseries_col, d, |
| chunk_size) |
| output_table = unique_string() + 'diff_tbl__' |
| |
| # Generate the coefficients for d-th order differencing |
| plpy.execute( |
| """ |
| create temp table {output_table} as |
| select |
| distid::integer, |
| case when tid = 1 then 1::integer |
| else (tid - {d})::integer |
| end as tid, |
| tvals |
| from |
| ( |
| select |
| distid, tid, |
| {schema_madlib}.__arima_diff(tvals, {d}) tvals |
| from |
| {input_table} |
| ) q |
| m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)') |
| """.format(schema_madlib=schema_madlib, d=d, |
| input_table=redist_table, |
| output_table=output_table)) |
| |
| # clean up the intermediate table |
| _drop_tbl(redist_table) |
| |
| return (output_table, dist_tbl) |
| |
| # ----------------------------------------------------------------------- |
| |
| def _adjust_diff_data(schema_madlib, diff_table, p): |
| """ |
| Add the last p values of the previous chunk |
| to each chunk except for the first chunk |
| """ |
| if p < 1: |
| return diff_table |
| |
| output_table = unique_string() + 'adjust_diff_tbl__' |
| plpy.execute( |
| """ |
| create temp table {output_table} as |
| select |
| t1.distid::integer, t1.tid::integer, |
| {schema_madlib}.__arima_adjust(t1.distid, t1.tvals, t2.tvals, {p}) as tvals |
| from |
| {diff_table} as t1, {diff_table} as t2 |
| where |
| t1.distid = t2.distid + 1 or |
| (t1.distid = 1 and t2.distid = 1) |
| m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid)') |
| """.format(output_table=output_table, diff_table=diff_table, |
| p=p, schema_madlib=schema_madlib)) |
| |
| _drop_tbl(diff_table) |
| |
| return output_table |
| |
| # ----------------------------------------------------------------------- |
| |
| def _create_resid_tbl (schema_madlib, tbl_ts, p, d, q, phi, |
| theta, mean, tbl_resid, timestamp_col, |
| tbl_grp_init, iter_count, dist_tbl): |
| """ Compute the residuals (noise) for ARIMA model |
| |
| @brief Compute the residuals and store them in a table. The table |
| name is given by the user. |
| |
| Args: |
| @param schema_madlib - str, MADlib schema |
| @param table_ts - str, time series table name |
| @param timestamp_col - str, name of time id column |
| @param p - int, AR order |
| @param q - int, MA order |
| @param phi - double array, AR coefficients in current iteration |
| @param theta - double array, MA coefficients in current iteration |
| @param mean - double, mean value in current iteration. None if |
| include_mean is False. |
| @param tbl_resid - str, name of the residual table for output |
| @param timestamp_col - str, name of the index column for residual table |
| @param tbl_grp_init - string, name of the table that stores |
| the inital Z values for each group. The |
| Z values are from the previous iteration. |
| This table will be dropped at the end of |
| this function. |
| @param iter_count - int, final iteration count |
| |
| Returns: |
| Returns nothing, but creates a table to store the residual values. |
| The table name is in tbl_resid. |
| """ |
| mean = 0.0 if mean is None else mean |
| |
| phi_str = "NULL" |
| if p != 0: |
| phi_str = "array[" + ",".join(str(i) for i in phi) + "]" |
| theta_str = "NULL" |
| if q != 0: |
| theta_str = "array[" + ",".join(str(i) for i in theta) + "]" |
| |
| # Joining on tid is really slow on large data |
| # (more than 100 times slower). |
| # This is why we use join on distid, which gives almost the |
| # same performance as that of the original code. |
| plpy.execute( |
| """ |
| create table {tbl_resid} as |
| select |
| unnest(origin_tid) as {timestamp_col}, |
| unnest(residuals) as residual |
| from ( |
| select |
| s1.distid, |
| {schema_madlib}.__arima_residual( |
| s1.distid, s1.tvals, {p}, {d}, {q}, |
| {phi}::double precision[], |
| {theta}::double precision[], {mean}, |
| s2.prez |
| ) as residuals |
| from |
| {tbl_ts} s1, {tbl_grp_init} s2 |
| where |
| ((s1.distid = s2.distid + 1) or |
| (s1.distid = 1 and s2.distid = 1)) and |
| s2.iter = {iter_count} |
| ) s1, ( |
| select |
| distid, |
| array_agg(origin_tid order by origin_tid) as origin_tid |
| from {dist_tbl} |
| where origin_tid is not NULL |
| group by distid |
| ) s2 |
| where s1.distid = s2.distid |
| """.format(tbl_resid=tbl_resid, timestamp_col=timestamp_col, |
| schema_madlib=schema_madlib, |
| phi=phi_str, theta=theta_str, |
| mean=mean, p=p, d=d, q=q, |
| tbl_ts=tbl_ts, tbl_grp_init=tbl_grp_init, |
| iter_count=iter_count, dist_tbl=dist_tbl)) |
| |
| _drop_tbl(dist_tbl) |
| return None |
| |
| # ---------------------------------------------------------------------- |
| |
| def _compute_onestep (schema_madlib, tbl_ts, p, q, phi, theta, mean, |
| tbl_grp_init=None, tbl_jj_g=None, prev_iter=0, |
| iter_count=0): |
| """ Call an aggregate function to compute quantities in an |
| iteration |
| |
| @brief J^T*J, J^T*Z and |Z|^2 can be compted in one parallel scan |
| of tbl_ts. |
| |
| Args: |
| @param schema_madlib - string, MADlib schema name |
| @param tbl_ts - string, name of the re-distributed data table |
| @param p - integer, AR order |
| @param q - integer, MA order |
| @param phi - array of doubles, current estimate for AR |
| coefficients |
| @param theta - array of doubles, current estimate for MA |
| coefficients |
| @param mean - double, current estimate for mean value of |
| time series. If it is None, then it is not |
| estimated. |
| @param tbl_grp_init - string, name of the table that stores |
| the inital Z values for each group. The |
| Z values are from the previous iteration. |
| This table will be dropped at the end of |
| this function. |
| |
| Returns: |
| * delta - array of doubles, change of coefficients |
| * g - array of doubles, vector J^T * Z |
| * Z_sq - double, |Z|^2 |
| * u - double, updated value for u, used in changing step size |
| * tbl_grp_init - string, a new table is created using newly |
| computed noise values. |
| It has columns: iter, jj, jz, z2, distid, preZ, |
| preJ. Each row |
| of prez is an array of q previous noise values. |
| preJ contains the initial values for the group in |
| the previous iteration. iter is the current iteration |
| number. |
| Each iteration inserts new values into this table |
| * tbl_jj_g - string, a table storing J^T*J and J^T*Z. Each iteration |
| inserts new values into this table |
| |
| @note u and tbl_grp_init are both None when this function is called |
| for the first time. Otherwise, neither of them is None. The |
| first time this function got called, tbl_grp_init are |
| initialized with zeros. |
| tbl_grp_init is needed only when q != 0 |
| """ |
| # If mean value is not going to be estimated |
| if mean is None: |
| mean = "NULL" |
| phi_str = "NULL" |
| if p != 0: |
| phi_str = "array[" + ",".join(str(i) for i in phi) + "]" |
| theta_str = "NULL" |
| if q != 0: |
| theta_str = "array[" + ",".join(str(i) for i in theta) + "]" |
| |
| # If this is the first time to call this function |
| if tbl_grp_init is None: |
| # initialize the previous noise values with 0 |
| tbl_grp_init = unique_string() + "init__" |
| # How many groups |
| ngrps = plpy.execute(""" |
| select max(distid) as ngrps from {0} |
| """.format(tbl_ts))[0]['ngrps'] |
| l = p + q if mean == "NULL" else p + q + 1 |
| |
| # initial previous iteration values, use 0 |
| arrz_str = "NULL" |
| if q != 0: |
| arrz_str = "array[" + ",".join(['0'] * q) + "]" |
| arrj_str = "NULL" |
| if q != 0: |
| arrj_str = "array[" + ",".join(['0'] * (q * l)) + "]" |
| arrjj_str = "array[" + ",".join(['0'] * (l * l)) + "]" |
| arrjz_str = "array[" + ",".join(['0'] * l) + "]" |
| |
| # Besides distid and prez, this table has extra columns that |
| # stores J^T*J, J^T*Z and Z^2 for each group |
| # These column names are jj, jz, and z2 |
| # Each group uses previous group's final prez and prej from |
| # previous iteration. |
| # ** First group's prez and prej are always 0. ** |
| plpy.execute( |
| """ |
| create temp table {tbl_grp_init} as |
| select |
| i as distid, |
| {arrz}::double precision[] as prez, |
| {arrj}::double precision[] as prej, |
| {arrjj}::double precision[] as jj, |
| {arrjz}::double precision[] as jz, |
| 0.0::double precision as z2, 0 as iter |
| from generate_series(1,{ngrps}) i |
| m4_ifdef(`__POSTGRESQL__', `', `distributed by (distid, iter)'); |
| alter table {tbl_grp_init} add primary key (distid, iter); |
| """.format(tbl_grp_init=tbl_grp_init, arrz=arrz_str, |
| arrj=arrj_str, ngrps=ngrps, arrjj=arrjj_str, |
| arrjz=arrjz_str)) |
| |
| if tbl_jj_g is None: |
| tbl_jj_g = unique_string() + "jj_g__" |
| plpy.execute( |
| """ |
| create temp table {tbl_jj_g} ( |
| jj double precision[], |
| jz double precision[], |
| z2 double precision, |
| u double precision, |
| iter integer |
| ) |
| m4_ifdef(`__POSTGRESQL__', `', `distributed by (iter)'); |
| alter table {tbl_jj_g} add primary key (iter); |
| """.format(tbl_jj_g=tbl_jj_g)) |
| |
| plpy.execute( |
| """ |
| insert into {tbl_grp_init} |
| select |
| distid, (f).prez, (f).prej, (f).jj, (f).jz, (f).z2, |
| {iter_count} |
| from ( |
| select |
| {schema_madlib}.__arima_lm( |
| s1.distid, tvals, {p}, {q}, |
| {phi}::double precision[], |
| {theta}::double precision[], |
| {mean}, prez, prej |
| ) as f, |
| s1.distid |
| --from {tbl_ts} s1, {tbl_grp_init} s2 |
| --where |
| -- (s1.distid = s2.distid + 1 or |
| -- (s1.distid = 1 and s2.distid = 1)) and |
| -- s2.iter = {prev_iter} |
| from {tbl_ts} s1 left join {tbl_grp_init} s2 |
| on (s1.distid = s2.distid + 1 and s2.iter = {prev_iter}) |
| ) s |
| """.format(schema_madlib=schema_madlib, tbl_ts=tbl_ts, |
| tbl_grp_init=tbl_grp_init, p=p, q=q, |
| phi=phi_str, mean=mean, |
| theta=theta_str, |
| iter_count=iter_count, |
| prev_iter=prev_iter)) |
| |
| # Need to sum J^T*J, J^T*Z and Z^2 over all groups to get |
| # the result. |
| plpy.execute( |
| """ |
| insert into {tbl_jj_g} |
| select (f).jj, (f).jz, (f).z2, (f).u, {iter_count} |
| from ( |
| select |
| {schema_madlib}.__arima_lm_result_agg( |
| jj, jz, z2 |
| ) as f |
| from {tbl_grp_init} |
| where iter = {iter_count} |
| ) s |
| """.format(schema_madlib=schema_madlib, tbl_jj_g=tbl_jj_g, |
| tbl_grp_init=tbl_grp_init, iter_count=iter_count)) |
| |
| res = plpy.execute("select jz, z2, u from {tbl_jj_g} where" |
| " iter = {iter_count}".format(tbl_jj_g=tbl_jj_g, |
| iter_count=iter_count))[0] |
| |
| return (madvec(res['jz'], text = False), |
| res['z2'], res['u'], tbl_grp_init, tbl_jj_g) |
| |
| # ---------------------------------------------------------------------- |
| |
| def _vec_modulus (vec): |
| """ Compute the modulus of a vector |
| @param vec - array of doubles, a vector |
| """ |
| return math.sqrt(sum(v*v for v in vec)) |
| |
| # ---------------------------------------------------------------------- |
| |
| def _vec_inner_prod (vec1, vec2): |
| """ Compute the inner product of two vectors |
| @brief Since this function is only used internally, we do not check |
| whether the two vectors have the same length |
| @param vec1, vec2 - arrays of doubles |
| """ |
| return sum(a*b for a, b in zip(vec1, vec2)) |
| |
| # ---------------------------------------------------------------------- |
| |
| def _lm_delta (schema_madlib, tbl_jj_g, u, iter_count): |
| """ Compute the vector delta |
| |
| Args: |
| @param schema_madlib - string, MADlib schema |
| @param tbl_jj_g - string, a table containing J^T*J and J^T*Z. It |
| has only one row, two columns |
| @param u - double, the current step size |
| |
| Returns: |
| * delta - array of doubles, the change of coefficients |
| """ |
| return madvec(plpy.execute( |
| """ |
| select |
| {schema_madlib}.__arima_lm_delta( |
| jj, jz, {u} |
| ) as delta |
| from {tbl_jj_g} |
| where iter = {iter_count} |
| """.format(schema_madlib=schema_madlib, |
| u=u, iter_count=iter_count, |
| tbl_jj_g=tbl_jj_g))[0]['delta'], |
| text = False) |
| |
| # ---------------------------------------------------------------------- |
| |
| def _drop_tbl (tbl): |
| """ Drop a table if exists |
| @param tbl - string, the table name |
| """ |
| plpy.execute("drop table if exists {0}".format(tbl)) |
| |
| # ---------------------------------------------------------------------- |
| |
| def _compute_statistics (schema_madlib, tbl_ts, p, q, phi, theta, |
| mean, delta): |
| """ Compute residual variance, log-likelihood, standard errors |
| |
| @brief Compute z^2 for different (phi, theta, mean) and numerically |
| differentiate to compute Hessian. Residual variance and |
| log-likelihood are computed using Eqs. (1.2.13, 1.2.14). |
| |
| Args: |
| @param schema_madlib - MADlib schema name |
| @param tbl_ts - time series table name, it has two columns: tid |
| and tvals, which is the p previous values plus the |
| current value ordered by time. |
| @param p - integer, AR order |
| @param q - integer, MA order |
| @param phi - array of doubles, AR coefficients |
| @param theta - array of doubles, MA coefficients |
| @param mean - double, mean value if include_mean is True, None if |
| False |
| @param delta - double, the small step size used to compute Hessian |
| |
| Returns: |
| * std_errs - array of doubles, the standard errors of the coefficients |
| * resid_var - double, the residual variance |
| * loglik - double, log-likelihood |
| """ |
| if mean is None: |
| mean = "NULL" |
| |
| res = plpy.execute( |
| """ |
| select (f).* |
| from ( |
| select |
| {schema_madlib}.__arima_lm_stat_agg( |
| distid, tvals, {p}, {q}, |
| '{phi}'::double precision[], |
| '{theta}'::double precision[], |
| {mean}, |
| {delta} |
| order by tid |
| ) as f |
| from {tbl_ts} |
| ) s |
| """.format(schema_madlib=schema_madlib, p=p, q=q, |
| phi=_array_to_string(phi), mean=mean, |
| theta=_array_to_string(theta), delta=delta, |
| tbl_ts=tbl_ts))[0] |
| |
| return (madvec(res['std_errs'], text=False), |
| res['resid_var'], res['loglik']) |
| |
| # ---------------------------------------------------------------------- |