blob: 8e0bf3ca96d5599e3d3f9e7f90dfe542d7c097ae [file] [log] [blame]
package backtype.storm.spout;
import backtype.storm.generated.ShellComponent;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.List;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;
public class ShellSpout implements ISpout {
public static Logger LOG = Logger.getLogger(ShellSpout.class);
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
public ShellSpout(String... command) {
_command = command;
}
public void open(Map stormConf, TopologyContext context,
SpoutOutputCollector collector) {
_process = new ShellProcess(_command);
_collector = collector;
try {
Number subpid = _process.launch(stormConf, context);
LOG.info("Launched subprocess with pid " + subpid);
} catch (IOException e) {
throw new RuntimeException("Error when launching multilang subprocess", e);
}
}
public void close() {
_process.destroy();
}
private JSONObject _next;
public void nextTuple() {
if (_next == null) {
_next = new JSONObject();
_next.put("command", "next");
}
querySubprocess(_next);
}
private JSONObject _ack;
public void ack(Object msgId) {
if (_ack == null) {
_ack = new JSONObject();
_ack.put("command", "ack");
}
_ack.put("id", msgId);
querySubprocess(_ack);
}
private JSONObject _fail;
public void fail(Object msgId) {
if (_fail == null) {
_fail = new JSONObject();
_fail.put("command", "fail");
}
_fail.put("id", msgId);
querySubprocess(_fail);
}
private void querySubprocess(Object query) {
try {
_process.writeMessage(query);
while (true) {
JSONObject action = _process.readMessage();
String command = (String) action.get("command");
if (command.equals("sync")) {
return;
} else if (command.equals("log")) {
String msg = (String) action.get("msg");
LOG.info("Shell msg: " + msg);
} else if (command.equals("emit")) {
String stream = (String) action.get("stream");
if (stream == null) stream = Utils.DEFAULT_STREAM_ID;
Long task = (Long) action.get("task");
List<Object> tuple = (List) action.get("tuple");
Object messageId = (Object) action.get("id");
if (task == null) {
List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
_process.writeMessage(outtasks);
} else {
_collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
}