blob: 87740bf2c330337be4d59720cb89d90e084b7a41 [file] [log] [blame]
package backtype.storm.task;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.Map;
//On tuple emit:
//
//we have: a Map<Long, Long> (root -> id) for each anchor
//
//create a new map from root to id
//
//for each anchor, add a new unique id for its output (I)
//update root -> id with the xor of I for each of the anchor's roots
//
//this map is the message id map for the new tuple
/**
 * {@link OutputCollector} implementation that anchors each emitted tuple to its
 * input tuples and records, per input tuple, the ids generated for the outputs
 * anchored to it. Those ids are handed to the wrapped
 * {@link IInternalOutputCollector} when the input tuple is acked.
 */
public class OutputCollectorImpl extends OutputCollector {
    private final TopologyContext _context;
    private final IInternalOutputCollector _collector;
    // Ids generated for outputs anchored to each input tuple that has not yet
    // been acked or failed. Declared as ConcurrentHashMap so the atomic
    // putIfAbsent operation is available.
    private final ConcurrentHashMap<Tuple, List<Long>> _pendingAcks = new ConcurrentHashMap<Tuple, List<Long>>();

    /**
     * @param context   topology context used to build outgoing tuples
     * @param collector collector that all emits, acks, fails and errors are delegated to
     */
    public OutputCollectorImpl(TopologyContext context, IInternalOutputCollector collector) {
        _context = context;
        _collector = collector;
    }

    /**
     * Builds a tuple anchored to {@code anchors} and emits it through the wrapped collector.
     *
     * @param streamId stream the tuple is emitted on
     * @param anchors  input tuples the new tuple is anchored to; may be {@code null}
     * @param tuple    values of the new tuple
     * @return whatever the wrapped collector's {@code emit} returns
     */
    public List<Integer> emit(int streamId, List<Tuple> anchors, List<Object> tuple) {
        return _collector.emit(anchorTuple(anchors, streamId, tuple));
    }

    /**
     * Builds a tuple anchored to {@code anchors} and emits it directly to {@code taskId}
     * through the wrapped collector.
     *
     * @param taskId   task the tuple is sent to
     * @param streamId stream the tuple is emitted on
     * @param anchors  input tuples the new tuple is anchored to; may be {@code null}
     * @param tuple    values of the new tuple
     */
    public void emitDirect(int taskId, int streamId, List<Tuple> anchors, List<Object> tuple) {
        _collector.emitDirect(taskId, anchorTuple(anchors, streamId, tuple));
    }

    /**
     * Creates the outgoing tuple and its message id map.
     *
     * For every anchor a fresh id is generated and recorded against that anchor;
     * the id is then xor-ed into the entry of each of the anchor's roots.
     * With {@code null} anchors the resulting map is empty.
     */
    private Tuple anchorTuple(List<Tuple> anchors, int streamId, List<Object> tuple) {
        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
        if(anchors != null) {
            for(Tuple anchor: anchors) {
                long newId = MessageId.generateId();
                getExistingOutput(anchor).add(newId);
                for(long root: anchor.getMessageId().getAnchorsToIds().keySet()) {
                    Long curr = anchorsToIds.get(root);
                    if(curr == null) curr = 0L;
                    anchorsToIds.put(root, curr ^ newId);
                }
            }
        }
        return new Tuple(_context, tuple, _context.getThisTaskId(), streamId, MessageId.makeId(anchorsToIds));
    }

    /**
     * Acks {@code input}, passing along the ids of all outputs that were anchored to it
     * (an empty list when there were none), and forgets the pending entry.
     */
    public void ack(Tuple input) {
        // remove() fetches and clears the entry in one atomic step; it returns
        // null when no output was ever anchored to this input.
        List<Long> generated = _pendingAcks.remove(input);
        if(generated == null) {
            generated = new ArrayList<Long>();
        }
        _collector.ack(input, generated);
    }

    /**
     * Fails {@code input} and discards any ids recorded for outputs anchored to it.
     */
    public void fail(Tuple input) {
        _pendingAcks.remove(input);
        _collector.fail(input);
    }

    /**
     * Forwards {@code error} to the wrapped collector.
     */
    public void reportError(Throwable error) {
        _collector.reportError(error);
    }

    /**
     * Returns the list of generated ids recorded for {@code anchor}, registering a new
     * empty list when none exists yet. Never returns {@code null}.
     */
    private List<Long> getExistingOutput(Tuple anchor) {
        List<Long> existing = _pendingAcks.get(anchor);
        if(existing != null) {
            return existing;
        }
        // putIfAbsent makes the "create if missing" step atomic: if another
        // thread registered a list first, use theirs instead of ours.
        List<Long> created = new ArrayList<Long>();
        List<Long> raced = _pendingAcks.putIfAbsent(anchor, created);
        return raced != null ? raced : created;
    }
}