blob: 53cd3788d3d971498ea2ebe6bcd479872d669711 [file] [log] [blame]
package backtype.storm.drpc;
import backtype.storm.Constants;
import backtype.storm.ILocalDRPC;
import backtype.storm.drpc.CoordinatedBolt.SourceArgs;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.topology.BasicBoltExecutor;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.InputDeclarer;
import backtype.storm.topology.OutputFieldsGetter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LinearDRPCTopologyBuilder {
String _function;
List<Component> _components = new ArrayList<Component>();
public LinearDRPCTopologyBuilder(String function) {
_function = function;
}
public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, int parallelism) {
Component component = new Component(bolt, parallelism);
_components.add(component);
return new InputDeclarerImpl(component);
}
public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
return addBolt(bolt, 1);
}
public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, int parallelism) {
return addBolt(new BasicBoltExecutor(bolt), parallelism);
}
public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
return addBolt(bolt, 1);
}
public StormTopology createLocalTopology(ILocalDRPC drpc) {
return createTopology(new DRPCSpout(_function, drpc));
}
public StormTopology createRemoteTopology() {
return createTopology(new DRPCSpout(_function));
}
private StormTopology createTopology(DRPCSpout spout) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, spout);
builder.setBolt(2, new PrepareRequest())
.noneGrouping(1);
int id = 3;
for(int i=0; i<_components.size();i++) {
Component component = _components.get(i);
SourceArgs source;
if(i==0) {
source = null;
} else if (i==1) {
source = SourceArgs.single();
} else {
source = SourceArgs.all();
}
boolean allOut = (i < _components.size()-2);
InputDeclarer declarer = builder.setBolt(
id,
new CoordinatedBolt(component.bolt, source, allOut),
component.parallelism);
if(i==0 && component.declarations.size()==0) {
declarer.noneGrouping(2, PrepareRequest.ARGS_STREAM);
} else {
for(InputDeclaration declaration: component.declarations) {
declaration.declare(id-1, declarer);
}
}
if(i>0) {
declarer.directGrouping(id-1, Constants.COORDINATED_STREAM_ID);
}
id++;
}
IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
OutputFieldsGetter getter = new OutputFieldsGetter();
lastBolt.declareOutputFields(getter);
Map<Integer, StreamInfo> streams = getter.getFieldsDeclaration();
if(streams.size()!=1) {
throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
}
int outputStream = streams.keySet().iterator().next();
List<String> fields = streams.get(outputStream).get_output_fields();
if(fields.size()!=2) {
throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
}
builder.setBolt(id, new JoinResult(2))
.fieldsGrouping(id-1, outputStream, new Fields(fields.get(0)))
.fieldsGrouping(2, PrepareRequest.RETURN_STREAM, new Fields("request"));
id++;
builder.setBolt(id, new ReturnResults())
.noneGrouping(id-1);
return builder.createTopology();
}
private static class Component {
public IRichBolt bolt;
public int parallelism;
public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
public Component(IRichBolt bolt, int parallelism) {
this.bolt = bolt;
this.parallelism = parallelism;
}
}
private static interface InputDeclaration {
public void declare(int prevComponent, InputDeclarer declarer);
}
private class InputDeclarerImpl implements LinearDRPCInputDeclarer {
Component _component;
public InputDeclarerImpl(Component component) {
_component = component;
}
@Override
public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.fieldsGrouping(prevComponent, fields);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer fieldsGrouping(final int streamId, final Fields fields) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.fieldsGrouping(prevComponent, streamId, fields);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer globalGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.globalGrouping(prevComponent);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer globalGrouping(final int streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.globalGrouping(prevComponent, streamId);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer shuffleGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.shuffleGrouping(prevComponent);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer shuffleGrouping(final int streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.shuffleGrouping(prevComponent, streamId);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer noneGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.noneGrouping(prevComponent);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer noneGrouping(final int streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.noneGrouping(prevComponent, streamId);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer allGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.allGrouping(prevComponent);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer allGrouping(final int streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.allGrouping(prevComponent, streamId);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer directGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.directGrouping(prevComponent);
}
});
return this;
}
@Override
public LinearDRPCInputDeclarer directGrouping(final int streamId) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(int prevComponent, InputDeclarer declarer) {
declarer.directGrouping(prevComponent, streamId);
}
});
return this;
}
private void addDeclaration(InputDeclaration declaration) {
_component.declarations.add(declaration);
}
}
// public static void main(String[] args) {
// LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
// builder.addBolt(new GetTweeters(), 10);
// builder.addBolt(new KeyedFairBolt(new GetFollowers()), 40)
// .shuffleGrouping();
// builder.addBolt(new PartialDistinct(), 20)
// .fieldsGrouping(new Fields("id", "follower"));
// builder.addBolt(new CountAggregator(), 3)
// .fieldsGrouping(new Fields("id"));
//
// Config conf = new Config();
//
//
// if(args.length==0) {
// LocalDRPC drpc = new LocalDRPC();
// LocalCluster cluster = new LocalCluster();
//
// conf.setDebug(true);
// conf.setMaxTaskParallelism(3);
//
// cluster.submitTopology("reach-demo", conf, builder.createLocalTopology(drpc));
//
// System.out.println(drpc.execute("reach", "http://nathanmarz.com"));
//
// cluster.shutdown();
// drpc.shutdown();
// } else {
// conf.setNumWorkers(20);
//
// StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
// }
// }
}