blob: c7d6f701685106859314b60b6428dc08d4fb9b6a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.topology;
import backtype.storm.Config;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.windowing.TupleWindowImpl;
import backtype.storm.windowing.WindowLifecycleListener;
import backtype.storm.windowing.WindowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static backtype.storm.topology.base.BaseWindowedBolt.Count;
import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
/**
* An {@link IWindowedBolt} wrapper that does the windowing of tuples.
*/
public class WindowedBoltExecutor implements IRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
private IWindowedBolt bolt;
private transient WindowedOutputCollector windowedOutputCollector;
private transient WindowLifecycleListener<Tuple> listener;
private transient WindowManager<Tuple> windowManager;
public WindowedBoltExecutor(IWindowedBolt bolt) {
this.bolt = bolt;
}
private int getTopologyTimeoutMillis(Map stormConf) {
if (stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) != null) {
boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if (!timeOutsEnabled) {
return Integer.MAX_VALUE;
}
}
int timeout = 0;
if (stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) != null) {
timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
}
return timeout * 1000;
}
private int getMaxSpoutPending(Map stormConf) {
int maxPending = Integer.MAX_VALUE;
if (stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING) != null) {
maxPending = ((Number) stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).intValue();
}
return maxPending;
}
private void ensureDurationLessThanTimeout(int duration, int timeout) {
if (duration > timeout) {
throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
" is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
" value " + timeout);
}
}
private void ensureCountLessThanMaxPending(int count, int maxPending) {
if (count > maxPending) {
throw new IllegalArgumentException("Window count (length + sliding interval) value " + count +
" is more than " + Config.TOPOLOGY_MAX_SPOUT_PENDING +
" value " + maxPending);
}
}
private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
Count slidingIntervalCount, Duration slidingIntervalDuration) {
int topologyTimeout = getTopologyTimeoutMillis(stormConf);
int maxSpoutPending = getMaxSpoutPending(stormConf);
if (windowLengthDuration != null && slidingIntervalDuration != null) {
ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
} else if (windowLengthDuration != null) {
ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
} else if (slidingIntervalDuration != null) {
ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout);
}
if (windowLengthCount != null && slidingIntervalCount != null) {
ensureCountLessThanMaxPending(windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending);
} else if (windowLengthCount != null) {
ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending);
} else if (slidingIntervalCount != null) {
ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending);
}
}
private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
Duration windowLengthDuration = null;
Count windowLengthCount = null;
Duration slidingIntervalDuration = null;
Count slidingIntervalCount = null;
if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
} else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
windowLengthDuration = new Duration(
((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
TimeUnit.MILLISECONDS);
}
if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
} else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
} else {
// default is a sliding window of count 1
slidingIntervalCount = new Count(1);
}
// validate
validate(stormConf, windowLengthCount, windowLengthDuration,
slidingIntervalCount, slidingIntervalDuration);
if (windowLengthCount != null) {
manager.setWindowLength(windowLengthCount);
} else {
manager.setWindowLength(windowLengthDuration);
}
if (slidingIntervalCount != null) {
manager.setSlidingInterval(slidingIntervalCount);
} else {
manager.setSlidingInterval(slidingIntervalDuration);
}
return manager;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.windowedOutputCollector = new WindowedOutputCollector(collector);
bolt.prepare(stormConf, context, windowedOutputCollector);
this.listener = newWindowLifecycleListener();
this.windowManager = initWindowManager(listener, stormConf);
LOG.debug("Initialized window manager {} ", this.windowManager);
}
@Override
public void execute(Tuple input) {
windowManager.add(input);
}
@Override
public void cleanup() {
windowManager.shutdown();
bolt.cleanup();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
bolt.declareOutputFields(declarer);
}
@Override
public Map<String, Object> getComponentConfiguration() {
return bolt.getComponentConfiguration();
}
private WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
return new WindowLifecycleListener<Tuple>() {
@Override
public void onExpiry(List<Tuple> tuples) {
for (Tuple tuple : tuples) {
windowedOutputCollector.ack(tuple);
}
}
@Override
public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
windowedOutputCollector.setContext(tuples);
bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples));
}
};
}
/**
* Creates an {@link OutputCollector} wrapper that automatically
* anchors the tuples to inputTuples while emitting.
*/
private static class WindowedOutputCollector extends OutputCollector {
private List<Tuple> inputTuples;
WindowedOutputCollector(IOutputCollector delegate) {
super(delegate);
}
void setContext(List<Tuple> inputTuples) {
this.inputTuples = inputTuples;
}
@Override
public List<Integer> emit(String streamId, List<Object> tuple) {
return emit(streamId, inputTuples, tuple);
}
@Override
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
emitDirect(taskId, streamId, inputTuples, tuple);
}
}
}