blob: 272099f025b46ba53f6fcbad3a4c50ee15757196 [file] [log] [blame]
package backtype.storm.testing;
import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
public class SpoutTracker implements IRichSpout {
IRichSpout _delegate;
SpoutTrackOutputCollector _tracker;
private static class SpoutTrackOutputCollector implements ISpoutOutputCollector {
public int transferred = 0;
public int emitted = 0;
public SpoutOutputCollector _collector;
public SpoutTrackOutputCollector(SpoutOutputCollector collector) {
_collector = collector;
}
public List<Integer> emit(int streamId, List<Object> tuple, Object messageId) {
List<Integer> ret = _collector.emit(streamId, tuple, messageId);
emitted++;
transferred += ret.size();
if(messageId!=null) transferred++;
//for the extra ack on branching or finishing
if(ret.size()!=1) {
transferred += 1;
}
return ret;
}
public void emitDirect(int taskId, int streamId, List<Object> tuple, Object messageId) {
_collector.emitDirect(taskId, streamId, tuple, messageId);
emitted++;
transferred++;
if(messageId!=null) transferred++;
}
}
public SpoutTracker(IRichSpout delegate) {
_delegate = delegate;
}
public boolean isDistributed() {
return _delegate.isDistributed();
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_tracker = new SpoutTrackOutputCollector(collector);
_delegate.open(conf, context, new SpoutOutputCollector(_tracker));
}
public void close() {
_delegate.close();
}
public void nextTuple() {
int curr = _tracker.transferred;
int currEmitted = _tracker.emitted;
_delegate.nextTuple();
int transferred = _tracker.transferred - curr;
if(transferred>0) {
_tracker._collector.emit(TrackerAggregator.TRACK_STREAM, new Values(0, transferred, _tracker.emitted - currEmitted));
}
}
public void ack(Object msgId) {
_delegate.ack(msgId);
_tracker._collector.emit(TrackerAggregator.TRACK_STREAM, new Values(1, 0, 0));
}
public void fail(Object msgId) {
_delegate.fail(msgId);
//TODO: this is not strictly correct, since the message could have just timed out
_tracker._collector.emit(TrackerAggregator.TRACK_STREAM, new Values(1, 0, 0));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_delegate.declareOutputFields(declarer);
declarer.declareStream(TrackerAggregator.TRACK_STREAM, new Fields("processed", "transferred", "spoutEmitted"));
}
}