blob: 1222a50ac6a87c4bb859ab5793f013f477e1a6f1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.api.windowing;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.apache.heron.api.windowing.EvictionPolicy.Action;
import static org.apache.heron.api.windowing.EvictionPolicy.Action.EXPIRE;
import static org.apache.heron.api.windowing.EvictionPolicy.Action.PROCESS;
import static org.apache.heron.api.windowing.EvictionPolicy.Action.STOP;
/**
* Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
* on expiry of events or activation of the window due to {@link TriggerPolicy}.
*
* @param <T> the type of event in the window.
*/
public class WindowManager<T extends Serializable> implements TriggerHandler {
private static final Logger LOG = Logger.getLogger(WindowManager.class.getName());
private static final String EVICTION_STATE_KEY = "es";
private static final String TRIGGER_STATE_KEY = "ts";
private static final String QUEUE = "queue";
private static final String EXPIRED_EVENTS = "expired.events";
private static final String PRE_WINDOW_EVENTS = "pre.window.events";
private static final String EVENTS_SINCE_LAST_EXPIRY = "events.since.last.expiry";
/**
* Expire old events every EXPIRE_EVENTS_THRESHOLD to
* keep the window size in check.
* <p>
* Note that if the eviction policy is based on watermarks, events will not be evicted until a new
* watermark would cause them to be considered expired anyway, regardless of this limit
*/
public static final int EXPIRE_EVENTS_THRESHOLD = 100;
protected final Collection<Event<T>> queue;
protected EvictionPolicy<T, ?> evictionPolicy;
protected TriggerPolicy<T, ?> triggerPolicy;
protected final WindowLifecycleListener<T> windowLifecycleListener;
private final List<T> expiredEvents;
private final Set<Event<T>> prevWindowEvents;
private final AtomicInteger eventsSinceLastExpiry;
/**
* Constructs a {@link WindowManager}
*
* @param lifecycleListener the {@link WindowLifecycleListener}
* @param queue a collection where the events in the window can be enqueued.
* <br>
* <b>Note:</b> This collection has to be thread safe.
*/
public WindowManager(WindowLifecycleListener<T> lifecycleListener, Collection<Event<T>> queue) {
windowLifecycleListener = lifecycleListener;
this.queue = queue;
expiredEvents = new ArrayList<>();
prevWindowEvents = new HashSet<>();
eventsSinceLastExpiry = new AtomicInteger();
}
/**
* For testing purpose
* See {@link org.apache.heron.api.windowing.WindowManagerTest}
* @param lifecycleListener
*/
public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
this(lifecycleListener, new ConcurrentLinkedQueue<>());
}
public void setEvictionPolicy(EvictionPolicy<T, ?> evictionPolicy) {
this.evictionPolicy = evictionPolicy;
}
public void setTriggerPolicy(TriggerPolicy<T, ?> triggerPolicy) {
this.triggerPolicy = triggerPolicy;
}
/**
* Add an event into the window, with {@link System#currentTimeMillis()} as
* the tracking ts.
*
* @param event the event to add
*/
public void add(T event) {
add(event, System.currentTimeMillis());
}
/**
* Add an event into the window, with the given ts as the tracking ts.
*
* @param event the event to track
* @param ts the timestamp
*/
public void add(T event, long ts) {
add(new EventImpl<T>(event, ts));
}
/**
* Tracks a window event
*
* @param windowEvent the window event to track
*/
public void add(Event<T> windowEvent) {
// watermark events are not added to the queue.
if (windowEvent.isWatermark()) {
LOG.fine(String.format("Got watermark event with ts %d", windowEvent.getTimestamp()));
} else {
queue.add(windowEvent);
}
track(windowEvent);
compactWindow();
}
/**
* The callback invoked by the trigger policy.
*/
@Override
public boolean onTrigger() {
List<Event<T>> windowEvents = null;
List<T> expired = null;
/*
* scan the entire window to handle out of order events in
* the case of time based windows.
*/
windowEvents = scanEvents(true);
expired = new ArrayList<>(expiredEvents);
expiredEvents.clear();
List<T> events = new ArrayList<>();
List<T> newEvents = new ArrayList<>();
for (Event<T> event : windowEvents) {
events.add(event.get());
if (!prevWindowEvents.contains(event)) {
newEvents.add(event.get());
}
}
prevWindowEvents.clear();
if (!events.isEmpty()) {
prevWindowEvents.addAll(windowEvents);
LOG.fine(String.format("invoking windowLifecycleListener onActivation, [%d] events in "
+ "window.", events.size()));
windowLifecycleListener.onActivation(events, newEvents, expired,
evictionPolicy.getContext().getReferenceTime());
} else {
LOG.fine("No events in the window, skipping onActivation");
}
triggerPolicy.reset();
return !events.isEmpty();
}
public void shutdown() {
LOG.fine("Shutting down WindowManager");
if (triggerPolicy != null) {
triggerPolicy.shutdown();
}
}
/**
* expires events that fall out of the window every
* EXPIRE_EVENTS_THRESHOLD so that the window does not grow
* too big.
*/
protected void compactWindow() {
if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
scanEvents(false);
}
}
/**
* feed the event to the eviction and trigger policies
* for bookkeeping and optionally firing the trigger.
*/
private void track(Event<T> windowEvent) {
evictionPolicy.track(windowEvent);
triggerPolicy.track(windowEvent);
}
/**
* Scan events in the queue, using the expiration policy to check
* if the event should be evicted or not.
*
* @param fullScan if set, will scan the entire queue; if not set, will stop
* as soon as an event not satisfying the expiration policy is found
* @return the list of events to be processed as a part of the current window
*/
private List<Event<T>> scanEvents(boolean fullScan) {
LOG.fine(String.format("Scan events, eviction policy %s", evictionPolicy));
List<T> eventsToExpire = new ArrayList<>();
List<Event<T>> eventsToProcess = new ArrayList<>();
Iterator<Event<T>> it = queue.iterator();
while (it.hasNext()) {
Event<T> windowEvent = it.next();
Action action = evictionPolicy.evict(windowEvent);
if (action == EXPIRE) {
eventsToExpire.add(windowEvent.get());
it.remove();
} else if (!fullScan || action == STOP) {
break;
} else if (action == PROCESS) {
eventsToProcess.add(windowEvent);
}
}
expiredEvents.addAll(eventsToExpire);
eventsSinceLastExpiry.set(0);
LOG.fine(String.format("[%d] events expired from window.", eventsToExpire.size()));
if (!eventsToExpire.isEmpty()) {
LOG.fine("invoking windowLifecycleListener.onExpiry");
windowLifecycleListener.onExpiry(eventsToExpire);
}
return eventsToProcess;
}
/**
* Scans the event queue and returns the next earliest event ts
* between the startTs and endTs
*
* @param startTs the start ts (exclusive)
* @param endTs the end ts (inclusive)
* @return the earliest event ts between startTs and endTs
*/
public long getEarliestEventTs(long startTs, long endTs) {
long minTs = Long.MAX_VALUE;
for (Event<T> event : queue) {
if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
minTs = Math.min(minTs, event.getTimestamp());
}
}
return minTs;
}
/**
* Scans the event queue and returns number of events having
* timestamp less than or equal to the reference time.
*
* @param referenceTime the reference timestamp in millis
* @return the count of events with timestamp less than or equal to referenceTime
*/
public int getEventCount(long referenceTime) {
int count = 0;
for (Event<T> event : queue) {
if (event.getTimestamp() <= referenceTime) {
++count;
}
}
return count;
}
/**
* Scans the event queue and returns the list of event ts
* falling between startTs (exclusive) and endTs (inclusive)
* at each sliding interval counts.
*
* @param startTs the start timestamp (exclusive)
* @param endTs the end timestamp (inclusive)
* @param slidingCount the sliding interval count
* @return the list of event ts
*/
public List<Long> getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
List<Long> timestamps = new ArrayList<>();
if (endTs > startTs) {
int count = 0;
long ts = Long.MIN_VALUE;
for (Event<T> event : queue) {
if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
ts = Math.max(ts, event.getTimestamp());
if (++count % slidingCount == 0) {
timestamps.add(ts);
}
}
}
}
return timestamps;
}
@Override
public String toString() {
return "WindowManager{" + "evictionPolicy=" + evictionPolicy + ", triggerPolicy="
+ triggerPolicy + '}';
}
/**
* Restore state associated with the window manager
* @param state
*/
@SuppressWarnings("unchecked")
public void restoreState(Map<String, Serializable> state) {
LOG.info("Restoring window manager state");
//restore eviction policy state
if (state.get(EVICTION_STATE_KEY) != null) {
((EvictionPolicy) evictionPolicy).restoreState(state.get(EVICTION_STATE_KEY));
}
// restore trigger policy state
if (state.get(TRIGGER_STATE_KEY) != null) {
((TriggerPolicy) triggerPolicy).restoreState(state.get(TRIGGER_STATE_KEY));
}
// restore all pending events to the queue
this.queue.addAll((Collection<Event<T>>) state.get(QUEUE));
// restore all expired events
this.expiredEvents.addAll((List<T>) state.get(EXPIRED_EVENTS));
// restore all prevWindowEvents
this.prevWindowEvents.addAll((Set<Event<T>>) state.get(PRE_WINDOW_EVENTS));
// restore the count of the number events since last expiry
this.eventsSinceLastExpiry.set((int) state.get(EVENTS_SINCE_LAST_EXPIRY));
}
/**
* Get the state of the window manager
* @return a Map representing the state of the window manager
*/
public Map<String, Serializable> getState() {
Map<String, Serializable> ret = new HashMap<>();
// get potential eviction policy state
if (evictionPolicy.getState() != null) {
ret.put(EVICTION_STATE_KEY, (Serializable) evictionPolicy.getState());
}
// get potential trigger policy state
if (triggerPolicy.getState() != null) {
ret.put(TRIGGER_STATE_KEY, (Serializable) triggerPolicy.getState());
}
ret.put(QUEUE, (Serializable) this.queue);
ret.put(EXPIRED_EVENTS, (Serializable) this.expiredEvents);
ret.put(PRE_WINDOW_EVENTS, (Serializable) this.prevWindowEvents);
ret.put(EVENTS_SINCE_LAST_EXPIRY, this.eventsSinceLastExpiry.get());
return ret;
}
}