WIP working on STREAMPIPES-451
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java
index 3d1178f..a8d5d9e 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessor.java
@@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class EventsPerTimeProcessor extends StreamPipesDataProcessor {
@@ -57,6 +58,8 @@
private int freezeTime;
private String fireOption;
+ private Long lastFire;
+
List<Long> timestamps;
public EventsPerTimeProcessor() {
@@ -69,7 +72,7 @@
String timeOption,
int freezeTime,
String fireOption) {
- this.timestampKey = timestampKey;
+ this.timestamps = timestamps;
this.numberOfEvents = numberOfEvents;
this.timeWindow = timeWindow;
this.timeOption = timeOption;
@@ -119,8 +122,11 @@
public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
long timestamp = event.getFieldBySelector(this.timestampKey).getAsPrimitive().getAsLong();
+ this.timestamps.add(timestamp);
- spOutputCollector.collect(event);
+ if (this.applyRule()) {
+ spOutputCollector.collect(event);
+ }
}
@Override
@@ -129,9 +135,25 @@
}
public boolean applyRule() {
- boolean result = false;
+ boolean result;
- // remove old events
+ // remove old events (TODO change unit)
+ long leftWindowTimestamp = this.timestamps.get(this.timestamps.size() - 1) - this.timeWindow * getMilliseconds();
+ timestamps = timestamps
+ .stream()
+ .filter(t -> t > leftWindowTimestamp)
+ .collect(Collectors.toList());
+
+ // validate if enough events are in time window
+ result = timestamps.size() > this.numberOfEvents;
+
+// if (result && )
+
+ // check if it should only fire once
+ if (ONCE.equals(fireOption)) {
+
+ }
+
return result;
@@ -140,4 +162,17 @@
public List<Long> getTimestamps() {
return timestamps;
}
+
+ private int getMilliseconds() {
+ switch (this.timeOption) {
+ case SECOND:
+ return 1000;
+ case MINUTE:
+ return 60000;
+ case HOUR:
+ return 86400;
+ default:
+ return 1000;
+ }
+ }
}
diff --git a/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java b/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java
index e3bf419..f15beae 100644
--- a/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java
+++ b/streampipes-processors-filters-jvm/src/test/java/org/apache/streampipes/processors/filters/jvm/processor/time/EventsPerTimeProcessorTest.java
@@ -25,11 +25,10 @@
import static org.junit.Assert.assertEquals;
public class EventsPerTimeProcessorTest {
+ private List<Long> timestamps = Arrays.asList(1000L, 2000L, 3000L, 4000L);
@Test
- public void testFieldAddition() {
- List<Long> timestamps = Arrays.asList(1000L, 2000L, 3000L, 4000L);
-
+ public void testFieldAddition1() {
EventsPerTimeProcessor eventsPerTimeProcessor = new EventsPerTimeProcessor(
timestamps,
3,
@@ -41,4 +40,32 @@
assertEquals(true, eventsPerTimeProcessor.applyRule());
assertEquals(4, eventsPerTimeProcessor.getTimestamps().size());
}
+
+ @Test
+ public void testFieldAddition2() {
+ EventsPerTimeProcessor eventsPerTimeProcessor = new EventsPerTimeProcessor(
+ timestamps,
+ 5,
+ 5,
+ EventsPerTimeProcessor.SECOND,
+ 2,
+ EventsPerTimeProcessor.EACH);
+
+ assertEquals(false, eventsPerTimeProcessor.applyRule());
+ assertEquals(4, eventsPerTimeProcessor.getTimestamps().size());
+ }
+
+ @Test
+ public void testFieldAddition3() {
+ EventsPerTimeProcessor eventsPerTimeProcessor = new EventsPerTimeProcessor(
+ timestamps,
+ 2,
+ 2,
+ EventsPerTimeProcessor.SECOND,
+ 2,
+ EventsPerTimeProcessor.EACH);
+
+ assertEquals(false, eventsPerTimeProcessor.applyRule());
+ assertEquals(2, eventsPerTimeProcessor.getTimestamps().size());
+ }
}
\ No newline at end of file