Merge branch 'STORM-2000' of github.com:satishd/storm
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 361c368..9b3b699 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
 ## 2.0.0
+ * STORM-1962: support python 3 and 2 in multilang
+ * STORM-1964: Unexpected behavior when using count window together with timestamp extraction
+ * STORM-1890: ensure we refetch static resources after package build
  * STORM-1966 Expand metric having Map type as value into multiple metrics based on entries
  * STORM-1737: storm-kafka-client has compilation errors with Apache Kafka 0.10
  * STORM-1910 One topology cannot use hdfs spout to read from two locations
@@ -123,6 +126,9 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0 
+ * STORM-1988: Kafka Offset not showing due to bad classpath.
+ * STORM-1987: Fix TridentKafkaWordCount arg handling in distributed mode.
+ * STORM-1969: Modify HiveTopology to show usage of non-partition table.
  * STORM-1950: Change response json of "Topology Lag" REST API to keyed by spoutId, topic, partition.
  * STORM-1833: Simple equi-join in storm-sql standalone mode
  * STORM-1866: Update Resource Aware Scheduler Documentation
@@ -155,6 +161,8 @@
  * STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
 
 ## 1.0.2
+ * STORM-1976: Remove cleanup-corrupt-topologies!
+ * STORM-1977: Restore logic: give up leadership when elected as leader but doesn't have one or more of topology codes on local
  * STORM-1939: Frequent InterruptedException raised by ShellBoltMessageQueue.poll
  * STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
  * STORM-1922: Supervisor summary default order by host
diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor
index a47a27d..bfe50b7 100755
--- a/bin/storm-kafka-monitor
+++ b/bin/storm-kafka-monitor
@@ -40,4 +40,4 @@
   JAVA="$JAVA_HOME/bin/java"
 fi
 
-exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/storm-kafka-monitor*.jar" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
+exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
index 2b25fad..0499041 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
@@ -213,7 +213,7 @@
             System.exit(1);
         } else if (args.length == 1) {
             zkUrl = args[0];
-        } else if (args.length == 2) {
+        } else {
             zkUrl = args[0];
             brokerUrl = args[1];
         }
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
index db45af2..a7a79fb 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
@@ -38,7 +38,7 @@
     /**
      * Given a endPoint, returns a RecordWriter with columnNames.
      *
-     * @param tuple
+     * @param endPoint
      * @return
      */
 
@@ -66,7 +66,7 @@
     /**
      * Given a TridetnTuple, return a hive partition values list.
      *
-     * @param TridentTuple
+     * @param tuple
      * @return List<String>
      */
     List<String> mapPartitions(TridentTuple tuple);
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
index 4df1c60..dcc26a4 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -324,7 +324,7 @@
     /**
      * if there are remainingTransactions in current txnBatch, begins nextTransactions
      * otherwise creates new txnBatch.
-     * @param boolean rollToNext
+     * @param rollToNext
      */
     private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
         if(txnBatch.remainingTransactions() == 0) {
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
index 8b61d5e..4afd298 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -51,7 +51,6 @@
         config.setNumWorkers(1);
         UserDataSpout spout = new UserDataSpout();
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-                .withTimeAsPartitionField("yyyy/MM/dd/hh")
                 .withColumnFields(new Fields(colNames));
         HiveOptions hiveOptions;
         if (args.length == 6) {
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 315e3e9..1beec0e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -313,12 +313,16 @@
                 "kafkaProps=" + kafkaProps +
                 ", keyDeserializer=" + keyDeserializer +
                 ", valueDeserializer=" + valueDeserializer +
-                ", topics=" + getSubscribedTopics() +
-                ", topicWildcardPattern=" + getTopicWildcardPattern() +
-                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
                 ", pollTimeoutMs=" + pollTimeoutMs +
                 ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
                 ", maxRetries=" + maxRetries +
+                ", maxUncommittedOffsets=" + maxUncommittedOffsets +
+                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
+                ", kafkaSpoutStreams=" + kafkaSpoutStreams +
+                ", tuplesBuilder=" + tuplesBuilder +
+                ", retryService=" + retryService +
+                ", topics=" + getSubscribedTopics() +
+                ", topicWildcardPattern=" + getTopicWildcardPattern() +
                 '}';
     }
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 78fd265..6fe997c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -264,8 +264,8 @@
     private long nextTime(KafkaSpoutMessageId msgId) {
         final long currentTimeNanos = System.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
-                ? currentTimeNanos + initialDelay.lengthNanos()
-                : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
+                ? currentTimeNanos + initialDelay.lengthNanos
+                : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
         return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
     }
 
diff --git a/pom.xml b/pom.xml
index 06ea3a4..31cbe97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,19 +209,19 @@
         <!-- dependency versions -->
         <clojure.version>1.7.0</clojure.version>
         <java_jmx.version>0.3.1</java_jmx.version>
-        <compojure.version>1.1.3</compojure.version>
+        <compojure.version>1.1.9</compojure.version>
         <hiccup.version>0.3.6</hiccup.version>
         <commons-compress.version>1.4.1</commons-compress.version>
         <commons-io.version>2.5</commons-io.version>
         <commons-lang.version>2.5</commons-lang.version>
         <commons-exec.version>1.1</commons-exec.version>
-        <commons-fileupload.version>1.2.1</commons-fileupload.version>
+        <commons-fileupload.version>1.3.2</commons-fileupload.version>
         <commons-codec.version>1.6</commons-codec.version>
         <commons-cli.version>1.3.1</commons-cli.version>
         <clj-time.version>0.8.0</clj-time.version>
         <curator.version>2.10.0</curator.version>
         <json-simple.version>1.1</json-simple.version>
-        <ring.version>1.3.0</ring.version>
+        <ring.version>1.3.1</ring.version>
         <ring-json.version>0.3.1</ring-json.version>
         <jetty.version>7.6.13.v20130916</jetty.version>
         <clojure.tools.logging.version>0.2.3</clojure.tools.logging.version>
@@ -673,6 +673,11 @@
             </dependency>
             <dependency>
                 <groupId>ring</groupId>
+                <artifactId>ring-core</artifactId>
+                <version>${ring.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>ring</groupId>
                 <artifactId>ring-devel</artifactId>
                 <version>${ring.version}</version>
             </dependency>
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b03dc20..1220746 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -74,6 +74,10 @@
         </dependency>
         <dependency>
             <groupId>ring</groupId>
+            <artifactId>ring-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ring</groupId>
             <artifactId>ring-devel</artifactId>
         </dependency>
         <dependency>
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8d193e6..5ed4d39 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -25,7 +25,8 @@
 
 (defn -main [^String tmpjarpath & args]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        zk-leader-elector (Zookeeper/zkLeaderElector conf)
+        ; this elector is not created for the purpose of joining the leader lock queue, so passing nil as blob-store is ok
+        zk-leader-elector (Zookeeper/zkLeaderElector conf nil)
         leader-nimbus (.getLeader zk-leader-elector)
         host (.getHost leader-nimbus)
         port (.getPort leader-nimbus)
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0391adb..93507ff 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -179,7 +179,8 @@
     (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
 
 (defn nimbus-data [conf inimbus]
-  (let [forced-scheduler (.getForcedScheduler inimbus)]
+  (let [forced-scheduler (.getForcedScheduler inimbus)
+        blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))]
     {:conf conf
      :nimbus-host-port-info (NimbusInfo/fromConf conf)
      :inimbus inimbus
@@ -197,7 +198,7 @@
      :heartbeats-cache (atom {})
      :downloaders (file-cache-map conf)
      :uploaders (file-cache-map conf)
-     :blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+     :blob-store blob-store
      :blob-downloaders (mk-blob-cache-map conf)
      :blob-uploaders (mk-blob-cache-map conf)
      :blob-listers (mk-bloblist-cache-map conf)
@@ -211,7 +212,7 @@
                   (Utils/exitProcess 20 "Error when processing an event"))))
 
      :scheduler (mk-scheduler conf inimbus)
-     :leader-elector (Zookeeper/zkLeaderElector conf)
+     :leader-elector (Zookeeper/zkLeaderElector conf blob-store)
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
@@ -1166,19 +1167,6 @@
           topo-history-state (:topo-history-state nimbus)]
           (.filterOldTopologies ^LocalState topo-history-state cutoff-age))))
 
-(defn cleanup-corrupt-topologies! [nimbus]
-  (let [storm-cluster-state (:storm-cluster-state nimbus)
-        blob-store (:blob-store nimbus)
-        code-ids (set (code-ids blob-store))
-        active-topologies (set (.activeStorms storm-cluster-state))
-        corrupt-topologies (set/difference active-topologies code-ids)]
-    (doseq [corrupt corrupt-topologies]
-      (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
-      (.removeStorm storm-cluster-state corrupt)
-      (if (instance? LocalFsBlobStore blob-store)
-        (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
-          (.removeBlobstoreKey storm-cluster-state blob-key))))))
-
 (defn setup-blobstore [nimbus]
   "Sets up blobstore state for all current keys."
   (let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -2202,7 +2190,6 @@
         STORM-VERSION))
 
     (.addToLeaderLockQueue (:leader-elector nimbus))
-    (cleanup-corrupt-topologies! nimbus)
     (when (instance? LocalFsBlobStore blob-store)
       ;register call back for blob-store
       (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 3ad955d..831725a 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -18,7 +18,8 @@
   (:use compojure.core)
   (:use [clojure.java.shell :only [sh]])
   (:use ring.middleware.reload
-        ring.middleware.multipart-params)
+        ring.middleware.multipart-params
+        ring.middleware.multipart-params.temp-file)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
   (:use [org.apache.storm config util log converter])
diff --git a/storm-core/src/clj/org/apache/storm/ui/helpers.clj b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
index e681cfb..ac1ecd1 100644
--- a/storm-core/src/clj/org/apache/storm/ui/helpers.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
@@ -36,22 +36,35 @@
            [org.eclipse.jetty.server DispatcherType]
            [org.eclipse.jetty.servlets CrossOriginFilter]
            (org.json.simple JSONValue))
-  (:require [ring.util servlet])
+  (:require [ring.util servlet]
+            [ring.util.response :as response])
   (:require [compojure.route :as route]
             [compojure.handler :as handler]))
 
 ;; TODO this function and its callings will be replace when ui.core and logviewer and drpc move to Java
 (def num-web-requests (StormMetricsRegistry/registerMeter "num-web-requests"))
+
 (defn requests-middleware
-  "Coda Hale metric for counting the number of web requests."
+  "Wrap request with Coda Hale metric for counting the number of web requests, 
+  and add Cache-Control: no-cache for html files in root directory (index.html, topology.html, etc)"
   [handler]
   (fn [req]
     (.mark num-web-requests)
-    (handler req)))
+    (let [uri (:uri req)
+          res (handler req) 
+          content-type (response/get-header res "Content-Type")]
+      ;; check that the response is html and that the path is for a root page: a single / in the path 
+      ;; then we know we don't want it cached (e.g. /index.html)
+      (if (and (= content-type "text/html") 
+               (= 1 (count (re-seq #"/" uri))))
+        ;; response for html page in root directory, no-cache 
+        (response/header res "Cache-Control" "no-cache")
+        ;; else, carry on
+        res))))
 
 ;; TODO this function and its callings will be replace when ui.core and logviewer move to Java
 (defnk json-response
   [data callback :need-serialize true :status 200 :headers {}]
   {:status status
    :headers (UIHelpers/getJsonResponseHeaders callback headers)
-   :body (UIHelpers/getJsonResponseBody data callback need-serialize)})
\ No newline at end of file
+   :body (UIHelpers/getJsonResponseBody data callback need-serialize)})
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index fe0f4da..3f245e1 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -253,7 +253,7 @@
                                                     WindowManager<Tuple> manager) {
         if (windowLengthCount != null) {
             if (isTupleTs()) {
-                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value, manager);
+                return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
             } else {
                 return new CountEvictionPolicy<>(windowLengthCount.value);
             }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
index 0e7b233..fb12202 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.windowing;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * An eviction policy that tracks event counts and can
@@ -26,12 +26,12 @@
  * @param <T> the type of event tracked by this policy.
  */
 public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
-    private final int threshold;
-    protected final AtomicInteger currentCount;
+    protected final int threshold;
+    protected final AtomicLong currentCount;
 
     public CountEvictionPolicy(int count) {
         this.threshold = count;
-        this.currentCount = new AtomicInteger();
+        this.currentCount = new AtomicLong();
     }
 
     @Override
@@ -41,7 +41,7 @@
          * return if the event should be evicted
          */
         while (true) {
-            int curVal = currentCount.get();
+            long curVal = currentCount.get();
             if (curVal > threshold) {
                 if (currentCount.compareAndSet(curVal, curVal - 1)) {
                     return Action.EXPIRE;
@@ -61,7 +61,7 @@
     }
 
     @Override
-    public void setContext(Object context) {
+    public void setContext(EvictionContext context) {
         // NOOP
     }
 
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
index 2594c67..17750b6 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
@@ -44,7 +44,7 @@
     public void track(Event<T> event) {
         if (started && !event.isWatermark()) {
             if (currentCount.incrementAndGet() >= count) {
-                evictionPolicy.setContext(System.currentTimeMillis());
+                evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
                 handler.onTrigger();
             }
         }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
new file mode 100644
index 0000000..ef65f66
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+public class DefaultEvictionContext implements EvictionContext {
+    private final Long referenceTime;
+    private final Long currentCount;
+    private final Long slidingCount;
+
+    public DefaultEvictionContext(Long referenceTime) {
+        this(referenceTime, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount) {
+        this(referenceTime, currentCount, null);
+    }
+
+    public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) {
+        this.referenceTime = referenceTime;
+        this.currentCount = currentCount;
+        this.slidingCount = slidingCount;
+    }
+
+    @Override
+    public Long getReferenceTime() {
+        return referenceTime;
+    }
+
+    @Override
+    public Long getCurrentCount() {
+        return currentCount;
+    }
+
+    @Override
+    public Long getSlidingCount() {
+        return slidingCount;
+    }
+}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java
new file mode 100644
index 0000000..37dcfd9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+/**
+ * Context information that can be used by the eviction policy
+ */
+public interface EvictionContext {
+    /**
+     * Returns the reference time that the eviction policy could use to
+     * evict the events. In the case of event time processing, this would be
+     * the watermark time.
+     *
+     * @return the reference time in millis
+     */
+    Long getReferenceTime();
+
+    /**
+     * Returns the sliding count for count based windows
+     *
+     * @return the sliding count
+     */
+    Long getSlidingCount();
+
+    /**
+     * Returns the current count of events in the queue up to the reference time
+     * based on which count based evictions can be performed.
+     *
+     * @return the current count
+     */
+    Long getCurrentCount();
+}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
index 8128d80..05e4d93 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
@@ -70,5 +70,6 @@
      *
      * @param context
      */
-    void setContext(Object context);
+    void setContext(EvictionContext context);
+
 }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index a2db239..e646207 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -57,8 +57,8 @@
     }
 
     @Override
-    public void setContext(Object context) {
-        referenceTime = ((Number) context).longValue();
+    public void setContext(EvictionContext context) {
+        referenceTime = context.getReferenceTime();
     }
 
     @Override
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index 1ad7714..b057afb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -115,7 +115,7 @@
                      * to evict the events
                      */
                     if (evictionPolicy != null) {
-                        evictionPolicy.setContext(System.currentTimeMillis());
+                        evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
                     }
                     handler.onTrigger();
                 } catch (Throwable th) {
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index e1af6fb..74240bb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -24,25 +24,29 @@
  * @param <T> the type of event tracked by this policy.
  */
 public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
-    private final WindowManager<T> windowManager;
     /*
      * The reference time in millis for window calculations and
      * expiring events. If not set it will default to System.currentTimeMillis()
      */
     private long referenceTime;
+    private long processed = 0L;
 
-    public WatermarkCountEvictionPolicy(int count, WindowManager<T> windowManager) {
+    public WatermarkCountEvictionPolicy(int count) {
         super(count);
-        this.windowManager = windowManager;
     }
 
     @Override
     public Action evict(Event<T> event) {
-        if (event.getTimestamp() <= referenceTime) {
-            return super.evict(event);
+        Action action;
+        if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) {
+            action = super.evict(event);
+            if (action == Action.PROCESS) {
+                ++processed;
+            }
         } else {
-            return Action.KEEP;
+            action = Action.KEEP;
         }
+        return action;
     }
 
     @Override
@@ -51,9 +55,14 @@
     }
 
     @Override
-    public void setContext(Object context) {
-        referenceTime = (Long) context;
-        currentCount.set(windowManager.getEventCount(referenceTime));
+    public void setContext(EvictionContext context) {
+        referenceTime = context.getReferenceTime();
+        if (context.getCurrentCount() != null) {
+            currentCount.set(context.getCurrentCount());
+        } else {
+            currentCount.set(processed + context.getSlidingCount());
+        }
+        processed = 0;
     }
 
     @Override
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
index 391efee..3cfcaad 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
@@ -74,7 +74,7 @@
         long watermarkTs = waterMarkEvent.getTimestamp();
         List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
         for (long ts : eventTs) {
-            evictionPolicy.setContext(ts);
+            evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count)));
             handler.onTrigger();
             lastProcessedTs = ts;
         }
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
index 3734c93..00620c4 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
@@ -75,7 +75,8 @@
         long windowEndTs = nextWindowEndTs;
         LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
         while (windowEndTs <= watermarkTs) {
-            evictionPolicy.setContext(windowEndTs);
+            long currentCount = windowManager.getEventCount(windowEndTs);
+            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
             if (handler.onTrigger()) {
                 windowEndTs += slidingIntervalMs;
             } else {
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
index 7923b7e..74816c2 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -21,6 +21,7 @@
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
@@ -36,16 +37,17 @@
 
 public class LeaderElectorImp implements ILeaderElector {
     private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
-     private final Map conf;
-     private final List<String> servers;
-     private final CuratorFramework zk;
-     private final String leaderlockPath;
-     private final String id;
-     private final AtomicReference<LeaderLatch> leaderLatch;
-     private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+    private final Map conf;
+    private final List<String> servers;
+    private final CuratorFramework zk;
+    private final String leaderlockPath;
+    private final String id;
+    private final AtomicReference<LeaderLatch> leaderLatch;
+    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+    private final BlobStore blobStore;
 
     public LeaderElectorImp(Map conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
-            AtomicReference<LeaderLatchListener> leaderLatchListener) {
+            AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore) {
         this.conf = conf;
         this.servers = servers;
         this.zk = zk;
@@ -53,6 +55,7 @@
         this.id = id;
         this.leaderLatch = leaderLatch;
         this.leaderLatchListener = leaderLatchListener;
+        this.blobStore = blobStore;
     }
 
     @Override
@@ -65,7 +68,7 @@
         // if this latch is already closed, we need to create new instance.
         if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
             leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
-            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, leaderLatch.get()));
+            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get()));
             LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
         }
         // Only if the latch is not already started we invoke start
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 5e9039a..7723922 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.zookeeper;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -27,11 +30,15 @@
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.KeyFilter;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
 import org.apache.zookeeper.KeeperException;
@@ -44,6 +51,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -323,26 +331,51 @@
     }
 
     // Leader latch listener that will be invoked when we either gain or lose leadership
-    public static LeaderLatchListener leaderLatchListenerImpl(Map conf, CuratorFramework zk, LeaderLatch leaderLatch) throws UnknownHostException {
+    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
         final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
         return new LeaderLatchListener() {
             @Override
             public void isLeader() {
-                LOG.info("{} gained leadership", hostName);
+                Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+                Set<String> localTopologyIds = blobStore.filterAndListKeys(new KeyFilter<String>() {
+                    @Override
+                    public String filter(String key) {
+                        return ConfigUtils.getIdFromBlobKey(key);
+                    }
+                });
+                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, localTopologyIds);
+                LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology [{}]",
+                        generateJoinedString(activeTopologyIds), generateJoinedString(localTopologyIds),
+                        generateJoinedString(diffTopology));
+
+                if (diffTopology.isEmpty()) {
+                    LOG.info("Accepting leadership, all active topology found locally.");
+                } else {
+                    LOG.info("code for all active topologies not available locally, giving up leadership.");
+                    try {
+                        leaderLatch.close();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
             }
 
             @Override
             public void notLeader() {
                 LOG.info("{} lost leadership.", hostName);
             }
+
+            private String generateJoinedString(Set<String> activeTopologyIds) {
+                return Joiner.on(",").join(activeTopologyIds);
+            }
         };
     }
 
-    public static ILeaderElector zkLeaderElector(Map conf) throws UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf);
+    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, blobStore);
     }
 
-    protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException {
+    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
         CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
@@ -350,8 +383,9 @@
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
         AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
-                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, leaderLatchAtomicReference.get()));
-        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference);
+                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
+        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
+            leaderLatchListenerAtomicReference, blobStore);
     }
 
     public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
diff --git a/storm-core/src/ui/public/component.html b/storm-core/src/ui/public/component.html
index 6d5465f..005ac5d 100644
--- a/storm-core/src/ui/public/component.html
+++ b/storm-core/src/ui/public/component.html
@@ -22,7 +22,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -32,7 +32,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/moment.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -83,7 +83,7 @@
     try {
       other();
     } catch (err) {
-      $.get("/templates/json-error-template.html", function(template) {
+      getStatic("/templates/json-error-template.html", function(template) {
         $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
       });
     }
@@ -156,7 +156,7 @@
     $.ajaxSetup({
         "error":function(jqXHR,textStatus,response) {
             var errorJson = jQuery.parseJSON(jqXHR.responseText);
-            $.get("/templates/json-error-template.html", function(template) {
+            getStatic("/templates/json-error-template.html", function(template) {
                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
             });
         }
@@ -187,7 +187,7 @@
 
     $.getJSON(url,function(response,status,jqXHR) {
         var uiUser = $("#ui-user");
-        $.get("/templates/user-template.html", function(template) {
+        getStatic("/templates/user-template.html", function(template) {
             uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
             $('#ui-user [data-toggle="tooltip"]').tooltip()
         });
@@ -202,7 +202,7 @@
         var profilerControl = $("#profiler-control");
         var executorStats = $("#component-executor-stats");
         var componentErrors = $("#component-errors");
-        $.get("/templates/component-page-template.html", function(template) {
+        getStatic("/templates/component-page-template.html", function(template) {
             response["profilerActive"] = $.map(response["profilerActive"], function(active_map) {
                 var date = new Date();
                 var millis = date.getTime() + parseInt(active_map["timestamp"]);
@@ -341,9 +341,9 @@
     var passed = {}
     Object.keys(workerActionSelected).forEach(function (id) {
         var url = "/api/v1/topology/"+topologyId+"/profiling/start/" + id + "/" + timeout;
-        $.get(url, function(response,status,jqXHR) {
+        getStatic(url, function(response,status,jqXHR) {
             jsError(function() {
-                $.get("/templates/component-page-template.html", function(template) {
+                getStatic("/templates/component-page-template.html", function(template) {
                     var host_port_split = id.split(":");
                     var host = host_port_split[0];
                     var port = host_port_split[1];
@@ -382,7 +382,7 @@
     $("#stop_" + id).prop('disabled', true);
     setTimeout(function(){ $("#stop_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request to stop profiling...");
     })
     .fail(function(response) {
@@ -398,7 +398,7 @@
     $("#dump_profile_" + id).prop('disabled', true);
     setTimeout(function(){ $("#dump_profile_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request to dump profile snapshot...");
     })
     .fail(function(response) {
@@ -418,7 +418,7 @@
         $("#dump_jstack_" + id).prop('disabled', true);
         setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
 
-        $.get(url)
+        getStatic(url)
         .fail(function(response) {
             failed[id] = response;
         });
@@ -443,7 +443,7 @@
     $("#dump_jstack_" + id).prop('disabled', true);
     setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request for jstack dump...");
     })
     .fail(function(response) {
@@ -461,7 +461,7 @@
         $("#restart_worker_jvm_" + id).prop('disabled', true);
         setTimeout(function(){ $("#restart_worker_jvm_" + id).prop('disabled', false); }, 5000);
 
-        $.get(url)
+        getStatic(url)
         .fail(function(response) {
             failed[id] = response;
         });
@@ -489,7 +489,7 @@
         $("#dump_heap_" + id).prop('disabled', true);
         setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
 
-        $.get(url)
+        getStatic(url)
         .fail(function(response) {
             failed[id] = response;
         });
@@ -514,7 +514,7 @@
     $("#dump_heap_" + id).prop('disabled', true);
     setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
     
-    $.get(url, function(response,status,jqXHR) {
+    getStatic(url, function(response,status,jqXHR) {
         alert("Submitted request for jmap dump...");
     })
     .fail(function(response) {
diff --git a/storm-core/src/ui/public/deep_search_result.html b/storm-core/src/ui/public/deep_search_result.html
index 406a101..5b4f47b 100644
--- a/storm-core/src/ui/public/deep_search_result.html
+++ b/storm-core/src/ui/public/deep_search_result.html
@@ -21,7 +21,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -31,7 +31,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/url.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -56,12 +56,12 @@
     var port = $.url("?port") || "*";
     var search_archived = $.url("?search-archived");
 
-    $.get("/templates/deep-search-result-page-template.html", function(template) {
+    getStatic("/templates/deep-search-result-page-template.html", function(template) {
         var checked;
         if (search_archived) {
             checked = "checked";
         }
-        $.get("/api/v1/history/summary", function (response, status, jqXHR) {
+        getStatic("/api/v1/history/summary", function (response, status, jqXHR) {
             var findIds = function findIds(query, cb) {
                 var found = [];
                 var re = new RegExp(query, 'i');
@@ -104,7 +104,7 @@
 
         var result = $("#result");
         if(search) {
-            $.get("/api/v1/supervisor/summary", function(response, status, jqXHR) {
+            getStatic("/api/v1/supervisor/summary", function(response, status, jqXHR) {
 
                 for(var i in response.supervisors) {
                     response.supervisors[i].elemId = elem_id_for_host(response.supervisors[i].host);
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
index eae423c..52900a5 100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@ -23,7 +23,7 @@
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
 <link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -32,7 +32,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
 <script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -96,7 +96,7 @@
     $.ajaxSetup({
         "error":function(jqXHR,textStatus,response) {
             var errorJson = jQuery.parseJSON(jqXHR.responseText);
-            $.get("/templates/json-error-template.html", function(template) {
+            getStatic("/templates/json-error-template.html", function(template) {
                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
             });
         }
@@ -109,18 +109,18 @@
     var config = $("#nimbus-configuration");
 
     $.getJSON("/api/v1/cluster/summary",function(response,status,jqXHR) {
-        $.get("/templates/user-template.html", function(template) {
+        getStatic("/templates/user-template.html", function(template) {
             uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
             $('#ui-user [data-toggle="tooltip"]').tooltip()
         });
 
-        $.get("/templates/index-page-template.html", function(template) {
+        getStatic("/templates/index-page-template.html", function(template) {
             clusterSummary.append(Mustache.render($(template).filter("#cluster-summary-template").html(),response));
             $('#cluster-summary [data-toggle="tooltip"]').tooltip();
         });
     });
     $.getJSON("/api/v1/nimbus/summary",function(response,status,jqXHR) {
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
           nimbusSummary.append(Mustache.render($(template).filter("#nimbus-summary-template").html(),response));
           //host, port, isLeader, version, uptime
           dtAutoPage("#nimbus-summary-table", {
@@ -133,7 +133,7 @@
       });
     });
     $.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) {
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
           topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
           //name, owner, status, uptime, num workers, num executors, num tasks, replication count, assigned total mem, assigned total cpu, scheduler info
           dtAutoPage("#topology-summary-table", {
@@ -146,7 +146,7 @@
       });
     });
     $.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) {
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
           supervisorSummary.append(Mustache.render($(template).filter("#supervisor-summary-template").html(),response));
           //id, host, uptime, slots, used slots
           dtAutoPage("#supervisor-summary-table", {
@@ -160,7 +160,7 @@
     });
     $.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) {
       var formattedResponse = formatConfigData(response);
-      $.get("/templates/index-page-template.html", function(template) {
+      getStatic("/templates/index-page-template.html", function(template) {
         config.append(Mustache.render($(template).filter("#configuration-template").html(),formattedResponse));
         $('#nimbus-configuration-table td').jsonFormatter()
         //key, value
diff --git a/storm-core/src/ui/public/js/script.js b/storm-core/src/ui/public/js/script.js
index a880205..6d641e1 100644
--- a/storm-core/src/ui/public/js/script.js
+++ b/storm-core/src/ui/public/js/script.js
@@ -257,3 +257,15 @@
     opacity: .5,
     color: '#fff',margin:0,width:"30%",top:"40%",left:"35%",textAlign:"center"
 };
+
+// Fetch a static resource (normally a template) via ajax, appending a "_ts" query
+// param set to the package build timestamp so that a new build busts cached copies.
+function getStatic(url, cb) {
+    return $.ajax({
+        url: url,
+        data: {
+            '_ts': '${packageTimestamp}'
+        },
+        success: cb
+    });
+};
diff --git a/storm-core/src/ui/public/js/visualization.js b/storm-core/src/ui/public/js/visualization.js
index 9aab087..4daadc2 100644
--- a/storm-core/src/ui/public/js/visualization.js
+++ b/storm-core/src/ui/public/js/visualization.js
@@ -379,7 +379,7 @@
   try {
     other();
   } catch (err) {
-    $.get("/templates/json-error-template.html", function(template) {
+    getStatic("/templates/json-error-template.html", function(template) {
       $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
     });
   }
@@ -388,7 +388,7 @@
 var should_update;
 function show_visualization(sys) {
     $.getJSON("/api/v1/topology/"+$.url("?id")+"/visualization-init",function(response,status,jqXHR) {
-        $.get("/templates/topology-page-template.html", function(template) {
+        getStatic("/templates/topology-page-template.html", function(template) {
             jsError(function() {
                 var topologyVisualization = $("#visualization-container");
                 topologyVisualization.append(
diff --git a/storm-core/src/ui/public/logviewer_search.html b/storm-core/src/ui/public/logviewer_search.html
index f981499..3a1682e 100644
--- a/storm-core/src/ui/public/logviewer_search.html
+++ b/storm-core/src/ui/public/logviewer_search.html
@@ -21,7 +21,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -30,7 +30,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/url.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -51,7 +51,7 @@
     file = decodeURIComponent(file);
     search = decodeURIComponent(search);
 
-    $.get("/templates/logviewer-search-page-template.html", function(template) {
+    getStatic("/templates/logviewer-search-page-template.html", function(template) {
         $("#search-form").append(Mustache.render($(template).filter("#search-single-file").html(),{file: file, search: search, isDaemon: isDaemon}));
 
         var result = $("#result");
diff --git a/storm-core/src/ui/public/search_result.html b/storm-core/src/ui/public/search_result.html
index ec44aa9..ee0efd8 100644
--- a/storm-core/src/ui/public/search_result.html
+++ b/storm-core/src/ui/public/search_result.html
@@ -21,7 +21,7 @@
 <link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -30,7 +30,7 @@
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/url.min.js" type="text/javascript"></script>
 <script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 </head>
 <body>
 <div class="container-fluid">
@@ -49,7 +49,7 @@
     var count = $.url("?count") || 2;
     var searchArchived = $.url("?searchArchived") || "";
 
-    $.get("/templates/search-result-page-template.html", function(template) {
+    getStatic("/templates/search-result-page-template.html", function(template) {
         $("#search-form").append(Mustache.render($(template).filter("#search-form-template").html(),{id: id, search: search, count: count, searchArchived: searchArchived}));
 
         var result = $("#result");
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 7ad84a4..0f7cf9a 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -24,7 +24,7 @@
 <link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
 <link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
 <link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
 <script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
 <script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -33,7 +33,7 @@
 <script src="/js/bootstrap-3.3.1.min.js" type="text/javascript"></script>
 <script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
 <script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
 <script src="/js/visualization.js" type="text/javascript"></script>
 <script src="/js/arbor.js" type="text/javascript"></script>
 <script src="/js/arbor-graphics.js" type="text/javascript"></script>
@@ -230,7 +230,7 @@
     };
     if (!responseData) {
         var topologyId = $.url("?id");
-        $.get ('/api/v1/topology/' + topologyId + '/logconfig', renderImpl);
+        getStatic ('/api/v1/topology/' + topologyId + '/logconfig', renderImpl);
     } else {
         renderImpl (responseData);
     }
@@ -264,7 +264,7 @@
     $.ajaxSetup({
         "error":function(jqXHR,textStatus,response) {
             var errorJson = jQuery.parseJSON(jqXHR.responseText);
-            $.get("/templates/json-error-template.html", function(template) {
+            getStatic("/templates/json-error-template.html", function(template) {
                 $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
             });
         }
@@ -272,7 +272,7 @@
 
     $.getJSON(url,function(response,status,jqXHR) {
         var uiUser = $("#ui-user");
-        $.get("/templates/user-template.html", function(template) {
+        getStatic("/templates/user-template.html", function(template) {
             uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
             $('#ui-user [data-toggle="tooltip"]').tooltip();
         });
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 53cf517..96e629d 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1147,37 +1147,6 @@
       (getAllNimbuses [this] `(leader-address))
       (close [this] true))))
 
-(deftest test-cleans-corrupt
-  (with-inprocess-zookeeper zk-port
-    (with-local-tmp [nimbus-dir]
-      (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
-        (letlocals
-         (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                           {STORM-ZOOKEEPER-SERVERS ["localhost"]
-                            STORM-CLUSTER-MODE "local"
-                            STORM-ZOOKEEPER-PORT zk-port
-                            STORM-LOCAL-DIR nimbus-dir}))
-         (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
-         (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (bind topology (Thrift/buildTopology
-                         {"1" (Thrift/prepareSpoutDetails
-                                (TestPlannerSpout. true) (Integer. 3))}
-                         {}))
-         (submit-local-topology nimbus "t1" {} topology)
-         (submit-local-topology nimbus "t2" {} topology)
-         (bind storm-id1 (StormCommon/getStormId cluster-state "t1"))
-         (bind storm-id2 (StormCommon/getStormId cluster-state "t2"))
-         (.shutdown nimbus)
-         (let [blob-store (Utils/getNimbusBlobStore conf nil)]
-           (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state)
-           (.shutdown blob-store))
-         (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (is ( = #{storm-id2} (set (.activeStorms cluster-state))))
-         (.shutdown nimbus)
-         (.disconnect cluster-state)
-         )))))
-
 ;(deftest test-no-overlapping-slots
 ;  ;; test that same node+port never appears across 2 assignments
 ;  )
@@ -1224,7 +1193,7 @@
   (with-inprocess-zookeeper zk-port
     (with-local-tmp [nimbus-dir]
       (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
+                      (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                        {STORM-ZOOKEEPER-SERVERS ["localhost"]
@@ -1239,7 +1208,7 @@
                            {}))
 
           (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf] (mock-leader-elector :is-leader false))))]
+                          (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector :is-leader false))))]
 
             (letlocals
               (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1499,7 +1468,7 @@
                   _ (UtilsInstaller. fake-utils)
                   - (StormCommonInstaller. fake-common)
                   zk-le (MockedZookeeper. (proxy [Zookeeper] []
-                          (zkLeaderElectorImpl [conf] nil)))
+                          (zkLeaderElectorImpl [conf blob-store] nil)))
                   mocked-cluster (MockedCluster. cluster-utils)]
         (stubbing [nimbus/file-cache-map nil
                  nimbus/mk-blob-cache-map nil
@@ -1563,7 +1532,7 @@
   (with-inprocess-zookeeper zk-port
     (with-local-tmp [nimbus-dir]
       (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
-                      (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
+                      (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
         (letlocals
           (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
                        {STORM-ZOOKEEPER-SERVERS ["localhost"]
diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
index a545427..6645566 100644
--- a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
@@ -187,7 +187,7 @@
             windowManager.add(i, now - 1000);
         }
         // simulate the time trigger by setting the reference time and invoking onTrigger() manually
-        evictionPolicy.setContext(now + 100);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 100));
         windowManager.onTrigger();
 
         // 100 events with past ts should expire
@@ -210,7 +210,7 @@
         }
         activationsEvents.addAll(newEvents);
         // simulate the time trigger by setting the reference time and invoking onTrigger() manually
-        evictionPolicy.setContext(now + 200);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 200));
         windowManager.onTrigger();
         assertTrue(listener.onExpiryEvents.isEmpty());
         assertEquals(activationsEvents, listener.onActivationEvents);
@@ -236,20 +236,20 @@
             windowManager.add(i);
         }
         // simulate the time trigger by setting the reference time and invoking onTrigger() manually
-        evictionPolicy.setContext(now + 60);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 60));
         windowManager.onTrigger();
 
         assertEquals(seq(1, 10), listener.onActivationEvents);
         assertTrue(listener.onActivationExpiredEvents.isEmpty());
         listener.clear();
         // wait so all events expire
-        evictionPolicy.setContext(now + 120);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 120));
         windowManager.onTrigger();
 
         assertEquals(seq(1, 10), listener.onExpiryEvents);
         assertTrue(listener.onActivationEvents.isEmpty());
         listener.clear();
-        evictionPolicy.setContext(now + 180);
+        evictionPolicy.setContext(new DefaultEvictionContext(now + 180));
         windowManager.onTrigger();
         assertTrue(listener.onActivationExpiredEvents.isEmpty());
         assertTrue(listener.onActivationEvents.isEmpty());
@@ -354,7 +354,7 @@
 
     @Test
     public void testCountBasedWindowWithEventTs() throws Exception {
-        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3, windowManager);
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3);
         windowManager.setEvictionPolicy(evictionPolicy);
         TriggerPolicy<Integer> triggerPolicy = new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager);
         triggerPolicy.start();
@@ -430,7 +430,64 @@
         assertEquals(seq(9), listener.allOnActivationEvents.get(0));
         assertEquals(seq(9, 12), listener.allOnActivationEvents.get(1));
     }
+
     @Test
+    public void testCountBasedTumblingWithSameEventTs() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(2);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer> triggerPolicy = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        // events 1..10; events 1-2 share ts 10 and events 4-7 share ts 12
+        windowManager.add(1, 10);
+        windowManager.add(2, 10);
+        windowManager.add(3, 11);
+        windowManager.add(4, 12);
+        windowManager.add(5, 12);
+        windowManager.add(6, 12);
+        windowManager.add(7, 12);
+        windowManager.add(8, 13);
+        windowManager.add(9, 14);
+        windowManager.add(10, 15);
+        // watermark 20 is later than every event ts above; expect five activations, seq(1, 2) .. seq(9, 10)
+        windowManager.add(new WaterMarkEvent<Integer>(20));
+        assertEquals(5, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(3, 4), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(5, 6), listener.allOnActivationEvents.get(2));
+        assertEquals(seq(7, 8), listener.allOnActivationEvents.get(3));
+        assertEquals(seq(9, 10), listener.allOnActivationEvents.get(4));
+    }
+
+    @Test
+    public void testCountBasedSlidingWithSameEventTs() throws Exception {
+        EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(5);
+        windowManager.setEvictionPolicy(evictionPolicy);
+        TriggerPolicy<Integer> triggerPolicy = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+        triggerPolicy.start();
+        windowManager.setTriggerPolicy(triggerPolicy);
+        // events 1..10; events 1-2 share ts 10 and events 4-7 share ts 12
+        windowManager.add(1, 10);
+        windowManager.add(2, 10);
+        windowManager.add(3, 11);
+        windowManager.add(4, 12);
+        windowManager.add(5, 12);
+        windowManager.add(6, 12);
+        windowManager.add(7, 12);
+        windowManager.add(8, 13);
+        windowManager.add(9, 14);
+        windowManager.add(10, 15);
+        // watermark 20 is later than every event ts above; expect five activations ending at 2, 4, 6, 8, 10
+        windowManager.add(new WaterMarkEvent<Integer>(20));
+        assertEquals(5, listener.allOnActivationEvents.size());
+        assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0));
+        assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1));
+        assertEquals(seq(2, 6), listener.allOnActivationEvents.get(2));
+        assertEquals(seq(4, 8), listener.allOnActivationEvents.get(3));
+        assertEquals(seq(6, 10), listener.allOnActivationEvents.get(4));
+
+    }
+    @Test
     public void testEventTimeLag() throws Exception {
         EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5);
         windowManager.setEvictionPolicy(evictionPolicy);
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index dd88a87..f7d0c23 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -30,6 +30,16 @@
     <name>Storm Binary Distribution</name>
     <description>Storm binary distribution</description>
 
+    <!--
+        Used for cache busting in the Storm UI HTML and script.js files
+        See src/main/assembly/binary.xml for the fileSet rules with
+            <filtered>true</filtered>
+    -->
+    <properties>
+        <packageTimestamp>${maven.build.timestamp}</packageTimestamp>
+        <maven.build.timestamp.format>yyyyMMddHHmm</maven.build.timestamp.format> <!-- yyyy = calendar year; YYYY would be the week-based year -->
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 930b4d7..b42de4f 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -46,13 +46,36 @@
             </includes>
             <fileMode>0755</fileMode>
         </fileSet>
+        <!--
+            Allow for variable substitution for cache busting in HTML files and script.js
+            See storm-dist/binary/pom.xml for custom variables that will be substituted in.
+
+            The source files should have a ${variable to be replaced} wherever
+            maven assembly plugin should inject a value.
+        -->
+        <fileSet>
+            <directory>${project.basedir}/../../storm-core/src/ui/public</directory>
+            <outputDirectory>public</outputDirectory>
+            <includes>
+                <include>*.html</include>
+                <include>js/script.js</include>
+            </includes>
+            <excludes/>
+            <filtered>true</filtered>
+        </fileSet>
+        <!--
+            Include rest of public/ directory, without any filtering
+        -->
         <fileSet>
             <directory>${project.basedir}/../../storm-core/src/ui/public</directory>
             <outputDirectory>public</outputDirectory>
             <includes>
                 <include>*/**</include>
             </includes>
-            <excludes/>
+            <excludes>
+                <exclude>*.html</exclude>
+                <exclude>js/script.js</exclude>
+            </excludes>
         </fileSet>
         <fileSet>
             <directory>${project.basedir}/../../examples</directory>
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
index 642c393..9106390 100755
--- a/storm-multilang/python/src/main/resources/resources/storm.py
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -75,8 +75,8 @@
     return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
 
 def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
+    print(json_encode(msg))
+    print("end")
     sys.stdout.flush()
 
 def sync():
@@ -196,7 +196,7 @@
                     sync()
                 else:
                     self.process(tup)
-        except Exception, e:
+        except Exception as e:
             reportError(traceback.format_exc(e))
 
 class BasicBolt(object):
@@ -222,10 +222,10 @@
                     try:
                         self.process(tup)
                         ack(tup)
-                    except Exception, e:
+                    except Exception as e:
                         reportError(traceback.format_exc(e))
                         fail(tup)
-        except Exception, e:
+        except Exception as e:
             reportError(traceback.format_exc(e))
 
 class Spout(object):
@@ -256,5 +256,5 @@
                 if msg["command"] == "fail":
                     self.fail(msg["id"])
                 sync()
-        except Exception, e:
+        except Exception as e:
             reportError(traceback.format_exc(e))