Merge branch '0.9.x-branch' of https://git-wip-us.apache.org/repos/asf/storm into 0.9.x-branch
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7fb3313..afef702 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.9.7
+ * STORM-442: multilang ShellBolt/ShellSpout die() can hang when an Exception happens
+ * STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
+
## 0.9.6
* STORM-1027: Use overflow buffer for emitting metrics
* STORM-996: netty-unit-tests/test-batch demonstrates out-of-order delivery
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 96442b1..35fef08 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml
index 6a68a17..3db31b2 100644
--- a/external/storm-hbase/pom.xml
+++ b/external/storm-hbase/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 5a97fc2..159b1ce 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 4f9bf4b..9fe8110 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/pom.xml b/pom.xml
index 9676f86..459bd7e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<packaging>pom</packaging>
<name>Storm</name>
<description>Distributed and fault-tolerant realtime computation</description>
@@ -166,7 +166,7 @@
<scm>
<connection>scm:git:https://git-wip-us.apache.org/repos/asf/storm.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/storm.git</developerConnection>
- <tag>HEAD</tag>
+ <tag>v0.9.7</tag>
<url>https://git-wip-us.apache.org/repos/asf/storm</url>
</scm>
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 735301b..6e14fc6 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index d21a634..b7d0d18 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..dca7579 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,12 +25,15 @@
import backtype.storm.multilang.SpoutMsg;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
+
+import java.util.Arrays;
import java.util.Map;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import clojure.lang.RT;
@@ -45,7 +48,9 @@
private SpoutOutputCollector _collector;
private String[] _command;
private ShellProcess _process;
-
+ private volatile boolean _running = true;
+ private volatile RuntimeException _exception;
+
private TopologyContext _context;
private SpoutMsg _spoutMsg;
@@ -53,6 +58,7 @@
private int workerTimeoutMills;
private ScheduledExecutorService heartBeatExecutorService;
private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+ private AtomicBoolean waitingOnSubprocess = new AtomicBoolean(false);
public ShellSpout(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
@@ -80,9 +86,14 @@
public void close() {
heartBeatExecutorService.shutdownNow();
_process.destroy();
+ _running = false;
}
public void nextTuple() {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -92,6 +103,10 @@
}
public void ack(Object msgId) {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -101,6 +116,10 @@
}
public void fail(Object msgId) {
+ if (_exception != null) {
+ throw _exception;
+ }
+
if (_spoutMsg == null) {
_spoutMsg = new SpoutMsg();
}
@@ -139,6 +158,7 @@
private void querySubprocess() {
try {
+ markWaitingSubprocess();
_process.writeSpoutMsg(_spoutMsg);
while (true) {
@@ -178,9 +198,12 @@
} catch (Exception e) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
throw new RuntimeException(processInfo, e);
+ } finally {
+ completedWaitingSubprocess();
}
}
+
private void handleLog(ShellMsg shellMsg) {
String msg = shellMsg.getMsg();
msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
@@ -233,13 +256,25 @@
return lastHeartbeatTimestamp.get();
}
- private void die(Throwable exception) {
- heartBeatExecutorService.shutdownNow();
+ private void markWaitingSubprocess() {
+ waitingOnSubprocess.compareAndSet(false, true);
+ }
- LOG.error("Halting process: ShellSpout died.", exception);
+ private void completedWaitingSubprocess() {
+ waitingOnSubprocess.compareAndSet(true, false);
+ }
+
+ private void die(Throwable exception) {
+ String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+ _exception = new RuntimeException(processInfo, exception);
+ String message = String.format("Halting process: ShellSpout died. Command: %s, ProcessInfo %s",
+ Arrays.toString(_command),
+ processInfo);
+ LOG.error(message, exception);
_collector.reportError(exception);
- _process.destroy();
- System.exit(11);
+ if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
+ System.exit(11);
+ }
}
private class SpoutHeartbeatTimerTask extends TimerTask {
@@ -251,13 +286,14 @@
@Override
public void run() {
- long currentTimeMillis = System.currentTimeMillis();
long lastHeartbeat = getLastHeartbeat();
+ long currentTimestamp = System.currentTimeMillis();
+ boolean isWaitingOnSubprocess = waitingOnSubprocess.get();
- LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
- currentTimeMillis, lastHeartbeat, workerTimeoutMills);
+ LOG.debug("last heartbeat : {}, waiting subprocess now : {}, worker timeout (ms) : {}",
+ lastHeartbeat, isWaitingOnSubprocess, workerTimeoutMills);
- if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+ if (isWaitingOnSubprocess && currentTimestamp - lastHeartbeat > workerTimeoutMills) {
spout.die(new RuntimeException("subprocess heartbeat timeout"));
}
}
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
index 6bcdf26..811d5b9 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -135,7 +135,14 @@
public String getErrorsString() {
if (processErrorStream != null) {
try {
- return IOUtils.toString(processErrorStream);
+ StringBuilder sb = new StringBuilder();
+ while (processErrorStream.available() > 0) {
+ int bufferSize = processErrorStream.available();
+ byte[] errorReadingBuffer = new byte[bufferSize];
+ processErrorStream.read(errorReadingBuffer, 0, bufferSize);
+ sb.append(new String(errorReadingBuffer));
+ }
+ return sb.toString();
} catch (IOException e) {
return "(Unable to capture error stream)";
}
@@ -179,4 +186,4 @@
public String getProcessTerminationInfoString() {
return String.format(" exitCode:%s, errorString:%s ", getExitCode(), getErrorsString());
}
-}
\ No newline at end of file
+}
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 9aa84ac..5e04c70 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
diff --git a/storm-dist/source/pom.xml b/storm-dist/source/pom.xml
index a468153..7ec8918 100644
--- a/storm-dist/source/pom.xml
+++ b/storm-dist/source/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.7-SNAPSHOT</version>
+ <version>0.9.7</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>