| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| |
| (ns backtype.storm.cluster |
| (:import [org.apache.zookeeper.data Stat ACL Id] |
| [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary] |
| [java.io Serializable]) |
| (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]) |
| (:import [org.apache.curator.framework.state ConnectionStateListener ConnectionState]) |
| (:import [org.apache.curator.framework CuratorFramework]) |
| (:import [backtype.storm.utils Utils]) |
| (:import [java.security MessageDigest]) |
| (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider]) |
| (:import [backtype.storm.nimbus NimbusInfo]) |
| (:use [backtype.storm util log config converter]) |
| (:require [backtype.storm [zookeeper :as zk]]) |
| (:require [backtype.storm.daemon [common :as common]])) |
| |
(defprotocol ClusterState
  ;; Abstraction over a distributed, watchable key/value store (implemented on
  ;; ZooKeeper by mk-distributed-cluster-state below).
  ;; `path` is a slash-separated node path; `acls` is a seq of ZooKeeper ACL
  ;; objects (may be nil); `watch?` asks for a one-shot watch whose events are
  ;; delivered to the callbacks added via `register`.
  (set-ephemeral-node [this path data acls])
  (delete-node [this path])
  (create-sequential [this path data acls])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data acls])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path acls])
  (exists-node? [this path watch?])
  (close [this])
  (register [this callback])   ;; returns an id that can be passed to unregister
  (unregister [this id])
  (add-listener [this listener])
  (sync-path [this path]))
| |
(defn mk-topo-only-acls
  "Builds the ACL list for a topology's private znodes (e.g. credentials):
  full access for the creator plus read access for the digest of the
  topology's auth payload. Returns nil when topology ZK auth is not
  configured."
  [topo-conf]
  (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
    (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)
          digest (DigestAuthenticationProvider/generateDigest payload)]
      [(first ZooDefs$Ids/CREATOR_ALL_ACL)
       (ACL. ZooDefs$Perms/READ (Id. "digest" digest))])))
| |
(defnk mk-distributed-cluster-state
  ;; Creates a ZooKeeper-backed ClusterState. A short-lived bootstrap client
  ;; first ensures the configured root znode exists; the long-lived client is
  ;; then opened chrooted at STORM-ZOOKEEPER-ROOT.
  [conf :auth-conf nil :acls nil]
  ;; bootstrap client: create the root path, then close immediately
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
    (.close zk))
  (let [callbacks (atom {})  ;; watcher id -> callback fn (see register/unregister)
        active (atom true)   ;; flipped to false on close to mute the watcher
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf auth-conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; every ZK event fans out to all registered callbacks
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    (reify
     ClusterState

     (register
       [this callback]
       ;; returns the generated id so the caller can unregister later
       (let [id (uuid)]
         (swap! callbacks assoc id callback)
         id))

     (unregister
       [this id]
       (swap! callbacks dissoc id))

     (set-ephemeral-node
       [this path data acls]
       (zk/mkdirs zk (parent-path path) acls)
       (if (zk/exists zk path false)
         (try-cause
           (zk/set-data zk path data) ; should verify that it's ephemeral
           (catch KeeperException$NoNodeException e
             ;; node vanished between the exists check and set-data; recreate it
             (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
             (zk/create-node zk path data :ephemeral acls)))
         (zk/create-node zk path data :ephemeral acls)))

     (create-sequential
       [this path data acls]
       (zk/create-node zk path data :sequential acls))

     (set-data
       [this path data acls]
       ;; note: this does not turn off any existing watches
       (if (zk/exists zk path false)
         (zk/set-data zk path data)
         (do
           (zk/mkdirs zk (parent-path path) acls)
           (zk/create-node zk path data :persistent acls))))

     (delete-node
       [this path]
       (zk/delete-node zk path))

     (get-data
       [this path watch?]
       (zk/get-data zk path watch?))

     (get-data-with-version
       [this path watch?]
       (zk/get-data-with-version zk path watch?))

     (get-version
       [this path watch?]
       (zk/get-version zk path watch?))

     (get-children
       [this path watch?]
       (zk/get-children zk path watch?))

     (mkdirs
       [this path acls]
       (zk/mkdirs zk path acls))

     (exists-node?
       [this path watch?]
       (zk/exists-node? zk path watch?))

     (close
       [this]
       ;; mute the watcher before closing so late events are dropped
       (reset! active false)
       (.close zk))

     (add-listener
       [this listener]
       (zk/add-listener zk listener))

     (sync-path
       [this path]
       (zk/sync-path zk path))
     )))
| |
(defprotocol StormClusterState
  ;; Storm-aware view over ClusterState: each method maps onto one of the
  ;; subtrees defined below (assignments, storms, supervisors, workerbeats,
  ;; backpressure, errors, code-distributor, nimbuses, credentials).
  ;; Where a `callback` parameter exists, a non-nil callback registers a
  ;; one-shot watch fired when the corresponding node changes; pass nil to
  ;; read without watching.
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  ;; returns topologyIds under /stormroot/code-distributor
  (code-distributor [this callback])
  ;; returns list of NimbusInfos under /stormroot/code-distributor/storm-id
  (code-distributor-info [this storm-id])

  ;; returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
  (nimbuses [this])
  ;; adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
  (add-nimbus-host! [this nimbus-id nimbus-summary])

  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (worker-backpressure! [this storm-id node port info])
  (topology-backpressure [this storm-id callback])
  (setup-backpressure! [this storm-id])
  (remove-worker-backpressure! [this storm-id node port])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  ;; adds NimbusInfo under /stormroot/code-distributor/storm-id
  (setup-code-distributor! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id component-id node port error])
  (errors [this storm-id component-id])
  (last-error [this storm-id component-id])
  (set-credentials! [this storm-id creds topo-conf])
  (credentials [this storm-id callback])
  (disconnect [this]))
| |
;; Node names of the top-level subtrees kept under the configured storm ZK root.
(def ASSIGNMENTS-ROOT "assignments")
(def CODE-ROOT "code")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def BACKPRESSURE-ROOT "backpressure")
(def ERRORS-ROOT "errors")
(def CODE-DISTRIBUTOR-ROOT "code-distributor")
(def NIMBUSES-ROOT "nimbuses")
(def CREDENTIALS-ROOT "credentials")


;; The same subtrees as absolute paths (relative to the chrooted storm root).
(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
| |
(defn supervisor-path
  "Znode holding a supervisor's serialized SupervisorInfo."
  [id]
  (str SUPERVISORS-SUBTREE "/" id))

(defn assignment-path
  "Znode holding a topology's serialized Assignment."
  [id]
  (str ASSIGNMENTS-SUBTREE "/" id))

(defn code-distributor-path
  "Directory whose children are nimbus host:port entries for a topology."
  [id]
  (str CODE-DISTRIBUTOR-SUBTREE "/" id))

(defn nimbus-path
  "Znode holding a nimbus's serialized NimbusSummary."
  [id]
  (str NIMBUSES-SUBTREE "/" id))

(defn storm-path
  "Znode holding a topology's serialized StormBase."
  [id]
  (str STORMS-SUBTREE "/" id))
| |
(defn workerbeat-storm-root
  "Directory containing all worker heartbeat znodes for a topology."
  [storm-id]
  (str WORKERBEATS-SUBTREE "/" storm-id))

(defn workerbeat-path
  "Heartbeat znode for the worker running at node/port."
  [storm-id node port]
  (str (workerbeat-storm-root storm-id) "/" node "-" port))

(defn backpressure-storm-root
  "Directory containing per-worker backpressure znodes for a topology."
  [storm-id]
  (str BACKPRESSURE-SUBTREE "/" storm-id))

(defn backpressure-path
  "Backpressure flag znode for the worker at node/port; its existence (not
  its data) signals congestion."
  [storm-id node port]
  (str (backpressure-storm-root storm-id) "/" node "-" port))
| |
(defn error-storm-root
  "Directory containing per-component error entries for a topology."
  [storm-id]
  (str ERRORS-SUBTREE "/" storm-id))

(defn error-path
  "Directory of sequential error znodes for one component; the component id
  is URL-encoded since it may contain characters illegal in znode names."
  [storm-id component-id]
  (str (error-storm-root storm-id) "/" (url-encode component-id)))

;; suffix of the znode that always holds a component's most recent error
(def last-error-path-seg "last-error")

(defn last-error-path
  "Znode holding the most recently reported error for one component."
  [storm-id component-id]
  (str (error-storm-root storm-id)
       "/"
       (url-encode component-id)
       "-"
       last-error-path-seg))

(defn credentials-path
  "Znode holding a topology's serialized Credentials."
  [storm-id]
  (str CREDENTIALS-SUBTREE "/" storm-id))
| |
(defn- issue-callback!
  "Clears cb-atom and, if a callback was stored there, invokes it with no
  arguments."
  [cb-atom]
  (let [callback (deref cb-atom)]
    (reset! cb-atom nil)
    (when callback
      (callback))))
| |
(defn- issue-map-callback!
  "Removes the callback registered under id in cb-atom (a map-valued atom)
  and, if one was present, invokes it with id."
  [cb-atom id]
  (let [callback (get (deref cb-atom) id)]
    (swap! cb-atom dissoc id)
    (when callback
      (callback id))))
| |
(defn- maybe-deserialize
  "Nil-safe wrapper around Utils/deserialize: returns nil when ser is nil,
  otherwise the bytes deserialized as an instance of clazz."
  [ser clazz]
  (when ser
    (Utils/deserialize ser clazz)))
| |
;; One reported task error: `error` is the stringified error, `time-secs` the
;; report time in epoch seconds, `host`/`port` the reporting worker (populated
;; by report-error and read back by errors / last-error below).
(defrecord TaskError [error time-secs host port])
| |
(defn- parse-error-path
  "Extracts the numeric sequence suffix from a sequential error znode name
  such as \"e0000000013\" by skipping the single leading prefix character."
  [^String p]
  (Long/parseLong (subs p 1)))
| |
(defn convert-executor-beats
  "Ensures that we only return heartbeats for executors assigned to
  this worker. Returns a map from executor to
  {:time-secs ... :uptime ... :stats ...}; executors with no entry in the
  heartbeat's :executor-stats are omitted."
  [executors worker-hb]
  (let [executor-stats (:executor-stats worker-hb)
        beat-entry (fn [executor]
                     [executor {:time-secs (:time-secs worker-hb)
                                :uptime (:uptime worker-hb)
                                :stats (get executor-stats executor)}])]
    (into {}
          (map beat-entry
               (filter #(contains? executor-stats %) executors)))))
| |
;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
(defnk mk-storm-cluster-state
  ;; Accepts either an existing ClusterState (reused, NOT closed on disconnect)
  ;; or a conf map, in which case a private distributed ClusterState is created
  ;; and owned here -- `solo?` tracks ownership for `disconnect`.
  [cluster-state-spec :acls nil]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls)])
        ;; map-valued registries below are keyed by storm-id; the atom-valued
        ;; ones hold a single subtree-wide callback
        assignment-info-callback (atom {})
        assignment-info-with-version-callback (atom {})
        assignment-version-callback (atom {})
        supervisors-callback (atom nil)
        backpressure-callback (atom {}) ;; we want to register a topo directory getChildren callback for all workers of this dir
        assignments-callback (atom nil)
        storm-base-callback (atom {})
        code-distributor-callback (atom nil)
        credentials-callback (atom {})
        ;; one watcher for the whole tree: dispatch events on the first path segment
        state-id (register
                  cluster-state
                  (fn [type path]
                    (let [[subtree & args] (tokenize-path path)]
                      (condp = subtree
                       ASSIGNMENTS-ROOT (if (empty? args)
                                          (issue-callback! assignments-callback)
                                          (do
                                            (issue-map-callback! assignment-info-callback (first args))
                                            (issue-map-callback! assignment-version-callback (first args))
                                            (issue-map-callback! assignment-info-with-version-callback (first args))))
                       SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                       CODE-DISTRIBUTOR-ROOT (issue-callback! code-distributor-callback)
                       STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                       CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
                       BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
                       ;; this should never happen
                       (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; make sure the watched subtrees exist before anyone reads them
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE]]
      (mkdirs cluster-state p acls))
    (reify
     StormClusterState

     (assignments
       [this callback]
       (when callback
         (reset! assignments-callback callback))
       (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))

     (assignment-info
       [this storm-id callback]
       (when callback
         (swap! assignment-info-callback assoc storm-id callback))
       (clojurify-assignment (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment)))

     (assignment-info-with-version
       [this storm-id callback]
       (when callback
         (swap! assignment-info-with-version-callback assoc storm-id callback))
       (let [{data :data version :version}
             (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
         {:data (clojurify-assignment (maybe-deserialize data Assignment))
          :version version}))

     (assignment-version
       [this storm-id callback]
       (when callback
         (swap! assignment-version-callback assoc storm-id callback))
       (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))

     (code-distributor
       [this callback]
       (when callback
         (reset! code-distributor-callback callback))
       ;; sync first so the child listing reflects the latest committed state
       (do
         (sync-path cluster-state CODE-DISTRIBUTOR-SUBTREE)
         (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback))))

     (nimbuses
       [this]
       (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) false) NimbusSummary)
            (get-children cluster-state NIMBUSES-SUBTREE false)))

     (add-nimbus-host!
       [this nimbus-id nimbus-summary]
       ;; explicit delete for ephemeral node to ensure this session creates the entry.
       (delete-node cluster-state (nimbus-path nimbus-id))

       ;; re-publish on reconnect, since the ephemeral node may have expired
       ;; along with the previous ZK session
       (add-listener cluster-state (reify ConnectionStateListener
                                    (^void stateChanged[this ^CuratorFramework client ^ConnectionState newState]
                                      (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState)
                                      (if (.equals newState ConnectionState/RECONNECTED)
                                        (do
                                          (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time")
                                          (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))))))

       (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))

     (code-distributor-info
       [this storm-id]
       (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info))
            (let [path (code-distributor-path storm-id)]
              (do
                (sync-path cluster-state path)
                (get-children cluster-state path false)))))

     (active-storms
       [this]
       (get-children cluster-state STORMS-SUBTREE false))

     (heartbeat-storms
       [this]
       (get-children cluster-state WORKERBEATS-SUBTREE false))

     (error-topologies
       [this]
       (get-children cluster-state ERRORS-SUBTREE false))

     (get-worker-heartbeat
       [this storm-id node port]
       ;; nil when the worker has no heartbeat znode
       (let [worker-hb (get-data cluster-state (workerbeat-path storm-id node port) false)]
         (if worker-hb
           (-> worker-hb
             (maybe-deserialize ClusterWorkerHeartbeat)
             clojurify-zk-worker-hb))))

     (executor-beats
       [this storm-id executor->node+port]
       ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
       ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
       ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
       ;; we avoid situations like that
       (let [node+port->executors (reverse-map executor->node+port)
             all-heartbeats (for [[[node port] executors] node+port->executors]
                              (->> (get-worker-heartbeat this storm-id node port)
                                   (convert-executor-beats executors)
                                   ))]
         (apply merge all-heartbeats)))

     (supervisors
       [this callback]
       (when callback
         (reset! supervisors-callback callback))
       (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))

     (supervisor-info
       [this supervisor-id]
       ;; nil when the supervisor znode doesn't exist
       (clojurify-supervisor-info (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo)))

     (worker-heartbeat!
       [this storm-id node port info]
       (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
         (if thrift-worker-hb
           (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls))))

     (remove-worker-heartbeat!
       [this storm-id node port]
       (delete-node cluster-state (workerbeat-path storm-id node port)))

     (setup-heartbeats!
       [this storm-id]
       (mkdirs cluster-state (workerbeat-storm-root storm-id) acls))

     (teardown-heartbeats!
       [this storm-id]
       ;; best-effort cleanup: a failure is only worth a warning
       (try-cause
         (delete-node cluster-state (workerbeat-storm-root storm-id))
         (catch KeeperException e
           (log-warn-error e "Could not teardown heartbeats for " storm-id))))

     (worker-backpressure!
       [this storm-id node port on?]
       "if znode exists and to be not on?, delete; if exists and on?, do nothing;
       if not exists and to be on?, create; if not exists and not on?, do nothing"
       (let [path (backpressure-path storm-id node port)
             existed (exists-node? cluster-state path false)]
         (if existed
           (if (not on?)
             (delete-node cluster-state path)) ;; delete the znode since the worker is not congested
           (if on?
             (set-ephemeral-node cluster-state path nil acls))))) ;; create the znode since worker is congested

     (topology-backpressure
       [this storm-id callback]
       "if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not."
       (when callback
         (swap! backpressure-callback assoc storm-id callback))
       (let [path (backpressure-storm-root storm-id)
             children (get-children cluster-state path (not-nil? callback))]
         (> (count children) 0)))

     (setup-backpressure!
       [this storm-id]
       (mkdirs cluster-state (backpressure-storm-root storm-id) acls))

     (remove-worker-backpressure!
       [this storm-id node port]
       (delete-node cluster-state (backpressure-path storm-id node port)))

     (teardown-topology-errors!
       [this storm-id]
       ;; best-effort cleanup, mirroring teardown-heartbeats!
       (try-cause
         (delete-node cluster-state (error-storm-root storm-id))
         (catch KeeperException e
           (log-warn-error e "Could not teardown errors for " storm-id))))

     (supervisor-heartbeat!
       [this supervisor-id info]
       ;; ephemeral so the entry disappears when the supervisor's session dies
       (let [thrift-supervisor-info (thriftify-supervisor-info info)]
         (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))

     (activate-storm!
       [this storm-id storm-base]
       (let [thrift-storm-base (thriftify-storm-base storm-base)]
         (set-data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))

     (update-storm!
       [this storm-id new-elems]
       ;; merge (rather than replace) the executor and debug maps so updates
       ;; only need to carry the keys they change
       (let [base (storm-base this storm-id nil)
             executors (:component->executors base)
             component->debug (:component->debug base)
             new-elems (update new-elems :component->executors (partial merge executors))
             new-elems (update new-elems :component->debug (partial merge-with merge component->debug))]
         (set-data cluster-state (storm-path storm-id)
                   (-> base
                       (merge new-elems)
                       thriftify-storm-base
                       Utils/serialize)
                   acls)))

     (storm-base
       [this storm-id callback]
       (when callback
         (swap! storm-base-callback assoc storm-id callback))
       (clojurify-storm-base (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase)))

     (remove-storm-base!
       [this storm-id]
       (delete-node cluster-state (storm-path storm-id)))

     (set-assignment!
       [this storm-id info]
       (let [thrift-assignment (thriftify-assignment info)]
         (set-data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))

     (setup-code-distributor!
       [this storm-id nimbusInfo]
       (let [path (str (code-distributor-path storm-id) "/" (.toHostPortString nimbusInfo))]
         (mkdirs cluster-state (code-distributor-path storm-id) acls)
         ;; we delete the node first to ensure the node gets created as part of this session only.
         (delete-node cluster-state path)
         (set-ephemeral-node cluster-state path nil acls)))

     (remove-storm!
       [this storm-id]
       ;; storm-base is removed last; NOTE(review): workerbeats/backpressure/error
       ;; subtrees are cleaned up elsewhere (teardown-*!), not here
       (delete-node cluster-state (assignment-path storm-id))
       (delete-node cluster-state (code-distributor-path storm-id))
       (delete-node cluster-state (credentials-path storm-id))
       (remove-storm-base! this storm-id))

     (set-credentials!
       [this storm-id creds topo-conf]
       ;; credentials get topology-private ACLs instead of the cluster-wide ones
       (let [topo-acls (mk-topo-only-acls topo-conf)
             path (credentials-path storm-id)
             thriftified-creds (thriftify-credentials creds)]
         (set-data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))

     (credentials
       [this storm-id callback]
       (when callback
         (swap! credentials-callback assoc storm-id callback))
       ;; NOTE: clojurify-crdentials [sic] is the converter namespace's actual fn name
       (clojurify-crdentials (maybe-deserialize (get-data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))

     (report-error
       [this storm-id component-id node port error]
       ;; appends a sequential "e"-prefixed error entry, refreshes the
       ;; last-error znode, then prunes all but the 10 newest entries
       (let [path (error-path storm-id component-id)
             last-error-path (last-error-path storm-id component-id)
             data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
             _ (mkdirs cluster-state path acls)
             ser-data (Utils/serialize data)
             ;; NOTE(review): this mkdirs duplicates the one above -- redundant but harmless
             _ (mkdirs cluster-state path acls)
             _ (create-sequential cluster-state (str path "/e") ser-data acls)
             _ (set-data cluster-state last-error-path ser-data acls)
             to-kill (->> (get-children cluster-state path false)
                          (sort-by parse-error-path)
                          reverse
                          (drop 10))]
         (doseq [k to-kill]
           (delete-node cluster-state (str path "/" k)))))

     (errors
       [this storm-id component-id]
       ;; newest first; entries that fail to deserialize become nil and are filtered out
       (let [path (error-path storm-id component-id)
             errors (if (exists-node? cluster-state path false)
                      (dofor [c (get-children cluster-state path false)]
                        (if-let [data (-> (get-data cluster-state
                                                    (str path "/" c)
                                                    false)
                                          (maybe-deserialize ErrorInfo)
                                          clojurify-error)]
                          (map->TaskError data)))
                      ())]
         (->> (filter not-nil? errors)
              (sort-by (comp - :time-secs)))))

     (last-error
       [this storm-id component-id]
       ;; nil when no last-error znode exists for this component
       (let [path (last-error-path storm-id component-id)]
         (if (exists-node? cluster-state path false)
           (if-let [data (-> (get-data cluster-state path false)
                             (maybe-deserialize ErrorInfo)
                             clojurify-error)]
             (map->TaskError data)))))

     (disconnect
       [this]
       (unregister cluster-state state-id)
       ;; only close the underlying ClusterState if this instance created it
       (when solo?
         (close cluster-state))))))
| |
| ;; daemons have a single thread that will respond to events |
| ;; start with initialize event |
| ;; callbacks add events to the thread's queue |
| |
| ;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified. |
| ;; master gives orders through state, and client records status in state (ephemerally) |
| |
| ;; master tells nodes what workers to launch |
| |
| ;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified |
| ;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up |
| ;; /assignments/{storm id} |
| |
| ;; which tasks they talk to, etc. (immutable until shutdown) |
| ;; everyone reads this in full to understand structure |
| ;; /tasks/{storm id}/{task id} ; just contains bolt id |
| |
| ;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously |
| ;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here |
| |
| ;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously |
| ;; /taskbeats/{storm id}/{ephemeral task id} |
| |
| ;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown |
| ;; master manipulates |
| ;; /storms/{storm id} |
| |
| ;; Zookeeper flows: |
| |
| ;; Master: |
| ;; job submit: |
| ;; 1. read which nodes are available |
| ;; 2. set up the worker/{storm}/{task} stuff (static) |
| ;; 3. set assignments |
| ;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off) |
| |
| ;; Monitoring (or by checking when nodes go down or heartbeats aren't received): |
| ;; 1. read assignment |
| ;; 2. see which tasks/nodes are up |
| ;; 3. make new assignment to fix any problems |
| ;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments) |
| |
;; the master's only possible watches are on ephemeral nodes and tasks, and maybe not even those
| |
| ;; Supervisor: |
| ;; 1. monitor /storms/* and assignments |
| ;; 2. local state about which workers are local |
| ;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments |
;; 4. when storm is off, monitor tasks for workers - when they all die or don't heartbeat, kill the process and cleanup
| |
| ;; Worker: |
| ;; 1. On startup, start the tasks if the storm is on |
| |
| ;; Task: |
| ;; 1. monitor assignments, reroute when assignments change |
| ;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off |
| |
| ;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name) |
| ;; supervisor periodically checks to make sure processes are alive |
| ;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside |
| |
| ;; all tasks in a worker share the same cluster state |
| ;; workers, supervisors, and tasks subscribes to storm to know when it's started or stopped |
| ;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear) |
| ;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9) |
| ;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die |