package backtype.storm.testing;

import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
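
/**
 * IRichBolt wrapper that counts how many tuples a bolt transfers. After each
 * execute() call it emits a (processed, transferred, spoutEmitted) tuple on
 * TrackerAggregator.TRACK_STREAM for the tracking machinery to aggregate.
 *
 * A minimal usage sketch (MyBolt and the component ids are hypothetical, and
 * the int-id TopologyBuilder API is assumed for this Storm version):
 *
 *   TopologyBuilder builder = new TopologyBuilder();
 *   builder.setBolt(2, new BoltTracker(new MyBolt()), 3)
 *          .shuffleGrouping(1);
 */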
public class BoltTracker implements IRichBolt {
    IBolt _delegate;                            // the wrapped bolt
    BoltTrackerOutputCollector _trackCollector; // counts tuples the delegate transfers

    private static class BoltTrackerOutputCollector extends OutputCollector {
        public OutputCollector _collector;
        // running count of tuples and ack/fail messages transferred through this collector
        public int transferred = 0;

        public BoltTrackerOutputCollector(OutputCollector collector) {
            _collector = collector;
        }

        public List<Integer> emit(int streamId, List<Tuple> anchors, List<Object> tuple) {
            List<Integer> ret = _collector.emit(streamId, anchors, tuple);
            transferred += ret.size();
            // account for the extra ack message sent per distinct anchor when
            // the tuple tree branches or finishes (i.e. not exactly one consumer)
            if(ret.size()!=1) {
                Set<Long> anchorIds = new HashSet<Long>();
                for(Tuple t: anchors) {
                    anchorIds.addAll(t.getMessageId().getAnchors());
                }
                transferred += anchorIds.size();
            }
            return ret;
        }

        public void emitDirect(int taskId, int streamId, List<Tuple> anchors, List<Object> tuple) {
            _collector.emitDirect(taskId, streamId, anchors, tuple);
            transferred++;
        }

        public void ack(Tuple input) {
            _collector.ack(input);
            // one ack message is transferred per anchor of the acked tuple
            transferred += input.getMessageId().getAnchors().size();
        }

        public void fail(Tuple input) {
            _collector.fail(input);
            // likewise, one fail message per anchor
            transferred += input.getMessageId().getAnchors().size();
        }

        public void reportError(Throwable error) {
            _collector.reportError(error);
        }
    }
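
    /*
     * Worked example of the counting above (numbers are illustrative):
     * emitting a tuple anchored to 2 distinct anchors to 3 consumer tasks
     * gives ret.size() == 3, so transferred += 3; since ret.size() != 1,
     * one extra ack per distinct anchor adds 2 more, for 5 total.
     */
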
    /*
     * Takes an IBolt rather than an IRichBolt so it can also wrap acker
     * bolts, which don't declare streams because their streams are implicit.
     */
    public BoltTracker(IBolt delegate) {
        _delegate = delegate;
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _trackCollector = new BoltTrackerOutputCollector(collector);
        _delegate.prepare(stormConf, context, _trackCollector);
    }

    public void execute(Tuple input) {
        int currTransferred = _trackCollector.transferred;
        _delegate.execute(input);
        // report (processed=1, transferred=delta, spoutEmitted=0) for this tuple;
        // emitted on the underlying collector so the report itself isn't counted
        _trackCollector._collector.emit(TrackerAggregator.TRACK_STREAM,
                new Values(1, _trackCollector.transferred - currTransferred, 0));
    }

    public void cleanup() {
        _delegate.cleanup();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        if(_delegate instanceof IRichBolt) {
            ((IRichBolt)_delegate).declareOutputFields(declarer);
        }
        declarer.declareStream(TrackerAggregator.TRACK_STREAM, new Fields("processed", "transferred", "spoutEmitted"));
    }
}