Merge branch 'S4-57' into piper
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index bc20e98..2e816e6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -27,6 +27,8 @@
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.SlotFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +38,7 @@
/**
* Container base class to hold all processing elements.
- *
+ *
* It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
*/
public abstract class App {
@@ -410,12 +412,12 @@
}
- public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
- int numSlots) {
+ public <T extends AbstractSlidingWindowPE> T createSlidingWindowPE(Class<T> type, long slotDuration,
+ TimeUnit timeUnit, int numSlots, SlotFactory slotFactory) {
try {
- Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+ Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class, SlotFactory.class };
T pe = type.getDeclaredConstructor(types).newInstance(
- new Object[] { this, slotDuration, timeUnit, numSlots });
+ new Object[] { this, slotDuration, timeUnit, numSlots, slotFactory });
return pe;
} catch (Exception e) {
logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 730ff56..6f8264a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,12 +1,15 @@
package org.apache.s4.core;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.Map;
-import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
@@ -29,6 +32,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* <p>
@@ -101,14 +105,14 @@
transient Map<Class<? extends Event>, Trigger> triggers;
/* PE instance id. */
- String id = "";
+ protected String id = "";
/* Private fields. */
transient private ProcessingElement pePrototype;
transient private boolean haveTriggers = false;
transient private long timerIntervalInMilliseconds = 0;
- transient private Timer triggerTimer;
- transient private Timer checkpointingTimer;
+ transient private ScheduledExecutorService triggerTimer;
+ transient private ScheduledExecutorService checkpointingTimer;
transient private boolean isPrototype = true;
transient private boolean isThreadSafe = false;
transient private String name = null;
@@ -375,13 +379,22 @@
Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
if (triggerTimer != null) {
- triggerTimer.cancel();
+ triggerTimer.shutdownNow();
}
- if (interval == 0)
+ if (interval == 0) {
return this;
+ }
- triggerTimer = new Timer();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error("Expection from timer thread", e);
+ }
+ }).setNameFormat("Timer-" + getClass().getSimpleName()).build();
+ triggerTimer = Executors.newSingleThreadScheduledExecutor(threadFactory);
return this;
}
@@ -487,8 +500,8 @@
/* Close resources in prototype. */
if (triggerTimer != null) {
- triggerTimer.cancel();
- logger.info("Timer stopped.");
+ triggerTimer.shutdownNow();
+ logger.info("Trigger timer stopped.");
}
/* Remove all the instances. */
@@ -510,7 +523,7 @@
}
/* This method is called by App just before the application starts. */
- void initPEPrototypeInternal() {
+ protected void initPEPrototypeInternal() {
/* Eagerly create singleton PE. */
if (isSingleton) {
@@ -524,19 +537,27 @@
/* Start timer. */
if (triggerTimer != null) {
- triggerTimer
- .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
- logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
+ triggerTimer.scheduleAtFixedRate(new OnTimeTask(), 0, timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
+ logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
}
if (checkpointingConfig.mode == CheckpointingMode.TIME) {
- checkpointingTimer = new Timer();
- checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this),
- checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency),
- checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency));
- logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}].",
- new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency) });
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error("Expection from checkpointing thread", e);
+ }
+ }).setNameFormat("Checkpointing-trigger-" + getClass().getSimpleName()).build();
+ checkpointingTimer = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this), checkpointingConfig.frequency,
+ checkpointingConfig.frequency, checkpointingConfig.timeUnit);
+ logger.debug(
+ "Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}] [{}].",
+ new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency),
+ String.valueOf(checkpointingConfig.timeUnit.toString()) });
}
/* Check if this PE is annotated as thread safe. */
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
deleted file mode 100644
index 0b6c0af..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (c) 2011 The S4 Project, http://s4.io.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.core;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
- * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
- * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
- *
- * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
- * efficient for sparse event streams.
- */
-public abstract class WindowingPE<T> extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
-
- final private int numSlots;
- private CircularFifoBuffer<T> circularBuffer;
- final private Timer timer;
- final private long slotDurationInMilliseconds;
-
- /**
- * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
- *
- * @param app
- * the application
- * @param slotDuration
- * the slot duration in timeUnit
- * @param timeUnit
- * the unit of time
- * @param numSlots
- * the number of slots to be stored
- */
- public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
- super(app);
- this.numSlots = numSlots;
-
- if (slotDuration > 0l) {
- slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
- timer = new Timer();
- timer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
- logger.trace("TIMER: " + slotDurationInMilliseconds);
-
- } else {
- slotDurationInMilliseconds = 0;
- timer = null;
- }
- }
-
- /**
- *
- * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
- * concrete class.
- *
- * @param app
- * the application
- * @param numSlots
- * the number of slots to be stored
- */
- public WindowingPE(App app, int numSlots) {
- this(app, 0l, null, numSlots);
- }
-
- /**
- * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
- * implement the logic required to create a slot. For example, compute statistics from aggregations and get
- * variables ready for the new slot.
- *
- * If the implementation class doesn't use periodic slots, this method will never be called. Use
- * {@link #addSlot(Object)} instead.
- *
- * @return the slot object
- */
- abstract protected T addPeriodicSlot();
-
- /**
- * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
- * {@link #addPeriodicSlot()} instead.
- *
- * @param slot
- */
- protected void addSlot(T slot) {
-
- if (timer != null) {
- logger.error("Calling method addSlot() in a periodic window is not allowed.");
- return;
- }
- circularBuffer.add(slot);
- }
-
- protected void onCreate() {
- circularBuffer = new CircularFifoBuffer<T>(numSlots);
- }
-
- /**
- *
- * @return the least recently inserted slot
- */
- protected T getOldestSlot() {
-
- return circularBuffer.get();
- }
-
- /** Stops the the sliding window. */
- protected void stop() {
- timer.cancel();
- }
-
- /**
- *
- * @return the collection of slots
- */
- protected Collection<T> getSlots() {
- return circularBuffer;
- }
-
- private class SlotTask extends TimerTask {
-
- @Override
- public void run() {
-
- logger.trace("START TIMER TASK");
-
- /* Iterate over all instances and put a new slot in the buffer. */
- for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
- logger.trace("pe id: " + entry.getValue().id);
- @SuppressWarnings("unchecked")
- WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
-
- if (peInstance.circularBuffer == null) {
- peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
- }
- synchronized (peInstance) {
- peInstance.circularBuffer.add(peInstance.addPeriodicSlot());
- }
- }
- }
- }
-}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
new file mode 100644
index 0000000..f26ede6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
@@ -0,0 +1,219 @@
+package org.apache.s4.core.window;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
+ * slot. Each slot represents a segment of time or a fixed number of events. Slots are consecutive in time or events.
+ *
+ * Users are expected to provide a factory for creating new slots, and a method to perform a global computation on the
+ * current window.
+ *
+ * Slots are automatically added.
+ *
+ * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
+ * efficient for sparse event streams.
+ *
+ * @param <T>
+ * type of the slot implementation used for this window
+ *
+ * @param <U>
+ * type of the values added to the window slots
+ */
+public abstract class AbstractSlidingWindowPE<T extends Slot<U>, U, V> extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractSlidingWindowPE.class);
+
+ final private int numSlots;
+ private CircularFifoBuffer<T> circularBuffer;
+ final private ScheduledExecutorService windowingTimerService;
+ final private long slotDurationInMilliseconds;
+
+ private T openSlot;
+ private final SlotFactory<T> slotFactory;
+
+ private long slotCapacity = 0;
+ private int eventCount = 0;
+
+ /**
+ *
+ * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
+ * concrete class.
+ *
+ * @param app
+ * the application
+ * @param numSlots
+ * the number of slots to be stored
+ */
+ public AbstractSlidingWindowPE(App app, int numSlots, long slotCapacity, SlotFactory<T> slotFactory) {
+ this(app, 0L, null, numSlots, slotFactory, slotCapacity);
+ }
+
+ /**
+ * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
+ *
+ * @param app
+ * the application
+ * @param slotDuration
+ * the slot duration in timeUnit
+ * @param timeUnit
+ * the unit of time
+ * @param numSlots
+ * the number of slots to be stored
+ */
+ public AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+ SlotFactory<T> slotFactory) {
+ this(app, slotDuration, timeUnit, numSlots, slotFactory, 0);
+
+ }
+
+ private AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+ SlotFactory<T> slotFactory, long slotCapacity) {
+ super(app);
+ this.numSlots = numSlots;
+ this.slotFactory = slotFactory;
+ this.slotCapacity = slotCapacity;
+ if (slotDuration > 0l) {
+ slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("SlidingWindow-" + getClass().getSimpleName()).build();
+ windowingTimerService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+ } else {
+ slotDurationInMilliseconds = 0;
+ windowingTimerService = null;
+ }
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
+ */
+ public final void onTrigger(Event event) {
+ if (windowingTimerService == null) {
+ if (eventCount % slotCapacity == 0) {
+ addSlot();
+ }
+ }
+ }
+
+ @Override
+ protected void initPEPrototypeInternal() {
+ super.initPEPrototypeInternal();
+ windowingTimerService.scheduleAtFixedRate(new SlotTask(), slotDurationInMilliseconds,
+ slotDurationInMilliseconds, TimeUnit.MILLISECONDS);
+ logger.trace("TIMER: " + slotDurationInMilliseconds);
+
+ }
+
+ /**
+ * User provided function that evaluates the whole content of the window. It must iterate across all slots. Current
+ * slots are passed as a parameter and the PE instance is expected to be locked so that iteration over the slots is
+ * safe.
+ *
+ * @return
+ */
+ abstract protected V evaluateWindow(Collection<T> slots);
+
+ /**
+ * Add an object to the sliding window. Use it when the window is not periodic.
+ *
+ * @param slot
+ */
+ protected final void addSlot() {
+
+ if (windowingTimerService != null) {
+ logger.error("Calling method addSlot() in a periodic window is not allowed.");
+ return;
+ }
+ addNewSlot((AbstractSlidingWindowPE<T, U, V>) this);
+ }
+
+ protected void onCreate() {
+ eventCount = 0;
+ circularBuffer = new CircularFifoBuffer<T>(numSlots);
+ if (slotDurationInMilliseconds > 0) {
+ openSlot = slotFactory.createSlot();
+ circularBuffer.add(openSlot);
+ }
+ }
+
+ protected void updateOpenSlot(U data) {
+ openSlot.update(data);
+ }
+
+ /**
+ *
+ * @return the least recently inserted slot
+ */
+ protected T getOldestSlot() {
+
+ return circularBuffer.get();
+ }
+
+ /** Stops the the sliding window. */
+ protected void stop() {
+ windowingTimerService.shutdownNow();
+ }
+
+ /**
+ *
+ * @return the collection of slots
+ */
+ protected Collection<T> getSlots() {
+ return circularBuffer;
+ }
+
+ protected T getOpenSlot() {
+ return openSlot;
+ }
+
+ private class SlotTask extends TimerTask {
+
+ @Override
+ public void run() {
+
+ logger.trace("Starting slot task");
+
+ /* Iterate over all instances and put a new slot in the buffer. */
+ for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
+ logger.trace("pe id: " + entry.getValue().getId());
+ @SuppressWarnings("unchecked")
+ AbstractSlidingWindowPE<T, U, V> peInstance = (AbstractSlidingWindowPE<T, U, V>) entry.getValue();
+
+ if (peInstance.circularBuffer == null) {
+ peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
+ }
+ addNewSlot(peInstance);
+ }
+ }
+ }
+
+ private void addNewSlot(AbstractSlidingWindowPE<T, U, V> peInstance) {
+ synchronized (peInstance) {
+ peInstance.openSlot.close();
+ peInstance.openSlot = slotFactory.createSlot();
+ peInstance.circularBuffer.add(peInstance.openSlot);
+ }
+ }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
new file mode 100644
index 0000000..61af0e6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
@@ -0,0 +1,51 @@
+package org.apache.s4.core.window;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Window slot that keeps all data elements as a list.
+ *
+ * @param <T>
+ * Type of slot elements
+ */
+public class DefaultAggregatingSlot<T> implements Slot<T> {
+
+ List<T> data = null;
+ boolean open = true;
+
+ @Override
+ public void update(T datum) {
+ if (open) {
+ if (data == null) {
+ data = new ArrayList<T>();
+ }
+ data.add(datum);
+ }
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ if (data == null) {
+ data = ImmutableList.of();
+ } else {
+ data = ImmutableList.copyOf(data);
+ }
+ }
+
+ public List<T> getAggregatedData() {
+ return data == null ? (List<T>) ImmutableList.of() : data;
+ }
+
+ public static class DefaultAggregatingSlotFactory<T> implements SlotFactory<DefaultAggregatingSlot<T>> {
+
+ @Override
+ public DefaultAggregatingSlot<T> createSlot() {
+ return new DefaultAggregatingSlot<T>();
+ }
+
+ }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
new file mode 100644
index 0000000..17410d3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core.window;
+
+public class OHCLSlot implements Slot<Double> {
+
+ double open = -1;
+ double high = -1;
+ double low = -1;
+ double close = -1;
+ long ticks = 0;
+ boolean isOpen;
+
+ @Override
+ public void update(Double data) {
+ if (isOpen) {
+ if (open == -1) {
+ open = low = high = close = data;
+ } else if (data > high) {
+ high = data;
+ } else if (data < low) {
+ low = data;
+ }
+ close = data;
+ ticks++;
+ }
+ }
+
+ @Override
+ public void close() {
+ isOpen = false;
+ }
+
+ double getOpen() {
+ return open;
+ }
+
+ double getClose() {
+ return close;
+ }
+
+ double getHigh() {
+ return high;
+ }
+
+ double getLow() {
+ return low;
+ }
+
+ long getTicksCount() {
+ return ticks;
+ }
+
+ public static class OHCLSlotFactory implements SlotFactory<OHCLSlot> {
+
+ @Override
+ public OHCLSlot createSlot() {
+ return new OHCLSlot();
+ }
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
new file mode 100644
index 0000000..839639f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
@@ -0,0 +1,23 @@
+package org.apache.s4.core.window;
+
+/**
+ * A convenience window slot, that aggregates elements of type <T>.
+ *
+ * Users must add suitable getter methods to retrieve aggregated data.
+ *
+ * @param <T>
+ * elements to aggregate
+ */
+public interface Slot<T> {
+
+ /**
+ * Add a single data element
+ */
+ void update(T data);
+
+ /**
+ * Compute aggregated data on available gathered slot data, place slot and slot data in immutable state.
+ */
+ void close();
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
new file mode 100644
index 0000000..d40bc92
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.core.window;
+
+/**
+ * Defines factory for window slots
+ *
+ * @param <T>
+ * slot class or interface that is produced
+ */
+public interface SlotFactory<T> {
+
+ T createSlot();
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 70fe4ec..b3ce9ed 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -7,9 +7,10 @@
import junit.framework.Assert;
import org.apache.s4.base.EventMessage;
-import org.apache.s4.comm.BareCommModule;
import org.apache.s4.core.triggers.TriggeredApp;
import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.s4.wordcount.StringEvent;
import org.apache.zookeeper.KeeperException;
@@ -46,7 +47,7 @@
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
- Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
+ Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
app = injector.getInstance(TriggeredApp.class);
app.init();
app.start();
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index ef3fabd..a9649e5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -13,13 +13,13 @@
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.KeyFinder;
-import org.apache.s4.comm.BareCommModule;
import org.apache.s4.core.App;
-import org.apache.s4.core.BareCoreModule;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
@@ -63,7 +63,7 @@
final CountDownLatch signalCheckpointed = new CountDownLatch(1);
CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
- Injector injector = Guice.createInjector(new BareCommModule(),
+ Injector injector = Guice.createInjector(new MockCommModule(),
new MockCoreModuleWithFileBaseCheckpointingBackend());
TestApp app = injector.getInstance(TestApp.class);
app.init();
@@ -139,7 +139,7 @@
}
- private static class MockCoreModuleWithFileBaseCheckpointingBackend extends BareCoreModule {
+ private static class MockCoreModuleWithFileBaseCheckpointingBackend extends MockCoreModule {
protected void configure() {
super.configure();
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
index 3bbd38e..e1c0c11 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -10,11 +10,11 @@
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.KeyFinder;
-import org.apache.s4.comm.BareCommModule;
import org.apache.s4.core.App;
-import org.apache.s4.core.BareCoreModule;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@
*/
@Test
public void testSynchronization() throws IOException, InterruptedException {
- Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
+ Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
TestApp app = injector.getInstance(TestApp.class);
app.count = 2; // One for the event, another for the timer
app.init();
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
new file mode 100644
index 0000000..76e60b8
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -0,0 +1,67 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.SlotFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WindowingPE1 extends AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> {
+
+ private static Logger logger = LoggerFactory.getLogger(WindowingPE1.class);
+ AtomicInteger counter = new AtomicInteger();
+
+ public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+ SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+ super(app, slotDuration, timeUnit, numSlots, slotFactory);
+ }
+
+ public WindowingPE1(App app, int numSlots, long slotCapacity,
+ SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+ super(app, numSlots, slotCapacity, slotFactory);
+ }
+
+ public void onEvent(Event event) {
+
+ Integer value = event.get("value", Integer.class);
+ updateOpenSlot(value);
+ counter.incrementAndGet();
+ if (counter.get() % 1000 == 0) {
+ logger.trace("received value [{}]", event.get("value", Integer.class));
+ }
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+
+ @Override
+ protected void onTime() {
+ if (counter.get() == WindowingPETest.NB_EVENTS) {
+ // System.out.println(Arrays.toString(values.toArray(new Integer[] {})));
+ WindowingPETest.allValues.addAll(evaluateWindow(getSlots()));
+ WindowingPETest.signalAllEventsProcessed.countDown();
+ }
+
+ }
+
+ @Override
+ protected List<Integer> evaluateWindow(Collection<DefaultAggregatingSlot<Integer>> slots) {
+ List<Integer> values = new ArrayList<Integer>();
+
+ for (DefaultAggregatingSlot<Integer> slot : getSlots()) {
+ values.addAll(slot.getAggregatedData());
+ }
+ return values;
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
new file mode 100644
index 0000000..80ba837
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -0,0 +1,99 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.DefaultAggregatingSlot.DefaultAggregatingSlotFactory;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class WindowingPETest {
+
+ public static final long NB_EVENTS = 1000000;
+ public static final CountDownLatch signalAllEventsProcessed = new CountDownLatch(1);
+ public static final List<Integer> allValues = new ArrayList<Integer>();
+
+ private static final String STREAM_NAME = "stream1";
+ private static final String APP_NAME = "app1";
+
+ @Test
+ public void test() {
+ ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
+ .getLogger(Logger.ROOT_LOGGER_NAME);
+ root.setLevel(Level.DEBUG);
+ Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+ TestTimeWindowedApp app = injector.getInstance(TestTimeWindowedApp.class);
+ app.init();
+ app.start();
+
+ for (int i = 0; i < NB_EVENTS; i++) {
+ Event e = new Event();
+ e.put("value", Integer.class, i);
+ app.stream1.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(e)));
+ }
+
+ try {
+ Assert.assertTrue(signalAllEventsProcessed.await(30, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ Assert.fail();
+ }
+ Assert.assertEquals(NB_EVENTS, allValues.size());
+ for (int i = 0; i < NB_EVENTS; i++) {
+ Assert.assertEquals((Integer) i, allValues.get(i));
+ }
+ }
+
+ public static class TestTimeWindowedApp extends App {
+
+ private Stream<Event> stream1;
+
+ @Override
+ protected void onStart() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onInit() {
+ AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> wPE1 = createSlidingWindowPE(
+ WindowingPE1.class, 10L, TimeUnit.MILLISECONDS, 100000,
+ new DefaultAggregatingSlotFactory<Integer>());
+ wPE1.setTimerInterval(10, TimeUnit.MILLISECONDS);
+ stream1 = createStream(STREAM_NAME, new KeyFinder<Event>() {
+
+ @Override
+ public List<String> get(final Event event) {
+ return ImmutableList.of("X");
+ }
+ }, wPE1);
+
+ }
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
similarity index 83%
rename from subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
rename to subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index c7b9923..a9a72ce 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -1,7 +1,9 @@
-package org.apache.s4.comm;
+package org.apache.s4.fixtures;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.RemoteEmitterFactory;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Assignment;
@@ -16,12 +18,12 @@
import com.google.inject.name.Names;
/**
- * Default configuration module for the communication layer. Parameterizable through a configuration file.
- *
+ * Mock module for the comm layer. Mocks comm layer basic functionalities, and uses some default when required.
+ *
*/
-public class BareCommModule extends AbstractModule {
+public class MockCommModule extends AbstractModule {
- public BareCommModule() {
+ public MockCommModule() {
super();
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
similarity index 66%
rename from subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
rename to subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index b131c3f..7c52c94 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -1,7 +1,8 @@
-package org.apache.s4.core;
+package org.apache.s4.fixtures;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
+import org.apache.s4.core.Receiver;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.NoOpDeploymentManager;
import org.mockito.Mockito;
@@ -11,16 +12,15 @@
import com.google.inject.AbstractModule;
/**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
+ * Core module mocking basic platform functionalities.
*
*/
-public class BareCoreModule extends AbstractModule {
+public class MockCoreModule extends AbstractModule {
@SuppressWarnings("unused")
- private static Logger logger = LoggerFactory.getLogger(BareCoreModule.class);
+ private static Logger logger = LoggerFactory.getLogger(MockCoreModule.class);
- public BareCoreModule() {
+ public MockCoreModule() {
}
@Override