blob: 6320a213130af6f3dd3ae3449db79d72028d809e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.trident.planner;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jgrapht.DirectedGraph;
import org.jgrapht.graph.DirectedSubgraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import storm.trident.planner.processor.TridentContext;
import storm.trident.state.State;
import storm.trident.topology.BatchInfo;
import storm.trident.topology.ITridentBatchBolt;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTuple.Factory;
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
import storm.trident.tuple.TridentTupleView.RootFactory;
import storm.trident.util.TridentUtils;
// TODO: parameterizing it like this with everything might be a high deserialization cost if there's lots of tasks?
// TODO: memory problems?
// TODO: can avoid these problems by adding a boltfactory abstraction, so that boltfactory is deserialized once
// bolt factory -> returns coordinatedbolt per task, but deserializes the batch bolt one time and caches
public class SubtopologyBolt implements ITridentBatchBolt {
DirectedGraph _graph;
Set<Node> _nodes;
Map<String, InitialReceiver> _roots = new HashMap<>();
Map<Node, Factory> _outputFactories = new HashMap<>();
Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<>();
Map<Node, String> _batchGroups;
//given processornodes and static state nodes
public SubtopologyBolt(DirectedGraph graph, Set<Node> nodes, Map<Node, String> batchGroups) {
_nodes = nodes;
_graph = graph;
_batchGroups = batchGroups;
}
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector batchCollector) {
int thisComponentNumTasks = context.getComponentTasks(context.getThisComponentId()).size();
for(Node n: _nodes) {
if(n.stateInfo!=null) {
State s = n.stateInfo.spec.stateFactory.makeState(conf, context, context.getThisTaskIndex(), thisComponentNumTasks);
context.setTaskData(n.stateInfo.id, s);
}
}
DirectedSubgraph<Node, Object> subgraph = new DirectedSubgraph(_graph, _nodes, null);
TopologicalOrderIterator it = new TopologicalOrderIterator<>(subgraph);
int stateIndex = 0;
while(it.hasNext()) {
Node n = (Node) it.next();
if(n instanceof ProcessorNode) {
ProcessorNode pn = (ProcessorNode) n;
String batchGroup = _batchGroups.get(n);
if(!_myTopologicallyOrdered.containsKey(batchGroup)) {
_myTopologicallyOrdered.put(batchGroup, new ArrayList());
}
_myTopologicallyOrdered.get(batchGroup).add(pn.processor);
List<String> parentStreams = new ArrayList<>();
List<Factory> parentFactories = new ArrayList<>();
for(Node p: TridentUtils.getParents(_graph, n)) {
parentStreams.add(p.streamId);
if(_nodes.contains(p)) {
parentFactories.add(_outputFactories.get(p));
} else {
if(!_roots.containsKey(p.streamId)) {
_roots.put(p.streamId, new InitialReceiver(p.streamId, getSourceOutputFields(context, p.streamId)));
}
_roots.get(p.streamId).addReceiver(pn.processor);
parentFactories.add(_roots.get(p.streamId).getOutputFactory());
}
}
List<TupleReceiver> targets = new ArrayList<>();
boolean outgoingNode = false;
for(Node cn: TridentUtils.getChildren(_graph, n)) {
if(_nodes.contains(cn)) {
targets.add(((ProcessorNode) cn).processor);
} else {
outgoingNode = true;
}
}
if(outgoingNode) {
targets.add(new BridgeReceiver(batchCollector));
}
TridentContext triContext = new TridentContext(
pn.selfOutFields,
parentFactories,
parentStreams,
targets,
pn.streamId,
stateIndex,
batchCollector
);
pn.processor.prepare(conf, context, triContext);
_outputFactories.put(n, pn.processor.getOutputFactory());
}
stateIndex++;
}
// TODO: get prepared one time into executor data... need to avoid the ser/deser
// for each task (probably need storm to support boltfactory)
}
private Fields getSourceOutputFields(TopologyContext context, String sourceStream) {
for(GlobalStreamId g: context.getThisSources().keySet()) {
if(g.get_streamId().equals(sourceStream)) {
return context.getComponentOutputFields(g);
}
}
throw new RuntimeException("Could not find fields for source stream " + sourceStream);
}
@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
String sourceStream = tuple.getSourceStreamId();
InitialReceiver ir = _roots.get(sourceStream);
if(ir==null) {
throw new RuntimeException("Received unexpected tuple " + tuple.toString());
}
ir.receive((ProcessorContext) batchInfo.state, tuple);
}
@Override
public void finishBatch(BatchInfo batchInfo) {
for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
p.finishBatch((ProcessorContext) batchInfo.state);
}
}
@Override
public Object initBatchState(String batchGroup, Object batchId) {
ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
p.startBatch(ret);
}
return ret;
}
@Override
public void cleanup() {
for(String bg: _myTopologicallyOrdered.keySet()) {
for(TridentProcessor p: _myTopologicallyOrdered.get(bg)) {
p.cleanup();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for(Node n: _nodes) {
declarer.declareStream(n.streamId, TridentUtils.fieldsConcat(new Fields("$batchId"), n.allOutputFields));
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
protected static class InitialReceiver {
List<TridentProcessor> _receivers = new ArrayList<>();
RootFactory _factory;
ProjectionFactory _project;
String _stream;
public InitialReceiver(String stream, Fields allFields) {
// TODO: don't want to project for non-batch bolts...???
// how to distinguish "batch" streams from non-batch streams?
_stream = stream;
_factory = new RootFactory(allFields);
List<String> projected = new ArrayList<>(allFields.toList());
projected.remove(0);
_project = new ProjectionFactory(_factory, new Fields(projected));
}
public void receive(ProcessorContext context, Tuple tuple) {
TridentTuple t = _project.create(_factory.create(tuple));
for(TridentProcessor r: _receivers) {
r.execute(context, _stream, t);
}
}
public void addReceiver(TridentProcessor p) {
_receivers.add(p);
}
public Factory getOutputFactory() {
return _project;
}
}
}