/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.library.anomaly.util;

import org.apache.iotdb.library.util.Util;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowIterator;

import org.apache.commons.math3.stat.regression.SimpleRegression;
import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList;
import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;
import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList;

import java.util.ArrayList;

/** Anomaly detection for anomalies of data missing. */
public class MissDetector {

  private final LongArrayList time = new LongArrayList();
  private final DoubleArrayList value = new DoubleArrayList();
  private final IntArrayList predictLabel = new IntArrayList();
  private final int len;

  private int minLength;
  private double threshold = 0.9999;
  private double lineBoundary = 0.6;
  private long startTime;
  private int windowCnt = 0;
  private double r2 = 0;
  private final ArrayList<MissingSubSeries> anomalyList = new ArrayList<>();

  public MissDetector(RowIterator iterator, int minLength) throws Exception {
    while (iterator.hasNextRow()) {
      Row row = iterator.next();
      double v = Util.getValueAsDouble(row);
      if (Double.isFinite(v)) {
        this.time.add(row.getTime());
        this.value.add(v);
      }
    }
    this.len = this.time.size();
    this.minLength = minLength;
  }

  public void detect() {
    getPredictLabel().addAll(new int[getLen()]);
    this.startTime = getTime().get(0);
    int i = 0;
    int windowSize = minLength / 2;
    double[][] data = new double[windowSize][2];
    SimpleRegression regression = new SimpleRegression();
    while (i + windowSize < getLen()) {
      for (int j = 0; j < windowSize; j++) {
        data[j][0] = (double) getTime().get(i + j) - startTime;
        data[j][1] = getValue().get(i + j);
      }
      regression.addData(data);
      double alpha = regression.getRSquare();
      if (Double.isNaN(alpha) || alpha >= threshold) {
        i = extend(regression, i, i + windowSize);
      } else {
        i += windowSize;
      }
      regression.clear();
      r2 += Double.isNaN(alpha) ? 1 : alpha;
      windowCnt++;
    }
    label();
  }

  private int extend(SimpleRegression regression, int start, int end) {
    boolean horizon = Double.isNaN(regression.getRSquare());
    double standard = regression.getIntercept();
    int bindex = start;
    while (bindex > 0) {
      bindex--;
      regression.addData((double) getTime().get(bindex) - startTime, getValue().get(bindex));
      double alpha = regression.getRSquare();
      if ((horizon && getValue().get(bindex) != standard) || (!horizon && alpha < threshold)) {
        break;
      }
    }
    regression.removeData((double) getTime().get(bindex) - startTime, getValue().get(bindex));
    if (bindex == 0) {
      return end;
    }
    int findex = end;
    while (findex < getLen()) {
      regression.addData((double) getTime().get(findex) - startTime, getValue().get(findex));
      double alpha = regression.getRSquare();
      if ((horizon && getValue().get(findex) != standard) || (!horizon && alpha < threshold)) {
        break;
      }
      findex++;
    }
    if (findex == getLen()) {
      return end;
    }
    MissingSubSeries m = new MissingSubSeries(bindex + 1, findex);
    anomalyList.add(m);
    return findex;
  }

  private void label() {
    if (r2 / windowCnt < lineBoundary) {
      for (MissingSubSeries m : anomalyList) {
        if (m.getLength() >= minLength) {
          for (int i = m.getStart(); i < m.getEnd(); i++) {
            getPredictLabel().set(i, 1);
          }
        }
      }
    }
  }

  /**
   * @return the minLength
   */
  public int getMinLength() {
    return minLength;
  }

  /**
   * @param minLength the minLength to set
   */
  public void setMinLength(int minLength) {
    this.minLength = minLength;
  }

  /**
   * @return the threshold
   */
  public double getThreshold() {
    return threshold;
  }

  /**
   * @param threshold the threshold to set
   */
  public void setThreshold(double threshold) {
    this.threshold = threshold;
  }

  private class MissingSubSeries implements Comparable<MissingSubSeries> {

    private int start;
    private int end;
    private double slope;

    public MissingSubSeries() {}

    public MissingSubSeries(int start, int end) {
      this.start = start;
      this.end = end;
      this.slope =
          (getValue().get(end - 1) - getValue().get(start))
              / (getTime().get(end - 1) - getTime().get(start));
    }

    @Override
    public String toString() {
      return String.format(
          "Start: %d, End: %d, Length: %d, Slope: %.12f", start, end, end - start, slope);
    }

    public int getLength() {
      return end - start;
    }

    /**
     * @return the start
     */
    public int getStart() {
      return start;
    }

    /**
     * @param start the start to set
     */
    public void setStart(int start) {
      this.start = start;
    }

    /**
     * @return the end
     */
    public int getEnd() {
      return end;
    }

    /**
     * @param end the end to set
     */
    public void setEnd(int end) {
      this.end = end;
    }

    /**
     * @return the slope
     */
    public double getSlope() {
      return slope;
    }

    /**
     * @param slope the slope to set
     */
    public void setSlope(double slope) {
      this.slope = slope;
    }

    @Override
    public int compareTo(MissingSubSeries o) {
      if (this.getLength() > o.getLength()) {
        return -1;
      } else if (this.getLength() == o.getLength()) {
        return 0;
      } else {
        return 1;
      }
    }
  }

  /**
   * @return the predictLabel
   */
  public IntArrayList getPredictLabel() {
    return predictLabel;
  }

  /**
   * @return the len
   */
  public int getLen() {
    return len;
  }

  /**
   * @return the time
   */
  public LongArrayList getTime() {
    return time;
  }

  /**
   * @return the value
   */
  public DoubleArrayList getValue() {
    return value;
  }
}
