// blob: 854aa8f53204e371263e4445462e946e7d098116 [file] [log] [blame]
package backtype.storm.task;
import backtype.storm.generated.ShellComponent;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellProcess;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.json.simple.JSONObject;
/**
 * A bolt that shells out to another process to process tuples. ShellBolt
 * communicates with that process over stdio using a special protocol. A ~100-line
 * library is required to implement that protocol, and adapter libraries
 * currently exist for Ruby and Python.
 *
 * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
 * in the resources directory within the jar submitted to the master.
 * During development/testing on a local machine, that resources directory just
 * needs to be on the classpath.</p>
 *
 * <p>When creating topologies using the Java API, subclass this bolt and implement
 * the IRichBolt interface to create components for the topology that use other languages. For example:
 * </p>
 *
 * <pre>
 * public class MyBolt extends ShellBolt implements IRichBolt {
 *     public MyBolt() {
 *         super("python", "mybolt.py");
 *     }
 *
 *     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 *         declarer.declare(new Fields("field1", "field2"));
 *     }
 * }
 * </pre>
 */
public class ShellBolt implements IBolt {
public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
Process _subprocess;
OutputCollector _collector;
Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
private String[] _command;
private ShellProcess _process;
private volatile boolean _running = true;
private volatile Throwable _exception;
private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
private Random _rand;
private Thread _readerThread;
private Thread _writerThread;
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
public ShellBolt(String... command) {
_command = command;
public void prepare(Map stormConf, TopologyContext context,
final OutputCollector collector) {
_rand = new Random();
_process = new ShellProcess(_command);
_collector = collector;
try {
//subprocesses must send their pid first thing
Number subpid = _process.launch(stormConf, context);"Launched subprocess with pid " + subpid);
} catch (IOException e) {
throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
// reader
_readerThread = new Thread(new Runnable() {
public void run() {
while (_running) {
try {
JSONObject action = _process.readMessage();
if (action == null) {
// ignore sync
String command = (String) action.get("command");
if(command.equals("ack")) {
} else if (command.equals("fail")) {
} else if (command.equals("error")) {
} else if (command.equals("log")) {
String msg = (String) action.get("msg");"Shell msg: " + msg);
} else if (command.equals("emit")) {
} catch (InterruptedException e) {
} catch (Throwable t) {
_writerThread = new Thread(new Runnable() {
public void run() {
while (_running) {
try {
Object write = _pendingWrites.poll(1, SECONDS);
if (write != null) {
// drain the error stream to avoid dead lock because of full error stream buffer
} catch (InterruptedException e) {
} catch (Throwable t) {
public void execute(Tuple input) {
if (_exception != null) {
throw new RuntimeException(_exception);
//just need an id
String genId = Long.toString(_rand.nextLong());
_inputs.put(genId, input);
try {
JSONObject obj = new JSONObject();
obj.put("id", genId);
obj.put("comp", input.getSourceComponent());
obj.put("stream", input.getSourceStreamId());
obj.put("task", input.getSourceTask());
obj.put("tuple", input.getValues());
} catch(InterruptedException e) {
throw new RuntimeException("Error during multilang processing", e);
public void cleanup() {
_running = false;
private void handleAck(Map action) {
String id = (String) action.get("id");
Tuple acked = _inputs.remove(id);
if(acked==null) {
throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
private void handleFail(Map action) {
String id = (String) action.get("id");
Tuple failed = _inputs.remove(id);
if(failed==null) {
throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
private void handleError(Map action) {
String msg = (String) action.get("msg");
_collector.reportError(new Exception("Shell Process Exception: " + msg));
private void handleEmit(Map action) throws InterruptedException {
String stream = (String) action.get("stream");
if(stream==null) stream = Utils.DEFAULT_STREAM_ID;
Long task = (Long) action.get("task");
List<Object> tuple = (List) action.get("tuple");
List<Tuple> anchors = new ArrayList<Tuple>();
Object anchorObj = action.get("anchors");
if(anchorObj!=null) {
if(anchorObj instanceof String) {
anchorObj = Arrays.asList(anchorObj);
for(Object o: (List) anchorObj) {
Tuple t = _inputs.get((String) o);
if (t == null) {
throw new RuntimeException("Anchored onto " + o + " after ack/fail");
if(task==null) {
List<Integer> outtasks = _collector.emit(stream, anchors, tuple);
Object need_task_ids = action.get("need_task_ids");
if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
} else {
_collector.emitDirect((int)task.longValue(), stream, anchors, tuple);
private void die(Throwable exception) {
_exception = exception;