Merge branch '0.9.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 0.9.x-branch
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7fb3313..afef702 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.9.7
+ * STORM-442: multilang ShellBolt/ShellSpout die() can be hang when Exception happened
+ * STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
+
 ## 0.9.6
  * STORM-1027: Use overflow buffer for emitting metrics
  * STORM-996: netty-unit-tests/test-batch demonstrates out-of-order delivery
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 96442b1..35fef08 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
   <parent>
       <artifactId>storm</artifactId>
       <groupId>org.apache.storm</groupId>
-      <version>0.9.7-SNAPSHOT</version>
+      <version>0.9.7</version>
       <relativePath>../../pom.xml</relativePath>
   </parent>
 
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index 6a68a17..3db31b2 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 5a97fc2..159b1ce 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 4f9bf4b..9fe8110 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
diff --git a/pom.xml b/pom.xml
index 9676f86..459bd7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
     <groupId>org.apache.storm</groupId>
     <artifactId>storm</artifactId>
-    <version>0.9.7-SNAPSHOT</version>
+    <version>0.9.7</version>
     <packaging>pom</packaging>
     <name>Storm</name>
     <description>Distributed and fault-tolerant realtime computation</description>
@@ -166,7 +166,7 @@
     <scm>
         <connection>scm:git:https://git-wip-us.apache.org/repos/asf/storm.git</connection>
         <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/storm.git</developerConnection>
-        <tag>HEAD</tag>
+        <tag>v0.9.7</tag>
         <url>https://git-wip-us.apache.org/repos/asf/storm</url>
     </scm>
 
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 735301b..6e14fc6 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index d21a634..b7d0d18 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
     </parent>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..dca7579 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,12 +25,15 @@
 import backtype.storm.multilang.SpoutMsg;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
+
+import java.util.Arrays;
 import java.util.Map;
 import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import clojure.lang.RT;
@@ -45,7 +48,9 @@
     private SpoutOutputCollector _collector;
     private String[] _command;
     private ShellProcess _process;
-    
+    private volatile boolean _running = true;
+    private volatile RuntimeException _exception;
+
     private TopologyContext _context;
     
     private SpoutMsg _spoutMsg;
@@ -53,6 +58,7 @@
     private int workerTimeoutMills;
     private ScheduledExecutorService heartBeatExecutorService;
     private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+    private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
 
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
@@ -80,9 +86,14 @@
     public void close() {
         heartBeatExecutorService.shutdownNow();
         _process.destroy();
+        _running = false;
     }
 
     public void nextTuple() {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -92,6 +103,10 @@
     }
 
     public void ack(Object msgId) {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -101,6 +116,10 @@
     }
 
     public void fail(Object msgId) {
+        if (_exception != null) {
+            throw _exception;
+        }
+
         if (_spoutMsg == null) {
             _spoutMsg = new SpoutMsg();
         }
@@ -139,6 +158,7 @@
 
     private void querySubprocess() {
         try {
+            markWaitingSubprocess();
             _process.writeSpoutMsg(_spoutMsg);
 
             while (true) {
@@ -178,9 +198,12 @@
         } catch (Exception e) {
             String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
             throw new RuntimeException(processInfo, e);
+        } finally {
+            completedWaitingSubprocess();
         }
     }
 
+
     private void handleLog(ShellMsg shellMsg) {
         String msg = shellMsg.getMsg();
         msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
@@ -233,13 +256,25 @@
         return lastHeartbeatTimestamp.get();
     }
 
-    private void die(Throwable exception) {
-        heartBeatExecutorService.shutdownNow();
+    private void markWaitingSubprocess() {
+        waitingOnSubprocess.compareAndSet(false, true);
+    }
 
-        LOG.error("Halting process: ShellSpout died.", exception);
+    private void completedWaitingSubprocess() {
+        waitingOnSubprocess.compareAndSet(true, false);
+    }
+
+    private void die(Throwable exception) {
+        String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+        _exception = new RuntimeException(processInfo, exception);
+        String message = String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
+            Arrays.toString(_command),
+            processInfo);
+        LOG.error(message, exception);
         _collector.reportError(exception);
-        _process.destroy();
-        System.exit(11);
+        if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
+            System.exit(11);
+        }
     }
 
     private class SpoutHeartbeatTimerTask extends TimerTask {
@@ -251,13 +286,14 @@
 
         @Override
         public void run() {
-            long currentTimeMillis = System.currentTimeMillis();
             long lastHeartbeat = getLastHeartbeat();
+            long currentTimestamp = System.currentTimeMillis();
+            boolean isWaitingOnSubprocess = waitingOnSubprocess.get();
 
-            LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
-                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+            LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
+                    lastHeartbeat, isWaitingOnSubprocess, workerTimeoutMills);
 
-            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+            if (isWaitingOnSubprocess && currentTimestamp - lastHeartbeat > workerTimeoutMills) {
                 spout.die(new RuntimeException("subprocess heartbeat timeout"));
             }
         }
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
index 6bcdf26..811d5b9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -135,7 +135,14 @@
     public String getErrorsString() {
         if (processErrorStream != null) {
             try {
-                return IOUtils.toString(processErrorStream);
+                StringBuilder sb = new StringBuilder();
+                while (processErrorStream.available() > 0) {
+                    int bufferSize = processErrorStream.available();
+                    byte[] errorReadingBuffer = new byte[bufferSize];
+                    processErrorStream.read(errorReadingBuffer, 0, bufferSize);
+                    sb.append(new String(errorReadingBuffer));
+                }
+                return sb.toString();
             } catch (IOException e) {
                 return "(Unable to capture error stream)";
             }
@@ -179,4 +186,4 @@
     public String getProcessTerminationInfoString() {
         return String.format(" exitCode:%s, errorString:%s ", getExitCode(), getErrorsString());
     }
-}
\ No newline at end of file
+}
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 9aa84ac..5e04c70 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>
diff --git a/storm-dist/source/pom.xml b/storm-dist/source/pom.xml
index a468153..7ec8918 100644
--- a/storm-dist/source/pom.xml
+++ b/storm-dist/source/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.7-SNAPSHOT</version>
+        <version>0.9.7</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>