Merge branch 'storm-537-infinite-reconnection' of https://github.com/Sergeant007/storm into STORM-537
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93a4b67..2691fb7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
  * STORM-540: Change default time format in logs to ISO8601 in order to include timezone
  * STORM-511: Storm-Kafka spout keeps sending fetch requests with invalid offset
  * STORM-538: Guava com.google.thirdparty.publicsuffix is not shaded
+ * STORM-497: don't modify the netty server taskToQueueId mapping while someone could be reading it.
 
 ## 0.9.3-rc1
  * STORM-519: add tuple as an input param to HBaseValueMapper 
diff --git a/DEVELOPER.md b/DEVELOPER.md
index 81a4122..4af1e3c 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -229,6 +229,9 @@
     # Build the code and run the tests (requires nodejs, python and ruby installed) 
     $ mvn clean install
 
+    # Build the code and run the tests, specifying the default test timeout (in milliseconds)
+    $ mvn clean install -DSTORM_TEST_TIMEOUT_MS=10000
+
     # Build the code but skip the tests
     $ mvn clean install -DskipTests=true
 
diff --git a/README.markdown b/README.markdown
index 2f90a4e..2e9286b 100644
--- a/README.markdown
+++ b/README.markdown
@@ -79,6 +79,7 @@
 * Michael G. Noll ([@miguno](https://github.com/miguno))
 * Kishor Patil ([@kishorvpatil](https://github.com/kishorvpatil))
 * Sriharsha Chintalapani([@harshach](https://github.com/harshach))
+* Sean Zhong ([@clockfly](https://github.com/clockfly))
 
 ## Contributors
 
diff --git a/bin/storm.cmd b/bin/storm.cmd
index 359acb6..0cf6b52 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -69,7 +69,16 @@
     set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS% -Dstorm.jar=%2
     set CLASSPATH=%CLASSPATH%;%2
     set CLASS=%3
-    set storm-command-arguments=%4 %5 %6 %7 %8 %9
+    set args=%4
+    shift
+    :start
+    if [%4] == [] goto done
+    set args=%args% %4
+    shift
+    goto start
+
+    :done
+    set storm-command-arguments=%args%
   )
   
   if not defined STORM_LOG_FILE (
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 73cf334..02d9767 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -33,9 +33,9 @@
     Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
     Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
     GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
-    partitionInfo.add(0, brokerForPartition0)//mapping form partition 0 to brokerForPartition0
-    partitionInfo.add(1, brokerForPartition1)//mapping form partition 1 to brokerForPartition1
-    partitionInfo.add(2, brokerForPartition2)//mapping form partition 2 to brokerForPartition2
+    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
     StaticHosts hosts = new StaticHosts(partitionInfo);
 ```
 
@@ -121,7 +121,6 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
             <version>0.8.1.1</version>
-            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index cce89bc..b370eb7 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -502,6 +502,8 @@
     :distributed [supervisor storm-id port worker-id]
     (let [conf (:conf supervisor)
           storm-home (System/getProperty "storm.home")
+          storm-options (System/getProperty "storm.options")
+          storm-conf-file (System/getProperty "storm.conf.file")
           storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home file-path-separator "logs"))
           stormroot (supervisor-stormdist-root conf storm-id)
           jlp (jlp stormroot conf)
@@ -528,6 +530,8 @@
                     [(str "-Djava.library.path=" jlp)
                      (str "-Dlogfile.name=" logfilename)
                      (str "-Dstorm.home=" storm-home)
+                     (str "-Dstorm.conf.file=" storm-conf-file)
+                     (str "-Dstorm.options=" storm-options)
                      (str "-Dstorm.log.dir=" storm-log-dir)
                      (str "-Dlogback.configurationFile=" storm-home file-path-separator "logback" file-path-separator "cluster.xml")
                      (str "-Dstorm.id=" storm-id)
@@ -625,4 +629,4 @@
         ))))
 
 (defn -main []
-  (-launch (standalone-supervisor)))
\ No newline at end of file
+  (-launch (standalone-supervisor)))
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 798dccc..79d3277 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -187,7 +187,9 @@
       ;; on windows, the host process still holds lock on the logfile
       (catch Exception e (log-message (.getMessage e)))) ))
 
-(def TEST-TIMEOUT-MS 5000)
+(def TEST-TIMEOUT-MS ;; ms; the STORM_TEST_TIMEOUT_MS system property overrides the "5000" fallback
+  (parse-int (or (System/getProperty "STORM_TEST_TIMEOUT_MS")
+                 "5000")))
 
 (defmacro while-timeout [timeout-ms condition & body]
   `(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
@@ -594,7 +596,7 @@
                            (not= (global-amt track-id "transferred")                                 
                                  (global-amt track-id "processed"))
                            ))]
-        (while-timeout TEST-TIMEOUT-MS (waiting?)
+        (while-timeout timeout-ms (waiting?)
                        ;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
                        ;; (println "Processed: " (global-amt track-id "processed"))
                        ;; (println "Transferred: " (global-amt track-id "transferred"))
diff --git a/storm-core/src/clj/backtype/storm/testing4j.clj b/storm-core/src/clj/backtype/storm/testing4j.clj
index ff939a9..7790b6b 100644
--- a/storm-core/src/clj/backtype/storm/testing4j.clj
+++ b/storm-core/src/clj/backtype/storm/testing4j.clj
@@ -44,6 +44,7 @@
              ^:static [mkTrackedTopology [backtype.storm.ILocalCluster backtype.storm.generated.StormTopology] backtype.storm.testing.TrackedTopology]
              ^:static [trackedWait [backtype.storm.testing.TrackedTopology] void]
              ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer] void]
+             ^:static [trackedWait [backtype.storm.testing.TrackedTopology Integer Integer] void]
              ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer Integer] void]
              ^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer] void]
              ^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
@@ -123,6 +124,8 @@
       (TrackedTopology.)))
 
 (defn -trackedWait
+  ([^TrackedTopology trackedTopology ^Integer amt ^Integer timeout-ms]
+    (tracked-wait trackedTopology amt timeout-ms))
   ([^TrackedTopology trackedTopology ^Integer amt]
    (tracked-wait trackedTopology amt))
   ([^TrackedTopology trackedTopology]
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
index d2a3082..a2aa976 100755
--- a/storm-core/src/dev/resources/storm.py
+++ b/storm-core/src/dev/resources/storm.py
@@ -210,8 +210,12 @@
             while True:
                 tup = readTuple()
                 ANCHOR_TUPLE = tup
-                self.process(tup)
-                ack(tup)
+                try:
+                    self.process(tup)
+                    ack(tup)
+                except Exception, e:
+                    reportError(traceback.format_exc(e))
+                    fail(tup)
         except Exception, e:
             reportError(traceback.format_exc(e))
 
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 20a147d..2499e65 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -56,7 +56,7 @@
     final ServerBootstrap bootstrap;
     
     private int queueCount;
-    HashMap<Integer, Integer> taskToQueueId = null;
+    private volatile HashMap<Integer, Integer> taskToQueueId = null;
     int roundRobinQueueId;
 	
     boolean closing = false;
@@ -131,18 +131,18 @@
     
     private Integer getMessageQueueId(int task) {
       // try to construct the map from taskId -> queueId in round robin manner.
-      
       Integer queueId = taskToQueueId.get(task);
       if (null == queueId) {
-        synchronized(taskToQueueId) {
-          //assgin task to queue in round-robin manner
-          if (null == taskToQueueId.get(task)) {
+        synchronized (this) {
+          queueId = taskToQueueId.get(task);
+          if (queueId == null) {
             queueId = roundRobinQueueId++;
-            
-            taskToQueueId.put(task, queueId);
             if (roundRobinQueueId == queueCount) {
               roundRobinQueueId = 0;
             }
+            HashMap<Integer, Integer> newRef = new HashMap<Integer, Integer>(taskToQueueId);
+            newRef.put(task, queueId);
+            taskToQueueId = newRef;
           }
         }
       }
diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
index 0d0ae07..14a45da 100644
--- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java
@@ -18,6 +18,8 @@
 package backtype.storm.utils;
 
 import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Map;
@@ -30,28 +32,42 @@
  * Every read/write hits disk.
  */
 public class LocalState {
+    public static Logger LOG = LoggerFactory.getLogger(LocalState.class);
+
     private VersionedStore _vs;
     
     public LocalState(String backingDir) throws IOException {
         _vs = new VersionedStore(backingDir);
     }
-    
+
     public synchronized Map<Object, Object> snapshot() throws IOException {
         int attempts = 0;
         while(true) {
-            String latestPath = _vs.mostRecentVersionPath();
-            if(latestPath==null) return new HashMap<Object, Object>();
             try {
-                return (Map<Object, Object>) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath)));
-            } catch(IOException e) {
+                return deserializeLatestVersion();
+            } catch (IOException e) {
                 attempts++;
-                if(attempts >= 10) {
+                if (attempts >= 10) {
                     throw e;
                 }
             }
         }
     }
-    
+
+    /** Reads the file at _vs.mostRecentVersionPath(); no path or a zero-byte file gives an empty map. */
+    private Map<Object, Object> deserializeLatestVersion() throws IOException {
+        String latestPath = _vs.mostRecentVersionPath();
+        if (latestPath == null) {
+            return new HashMap<Object, Object>();
+        }
+        byte[] serialized = FileUtils.readFileToByteArray(new File(latestPath));
+        if (serialized.length > 0) {
+            return (Map<Object, Object>) Utils.deserialize(serialized);
+        }
+        LOG.warn("LocalState file '{}' contained no data, resetting state", latestPath);
+        return new HashMap<Object, Object>();
+    }
+
     public Object get(Object key) throws IOException {
         return snapshot().get(key);
     }
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index d2a3082..a2aa976 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -210,8 +210,12 @@
             while True:
                 tup = readTuple()
                 ANCHOR_TUPLE = tup
-                self.process(tup)
-                ack(tup)
+                try:
+                    self.process(tup)
+                    ack(tup)
+                except Exception, e:
+                    reportError(traceback.format_exc(e))
+                    fail(tup)
         except Exception, e:
             reportError(traceback.format_exc(e))
 
diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj
index ba2b969..4bd58ec 100644
--- a/storm-core/test/clj/backtype/storm/local_state_test.clj
+++ b/storm-core/test/clj/backtype/storm/local_state_test.clj
@@ -16,7 +16,9 @@
 (ns backtype.storm.local-state-test
   (:use [clojure test])
   (:use [backtype.storm testing])
-  (:import [backtype.storm.utils LocalState]))
+  (:import [backtype.storm.utils LocalState]
+           [org.apache.commons.io FileUtils]
+           [java.io File]))
 
 (deftest test-local-state
   (with-local-tmp [dir1 dir2]
@@ -41,3 +43,13 @@
       (.put ls2 "b" 8)
       (is (= 8 (.get ls2 "b")))
       )))
+
+(deftest empty-state
+  (with-local-tmp [dir]
+    (let [ls (LocalState. dir)]
+      ;; Create both files with zero bytes; touch closes them, so no stream stays open.
+      (FileUtils/touch (File. dir "12345"))
+      (FileUtils/touch (File. dir "12345.version"))
+      (is (= nil (.get ls "c")))
+      (.put ls "a" 1)
+      (is (= 1 (.get ls "a"))))))
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index ba74d88..0bb47f3 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -255,6 +255,8 @@
                                ["-Djava.library.path="
                                 (str "-Dlogfile.name=worker-" mock-port ".log")
                                 "-Dstorm.home="
+                                "-Dstorm.conf.file="
+                                "-Dstorm.options="
                                 (str "-Dstorm.log.dir=" file-path-separator "logs")
                                 (str "-Dlogback.configurationFile=" file-path-separator "logback" file-path-separator "cluster.xml")
                                 (str "-Dstorm.id=" mock-storm-id)
@@ -469,4 +471,4 @@
      (validate-launched-once (:launched changed)
                              {"sup1" [3 4]}
                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-     )))
\ No newline at end of file
+     )))