;; blob: f930fe9e933aa7bb8beafba5b8716b67ce84b2c8
(ns backtype.storm.daemon.supervisor
(:import [backtype.storm.scheduler ISupervisor])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]])
(:gen-class
:methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))
;; `bootstrap` is a project macro. Judging by the unqualified names used in
;; this file (FileUtils, File, LocalState, log-message, ...), it brings the
;; shared Storm imports into the namespace. NOTE(review): confirm against its
;; definition.
(bootstrap)
;; Copies a topology's code onto this machine. Dispatches on `cluster-mode`
;; applied to the call's arguments; the implementations in this file are
;; :distributed and :local.
(defmulti download-storm-code cluster-mode)
;; Starts a worker process for a topology on a port. Dispatches on the
;; cluster mode of the supervisor's :conf; the implementations in this file
;; are :distributed and :local.
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
;; used as part of a map from port to this
;; (read-my-tasks builds port -> LocalAssignment maps, and the supervisor keeps
;; the confirmed result under LS-LOCAL-ASSIGNMENTS). storm-id is the topology
;; id; task-ids are the ids of the tasks this supervisor runs on that port.
(defrecord LocalAssignment [storm-id task-ids])
;; Operations on a running supervisor; implemented by the object that
;; mk-supervisor returns.
(defprotocol SupervisorDaemon
  ;; the supervisor's id (its :supervisor-id)
  (get-id [this])
  ;; the conf map the supervisor was started with
  (get-conf [this])
  ;; calls shutdown-worker for every worker id found in the local worker root
  (shutdown-all-workers [this])
  )
(defn- read-my-tasks
  "Reads the assignment of `storm-id` (passing `callback` to the cluster-state
  read) and returns a map from Integer port to LocalAssignment, holding the
  task ids that are assigned to `supervisor-id` on that port."
  [storm-cluster-state storm-id supervisor-id callback]
  (let [task->node+port (-> (.assignment-info storm-cluster-state storm-id callback)
                            :task->node+port)
        mine? (fn [[_ [node _]]] (= node supervisor-id))
        port->task-ids (->> task->node+port
                            (filter mine?)
                            (map (fn [[task-id [_ port]]] {port [task-id]}))
                            (apply merge-with concat))]
    (into {}
          (map (fn [[port task-ids]]
                 ;; ports may be parsed from yaml as longs, so normalize to Integer keys
                 [(Integer. port) (LocalAssignment. storm-id (doall task-ids))])
               port->task-ids))))
(defn- read-assignments
  "Returns a map from Integer port to LocalAssignment (:storm-id, :task-ids)
  covering every topology with tasks assigned to `supervisor-id`. Returns an
  empty map when nothing is assigned. Throws a RuntimeException if two
  topologies are assigned to the same port. `callback` is passed to every
  cluster-state read."
  [storm-cluster-state supervisor-id callback]
  (let [storm-ids (.assignments storm-cluster-state callback)]
    ;; merge-with returns nil (not {}) when it is given no maps, i.e. when no
    ;; topologies are assigned at all; normalize so callers always get a map.
    (or (apply merge-with
               (fn [& ignored]
                 (throw (RuntimeException.
                         "Should not have multiple storms assigned to one port")))
               (dofor [sid storm-ids] (read-my-tasks storm-cluster-state sid supervisor-id callback)))
        {})))
(defn- read-storm-code-locations
  "Returns a map from each storm id that has an assignment to the
  :master-code-dir recorded in that assignment. `callback` is handed to every
  cluster-state read."
  [storm-cluster-state callback]
  (let [code-dir-of (fn [sid]
                      (:master-code-dir (.assignment-info storm-cluster-state sid callback)))]
    (into {} (map (juxt identity code-dir-of)
                  (.assignments storm-cluster-state callback)))))
(defn- read-downloaded-storm-ids
  "Lazily returns the URL-decoded names of the entries under the supervisor's
  stormdist root, i.e. the storm ids whose code is already on disk."
  [conf]
  (->> (supervisor-stormdist-root conf)
       read-dir-contents
       (map (fn [dir-name] (java.net.URLDecoder/decode dir-name)))))
(defn read-worker-heartbeat
  "Returns the value stored under LS-WORKER-HEARTBEAT in worker `id`'s local
  state (nil presumably meaning the worker has not heartbeated yet)."
  [conf id]
  (-> (worker-state conf id)
      (.get LS-WORKER-HEARTBEAT)))
(defn my-worker-ids
  "Lists the worker ids on this machine: the entries of the worker root dir."
  [conf]
  (-> conf worker-root read-dir-contents))
(defn read-worker-heartbeats
  "Returns a map from each local worker id to its heartbeat, as read by
  read-worker-heartbeat."
  [conf]
  (into {}
        (map (juxt identity (partial read-worker-heartbeat conf))
             (my-worker-ids conf))))
(defn matches-an-assignment?
  "Truthy when `assigned-tasks` (port -> assignment) has an entry for the
  heartbeat's :port with the same :storm-id and the same set of :task-ids.
  Returns nil when nothing is assigned on that port."
  [worker-heartbeat assigned-tasks]
  (let [{hb-port :port hb-storm :storm-id hb-tasks :task-ids} worker-heartbeat
        assignment (assigned-tasks hb-port)]
    (and assignment
         (= hb-storm (:storm-id assignment))
         (= (set hb-tasks) (set (:task-ids assignment))))))
(defn read-allocated-workers
  "Returns a map from worker id to [state heartbeat] for every worker found in
  the local worker root, where state is one of:
    :not-started - the worker has no heartbeat (heartbeat is nil)
    :disallowed  - the id is not in LS-APPROVED-WORKERS, or the heartbeat does
                   not match the assignment for its port in `assigned-tasks`
    :timed-out   - the heartbeat's :time-secs is more than
                   SUPERVISOR-WORKER-TIMEOUT-SECS older than `now`
    :valid       - none of the above
  `assigned-tasks` maps port -> LocalAssignment; `now` is in seconds."
  [supervisor assigned-tasks now]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        id->heartbeat (read-worker-heartbeats conf)
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
    (into
     {}
     (dofor [[id hb] id->heartbeat]
       (let [state (cond
                     ;; Checked first: with a nil heartbeat there is no port to
                     ;; look up, so matches-an-assignment? is falsey. Testing it
                     ;; first made this branch unreachable and reported
                     ;; never-started workers as :disallowed.
                     (not hb)
                     :not-started
                     (or (not (contains? approved-ids id))
                         (not (matches-an-assignment? hb assigned-tasks)))
                     :disallowed
                     (> (- now (:time-secs hb))
                        (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                     :timed-out
                     true
                     :valid)]
         (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
         [id [state hb]])))))
(defn- wait-for-worker-launch
  "Polls worker `id`'s local state (sleeping via Time/sleep 500 between polls)
  until it holds a heartbeat or SUPERVISOR-WORKER-START-TIMEOUT-SECS have
  elapsed since `start-time` (seconds). Logs when the worker never
  heartbeated."
  [conf id start-time]
  (let [state (worker-state conf id)
        read-hb #(.get state LS-WORKER-HEARTBEAT)]
    (while (and (not (read-hb))
                (< (- (current-time-secs) start-time)
                   (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)))
      (log-message id " still hasn't started")
      (Time/sleep 500))
    (when-not (read-hb)
      (log-message "Worker " id " failed to start"))))
(defn- wait-for-workers-launch
  "Waits, one worker after another, for each id in `ids` to heartbeat. All
  waits share a single start time, so the start timeout bounds the whole
  batch rather than each worker separately."
  [conf ids]
  (let [batch-start (current-time-secs)]
    (dorun (map #(wait-for-worker-launch conf % batch-start) ids))))
(defn generate-supervisor-id
  "Returns a freshly generated supervisor id (delegates to uuid)."
  []
  (uuid))
(defn try-cleanup-worker
  "Best-effort removal of worker `id`'s heartbeats dir, pids dir and worker
  dir, in that order. A RuntimeException is logged as a warning (saying the
  cleanup will be retried later) instead of propagating."
  [conf id]
  (try
    (-> conf (worker-heartbeats-root id) rmr)
    ;; this avoids a race condition with the worker or a subprocess writing
    ;; its pid around the same time
    (-> conf (worker-pids-root id) rmpath)
    (-> conf (worker-root id) rmpath)
    (catch RuntimeException failure
      (log-warn-error failure "Failed to cleanup worker " id ". Will retry later"))))
(defn shutdown-worker
  "Stops worker `id` and removes its on-disk state.
  If the worker was launched in local mode, its psim pid is in
  :worker-thread-pids-atom. That process is killed and the entry is dropped
  from the atom. Each pid listed in the worker's pids dir is killed with
  ensure-process-killed! and its pid file removed. Finally try-cleanup-worker
  deletes the worker's directories, logging rather than throwing on failure."
  [supervisor id]
  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
  (let [conf (:conf supervisor)
        pids (read-dir-contents (worker-pids-root conf id))
        thread-pid (@(:worker-thread-pids-atom supervisor) id)]
    (when thread-pid
      (psim/kill-process thread-pid)
      ;; Worker ids are fresh uuids, so without this the atom keeps one stale
      ;; entry for every worker ever launched in local mode.
      (swap! (:worker-thread-pids-atom supervisor) dissoc id))
    (doseq [pid pids]
      (ensure-process-killed! pid)
      (rmpath (worker-pid-path conf id pid)))
    (try-cleanup-worker conf id))
  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
;; Builds the supervisor's state map, which the other functions in this file
;; receive as `supervisor`.
;; NOTE(review): several values are created here (cluster state via
;; cluster/mk-storm-cluster-state, LocalState via supervisor-state, a timer via
;; mk-timer). mk-supervisor's shutdown cancels the timer and disconnects the
;; cluster state.
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
  {:conf conf
   ;; passed to worker/mk-worker by the :local launch-worker
   :shared-context shared-context
   :isupervisor isupervisor
   ;; set to false when the supervisor is shut down
   :active (atom true)
   ;; called with no arguments to obtain the uptime sent in heartbeats
   :uptime (uptime-computer)
   ;; local mode only: worker id -> pid registered with psim
   :worker-thread-pids-atom (atom {})
   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
   :local-state (supervisor-state conf)
   :supervisor-id (.getId isupervisor)
   ;; STORM-LOCAL-HOSTNAME, when present in conf, overrides the detected hostname
   :my-hostname (if (contains? conf STORM-LOCAL-HOSTNAME)
                  (conf STORM-LOCAL-HOSTNAME)
                  (local-hostname))
   ;; an error escaping a scheduled task is logged and then halts the process
   ;; (halt-process! with 20 -- presumably the exit status)
   :timer (mk-timer :kill-fn (fn [t]
                               (log-error t "Error when processing event")
                               (halt-process! 20 "Error when processing an event")
                               ))
   })
;; Reconciles the workers on this machine with the assignment stored under
;; LS-LOCAL-ASSIGNMENTS. Every worker whose state (from
;; read-allocated-workers) is not :valid is shut down. Then a new worker is
;; launched for every assigned port that has no valid worker, and this
;; function waits for those new workers to heartbeat.
(defn sync-processes [supervisor]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        assigned-tasks (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS)
                                  {})
        now (current-time-secs)
        allocated (read-allocated-workers supervisor assigned-tasks now)
        ;; entries of `allocated` whose state is :valid
        ;; (assuming filter-val filters a map by its values)
        keepers (filter-val
                 (fn [[state _]] (= state :valid))
                 allocated)
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
        ;; assignments whose port has no valid worker; each gets a fresh worker id
        reassign-tasks (select-keys-pred (complement keep-ports) assigned-tasks)
        new-worker-ids (into
                        {}
                        (for [port (keys reassign-tasks)]
                          [port (uuid)]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;    - read pids, kill -9 and individually remove file
    ;;    - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 6. launch new workers (give worker-id, port, and supervisor-id)
    ;; 7. wait for workers launch
    (log-debug "Syncing processes")
    (log-debug "Assigned tasks: " assigned-tasks)
    (log-debug "Allocated: " allocated)
    (doseq [[id [state heartbeat]] allocated]
      (when (not= :valid state)
        (log-message
         "Shutting down and clearing state for id " id
         ". Current supervisor time: " now
         ". State: " state
         ", Heartbeat: " (pr-str heartbeat))
        (shutdown-worker supervisor id)
        ))
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id)))
    ;; Approved workers become the previously approved keepers plus the new
    ;; ids (worker id -> port). read-allocated-workers reports any id missing
    ;; from this map as :disallowed.
    (.put local-state LS-APPROVED-WORKERS
          (merge
           (select-keys (.get local-state LS-APPROVED-WORKERS)
                        (keys keepers))
           (zipmap (vals new-worker-ids) (keys new-worker-ids))
           ))
    (wait-for-workers-launch
     conf
     (dofor [[port assignment] reassign-tasks]
       (let [id (new-worker-ids port)]
         (log-message "Launching worker with assignment "
                      (pr-str assignment)
                      " for this supervisor "
                      (:supervisor-id supervisor)
                      " on port "
                      port
                      " with id "
                      id
                      )
         (launch-worker supervisor
                        (:storm-id assignment)
                        port
                        id)
         ;; the dofor yields the new worker ids for wait-for-workers-launch
         id)))
    ))
;; Returns the zero-argument function that mk-supervisor queues on
;; `event-manager` to bring local state in line with the cluster state.
;; Each run:
;;  - reads the code locations and this supervisor's port assignments,
;;    passing `sync-callback` (which re-queues this same function on
;;    `event-manager`) to the cluster-state reads;
;;  - keeps only the ports the ISupervisor accepts via .confirmAssigned;
;;  - downloads code for every storm id in the code map that is not yet on
;;    disk, and deletes downloaded code whose storm id is no longer in it;
;;  - calls .killedWorker for each port that dropped out of the assignment;
;;  - stores the new assignment under LS-LOCAL-ASSIGNMENTS and queues
;;    `sync-processes` on `processes-event-manager` to act on it.
;; `sync-processes` is the parameter here; mk-supervisor passes the
;; top-level sync-processes partially applied to the supervisor.
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    (let [conf (:conf supervisor)
          storm-cluster-state (:storm-cluster-state supervisor)
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ^LocalState local-state (:local-state supervisor)
          sync-callback (fn [& ignored] (.add event-manager this))
          storm-code-map (read-storm-code-locations storm-cluster-state sync-callback)
          assigned-storm-ids (set (keys storm-code-map))
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          all-assignment (read-assignments
                          storm-cluster-state
                          (:supervisor-id supervisor)
                          sync-callback)
          ;; port -> LocalAssignment, restricted to ports the ISupervisor confirms
          new-assignment (->> all-assignment
                              (filter-key #(.confirmAssigned isupervisor %)))
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
      (log-debug "Synchronizing supervisor")
      (log-debug "Storm code map: " storm-code-map)
      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
      (log-debug "All assignment: " all-assignment)
      (log-debug "New assignment: " new-assignment)
      ;; download code first
      ;; This might take a while
      ;;   - should this be done separately from usual monitoring?
      ;; should we only download when storm is assigned to this supervisor?
      ;; (currently code is downloaded for every storm id in storm-code-map)
      (doseq [[storm-id master-code-dir] storm-code-map]
        (when-not (downloaded-storm-ids storm-id)
          (log-message "Downloading code for storm id "
                       storm-id
                       " from "
                       master-code-dir)
          (download-storm-code conf storm-id master-code-dir)
          (log-message "Finished downloading code for storm id "
                       storm-id
                       " from "
                       master-code-dir)
          ))
      ;; remove any downloaded code that's no longer assigned or active
      (doseq [storm-id downloaded-storm-ids]
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id "
                       storm-id)
          (rmr (supervisor-stormdist-root conf storm-id))
          ))
      (log-debug "Writing new assignment "
                 (pr-str new-assignment))
      ;; tell the ISupervisor about ports that were assigned before but are not now
      (doseq [p (set/difference (set (keys existing-assignment))
                                (set (keys new-assignment)))]
        (.killedWorker isupervisor (int p)))
      (.put local-state
            LS-LOCAL-ASSIGNMENTS
            new-assignment)
      (.add processes-event-manager sync-processes)
      )))
;; in local state, supervisor stores who its current assignments are
;; another thread launches events to restart any dead processes if necessary
;;
;; Builds and starts a supervisor. It prepares the ISupervisor, empties the
;; supervisor tmp dir and writes an initial heartbeat. It then schedules on the
;; supervisor's timer:
;;  - the heartbeat, every SUPERVISOR-HEARTBEAT-FREQUENCY-SECS;
;;  - when SUPERVISOR-ENABLE is set: synchronize-supervisor every 10 (queued
;;    on event-manager) and sync-processes every
;;    SUPERVISOR-MONITOR-FREQUENCY-SECS (queued on processes-event-manager).
;; Returns an object implementing Shutdownable, SupervisorDaemon and
;; DaemonCommon.
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
  (log-message "Starting Supervisor with conf " conf)
  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
  (let [supervisor (supervisor-data conf shared-context isupervisor)
        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
        ;; the right-hand side refers to the top-level sync-processes fn; this
        ;; local (partially applied to the supervisor) shadows it below
        sync-processes (partial sync-processes supervisor)
        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
        heartbeat-fn (fn [] (.supervisor-heartbeat!
                             (:storm-cluster-state supervisor)
                             (:supervisor-id supervisor)
                             (SupervisorInfo. (current-time-secs)
                                              (:my-hostname supervisor)
                                              (.getMetadata isupervisor)
                                              ((:uptime supervisor)))))]
    (heartbeat-fn)
    ;; TODO (optimization): synchronize the supervisor here so it doesn't
    ;; launch anything after being down
    (schedule-recurring (:timer supervisor)
                        0
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn)
    (when (conf SUPERVISOR-ENABLE)
      ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
      ;; to date even if callbacks don't all work exactly right
      ;; (period is hard-coded to 10, presumably in the same unit as the *-SECS settings)
      (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
      (schedule-recurring (:timer supervisor)
                          0
                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
                          (fn [] (.add processes-event-manager sync-processes))))
    (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
    (reify
      Shutdownable
      (shutdown [this]
        (log-message "Shutting down supervisor " (:supervisor-id supervisor))
        (reset! (:active supervisor) false)
        (cancel-timer (:timer supervisor))
        (.shutdown event-manager)
        (.shutdown processes-event-manager)
        (.disconnect (:storm-cluster-state supervisor)))
      SupervisorDaemon
      (get-conf [this]
        conf)
      (get-id [this]
        (:supervisor-id supervisor))
      (shutdown-all-workers [this]
        (let [ids (my-worker-ids conf)]
          (doseq [id ids]
            (shutdown-worker supervisor id)
            )))
      DaemonCommon
      ;; true once shut down, or when the timer and both event managers
      ;; report waiting?
      (waiting? [this]
        (or (not @(:active supervisor))
            (and
             (timer-waiting? (:timer supervisor))
             (every? (memfn waiting?) managers)))
        ))))
(defn kill-supervisor
  "Stops a supervisor (as returned by mk-supervisor) by invoking its shutdown
  method."
  [supervisor]
  (. supervisor shutdown))
;; distributed implementation
(defmethod download-storm-code
  :distributed [conf storm-id master-code-dir]
  ;; Fetch the storm jar, code and conf into a fresh scratch dir under the
  ;; supervisor tmp dir, and move the finished dir into the stormdist root
  ;; only at the end, instead of downloading into the permanent location.
  (let [scratch-dir (str (supervisor-tmp-dir conf) "/" (uuid))
        final-dir (supervisor-stormdist-root conf storm-id)]
    (FileUtils/forceMkdir (File. scratch-dir))
    (doseq [[remote-path-of local-path-of] [[master-stormjar-path supervisor-stormjar-path]
                                            [master-stormcode-path supervisor-stormcode-path]
                                            [master-stormconf-path supervisor-stormconf-path]]]
      (Utils/downloadFromMaster conf (remote-path-of master-code-dir) (local-path-of scratch-dir)))
    (extract-dir-from-jar (supervisor-stormjar-path scratch-dir) RESOURCES-SUBDIR scratch-dir)
    (FileUtils/moveDirectory (File. scratch-dir) (File. final-dir))))
(defmethod launch-worker
  :distributed [supervisor storm-id port worker-id]
  ;; Starts the worker as a separate `java` process running
  ;; backtype.storm.daemon.worker with args:
  ;; <url-encoded storm id> <supervisor id> <port> <worker id>
  (let [conf (:conf supervisor)
        topology-jar (-> (supervisor-stormdist-root conf storm-id)
                         supervisor-stormjar-path)
        storm-conf (read-supervisor-storm-conf conf storm-id)
        worker-classpath (add-to-classpath (current-classpath) [topology-jar])
        ;; cluster-wide then topology child opts, with every %ID% replaced by the port
        jvm-opts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
                              "%ID%"
                              (str port))
        native-lib-path (conf JAVA-LIBRARY-PATH)
        command (str "java -server " jvm-opts
                     " -Djava.library.path=" native-lib-path
                     " -Dlogfile.name=" (str "worker-" port ".log")
                     " -Dstorm.home=" (System/getProperty "storm.home")
                     " -Dlog4j.configuration=storm.log.properties"
                     " -cp " worker-classpath " backtype.storm.daemon.worker "
                     (java.net.URLEncoder/encode storm-id) " " (:supervisor-id supervisor)
                     " " port " " worker-id)]
    (log-message "Launching worker with command: " command)
    (launch-process command :environment {"LD_LIBRARY_PATH" native-lib-path})))
;; local implementation
(defn resources-jar
  "Returns the first .jar entry of the current classpath that contains a
  RESOURCES-SUBDIR directory, or nil when there is none."
  []
  (some (fn [entry]
          (when (and (.endsWith entry ".jar")
                     (zip-contains-dir? entry RESOURCES-SUBDIR))
            entry))
        (.split (current-classpath) File/pathSeparator)))
(defmethod download-storm-code
  :local [conf storm-id master-code-dir]
  ;; In local mode the master code dir is on this filesystem, so it is copied
  ;; directly. The resources dir is then taken from a classpath jar if one
  ;; contains it, otherwise from wherever the context classloader finds it.
  (let [stormroot (supervisor-stormdist-root conf storm-id)]
    (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
    (let [classloader (.getContextClassLoader (Thread/currentThread))
          resources-jar (resources-jar)
          url (.getResource classloader RESOURCES-SUBDIR)
          target-dir (str stormroot "/" RESOURCES-SUBDIR)]
      (cond
        resources-jar
        (do
          (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
          (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
        url
        (do
          (log-message "Copying resources at " (str url) " to " target-dir)
          ;; URL.getFile keeps percent-escapes (e.g. %20 for a space), so a
          ;; path containing such characters would not be found.
          ;; FileUtils/toFile decodes them. It returns nil for non-file: URLs,
          ;; in which case the previous getFile-based path is used unchanged.
          (FileUtils/copyDirectory (or (FileUtils/toFile url) (File. (.getFile url)))
                                   (File. target-dir)))))))
(defmethod launch-worker
  :local [supervisor storm-id port worker-id]
  ;; Runs the worker in this JVM via worker/mk-worker, registers it with psim
  ;; under a fresh pid, and records worker-id -> pid so shutdown-worker can
  ;; kill it later.
  (let [pid (uuid)]
    (psim/register-process pid
                           (worker/mk-worker (:conf supervisor)
                                             (:shared-context supervisor)
                                             storm-id
                                             (:supervisor-id supervisor)
                                             port
                                             worker-id))
    (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)))
(defn -launch
  "Static launch entry point (see gen-class): reads the storm config, runs
  validate-distributed-mode! on it, and starts a supervisor backed by the
  given ISupervisor with no shared context."
  [supervisor]
  (mk-supervisor (doto (read-storm-config) validate-distributed-mode!)
                 nil
                 supervisor))
(defn standalone-supervisor
  "Returns the default ISupervisor. It confirms every assigned port,
  advertises SUPERVISOR-SLOTS-PORTS from its conf as metadata, ignores killed
  workers, and keeps its id under LS-ID in a LocalState rooted at the local
  dir given to prepare, generating a new id the first time."
  []
  (let [conf-ref (atom nil)
        id-ref (atom nil)]
    (reify ISupervisor
      (prepare [this conf local-dir]
        (reset! conf-ref conf)
        (let [state (LocalState. local-dir)
              persisted-id (or (.get state LS-ID)
                               (generate-supervisor-id))]
          (.put state LS-ID persisted-id)
          (reset! id-ref persisted-id)))
      (confirmAssigned [this port] true)
      (getMetadata [this] (get @conf-ref SUPERVISOR-SLOTS-PORTS))
      (getId [this] @id-ref)
      (killedWorker [this port]))))
(defn -main
  "Command-line entry point: launches a supervisor backed by
  standalone-supervisor."
  []
  (-> (standalone-supervisor) -launch))