Merge branch 'STORM-2000' of github.com:satishd/storm
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 361c368..9b3b699 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
## 2.0.0
+ * STORM-1962: support python 3 and 2 in multilang
+ * STORM-1964: Unexpected behavior when using count window together with timestamp extraction
+ * STORM-1890: ensure we refetch static resources after package build
* STORM-1966 Expand metric having Map type as value into multiple metrics based on entries
* STORM-1737: storm-kafka-client has compilation errors with Apache Kafka 0.10
* STORM-1910 One topology cannot use hdfs spout to read from two locations
@@ -123,6 +126,9 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-1988: Kafka Offset not showing due to bad classpath.
+ * STORM-1987: Fix TridentKafkaWordCount arg handling in distributed mode.
+ * STORM-1969: Modify HiveTopology to show usage of non-partition table.
* STORM-1950: Change response json of "Topology Lag" REST API to keyed by spoutId, topic, partition.
* STORM-1833: Simple equi-join in storm-sql standalone mode
* STORM-1866: Update Resource Aware Scheduler Documentation
@@ -155,6 +161,8 @@
* STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
## 1.0.2
+ * STORM-1976: Remove cleanup-corrupt-topologies!
+ * STORM-1977: Restore logic: give up leadership when elected as leader but doesn't have one or more of topology codes on local
* STORM-1939: Frequent InterruptedException raised by ShellBoltMessageQueue.poll
* STORM-1928: ShellSpout should check heartbeat while ShellSpout is waiting for subprocess to sync
* STORM-1922: Supervisor summary default order by host
diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor
index a47a27d..bfe50b7 100755
--- a/bin/storm-kafka-monitor
+++ b/bin/storm-kafka-monitor
@@ -40,4 +40,4 @@
JAVA="$JAVA_HOME/bin/java"
fi
-exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/storm-kafka-monitor*.jar" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
+exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
index 2b25fad..0499041 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
@@ -213,7 +213,7 @@
System.exit(1);
} else if (args.length == 1) {
zkUrl = args[0];
- } else if (args.length == 2) {
+ } else {
zkUrl = args[0];
brokerUrl = args[1];
}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
index db45af2..a7a79fb 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
@@ -38,7 +38,7 @@
/**
* Given a endPoint, returns a RecordWriter with columnNames.
*
- * @param tuple
+ * @param endPoint
* @return
*/
@@ -66,7 +66,7 @@
/**
* Given a TridetnTuple, return a hive partition values list.
*
- * @param TridentTuple
+ * @param tuple
* @return List<String>
*/
List<String> mapPartitions(TridentTuple tuple);
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
index 4df1c60..dcc26a4 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
@@ -324,7 +324,7 @@
/**
* if there are remainingTransactions in current txnBatch, begins nextTransactions
* otherwise creates new txnBatch.
- * @param boolean rollToNext
+ * @param rollToNext
*/
private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
if(txnBatch.remainingTransactions() == 0) {
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
index 8b61d5e..4afd298 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java
@@ -51,7 +51,6 @@
config.setNumWorkers(1);
UserDataSpout spout = new UserDataSpout();
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withTimeAsPartitionField("yyyy/MM/dd/hh")
.withColumnFields(new Fields(colNames));
HiveOptions hiveOptions;
if (args.length == 6) {
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 315e3e9..1beec0e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -313,12 +313,16 @@
"kafkaProps=" + kafkaProps +
", keyDeserializer=" + keyDeserializer +
", valueDeserializer=" + valueDeserializer +
- ", topics=" + getSubscribedTopics() +
- ", topicWildcardPattern=" + getTopicWildcardPattern() +
- ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
", pollTimeoutMs=" + pollTimeoutMs +
", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
", maxRetries=" + maxRetries +
+ ", maxUncommittedOffsets=" + maxUncommittedOffsets +
+ ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
+ ", kafkaSpoutStreams=" + kafkaSpoutStreams +
+ ", tuplesBuilder=" + tuplesBuilder +
+ ", retryService=" + retryService +
+ ", topics=" + getSubscribedTopics() +
+ ", topicWildcardPattern=" + getTopicWildcardPattern() +
'}';
}
}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 78fd265..6fe997c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -264,8 +264,8 @@
private long nextTime(KafkaSpoutMessageId msgId) {
final long currentTimeNanos = System.nanoTime();
final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
- ? currentTimeNanos + initialDelay.lengthNanos()
- : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
+ ? currentTimeNanos + initialDelay.lengthNanos
+ : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
}
diff --git a/pom.xml b/pom.xml
index 06ea3a4..31cbe97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -209,19 +209,19 @@
<!-- dependency versions -->
<clojure.version>1.7.0</clojure.version>
<java_jmx.version>0.3.1</java_jmx.version>
- <compojure.version>1.1.3</compojure.version>
+ <compojure.version>1.1.9</compojure.version>
<hiccup.version>0.3.6</hiccup.version>
<commons-compress.version>1.4.1</commons-compress.version>
<commons-io.version>2.5</commons-io.version>
<commons-lang.version>2.5</commons-lang.version>
<commons-exec.version>1.1</commons-exec.version>
- <commons-fileupload.version>1.2.1</commons-fileupload.version>
+ <commons-fileupload.version>1.3.2</commons-fileupload.version>
<commons-codec.version>1.6</commons-codec.version>
<commons-cli.version>1.3.1</commons-cli.version>
<clj-time.version>0.8.0</clj-time.version>
<curator.version>2.10.0</curator.version>
<json-simple.version>1.1</json-simple.version>
- <ring.version>1.3.0</ring.version>
+ <ring.version>1.3.1</ring.version>
<ring-json.version>0.3.1</ring-json.version>
<jetty.version>7.6.13.v20130916</jetty.version>
<clojure.tools.logging.version>0.2.3</clojure.tools.logging.version>
@@ -673,6 +673,11 @@
</dependency>
<dependency>
<groupId>ring</groupId>
+ <artifactId>ring-core</artifactId>
+ <version>${ring.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ring</groupId>
<artifactId>ring-devel</artifactId>
<version>${ring.version}</version>
</dependency>
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b03dc20..1220746 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -74,6 +74,10 @@
</dependency>
<dependency>
<groupId>ring</groupId>
+ <artifactId>ring-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ring</groupId>
<artifactId>ring-devel</artifactId>
</dependency>
<dependency>
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8d193e6..5ed4d39 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -25,7 +25,8 @@
(defn -main [^String tmpjarpath & args]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- zk-leader-elector (Zookeeper/zkLeaderElector conf)
+ ; this elector is only used to look up the current leader and is never added to the leader lock queue, so passing nil as blob-store is ok
+ zk-leader-elector (Zookeeper/zkLeaderElector conf nil)
leader-nimbus (.getLeader zk-leader-elector)
host (.getHost leader-nimbus)
port (.getPort leader-nimbus)
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 0391adb..93507ff 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -179,7 +179,8 @@
(get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)))
(defn nimbus-data [conf inimbus]
- (let [forced-scheduler (.getForcedScheduler inimbus)]
+ (let [forced-scheduler (.getForcedScheduler inimbus)
+ blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))]
{:conf conf
:nimbus-host-port-info (NimbusInfo/fromConf conf)
:inimbus inimbus
@@ -197,7 +198,7 @@
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
- :blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+ :blob-store blob-store
:blob-downloaders (mk-blob-cache-map conf)
:blob-uploaders (mk-blob-cache-map conf)
:blob-listers (mk-bloblist-cache-map conf)
@@ -211,7 +212,7 @@
(Utils/exitProcess 20 "Error when processing an event"))))
:scheduler (mk-scheduler conf inimbus)
- :leader-elector (Zookeeper/zkLeaderElector conf)
+ :leader-elector (Zookeeper/zkLeaderElector conf blob-store)
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
@@ -1166,19 +1167,6 @@
topo-history-state (:topo-history-state nimbus)]
(.filterOldTopologies ^LocalState topo-history-state cutoff-age))))
-(defn cleanup-corrupt-topologies! [nimbus]
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- blob-store (:blob-store nimbus)
- code-ids (set (code-ids blob-store))
- active-topologies (set (.activeStorms storm-cluster-state))
- corrupt-topologies (set/difference active-topologies code-ids)]
- (doseq [corrupt corrupt-topologies]
- (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
- (.removeStorm storm-cluster-state corrupt)
- (if (instance? LocalFsBlobStore blob-store)
- (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
- (.removeBlobstoreKey storm-cluster-state blob-key))))))
-
(defn setup-blobstore [nimbus]
"Sets up blobstore state for all current keys."
(let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -2202,7 +2190,6 @@
STORM-VERSION))
(.addToLeaderLockQueue (:leader-elector nimbus))
- (cleanup-corrupt-topologies! nimbus)
(when (instance? LocalFsBlobStore blob-store)
;register call back for blob-store
(.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 3ad955d..831725a 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -18,7 +18,8 @@
(:use compojure.core)
(:use [clojure.java.shell :only [sh]])
(:use ring.middleware.reload
- ring.middleware.multipart-params)
+ ring.middleware.multipart-params
+ ring.middleware.multipart-params.temp-file)
(:use [ring.middleware.json :only [wrap-json-params]])
(:use [hiccup core page-helpers])
(:use [org.apache.storm config util log converter])
diff --git a/storm-core/src/clj/org/apache/storm/ui/helpers.clj b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
index e681cfb..ac1ecd1 100644
--- a/storm-core/src/clj/org/apache/storm/ui/helpers.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/helpers.clj
@@ -36,22 +36,35 @@
[org.eclipse.jetty.server DispatcherType]
[org.eclipse.jetty.servlets CrossOriginFilter]
(org.json.simple JSONValue))
- (:require [ring.util servlet])
+ (:require [ring.util servlet]
+ [ring.util.response :as response])
(:require [compojure.route :as route]
[compojure.handler :as handler]))
;; TODO this function and its callings will be replace when ui.core and logviewer and drpc move to Java
(def num-web-requests (StormMetricsRegistry/registerMeter "num-web-requests"))
+
(defn requests-middleware
- "Coda Hale metric for counting the number of web requests."
+ "Wrap request with Coda Hale metric for counting the number of web requests,
+ and add Cache-Control: no-cache for html files in root directory (index.html, topology.html, etc)"
[handler]
(fn [req]
(.mark num-web-requests)
- (handler req)))
+ (let [uri (:uri req)
+ res (handler req)
+ content-type (response/get-header res "Content-Type")]
+ ;; check that the response is html and that the path is for a root page: a single / in the path
+ ;; then we know we don't want it cached (e.g. /index.html)
+ (if (and (= content-type "text/html")
+ (= 1 (count (re-seq #"/" uri))))
+ ;; response for html page in root directory, no-cache
+ (response/header res "Cache-Control" "no-cache")
+ ;; else, carry on
+ res))))
;; TODO this function and its callings will be replace when ui.core and logviewer move to Java
(defnk json-response
[data callback :need-serialize true :status 200 :headers {}]
{:status status
:headers (UIHelpers/getJsonResponseHeaders callback headers)
- :body (UIHelpers/getJsonResponseBody data callback need-serialize)})
\ No newline at end of file
+ :body (UIHelpers/getJsonResponseBody data callback need-serialize)})
diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index fe0f4da..3f245e1 100644
--- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -253,7 +253,7 @@
WindowManager<Tuple> manager) {
if (windowLengthCount != null) {
if (isTupleTs()) {
- return new WatermarkCountEvictionPolicy<>(windowLengthCount.value, manager);
+ return new WatermarkCountEvictionPolicy<>(windowLengthCount.value);
} else {
return new CountEvictionPolicy<>(windowLengthCount.value);
}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
index 0e7b233..fb12202 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountEvictionPolicy.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.windowing;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* An eviction policy that tracks event counts and can
@@ -26,12 +26,12 @@
* @param <T> the type of event tracked by this policy.
*/
public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
- private final int threshold;
- protected final AtomicInteger currentCount;
+ protected final int threshold;
+ protected final AtomicLong currentCount;
public CountEvictionPolicy(int count) {
this.threshold = count;
- this.currentCount = new AtomicInteger();
+ this.currentCount = new AtomicLong();
}
@Override
@@ -41,7 +41,7 @@
* return if the event should be evicted
*/
while (true) {
- int curVal = currentCount.get();
+ long curVal = currentCount.get();
if (curVal > threshold) {
if (currentCount.compareAndSet(curVal, curVal - 1)) {
return Action.EXPIRE;
@@ -61,7 +61,7 @@
}
@Override
- public void setContext(Object context) {
+ public void setContext(EvictionContext context) {
// NOOP
}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
index 2594c67..17750b6 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/CountTriggerPolicy.java
@@ -44,7 +44,7 @@
public void track(Event<T> event) {
if (started && !event.isWatermark()) {
if (currentCount.incrementAndGet() >= count) {
- evictionPolicy.setContext(System.currentTimeMillis());
+ evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
handler.onTrigger();
}
}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
new file mode 100644
index 0000000..ef65f66
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/windowing/DefaultEvictionContext.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+public class DefaultEvictionContext implements EvictionContext {
+ private final Long referenceTime;
+ private final Long currentCount;
+ private final Long slidingCount;
+
+ public DefaultEvictionContext(Long referenceTime) {
+ this(referenceTime, null);
+ }
+
+ public DefaultEvictionContext(Long referenceTime, Long currentCount) {
+ this(referenceTime, currentCount, null);
+ }
+
+ public DefaultEvictionContext(Long referenceTime, Long currentCount, Long slidingCount) {
+ this.referenceTime = referenceTime;
+ this.currentCount = currentCount;
+ this.slidingCount = slidingCount;
+ }
+
+ @Override
+ public Long getReferenceTime() {
+ return referenceTime;
+ }
+
+ @Override
+ public Long getCurrentCount() {
+ return currentCount;
+ }
+
+ @Override
+ public Long getSlidingCount() {
+ return slidingCount;
+ }
+}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java
new file mode 100644
index 0000000..37dcfd9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionContext.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.windowing;
+
+/**
+ * Context information that can be used by the eviction policy
+ */
+public interface EvictionContext {
+ /**
+ * Returns the reference time that the eviction policy could use to
+ * evict the events. In the case of event time processing, this would be
+ * the watermark time.
+ *
+ * @return the reference time in millis
+ */
+ Long getReferenceTime();
+
+ /**
+ * Returns the sliding count for count based windows
+ *
+ * @return the sliding count
+ */
+ Long getSlidingCount();
+
+ /**
+ * Returns the current count of events in the queue up to the reference time
+ * based on which count based evictions can be performed.
+ *
+ * @return the current count
+ */
+ Long getCurrentCount();
+}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
index 8128d80..05e4d93 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/EvictionPolicy.java
@@ -70,5 +70,6 @@
*
* @param context
*/
- void setContext(Object context);
+ void setContext(EvictionContext context);
+
}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
index a2db239..e646207 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeEvictionPolicy.java
@@ -57,8 +57,8 @@
}
@Override
- public void setContext(Object context) {
- referenceTime = ((Number) context).longValue();
+ public void setContext(EvictionContext context) {
+ referenceTime = context.getReferenceTime();
}
@Override
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
index 1ad7714..b057afb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java
@@ -115,7 +115,7 @@
* to evict the events
*/
if (evictionPolicy != null) {
- evictionPolicy.setContext(System.currentTimeMillis());
+ evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
}
handler.onTrigger();
} catch (Throwable th) {
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
index e1af6fb..74240bb 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountEvictionPolicy.java
@@ -24,25 +24,29 @@
* @param <T> the type of event tracked by this policy.
*/
public class WatermarkCountEvictionPolicy<T> extends CountEvictionPolicy<T> {
- private final WindowManager<T> windowManager;
/*
* The reference time in millis for window calculations and
* expiring events. If not set it will default to System.currentTimeMillis()
*/
private long referenceTime;
+ private long processed = 0L;
- public WatermarkCountEvictionPolicy(int count, WindowManager<T> windowManager) {
+ public WatermarkCountEvictionPolicy(int count) {
super(count);
- this.windowManager = windowManager;
}
@Override
public Action evict(Event<T> event) {
- if (event.getTimestamp() <= referenceTime) {
- return super.evict(event);
+ Action action;
+ if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) {
+ action = super.evict(event);
+ if (action == Action.PROCESS) {
+ ++processed;
+ }
} else {
- return Action.KEEP;
+ action = Action.KEEP;
}
+ return action;
}
@Override
@@ -51,9 +55,14 @@
}
@Override
- public void setContext(Object context) {
- referenceTime = (Long) context;
- currentCount.set(windowManager.getEventCount(referenceTime));
+ public void setContext(EvictionContext context) {
+ referenceTime = context.getReferenceTime();
+ if (context.getCurrentCount() != null) {
+ currentCount.set(context.getCurrentCount());
+ } else {
+ currentCount.set(processed + context.getSlidingCount());
+ }
+ processed = 0;
}
@Override
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
index 391efee..3cfcaad 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkCountTriggerPolicy.java
@@ -74,7 +74,7 @@
long watermarkTs = waterMarkEvent.getTimestamp();
List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
for (long ts : eventTs) {
- evictionPolicy.setContext(ts);
+ evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count)));
handler.onTrigger();
lastProcessedTs = ts;
}
diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
index 3734c93..00620c4 100644
--- a/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
+++ b/storm-core/src/jvm/org/apache/storm/windowing/WatermarkTimeTriggerPolicy.java
@@ -75,7 +75,8 @@
long windowEndTs = nextWindowEndTs;
LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
while (windowEndTs <= watermarkTs) {
- evictionPolicy.setContext(windowEndTs);
+ long currentCount = windowManager.getEventCount(windowEndTs);
+ evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
if (handler.onTrigger()) {
windowEndTs += slidingIntervalMs;
} else {
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
index 7923b7e..74816c2 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -21,6 +21,7 @@
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Utils;
@@ -36,16 +37,17 @@
public class LeaderElectorImp implements ILeaderElector {
private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
- private final Map conf;
- private final List<String> servers;
- private final CuratorFramework zk;
- private final String leaderlockPath;
- private final String id;
- private final AtomicReference<LeaderLatch> leaderLatch;
- private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+ private final Map conf;
+ private final List<String> servers;
+ private final CuratorFramework zk;
+ private final String leaderlockPath;
+ private final String id;
+ private final AtomicReference<LeaderLatch> leaderLatch;
+ private final AtomicReference<LeaderLatchListener> leaderLatchListener;
+ private final BlobStore blobStore;
public LeaderElectorImp(Map conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
- AtomicReference<LeaderLatchListener> leaderLatchListener) {
+ AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore) {
this.conf = conf;
this.servers = servers;
this.zk = zk;
@@ -53,6 +55,7 @@
this.id = id;
this.leaderLatch = leaderLatch;
this.leaderLatchListener = leaderLatchListener;
+ this.blobStore = blobStore;
}
@Override
@@ -65,7 +68,7 @@
// if this latch is already closed, we need to create new instance.
if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
- leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, leaderLatch.get()));
+ leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get()));
LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
}
// Only if the latch is not already started we invoke start
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 5e9039a..7723922 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -17,6 +17,9 @@
*/
package org.apache.storm.zookeeper;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
@@ -27,11 +30,15 @@
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.KeyFilter;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.apache.zookeeper.KeeperException;
@@ -44,6 +51,7 @@
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -323,26 +331,51 @@
}
// Leader latch listener that will be invoked when we either gain or lose leadership
- public static LeaderLatchListener leaderLatchListenerImpl(Map conf, CuratorFramework zk, LeaderLatch leaderLatch) throws UnknownHostException {
+ // On gaining the latch, leadership is kept only when every topology id listed in
+ // ZooKeeper also has a key in the local blob store; otherwise the latch is closed.
+ public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
return new LeaderLatchListener() {
@Override
public void isLeader() {
- LOG.info("{} gained leadership", hostName);
+ // Topology ids registered under the storms subtree in ZooKeeper.
+ final String stormsPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE;
+ final Set<String> idsInZookeeper = new HashSet<>(Zookeeper.getChildren(zk, stormsPath, false));
+ // Topology ids derived from the keys present in the blob store.
+ final KeyFilter<String> blobKeyToTopologyId = new KeyFilter<String>() {
+ @Override
+ public String filter(String blobKey) {
+ return ConfigUtils.getIdFromBlobKey(blobKey);
+ }
+ };
+ final Set<String> idsInBlobStore = blobStore.filterAndListKeys(blobKeyToTopologyId);
+ final Sets.SetView<String> missingLocally = Sets.difference(idsInZookeeper, idsInBlobStore);
+ LOG.info("active-topology-ids [{}] local-topology-ids [{}] diff-topology [{}]",
+ commaJoined(idsInZookeeper), commaJoined(idsInBlobStore),
+ commaJoined(missingLocally));
+
+ if (missingLocally.isEmpty()) {
+ LOG.info("Accepting leadership, all active topology found locally.");
+ return;
+ }
+
+ // At least one active topology has no local code: close the latch rather than lead.
+ LOG.info("code for all active topologies not available locally, giving up leadership.");
+ try {
+ leaderLatch.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public void notLeader() {
LOG.info("{} lost leadership.", hostName);
}
+
+ // Renders a set of ids as a single comma-separated string for logging.
+ private String commaJoined(Set<String> ids) {
+ return Joiner.on(",").join(ids);
+ }
};
}
- public static ILeaderElector zkLeaderElector(Map conf) throws UnknownHostException {
- return _instance.zkLeaderElectorImpl(conf);
+ public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
+ return _instance.zkLeaderElectorImpl(conf, blobStore); // static entry point: forwards both arguments to the overridable instance method on _instance
}
- protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException {
+ protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
@@ -350,8 +383,9 @@
String id = NimbusInfo.fromConf(conf).toHostPortString();
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
- new AtomicReference<>(leaderLatchListenerImpl(conf, zk, leaderLatchAtomicReference.get()));
- return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference);
+ new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
+ return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
+ leaderLatchListenerAtomicReference, blobStore);
}
public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
diff --git a/storm-core/src/ui/public/component.html b/storm-core/src/ui/public/component.html
index 6d5465f..005ac5d 100644
--- a/storm-core/src/ui/public/component.html
+++ b/storm-core/src/ui/public/component.html
@@ -22,7 +22,7 @@
<link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
<link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
<link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
<script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -32,7 +32,7 @@
<script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
<script src="/js/moment.min.js" type="text/javascript"></script>
<script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
</head>
<body>
<div class="container-fluid">
@@ -83,7 +83,7 @@
try {
other();
} catch (err) {
- $.get("/templates/json-error-template.html", function(template) {
+ getStatic("/templates/json-error-template.html", function(template) {
$("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
});
}
@@ -156,7 +156,7 @@
$.ajaxSetup({
"error":function(jqXHR,textStatus,response) {
var errorJson = jQuery.parseJSON(jqXHR.responseText);
- $.get("/templates/json-error-template.html", function(template) {
+ getStatic("/templates/json-error-template.html", function(template) {
$("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
});
}
@@ -187,7 +187,7 @@
$.getJSON(url,function(response,status,jqXHR) {
var uiUser = $("#ui-user");
- $.get("/templates/user-template.html", function(template) {
+ getStatic("/templates/user-template.html", function(template) {
uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
$('#ui-user [data-toggle="tooltip"]').tooltip()
});
@@ -202,7 +202,7 @@
var profilerControl = $("#profiler-control");
var executorStats = $("#component-executor-stats");
var componentErrors = $("#component-errors");
- $.get("/templates/component-page-template.html", function(template) {
+ getStatic("/templates/component-page-template.html", function(template) {
response["profilerActive"] = $.map(response["profilerActive"], function(active_map) {
var date = new Date();
var millis = date.getTime() + parseInt(active_map["timestamp"]);
@@ -341,9 +341,9 @@
var passed = {}
Object.keys(workerActionSelected).forEach(function (id) {
var url = "/api/v1/topology/"+topologyId+"/profiling/start/" + id + "/" + timeout;
- $.get(url, function(response,status,jqXHR) {
+ getStatic(url, function(response,status,jqXHR) {
jsError(function() {
- $.get("/templates/component-page-template.html", function(template) {
+ getStatic("/templates/component-page-template.html", function(template) {
var host_port_split = id.split(":");
var host = host_port_split[0];
var port = host_port_split[1];
@@ -382,7 +382,7 @@
$("#stop_" + id).prop('disabled', true);
setTimeout(function(){ $("#stop_" + id).prop('disabled', false); }, 5000);
- $.get(url, function(response,status,jqXHR) {
+ getStatic(url, function(response,status,jqXHR) {
alert("Submitted request to stop profiling...");
})
.fail(function(response) {
@@ -398,7 +398,7 @@
$("#dump_profile_" + id).prop('disabled', true);
setTimeout(function(){ $("#dump_profile_" + id).prop('disabled', false); }, 5000);
- $.get(url, function(response,status,jqXHR) {
+ getStatic(url, function(response,status,jqXHR) {
alert("Submitted request to dump profile snapshot...");
})
.fail(function(response) {
@@ -418,7 +418,7 @@
$("#dump_jstack_" + id).prop('disabled', true);
setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
- $.get(url)
+ getStatic(url)
.fail(function(response) {
failed[id] = response;
});
@@ -443,7 +443,7 @@
$("#dump_jstack_" + id).prop('disabled', true);
setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
- $.get(url, function(response,status,jqXHR) {
+ getStatic(url, function(response,status,jqXHR) {
alert("Submitted request for jstack dump...");
})
.fail(function(response) {
@@ -461,7 +461,7 @@
$("#restart_worker_jvm_" + id).prop('disabled', true);
setTimeout(function(){ $("#restart_worker_jvm_" + id).prop('disabled', false); }, 5000);
- $.get(url)
+ getStatic(url)
.fail(function(response) {
failed[id] = response;
});
@@ -489,7 +489,7 @@
$("#dump_heap_" + id).prop('disabled', true);
setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
- $.get(url)
+ getStatic(url)
.fail(function(response) {
failed[id] = response;
});
@@ -514,7 +514,7 @@
$("#dump_heap_" + id).prop('disabled', true);
setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
- $.get(url, function(response,status,jqXHR) {
+ getStatic(url, function(response,status,jqXHR) {
alert("Submitted request for jmap dump...");
})
.fail(function(response) {
diff --git a/storm-core/src/ui/public/deep_search_result.html b/storm-core/src/ui/public/deep_search_result.html
index 406a101..5b4f47b 100644
--- a/storm-core/src/ui/public/deep_search_result.html
+++ b/storm-core/src/ui/public/deep_search_result.html
@@ -21,7 +21,7 @@
<link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
<link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
<link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
<script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -31,7 +31,7 @@
<script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
<script src="/js/url.min.js" type="text/javascript"></script>
<script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
</head>
<body>
<div class="container-fluid">
@@ -56,12 +56,12 @@
var port = $.url("?port") || "*";
var search_archived = $.url("?search-archived");
- $.get("/templates/deep-search-result-page-template.html", function(template) {
+ getStatic("/templates/deep-search-result-page-template.html", function(template) {
var checked;
if (search_archived) {
checked = "checked";
}
- $.get("/api/v1/history/summary", function (response, status, jqXHR) {
+ getStatic("/api/v1/history/summary", function (response, status, jqXHR) {
var findIds = function findIds(query, cb) {
var found = [];
var re = new RegExp(query, 'i');
@@ -104,7 +104,7 @@
var result = $("#result");
if(search) {
- $.get("/api/v1/supervisor/summary", function(response, status, jqXHR) {
+ getStatic("/api/v1/supervisor/summary", function(response, status, jqXHR) {
for(var i in response.supervisors) {
response.supervisors[i].elemId = elem_id_for_host(response.supervisors[i].host);
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
index eae423c..52900a5 100644
--- a/storm-core/src/ui/public/index.html
+++ b/storm-core/src/ui/public/index.html
@@ -23,7 +23,7 @@
<link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
<link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
<link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
<script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -32,7 +32,7 @@
<script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
<script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
<script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
</head>
<body>
<div class="container-fluid">
@@ -96,7 +96,7 @@
$.ajaxSetup({
"error":function(jqXHR,textStatus,response) {
var errorJson = jQuery.parseJSON(jqXHR.responseText);
- $.get("/templates/json-error-template.html", function(template) {
+ getStatic("/templates/json-error-template.html", function(template) {
$("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
});
}
@@ -109,18 +109,18 @@
var config = $("#nimbus-configuration");
$.getJSON("/api/v1/cluster/summary",function(response,status,jqXHR) {
- $.get("/templates/user-template.html", function(template) {
+ getStatic("/templates/user-template.html", function(template) {
uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
$('#ui-user [data-toggle="tooltip"]').tooltip()
});
- $.get("/templates/index-page-template.html", function(template) {
+ getStatic("/templates/index-page-template.html", function(template) {
clusterSummary.append(Mustache.render($(template).filter("#cluster-summary-template").html(),response));
$('#cluster-summary [data-toggle="tooltip"]').tooltip();
});
});
$.getJSON("/api/v1/nimbus/summary",function(response,status,jqXHR) {
- $.get("/templates/index-page-template.html", function(template) {
+ getStatic("/templates/index-page-template.html", function(template) {
nimbusSummary.append(Mustache.render($(template).filter("#nimbus-summary-template").html(),response));
//host, port, isLeader, version, uptime
dtAutoPage("#nimbus-summary-table", {
@@ -133,7 +133,7 @@
});
});
$.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) {
- $.get("/templates/index-page-template.html", function(template) {
+ getStatic("/templates/index-page-template.html", function(template) {
topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
//name, owner, status, uptime, num workers, num executors, num tasks, replication count, assigned total mem, assigned total cpu, scheduler info
dtAutoPage("#topology-summary-table", {
@@ -146,7 +146,7 @@
});
});
$.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) {
- $.get("/templates/index-page-template.html", function(template) {
+ getStatic("/templates/index-page-template.html", function(template) {
supervisorSummary.append(Mustache.render($(template).filter("#supervisor-summary-template").html(),response));
//id, host, uptime, slots, used slots
dtAutoPage("#supervisor-summary-table", {
@@ -160,7 +160,7 @@
});
$.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) {
var formattedResponse = formatConfigData(response);
- $.get("/templates/index-page-template.html", function(template) {
+ getStatic("/templates/index-page-template.html", function(template) {
config.append(Mustache.render($(template).filter("#configuration-template").html(),formattedResponse));
$('#nimbus-configuration-table td').jsonFormatter()
//key, value
diff --git a/storm-core/src/ui/public/js/script.js b/storm-core/src/ui/public/js/script.js
index a880205..6d641e1 100644
--- a/storm-core/src/ui/public/js/script.js
+++ b/storm-core/src/ui/public/js/script.js
@@ -257,3 +257,15 @@
opacity: .5,
color: '#fff',margin:0,width:"30%",top:"40%",left:"35%",textAlign:"center"
};
+
+// add a url query param to static (templates normally) ajax requests
+// for cache busting
+function getStatic(url, cb) {
+ return $.ajax({
+ url: url,
+ data: {
+ '_ts': '${packageTimestamp}'
+ },
+ success: cb
+ });
+};
diff --git a/storm-core/src/ui/public/js/visualization.js b/storm-core/src/ui/public/js/visualization.js
index 9aab087..4daadc2 100644
--- a/storm-core/src/ui/public/js/visualization.js
+++ b/storm-core/src/ui/public/js/visualization.js
@@ -379,7 +379,7 @@
try {
other();
} catch (err) {
- $.get("/templates/json-error-template.html", function(template) {
+ getStatic("/templates/json-error-template.html", function(template) {
$("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
});
}
@@ -388,7 +388,7 @@
var should_update;
function show_visualization(sys) {
$.getJSON("/api/v1/topology/"+$.url("?id")+"/visualization-init",function(response,status,jqXHR) {
- $.get("/templates/topology-page-template.html", function(template) {
+ getStatic("/templates/topology-page-template.html", function(template) {
jsError(function() {
var topologyVisualization = $("#visualization-container");
topologyVisualization.append(
diff --git a/storm-core/src/ui/public/logviewer_search.html b/storm-core/src/ui/public/logviewer_search.html
index f981499..3a1682e 100644
--- a/storm-core/src/ui/public/logviewer_search.html
+++ b/storm-core/src/ui/public/logviewer_search.html
@@ -21,7 +21,7 @@
<link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
<link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
<link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
<script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -30,7 +30,7 @@
<script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
<script src="/js/url.min.js" type="text/javascript"></script>
<script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
</head>
<body>
<div class="container-fluid">
@@ -51,7 +51,7 @@
file = decodeURIComponent(file);
search = decodeURIComponent(search);
- $.get("/templates/logviewer-search-page-template.html", function(template) {
+ getStatic("/templates/logviewer-search-page-template.html", function(template) {
$("#search-form").append(Mustache.render($(template).filter("#search-single-file").html(),{file: file, search: search, isDaemon: isDaemon}));
var result = $("#result");
diff --git a/storm-core/src/ui/public/search_result.html b/storm-core/src/ui/public/search_result.html
index ec44aa9..ee0efd8 100644
--- a/storm-core/src/ui/public/search_result.html
+++ b/storm-core/src/ui/public/search_result.html
@@ -21,7 +21,7 @@
<link href="/css/bootstrap-3.3.1.min.css" rel="stylesheet" type="text/css">
<link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
<link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
<script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -30,7 +30,7 @@
<script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
<script src="/js/url.min.js" type="text/javascript"></script>
<script src="/js/dataTables.bootstrap.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
</head>
<body>
<div class="container-fluid">
@@ -49,7 +49,7 @@
var count = $.url("?count") || 2;
var searchArchived = $.url("?searchArchived") || "";
- $.get("/templates/search-result-page-template.html", function(template) {
+ getStatic("/templates/search-result-page-template.html", function(template) {
$("#search-form").append(Mustache.render($(template).filter("#search-form-template").html(),{id: id, search: search, count: count, searchArchived: searchArchived}));
var result = $("#result");
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 7ad84a4..0f7cf9a 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -24,7 +24,7 @@
<link href="/css/jquery.dataTables.1.10.4.min.css" rel="stylesheet" type="text/css">
<link href="/css/dataTables.bootstrap.css" rel="stylesheet" type="text/css">
<link href="/css/jsonFormatter.min.css" rel="stylesheet" type="text/css">
-<link href="/css/style.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css?_ts=${packageTimestamp}" rel="stylesheet" type="text/css">
<script src="/js/jquery-1.11.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.dataTables.1.10.4.min.js" type="text/javascript"></script>
<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
@@ -33,7 +33,7 @@
<script src="/js/bootstrap-3.3.1.min.js" type="text/javascript"></script>
<script src="/js/jquery.blockUI.min.js" type="text/javascript"></script>
<script src="/js/jsonFormatter.min.js" type="text/javascript"></script>
-<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/script.js?_ts=${packageTimestamp}" type="text/javascript"></script>
<script src="/js/visualization.js" type="text/javascript"></script>
<script src="/js/arbor.js" type="text/javascript"></script>
<script src="/js/arbor-graphics.js" type="text/javascript"></script>
@@ -230,7 +230,7 @@
};
if (!responseData) {
var topologyId = $.url("?id");
- $.get ('/api/v1/topology/' + topologyId + '/logconfig', renderImpl);
+ getStatic ('/api/v1/topology/' + topologyId + '/logconfig', renderImpl);
} else {
renderImpl (responseData);
}
@@ -264,7 +264,7 @@
$.ajaxSetup({
"error":function(jqXHR,textStatus,response) {
var errorJson = jQuery.parseJSON(jqXHR.responseText);
- $.get("/templates/json-error-template.html", function(template) {
+ getStatic("/templates/json-error-template.html", function(template) {
$("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
});
}
@@ -272,7 +272,7 @@
$.getJSON(url,function(response,status,jqXHR) {
var uiUser = $("#ui-user");
- $.get("/templates/user-template.html", function(template) {
+ getStatic("/templates/user-template.html", function(template) {
uiUser.append(Mustache.render($(template).filter("#user-template").html(),response));
$('#ui-user [data-toggle="tooltip"]').tooltip();
});
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 53cf517..96e629d 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -1147,37 +1147,6 @@
(getAllNimbuses [this] `(leader-address))
(close [this] true))))
-(deftest test-cleans-corrupt
- (with-inprocess-zookeeper zk-port
- (with-local-tmp [nimbus-dir]
- (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
- (letlocals
- (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-ZOOKEEPER-SERVERS ["localhost"]
- STORM-CLUSTER-MODE "local"
- STORM-ZOOKEEPER-PORT zk-port
- STORM-LOCAL-DIR nimbus-dir}))
- (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
- (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
- (bind topology (Thrift/buildTopology
- {"1" (Thrift/prepareSpoutDetails
- (TestPlannerSpout. true) (Integer. 3))}
- {}))
- (submit-local-topology nimbus "t1" {} topology)
- (submit-local-topology nimbus "t2" {} topology)
- (bind storm-id1 (StormCommon/getStormId cluster-state "t1"))
- (bind storm-id2 (StormCommon/getStormId cluster-state "t2"))
- (.shutdown nimbus)
- (let [blob-store (Utils/getNimbusBlobStore conf nil)]
- (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state)
- (.shutdown blob-store))
- (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
- (is ( = #{storm-id2} (set (.activeStorms cluster-state))))
- (.shutdown nimbus)
- (.disconnect cluster-state)
- )))))
-
;(deftest test-no-overlapping-slots
; ;; test that same node+port never appears across 2 assignments
; )
@@ -1224,7 +1193,7 @@
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
+ (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
@@ -1239,7 +1208,7 @@
{}))
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf] (mock-leader-elector :is-leader false))))]
+ (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector :is-leader false))))]
(letlocals
(bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1499,7 +1468,7 @@
_ (UtilsInstaller. fake-utils)
- (StormCommonInstaller. fake-common)
zk-le (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf] nil)))
+ (zkLeaderElectorImpl [conf blob-store] nil)))
mocked-cluster (MockedCluster. cluster-utils)]
(stubbing [nimbus/file-cache-map nil
nimbus/mk-blob-cache-map nil
@@ -1563,7 +1532,7 @@
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf] (mock-leader-elector))))]
+ (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-ZOOKEEPER-SERVERS ["localhost"]
diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
index a545427..6645566 100644
--- a/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/windowing/WindowManagerTest.java
@@ -187,7 +187,7 @@
windowManager.add(i, now - 1000);
}
// simulate the time trigger by setting the reference time and invoking onTrigger() manually
- evictionPolicy.setContext(now + 100);
+ evictionPolicy.setContext(new DefaultEvictionContext(now + 100));
windowManager.onTrigger();
// 100 events with past ts should expire
@@ -210,7 +210,7 @@
}
activationsEvents.addAll(newEvents);
// simulate the time trigger by setting the reference time and invoking onTrigger() manually
- evictionPolicy.setContext(now + 200);
+ evictionPolicy.setContext(new DefaultEvictionContext(now + 200));
windowManager.onTrigger();
assertTrue(listener.onExpiryEvents.isEmpty());
assertEquals(activationsEvents, listener.onActivationEvents);
@@ -236,20 +236,20 @@
windowManager.add(i);
}
// simulate the time trigger by setting the reference time and invoking onTrigger() manually
- evictionPolicy.setContext(now + 60);
+ evictionPolicy.setContext(new DefaultEvictionContext(now + 60));
windowManager.onTrigger();
assertEquals(seq(1, 10), listener.onActivationEvents);
assertTrue(listener.onActivationExpiredEvents.isEmpty());
listener.clear();
// wait so all events expire
- evictionPolicy.setContext(now + 120);
+ evictionPolicy.setContext(new DefaultEvictionContext(now + 120));
windowManager.onTrigger();
assertEquals(seq(1, 10), listener.onExpiryEvents);
assertTrue(listener.onActivationEvents.isEmpty());
listener.clear();
- evictionPolicy.setContext(now + 180);
+ evictionPolicy.setContext(new DefaultEvictionContext(now + 180));
windowManager.onTrigger();
assertTrue(listener.onActivationExpiredEvents.isEmpty());
assertTrue(listener.onActivationEvents.isEmpty());
@@ -354,7 +354,7 @@
@Test
public void testCountBasedWindowWithEventTs() throws Exception {
- EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3, windowManager);
+ EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(3);
windowManager.setEvictionPolicy(evictionPolicy);
TriggerPolicy<Integer> triggerPolicy = new WatermarkTimeTriggerPolicy<Integer>(10, windowManager, evictionPolicy, windowManager);
triggerPolicy.start();
@@ -430,7 +430,64 @@
assertEquals(seq(9), listener.allOnActivationEvents.get(0));
assertEquals(seq(9, 12), listener.allOnActivationEvents.get(1));
}
+
@Test
+ public void testCountBasedTumblingWithSameEventTs() throws Exception {
+ EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(2);
+ windowManager.setEvictionPolicy(evictionPolicy);
+ TriggerPolicy<Integer> triggerPolicy = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+ triggerPolicy.start();
+ windowManager.setTriggerPolicy(triggerPolicy);
+
+ windowManager.add(1, 10);
+ windowManager.add(2, 10);
+ windowManager.add(3, 11);
+ windowManager.add(4, 12);
+ windowManager.add(5, 12);
+ windowManager.add(6, 12);
+ windowManager.add(7, 12);
+ windowManager.add(8, 13);
+ windowManager.add(9, 14);
+ windowManager.add(10, 15);
+
+ windowManager.add(new WaterMarkEvent<Integer>(20));
+ assertEquals(5, listener.allOnActivationEvents.size());
+ assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0));
+ assertEquals(seq(3, 4), listener.allOnActivationEvents.get(1));
+ assertEquals(seq(5, 6), listener.allOnActivationEvents.get(2));
+ assertEquals(seq(7, 8), listener.allOnActivationEvents.get(3));
+ assertEquals(seq(9, 10), listener.allOnActivationEvents.get(4));
+ }
+
+ @Test
+ public void testCountBasedSlidingWithSameEventTs() throws Exception {
+ EvictionPolicy<Integer> evictionPolicy = new WatermarkCountEvictionPolicy<>(5);
+ windowManager.setEvictionPolicy(evictionPolicy);
+ TriggerPolicy<Integer> triggerPolicy = new WatermarkCountTriggerPolicy<Integer>(2, windowManager, evictionPolicy, windowManager);
+ triggerPolicy.start();
+ windowManager.setTriggerPolicy(triggerPolicy);
+
+ windowManager.add(1, 10);
+ windowManager.add(2, 10);
+ windowManager.add(3, 11);
+ windowManager.add(4, 12);
+ windowManager.add(5, 12);
+ windowManager.add(6, 12);
+ windowManager.add(7, 12);
+ windowManager.add(8, 13);
+ windowManager.add(9, 14);
+ windowManager.add(10, 15);
+
+ windowManager.add(new WaterMarkEvent<Integer>(20));
+ assertEquals(5, listener.allOnActivationEvents.size());
+ assertEquals(seq(1, 2), listener.allOnActivationEvents.get(0));
+ assertEquals(seq(1, 4), listener.allOnActivationEvents.get(1));
+ assertEquals(seq(2, 6), listener.allOnActivationEvents.get(2));
+ assertEquals(seq(4, 8), listener.allOnActivationEvents.get(3));
+ assertEquals(seq(6, 10), listener.allOnActivationEvents.get(4));
+
+ }
+ @Test
public void testEventTimeLag() throws Exception {
EvictionPolicy<Integer> evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5);
windowManager.setEvictionPolicy(evictionPolicy);
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index dd88a87..f7d0c23 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -30,6 +30,16 @@
<name>Storm Binary Distribution</name>
<description>Storm binary distribution</description>
+ <!--
+ Used for cache busting in the Storm UI HTML and script.js files
+ See src/main/assembly/binary.xml for the fileSet rules with
+ <filtered>true</filtered>
+ -->
+    <properties>
+        <packageTimestamp>${maven.build.timestamp}</packageTimestamp>
+        <!-- SimpleDateFormat pattern: lowercase yyyy is the calendar year. Uppercase YYYY is
+             the week year, which differs from the calendar year around New Year and is not a
+             valid pattern letter before Java 7. -->
+        <maven.build.timestamp.format>yyyyMMddHHmm</maven.build.timestamp.format>
+    </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 930b4d7..b42de4f 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -46,13 +46,36 @@
</includes>
<fileMode>0755</fileMode>
</fileSet>
+ <!--
+ Allow for variable substitution for cache busting in HTML files and script.js
+ See storm-dist/binary/pom.xml for custom variables that will be substituted in.
+
+ The source files should have a ${variable to be replaced} wherever
+ maven assembly plugin should inject a value.
+ -->
+ <fileSet>
+ <directory>${project.basedir}/../../storm-core/src/ui/public</directory>
+ <outputDirectory>public</outputDirectory>
+ <includes>
+ <include>*.html</include>
+ <include>js/script.js</include>
+ </includes>
+ <excludes/>
+ <filtered>true</filtered>
+ </fileSet>
+ <!--
+ Include rest of public/ directory, without any filtering
+ -->
<fileSet>
<directory>${project.basedir}/../../storm-core/src/ui/public</directory>
<outputDirectory>public</outputDirectory>
<includes>
<include>*/**</include>
</includes>
- <excludes/>
+ <excludes>
+ <exclude>*.html</exclude>
+ <exclude>js/script.js</exclude>
+ </excludes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../../examples</directory>
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
index 642c393..9106390 100755
--- a/storm-multilang/python/src/main/resources/resources/storm.py
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -75,8 +75,8 @@
return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
def sendMsgToParent(msg):
- print json_encode(msg)
- print "end"
+ print(json_encode(msg))
+ print("end")
sys.stdout.flush()
def sync():
@@ -196,7 +196,7 @@
sync()
else:
self.process(tup)
- except Exception, e:
+ except Exception as e:
reportError(traceback.format_exc(e))
class BasicBolt(object):
@@ -222,10 +222,10 @@
try:
self.process(tup)
ack(tup)
- except Exception, e:
+ except Exception as e:
reportError(traceback.format_exc(e))
fail(tup)
- except Exception, e:
+ except Exception as e:
reportError(traceback.format_exc(e))
class Spout(object):
@@ -256,5 +256,5 @@
if msg["command"] == "fail":
self.fail(msg["id"])
sync()
- except Exception, e:
+ except Exception as e:
reportError(traceback.format_exc(e))