WIP working on STREAMPIPES-451
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java
index 3d1178f..a8d5d9e 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java
@@ -34,6 +34,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class EventsPerTimeProcessor extends StreamPipesDataProcessor {
 
@@ -57,6 +58,8 @@
   private int freezeTime;
   private String fireOption;
 
+  private Long lastFire;
+
   List<Long> timestamps;
 
   public EventsPerTimeProcessor() {
@@ -69,7 +72,7 @@
                                 String timeOption,
                                 int freezeTime,
                                 String fireOption) {
-    this.timestampKey = timestampKey;
+    this.timestamps = timestamps;
     this.numberOfEvents = numberOfEvents;
     this.timeWindow = timeWindow;
     this.timeOption = timeOption;
@@ -119,8 +122,11 @@
   public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
     long timestamp = event.getFieldBySelector(this.timestampKey).getAsPrimitive().getAsLong();
 
+    this.timestamps.add(timestamp); // buffer first so applyRule() also counts this event
 
-    spOutputCollector.collect(event);
+    if (this.applyRule()) { // forward the event only when the rule matches
+      spOutputCollector.collect(event);
+    }
   }
 
   @Override
@@ -129,9 +135,25 @@
   }
 
   public boolean applyRule() {
-    boolean result = false;
+    boolean result;
 
-    // remove old events
+    // Nothing buffered yet: there is no latest event to anchor a window on, so no match.
+    if (this.timestamps.isEmpty()) {
+      return false;
+    }
+
+    // Drop events that fell out of the window ending at the most recently added timestamp.
+    // The window span is computed in long arithmetic so large windows cannot overflow int.
+    long latestTimestamp = this.timestamps.get(this.timestamps.size() - 1);
+    long leftWindowTimestamp = latestTimestamp - (long) this.timeWindow * getMilliseconds();
+    timestamps = timestamps
+            .stream()
+            .filter(t -> t > leftWindowTimestamp)
+            .collect(Collectors.toList());
+
+    // The rule matches only when MORE than numberOfEvents timestamps remain in the window.
+    result = timestamps.size() > this.numberOfEvents;
+    // TODO honor the ONCE fire option; every match currently fires regardless of fireOption.
 
 
     return result;
@@ -140,4 +162,17 @@
   public List<Long> getTimestamps() {
     return timestamps;
   }
+
+  /** Length of one {@code timeOption} unit in milliseconds; unknown options count as seconds. */
+  private long getMilliseconds() {
+    switch (this.timeOption) {
+      case MINUTE:
+        return 60_000L;
+      case HOUR:
+        return 3_600_000L; // 60 * 60 * 1000 (the former 86400 is seconds per day)
+      case SECOND:
+      default:
+        return 1_000L;
+    }
+  }
 }
diff --git a/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java b/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java
index e3bf419..f15beae 100644
--- a/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java
+++ b/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java
@@ -25,11 +25,10 @@
 import static org.junit.Assert.assertEquals;
 
 public class EventsPerTimeProcessorTest {
+    private List<Long> timestamps = Arrays.asList(1000L, 2000L, 3000L, 4000L);
 
     @Test
-    public void testFieldAddition() {
-        List<Long> timestamps = Arrays.asList(1000L, 2000L, 3000L, 4000L);
-
+    public void testFieldAddition1() {
         EventsPerTimeProcessor eventsPerTimeProcessor = new EventsPerTimeProcessor(
                 timestamps,
                 3,
@@ -41,4 +40,32 @@
         assertEquals(true, eventsPerTimeProcessor.applyRule());
         assertEquals(4, eventsPerTimeProcessor.getTimestamps().size());
     }
+
+    // A 5 s window keeps all four timestamps, but four is not more than the threshold of 5.
+    @Test
+    public void testFieldAddition2() {
+        int numberOfEvents = 5;
+        int timeWindow = 5;
+        int freezeTime = 2;
+        EventsPerTimeProcessor processor = new EventsPerTimeProcessor(timestamps, numberOfEvents,
+                timeWindow, EventsPerTimeProcessor.SECOND, freezeTime, EventsPerTimeProcessor.EACH);
+        boolean ruleMatched = processor.applyRule();
+
+        assertEquals(false, ruleMatched);
+        assertEquals(4, processor.getTimestamps().size());
+    }
+
+    // A 2 s window ending at 4000 keeps only 3000 and 4000; two is not more than the threshold of 2.
+    @Test
+    public void testFieldAddition3() {
+        int numberOfEvents = 2;
+        int timeWindow = 2;
+        int freezeTime = 2;
+        EventsPerTimeProcessor processor = new EventsPerTimeProcessor(timestamps, numberOfEvents,
+                timeWindow, EventsPerTimeProcessor.SECOND, freezeTime, EventsPerTimeProcessor.EACH);
+        boolean ruleMatched = processor.applyRule();
+
+        assertEquals(false, ruleMatched);
+        assertEquals(2, processor.getTimestamps().size());
+    }
 }
\ No newline at end of file