blob: fd4af90d540228f3665700c08cb636eb94a1bf1e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.topology.base;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IWindowedBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.windowing.TupleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public abstract class BaseWindowedBolt implements IWindowedBolt {
private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
private final transient Map<String, Object> windowConfiguration;
/**
* Holds a count value for count based windows and sliding intervals.
*/
public static class Count {
public final int value;
public Count(int value) {
this.value = value;
}
}
/**
* Holds a Time duration for time based windows and sliding intervals.
*/
public static class Duration {
public final int value;
public Duration(int value, TimeUnit timeUnit) {
this.value = (int) timeUnit.toMillis(value);
}
}
protected BaseWindowedBolt() {
windowConfiguration = new HashMap<>();
}
private BaseWindowedBolt withWindowLength(Count count) {
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
return this;
}
private BaseWindowedBolt withWindowLength(Duration duration) {
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
return this;
}
private BaseWindowedBolt withSlidingInterval(Count count) {
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
return this;
}
private BaseWindowedBolt withSlidingInterval(Duration duration) {
windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
return this;
}
/**
* Tuple count based sliding window configuration.
*
* @param windowLength the number of tuples in the window
* @param slidingInterval the number of tuples after which the window slides
*/
public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
}
/**
* Tuple count and time duration based sliding window configuration.
*
* @param windowLength the number of tuples in the window
* @param slidingInterval the time duration after which the window slides
*/
public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
}
/**
* Time duration and count based sliding window configuration.
*
* @param windowLength the time duration of the window
* @param slidingInterval the number of tuples after which the window slides
*/
public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
}
/**
* Time duration based sliding window configuration.
*
* @param windowLength the time duration of the window
* @param slidingInterval the time duration after which the window slides
*/
public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
}
/**
* A tuple count based window that slides with every incoming tuple.
*
* @param windowLength the number of tuples in the window
*/
public BaseWindowedBolt withWindow(Count windowLength) {
return withWindowLength(windowLength).withSlidingInterval(new Count(1));
}
/**
* A time duration based window that slides with every incoming tuple.
*
* @param windowLength the time duration of the window
*/
public BaseWindowedBolt withWindow(Duration windowLength) {
return withWindowLength(windowLength).withSlidingInterval(new Count(1));
}
/**
* A count based tumbling window.
*
* @param count the number of tuples after which the window tumbles
*/
public BaseWindowedBolt withTumblingWindow(Count count) {
return withWindowLength(count).withSlidingInterval(count);
}
/**
* A time duration based tumbling window.
*
* @param duration the time duration after which the window tumbles
*/
public BaseWindowedBolt withTumblingWindow(Duration duration) {
return withWindowLength(duration).withSlidingInterval(duration);
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// NOOP
}
@Override
public void cleanup() {
// NOOP
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// NOOP
}
@Override
public Map<String, Object> getComponentConfiguration() {
return windowConfiguration;
}
}