| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package backtype.storm.task; |
| |
| import backtype.storm.Config; |
| import backtype.storm.generated.ShellComponent; |
| import backtype.storm.tuple.Tuple; |
| import backtype.storm.utils.ShellProcess; |
| import backtype.storm.multilang.BoltMsg; |
| import backtype.storm.multilang.ShellMsg; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import java.util.Map; |
| import java.util.Random; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A bolt that shells out to another process to process tuples. ShellBolt |
| * communicates with that process over stdio using a special protocol. An ~100 |
| * line library is required to implement that protocol, and adapter libraries |
| * currently exist for Ruby and Python. |
| * |
| * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be |
| * in the resources directory within the jar submitted to the master. |
| * During development/testing on a local machine, that resources directory just |
| * needs to be on the classpath.</p> |
| * |
| * <p>When creating topologies using the Java API, subclass this bolt and implement |
| * the IRichBolt interface to create components for the topology that use other languages. For example: |
| * </p> |
| * |
| * <pre> |
| * public class MyBolt extends ShellBolt implements IRichBolt { |
| * public MyBolt() { |
| * super("python", "mybolt.py"); |
| * } |
| * |
| * public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| * declarer.declare(new Fields("field1", "field2")); |
| * } |
| * } |
| * </pre> |
| */ |
| public class ShellBolt implements IBolt { |
| public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class); |
| Process _subprocess; |
| OutputCollector _collector; |
| Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>(); |
| |
| private String[] _command; |
| private ShellProcess _process; |
| private volatile boolean _running = true; |
| private volatile Throwable _exception; |
| private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue(); |
| private Random _rand; |
| |
| private Thread _readerThread; |
| private Thread _writerThread; |
| |
| public ShellBolt(ShellComponent component) { |
| this(component.get_execution_command(), component.get_script()); |
| } |
| |
| public ShellBolt(String... command) { |
| _command = command; |
| } |
| |
| public void prepare(Map stormConf, TopologyContext context, |
| final OutputCollector collector) { |
| Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); |
| if (maxPending != null) { |
| this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue()); |
| } |
| _rand = new Random(); |
| _collector = collector; |
| _process = new ShellProcess(_command); |
| |
| //subprocesses must send their pid first thing |
| Number subpid = _process.launch(stormConf, context); |
| LOG.info("Launched subprocess with pid " + subpid); |
| |
| // reader |
| _readerThread = new Thread(new Runnable() { |
| public void run() { |
| while (_running) { |
| try { |
| ShellMsg shellMsg = _process.readShellMsg(); |
| |
| String command = shellMsg.getCommand(); |
| if(command.equals("ack")) { |
| handleAck(shellMsg.getId()); |
| } else if (command.equals("fail")) { |
| handleFail(shellMsg.getId()); |
| } else if (command.equals("error")) { |
| handleError(shellMsg.getMsg()); |
| } else if (command.equals("log")) { |
| String msg = shellMsg.getMsg(); |
| LOG.info("Shell msg: " + msg); |
| } else if (command.equals("emit")) { |
| handleEmit(shellMsg); |
| } |
| } catch (InterruptedException e) { |
| } catch (Throwable t) { |
| die(t); |
| } |
| } |
| } |
| }); |
| |
| _readerThread.start(); |
| |
| _writerThread = new Thread(new Runnable() { |
| public void run() { |
| while (_running) { |
| try { |
| Object write = _pendingWrites.poll(1, SECONDS); |
| if (write instanceof BoltMsg) { |
| _process.writeBoltMsg((BoltMsg)write); |
| } else if (write instanceof List<?>) { |
| _process.writeTaskIds((List<Integer>)write); |
| } else if (write != null) { |
| throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); |
| } |
| } catch (InterruptedException e) { |
| } catch (Throwable t) { |
| die(t); |
| } |
| } |
| } |
| }); |
| |
| _writerThread.start(); |
| } |
| |
| public void execute(Tuple input) { |
| if (_exception != null) { |
| throw new RuntimeException(_exception); |
| } |
| |
| //just need an id |
| String genId = Long.toString(_rand.nextLong()); |
| _inputs.put(genId, input); |
| try { |
| BoltMsg boltMsg = new BoltMsg(); |
| boltMsg.setId(genId); |
| boltMsg.setComp(input.getSourceComponent()); |
| boltMsg.setStream(input.getSourceStreamId()); |
| boltMsg.setTask(input.getSourceTask()); |
| boltMsg.setTuple(input.getValues()); |
| |
| _pendingWrites.put(boltMsg); |
| } catch(InterruptedException e) { |
| throw new RuntimeException("Error during multilang processing", e); |
| } |
| } |
| |
| public void cleanup() { |
| _running = false; |
| _process.destroy(); |
| _inputs.clear(); |
| } |
| |
| private void handleAck(Object id) { |
| Tuple acked = _inputs.remove(id); |
| if(acked==null) { |
| throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id); |
| } |
| _collector.ack(acked); |
| } |
| |
| private void handleFail(Object id) { |
| Tuple failed = _inputs.remove(id); |
| if(failed==null) { |
| throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id); |
| } |
| _collector.fail(failed); |
| } |
| |
| private void handleError(String msg) { |
| _collector.reportError(new Exception("Shell Process Exception: " + msg)); |
| } |
| |
| private void handleEmit(ShellMsg shellMsg) throws InterruptedException { |
| List<Tuple> anchors = new ArrayList<Tuple>(); |
| List<String> recvAnchors = shellMsg.getAnchors(); |
| if (recvAnchors != null) { |
| for (String anchor : recvAnchors) { |
| Tuple t = _inputs.get(anchor); |
| if (t == null) { |
| throw new RuntimeException("Anchored onto " + anchor + " after ack/fail"); |
| } |
| anchors.add(t); |
| } |
| } |
| |
| if(shellMsg.getTask() == 0) { |
| List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple()); |
| if (shellMsg.areTaskIdsNeeded()) { |
| _pendingWrites.put(outtasks); |
| } |
| } else { |
| _collector.emitDirect((int) shellMsg.getTask(), |
| shellMsg.getStream(), anchors, shellMsg.getTuple()); |
| } |
| } |
| |
| private void die(Throwable exception) { |
| _exception = exception; |
| } |
| } |