Merge branch 'storm-537-infinite-reconnection' of https://github.com/Sergeant007/storm into STORM-537
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93a4b67..2691fb7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
* STORM-540: Change default time format in logs to ISO8601 in order to include timezone
* STORM-511: Storm-Kafka spout keeps sending fetch requests with invalid offset
* STORM-538: Guava com.google.thirdparty.publicsuffix is not shaded
+ * STORM-497: don't modify the netty server taskToQueueId mapping while someone could be reading it.
## 0.9.3-rc1
* STORM-519: add tuple as an input param to HBaseValueMapper
diff --git a/DEVELOPER.md b/DEVELOPER.md
index 81a4122..4af1e3c 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -229,6 +229,9 @@
# Build the code and run the tests (requires nodejs, python and ruby installed)
$ mvn clean install
+ # Build the code and run the tests, specifying the default test timeout (in milliseconds)
+ $ mvn clean install -DSTORM_TEST_TIMEOUT_MS=10000
+
# Build the code but skip the tests
$ mvn clean install -DskipTests=true
diff --git a/README.markdown b/README.markdown
index 2f90a4e..2e9286b 100644
--- a/README.markdown
+++ b/README.markdown
@@ -79,6 +79,7 @@
* Michael G. Noll ([@miguno](https://github.com/miguno))
* Kishor Patil ([@kishorvpatil](https://github.com/kishorvpatil))
* Sriharsha Chintalapani([@harshach](https://github.com/harshach))
+* Sean Zhong ([@clockfly](https://github.com/clockfly))
## Contributors
diff --git a/bin/storm.cmd b/bin/storm.cmd
index 359acb6..0cf6b52 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -69,7 +69,16 @@
set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% -Dstorm.jar=%2
set CLASSPATH=%CLASSPATH%;%2
set CLASS=%3
- set storm-command-arguments=%4 %5 %6 %7 %8 %9
+ set args=%4
+ shift
+ :start
+ if [%4] == [] goto done
+ set args=%args% %4
+ shift
+ goto start
+
+ :done
+ set storm-command-arguments=%args%
)
if not defined STORM_LOG_FILE (
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 73cf334..02d9767 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -33,9 +33,9 @@
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
- partitionInfo.add(0, brokerForPartition0)//mapping form partition 0 to brokerForPartition0
- partitionInfo.add(1, brokerForPartition1)//mapping form partition 1 to brokerForPartition1
- partitionInfo.add(2, brokerForPartition2)//mapping form partition 2 to brokerForPartition2
+ partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+ partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+ partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
```
@@ -121,7 +121,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
- <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index cce89bc..b370eb7 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -502,6 +502,8 @@
:distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
storm-home (System/getProperty "storm.home")
+ storm-options (System/getProperty "storm.options")
+ storm-conf-file (System/getProperty "storm.conf.file")
storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home file-path-separator "logs"))
stormroot (supervisor-stormdist-root conf storm-id)
jlp (jlp stormroot conf)
@@ -528,6 +530,8 @@
[(str "-Djava.library.path=" jlp)
(str "-Dlogfile.name=" logfilename)
(str "-Dstorm.home=" storm-home)
+ (str "-Dstorm.conf.file=" storm-conf-file)
+ (str "-Dstorm.options=" storm-options)
(str "-Dstorm.log.dir=" storm-log-dir)
(str "-Dlogback.configurationFile=" storm-home file-path-separator "logback" file-path-separator "cluster.xml")
(str "-Dstorm.id=" storm-id)
@@ -625,4 +629,4 @@
))))
(defn -main []
- (-launch (standalone-supervisor)))
\ No newline at end of file
+ (-launch (standalone-supervisor)))
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 798dccc..79d3277 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -187,7 +187,9 @@
;; on windows, the host process still holds lock on the logfile
(catch Exception e (log-message (.getMessage e)))) ))
-(def TEST-TIMEOUT-MS 5000)
+(def TEST-TIMEOUT-MS
+  ;; 5000 ms unless the STORM_TEST_TIMEOUT_MS system property supplies another value
+  (parse-int (or (System/getProperty "STORM_TEST_TIMEOUT_MS") "5000")))
(defmacro while-timeout [timeout-ms condition & body]
`(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
@@ -594,7 +596,7 @@
(not= (global-amt track-id "transferred")
(global-amt track-id "processed"))
))]
- (while-timeout TEST-TIMEOUT-MS (waiting?)
+ (while-timeout timeout-ms (waiting?)
;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
;; (println "Processed: " (global-amt track-id "processed"))
;; (println "Transferred: " (global-amt track-id "transferred"))
diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj
index ff939a9..7790b6b 100644
--- a/storm-core/src/clj/backtype/storm/testing4j.clj
+++ b/storm-core/src/clj/backtype/storm/testing4j.clj
@@ -44,6 +44,7 @@
^:static [mkTrackedTopology [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology] backtype.storm.testing.TrackedTopology]
^:static [trackedWait [backtype.storm.testing.TrackedTopology] void]
^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer] void]
+ ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer Integer] void]
^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer Integer] void]
^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer] void]
^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
@@ -123,6 +124,8 @@
(TrackedTopology.)))
(defn -trackedWait
+ ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms]
+ (tracked-wait trackedTopology amt timeout-ms))
([^TrackedTopology trackedTopology ^Integer amt]
(tracked-wait trackedTopology amt))
([^TrackedTopology trackedTopology]
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
index d2a3082..a2aa976 100755
--- a/storm-core/src/dev/resources/storm.py
+++ b/storm-core/src/dev/resources/storm.py
@@ -210,8 +210,12 @@
while True:
tup = readTuple()
ANCHOR_TUPLE = tup
- self.process(tup)
- ack(tup)
+ try:
+ self.process(tup)
+ ack(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+ fail(tup)
except Exception, e:
reportError(traceback.format_exc(e))
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 20a147d..2499e65 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -56,7 +56,7 @@
final ServerBootstrap bootstrap;
private int queueCount;
- HashMap<Integer, Integer> taskToQueueId = null;
+ private volatile HashMap<Integer, Integer> taskToQueueId = null;
int roundRobinQueueId;
boolean closing = false;
@@ -131,18 +131,18 @@
private Integer getMessageQueueId(int task) {
// try to construct the map from taskId -> queueId in round robin manner.
-
Integer queueId = taskToQueueId.get(task);
if (null == queueId) {
- synchronized(taskToQueueId) {
- //assgin task to queue in round-robin manner
- if (null == taskToQueueId.get(task)) {
+ synchronized (this) {
+ queueId = taskToQueueId.get(task);
+ if (queueId == null) {
queueId = roundRobinQueueId++;
-
- taskToQueueId.put(task, queueId);
if (roundRobinQueueId == queueCount) {
roundRobinQueueId = 0;
}
+ HashMap<Integer, Integer> newRef = new HashMap<Integer, Integer>(taskToQueueId);
+ newRef.put(task, queueId);
+ taskToQueueId = newRef;
}
}
}
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 0d0ae07..14a45da 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -18,6 +18,8 @@
package backtype.storm.utils;
import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Map;
@@ -30,28 +32,42 @@
* Every read/write hits disk.
*/
public class LocalState {
+ public static Logger LOG = LoggerFactory.getLogger(LocalState.class);
+
private VersionedStore _vs;
public LocalState(String backingDir) throws IOException {
_vs = new VersionedStore(backingDir);
}
-
+
public synchronized Map<Object, Object> snapshot() throws IOException {
int attempts = 0;
while(true) {
- String latestPath = _vs.mostRecentVersionPath();
- if(latestPath==null) return new HashMap<Object, Object>();
try {
- return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
- } catch(IOException e) {
+ return deserializeLatestVersion(); // newest stored map; empty map if there is no version path or the file is zero-length
+ } catch (IOException e) {
attempts++;
- if(attempts >= 10) {
+ if (attempts >= 10) { // failed reads are retried; the 10th IOException is rethrown to the caller
throw e;
}
}
}
}
-
+
+    /** Reads the most recent version file; a null version path or a zero-length file yields an empty map. */
+    private Map<Object, Object> deserializeLatestVersion() throws IOException {
+        String path = _vs.mostRecentVersionPath();
+        if (path == null) {
+            return new HashMap<Object, Object>();
+        }
+        byte[] bytes = FileUtils.readFileToByteArray(new File(path));
+        if (bytes.length > 0) {
+            return (Map<Object, Object>) Utils.deserialize(bytes);
+        }
+        LOG.warn("LocalState file '{}' contained no data, resetting state", path);
+        return new HashMap<Object, Object>();
+    }
+
public Object get(Object key) throws IOException {
return snapshot().get(key);
}
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index d2a3082..a2aa976 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -210,8 +210,12 @@
while True:
tup = readTuple()
ANCHOR_TUPLE = tup
- self.process(tup)
- ack(tup)
+ try:
+ self.process(tup)
+ ack(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+ fail(tup)
except Exception, e:
reportError(traceback.format_exc(e))
diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj
index ba2b969..4bd58ec 100644
--- a/storm-core/test/clj/backtype/storm/local_state_test.clj
+++ b/storm-core/test/clj/backtype/storm/local_state_test.clj
@@ -16,7 +16,9 @@
(ns backtype.storm.local-state-test
(:use [clojure test])
(:use [backtype.storm testing])
- (:import [backtype.storm.utils LocalState]))
+ (:import [backtype.storm.utils LocalState]
+ [org.apache.commons.io FileUtils]
+ [java.io File]))
(deftest test-local-state
(with-local-tmp [dir1 dir2]
@@ -41,3 +43,13 @@
(.put ls2 "b" 8)
(is (= 8 (.get ls2 "b")))
)))
+
+(deftest empty-state ; LocalState over a zero-length data file: reads yield nil and later writes still work
+  (with-local-tmp [dir]
+    (let [ls (LocalState. dir)]
+      ;; create empty "12345" and "12345.version" files, closing each stream so no file handle is leaked
+      (doseq [fname ["12345" "12345.version"]] (.close (FileUtils/openOutputStream (File. dir fname))))
+      (is (= nil (.get ls "c")))
+      (.put ls "a" 1)
+      (is (= 1 (.get ls "a")))
+      )))
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index ba74d88..0bb47f3 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -255,6 +255,8 @@
["-Djava.library.path="
(str "-Dlogfile.name=worker-" mock-port ".log")
"-Dstorm.home="
+ "-Dstorm.conf.file="
+ "-Dstorm.options="
(str "-Dstorm.log.dir=" file-path-separator "logs")
(str "-Dlogback.configurationFile=" file-path-separator "logback" file-path-separator "cluster.xml")
(str "-Dstorm.id=" mock-storm-id)
@@ -469,4 +471,4 @@
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
(get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
\ No newline at end of file
+ )))