Merge pull request #710 from d2r/d2r-sys-stats-toggle-fix

Fix sys stats cookie being lost from UI requests
diff --git a/README.markdown b/README.markdown
index 5da35b0..9e3be66 100644
--- a/README.markdown
+++ b/README.markdown
@@ -10,8 +10,33 @@
 
 ## Getting help
 
-Feel free to ask questions on Storm's mailing list: http://groups.google.com/group/storm-user
+__NOTE:__ The Google Groups mailing list storm-user@googlegroups.com is now officially deprecated in favor of the Apache-hosted user and dev mailing lists.
 
+### Storm Users
+Storm users should send messages and subscribe to [user@storm.incubator.apache.org](mailto:user@storm.incubator.apache.org).
+
+You can subscribe to this list by sending an email to [user-subscribe@storm.incubator.apache.org](mailto:user-subscribe@storm.incubator.apache.org). Likewise, you can cancel a subscription by sending an email to [user-unsubscribe@storm.incubator.apache.org](mailto:user-unsubscribe@storm.incubator.apache.org).
+
+You can view the archives of the mailing list [here](http://mail-archives.apache.org/mod_mbox/incubator-storm-user/).
+
+### Storm Developers
+Storm developers should send messages and subscribe to [dev@storm.incubator.apache.org](mailto:dev@storm.incubator.apache.org).
+
+You can subscribe to this list by sending an email to [dev-subscribe@storm.incubator.apache.org](mailto:dev-subscribe@storm.incubator.apache.org). Likewise, you can cancel a subscription by sending an email to [dev-unsubscribe@storm.incubator.apache.org](mailto:dev-unsubscribe@storm.incubator.apache.org).
+
+You can view the archives of the mailing list [here](http://mail-archives.apache.org/mod_mbox/incubator-storm-dev/).
+
+### Which list should I send/subscribe to?
+If you are using a pre-built binary distribution of Storm, then chances are you should send questions, comments, Storm-related announcements, etc. to [user@storm.incubator.apache.org](mailto:user@storm.incubator.apache.org).
+
+If you are building Storm from source, developing new features, or otherwise hacking on the Storm source code, then [dev@storm.incubator.apache.org](mailto:dev@storm.incubator.apache.org) is more appropriate.
+
+### What will happen with storm-user@googlegroups.com?
+All existing messages will remain archived there, and can be accessed/searched [here](https://groups.google.com/forum/#!forum/storm-user).
+
+New messages sent to storm-user@googlegroups.com will either be rejected (bounced) or answered with a reply directing the sender to the appropriate Apache-hosted list.
+
+### IRC
 You can also come to the #storm-user room on [freenode](http://freenode.net/). You can usually find a Storm developer there to help you out.
 
 ## License
@@ -27,11 +52,14 @@
 
 * Nathan Marz ([@nathanmarz](http://twitter.com/nathanmarz))
 
-## Core contributors
+## Committers
 
 * James Xu ([@xumingming](https://github.com/xumingming))
 * Jason Jackson ([@jason_j](http://twitter.com/jason_j))
 * Andy Feng ([@anfeng](https://github.com/anfeng))
+* Flip Kromer ([@mrflip](https://github.com/mrflip))
+* David Lao ([@davidlao2k](https://github.com/davidlao2k))
+* P. Taylor Goetz ([@ptgoetz](https://github.com/ptgoetz))
 
 ## Contributors
 
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index ede4034..08c7889 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -40,6 +40,7 @@
 
 logviewer.port: 8000
 logviewer.childopts: "-Xmx128m"
+logviewer.appender.name: "A1"
 
 
 drpc.port: 3772
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 431bf35..40f5561 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -283,9 +283,10 @@
         receive-queue (:receive-queue executor-data)
         context (:worker-context executor-data)]
     (when tick-time-secs
-      (if (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
-               (= :spout (:type executor-data)))
-        (log-message "Timeouts disabled for executor " (:executor-id executor-data))
+      (if (or (system-id? (:component-id executor-data))
+              (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
+                   (= :spout (:type executor-data))))
+        (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
         (schedule-recurring
           (:user-timer worker)
           tick-time-secs
diff --git a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
index 3cc8b39..c116937 100644
--- a/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/logviewer.clj
@@ -3,11 +3,16 @@
   (:use [hiccup core page-helpers])
   (:use [backtype.storm config util log])
   (:use [ring.adapter.jetty :only [run-jetty]])
+  (:import [org.slf4j LoggerFactory])
+  (:import [ch.qos.logback.classic Logger])
   (:import [org.apache.commons.logging LogFactory])
   (:import [org.apache.commons.logging.impl Log4JLogger])
+  (:import [ch.qos.logback.core FileAppender])
   (:import [org.apache.log4j Level])
+  (:import [java.io File])
   (:require [compojure.route :as route]
-            [compojure.handler :as handler])
+            [compojure.handler :as handler]
+            [clojure.string :as string])
   (:gen-class))
 
 (defn tail-file [path tail]
@@ -25,8 +30,19 @@
       (.toString output))
     ))
 
-(defn log-page [file tail grep]
-  (let [path (str (System/getProperty "storm.home") "/logs/" file)
+(defn log-root-dir
+  "Return the absolute path (a String) of the directory holding the log file of the root logger's appender named appender-name.
+
+Throws a RuntimeException if appender-name is nil, names no appender on the root logger, or names an appender that is not a FileAppender."
+  [appender-name]
+  (let [appender (when appender-name (.getAppender (LoggerFactory/getLogger Logger/ROOT_LOGGER_NAME) appender-name))]
+    (if (instance? FileAppender appender)
+      (.getParent (.getAbsoluteFile (File. (.getFile appender))))
+      (throw
+       (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and logback agree.")))))
+
+(defn log-page [file tail grep root-dir]
+  (let [path (.getCanonicalPath (File. root-dir file))
         tail (if tail
                (min 10485760 (Integer/parseInt tail))
                10240)
@@ -59,8 +75,8 @@
     ]))
 
 (defroutes log-routes
-  (GET "/log" [:as {cookies :cookies} & m]
-       (log-template (log-page (:file m) (:tail m) (:grep m))))
+  (GET "/log" [:as req & m]
+       (log-template (log-page (:file m) (:tail m) (:grep m) (:log-root req))))
   (GET "/loglevel" [:as {cookies :cookies} & m]
        (log-template (log-level-page (:name m) (:level m))))
   (route/resources "/")
@@ -70,10 +86,16 @@
   (handler/site log-routes)
  )
 
-(defn start-logviewer [port]
-  (run-jetty logapp {:port port}))
+(defn conf-middleware
+  "Ring middleware: wraps app so that every request map it receives carries log-root under the :log-root key."
+  [app log-root]
+  (fn [req]
+    (app (assoc req :log-root log-root))))
+
+(defn start-logviewer "Run logapp on Jetty at port, wrapped so request handlers can read log-root from (:log-root req)." [port log-root]
+  (run-jetty (conf-middleware logapp log-root) {:port port}))
 
 (defn -main []
-  (let [conf (read-storm-config)]
-    (start-logviewer (int (conf LOGVIEWER-PORT)))))
-
+  (let [{port LOGVIEWER-PORT, appender-name LOGVIEWER-APPENDER-NAME} (read-storm-config)
+        root-dir (log-root-dir appender-name)] ; throws unless the appender is a FileAppender on the root logger
+    (start-logviewer (int port) root-dir)))
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 04731dc..caac996 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -326,7 +326,7 @@
 ;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
 ;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
 ;; tracked through heartbeat-cache
-(defn- update-executor-cache [curr hb]
+(defn- update-executor-cache [curr hb timeout]
   (let [reported-time (:time-secs hb)
         {last-nimbus-time :nimbus-time
          last-reported-time :executor-reported-time} curr
@@ -338,15 +338,18 @@
                       (current-time-secs)
                       last-nimbus-time
                       )]
-      {:nimbus-time nimbus-time
+      {:is-timed-out (and
+                       nimbus-time
+                       (>= (time-delta nimbus-time) timeout))
+       :nimbus-time nimbus-time
        :executor-reported-time reported-time}))
 
-(defn update-heartbeat-cache [cache executor-beats all-executors]
+(defn update-heartbeat-cache "Rebuild the heartbeat cache for all-executors: entries for executors not in all-executors are dropped, and each executor's entry is recomputed by update-executor-cache from its latest beat in executor-beats and timeout (seconds; update-heartbeats! passes NIMBUS-TASK-TIMEOUT-SECS), which yields the :is-timed-out flag." [cache executor-beats all-executors timeout]
   (let [cache (select-keys cache all-executors)]
     (into {}
       (for [executor all-executors :let [curr (cache executor)]]
         [executor
-         (update-executor-cache curr (get executor-beats executor))]
+         (update-executor-cache curr (get executor-beats executor) timeout)]
          ))))
 
 (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
@@ -355,7 +358,8 @@
         executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment))
         cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
                                       executor-beats
-                                      all-executors)]
+                                      all-executors
+                                      ((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))]
       (swap! (:heartbeats-cache nimbus) assoc storm-id cache)))
 
 (defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
@@ -380,14 +384,12 @@
     (->> all-executors
         (filter (fn [executor]
           (let [start-time (get executor-start-times executor)
-                nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
+                is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]
             (if (and start-time
                    (or
                     (< (time-delta start-time)
                        (conf NIMBUS-TASK-LAUNCH-SECS))
-                    (not nimbus-time)
-                    (< (time-delta nimbus-time)
-                       (conf NIMBUS-TASK-TIMEOUT-SECS))
+                    (not is-timed-out)
                     ))
               true
               (do
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 89e17d2..13e8d02 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -313,6 +313,12 @@
     public static final Object LOGVIEWER_CHILDOPTS_SCHEMA = String.class;
 
     /**
+     * Name of the logback appender on the root logger whose log file's parent directory the log viewer serves logs from; it must be a FileAppender.
+     */
+    public static final String LOGVIEWER_APPENDER_NAME = "logviewer.appender.name";
+    public static final Object LOGVIEWER_APPENDER_NAME_SCHEMA = String.class;
+
+    /**
      * Childopts for Storm UI Java process.
      */
     public static final String UI_CHILDOPTS = "ui.childopts";
@@ -727,6 +733,12 @@
     public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
 
     /**
+     * Maximum number of pending writes a ShellBolt may queue (the capacity of its _pendingWrites queue). If unset, ShellBolt.prepare keeps the existing queue.
+     */
+    public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
+    public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = Number.class;
+
+    /**
      * The root directory in ZooKeeper for metadata about TransactionalSpouts.
      */
     public static final String TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root";
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 854aa8f..3fa6741 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -1,5 +1,6 @@
 package backtype.storm.task;
 
+import backtype.storm.Config;
 import backtype.storm.generated.ShellComponent;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
@@ -71,6 +72,10 @@
 
     public void prepare(Map stormConf, TopologyContext context,
                         final OutputCollector collector) {
+        Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
+        if (maxPending != null) {
+           this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
+        }
         _rand = new Random();
         _process = new ShellProcess(_command);
         _collector = collector;
diff --git a/storm-core/test/clj/backtype/storm/tick_tuple_test.clj b/storm-core/test/clj/backtype/storm/tick_tuple_test.clj
new file mode 100644
index 0000000..b1679c5
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/tick_tuple_test.clj
@@ -0,0 +1,37 @@
+(ns backtype.storm.tick-tuple-test
+  (:use [clojure test])
+  (:use [backtype.storm bootstrap testing])
+  (:use [backtype.storm.daemon common]))
+
+(bootstrap)
+
+;; Bolt that accepts every tuple and does nothing with it.
+(defbolt noop-bolt ["tuple"] {:prepare true}
+  [conf context collector]
+  (bolt
+   (execute [tuple])))
+
+;; Spout whose nextTuple never emits anything.
+(defspout noop-spout ["tuple"]
+  [conf context collector]
+  (spout
+   (nextTuple [])))
+
+;; Submits a topology with TOPOLOGY-TICK-TUPLE-FREQ-SECS set and advances
+;; simulated time past the first tick. Any exception escaping submission or
+;; the time advance fails the test, and the exception is reported in the message.
+(deftest test-tick-tuple-works-with-system-bolt
+  (with-simulated-time-local-cluster [cluster]
+    (let [topology (thrift/mk-topology
+                    {"1" (thrift/mk-spout-spec noop-spout)}
+                    {"2" (thrift/mk-bolt-spec {"1" ["tuple"]} noop-bolt)})]
+      (try
+        (submit-local-topology (:nimbus cluster)
+                               "test"
+                               {TOPOLOGY-TICK-TUPLE-FREQ-SECS 1}
+                               topology)
+        (advance-cluster-time cluster 2)
+        ;; Reaching this point means no exception was thrown.
+        (is true)
+        (catch Exception e
+          (is false (str "Unexpected exception with tick tuples enabled: " e)))))))
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
index be8ab38..00431d4 100644
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -93,9 +93,17 @@
      */
     private int getSleepTimeMs()
     {
-        int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(1 << retries.get()));
+        // Clamp the shift to [1, 30]: 1 << 31 is negative (Random.nextInt throws
+        // IllegalArgumentException for a non-positive bound), and Java masks int
+        // shift distances to 5 bits, so 1 << 32 would collapse back to 1.
+        int backoff = 1 << Math.min(30, Math.max(1, retries.get()));
+        // Multiply in long arithmetic so base_sleep_ms * jitter cannot overflow int.
+        long uncappedMs = (long) base_sleep_ms * Math.max(1, random.nextInt(backoff));
+        int sleepMs = (int) Math.min(uncappedMs, (long) max_sleep_ms);
         if ( sleepMs > max_sleep_ms )
             sleepMs = max_sleep_ms;
+        if ( sleepMs < base_sleep_ms )
+          sleepMs = base_sleep_ms;
         return sleepMs;
     }