;; Cluster-state layer: a small ZooKeeper-backed node-store protocol
;; (ClusterState) and the Storm-specific view layered on top of it
;; (StormClusterState).
(ns backtype.storm.cluster
  ;; Stat lives in org.apache.zookeeper.data; the package name had been lost,
  ;; which left the import vector naming no class at all.
  (:import [org.apache.zookeeper.data Stat])
  (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException])
  (:import [backtype.storm.utils Utils])
  (:use [backtype.storm util log config])
  (:require [backtype.storm [zookeeper :as zk]])
  (:require [backtype.storm.daemon [common :as common]]))
;; Low-level node store used by the rest of this namespace.
;;   path     - node path
;;   data     - payload stored at the node
;;   watch?   - whether the read should also leave a watch on the node
;;   callback - function invoked as (callback type path) on watch events
;; Callers keep the value returned by `register` and pass it to `unregister`.
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  (set-data [this path data]) ;; if node does not exist, create persistent with this data
  (get-data [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
(defn mk-distributed-cluster-state
  "Builds a ClusterState backed by ZooKeeper.

  conf is the Storm configuration map; it supplies the ZooKeeper servers,
  port and root node. Returns an object implementing ClusterState. Watch
  events received while the state is open are fanned out to every callback
  added through `register`."
  [conf]
  ;; Bootstrap: a short-lived client whose only job is to make sure the
  ;; configured root node exists. Close it even if mkdirs throws.
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    (try
      (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
      (finally
        (.close zk))))
  (let [callbacks (atom {})
        active (atom true)
        ;; Same positional servers/port arguments as the bootstrap client above.
        ;; NOTE(review): :root is assumed to be the keyword zk/mk-client uses to
        ;; scope the client under the root created above - confirm against
        ;; backtype.storm.zookeeper.
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         :watcher (fn [state type path]
                                    ;; events arriving after close are ignored
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    (reify
      ClusterState
      (register [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          ;; hand the key back so the caller can unregister later
          id))
      (unregister [this id]
        (swap! callbacks dissoc id))
      (set-ephemeral-node [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          ;; NOTE(review): plain try only matches a directly thrown
          ;; NoNodeException; if the zk wrappers wrap exceptions, a
          ;; cause-aware catch is needed here - confirm.
          (try
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))
      (create-sequential [this path data]
        (zk/create-node zk path data :sequential))
      (set-data [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      (delete-node [this path]
        (zk/delete-recursive zk path))
      (get-data [this path watch?]
        (zk/get-data zk path watch?))
      (get-children [this path watch?]
        (zk/get-children zk path watch?))
      (mkdirs [this path]
        (zk/mkdirs zk path))
      (close [this]
        ;; flip the flag first so the watcher stops dispatching
        (reset! active false)
        (.close zk)))))
;; Storm-level view of cluster state: assignments, storm bases, supervisors,
;; worker heartbeats and per-component errors. Methods that take `callback`
;; accept nil to mean a plain read with no watch.
(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  ;; errors are keyed by component id, matching the implementation in
  ;; mk-storm-cluster-state and error-path
  (report-error [this storm-id component-id error])
  (errors [this storm-id component-id])
  (disconnect [this]))
;; Bare names of the top-level subtrees. The watcher callback in
;; mk-storm-cluster-state compares these against the first element returned
;; by tokenize-path for an event path.
(def ASSIGNMENTS-ROOT "assignments")
(def CODE-ROOT "code")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")

;; Absolute paths of the subtrees above. The path helpers and the
;; StormClusterState implementation below refer to these; they were
;; referenced but never defined.
(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
;; The first two helpers had lost their bodies and closing parens, which
;; nested every later definition inside them. Bodies follow the same
;; SUBTREE-slash-id shape as storm-path.
(defn supervisor-path
  "Path of the node for supervisor id."
  [id]
  (str SUPERVISORS-SUBTREE "/" id))

(defn assignment-path
  "Path of the assignment node for storm id."
  [id]
  (str ASSIGNMENTS-SUBTREE "/" id))

(defn storm-path
  "Path of the storm-base node for storm id."
  [id]
  (str STORMS-SUBTREE "/" id))
(defn workerbeat-storm-root
  "Path of the node under which all worker heartbeats of storm-id are kept."
  [storm-id]
  (-> WORKERBEATS-SUBTREE
      (str "/" storm-id)))
(defn workerbeat-path
  "Path of the heartbeat node for the worker at node/port of storm-id.
  The leaf name is node and port joined by a dash."
  [storm-id node port]
  (let [storm-root (workerbeat-storm-root storm-id)
        worker-leaf (str node "-" port)]
    (str storm-root "/" worker-leaf)))
(defn error-storm-root
  "Path of the node under which all reported errors of storm-id are kept."
  [storm-id]
  (-> ERRORS-SUBTREE
      (str "/" storm-id)))
(defn error-path
  "Path of the error node for component-id within storm-id. The component id
  is passed through url-encode before being used as the leaf name."
  [storm-id component-id]
  (let [storm-root (error-storm-root storm-id)
        component-leaf (url-encode component-id)]
    (str storm-root "/" component-leaf)))
(defn- issue-callback!
  "Fires the callback held in cb-atom at most once: the atom is reset to nil
  before the callback is invoked with no arguments. Does nothing when the
  atom holds nil."
  [cb-atom]
  (let [cb @cb-atom]
    ;; clear first so a callback that re-registers itself is not wiped out
    (reset! cb-atom nil)
    (when cb
      (cb))))
(defn- issue-map-callback!
  "Fires the callback stored under id in the map held by cb-atom, if any.
  The entry is removed before the callback is invoked; the callback receives
  id as its only argument."
  [cb-atom id]
  (let [cb (@cb-atom id)]
    ;; remove first so a callback that re-registers itself is not wiped out
    (swap! cb-atom dissoc id)
    (when cb
      (cb id))))
(defn- maybe-deserialize
  "Runs Utils/deserialize on ser, or returns nil when ser is nil or false."
  [ser]
  (if ser
    (Utils/deserialize ser)
    nil))
;; Struct basis for a reported error: the error value and the time (in
;; seconds) it was recorded.
(def TaskError (create-struct :error :time-secs))
(defn- parse-error-path
  "Parses everything after the first character of p as a long."
  [^String p]
  (let [digits (subs p 1)]
    (Long/parseLong digits)))
(defn convert-executor-beats
  "Builds a map from executor to its heartbeat, restricted to the given
  executors. Only executors that appear in the :executor-stats of worker-hb
  are included, so a worker never reports for executors not assigned to it.
  Each value carries the worker's :time-secs and :uptime and the executor's
  own :stats."
  [executors worker-hb]
  (let [stats-by-executor (:executor-stats worker-hb)
        beat-for (fn [executor]
                   {:time-secs (:time-secs worker-hb)
                    :uptime (:uptime worker-hb)
                    :stats (get stats-by-executor executor)})]
    (into {}
          (for [executor executors
                :when (contains? stats-by-executor executor)]
            [executor (beat-for executor)]))))
;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
(defn mk-storm-cluster-state
  "Builds a StormClusterState.

  cluster-state-spec is either an object that already satisfies ClusterState
  (it is used as-is and left open on disconnect) or a value handed to
  mk-distributed-cluster-state to create a private ClusterState (closed on
  disconnect).

  Callbacks passed to assignments, assignment-info, supervisors and
  storm-base are one-shot: each is removed before it is invoked."
  [cluster-state-spec]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        assignment-info-callback (atom {})
        supervisors-callback (atom nil)
        assignments-callback (atom nil)
        storm-base-callback (atom {})
        ;; register is a ClusterState method, so the state itself must be the
        ;; first argument; the returned id is used by disconnect below.
        state-id (register
                  cluster-state
                  (fn [type path]
                    (let [[subtree & args] (tokenize-path path)]
                      (condp = subtree
                        ASSIGNMENTS-ROOT (if (empty? args)
                                           (issue-callback! assignments-callback)
                                           (issue-map-callback! assignment-info-callback (first args)))
                        SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                        STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                        ;; this should never happen
                        (halt-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; make sure every subtree this namespace reads or writes exists up front
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    (reify
      StormClusterState
      (assignments [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      (assignment-info [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      (active-storms [this]
        (get-children cluster-state STORMS-SUBTREE false))
      (heartbeat-storms [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      (error-topologies [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      (get-worker-heartbeat [this storm-id node port]
        ;; worker-heartbeat! stores serialized data, and executor-beats reads
        ;; the result as a map, so the payload is deserialized here
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      (executor-beats [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      (supervisors [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      (supervisor-info [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      (worker-heartbeat! [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      (remove-worker-heartbeat! [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      (setup-heartbeats! [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      (teardown-heartbeats! [this storm-id]
        ;; best effort: a failed teardown is logged, not propagated.
        ;; NOTE(review): plain try only matches a directly thrown
        ;; KeeperException; confirm the ClusterState does not wrap it.
        (try
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      (teardown-topology-errors! [this storm-id]
        ;; best effort, same as teardown-heartbeats!
        (try
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      (supervisor-heartbeat! [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      (activate-storm! [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      (update-storm! [this storm-id new-elems]
        ;; new executor counts are merged over the stored ones rather than
        ;; replacing the whole :component->executors map
        (let [base (storm-base this storm-id nil)
              executors (:component->executors base)
              new-elems (update new-elems :component->executors (partial merge executors))]
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      (storm-base [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      (remove-storm-base! [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      (set-assignment! [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      (remove-storm! [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      (report-error [this storm-id component-id error]
        (let [path (error-path storm-id component-id)
              data {:time-secs (current-time-secs) :error (stringify-error error)}
              _ (mkdirs cluster-state path)
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; Newest first, then skip the 10 to keep. Sorting ascending
              ;; here would delete the error that was just written.
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      (errors [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                       (let [data (-> (get-data cluster-state (str path "/" c) false)
                                      maybe-deserialize)]
                         ;; a child can vanish between listing and reading
                         (when data
                           (struct TaskError (:error data) (:time-secs data)))))]
          ;; most recent error first
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      (disconnect [this]
        (unregister cluster-state state-id)
        ;; only close a ClusterState this function created itself
        (when solo?
          (close cluster-state))))))
;; daemons have a single thread that will respond to events
;; start with initialize event
;; callbacks add events to the thread's queue
;; keeps an in-memory cache of the state, only for what the client subscribes to. Any subscription is automatically kept in sync, and when there are changes, the client is notified.
;; master gives orders through state, and client records status in state (ephemerally)
;; master tells nodes what workers to launch
;; master writes this. supervisors and workers subscribe to this to understand the complete topology. each storm is a map from nodes to workers to tasks to ports. whenever the topology changes, everyone will be notified
;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up
;; /assignments/{storm id}
;; which tasks they talk to, etc. (immutable until shutdown)
;; everyone reads this in full to understand structure
;; /tasks/{storm id}/{task id} ; just contains bolt id
;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here
;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
;; /taskbeats/{storm id}/{ephemeral task id}
;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown
;; master manipulates
;; /storms/{storm id}
;; Zookeeper flows:
;; Master:
;; job submit:
;; 1. read which nodes are available
;; 2. set up the worker/{storm}/{task} stuff (static)
;; 3. set assignments
;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)
;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
;; 1. read assignment
;; 2. see which tasks/nodes are up
;; 3. make new assignment to fix any problems
;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)
;; master's only possible watches are on ephemeral nodes and tasks, and maybe not even those
;; Supervisor:
;; 1. monitor /storms/* and assignments
;; 2. local state about which workers are local
;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments
;; 4. when storm is off, monitor tasks for workers - when they all die or don't heartbeat, kill the process and cleanup
;; Worker:
;; 1. On startup, start the tasks if the storm is on
;; Task:
;; 1. monitor assignments, reroute when assignments change
;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off
;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
;; supervisor periodically checks to make sure processes are alive
;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside
;; all tasks in a worker share the same cluster state
;; workers, supervisors, and tasks subscribe to storm to know when it's started or stopped
;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear)
;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9)
;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die