blob: 6603abf83c469583aa2877b0f8531463a1d6d428 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.windowing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import static backtype.storm.topology.base.BaseWindowedBolt.Count;
import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
/**
* Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
* on expiry of events or activation of the window due to {@link TriggerPolicy}.
*
* @param <T> the type of event in the window.
*/
public class WindowManager<T> implements TriggerHandler {
private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class);
/**
* Expire old events every EXPIRE_EVENTS_THRESHOLD to
* keep the window size in check.
*/
public static final int EXPIRE_EVENTS_THRESHOLD = 100;
private final WindowLifecycleListener<T> windowLifecycleListener;
private final ConcurrentLinkedQueue<Event<T>> window;
private final List<T> expiredEvents;
private final Set<Event<T>> prevWindowEvents;
private final AtomicInteger eventsSinceLastExpiry;
private final ReentrantLock lock;
private EvictionPolicy<T> evictionPolicy;
private TriggerPolicy<T> triggerPolicy;
public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
windowLifecycleListener = lifecycleListener;
window = new ConcurrentLinkedQueue<>();
expiredEvents = new ArrayList<>();
prevWindowEvents = new HashSet<>();
eventsSinceLastExpiry = new AtomicInteger();
lock = new ReentrantLock(true);
}
public void setWindowLength(Count count) {
this.evictionPolicy = new CountEvictionPolicy<>(count.value);
}
public void setWindowLength(Duration duration) {
this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
}
public void setSlidingInterval(Count count) {
this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
}
public void setSlidingInterval(Duration duration) {
this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
}
/**
* Add an event into the window, with {@link System#currentTimeMillis()} as
* the tracking ts.
*
* @param event the event to add
*/
public void add(T event) {
add(event, System.currentTimeMillis());
}
/**
* Add an event into the window, with the given ts as the tracking ts.
*
* @param event the event to track
* @param ts the timestamp
*/
public void add(T event, long ts) {
Event<T> windowEvent = new EventImpl<T>(event, ts);
window.add(windowEvent);
track(windowEvent);
compactWindow();
}
/**
* The callback invoked by the trigger policy.
*/
@Override
public void onTrigger() {
List<Event<T>> windowEvents = null;
List<T> expired = null;
try {
lock.lock();
/*
* scan the entire window to handle out of order events in
* the case of time based windows.
*/
windowEvents = expireEvents(true);
expired = new ArrayList<>(expiredEvents);
expiredEvents.clear();
} finally {
lock.unlock();
}
List<T> events = new ArrayList<>();
List<T> newEvents = new ArrayList<>();
for (Event<T> event : windowEvents) {
events.add(event.get());
if (!prevWindowEvents.contains(event)) {
newEvents.add(event.get());
}
}
prevWindowEvents.clear();
prevWindowEvents.addAll(windowEvents);
LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", windowEvents.size());
windowLifecycleListener.onActivation(events, newEvents, expired);
triggerPolicy.reset();
}
public void shutdown() {
LOG.debug("Shutting down WindowManager");
if (triggerPolicy != null) {
triggerPolicy.shutdown();
}
}
/**
* expires events that fall out of the window every
* EXPIRE_EVENTS_THRESHOLD so that the window does not grow
* too big.
*/
private void compactWindow() {
if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
expireEvents(false);
}
}
/**
* feed the event to the eviction and trigger policies
* for bookkeeping and optionally firing the trigger.
*/
private void track(Event<T> windowEvent) {
evictionPolicy.track(windowEvent);
triggerPolicy.track(windowEvent);
}
/**
* Expire events from the window, using the expiration policy to check
* if the event should be evicted or not.
*
* @param fullScan if set, will scan the entire window; if not set, will stop
* as soon as an event not satisfying the expiration policy is found
* @return the list of remaining events in the window after expiry
*/
private List<Event<T>> expireEvents(boolean fullScan) {
LOG.debug("Expire events, eviction policy {}", evictionPolicy);
List<T> eventsToExpire = new ArrayList<>();
List<Event<T>> remaining = new ArrayList<>();
try {
lock.lock();
Iterator<Event<T>> it = window.iterator();
while (it.hasNext()) {
Event<T> windowEvent = it.next();
if (evictionPolicy.evict(windowEvent)) {
eventsToExpire.add(windowEvent.get());
it.remove();
} else if (!fullScan) {
break;
} else {
remaining.add(windowEvent);
}
}
expiredEvents.addAll(eventsToExpire);
} finally {
lock.unlock();
}
eventsSinceLastExpiry.set(0);
LOG.debug("[{}] events expired from window.", eventsToExpire.size());
windowLifecycleListener.onExpiry(eventsToExpire);
return remaining;
}
@Override
public String toString() {
return "WindowManager{" +
"evictionPolicy=" + evictionPolicy +
", triggerPolicy=" + triggerPolicy +
'}';
}
}