[SYSTEMDS-2790] New outlierByArima (AR) built-in function

DIA project WS2020/21.
Closes #1149.
diff --git a/scripts/builtin/outlierByArima.dml b/scripts/builtin/outlierByArima.dml
new file mode 100644
index 0000000..b00c07e
--- /dev/null
+++ b/scripts/builtin/outlierByArima.dml
@@ -0,0 +1,105 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+# Built-in function for detecting and repairing outliers in time series,
+# by training an ARIMA model and classifying values that are more than 
+# k standard-deviations away from the predicated values as outliers.
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME            TYPE      DEFAULT  MEANING
+# ---------------------------------------------------------------------------------------------
+# X               Double    ---      Matrix X
+# k               Double    3        threshold values 1, 2, 3 for 68%, 95%, 99.7% respectively (3-sigma rule)
+# repairMethod    Integer   1        values: 0 = delete rows having outliers, 1 = replace outliers as zeros
+#                                            2 = replace outliers as missing values 
+# p               Int       0        non-seasonal AR order
+# d               Int       0        non-seasonal differencing order
+# q               Int       0        non-seasonal MA order
+# P               Int       0        seasonal AR order
+# D               Int       0        seasonal differencing order
+# Q               Int       0        seasonal MA order
+# s               Int       1        period in terms of number of time-steps
+# include_mean    Bool      FALSE
+# solver          String    "jacobi" solver, is either "cg" or "jacobi"
+# ---------------------------------------------------------------------------------------------
+ 
+
+#Output(s)
+# ---------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT     MEANING
+# ---------------------------------------------------------------------------------------------
+# X_corrected     Double   ---     Matrix X with no outliers
+
+m_outlierByArima = function(Matrix[Double] X, Double k = 3, Integer repairMethod = 1, Integer p=0, Integer d=0,
+  Integer q=0, Integer P=0, Integer D=0, Integer Q=0, Integer s=1, Boolean include_mean=FALSE, String solver="jacobi")
+  return(Matrix[Double] X_corrected)
+{
+  outlierFilter = as.matrix(0)
+
+  if( k < 1 | k > 7)
+    stop("outlierBySd: invalid argument - k should be in range 1-7 found "+k)
+
+  features = transform_matrix(X,p)
+  X_adapted = X[p+1:nrow(X),]
+
+  # TODO replace by ARIMA once fully supported, LM only emulated the AR part
+  model = lm(X=features, y=X_adapted)
+  y_hat = lmpredict(X=features, w=model)
+
+  upperBound = sd(X) + k * y_hat
+  lowerBound = sd(X) - k * y_hat
+  outlierFilter = (X_adapted < lowerBound) | (X_adapted > upperBound)
+  outlierFilter = rbind(matrix(0.0, rows=p,cols=1), outlierFilter)
+  X_corrected = fix_outliers(X, outlierFilter, repairMethod)
+}
+
+transform_matrix = function(Matrix[Double] X, Integer p) return (Matrix[Double] features){
+  nrows = nrow(X)
+  features = matrix(0, rows=nrows-p, cols=1)
+
+  for (i in 1:p){
+    features = cbind(features, X[p+1-i:nrows-i,])
+  }
+  features = features[,2:p+1]
+}
+
+fix_outliers = function(Matrix[Double] X, Matrix[Double] outlierFilter, Integer repairMethod)
+  return (Matrix[Double] X_filtered)
+{
+  rows = nrow(X)
+  cols = ncol(X)
+  if(repairMethod == 0) {
+    sel = (outlierFilter == 0)
+    X = removeEmpty(target = X, margin = "rows", select = sel)
+  }
+  else if(repairMethod == 1)
+    X = (outlierFilter == 0) * X
+  else if (repairMethod == 2) {
+    outlierFilter = replace(target = (outlierFilter == 0), pattern = 0, replacement = NaN)
+    X = outlierFilter * X
+  }
+  else{
+    stop("outlierBySd: invalid argument - repair required 0-1 found: "+repairMethod)
+  }
+  X_filtered = X
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 5bcc85e..1a230bf 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -166,6 +166,7 @@
 	OUTLIER("outlier", true, false), //TODO parameterize opposite
 	OUTLIER_SD("outlierBySd", true),
 	OUTLIER_IQR("outlierByIQR", true),
+	OUTLIER_ARIMA("outlierByArima",true),
 	PCA("pca", true),
 	PCAINVERSE("pcaInverse", true),
 	PCATRANSFORM("pcaTransform", true),
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinOutlierByArima.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinOutlierByArima.java
new file mode 100644
index 0000000..8830ff2
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinOutlierByArima.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.builtin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import org.junit.Test;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.LopProperties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class BuiltinOutlierByArima extends AutomatedTestBase {
+	private final static String TEST_NAME = "outlierByArima";
+	private final static String TEST_DIR = "functions/builtin/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinArimaTest.class.getSimpleName() + "/";
+
+	protected int max_func_invoc, p, d, q, P, D, Q, s, include_mean, useJacobi, repairMethod;
+
+	public BuiltinOutlierByArima(int m, int p, int d, int q, int P, int D, int Q, int s, int include_mean, int useJacobi, int repairMethod){
+		this.max_func_invoc = m;
+		this.p = p;
+		this.d = d;
+		this.q = q;
+		this.P = P;
+		this.D = D;
+		this.Q = Q;
+		this.s = s;
+		this.include_mean = include_mean;
+		this.useJacobi = useJacobi;
+		this.repairMethod = repairMethod;
+	}
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+			{1, 2, 0, 0, 0, 0, 0, 24, 1, 1, 1},
+			{1, 2, 0, 0, 0, 0, 0, 24, 1, 1, 2},
+			{1, 2, 0, 0, 0, 0, 0, 24, 1, 1, 3}});
+	}
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"B"}));
+	}
+
+	@Test
+	public void testOutlierByArima(){
+		Types.ExecMode platformOld = setExecMode(LopProperties.ExecType.CP);
+		try {
+			loadTestConfiguration(getTestConfiguration(TEST_NAME));
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			fullRScriptName = HOME + TEST_NAME + ".R";
+
+			programArgs = new String[]{
+				"-nvargs", "X=" + input("col.mtx"), "p=" + p, "repairMethod=" + 1,
+				"outputfilename=" + output("result"),};
+			rCmd = getRCmd(input("bad.mtx"), expected("result"));
+
+			int timeSeriesLength = 3000;
+			int num_outliers = 10;
+			double[][] timeSeries = getRandomMatrix(timeSeriesLength, 1, 1, 3, 1, System.currentTimeMillis());
+			double[][] comparisonSeries = deepCopy(timeSeries);
+			for(int i=0; i<num_outliers; i++) {
+				int r = ThreadLocalRandom.current().nextInt(0, timeSeries.length);
+				double badValue = ThreadLocalRandom.current().nextDouble(10, 50);
+				timeSeries[r][0] = badValue;
+				if (repairMethod == 1)
+					comparisonSeries[r][0] = 0.0;
+				else if (repairMethod == 2)
+					comparisonSeries[r][0] = Double.NaN;
+			}
+
+			MatrixCharacteristics mc = new MatrixCharacteristics(timeSeriesLength,1,-1,-1);
+			writeInputMatrixWithMTD("col", timeSeries, true, mc);
+			writeInputMatrixWithMTD("bad", comparisonSeries, true, mc);
+
+			runTest(true, false, null, -1);
+			runRScript(true);
+
+			HashMap<CellIndex, Double> time_series_SYSTEMDS = readDMLMatrixFromOutputDir("result");
+			HashMap<CellIndex, Double> time_series_real = readRMatrixFromExpectedDir("result");
+
+			double tol = Math.pow(10, -14);
+			if (repairMethod == 3)
+				TestUtils.compareScalars(time_series_real.size()-num_outliers, time_series_SYSTEMDS.size(), tol);
+			else
+				TestUtils.compareMatrices(time_series_real, time_series_SYSTEMDS, tol, "time_series_real", "time_series_SYSTEMDS");
+		}
+		finally {
+			rtplatform = platformOld;
+		}
+	}
+
+	private static double[][] deepCopy(double[][] input) {
+		double[][] result = new double[input.length][];
+		for (int r = 0; r < input.length; r++) {
+			result[r] = input[r].clone();
+		}
+		return result;
+	}
+}
diff --git a/src/test/scripts/functions/builtin/outlierByArima.R b/src/test/scripts/functions/builtin/outlierByArima.R
new file mode 100644
index 0000000..ecc0a83
--- /dev/null
+++ b/src/test/scripts/functions/builtin/outlierByArima.R
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+args <- commandArgs(TRUE)
+library(Matrix)
+
+X = as(readMM(args[1]),"CsparseMatrix")
+
+# (for whatever reason) NaNs are seen as NAs after reading. replace them by zero instead of NAN since resulting NAN
+# will be seen as NA after reading it again in test file
+X[is.na(X)] = 0.0
+
+writeMM(X, file=args[2])
diff --git a/src/test/scripts/functions/builtin/outlierByArima.dml b/src/test/scripts/functions/builtin/outlierByArima.dml
new file mode 100644
index 0000000..461f7db
--- /dev/null
+++ b/src/test/scripts/functions/builtin/outlierByArima.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($X)
+
+Y = outlierByArima(X=X, p=$p, repairMethod=$repairMethod)
+write(Y, $outputfilename, sparse=FALSE)