blob: f26ede686170179f944f5e5d47b59a5a560ced0a [file] [log] [blame]
package org.apache.s4.core.window;
import java.util.Collection;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections15.buffer.CircularFifoBuffer;
import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
* slot. Each slot represents a segment of time or a fixed number of events. Slots are consecutive in time or events.
*
* Users are expected to provide a factory for creating new slots, and a method to perform a global computation on the
* current window.
*
* Slots are automatically added.
*
* WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
* efficient for sparse event streams.
*
* @param <T>
* type of the slot implementation used for this window
*
* @param <U>
* type of the values added to the window slots
*/
public abstract class AbstractSlidingWindowPE<T extends Slot<U>, U, V> extends ProcessingElement {
private static final Logger logger = LoggerFactory.getLogger(AbstractSlidingWindowPE.class);
final private int numSlots;
private CircularFifoBuffer<T> circularBuffer;
final private ScheduledExecutorService windowingTimerService;
final private long slotDurationInMilliseconds;
private T openSlot;
private final SlotFactory<T> slotFactory;
private long slotCapacity = 0;
private int eventCount = 0;
/**
*
* Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
* concrete class.
*
* @param app
* the application
* @param numSlots
* the number of slots to be stored
*/
public AbstractSlidingWindowPE(App app, int numSlots, long slotCapacity, SlotFactory<T> slotFactory) {
this(app, 0L, null, numSlots, slotFactory, slotCapacity);
}
/**
* Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
*
* @param app
* the application
* @param slotDuration
* the slot duration in timeUnit
* @param timeUnit
* the unit of time
* @param numSlots
* the number of slots to be stored
*/
public AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
SlotFactory<T> slotFactory) {
this(app, slotDuration, timeUnit, numSlots, slotFactory, 0);
}
private AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
SlotFactory<T> slotFactory, long slotCapacity) {
super(app);
this.numSlots = numSlots;
this.slotFactory = slotFactory;
this.slotCapacity = slotCapacity;
if (slotDuration > 0l) {
slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SlidingWindow-" + getClass().getSimpleName()).build();
windowingTimerService = Executors.newSingleThreadScheduledExecutor(threadFactory);
} else {
slotDurationInMilliseconds = 0;
windowingTimerService = null;
}
}
@Override
protected void onRemove() {
// TODO Auto-generated method stub
}
/**
* For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
*/
public final void onTrigger(Event event) {
if (windowingTimerService == null) {
if (eventCount % slotCapacity == 0) {
addSlot();
}
}
}
@Override
protected void initPEPrototypeInternal() {
super.initPEPrototypeInternal();
windowingTimerService.scheduleAtFixedRate(new SlotTask(), slotDurationInMilliseconds,
slotDurationInMilliseconds, TimeUnit.MILLISECONDS);
logger.trace("TIMER: " + slotDurationInMilliseconds);
}
/**
* User provided function that evaluates the whole content of the window. It must iterate across all slots. Current
* slots are passed as a parameter and the PE instance is expected to be locked so that iteration over the slots is
* safe.
*
* @return
*/
abstract protected V evaluateWindow(Collection<T> slots);
/**
* Add an object to the sliding window. Use it when the window is not periodic.
*
* @param slot
*/
protected final void addSlot() {
if (windowingTimerService != null) {
logger.error("Calling method addSlot() in a periodic window is not allowed.");
return;
}
addNewSlot((AbstractSlidingWindowPE<T, U, V>) this);
}
protected void onCreate() {
eventCount = 0;
circularBuffer = new CircularFifoBuffer<T>(numSlots);
if (slotDurationInMilliseconds > 0) {
openSlot = slotFactory.createSlot();
circularBuffer.add(openSlot);
}
}
protected void updateOpenSlot(U data) {
openSlot.update(data);
}
/**
*
* @return the least recently inserted slot
*/
protected T getOldestSlot() {
return circularBuffer.get();
}
/** Stops the the sliding window. */
protected void stop() {
windowingTimerService.shutdownNow();
}
/**
*
* @return the collection of slots
*/
protected Collection<T> getSlots() {
return circularBuffer;
}
protected T getOpenSlot() {
return openSlot;
}
private class SlotTask extends TimerTask {
@Override
public void run() {
logger.trace("Starting slot task");
/* Iterate over all instances and put a new slot in the buffer. */
for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
logger.trace("pe id: " + entry.getValue().getId());
@SuppressWarnings("unchecked")
AbstractSlidingWindowPE<T, U, V> peInstance = (AbstractSlidingWindowPE<T, U, V>) entry.getValue();
if (peInstance.circularBuffer == null) {
peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
}
addNewSlot(peInstance);
}
}
}
private void addNewSlot(AbstractSlidingWindowPE<T, U, V> peInstance) {
synchronized (peInstance) {
peInstance.openSlot.close();
peInstance.openSlot = slotFactory.createSlot();
peInstance.circularBuffer.add(peInstance.openSlot);
}
}
}