TimeClassificationPerformanceEvaluator

Fix #77
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
index a77831a..9717640 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicClassificationPerformanceEvaluator.java
@@ -96,7 +96,8 @@
   }
 
   @Override
-  public void addResult(Instance inst, double[] classVotes, String instanceIdentifier) {
+  public void addResult(Instance inst, double[] classVotes, String instanceIdentifier,
+          long delay) {
     double weight = inst.weight();
     int trueClass = (int) inst.classValue();
     if (weight > 0.0) {
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicRegressionPerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicRegressionPerformanceEvaluator.java
index ab16904..00bbca8 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicRegressionPerformanceEvaluator.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/BasicRegressionPerformanceEvaluator.java
@@ -73,7 +73,8 @@
   }
 
   @Override
-  public void addResult(Instance inst, double[] prediction, String instanceIdentifier) {
+  public void addResult(Instance inst, double[] prediction, String instanceIdentifier,
+          long delay) {
     double weight = inst.weight();
     double classValue = inst.classValue();
     if (weight > 0.0) {
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorCVProcessor.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorCVProcessor.java
index 05d0a27..6444e24 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorCVProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorCVProcessor.java
@@ -90,7 +90,7 @@
     addStatisticsForInstanceReceived(instanceIndex, result.getEvaluationIndex(), 1);
 
     evaluators[result.getEvaluationIndex()].addResult(result.getInstance(), result.getClassVotes(),
-        String.valueOf(instanceIndex));
+        String.valueOf(instanceIndex), result.getArrivalTimestamp());
 
     if (hasAllVotesArrivedInstance(instanceIndex)) {
       totalCount += 1;
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorProcessor.java
index e78395a..3ad2817 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/EvaluatorProcessor.java
@@ -80,7 +80,7 @@
   public boolean process(ContentEvent event) {
 
     ResultContentEvent result = (ResultContentEvent) event;
-
+    
     if ((totalCount > 0) && (totalCount % samplingFrequency) == 0) {
       long sampleEnd = System.nanoTime();
       long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
@@ -94,14 +94,15 @@
     if ((immediatePredictionStream != null) && (totalCount > 0) && (totalCount % labelSamplingFrequency) == 0) {
       this.addVote();
     }
-
+    
     if (result.isLastEvent()) {
       this.concludeMeasurement();
       return true;
     }
-
+    
     String instanceIndex = String.valueOf(result.getInstanceIndex());
-    evaluator.addResult(result.getInstance(), result.getClassVotes(), instanceIndex);
+    evaluator.addResult(result.getInstance(), result.getClassVotes(), instanceIndex,
+            System.currentTimeMillis() - result.getArrivalTimestamp());
     totalCount += 1;
 
     if (totalCount == 1) {
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/F1ClassificationPerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/F1ClassificationPerformanceEvaluator.java
index d54296d..aafb5b4 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/F1ClassificationPerformanceEvaluator.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/F1ClassificationPerformanceEvaluator.java
@@ -73,7 +73,8 @@
     }
 
     @Override
-    public void addResult(Instance inst, double[] classVotes, String instanceIndex) {
+    public void addResult(Instance inst, double[] classVotes, String instanceIndex,
+            long delay) {
         if (numClasses==-1) reset(inst.numClasses());
         int trueClass = (int) inst.classValue();
         this.support[trueClass] += 1;
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/PerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/PerformanceEvaluator.java
index c4c4a0b..15cfa2b 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/PerformanceEvaluator.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/PerformanceEvaluator.java
@@ -48,7 +48,8 @@
    *          an array containing the estimated membership probabilities of the test instance in each class
    * @return an array of measurements monitored in this evaluator
    */
-  public void addResult(Instance inst, double[] classVotes, String instanceIdentifier);
+  public void addResult(Instance inst, double[] classVotes, String instanceIdentifier,
+          long delay);
 
   /**
    * Gets the current measurements monitored by this evaluator.
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/TimeClassificationPerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/TimeClassificationPerformanceEvaluator.java
new file mode 100644
index 0000000..24bfb7f
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/TimeClassificationPerformanceEvaluator.java
@@ -0,0 +1,450 @@
+package org.apache.samoa.evaluation;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2018 Łukasz Ławniczak
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.moa.core.Measurement;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.core.Utils;
+import org.apache.samoa.moa.core.Vote;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+public class TimeClassificationPerformanceEvaluator extends AbstractOptionHandler
+        implements ClassificationPerformanceEvaluator {
+    private int numClasses;
+    private PredictionAggregator aggregator;
+    private Indicator[] indicators;
+    
+    public FloatOption maxProcessingTimeOption = new FloatOption("maxProcessingTime",
+            'l', "Processing time limit per instance [ms]", 3600e3, 0, 3600e3);
+    
+    public FloatOption timeWeightOption = new FloatOption("timeWeight",
+            'a', "How many times speed is more important than accuracy", 1, 0, 100);
+    
+    public IntOption widthOption = new IntOption("width", 'w', 
+            "Size of Window", 1000);
+    
+    @Override
+    public void reset() {
+        double timeLimit = maxProcessingTimeOption.getValue();
+        double weight = timeWeightOption.getValue();
+        
+        aggregator = new PredictionAggregator(numClasses, new EstimatorFactory() {
+            @Override
+            public WindowEstimator create() {
+                int w = widthOption.getValue();
+                return new WindowEstimator(w);
+            }
+        });
+        aggregator.setProcessingTimeLimit(timeLimit);
+        Indicator timeIndicator = new TimeIndicator(aggregator, timeLimit);
+        KappaIndicator kappaIndicator = new KappaIndicator(aggregator);
+        indicators = new Indicator[] {
+            new AccuracyIndicator(aggregator),  
+            kappaIndicator,
+            new KappaMIndicator(aggregator),
+            timeIndicator,
+            new TooLatePredictionsIndicator(aggregator),
+            new WeightedSumIndicator(new Indicator[]{
+                kappaIndicator,
+                new KappaMIndicator(aggregator),
+                timeIndicator
+            }, new double[] {0.5, 0.5, -weight}),
+            new EfficiencyIndicator(new WeightedSumIndicator(
+                    new Indicator[]{
+                        kappaIndicator,
+                        new KappaMIndicator(aggregator)
+                    },
+                    new double[]{0.5, 0.5}
+            ), new WeightedSumIndicator(
+                    new Indicator[]{ timeIndicator },
+                    new double[]{weight}
+            )),
+            new SimpleIntuitiveIndicator(new Indicator[]{
+                new OneMinusKappaIndicator(kappaIndicator),
+                timeIndicator
+            }, new double[]{1, 0}, new double[]{0, 0}, new double[]{1, 1})
+        };
+    }
+
+    @Override
+    public Measurement[] getPerformanceMeasurements() {
+        Measurement[] meas = new Measurement[indicators.length];
+        for(int i=0;i<indicators.length;++i)
+            meas[i] = new Measurement(indicators[i].getDescription(), indicators[i].getValue());
+        return meas;
+    }
+    
+    @Override
+    public void getDescription(StringBuilder sb, int indent) {
+        Measurement.getMeasurementsDescription(getPerformanceMeasurements(),
+                sb, indent);
+    }
+
+    @Override
+    protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+        
+    }
+
+    @Override
+    public void addResult(Instance inst, double[] classVotes, String instanceIdentifier, long delay) {
+        if(indicators == null) {
+            numClasses = inst.numClasses();
+            reset();
+        }
+        aggregator.add(inst, classVotes, delay);
+    }
+
+    @Override
+    public Vote[] getPredictionVotes() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+}
+
+interface EstimatorFactory {
+    WindowEstimator create();
+}
+
+class PredictionAggregator {
+    private final int numClasses;
+    private final WindowEstimator weightCorrect;
+    private final WindowEstimator[] columnKappa;
+    private final WindowEstimator[] rowKappa;
+    private final WindowEstimator weightMajorityClassifier;
+    private final WindowEstimator weightNoChangeClassifier;
+    private final WindowEstimator processingTime;
+    private final WindowEstimator delayedFraction;
+    private int lastSeenClass;
+    private int numClassifiedInstances;
+    private double processingTimeLimit;
+    
+    public PredictionAggregator(int numClasses, EstimatorFactory factory) {
+        this.numClasses = numClasses;
+        columnKappa = new WindowEstimator[numClasses];
+        rowKappa = new WindowEstimator[numClasses];
+        for (int i = 0; i < numClasses; i++) {
+            this.rowKappa[i] = factory.create();
+            this.columnKappa[i] = factory.create();
+        }
+        weightCorrect = factory.create();
+        weightMajorityClassifier = factory.create();
+        weightNoChangeClassifier = factory.create();
+        processingTime = factory.create();
+        processingTimeLimit = Double.MAX_VALUE;
+        delayedFraction = factory.create();
+        numClassifiedInstances = 0;
+    }
+    
+    public void add(Instance inst, double[] votes, long delay) {
+        if(inst.classIsMissing())
+            return;
+        double weight = inst.weight();
+        int trueClass = (int) inst.classValue();
+        int predictedClass = Utils.maxIndex(votes);
+        if(delay > processingTimeLimit) {
+            predictedClass = -1;
+            delayedFraction.add(weight);
+        } else {
+            delayedFraction.add(0);
+        }
+        for (int i = 0; i < numClasses; i++) {
+            this.rowKappa[i].add(predictedClass == i ? weight : 0);
+            this.columnKappa[i].add(trueClass == i ? weight : 0);
+        }
+        weightMajorityClassifier.add(getMajorityClass() == trueClass ? weight : 0);
+        weightCorrect.add(trueClass == predictedClass ? weight : 0);
+        weightNoChangeClassifier.add(trueClass == lastSeenClass ? weight : 0);
+        processingTime.add(delay);
+        lastSeenClass = trueClass;
+        ++numClassifiedInstances;
+    }
+    
+    public int getNumClasses() {
+        return numClasses;
+    }
+    
+    public int getNumClassifiedInstances() {
+        return numClassifiedInstances;
+    }
+    
+    public double getMeanCorrect() {
+        return weightCorrect.estimation();
+    }
+    
+    public double getMeanMajorityCorrect() {
+        return weightMajorityClassifier.estimation();
+    }
+    
+    public double getMeanRow(int rowNumber) {
+        return rowKappa[rowNumber].estimation();
+    }
+    
+    public double getMeanColumn(int columnNumber) {
+        return columnKappa[columnNumber].estimation();
+    }
+    
+    public double getMeanProcessingTime() {
+        return processingTime.estimation();
+    }
+    
+    public void setProcessingTimeLimit(double limit) {
+        this.processingTimeLimit = limit;
+    }
+    
+    public double getDelayedFraction() {
+        return delayedFraction.estimation();
+    }
+    
+    private int getMajorityClass() {
+        int majorityClass = 0;
+        double maxProbClass = 0.0;
+        for (int i = 0; i < this.numClasses; i++) {
+            if (this.columnKappa[i].estimation() > maxProbClass) {
+                majorityClass = i;
+                maxProbClass = this.columnKappa[i].estimation();
+            }
+        }
+        return majorityClass;
+    }
+}
+
+interface Indicator {
+    String getDescription();
+    double getValue();
+}
+
+class AccuracyIndicator implements Indicator {
+    private final PredictionAggregator aggregator;
+    
+    public AccuracyIndicator(PredictionAggregator aggregator) {
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Accuracy";
+    }
+
+    @Override
+    public double getValue() {
+        return aggregator.getMeanCorrect();
+    }
+}
+
+class KappaIndicator implements Indicator {
+    private final PredictionAggregator aggregator;
+    
+    public KappaIndicator(PredictionAggregator aggregator) {
+        this.aggregator = aggregator;   
+    }
+    
+    @Override
+    public String getDescription() {
+        return "Kappa Statistic";
+    }
+    
+    @Override
+    public double getValue() {
+        double p0 = aggregator.getMeanCorrect();
+        double pc = 0;
+        for(int i=0;i<aggregator.getNumClasses();++i)
+            pc += aggregator.getMeanRow(i)*aggregator.getMeanColumn(i);
+        return (p0-pc)/(1-pc);
+    }
+}
+
+class OneMinusKappaIndicator implements Indicator {
+    private final KappaIndicator indicator;
+    
+    public OneMinusKappaIndicator(KappaIndicator indicator) {
+        this.indicator = indicator;
+    }
+
+    @Override
+    public String getDescription() {
+        return "1-Kappa Statistic";
+    }
+
+    @Override
+    public double getValue() {
+        return 1-indicator.getValue();
+    }
+}
+
+class KappaMIndicator implements Indicator {
+    private final PredictionAggregator aggregator;
+    
+    public KappaMIndicator(PredictionAggregator aggregator) {
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Kappa M Statistic";
+    }
+
+    @Override
+    public double getValue() {
+        double p0 = aggregator.getMeanCorrect();
+        double pc = aggregator.getMeanMajorityCorrect();
+        return (p0-pc)/(1-pc);
+    }    
+}
+
+class TimeIndicator implements Indicator {
+    private final PredictionAggregator aggregator;
+    private final double timeUnit;
+    
+    public TimeIndicator(PredictionAggregator aggregator, double timeUnit) {
+        this.aggregator = aggregator;
+        this.timeUnit = timeUnit;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Mean processing time";
+    }
+
+    @Override
+    public double getValue() {
+        return aggregator.getMeanProcessingTime() / timeUnit;
+    }
+}
+
+class TooLatePredictionsIndicator implements Indicator {
+    private final PredictionAggregator aggregator;
+    
+    public TooLatePredictionsIndicator(PredictionAggregator aggregator) {
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Too late predictions";
+    }
+
+    @Override
+    public double getValue() {
+        return aggregator.getDelayedFraction();
+    }
+}
+
+
+///////////// aggregators
+class WeightedSumIndicator implements Indicator {
+    private Indicator[] indicators;
+    private double[] weights;
+    
+    public WeightedSumIndicator(Indicator[] indicators, double[] weights) {
+        this.indicators = indicators;
+        this.weights = weights;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Measure-Based";
+    }
+
+    @Override
+    public double getValue() {
+        double sum = 0;
+        for(int i=0;i < indicators.length;++i)
+            sum += weights[i] * indicators[i].getValue();
+        return sum;
+    }
+    
+    
+}
+
+class EfficiencyIndicator implements Indicator {
+    private WeightedSumIndicator positive;
+    private WeightedSumIndicator negative;
+    
+    public EfficiencyIndicator(WeightedSumIndicator positive, WeightedSumIndicator negative) {
+        this.positive = positive;
+        this.negative = negative;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Efficiency Indicator";
+    }
+
+    @Override
+    public double getValue() {
+        return positive.getValue() / negative.getValue();
+    }
+}
+
+class SimpleIntuitiveIndicator implements Indicator {
+    private Indicator[] indicators;
+    private double[] optimums;
+    private double[] lBounds;
+    private double[] uBounds;
+    
+    public SimpleIntuitiveIndicator(Indicator[] indicators, double[] optimums) {
+        this.indicators = indicators;
+        this.optimums = optimums;
+        lBounds = new double[optimums.length];
+        uBounds = new double[optimums.length];
+        for(int i=0;i<lBounds.length;++i) {
+            lBounds[i] = Double.MIN_VALUE;
+            uBounds[i] = Double.MAX_VALUE;
+        }
+    }
+    
+    public SimpleIntuitiveIndicator(Indicator[] indicators, double[] optimums,
+            double[] lBounds, double[] uBounds) {
+        this(indicators, optimums);
+        this.lBounds = lBounds;
+        this.uBounds = uBounds;
+    }
+    
+    public void adjustBound(int i, double lower, double upper) {
+        lBounds[i] = lower;
+        uBounds[i] = upper;
+    }
+    
+    @Override
+    public String getDescription() {
+        return "Simple Intuitive Measure";
+    }
+
+    @Override
+    public double getValue() {
+        double prod = 1, worst = 1;
+        for(int i=0;i<indicators.length;++i) {
+            double val = indicators[i].getValue();
+            if(val < lBounds[i] || val > uBounds[i])
+                return Double.MAX_VALUE;
+            prod *= Math.abs(indicators[i].getValue() - optimums[i]);
+            if(lBounds[i] != Double.MIN_VALUE && uBounds[i] != Double.MAX_VALUE)
+                worst *= uBounds[i] - lBounds[i];
+        }
+        return prod / worst;
+    }
+    
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowClassificationPerformanceEvaluator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowClassificationPerformanceEvaluator.java
index 6ea40ed..44b4029 100644
--- a/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowClassificationPerformanceEvaluator.java
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowClassificationPerformanceEvaluator.java
@@ -48,19 +48,19 @@
 
   protected double TotalweightObserved = 0;
 
-  protected Estimator weightObserved;
+  protected WindowEstimator weightObserved;
 
-  protected Estimator weightCorrect;
+  protected WindowEstimator weightCorrect;
 
-  protected Estimator weightCorrectNoChangeClassifier;
+  protected WindowEstimator weightCorrectNoChangeClassifier;
 
   protected double lastSeenClass;
 
-  protected Estimator[] columnKappa;
+  protected WindowEstimator[] columnKappa;
 
-  protected Estimator[] rowKappa;
+  protected WindowEstimator[] rowKappa;
 
-  protected Estimator[] classAccuracy;
+  protected WindowEstimator[] classAccuracy;
 
   protected int numClasses;
 
@@ -68,48 +68,6 @@
   private Instance lastSeenInstance;
   protected double[] classVotes;
 
-  public class Estimator {
-
-    protected double[] window;
-
-    protected int posWindow;
-
-    protected int lenWindow;
-
-    protected int SizeWindow;
-
-    protected double sum;
-
-    public Estimator(int sizeWindow) {
-      window = new double[sizeWindow];
-      SizeWindow = sizeWindow;
-      posWindow = 0;
-      lenWindow = 0;
-    }
-
-    public void add(double value) {
-      sum -= window[posWindow];
-      sum += value;
-      window[posWindow] = value;
-      posWindow++;
-      if (posWindow == SizeWindow) {
-        posWindow = 0;
-      }
-      if (lenWindow < SizeWindow) {
-        lenWindow++;
-      }
-    }
-
-    public double total() {
-      return sum;
-    }
-
-    public double length() {
-      return lenWindow;
-    }
-
-  }
-
   /*
    * public void setWindowWidth(int w) { this.width = w; reset(); }
    */
@@ -120,23 +78,24 @@
 
   public void reset(int numClasses) {
     this.numClasses = numClasses;
-    this.rowKappa = new Estimator[numClasses];
-    this.columnKappa = new Estimator[numClasses];
-    this.classAccuracy = new Estimator[numClasses];
+    this.rowKappa = new WindowEstimator[numClasses];
+    this.columnKappa = new WindowEstimator[numClasses];
+    this.classAccuracy = new WindowEstimator[numClasses];
     for (int i = 0; i < this.numClasses; i++) {
-      this.rowKappa[i] = new Estimator(this.widthOption.getValue());
-      this.columnKappa[i] = new Estimator(this.widthOption.getValue());
-      this.classAccuracy[i] = new Estimator(this.widthOption.getValue());
+      this.rowKappa[i] = new WindowEstimator(this.widthOption.getValue());
+      this.columnKappa[i] = new WindowEstimator(this.widthOption.getValue());
+      this.classAccuracy[i] = new WindowEstimator(this.widthOption.getValue());
     }
-    this.weightCorrect = new Estimator(this.widthOption.getValue());
-    this.weightCorrectNoChangeClassifier = new Estimator(this.widthOption.getValue());
-    this.weightObserved = new Estimator(this.widthOption.getValue());
+    this.weightCorrect = new WindowEstimator(this.widthOption.getValue());
+    this.weightCorrectNoChangeClassifier = new WindowEstimator(this.widthOption.getValue());
+    this.weightObserved = new WindowEstimator(this.widthOption.getValue());
     this.TotalweightObserved = 0;
     this.lastSeenClass = 0;
   }
 
   @Override
-  public void addResult(Instance inst, double[] classVotes, String instanceIndex) {
+  public void addResult(Instance inst, double[] classVotes, String instanceIndex,
+          long delay) {
     double weight = inst.weight();
     int trueClass = (int) inst.classValue();
     if (weight > 0.0) {
diff --git a/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowEstimator.java b/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowEstimator.java
new file mode 100644
index 0000000..9fec556
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/WindowEstimator.java
@@ -0,0 +1,66 @@
+package org.apache.samoa.evaluation;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2018 Łukasz Ławniczak
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+public class WindowEstimator {
+    protected double[] window;
+
+    protected int posWindow;
+
+    protected int lenWindow;
+
+    protected int SizeWindow;
+
+    protected double sum;
+
+    public WindowEstimator(int sizeWindow) {
+      window = new double[sizeWindow];
+      SizeWindow = sizeWindow;
+      posWindow = 0;
+      lenWindow = 0;
+    }
+
+    public void add(double value) {
+      sum -= window[posWindow];
+      sum += value;
+      window[posWindow] = value;
+      posWindow++;
+      if (posWindow == SizeWindow) {
+        posWindow = 0;
+      }
+      if (lenWindow < SizeWindow) {
+        lenWindow++;
+      }
+    }
+
+    public double total() {
+      return sum;
+    }
+
+    public double length() {
+      return lenWindow;
+    }
+    
+    public double estimation() {
+        return sum / lenWindow;
+    }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
index c1a1de6..1bba4bf 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
@@ -41,9 +41,10 @@
 	 */
   private static final long serialVersionUID = -8620668863064613845L;
   private InstanceContent instanceContent;
+  private long arrivalTimestamp;
 
   public InstanceContentEvent() {
-
+      
   }
 
   /**
@@ -58,7 +59,9 @@
    */
   public InstanceContentEvent(long index, Instance instance,
       boolean isTraining, boolean isTesting) {
-    this.instanceContent = new InstanceContent(index, instance, isTraining, isTesting);
+    this.instanceContent = new InstanceContent(index, instance, 
+            isTraining, isTesting);
+    arrivalTimestamp = System.currentTimeMillis();
   }
 
   /**
@@ -194,6 +197,10 @@
 
   public void setLast(boolean isLast) {
     this.instanceContent.setLast(isLast);
+  }  
+  
+  public long getArrivalTimestamp() {
+      return arrivalTimestamp;
   }
   /**
    * Gets the Instance Content.
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java
index 3ede55c..59a5354 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java
@@ -46,6 +46,7 @@
   private double[] classVotes;
 
   private final boolean isLast;
+  private long arrivalTimestamp;
 
   public ResultContentEvent() {
     this.isLast = false;
@@ -68,7 +69,7 @@
    *          the class votes
    */
   public ResultContentEvent(long instanceIndex, Instance instance, int classId,
-      double[] classVotes, boolean isLast) {
+      double[] classVotes, boolean isLast, long arrivalTimestamp) {
     if (instance != null) {
       this.instance = new SerializableInstance(instance);
     }
@@ -76,6 +77,7 @@
     this.classId = classId;
     this.classVotes = classVotes;
     this.isLast = isLast;
+    this.arrivalTimestamp = arrivalTimestamp;
   }
 
   /**
@@ -209,5 +211,9 @@
   public boolean isLastEvent() {
     return isLast;
   }
+  
+  public long getArrivalTimestamp() {
+      return arrivalTimestamp;
+  }
 
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
index 5e2c927..1fb9c5e 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
@@ -154,7 +154,7 @@
     if (inEvent.getInstanceIndex() < 0) {
       // end learning
       ResultContentEvent outContentEvent = new ResultContentEvent(-1, instance, 0,
-          new double[0], inEvent.isLastEvent());
+          new double[0], inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
       outContentEvent.setClassifierIndex(this.modelId);
       outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
       outputStream.put(outContentEvent);
@@ -164,7 +164,7 @@
     if (inEvent.isTesting()) {
       double[] dist = model.getVotesForInstance(instance);
       ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(),
-          instance, inEvent.getClassId(), dist, inEvent.isLastEvent());
+          instance, inEvent.getClassId(), dist, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
       outContentEvent.setClassifierIndex(this.modelId);
       outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
       logger.trace(inEvent.getInstanceIndex() + " {} {}", modelId, dist);
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
index 6cfcfae..6adee5e 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
@@ -73,7 +73,7 @@
       }
       ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(),
           inEvent.getInstance(), inEvent.getClassId(),
-          combinedVote.getArrayCopy(), inEvent.isLastEvent());
+          combinedVote.getArrayCopy(), inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
       outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
       outputStream.put(outContentEvent);
       clearStatisticsInstance(instanceIndex);
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
index 92a209b..d21bc19 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
@@ -111,7 +111,8 @@
         combinedVote = new DoubleVector(new double[inEvent.getInstance().numClasses()]);
       }
       ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-          inEvent.getClassId(), combinedVote.getArrayCopy(), inEvent.isLastEvent());
+          inEvent.getClassId(), combinedVote.getArrayCopy(), inEvent.isLastEvent(),
+          inEvent.getArrivalTimestamp());
       outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
       outputStream.put(outContentEvent);
       clearStatisticsInstance(instanceIndex);
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
index 685e2f9..3bee1e6 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/centralized/AMRulesRegressorProcessor.java
@@ -148,7 +148,7 @@
    */
   private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+        inEvent.getClassId(), prediction, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
     rce.setClassifierIndex(this.processorId);
     rce.setEvaluationIndex(inEvent.getEvaluationIndex());
     return rce;
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
index 9f2b9c2..d1b3ff3 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRDefaultRuleProcessor.java
@@ -119,7 +119,7 @@
 
   private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+        inEvent.getClassId(), prediction, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
     rce.setClassifierIndex(this.processorId);
     rce.setEvaluationIndex(inEvent.getEvaluationIndex());
     return rce;
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
index beb7e40..6e392c7 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java
@@ -173,7 +173,7 @@
 
   private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+        inEvent.getClassId(), prediction, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
     rce.setClassifierIndex(this.processorId);
     rce.setEvaluationIndex(inEvent.getEvaluationIndex());
     return rce;
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java
index 2131db4..84126d5 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java
@@ -222,7 +222,7 @@
    */
   private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+        inEvent.getClassId(), prediction, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
     rce.setClassifierIndex(this.processorId);
     rce.setEvaluationIndex(inEvent.getEvaluationIndex());
     return rce;
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
index 2c81fd0..83d54e4 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
@@ -136,7 +136,7 @@
    */
   private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+        inEvent.getClassId(), prediction, inEvent.isLastEvent(), inEvent.getArrivalTimestamp());
     rce.setClassifierIndex(this.processorId);
     rce.setEvaluationIndex(inEvent.getEvaluationIndex());
     return rce;
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
index b0ce82e..a9fa5bc 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -253,7 +253,7 @@
    */
   private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContent inEvent) {
     ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+        inEvent.getClassId(), prediction, inEvent.isLastEvent(), 0);
     rce.setClassifierIndex(this.processorId);
     rce.setEvaluationIndex(inEvent.getEvaluationIndex());
     return rce;