Merge branch 'S4-57' into piper
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index bc20e98..2e816e6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -27,6 +27,8 @@
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.SlotFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +38,7 @@
 
 /**
  * Container base class to hold all processing elements.
- *
+ * 
  * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
  */
 public abstract class App {
@@ -410,12 +412,12 @@
 
     }
 
-    public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
-            int numSlots) {
+    public <T extends AbstractSlidingWindowPE> T createSlidingWindowPE(Class<T> type, long slotDuration,
+            TimeUnit timeUnit, int numSlots, SlotFactory slotFactory) {
         try {
-            Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+            Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class, SlotFactory.class };
             T pe = type.getDeclaredConstructor(types).newInstance(
-                    new Object[] { this, slotDuration, timeUnit, numSlots });
+                    new Object[] { this, slotDuration, timeUnit, numSlots, slotFactory });
             return pe;
         } catch (Exception e) {
             logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 730ff56..6f8264a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,12 +1,15 @@
 package org.apache.s4.core;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import net.jcip.annotations.ThreadSafe;
@@ -29,6 +32,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * <p>
@@ -101,14 +105,14 @@
     transient Map<Class<? extends Event>, Trigger> triggers;
 
     /* PE instance id. */
-    String id = "";
+    protected String id = "";
 
     /* Private fields. */
     transient private ProcessingElement pePrototype;
     transient private boolean haveTriggers = false;
     transient private long timerIntervalInMilliseconds = 0;
-    transient private Timer triggerTimer;
-    transient private Timer checkpointingTimer;
+    transient private ScheduledExecutorService triggerTimer;
+    transient private ScheduledExecutorService checkpointingTimer;
     transient private boolean isPrototype = true;
     transient private boolean isThreadSafe = false;
     transient private String name = null;
@@ -375,13 +379,22 @@
         Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
 
         if (triggerTimer != null) {
-            triggerTimer.cancel();
+            triggerTimer.shutdownNow();
         }
 
-        if (interval == 0)
+        if (interval == 0) {
             return this;
+        }
 
-        triggerTimer = new Timer();
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+                    @Override
+                    public void uncaughtException(Thread t, Throwable e) { // logs the failure and its stack trace
+                        logger.error("Exception from timer thread", e);
+                    }
+                }).setNameFormat("Timer-" + getClass().getSimpleName()).build();
+        triggerTimer = Executors.newSingleThreadScheduledExecutor(threadFactory);
         return this;
     }
 
@@ -487,8 +500,8 @@
 
         /* Close resources in prototype. */
         if (triggerTimer != null) {
-            triggerTimer.cancel();
-            logger.info("Timer stopped.");
+            triggerTimer.shutdownNow();
+            logger.info("Trigger timer stopped.");
         }
 
         /* Remove all the instances. */
@@ -510,7 +523,7 @@
     }
 
     /* This method is called by App just before the application starts. */
-    void initPEPrototypeInternal() {
+    protected void initPEPrototypeInternal() {
 
         /* Eagerly create singleton PE. */
         if (isSingleton) {
@@ -524,19 +537,27 @@
 
         /* Start timer. */
         if (triggerTimer != null) {
-            triggerTimer
-                    .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
-            logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
+            triggerTimer.scheduleAtFixedRate(new OnTimeTask(), 0, timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
+            logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
                     this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
         }
 
         if (checkpointingConfig.mode == CheckpointingMode.TIME) {
-            checkpointingTimer = new Timer();
-            checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this),
-                    checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency),
-                    checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency));
-            logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}].",
-                    new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency) });
+            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+                        @Override
+                        public void uncaughtException(Thread t, Throwable e) { // logs the failure and its stack trace
+                            logger.error("Exception from checkpointing thread", e);
+                        }
+                    }).setNameFormat("Checkpointing-trigger-" + getClass().getSimpleName()).build();
+            checkpointingTimer = Executors.newSingleThreadScheduledExecutor(threadFactory);
+            checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this), checkpointingConfig.frequency,
+                    checkpointingConfig.frequency, checkpointingConfig.timeUnit);
+            logger.debug(
+                    "Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}] [{}].",
+                    new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency),
+                            String.valueOf(checkpointingConfig.timeUnit.toString()) });
         }
 
         /* Check if this PE is annotated as thread safe. */
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
deleted file mode 100644
index 0b6c0af..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (c) 2011 The S4 Project, http://s4.io.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.core;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
- * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
- * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
- * 
- * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
- * efficient for sparse event streams.
- */
-public abstract class WindowingPE<T> extends ProcessingElement {
-
-    private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
-
-    final private int numSlots;
-    private CircularFifoBuffer<T> circularBuffer;
-    final private Timer timer;
-    final private long slotDurationInMilliseconds;
-
-    /**
-     * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
-     * 
-     * @param app
-     *            the application
-     * @param slotDuration
-     *            the slot duration in timeUnit
-     * @param timeUnit
-     *            the unit of time
-     * @param numSlots
-     *            the number of slots to be stored
-     */
-    public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
-        super(app);
-        this.numSlots = numSlots;
-
-        if (slotDuration > 0l) {
-            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
-            timer = new Timer();
-            timer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
-            logger.trace("TIMER: " + slotDurationInMilliseconds);
-
-        } else {
-            slotDurationInMilliseconds = 0;
-            timer = null;
-        }
-    }
-
-    /**
-     * 
-     * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
-     * concrete class.
-     * 
-     * @param app
-     *            the application
-     * @param numSlots
-     *            the number of slots to be stored
-     */
-    public WindowingPE(App app, int numSlots) {
-        this(app, 0l, null, numSlots);
-    }
-
-    /**
-     * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
-     * implement the logic required to create a slot. For example, compute statistics from aggregations and get
-     * variables ready for the new slot.
-     * 
-     * If the implementation class doesn't use periodic slots, this method will never be called. Use
-     * {@link #addSlot(Object)} instead.
-     * 
-     * @return the slot object
-     */
-    abstract protected T addPeriodicSlot();
-
-    /**
-     * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
-     * {@link #addPeriodicSlot()} instead.
-     * 
-     * @param slot
-     */
-    protected void addSlot(T slot) {
-
-        if (timer != null) {
-            logger.error("Calling method addSlot() in a periodic window is not allowed.");
-            return;
-        }
-        circularBuffer.add(slot);
-    }
-
-    protected void onCreate() {
-        circularBuffer = new CircularFifoBuffer<T>(numSlots);
-    }
-
-    /**
-     * 
-     * @return the least recently inserted slot
-     */
-    protected T getOldestSlot() {
-
-        return circularBuffer.get();
-    }
-
-    /** Stops the the sliding window. */
-    protected void stop() {
-        timer.cancel();
-    }
-
-    /**
-     * 
-     * @return the collection of slots
-     */
-    protected Collection<T> getSlots() {
-        return circularBuffer;
-    }
-
-    private class SlotTask extends TimerTask {
-
-        @Override
-        public void run() {
-
-            logger.trace("START TIMER TASK");
-
-            /* Iterate over all instances and put a new slot in the buffer. */
-            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
-                logger.trace("pe id: " + entry.getValue().id);
-                @SuppressWarnings("unchecked")
-                WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
-
-                if (peInstance.circularBuffer == null) {
-                    peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
-                }
-                synchronized (peInstance) {
-                    peInstance.circularBuffer.add(peInstance.addPeriodicSlot());
-                }
-            }
-        }
-    }
-}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
new file mode 100644
index 0000000..f26ede6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
@@ -0,0 +1,219 @@
+package org.apache.s4.core.window;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
+ * slot. Each slot represents a segment of time or a fixed number of events. Slots are consecutive in time or events.
+ *
+ * Users are expected to provide a factory for creating new slots, and a method to perform a global computation on the
+ * current window. Slots are added automatically: by a timer for time-based windows, and from
+ * {@link #onTrigger(Event)} for count-based windows.
+ *
+ * When using time-based slots, use this implementation only if you expect most slots to have values; it is not
+ * efficient for sparse event streams.
+ *
+ * @param <T>
+ *            type of the slot implementation used for this window
+ * @param <U>
+ *            type of the values added to the window slots
+ * @param <V>
+ *            type of the result returned by {@link #evaluateWindow(Collection)}
+ */
+public abstract class AbstractSlidingWindowPE<T extends Slot<U>, U, V> extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractSlidingWindowPE.class);
+
+    final private int numSlots;
+    private CircularFifoBuffer<T> circularBuffer;
+    /* Null for count-based windows, which do not use a timer. */
+    final private ScheduledExecutorService windowingTimerService;
+    final private long slotDurationInMilliseconds;
+
+    /* Slot that currently receives data. */
+    private T openSlot;
+    private final SlotFactory<T> slotFactory;
+
+    /* Count-based windows only: triggers per slot, and triggers counted for the open slot so far. */
+    private long slotCapacity = 0;
+    private int eventCount = 0;
+
+    /**
+     * Constructor for count-based slots: a new slot is opened every {@code slotCapacity} calls to
+     * {@link #onTrigger(Event)}.
+     *
+     * @param app the application
+     * @param numSlots the number of slots to be stored
+     * @param slotCapacity the number of triggers a slot receives before the next slot is opened
+     * @param slotFactory the factory used to create new slots
+     */
+    public AbstractSlidingWindowPE(App app, int numSlots, long slotCapacity, SlotFactory<T> slotFactory) {
+        this(app, 0L, null, numSlots, slotFactory, slotCapacity);
+    }
+
+    /**
+     * Constructor for time-based slots: a new slot is opened every {@code slotDuration}.
+     *
+     * @param app the application
+     * @param slotDuration the slot duration in timeUnit
+     * @param timeUnit the unit of time
+     * @param numSlots the number of slots to be stored
+     * @param slotFactory the factory used to create new slots
+     */
+    public AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<T> slotFactory) {
+        this(app, slotDuration, timeUnit, numSlots, slotFactory, 0);
+    }
+
+    /* A positive slotDuration selects the time-based mode; otherwise the window is count-based. */
+    private AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<T> slotFactory, long slotCapacity) {
+        super(app);
+        this.numSlots = numSlots;
+        this.slotFactory = slotFactory;
+        this.slotCapacity = slotCapacity;
+        if (slotDuration > 0L) {
+            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
+            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                    .setNameFormat("SlidingWindow-" + getClass().getSimpleName()).build();
+            windowingTimerService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+        } else {
+            slotDurationInMilliseconds = 0;
+            windowingTimerService = null;
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // No per-instance resources to release.
+    }
+
+    /**
+     * For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
+     * Each call is counted as one element of the open slot (assumes the trigger fires once per event; to be confirmed).
+     */
+    public final void onTrigger(Event event) {
+        if (windowingTimerService != null || slotCapacity <= 0) {
+            return; // time-based window, or no usable capacity: slots are not rolled from here
+        }
+        if (++eventCount >= slotCapacity) {
+            eventCount = 0;
+            addSlot();
+        }
+    }
+
+    @Override
+    protected void initPEPrototypeInternal() {
+        super.initPEPrototypeInternal();
+        if (windowingTimerService == null) {
+            return; // count-based window: there is no timer to start
+        }
+        windowingTimerService.scheduleAtFixedRate(new SlotTask(), slotDurationInMilliseconds,
+                slotDurationInMilliseconds, TimeUnit.MILLISECONDS);
+        logger.trace("TIMER: " + slotDurationInMilliseconds);
+    }
+
+    /**
+     * User provided function that evaluates the whole content of the window. It must iterate across all slots. Current
+     * slots are passed as a parameter and the PE instance is expected to be locked so that iteration over the slots is
+     * safe.
+     *
+     * @param slots the slots to evaluate
+     * @return the result of the computation over the given slots
+     */
+    abstract protected V evaluateWindow(Collection<T> slots);
+
+    /**
+     * Closes the open slot and adds a new one to the sliding window. Use it when the window is not periodic.
+     */
+    protected final void addSlot() {
+        if (windowingTimerService != null) {
+            logger.error("Calling method addSlot() in a periodic window is not allowed.");
+            return;
+        }
+        addNewSlot(this);
+    }
+
+    /** Resets the window of this instance and opens its first slot, for time-based and count-based windows alike. */
+    protected void onCreate() {
+        eventCount = 0;
+        circularBuffer = new CircularFifoBuffer<T>(numSlots);
+        openSlot = slotFactory.createSlot();
+        circularBuffer.add(openSlot);
+    }
+
+    /** Adds a data element to the slot that is currently open. */
+    protected void updateOpenSlot(U data) {
+        openSlot.update(data);
+    }
+
+    /**
+     * @return the least recently inserted slot
+     */
+    protected T getOldestSlot() {
+        return circularBuffer.get();
+    }
+
+    /** Stops the sliding window. Has no effect on count-based windows, which have no timer. */
+    protected void stop() {
+        if (windowingTimerService != null) {
+            windowingTimerService.shutdownNow();
+        }
+    }
+
+    /**
+     * @return the collection of slots
+     */
+    protected Collection<T> getSlots() {
+        return circularBuffer;
+    }
+
+    /** @return the slot that currently receives data */
+    protected T getOpenSlot() {
+        return openSlot;
+    }
+
+    /* Periodic task that opens a new slot in every PE instance. */
+    private class SlotTask extends TimerTask {
+        @Override
+        public void run() {
+            logger.trace("Starting slot task");
+            /* Iterate over all instances and put a new slot in the buffer. */
+            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
+                logger.trace("pe id: " + entry.getValue().getId());
+                @SuppressWarnings("unchecked")
+                AbstractSlidingWindowPE<T, U, V> peInstance = (AbstractSlidingWindowPE<T, U, V>) entry.getValue();
+                addNewSlot(peInstance);
+            }
+        }
+    }
+
+    /* Closes the open slot of the given instance (if any) and opens a new one, while holding the instance lock. */
+    private void addNewSlot(AbstractSlidingWindowPE<T, U, V> peInstance) {
+        synchronized (peInstance) {
+            if (peInstance.circularBuffer == null) {
+                peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
+            }
+            if (peInstance.openSlot != null) {
+                peInstance.openSlot.close();
+            }
+            peInstance.openSlot = slotFactory.createSlot();
+            peInstance.circularBuffer.add(peInstance.openSlot);
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
new file mode 100644
index 0000000..61af0e6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
@@ -0,0 +1,51 @@
+package org.apache.s4.core.window;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Window slot that keeps all data elements as a list. Elements are only accepted while the slot is open.
+ *
+ * @param <T>
+ *            Type of slot elements
+ */
+public class DefaultAggregatingSlot<T> implements Slot<T> {
+
+    List<T> data = null; // created lazily by the first update
+    boolean open = true;
+
+    @Override
+    public void update(T datum) {
+        if (open) {
+            if (data == null) {
+                data = new ArrayList<T>();
+            }
+            data.add(datum);
+        }
+    }
+
+    @Override
+    public void close() {
+        open = false;
+        // Freeze a copy. ImmutableList.copyOf is avoided: it rejects the null elements that update() accepts.
+        data = java.util.Collections.unmodifiableList(data == null ? new ArrayList<T>() : new ArrayList<T>(data));
+    }
+
+    /** @return a read-only list of the elements gathered so far; never null */
+    public List<T> getAggregatedData() {
+        if (data == null) {
+            return ImmutableList.of();
+        }
+        return open ? java.util.Collections.unmodifiableList(data) : data;
+    }
+
+    /** Factory returning a new {@link DefaultAggregatingSlot} on each call. */
+    public static class DefaultAggregatingSlotFactory<T> implements SlotFactory<DefaultAggregatingSlot<T>> {
+        @Override
+        public DefaultAggregatingSlot<T> createSlot() {
+            return new DefaultAggregatingSlot<T>();
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
new file mode 100644
index 0000000..17410d3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core.window;
+
+/** Slot that keeps the first (open), highest, lowest and last (close) value it receives, and counts the values. */
+public class OHCLSlot implements Slot<Double> {
+
+    double open = -1; // the four price fields hold -1 until a first value is received
+    double high = -1;
+    double low = -1;
+    double close = -1;
+    long ticks = 0;
+    boolean isOpen = true; // a new slot accepts values until close() is called
+
+    @Override
+    public void update(Double data) {
+        if (isOpen) {
+            if (ticks == 0) { // first value; testing ticks instead of open == -1 keeps -1 usable as a value
+                open = low = high = close = data;
+            } else if (data > high) {
+                high = data;
+            } else if (data < low) {
+                low = data;
+            }
+            close = data;
+            ticks++;
+        }
+    }
+
+    @Override
+    public void close() {
+        isOpen = false;
+    }
+
+    double getOpen() {
+        return open;
+    }
+
+    double getClose() {
+        return close;
+    }
+
+    double getHigh() {
+        return high;
+    }
+
+    double getLow() {
+        return low;
+    }
+
+    long getTicksCount() {
+        return ticks;
+    }
+
+    public static class OHCLSlotFactory implements SlotFactory<OHCLSlot> {
+
+        @Override
+        public OHCLSlot createSlot() {
+            return new OHCLSlot();
+        }
+
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
new file mode 100644
index 0000000..839639f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
@@ -0,0 +1,23 @@
+package org.apache.s4.core.window;
+
+/**
+ * A convenience window slot that aggregates elements of type {@code T}.
+ *
+ * Users must add suitable getter methods to retrieve aggregated data.
+ *
+ * @param <T>
+ *            type of the elements to aggregate
+ */
+public interface Slot<T> {
+
+    /**
+     * Adds a single data element to this slot.
+     */
+    void update(T data);
+
+    /**
+     * Computes aggregated data from the data gathered so far, then places the slot and its data in an immutable state.
+     */
+    void close();
+
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
new file mode 100644
index 0000000..d40bc92
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.core.window;
+
+/**
+ * Defines a factory for window slots.
+ *
+ * @param <T>
+ *            slot class or interface that is produced
+ */
+public interface SlotFactory<T> {
+    /** @return a slot instance produced by this factory */
+    T createSlot();
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 70fe4ec..b3ce9ed 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -7,9 +7,10 @@
 import junit.framework.Assert;
 
 import org.apache.s4.base.EventMessage;
-import org.apache.s4.comm.BareCommModule;
 import org.apache.s4.core.triggers.TriggeredApp;
 import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.s4.wordcount.StringEvent;
 import org.apache.zookeeper.KeeperException;
@@ -46,7 +47,7 @@
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
         app = injector.getInstance(TriggeredApp.class);
         app.init();
         app.start();
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index ef3fabd..a9649e5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -13,13 +13,13 @@
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.KeyFinder;
-import org.apache.s4.comm.BareCommModule;
 import org.apache.s4.core.App;
-import org.apache.s4.core.BareCoreModule;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
 import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
 import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
@@ -63,7 +63,7 @@
         final CountDownLatch signalCheckpointed = new CountDownLatch(1);
         CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
 
-        Injector injector = Guice.createInjector(new BareCommModule(),
+        Injector injector = Guice.createInjector(new MockCommModule(),
                 new MockCoreModuleWithFileBaseCheckpointingBackend());
         TestApp app = injector.getInstance(TestApp.class);
         app.init();
@@ -139,7 +139,7 @@
 
     }
 
-    private static class MockCoreModuleWithFileBaseCheckpointingBackend extends BareCoreModule {
+    private static class MockCoreModuleWithFileBaseCheckpointingBackend extends MockCoreModule {
 
         protected void configure() {
             super.configure();
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
index 3bbd38e..e1c0c11 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -10,11 +10,11 @@
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.KeyFinder;
-import org.apache.s4.comm.BareCommModule;
 import org.apache.s4.core.App;
-import org.apache.s4.core.BareCoreModule;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +34,7 @@
      */
     @Test
     public void testSynchronization() throws IOException, InterruptedException {
-        Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
         TestApp app = injector.getInstance(TestApp.class);
         app.count = 2; // One for the event, another for the timer
         app.init();
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
new file mode 100644
index 0000000..76e60b8
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -0,0 +1,79 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.SlotFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test PE that passes the integer "value" field of each received event to the open slot and, once
+ * {@link WindowingPETest#NB_EVENTS} events have been counted, hands the evaluated window to {@link WindowingPETest}.
+ */
+public class WindowingPE1 extends AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(WindowingPE1.class);
+
+    /** Number of events received so far. */
+    AtomicInteger counter = new AtomicInteger();
+
+    /** Flipped to true when the window is handed to the test, so that it is handed over only once. */
+    private final AtomicBoolean published = new AtomicBoolean();
+
+    public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+        super(app, slotDuration, timeUnit, numSlots, slotFactory);
+    }
+
+    public WindowingPE1(App app, int numSlots, long slotCapacity,
+            SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+        super(app, numSlots, slotCapacity, slotFactory);
+    }
+
+    /** Passes the event's "value" field to the open slot and counts the event. */
+    public void onEvent(Event event) {
+        Integer value = event.get("value", Integer.class);
+        updateOpenSlot(value);
+        // Use the value returned by the increment so the check refers to this very event.
+        if (counter.incrementAndGet() % 1000 == 0) {
+            logger.trace("received value [{}]", value);
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // nothing to release
+    }
+
+    /** Once all expected events have been counted, publishes the evaluated window and releases the test latch. */
+    @Override
+    protected void onTime() {
+        boolean allReceived = counter.get() == WindowingPETest.NB_EVENTS;
+        // Later ticks must not append the window again while the test thread reads allValues.
+        if (allReceived && published.compareAndSet(false, true)) {
+            WindowingPETest.allValues.addAll(evaluateWindow(getSlots()));
+            WindowingPETest.signalAllEventsProcessed.countDown();
+        }
+    }
+
+    /** Concatenates the aggregated data of the given slots, in iteration order. */
+    @Override
+    protected List<Integer> evaluateWindow(Collection<DefaultAggregatingSlot<Integer>> slots) {
+        List<Integer> values = new ArrayList<Integer>();
+        // Iterate over the slots passed in rather than getSlots(), so the argument is honored.
+        for (DefaultAggregatingSlot<Integer> slot : slots) {
+            values.addAll(slot.getAggregatedData());
+        }
+        return values;
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
new file mode 100644
index 0000000..80ba837
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -0,0 +1,108 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.DefaultAggregatingSlot.DefaultAggregatingSlotFactory;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Sends {@link #NB_EVENTS} integer events through a stream to a {@link WindowingPE1} and checks that the values it
+ * publishes are exactly 0 .. NB_EVENTS - 1, in that order.
+ */
+public class WindowingPETest {
+
+    /** Number of events injected by the test. */
+    public static final long NB_EVENTS = 1000000;
+    /** Counted down by {@link WindowingPE1} after it has added the window content to {@link #allValues}. */
+    public static final CountDownLatch signalAllEventsProcessed = new CountDownLatch(1);
+    /** Values published by {@link WindowingPE1}; the test reads them only after waiting on the latch. */
+    public static final List<Integer> allValues = new ArrayList<Integer>();
+
+    private static final String STREAM_NAME = "stream1";
+    private static final String APP_NAME = "app1";
+
+    @Test
+    public void test() {
+        ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
+                .getLogger(Logger.ROOT_LOGGER_NAME);
+        root.setLevel(Level.DEBUG);
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+        TestTimeWindowedApp app = injector.getInstance(TestTimeWindowedApp.class);
+        app.init();
+        app.start();
+
+        for (int i = 0; i < NB_EVENTS; i++) {
+            Event e = new Event();
+            e.put("value", Integer.class, i);
+            app.stream1.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(e)));
+        }
+
+        try {
+            Assert.assertTrue("window content was not published within 30 seconds",
+                    signalAllEventsProcessed.await(30, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            // Restore the interrupt status for the test runner before reporting the failure.
+            Thread.currentThread().interrupt();
+            Assert.fail("interrupted while waiting for the window content to be published");
+        }
+        Assert.assertEquals(NB_EVENTS, allValues.size());
+        for (int i = 0; i < NB_EVENTS; i++) {
+            Assert.assertEquals(i, allValues.get(i).intValue());
+        }
+    }
+
+    /** Application that connects a single {@link WindowingPE1} to the stream named by STREAM_NAME. */
+    public static class TestTimeWindowedApp extends App {
+
+        private Stream<Event> stream1;
+
+        @Override
+        protected void onStart() {
+            // nothing to do on start
+        }
+
+        @Override
+        protected void onInit() {
+            AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> wPE1 = createSlidingWindowPE(
+                    WindowingPE1.class, 10L, TimeUnit.MILLISECONDS, 100000,
+                    new DefaultAggregatingSlotFactory<Integer>());
+            wPE1.setTimerInterval(10, TimeUnit.MILLISECONDS);
+            // Constant key finder: every event gets the same key, "X".
+            stream1 = createStream(STREAM_NAME, new KeyFinder<Event>() {
+
+                @Override
+                public List<String> get(final Event event) {
+                    return ImmutableList.of("X");
+                }
+            }, wPE1);
+        }
+
+        @Override
+        protected void onClose() {
+            // nothing to do on close
+        }
+
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
similarity index 83%
rename from subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
rename to subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index c7b9923..a9a72ce 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -1,7 +1,9 @@
-package org.apache.s4.comm;
+package org.apache.s4.fixtures;
 
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.RemoteEmitterFactory;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Assignment;
@@ -16,12 +18,12 @@
 import com.google.inject.name.Names;
 
 /**
- * Default configuration module for the communication layer. Parameterizable through a configuration file.
- * 
+ * Mock module for the comm layer. Mocks basic comm layer functionality and uses defaults where required.
+ *
  */
-public class BareCommModule extends AbstractModule {
+public class MockCommModule extends AbstractModule {
 
-    public BareCommModule() {
+    public MockCommModule() {
         super();
     }
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
similarity index 66%
rename from subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
rename to subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index b131c3f..7c52c94 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -1,7 +1,8 @@
-package org.apache.s4.core;
+package org.apache.s4.fixtures;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
+import org.apache.s4.core.Receiver;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
 import org.mockito.Mockito;
@@ -11,16 +12,15 @@
 import com.google.inject.AbstractModule;
 
 /**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
+ * Core module mocking basic platform functionality.
  * 
  */
-public class BareCoreModule extends AbstractModule {
+public class MockCoreModule extends AbstractModule {
 
     @SuppressWarnings("unused")
-    private static Logger logger = LoggerFactory.getLogger(BareCoreModule.class);
+    private static Logger logger = LoggerFactory.getLogger(MockCoreModule.class);
 
-    public BareCoreModule() {
+    public MockCoreModule() {
     }
 
     @Override