// blob: b3920c0ca3ceac18a5be61a5c52ac762c24c7657 [file] [log] [blame]
package org.apache.storm.beam.translation;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.storm.beam.StormPipelineOptions;
import org.apache.storm.topology.IRichSpout;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Maintains the state necessary during Pipeline translation to build a Storm topology.
 *
 * <p>The context holds the pipeline options, the transform currently being translated, the
 * spouts and bolts registered so far (keyed by id), the list of {@link Stream} descriptors,
 * and a single pending "GBK" marker (see {@link #activateGBK(String)}).
 *
 * <p>This class performs no synchronization of its own.
 */
public class TranslationContext {

    private final StormPipelineOptions options;
    private final Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
    private final Map<String, Object> boltMap = new HashMap<String, Object>();
    private final List<Stream> streams = new ArrayList<Stream>();

    private TransformTreeNode currentTransform;

    // Value recorded by activateGBK(); null means no GBK is currently active.
    // NOTE(review): presumably the id of the component the GroupByKey feeds into -- confirm
    // against the translators that call activateGBK().
    private String gbkTo = null;

    /**
     * Creates a context with no spouts, bolts, or streams registered.
     *
     * @param options the pipeline options exposed through {@link #getOptions()}
     */
    public TranslationContext(StormPipelineOptions options) {
        this.options = options;
    }

    /**
     * @return the options this context was constructed with
     */
    public StormPipelineOptions getOptions() {
        return options;
    }

    /**
     * Registers a spout, replacing any spout previously registered under the same id.
     *
     * @param id    key under which the spout is stored
     * @param spout the spout to register
     */
    public void addSpout(String id, IRichSpout spout) {
        spoutMap.put(id, spout);
    }

    /**
     * Returns the registered spouts keyed by id.
     *
     * @return the internal map itself (not a copy); modifications made through it are
     *         visible to this context
     */
    public Map<String, IRichSpout> getSpouts() {
        return spoutMap;
    }

    /**
     * Registers a bolt, replacing any bolt previously registered under the same id.
     *
     * @param id   key under which the bolt is stored
     * @param bolt the bolt to register; stored as a plain {@code Object}
     */
    public void addBolt(String id, Object bolt) {
        boltMap.put(id, bolt);
    }

    /**
     * Looks up a bolt by id.
     *
     * @param id key the bolt was registered under
     * @return the registered bolt, or {@code null} if nothing is stored under {@code id}
     */
    public Object getBolt(String id) {
        return boltMap.get(id);
    }

    /**
     * Appends a stream descriptor to the list of streams.
     *
     * @param stream the stream to record
     */
    public void addStream(Stream stream) {
        streams.add(stream);
    }

    /**
     * Returns the recorded streams in insertion order.
     *
     * @return the internal list itself (not a copy); modifications made through it are
     *         visible to this context
     */
    public List<Stream> getStreams() {
        return streams;
    }

    /**
     * Sets the transform that {@link #getInput()} and {@link #getOutput()} read from.
     *
     * @param transform the transform node now being translated
     */
    public void setCurrentTransform(TransformTreeNode transform) {
        this.currentTransform = transform;
    }

    /**
     * @return the transform last passed to {@link #setCurrentTransform(TransformTreeNode)},
     *         or {@code null} if none has been set
     */
    public TransformTreeNode getCurrentTransform() {
        return currentTransform;
    }

    /**
     * Returns the input of the current transform, cast to the type the caller expects.
     *
     * @param <InputT> the input type chosen by the caller
     * @return the current transform's input
     * @throws NullPointerException if no current transform has been set
     */
    // The cast is unchecked: InputT is picked at the call site and erased at runtime, so a
    // wrong choice only fails later with a ClassCastException in the caller.
    @SuppressWarnings("unchecked")
    public <InputT extends PInput> InputT getInput() {
        return (InputT) getCurrentTransform().getInput();
    }

    /**
     * Returns the output of the current transform, cast to the type the caller expects.
     *
     * @param <OutputT> the output type chosen by the caller
     * @return the current transform's output
     * @throws NullPointerException if no current transform has been set
     */
    // Unchecked for the same reason as getInput(): OutputT is picked at the call site.
    @SuppressWarnings("unchecked")
    public <OutputT extends POutput> OutputT getOutput() {
        return (OutputT) getCurrentTransform().getOutput();
    }

    /**
     * Marks a GBK as active by recording the given value, overwriting any value recorded
     * earlier. Passing {@code null} leaves the context in the "not active" state.
     *
     * @param gbkTo the value later handed back by {@link #completeGBK()}
     */
    public void activateGBK(String gbkTo) {
        this.gbkTo = gbkTo;
    }

    /**
     * Ends the active GBK: returns the recorded value and resets the context to "not active".
     *
     * @return the value passed to the most recent {@link #activateGBK(String)}, or
     *         {@code null} if no GBK was active
     */
    public String completeGBK() {
        String pending = this.gbkTo;
        this.gbkTo = null;
        return pending;
    }

    /**
     * @return {@code true} if a non-null value has been recorded by
     *         {@link #activateGBK(String)} and not yet consumed by {@link #completeGBK()}
     */
    public boolean isGBKActive() {
        return gbkTo != null;
    }

    /**
     * Mutable descriptor of one stream: a source name, a destination name, and the
     * {@link Grouping} to apply between them.
     */
    public static class Stream {
        private String from;
        private String to;
        private Grouping grouping;

        /**
         * @param from     name of the stream's source
         * @param to       name of the stream's destination
         * @param grouping grouping applied to the stream
         */
        public Stream(String from, String to, Grouping grouping) {
            this.from = from;
            this.to = to;
            this.grouping = grouping;
        }

        /** @return name of the stream's destination */
        public String getTo() {
            return to;
        }

        /** @param to new destination name */
        public void setTo(String to) {
            this.to = to;
        }

        /** @return name of the stream's source */
        public String getFrom() {
            return from;
        }

        /** @param from new source name */
        public void setFrom(String from) {
            this.from = from;
        }

        /** @return grouping applied to the stream */
        public Grouping getGrouping() {
            return grouping;
        }

        /** @param grouping new grouping for the stream */
        public void setGrouping(Grouping grouping) {
            this.grouping = grouping;
        }
    }

    /**
     * Mutable description of a stream grouping: its {@link Type}, an optional stream id, and
     * optional arguments.
     */
    public static class Grouping {

        /**
         * Types of stream groupings Storm allows
         */
        public enum Type {
            ALL,
            CUSTOM,
            DIRECT,
            SHUFFLE,
            LOCAL_OR_SHUFFLE,
            FIELDS,
            GLOBAL,
            NONE
        }

        private Type type;
        private String streamId; // for named streams, other than DEFAULT
        private List<String> args; // arguments for fields grouping

        /**
         * Creates a grouping of the given type with no stream id and no arguments.
         *
         * @param type the grouping type
         */
        public Grouping(Type type) {
            this.type = type;
        }

        /**
         * Creates a {@link Type#FIELDS} grouping with the given arguments.
         *
         * @param args arguments for the fields grouping; the list is stored as-is, not copied
         */
        public Grouping(List<String> args) {
            this.type = Type.FIELDS;
            this.args = args;
        }

        /** @return the grouping arguments, or {@code null} if none were set */
        public List<String> getArgs() {
            return args;
        }

        /** @param args new grouping arguments; the list is stored as-is, not copied */
        public void setArgs(List<String> args) {
            this.args = args;
        }

        /** @return the grouping type */
        public Type getType() {
            return type;
        }

        /** @param type new grouping type */
        public void setType(Type type) {
            this.type = type;
        }

        /** @return the stream id, or {@code null} if none was set */
        public String getStreamId() {
            return streamId;
        }

        /** @param streamId new stream id */
        public void setStreamId(String streamId) {
            this.streamId = streamId;
        }
    }
}