blob: 72dfd6688c033dfc2966179cd66c9aac8cdf80dd [file] [log] [blame]
package backtype.storm.task;
import backtype.storm.generated.ShellComponent;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
/**
* A bolt that shells out to another process to process tuples. ShellBolt
* communicates with that process over stdio using a special protocol. An ~100
* line library is required to implement that protocol, and adapter libraries
* currently exist for Ruby and Python.
*
* <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
* in the resources directory within the jar submitted to the master.
* During development/testing on a local machine, that resources directory just
* needs to be on the classpath.</p>
*
* <p>When creating topologies using the Java API, subclass this bolt and implement
* the IRichBolt interface to create components for the topology that use other languages. For example:
* </p>
*
* <pre>
* public class MyBolt extends ShellBolt implements IRichBolt {
* public MyBolt() {
* super("python", "mybolt.py");
* }
*
* public void declareOutputFields(OutputFieldsDeclarer declarer) {
* declarer.declare(new Fields("field1", "field2"));
* }
* }
* </pre>
*/
public class ShellBolt implements IBolt {
public static Logger LOG = Logger.getLogger(ShellBolt.class);
String _shellCommand;
String _codeResource;
Process _subprocess;
DataOutputStream _processin;
BufferedReader _processout;
OutputCollector _collector;
Map<Long, Tuple> _inputs = new HashMap<Long, Tuple>();
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
public ShellBolt(String shellCommand, String codeResource) {
_shellCommand = shellCommand;
_codeResource = codeResource;
}
private String initializeSubprocess(TopologyContext context) {
//can change this to launchSubprocess and have it return the pid (that the subprcess returns)
ProcessBuilder builder = new ProcessBuilder(_shellCommand, _codeResource);
builder.directory(new File(context.getCodeDir()));
try {
_subprocess = builder.start();
_processin = new DataOutputStream(_subprocess.getOutputStream());
_processout = new BufferedReader(new InputStreamReader(_subprocess.getInputStream()));
sendToSubprocess(context.getPIDDir());
//subprocesses must send their pid first thing
String subpid = _processout.readLine();
LOG.info("Launched subprocess with pid " + subpid);
return subpid;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
try {
initializeSubprocess(context);
_collector = collector;
sendToSubprocess(JSONValue.toJSONString(stormConf));
sendToSubprocess(context.toJSONString());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void execute(Tuple input) {
//just need an id
long genId = MessageId.generateId();
_inputs.put(genId, input);
try {
JSONObject obj = new JSONObject();
obj.put("id", genId);
obj.put("comp", input.getSourceComponent());
obj.put("stream", input.getSourceStreamId());
obj.put("task", input.getSourceTask());
obj.put("tuple", input.getValues());
sendToSubprocess(obj.toString());
while(true) {
String line = "";
while(true) {
String subline = _processout.readLine();
if(subline==null)
throw new RuntimeException("Pipe to subprocess seems to be broken!");
if(subline.equals("sync")) {
line = subline;
break;
}
if(subline.equals("end")) {
break;
}
if(line.length()!=0) {
line+="\n";
}
line+=subline;
}
if(line.equals("sync")) {
break;
} else {
Map action = (Map) JSONValue.parse(line);
String command = (String) action.get("command");
if(command.equals("ack")) {
Long id = (Long) action.get("id");
Tuple acked = _inputs.remove(id);
if(acked==null) {
throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
}
_collector.ack(acked);
} else if (command.equals("fail")) {
Long id = (Long) action.get("id");
Tuple failed = _inputs.remove(id);
if(failed==null) {
throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
}
_collector.fail(failed);
} else if (command.equals("log")) {
String msg = (String) action.get("msg");
LOG.info("Shell msg: " + msg);
} else if(command.equals("emit")) {
Long stream = (Long) action.get("stream");
if(stream==null) stream = (long)Utils.DEFAULT_STREAM_ID;
Long task = (Long) action.get("task");
List<Object> tuple = (List) action.get("tuple");
List<Tuple> anchors = new ArrayList<Tuple>();
Object anchorObj = action.get("anchors");
if(anchorObj!=null) {
if(anchorObj instanceof Long) {
anchorObj = Arrays.asList(anchorObj);
}
for(Object o: (List) anchorObj) {
anchors.add(_inputs.get((Long) o));
}
}
if(task==null) {
List<Integer> outtasks = _collector.emit((int)stream.longValue(), anchors, tuple);
sendToSubprocess(JSONValue.toJSONString(outtasks));
} else {
_collector.emitDirect((int)task.longValue(), (int)stream.longValue(), anchors, tuple);
}
}
}
}
} catch(IOException e) {
throw new RuntimeException(e);
}
}
public void cleanup() {
_subprocess.destroy();
_inputs.clear();
_processin = null;
_processout = null;
_collector = null;
}
private void sendToSubprocess(String str) throws IOException {
_processin.writeBytes(str + "\n");
_processin.writeBytes("end\n");
_processin.flush();
}
}