Merge branch 'STORM-1995_downloadChunk_should_close_input_stream' of https://github.com/abellina/storm into STORM-1995
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2839ab8..4fcd9a4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,10 @@
 ## 2.0.0
+ * STORM-2009: port org.apache.storm.blobstore.clj and org.apache.storm.command.blobstore.clj to Java
+ * STORM-1876: Option to build storm-kafka and storm-kafka-client with different kafka client version
+ * STORM-2000: Package storm-opentsdb as part of external dir in installation
+ * STORM-1962: support python 3 and 2 in multilang
+ * STORM-1964: Unexpected behavior when using count window together with timestamp extraction
+ * STORM-1890: ensure we refetch static resources after package build
  * STORM-1966 Expand metric having Map type as value into multiple metrics based on entries
  * STORM-1737: storm-kafka-client has compilation errors with Apache Kafka 0.10
  * STORM-1910 One topology cannot use hdfs spout to read from two locations
@@ -123,6 +129,8 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0 
+ * STORM-1988: Kafka Offset not showing due to bad classpath.
+ * STORM-1987: Fix TridentKafkaWordCount arg handling in distributed mode.
  * STORM-1969: Modify HiveTopology to show usage of non-partition table.
  * STORM-1950: Change response json of "Topology Lag" REST API to keyed by spoutId, topic, partition.
  * STORM-1833: Simple equi-join in storm-sql standalone mode
@@ -156,6 +164,8 @@
  * STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
 
 ## 1.0.2
+ * STORM-1976: Remove cleanup-corrupt-topologies!
+ * STORM-1977: Restore logic: give up leadership when elected as leader but one or more topology codes are missing locally
  * STORM-1939: Frequent InterruptedException raised by ShellBoltMessageQueue.poll
  * STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
  * STORM-1922: Supervisor summary default order by host
diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor
index a47a27d..bfe50b7 100755
--- a/bin/storm-kafka-monitor
+++ b/bin/storm-kafka-monitor
@@ -40,4 +40,4 @@
   JAVA="$JAVA_HOME/bin/java"
 fi
 
-exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/storm-kafka-monitor*.jar" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
+exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
diff --git a/bin/storm.py b/bin/storm.py
index 74a7825..0da2ae1 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -326,7 +326,7 @@
     storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r
     """
     exec_storm_class(
-        "org.apache.storm.command.blobstore",
+        "org.apache.storm.command.Blobstore",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index e143844..09cb798 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -158,12 +158,12 @@
       </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>${kafka.artifact.id}</artifactId>
-      <scope>provided</scope>
+      <artifactId>${storm.kafka.artifact.id}</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
+      <version>${storm.kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.storm</groupId>
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
index 2b25fad..0499041 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
@@ -213,7 +213,7 @@
             System.exit(1);
         } else if (args.length == 1) {
             zkUrl = args[0];
-        } else if (args.length == 2) {
+        } else {
             zkUrl = args[0];
             brokerUrl = args[1];
         }
diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml
index 48cc151..f1272ac 100644
--- a/external/flux/flux-examples/pom.xml
+++ b/external/flux/flux-examples/pom.xml
@@ -95,7 +95,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
         </dependency>
     </dependencies>
 
diff --git a/external/flux/pom.xml b/external/flux/pom.xml
index 56d9bab..5083055 100644
--- a/external/flux/pom.xml
+++ b/external/flux/pom.xml
@@ -78,7 +78,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
index 0642d17..a55fa74 100644
--- a/external/sql/storm-sql-kafka/pom.xml
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -63,12 +63,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
-            <scope>provided</scope>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.version}</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
index 63108b5..7436388 100644
--- a/external/storm-kafka-client/README.md
+++ b/external/storm-kafka-client/README.md
@@ -11,8 +11,6 @@
 
 Multiple topics and topic wildcards can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical.
 
-### <a name="compatibility"></a>Apache Kafka Version Compatibility 
-This spout implementation only supports Apache Kafka version **0.10 or newer**. To use a version of Kafka prior to 0.10, e.g 0.9.x or 0.8.x, please refer to the [prior implementation of Kafka Spout] (https://github.com/apache/storm/tree/master/external/storm-kafka). Note that the prior implementation also works with 0.10, but the goal is to have this new Spout implementation to be the de-facto.
 
 # Usage Examples
 
@@ -133,6 +131,31 @@
 With the debug level logs enabled it is possible to see the messages of each topic being redirected to the appropriate Bolt as defined 
 by the streams defined and choice of shuffle grouping.   
 
+## Using storm-kafka-client with different versions of Kafka
+
+Storm-kafka-client's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
+as a transitive dependency. This allows you to use a Kafka client version compatible with your Kafka cluster.
+
+When building a project with storm-kafka-client, you must explicitly add the Kafka clients dependency. For example, to
+use kafka-clients 0.10.0.0, you would use the following dependency in your `pom.xml`:
+
+```xml
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.0.0</version>
+        </dependency>
+```
+
+You can also override the kafka-clients version when building with maven, using the parameter `storm.kafka.client.version`,
+e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`.
+
+When selecting a Kafka client version, ensure that:
+ 1. The Kafka API is compatible: the storm-kafka-client module only supports the **0.10 or newer** Kafka client API. For
+ older versions, you can use the storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
+ 2. The Kafka client you select is wire compatible with your broker, e.g. a 0.9.x client will not work with a
+ 0.8.x broker.
+
 # Future Work
  Implement comprehensive metrics. Trident spout is coming soon.
 
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 6c82b6a..f7a387c 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -52,6 +52,7 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.client.version}</version>
         </dependency>
         <!--test dependencies -->
         <dependency>
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 315e3e9..1beec0e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -313,12 +313,16 @@
                 "kafkaProps=" + kafkaProps +
                 ", keyDeserializer=" + keyDeserializer +
                 ", valueDeserializer=" + valueDeserializer +
-                ", topics=" + getSubscribedTopics() +
-                ", topicWildcardPattern=" + getTopicWildcardPattern() +
-                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
                 ", pollTimeoutMs=" + pollTimeoutMs +
                 ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
                 ", maxRetries=" + maxRetries +
+                ", maxUncommittedOffsets=" + maxUncommittedOffsets +
+                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
+                ", kafkaSpoutStreams=" + kafkaSpoutStreams +
+                ", tuplesBuilder=" + tuplesBuilder +
+                ", retryService=" + retryService +
+                ", topics=" + getSubscribedTopics() +
+                ", topicWildcardPattern=" + getTopicWildcardPattern() +
                 '}';
     }
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 78fd265..6fe997c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -264,8 +264,8 @@
     private long nextTime(KafkaSpoutMessageId msgId) {
         final long currentTimeNanos = System.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
-                ? currentTimeNanos + initialDelay.lengthNanos()
-                : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
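+                // raise the configured delay length (not its value in nanos) to the power of numFails - 1,
+                // then convert the result to nanos, so the exponent applies in the delay period's own time unit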
+                ? currentTimeNanos + initialDelay.lengthNanos
+                : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
         return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
     }
 
diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml
index 3af24ce..b00a129 100644
--- a/external/storm-kafka-monitor/pom.xml
+++ b/external/storm-kafka-monitor/pom.xml
@@ -46,10 +46,12 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.client.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
+            <version>${storm.kafka.version}</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 0a07629..80f31c3 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -214,10 +214,10 @@
 offset defined by `KafkaConfig.startOffsetTime` as described above.
 
 
-## Using storm-kafka with different versions of Scala
+## Using storm-kafka with different versions of Kafka
 
 Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
-as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.
+as a transitive dependency. This allows you to use a Kafka version compatible with your Kafka cluster.
 
 When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
 use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
@@ -242,6 +242,16 @@
 
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
 
+You can also override the kafka dependency version when building with maven, using the parameters `kafka.version` and `kafka.artifact.id`,
+e.g. `mvn clean install -Dkafka.artifact.id=kafka_2.11 -Dkafka.version=0.9.0.1`.
+
+When selecting a Kafka dependency version, ensure that:
+ 1. The Kafka API is compatible with storm-kafka: currently, only the 0.9.x and 0.8.x client APIs are supported by the
+ storm-kafka module. If you want to use a newer version, use the storm-kafka-client module instead.
+ 2. The Kafka client you select is wire compatible with your broker, e.g. a 0.9.x client will not work with a
+ 0.8.x broker.
+
+
 ## Writing to Kafka as part of your topology
 You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you
 are using trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 79824ef..08550c3 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -30,10 +30,6 @@
     <name>storm-kafka</name>
     <description>Storm Spouts for Apache Kafka</description>
 
-    <properties>
-        <kafka.version>0.8.2.1</kafka.version>
-        <kafka.artifact.id>kafka_2.10</kafka.artifact.id>
-    </properties>
     <build>
         <plugins>
             <plugin>
@@ -123,12 +119,14 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>${kafka.artifact.id}</artifactId>
+            <artifactId>${storm.kafka.artifact.id}</artifactId>
+            <version>${storm.kafka.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
diff --git a/pom.xml b/pom.xml
index 06ea3a4..0a4e65b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,19 +209,19 @@
         <!-- dependency versions -->
         <clojure.version>1.7.0</clojure.version>
         <java_jmx.version>0.3.1</java_jmx.version>
-        <compojure.version>1.1.3</compojure.version>
+        <compojure.version>1.1.9</compojure.version>
         <hiccup.version>0.3.6</hiccup.version>
         <commons-compress.version>1.4.1</commons-compress.version>
         <commons-io.version>2.5</commons-io.version>
         <commons-lang.version>2.5</commons-lang.version>
         <commons-exec.version>1.1</commons-exec.version>
-        <commons-fileupload.version>1.2.1</commons-fileupload.version>
+        <commons-fileupload.version>1.3.2</commons-fileupload.version>
         <commons-codec.version>1.6</commons-codec.version>
         <commons-cli.version>1.3.1</commons-cli.version>
         <clj-time.version>0.8.0</clj-time.version>
         <curator.version>2.10.0</curator.version>
         <json-simple.version>1.1</json-simple.version>
-        <ring.version>1.3.0</ring.version>
+        <ring.version>1.3.1</ring.version>
         <ring-json.version>0.3.1</ring-json.version>
         <jetty.version>7.6.13.v20130916</jetty.version>
         <clojure.tools.logging.version>0.2.3</clojure.tools.logging.version>
@@ -258,8 +258,13 @@
         <calcite.version>1.4.0-incubating</calcite.version>
         <jackson.version>2.6.3</jackson.version>
         <maven-surefire.version>2.18.1</maven-surefire.version>
-        <kafka.version>0.10.0.0</kafka.version>
-        <kafka.artifact.id>kafka_2.11</kafka.artifact.id>
+        <!-- Kafka version used by old storm-kafka spout code -->
+        <storm.kafka.version>0.8.2.1</storm.kafka.version>
+        <storm.kafka.artifact.id>kafka_2.10</storm.kafka.artifact.id>
+
+        <!-- kafka version used by new storm-kafka-client spout code -->
+        <storm.kafka.client.version>0.10.0.0</storm.kafka.client.version>
+
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
         <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
@@ -673,6 +678,11 @@
             </dependency>
             <dependency>
                 <groupId>ring</groupId>
+                <artifactId>ring-core</artifactId>
+                <version>${ring.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>ring</groupId>
                 <artifactId>ring-devel</artifactId>
                 <version>${ring.version}</version>
             </dependency>
@@ -862,10 +872,13 @@
                 <artifactId>jackson-databind</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
+
+            <!-- kafka artifact dependency needed for storm-kafka -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>
-                <artifactId>${kafka.artifact.id}</artifactId>
-                <version>${kafka.version}</version>
+                <artifactId>${storm.kafka.artifact.id}</artifactId>
+                <version>${storm.kafka.version}</version>
+                <scope>provided</scope>
                 <exclusions>
                     <exclusion>
                         <groupId>org.apache.zookeeper</groupId>
@@ -884,7 +897,8 @@
             <dependency>
                 <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka-clients</artifactId>
-                <version>${kafka.version}</version>
+                <version>${storm.kafka.client.version}</version>
+                <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>uk.org.lidalia</groupId>
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b03dc20..1220746 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -74,6 +74,10 @@
         </dependency>
         <dependency>
             <groupId>ring</groupId>
+            <artifactId>ring-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ring</groupId>
             <artifactId>ring-devel</artifactId>
         </dependency>
         <dependency>
diff --git a/storm-core/src/clj/org/apache/storm/blobstore.clj b/storm-core/src/clj/org/apache/storm/blobstore.clj
deleted file mode 100644
index 92fb44f..0000000
--- a/storm-core/src/clj/org/apache/storm/blobstore.clj
+++ /dev/null
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.blobstore
-  (:import [org.apache.storm.utils Utils ConfigUtils])
-  (:import [org.apache.storm.blobstore ClientBlobStore])
-  (:use [org.apache.storm config util]))
-
-(defmacro with-configured-blob-client
-  [client-sym & body]
-  `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
-         ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)]
-     (try
-       ~@body
-       (finally (.shutdown ~client-sym)))))
diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
deleted file mode 100644
index 924f825..0000000
--- a/storm-core/src/clj/org/apache/storm/command/blobstore.clj
+++ /dev/null
@@ -1,163 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.blobstore
-  (:import [java.io InputStream OutputStream]
-           [org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
-            KeyNotFoundException]
-           [org.apache.storm.blobstore BlobStoreAclHandler]
-           [org.apache.storm.utils Utils])
-  (:use [org.apache.storm config]
-        [clojure.string :only [split]]
-        [clojure.tools.cli :only [cli]]
-        [clojure.java.io :only [copy input-stream output-stream]]
-        [org.apache.storm blobstore log])
-  (:gen-class))
-
-(defn update-blob-from-stream
-  "Update a blob in the blob store from an InputStream"
-  [key ^InputStream in]
-  (with-configured-blob-client blobstore
-    (let [out (.updateBlob blobstore key)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (log-message e)
-          (.cancel out)
-          (throw e))))))
-
-(defn create-blob-from-stream
-  "Create a blob in the blob store from an InputStream"
-  [key ^InputStream in ^SettableBlobMeta meta]
-  (with-configured-blob-client blobstore
-    (let [out (.createBlob blobstore key meta)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (.cancel out)
-          (throw e))))))
-
-(defn read-blob
-  "Read a blob in the blob store and write to an OutputStream"
-  [key ^OutputStream out]
-  (with-configured-blob-client blobstore
-    (with-open [in (.getBlob blobstore key)]
-      (copy in out))))
-
-(defn as-access-control
-  "Convert a parameter to an AccessControl object"
-  [param]
-  (BlobStoreAclHandler/parseAccessControl (str param)))
-
-(defn as-acl
-  [param]
-  (map as-access-control (split param #",")))
-
-(defn access-control-str
-  [^AccessControl acl]
-  (BlobStoreAclHandler/accessControlToString acl))
-
-(defn read-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (output-stream file)]
-        (read-blob key f))
-      (read-blob key System/out))))
-
-(defn update-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (input-stream file)]
-        (update-blob-from-stream key f))
-      (update-blob-from-stream key System/in))
-    (log-message "Successfully updated " key)))
-
-(defn create-cli [args]
-  (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
-                                                  ["-a" "--acl" :default [] :parse-fn as-acl]
-                                                  ["-r" "--replication-factor" :default -1 :parse-fn #(Integer/parseInt %)])
-        meta (doto (SettableBlobMeta. acl)
-                   (.set_replication_factor replication-factor))]
-    (Utils/validateKeyName key)
-    (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
-    (if file
-      (with-open [f (input-stream file)]
-        (create-blob-from-stream key f meta))
-      (create-blob-from-stream key System/in meta))
-    (log-message "Successfully created " key)))
-
-(defn delete-cli [args]
-  (with-configured-blob-client blobstore
-    (doseq [key args]
-      (.deleteBlob blobstore key)
-      (log-message "deleted " key))))
-
-(defn list-cli [args]
-  (with-configured-blob-client blobstore
-    (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
-      (doseq [key keys]
-        (try
-          (let [meta (.getBlobMeta blobstore key)
-                version (.get_version meta)
-                acl (.get_acl (.get_settable meta))]
-            (log-message key " " version " " (pr-str (map access-control-str acl))))
-          (catch AuthorizationException ae
-            (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
-          (catch KeyNotFoundException knf
-            (if-not (empty? args) (log-error key " NOT FOUND"))))))))
-
-(defn set-acl-cli [args]
-  (let [[{set-acl :set} [key] _]
-           (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
-    (with-configured-blob-client blobstore
-      (let [meta (.getBlobMeta blobstore key)
-            acl (.get_acl (.get_settable meta))
-            new-acl (if set-acl set-acl acl)
-            new-meta (SettableBlobMeta. new-acl)]
-        (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl)))
-        (.setBlobMeta blobstore key new-meta)))))
-
-(defn rep-cli [args]
-  (let [sub-command (first args)
-        new-args (rest args)]
-    (with-configured-blob-client blobstore
-      (condp = sub-command
-      "--read" (let [key (first new-args)
-                     blob-replication (.getBlobReplication blobstore key)]
-                 (log-message "Current replication factor " blob-replication)
-                 blob-replication)
-      "--update" (let [[{replication-factor :replication-factor} [key] _]
-                        (cli new-args ["-r" "--replication-factor" :parse-fn #(Integer/parseInt %)])]
-                   (if (nil? replication-factor)
-                     (throw (RuntimeException. (str "Please set the replication factor")))
-                     (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
-                       (log-message "Replication factor is set to " blob-replication)
-                       blob-replication)))
-      :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command")))))))
-
-(defn -main [& args]
-  (let [command (first args)
-        new-args (rest args)]
-    (condp = command
-      "cat" (read-cli new-args)
-      "create" (create-cli new-args)
-      "update" (update-cli new-args)
-      "delete" (delete-cli new-args)
-      "list" (list-cli new-args)
-      "set-acl" (set-acl-cli new-args)
-      "replication" (rep-cli new-args)
-      :else (throw (RuntimeException. (str command " is not a supported blobstore command"))))))
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8d193e6..5ed4d39 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -25,7 +25,8 @@
 
 (defn -main [^String tmpjarpath & args]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        zk-leader-elector (Zookeeper/zkLeaderElector conf)
+        ; we do not need to join the leader lock queue here, so passing nil as the blob-store is fine
+        zk-leader-elector (Zookeeper/zkLeaderElector conf nil)
         leader-nimbus (.getLeader zk-leader-elector)
         host (.getHost leader-nimbus)
         port (.getPort leader-nimbus)
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index ba0f5e1..24d5398 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -179,7 +179,8 @@
     (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
 
 (defn nimbus-data [conf inimbus]
-  (let [forced-scheduler (.getForcedScheduler inimbus)]
+  (let [forced-scheduler (.getForcedScheduler inimbus)
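+        ; create the blob store once up front so the leader elector below can reuse the same instance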
+        blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))]
     {:conf conf
      :nimbus-host-port-info (NimbusInfo/fromConf conf)
      :inimbus inimbus
@@ -197,7 +198,7 @@
      :heartbeats-cache (atom {})
      :downloaders (file-cache-map conf)
      :uploaders (file-cache-map conf)
-     :blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+     :blob-store blob-store
      :blob-downloaders (mk-blob-cache-map conf)
      :blob-uploaders (mk-blob-cache-map conf)
      :blob-listers (mk-bloblist-cache-map conf)
@@ -211,7 +212,7 @@
                   (Utils/exitProcess 20 "Error when processing an event"))))
 
      :scheduler (mk-scheduler conf inimbus)
-     :leader-elector (Zookeeper/zkLeaderElector conf)
+     :leader-elector (Zookeeper/zkLeaderElector conf blob-store)
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
@@ -1166,19 +1167,6 @@
           topo-history-state (:topo-history-state nimbus)]
           (.filterOldTopologies ^LocalState topo-history-state cutoff-age))))
 
-(defn cleanup-corrupt-topologies! [nimbus]
-  (let [storm-cluster-state (:storm-cluster-state nimbus)
-        blob-store (:blob-store nimbus)
-        code-ids (set (code-ids blob-store))
-        active-topologies (set (.activeStorms storm-cluster-state))
-        corrupt-topologies (set/difference active-topologies code-ids)]
-    (doseq [corrupt corrupt-topologies]
-      (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
-      (.removeStorm storm-cluster-state corrupt)
-      (if (instance? LocalFsBlobStore blob-store)
-        (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
-          (.removeBlobstoreKey storm-cluster-state blob-key))))))
-
 (defn setup-blobstore [nimbus]
   "Sets up blobstore state for all current keys."
   (let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -2202,7 +2190,6 @@
         STORM-VERSION))
 
     (.addToLeaderLockQueue (:leader-elector nimbus))
-    (cleanup-corrupt-topologies! nimbus)
     (when (instance? LocalFsBlobStore blob-store)
       ;register call back for blob-store
       (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 3ad955d..831725a 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -18,7 +18,8 @@
   (:use compojure.core)
   (:use [clojure.java.shell :only [sh]])
   (:use ring.middleware.reload
-        ring.middleware.multipart-params)
+        ring.middleware.multipart-params
+        ring.middleware.multipart-params.temp-file)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
   (:use [org.apache.storm config util log converter])
diff --git a/storm-core/src/clj/org/apache/storm/ui/helpers.clj b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
index e681cfb..ac1ecd1 100644
--- a/storm-core/src/clj/org/apache/storm/ui/helpers.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
@@ -36,22 +36,35 @@
            [org.eclipse.jetty.server DispatcherType]
            [org.eclipse.jetty.servlets CrossOriginFilter]
            (org.json.simple JSONValue))
-  (:require [ring.util servlet])
+  (:require [ring.util servlet]
+            [ring.util.response :as response])
   (:require [compojure.route :as route]
             [compojure.handler :as handler]))
 
 ;; TODO this function and its callers will be replaced when ui.core, logviewer and drpc move to Java
 (def num-web-requests (StormMetricsRegistry/registerMeter "num-web-requests"))
+
 (defn requests-middleware
-  "Coda Hale metric for counting the number of web requests."
+  "Wrap request with Coda Hale metric for counting the number of web requests, 
+  and add Cache-Control: no-cache for html files in root directory (index.html, topology.html, etc)"
   [handler]
   (fn [req]
     (.mark num-web-requests)
-    (handler req)))
+    (let [uri (:uri req)
+          res (handler req) 
+          content-type (response/get-header res "Content-Type")]
+      ;; check that the response is html and that the path is for a root page: a single / in the path 
+      ;; then we know we don't want it cached (e.g. /index.html)
+      (if (and (= content-type "text/html") 
+               (= 1 (count (re-seq #"/" uri))))
+        ;; response for html page in root directory, no-cache 
+        (response/header res "Cache-Control" "no-cache")
+        ;; else, carry on
+        res))))
 
 ;; TODO this function and its callers will be replaced when ui.core and logviewer move to Java
 (defnk json-response
   [data callback :need-serialize true :status 200 :headers {}]
   {:status status
    :headers (UIHelpers/getJsonResponseHeaders callback headers)
-   :body (UIHelpers/getJsonResponseBody data callback need-serialize)})
\ No newline at end of file
+   :body (UIHelpers/getJsonResponseBody data callback need-serialize)})
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index 61e48b3..2cb1cce 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -23,7 +23,9 @@
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
 
 import java.util.Iterator;
 import java.util.Map;
@@ -50,6 +52,21 @@
 public abstract class ClientBlobStore implements Shutdownable {
     protected Map conf;
 
+    public interface WithBlobstore {
+        void run(ClientBlobStore blobStore) throws Exception;
+    }
+
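+    /**
+     * Runs the given WithBlobstore callback against a ClientBlobStore built from the storm configuration,
+     * shutting the blob store down once the callback completes or throws.
+     */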
+    public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        ClientBlobStore blobStore = Utils.getClientBlobStore(conf);
+
+        try {
+            withBlobstore.run(blobStore);
+        } finally {
+            blobStore.shutdown();
+        }
+    }
+
     /**
      * Sets up the client API by parsing the configs.
      * @param conf The storm conf containing the config details.
diff --git a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
new file mode 100644
index 0000000..a5b01fc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
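+/**
+ * Blobstore command line client, a Java port of org.apache.storm.command.blobstore.clj.
+ * Handles the cat, create, update, delete, list, set-acl and replication sub-commands,
+ * e.g. {@code storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r}.
+ */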
+public class Blobstore {
+    private static final Logger LOG = LoggerFactory.getLogger(Blobstore.class);
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("You should provide command.");
+        }
+
+        String command = args[0];
+        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        switch (command) {
+            case "cat":
+                readCli(newArgs);
+                break;
+
+            case "create":
+                createCli(newArgs);
+                break;
+
+            case "update":
+                updateCli(newArgs);
+                break;
+
+            case "delete":
+                deleteCli(newArgs);
+                break;
+
+            case "list":
+                listCli(newArgs);
+                break;
+
+            case "set-acl":
+                setAclCli(newArgs);
+                break;
+
+            case "replication":
+                replicationCli(newArgs);
+                break;
+
+            default:
+                throw new RuntimeException("" + command + " is not a supported blobstore command");
+        }
+    }
+
+    private static void readCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+        final String key = (String) cl.get("key");
+        final String file = (String) cl.get("f");
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedOutputStream f = new BufferedOutputStream(new FileOutputStream(file))) {
+                BlobStoreSupport.readBlob(key, f);
+            }
+        } else {
+            BlobStoreSupport.readBlob(key, System.out);
+        }
+    }
+
+    private static void createCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .opt("a", "acl", Collections.emptyList(), new AsAclParser())
+                .opt("r", "replication-factor", -1, CLI.AS_INT)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = (String) cl.get("key");
+        final String file = (String) cl.get("f");
+        final List<AccessControl> acl = (List<AccessControl>) cl.get("a");
+        final Integer replicationFactor = (Integer) cl.get("r");
+
+        SettableBlobMeta meta = new SettableBlobMeta(acl);
+        meta.set_replication_factor(replicationFactor);
+
+        Utils.validateKeyName(key);
+
+        LOG.info("Creating {} with ACL {}", key, generateAccessControlsInfo(acl));
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedInputStream f = new BufferedInputStream(new FileInputStream(file))) {
+                BlobStoreSupport.createBlobFromStream(key, f, meta);
+            }
+        } else {
+            BlobStoreSupport.createBlobFromStream(key, System.in, meta);
+        }
+
+        LOG.info("Successfully created {}", key);
+    }
+
+    private static void updateCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = (String) cl.get("key");
+        final String file = (String) cl.get("f");
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedInputStream f = new BufferedInputStream(new FileInputStream(file))) {
+                BlobStoreSupport.updateBlobFromStream(key, f);
+            }
+        } else {
+            BlobStoreSupport.updateBlobFromStream(key, System.in);
+        }
+
+        LOG.info("Successfully updated {}", key);
+    }
+
+    private static void deleteCli(final String[] args) throws Exception {
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                for (String key : args) {
+                    blobStore.deleteBlob(key);
+
+                    LOG.info("deleted {}", key);
+                }
+            }
+        });
+    }
+
+    private static void listCli(final String[] args) throws Exception {
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                Iterator<String> keys;
+                boolean isArgsEmpty = (args == null || args.length == 0);
+                if (isArgsEmpty) {
+                    keys = blobStore.listKeys();
+                } else {
+                    keys = Arrays.asList(args).iterator();
+                }
+
+                while (keys.hasNext()) {
+                    String key = keys.next();
+
+                    try {
+                        ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                        long version = meta.get_version();
+                        List<AccessControl> acl = meta.get_settable().get_acl();
+
+                        LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
+                    } catch (AuthorizationException ae) {
+                        if (!isArgsEmpty) {
+                            LOG.error("ACCESS DENIED to key: {}", key);
+                        }
+                    } catch (KeyNotFoundException knf) {
+                        if (!isArgsEmpty) {
+                            LOG.error("{} NOT FOUND", key);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    private static void setAclCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("s", "set", Collections.emptyList(), new AsAclParser())
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = (String) cl.get("key");
+        final List<AccessControl> setAcl = (List<AccessControl>) cl.get("s");
+
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                List<AccessControl> acl = meta.get_settable().get_acl();
+                List<AccessControl> newAcl;
+                if (setAcl != null && !setAcl.isEmpty()) {
+                    newAcl = setAcl;
+                } else {
+                    newAcl = acl;
+                }
+
+                SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
+                LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
+                blobStore.setBlobMeta(key, newMeta);
+            }
+        });
+    }
+
+    private static void replicationCli(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("replication command needs at least subcommand as parameter.");
+        }
+        final String subCommand = args[0];
+        final String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                switch (subCommand) {
+                    case "--read":
+                        if (newArgs.length == 0) {
+                            throw new IllegalArgumentException("replication --read needs key as parameter.");
+                        }
+
+                        String key = newArgs[0];
+                        int blobReplication = blobStore.getBlobReplication(key);
+                        LOG.info("Current replication factor {}", blobReplication);
+                        break;
+
+                    case "--update":
+                        updateReplicationFactor(blobStore, newArgs);
+                        break;
+
+                    default:
+                        throw new RuntimeException("" + subCommand + " is not a supported blobstore command");
+                }
+            }
+
+            private void updateReplicationFactor(ClientBlobStore blobStore, String[] args) throws Exception {
+                Map<String, Object> cl = CLI.opt("r", "replication-factor", null, CLI.AS_INT)
+                        .arg("key", CLI.FIRST_WINS)
+                        .parse(args);
+
+                final String key = (String) cl.get("key");
+                final Integer replicationFactor = (Integer) cl.get("r");
+
+                if (replicationFactor == null) {
+                    throw new RuntimeException("Please set the replication factor");
+                }
+
+                int blobReplication = blobStore.updateBlobReplication(key, replicationFactor);
+                LOG.info("Replication factor is set to {}", blobReplication);
+            }
+        });
+    }
+
+    private static final class BlobStoreSupport {
+        static void readBlob(final String key, final OutputStream os) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    try (InputStreamWithMeta is = blobStore.getBlob(key)) {
+                        IOUtils.copy(is, os);
+                    }
+                }
+            });
+        }
+
+        static void createBlobFromStream(final String key, final InputStream is, final SettableBlobMeta meta) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    AtomicOutputStream os = blobStore.createBlob(key, meta);
+                    copyInputStreamToBlobOutputStream(is, os);
+                }
+            });
+        }
+
+        static void updateBlobFromStream(final String key, final InputStream is) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    AtomicOutputStream os = blobStore.updateBlob(key);
+                    copyInputStreamToBlobOutputStream(is, os);
+                }
+            });
+        }
+
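+        // copy then close to commit the blob; on failure cancel the atomic stream so a partial write is not committed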
+        static void copyInputStreamToBlobOutputStream(InputStream is, AtomicOutputStream os) throws IOException {
+            try {
+                IOUtils.copy(is, os);
+                os.close();
+            } catch (Exception e) {
+                os.cancel();
+                throw e;
+            }
+        }
+    }
+
+    private static List<String> generateAccessControlsInfo(List<AccessControl> acl) {
+        List<String> accessControlStrings = new ArrayList<>();
+        for (AccessControl ac : acl) {
+            accessControlStrings.add(BlobStoreAclHandler.accessControlToString(ac));
+        }
+        return accessControlStrings;
+    }
+
+    private static final class AsAclParser implements CLI.Parse {
+        @Override
+        public Object parse(String value) {
+            List<AccessControl> accessControls = new ArrayList<>();
+            for (String part : value.split(",")) {
+                accessControls.add(asAccessControl(part));
+            }
+
+            return accessControls;
+        }
+
+        private AccessControl asAccessControl(String param) {
+            return BlobStoreAclHandler.parseAccessControl(param);
+        }
+    }
+}
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index fe0f4da..3f245e1 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -253,7 +253,7 @@
                                                     WindowManager<Tuple> manager) {
         if (windowLengthCount != null) {
             if (isTupleTs()) {
-                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value, manager);
+                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
             } else {
                 return new CountEvictionPolicy<>(windowLengthCount.value);
             }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
index 0e7b233..fb12202 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.windowing;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * An eviction policy that tracks event counts and can
@@ -26,12 +26,12 @@
  * @param <T> the type of event tracked by this policy.
  */
 public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
-    private final int threshold;
-    protected final AtomicInteger currentCount;
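+    // count kept as a long, and visible to subclasses such as WatermarkCountEvictionPolicy, so accumulated event counts cannot overflow an int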
+    protected final int threshold;
+    protected final AtomicLong currentCount;
 
     public CountEvictionPolicy(int count) {
         this.threshold = count;
-        this.currentCount = new AtomicInteger();
+        this.currentCount = new AtomicLong();
     }
 
     @Override
@@ -41,7 +41,7 @@
          * return if the event should be evicted
          */
         while (true) {
-            int curVal = currentCount.get();
+            long curVal = currentCount.get();
             if (curVal > threshold) {
                 if (currentCount.compareAndSet(curVal, curVal - 1)) {
                     return Action.EXPIRE;
@@ -61,7 +61,7 @@
     }
 
     @Override
-    public void setContext(Object context) {
+    public void setContext(EvictionContext context) {
         // NOOP
     }
 
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
index 2594c67..17750b6 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
@@ -44,7 +44,7 @@
     public void track(Event<T> event) {
         if (started && !event.isWatermark()) {
             if (currentCount.incrementAndGet() >= count) {
-                evictionPolicy.setContext(System.currentTimeMillis());
+                evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
                 handler.onTrigger();
             }
         }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
new file mode 100644
index 0000000..ef65f66
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
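+/**
+ * Immutable default {@link EvictionContext} implementation carrying an optional reference time,
+ * current count and sliding count; unset values are null.
+ */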
+public class DefaultEvictionContext implements EvictionContext {
+    private final Long referenceTime;
+    private final Long currentCount;
+    private final Long slidingCount;
+
+    public DefaultEvictionContext(Long referenceTime) {
+        this(referenceTime, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount) {
+        this(referenceTime, currentCount, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) {
+        this.referenceTime = referenceTime;
+        this.currentCount = currentCount;
+        this.slidingCount = slidingCount;
+    }
+
+    @Override
+    public Long getReferenceTime() {
+        return referenceTime;
+    }
+
+    @Override
+    public Long getCurrentCount() {
+        return currentCount;
+    }
+
+    @Override
+    public Long getSlidingCount() {
+        return slidingCount;
+    }
+}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java
new file mode 100644
index 0000000..37dcfd9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+/**
+ * Context information that can be used by the eviction policy
+ */
+public interface EvictionContext {
+    /**
+     * Returns the reference time that the eviction policy could use to
+     * evict the events. In the case of event time processing, this would be
+     * the watermark time.
+     *
+     * @return the reference time in millis
+     */
+    Long getReferenceTime();
+
+    /**
+     * Returns the sliding count for count-based windows.
+     *
+     * @return the sliding count
+     */
+    Long getSlidingCount();
+
+    /**
+     * Returns the current count of events in the queue up to the reference time,
+     * based on which count-based evictions can be performed.
+     *
+     * @return the current count
+     */
+    Long getCurrentCount();
+}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
index 8128d80..05e4d93 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
@@ -70,5 +70,6 @@
      *
      * @param context
      */
-    void setContext(Object context);
+    void setContext(EvictionContext context);
+
 }
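
The change above tightens EvictionPolicy.setContext from a raw Object, which every policy had to cast, to the typed EvictionContext. A minimal sketch of a consuming policy, assuming it sits alongside the new windowing classes; LoggingEvictionPolicy is a hypothetical name, not part of this patch:

    package org.apache.storm.windowing;

    // Hypothetical policy illustrating the typed contract only.
    public class LoggingEvictionPolicy {
        private volatile long referenceTime;

        public void setContext(EvictionContext context) {
            // Before this patch: referenceTime = ((Number) context).longValue();
            // All call sites in this patch supply a non-null reference time.
            referenceTime = context.getReferenceTime();
            System.out.println("ref=" + referenceTime
                    + " current=" + context.getCurrentCount()
                    + " sliding=" + context.getSlidingCount());
        }
    }
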
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index a2db239..e646207 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -57,8 +57,8 @@
     }
 
     @Override
-    public void setContext(Object context) {
-        referenceTime = ((Number) context).longValue();
+    public void setContext(EvictionContext context) {
+        referenceTime = context.getReferenceTime();
     }
 
     @Override
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index 1ad7714..b057afb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -115,7 +115,7 @@
                      * to evict the events
                      */
                     if (evictionPolicy != null) {
-                        evictionPolicy.setContext(System.currentTimeMillis());
+                        evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
                     }
                     handler.onTrigger();
                 } catch (Throwable th) {
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index e1af6fb..74240bb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -24,25 +24,29 @@
  * @param <T> the type of event tracked by this policy.
  */
 public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
-    private final WindowManager<T> windowManager;
     /*
      * The reference time in millis for window calculations and
      * expiring events. If not set it will default to System.currentTimeMillis()
      */
     private long referenceTime;
+    private long processed = 0L;
 
-    public WatermarkCountEvictionPolicy(int count, WindowManager<T> windowManager) {
+    public WatermarkCountEvictionPolicy(int count) {
         super(count);
-        this.windowManager = windowManager;
     }
 
     @Override
     public Action evict(Event<T> event) {
-        if (event.getTimestamp() <= referenceTime) {
-            return super.evict(event);
+        Action action;
+        if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) {
+            action = super.evict(event);
+            if (action == Action.PROCESS) {
+                ++processed;
+            }
         } else {
-            return Action.KEEP;
+            action = Action.KEEP;
         }
+        return action;
     }
 
     @Override
@@ -51,9 +55,14 @@
     }
 
     @Override
-    public void setContext(Object context) {
-        referenceTime = (Long) context;
-        currentCount.set(windowManager.getEventCount(referenceTime));
+    public void setContext(EvictionContext context) {
+        referenceTime = context.getReferenceTime();
+        if (context.getCurrentCount() != null) {
+            currentCount.set(context.getCurrentCount());
+        } else {
+            currentCount.set(processed + context.getSlidingCount());
+        }
+        processed = 0;
     }
 
     @Override
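
The processed counter above is the core of the fix for count windows over events that share a timestamp: currentCount now arrives via the context (an absolute event count from time triggers, or processed + slidingCount from count triggers), and evict() hands out at most that many PROCESS decisions per trigger. A stand-alone sketch of the capping rule, simplified to ignore the superclass's own bookkeeping (plain Java, not Storm's classes):

    public class CappedEvictionSketch {
        public static void main(String[] args) {
            // Four events share ts = 12; the sliding count is 2, so the trigger
            // sets currentCount = processed (0 after reset) + slidingCount (2).
            long referenceTime = 12;
            long currentCount = 2;
            long processed = 0;
            for (long ts : new long[]{12, 12, 12, 12}) {
                boolean process = ts <= referenceTime && processed < currentCount;
                if (process) {
                    processed++;
                }
                System.out.println("ts=" + ts + " -> " + (process ? "PROCESS" : "KEEP"));
            }
            // Prints PROCESS, PROCESS, KEEP, KEEP. Without the cap, all four
            // events satisfy ts <= referenceTime on the first trigger and the
            // windows collapse into one (what the new tests below exercise).
        }
    }
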
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
index 391efee..3cfcaad 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
@@ -74,7 +74,7 @@
         long watermarkTs = waterMarkEvent.getTimestamp();
         List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
         for (long ts : eventTs) {
-            evictionPolicy.setContext(ts);
+            evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count)));
             handler.onTrigger();
             lastProcessedTs = ts;
         }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
index 3734c93..00620c4 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
@@ -75,7 +75,8 @@
         long windowEndTs = nextWindowEndTs;
         LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
         while (windowEndTs <= watermarkTs) {
-            evictionPolicy.setContext(windowEndTs);
+            long currentCount = windowManager.getEventCount(windowEndTs);
+            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
             if (handler.onTrigger()) {
                 windowEndTs += slidingIntervalMs;
             } else {
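
Taken together, the trigger policies now build the context in three shapes. A compact sketch (hypothetical class name, invented values) that constructs each shape the same way the call sites in this patch do, assuming it lives in org.apache.storm.windowing:

    public class ContextShapes {
        public static void main(String[] args) {
            // TimeTriggerPolicy (processing time): reference time only.
            EvictionContext wallClock = new DefaultEvictionContext(System.currentTimeMillis());
            // WatermarkTimeTriggerPolicy: the trigger supplies the event count up to windowEndTs.
            EvictionContext timeWindow = new DefaultEvictionContext(100L, 7L);
            // WatermarkCountTriggerPolicy: the trigger supplies only the sliding count;
            // WatermarkCountEvictionPolicy derives currentCount = processed + slidingCount.
            EvictionContext countWindow = new DefaultEvictionContext(100L, null, 2L);
            for (EvictionContext ctx : new EvictionContext[]{wallClock, timeWindow, countWindow}) {
                System.out.println("ref=" + ctx.getReferenceTime()
                        + " current=" + ctx.getCurrentCount()
                        + " sliding=" + ctx.getSlidingCount());
            }
        }
    }
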
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
index 7923b7e..74816c2 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -21,6 +21,7 @@
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
@@ -36,16 +37,17 @@
 
 public class LeaderElectorImp implements ILeaderElector {
     private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
-     private final Map conf;
-     private final List<String> servers;
-     private final CuratorFramework zk;
-     private final String leaderlockPath;
-     private final String id;
-     private final AtomicReference<LeaderLatch> leaderLatch;
-     private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+    private final Map conf;
+    private final List<String> servers;
+    private final CuratorFramework zk;
+    private final String leaderlockPath;
+    private final String id;
+    private final AtomicReference<LeaderLatch> leaderLatch;
+    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+    private final BlobStore blobStore;
 
     public LeaderElectorImp(Map conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
-            AtomicReference<LeaderLatchListener> leaderLatchListener) {
+            AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore) {
         this.conf = conf;
         this.servers = servers;
         this.zk = zk;
@@ -53,6 +55,7 @@
         this.id = id;
         this.leaderLatch = leaderLatch;
         this.leaderLatchListener = leaderLatchListener;
+        this.blobStore = blobStore;
     }
 
     @Override
@@ -65,7 +68,7 @@
         // if this latch is already closed, we need to create new instance.
         if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
             leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
-            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, leaderLatch.get()));
+            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get()));
             LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
         }
         // Only if the latch is not already started we invoke start
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 5e9039a..7723922 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.zookeeper;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -27,11 +30,15 @@
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.KeyFilter;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.zookeeper.KeeperException;
@@ -44,6 +51,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -323,26 +331,51 @@
     }
 
     // Leader latch listener that will be invoked when we either gain or lose leadership
-    public static LeaderLatchListener leaderLatchListenerImpl(Map conf, CuratorFramework zk, LeaderLatch leaderLatch) throws UnknownHostException {
+    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
         final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
         return new LeaderLatchListener() {
             @Override
             public void isLeader() {
-                LOG.info("{} gained leadership", hostName);
+                Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+                Set<String> localTopologyIds = blobStore.filterAndListKeys(new KeyFilter<String>() {
+                    @Override
+                    public String filter(String key) {
+                        return ConfigUtils.getIdFromBlobKey(key);
+                    }
+                });
+                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, localTopologyIds);
+                LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology [{}]",
+                        generateJoinedString(activeTopologyIds), generateJoinedString(localTopologyIds),
+                        generateJoinedString(diffTopology));
+
+                if (diffTopology.isEmpty()) {
+                    LOG.info("Accepting leadership, all active topology found locally.");
+                } else {
+                    LOG.info("code for all active topologies not available locally, giving up leadership.");
+                    try {
+                        leaderLatch.close();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
             }
 
             @Override
             public void notLeader() {
                 LOG.info("{} lost leadership.", hostName);
             }
+
+            private String generateJoinedString(Set<String> topologyIds) {
+                return Joiner.on(",").join(topologyIds);
+            }
         };
     }
 
-    public static ILeaderElector zkLeaderElector(Map conf) throws UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf);
+    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, blobStore);
     }
 
-    protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException {
+    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
@@ -350,8 +383,9 @@
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
         AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
-                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, leaderLatchAtomicReference.get()));
-        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference);
+                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
+        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
+            leaderLatchListenerAtomicReference, blobStore);
     }
 
     public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
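
The isLeader() callback above is the behavioral core of STORM-1977: a nimbus that wins the latch but is missing code blobs for an active topology closes the latch rather than lead with incomplete state. A stand-alone sketch of the gating rule, using plain java.util in place of Guava's Sets.difference (names are illustrative, not Storm's):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class LeadershipGateSketch {
        // Accept leadership only when no active topology id is missing locally.
        static boolean shouldAcceptLeadership(Set<String> activeTopologyIds,
                                              Set<String> localTopologyIds) {
            Set<String> missing = new HashSet<>(activeTopologyIds);
            missing.removeAll(localTopologyIds); // same set as Sets.difference(active, local)
            return missing.isEmpty();
        }

        public static void main(String[] args) {
            Set<String> active = new HashSet<>(Arrays.asList("topo-1", "topo-2"));
            Set<String> local = new HashSet<>(Arrays.asList("topo-1"));
            // Prints false: topo-2's code is absent locally, so give up the latch.
            System.out.println(shouldAcceptLeadership(active, local));
        }
    }
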
diff --git a/storm-core/src/ui/public/component.html b/storm-core/src/ui/public/component.html
index 6d5465f..005ac5d 100644
--- a/storm-core/src/ui/public/component.html
+++ b/storm-core/src/ui/public/component.html
@@ -22,7 +22,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -32,7 +32,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/moment.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -83,7 +83,7 @@
     try {
       other();
     } catch (err) {
-      $.get("/templates/json-error-template.html", function(template) {
+      getStatic("/templates/json-error-template.html", function(template) {
         $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
       });
     }
@@ -156,7 +156,7 @@
     $.ajaxSetup({
         "error":function(jqXHR,textStatus,response) {
             var errorJson = jQuery.parseJSON(jqXHR.responseText);
-            $.get("/templates/json-error-template.html", function(template) {
+            getStatic("/templates/json-error-template.html", function(template) {
                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
             });
         }
@@ -187,7 +187,7 @@
 
     $.getJSON(url,function(response,status,jqXHR) {
         var uiUser = $("#ui-user");
-        $.get("/templates/user-template.html", function(template) {
+        getStatic("/templates/user-template.html", function(template) {
             uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
             $('#ui-user [data-toggle="tooltip"]').tooltip()
         });
@@ -202,7 +202,7 @@
         var profilerControl = $("#profiler-control");
         var executorStats = $("#component-executor-stats");
         var componentErrors = $("#component-errors");
-        $.get("/templates/component-page-template.html", function(template) {
+        getStatic("/templates/component-page-template.html", function(template) {
             response["profilerActive"] = $.map(response["profilerActive"], function(active_map) {
                 var date = new Date();
                 var millis = date.getTime() + parseInt(active_map["timestamp"]);
@@ -341,9 +341,9 @@
     var passed = {}
     Object.keys(workerActionSelected).forEach(function (id) {
         var url = "/api/v1/topology/"+topologyId+"/profiling/start/" + id + "/" + timeout;
-        $.get(url, function(response,status,jqXHR) {
+        getStatic(url, function(response,status,jqXHR) {
             jsError(function() {
-                $.get("/templates/component-page-template.html", function(template) {
+                getStatic("/templates/component-page-template.html", function(template) {
                     var host_port_split = id.split(":");
                     var host = host_port_split[0];
                     var port = host_port_split[1];
@@ -382,7 +382,7 @@
     $("#stop_" + id).prop('disabled', true);
     setTimeout(function(){ $("#stop_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request to stop profiling...");
     })
     .fail(function(response) {
@@ -398,7 +398,7 @@
     $("#dump_profile_" + id).prop('disabled', true);
     setTimeout(function(){ $("#dump_profile_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request to dump profile snapshot...");
     })
     .fail(function(response) {
@@ -418,7 +418,7 @@
         $("#dump_jstack_" + id).prop('disabled', true);
         setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
 
-        $.get(url)
+        getStatic(url)
         .fail(function(response) {
             failed[id] = response;
         });
@@ -443,7 +443,7 @@
     $("#dump_jstack_" + id).prop('disabled', true);
     setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request for jstack dump...");
     })
     .fail(function(response) {
@@ -461,7 +461,7 @@
         $("#restart_worker_jvm_" + id).prop('disabled', true);
         setTimeout(function(){ $("#restart_worker_jvm_" + id).prop('disabled', false); }, 5000);
 
-        $.get(url)
+        getStatic(url)
         .fail(function(response) {
             failed[id] = response;
         });
@@ -489,7 +489,7 @@
         $("#dump_heap_" + id).prop('disabled', true);
         setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
 
-        $.get(url)
+        getStatic(url)
         .fail(function(response) {
             failed[id] = response;
         });
@@ -514,7 +514,7 @@
     $("#dump_heap_" + id).prop('disabled', true);
     setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request for jmap dump...");
     })
     .fail(function(response) {
diff --git a/storm-core/src/ui/public/deep_search_result.html b/storm-core/src/ui/public/deep_search_result.html
index 406a101..5b4f47b 100644
--- a/storm-core/src/ui/public/deep_search_result.html
+++ b/storm-core/src/ui/public/deep_search_result.html
@@ -21,7 +21,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -31,7 +31,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/url.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -56,12 +56,12 @@
     var port = $.url("?port") || "*";
     var search_archived = $.url("?search-archived");
 
-    $.get("/templates/deep-search-result-page-template.html", function(template) {
+    getStatic("/templates/deep-search-result-page-template.html", function(template) {
         var checked;
         if (search_archived) {
             checked = "checked";
         }
-        $.get("/api/v1/history/summary", function (response, status, jqXHR) {
+        getStatic("/api/v1/history/summary", function (response, status, jqXHR) {
             var findIds = function findIds(query, cb) {
                 var found = [];
                 var re = new RegExp(query, 'i');
@@ -104,7 +104,7 @@
 
         var result = $("#result");
         if(search) {
-            $.get("/api/v1/supervisor/summary", function(response, status, jqXHR) {
+            getStatic("/api/v1/supervisor/summary", function(response, status, jqXHR) {
 
                 for(var i in response.supervisors) {
                     response.supervisors[i].elemId = elem_id_for_host(response.supervisors[i].host);
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
index eae423c..52900a5 100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@ -23,7 +23,7 @@
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
 <link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -32,7 +32,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
 <script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -96,7 +96,7 @@
     $.ajaxSetup({
         "error":function(jqXHR,textStatus,response) {
             var errorJson = jQuery.parseJSON(jqXHR.responseText);
-            $.get("/templates/json-error-template.html", function(template) {
+            getStatic("/templates/json-error-template.html", function(template) {
                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
             });
         }
@@ -109,18 +109,18 @@
     var config = $("#nimbus-configuration");
 
     $.getJSON("/api/v1/cluster/summary",function(response,status,jqXHR) {
-        $.get("/templates/user-template.html", function(template) {
+        getStatic("/templates/user-template.html", function(template) {
             uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
             $('#ui-user [data-toggle="tooltip"]').tooltip()
         });
 
-        $.get("/templates/index-page-template.html", function(template) {
+        getStatic("/templates/index-page-template.html", function(template) {
             clusterSummary.append(Mustache.render($(template).filter("#cluster-summary-template").html(),response));
             $('#cluster-summary [data-toggle="tooltip"]').tooltip();
         });
     });
     $.getJSON("/api/v1/nimbus/summary",function(response,status,jqXHR) {
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
           nimbusSummary.append(Mustache.render($(template).filter("#nimbus-summary-template").html(),response));
           //host, port, isLeader, version, uptime
           dtAutoPage("#nimbus-summary-table", {
@@ -133,7 +133,7 @@
       });
     });
     $.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) {
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
           topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
           //name, owner, status, uptime, num workers, num executors, num tasks, replication count, assigned total mem, assigned total cpu, scheduler info
           dtAutoPage("#topology-summary-table", {
@@ -146,7 +146,7 @@
       });
     });
     $.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) {
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
           supervisorSummary.append(Mustache.render($(template).filter("#supervisor-summary-template").html(),response));
           //id, host, uptime, slots, used slots
           dtAutoPage("#supervisor-summary-table", {
@@ -160,7 +160,7 @@
     });
     $.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) {
       var formattedResponse = formatConfigData(response);
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
         config.append(Mustache.render($(template).filter("#configuration-template").html(),formattedResponse));
         $('#nimbus-configuration-table td').jsonFormatter()
         //key, value
diff --git a/storm-core/src/ui/public/js/script.js b/storm-core/src/ui/public/js/script.js
index a880205..6d641e1 100644
--- a/storm-core/src/ui/public/js/script.js
+++ b/storm-core/src/ui/public/js/script.js
@@ -257,3 +257,15 @@
     opacity: .5,
     color: '#fff',margin:0,width:"30%",top:"40%",left:"35%",textAlign:"center"
 };
+
+// add a url query param to static (templates normally) ajax requests
+// for cache busting
+function getStatic(url, cb) {
+    return $.ajax({
+        url: url,
+        data: {
+            '_ts': '${packageTimestamp}'
+        },
+        success: cb
+    });
+}
diff --git a/storm-core/src/ui/public/js/visualization.js b/storm-core/src/ui/public/js/visualization.js
index 9aab087..4daadc2 100644
--- a/storm-core/src/ui/public/js/visualization.js
+++ b/storm-core/src/ui/public/js/visualization.js
@@ -379,7 +379,7 @@
   try {
     other();
   } catch (err) {
-    $.get("/templates/json-error-template.html", function(template) {
+    getStatic("/templates/json-error-template.html", function(template) {
       $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
     });
   }
@@ -388,7 +388,7 @@
 var should_update;
 function show_visualization(sys) {
     $.getJSON("/api/v1/topology/"+$.url("?id")+"/visualization-init",function(response,status,jqXHR) {
-        $.get("/templates/topology-page-template.html", function(template) {
+        getStatic("/templates/topology-page-template.html", function(template) {
             jsError(function() {
                 var topologyVisualization = $("#visualization-container");
                 topologyVisualization.append(
diff --git a/storm-core/src/ui/public/logviewer_search.html b/storm-core/src/ui/public/logviewer_search.html
index f981499..3a1682e 100644
--- a/storm-core/src/ui/public/logviewer_search.html
+++ b/storm-core/src/ui/public/logviewer_search.html
@@ -21,7 +21,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -30,7 +30,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/url.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -51,7 +51,7 @@
     file = decodeURIComponent(file);
     search = decodeURIComponent(search);
 
-    $.get("/templates/logviewer-search-page-template.html", function(template) {
+    getStatic("/templates/logviewer-search-page-template.html", function(template) {
         $("#search-form").append(Mustache.render($(template).filter("#search-single-file").html(),{file: file, search: search, isDaemon: isDaemon}));
 
         var result = $("#result");
diff --git a/storm-core/src/ui/public/search_result.html b/storm-core/src/ui/public/search_result.html
index ec44aa9..ee0efd8 100644
--- a/storm-core/src/ui/public/search_result.html
+++ b/storm-core/src/ui/public/search_result.html
@@ -21,7 +21,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -30,7 +30,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/url.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -49,7 +49,7 @@
     var count = $.url("?count") || 2;
     var searchArchived = $.url("?searchArchived") || "";
 
-    $.get("/templates/search-result-page-template.html", function(template) {
+    getStatic("/templates/search-result-page-template.html", function(template) {
         $("#search-form").append(Mustache.render($(template).filter("#search-form-template").html(),{id: id, search: search, count: count, searchArchived: searchArchived}));
 
         var result = $("#result");
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 7ad84a4..0f7cf9a 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -24,7 +24,7 @@
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
 <link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -33,7 +33,7 @@
 <script src="/js/bootstrap-3.3.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 <script src="/js/visualization.js" type="text/javascript"></script>
 <script src="/js/arbor.js" type="text/javascript"></script>
 <script src="/js/arbor-graphics.js" type="text/javascript"></script>
@@ -230,7 +230,7 @@
     };
     if (!responseData) {
         var topologyId = $.url("?id");
-        $.get ('/api/v1/topology/' + topologyId + '/logconfig', renderImpl);
+        getStatic('/api/v1/topology/' + topologyId + '/logconfig', renderImpl);
     } else {
         renderImpl (responseData);
     }
@@ -264,7 +264,7 @@
     $.ajaxSetup({
         "error":function(jqXHR,textStatus,response) {
             var errorJson = jQuery.parseJSON(jqXHR.responseText);
-            $.get("/templates/json-error-template.html", function(template) {
+            getStatic("/templates/json-error-template.html", function(template) {
                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
             });
         }
@@ -272,7 +272,7 @@
 
     $.getJSON(url,function(response,status,jqXHR) {
         var uiUser = $("#ui-user");
-        $.get("/templates/user-template.html", function(template) {
+        getStatic("/templates/user-template.html", function(template) {
             uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
             $('#ui-user [data-toggle="tooltip"]').tooltip();
         });
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 53cf517..96e629d 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1147,37 +1147,6 @@
       (getAllNimbuses [this] `(leader-address))
       (close [this] true))))
 
-(deftest test-cleans-corrupt
-  (with-inprocess-zookeeper zk-port
-    (with-local-tmp [nimbus-dir]
-      (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
-        (letlocals
-         (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                           {STORM-ZOOKEEPER-SERVERS ["localhost"]
-                            STORM-CLUSTER-MODE "local"
-                            STORM-ZOOKEEPER-PORT zk-port
-                            STORM-LOCAL-DIR nimbus-dir}))
-         (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
-         (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (bind topology (Thrift/buildTopology
-                         {"1" (Thrift/prepareSpoutDetails
-                                (TestPlannerSpout. true) (Integer. 3))}
-                         {}))
-         (submit-local-topology nimbus "t1" {} topology)
-         (submit-local-topology nimbus "t2" {} topology)
-         (bind storm-id1 (StormCommon/getStormId cluster-state "t1"))
-         (bind storm-id2 (StormCommon/getStormId cluster-state "t2"))
-         (.shutdown nimbus)
-         (let [blob-store (Utils/getNimbusBlobStore conf nil)]
-           (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state)
-           (.shutdown blob-store))
-         (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (is ( = #{storm-id2} (set (.activeStorms cluster-state))))
-         (.shutdown nimbus)
-         (.disconnect cluster-state)
-         )))))
-
 ;(deftest test-no-overlapping-slots
 ;  ;; test that same node+port never appears across 2 assignments
 ;  )
@@ -1224,7 +1193,7 @@
   (with-inprocess-zookeeper zk-port
     (with-local-tmp [nimbus-dir]
       (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
+                      (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                        {STORM-ZOOKEEPER-SERVERS ["localhost"]
@@ -1239,7 +1208,7 @@
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf] (mock-leader-elector :is-leader false))))]
+                          (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector :is-leader false))))]
 
             (letlocals
               (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1499,7 +1468,7 @@
                   _ (UtilsInstaller. fake-utils)
                   - (StormCommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf] nil)))
+                          (zkLeaderElectorImpl [conf blob-store] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
         (stubbing [nimbus/file-cache-map nil
                  nimbus/mk-blob-cache-map nil
@@ -1563,7 +1532,7 @@
   (with-inprocess-zookeeper zk-port
     (with-local-tmp [nimbus-dir]
       (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
+                      (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                        {STORM-ZOOKEEPER-SERVERS ["localhost"]
diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
index a545427..6645566 100644
--- a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
@@ -187,7 +187,7 @@
             windowManager.add(i, now - 1000);
         }
         // simulate the time trigger by setting the reference time and invoking onTrigger() manually
-        evictionPolicy.setContext(now + 100);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 100));
         windowManager.onTrigger();
 
         // 100 events with past ts should expire
@@ -210,7 +210,7 @@
         }
         activationsEvents.addAll(newEvents);
         // simulate the time trigger by setting the reference time and invoking onTrigger() manually
-        evictionPolicy.setContext(now + 200);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 200));
         windowManager.onTrigger();
         assertTrue(listener.onExpiryEvents.isEmpty());
         assertEquals(activationsEvents, listener.onActivationEvents);
@@ -236,20 +236,20 @@
             windowManager.add(i);
         }
         // simulate the time trigger by setting the reference time and invoking onTrigger() manually
-        evictionPolicy.setContext(now + 60);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 60));
         windowManager.onTrigger();
 
         assertEquals(seq(1, 10), listener.onActivationEvents);
         assertTrue(listener.onActivationExpiredEvents.isEmpty());
         listener.clear();
         // wait so all events expire
-        evictionPolicy.setContext(now + 120);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 120));
         windowManager.onTrigger();
 
         assertEquals(seq(1, 10), listener.onExpiryEvents);
         assertTrue(listener.onActivationEvents.isEmpty());
         listener.clear();
-        evictionPolicy.setContext(now + 180);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 180));
         windowManager.onTrigger();
         assertTrue(listener.onActivationExpiredEvents.isEmpty());
         assertTrue(listener.onActivationEvents.isEmpty());
@@ -354,7 +354,7 @@
 
     @Test
     public void testCountBasedWindowWithEventTs() throws Exception {
-        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3, windowManager);
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3);
         windowManager.setEvictionPolicy(evictionPolicy);
         TriggerPolicy<Integer> triggerPolicy = new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager);
         triggerPolicy.start();
@@ -430,7 +430,64 @@
         assertEquals(seq(9), listener.allOnActivationEvents.get(0));
         assertEquals(seq(9, 12), listener.allOnActivationEvents.get(1));
     }
+
     @Test
+    public void testCountBasedTumblingWithSameEventTs() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(2);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer> triggerPolicy = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(1, 10);
+        windowManager.add(2, 10);
+        windowManager.add(3, 11);
+        windowManager.add(4, 12);
+        windowManager.add(5, 12);
+        windowManager.add(6, 12);
+        windowManager.add(7, 12);
+        windowManager.add(8, 13);
+        windowManager.add(9, 14);
+        windowManager.add(10, 15);
+
+        windowManager.add(new WaterMarkEvent<Integer>(20));
+        assertEquals(5, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(3, 4), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(5, 6), listener.allOnActivationEvents.get(2));
+        assertEquals(seq(7, 8), listener.allOnActivationEvents.get(3));
+        assertEquals(seq(9, 10), listener.allOnActivationEvents.get(4));
+    }
+
+    @Test
+    public void testCountBasedSlidingWithSameEventTs() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(5);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer> triggerPolicy = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+
+        windowManager.add(1, 10);
+        windowManager.add(2, 10);
+        windowManager.add(3, 11);
+        windowManager.add(4, 12);
+        windowManager.add(5, 12);
+        windowManager.add(6, 12);
+        windowManager.add(7, 12);
+        windowManager.add(8, 13);
+        windowManager.add(9, 14);
+        windowManager.add(10, 15);
+
+        windowManager.add(new WaterMarkEvent<Integer>(20));
+        assertEquals(5, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(2, 6), listener.allOnActivationEvents.get(2));
+        assertEquals(seq(4, 8), listener.allOnActivationEvents.get(3));
+        assertEquals(seq(6, 10), listener.allOnActivationEvents.get(4));
+    }
+
+    @Test
     public void testEventTimeLag() throws Exception {
         EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5);
         windowManager.setEvictionPolicy(evictionPolicy);
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index dd88a87..f7d0c23 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -30,6 +30,16 @@
     <name>Storm Binary Distribution</name>
     <description>Storm binary distribution</description>
 
+    <!--
+        Used for cache busting in the Storm UI HTML and script.js files.
+        See src/main/assembly/binary.xml for the fileSet rules with
+            <filtered>true</filtered>
+    -->
+    <properties>
+        <packageTimestamp>${maven.build.timestamp}</packageTimestamp>
+        <maven.build.timestamp.format>yyyyMMddHHmm</maven.build.timestamp.format>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 6760bc8..b42de4f 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -46,13 +46,36 @@
             </includes>
             <fileMode>0755</fileMode>
         </fileSet>
+        <!--
+            Allow variable substitution for cache busting in HTML files and script.js.
+            See storm-dist/binary/pom.xml for the custom properties that are substituted in.
+
+            The source files should contain a ${placeholder} wherever the
+            maven assembly plugin should inject a value.
+        -->
+        <fileSet>
+            <directory>${project.basedir}/../../storm-core/src/ui/public</directory>
+            <outputDirectory>public</outputDirectory>
+            <includes>
+                <include>*.html</include>
+                <include>js/script.js</include>
+            </includes>
+            <excludes/>
+            <filtered>true</filtered>
+        </fileSet>
+        <!--
+            Include the rest of the public/ directory, without any filtering.
+        -->
         <fileSet>
             <directory>${project.basedir}/../../storm-core/src/ui/public</directory>
             <outputDirectory>public</outputDirectory>
             <includes>
                 <include>*/**</include>
             </includes>
-            <excludes/>
+            <excludes>
+                <exclude>*.html</exclude>
+                <exclude>js/script.js</exclude>
+            </excludes>
         </fileSet>
         <fileSet>
             <directory>${project.basedir}/../../examples</directory>
@@ -338,6 +361,14 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-opentsdb/target</directory>
+            <outputDirectory>external/storm-opentsdb</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+
         <!-- $STORM_HOME/extlib -->
         <fileSet>
             <directory></directory>
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
index 642c393..9106390 100755
--- a/storm-multilang/python/src/main/resources/resources/storm.py
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -75,8 +75,8 @@
     return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
 
 def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
+    print(json_encode(msg))
+    print("end")
     sys.stdout.flush()
 
 def sync():
@@ -196,7 +196,7 @@
                     sync()
                 else:
                     self.process(tup)
-        except Exception, e:
+        except Exception as e:
             reportError(traceback.format_exc(e))
 
 class BasicBolt(object):
@@ -222,10 +222,10 @@
                     try:
                         self.process(tup)
                         ack(tup)
-                    except Exception, e:
+                    except Exception as e:
                         reportError(traceback.format_exc(e))
                         fail(tup)
-        except Exception, e:
+        except Exception as e:
             reportError(traceback.format_exc(e))
 
 class Spout(object):
@@ -256,5 +256,5 @@
                 if msg["command"] == "fail":
                     self.fail(msg["id"])
                 sync()
-        except Exception, e:
+        except Exception as e:
             reportError(traceback.format_exc(e))