;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
(:import [java.io OutputStreamWriter BufferedWriter IOException])
(:import [backtype.storm.scheduler ISupervisor]
[backtype.storm.utils LocalState Time Utils]
[backtype.storm.daemon Shutdownable]
[backtype.storm Constants]
[java.net JarURLConnection]
[java.net URI]
[org.apache.commons.io FileUtils]
[java.io File])
(:use [backtype.storm config util log timer local-state])
(:import [backtype.storm.utils VersionInfo])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]]
[backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
[clojure.set :as set])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
(:gen-class
:methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))
(defmulti download-storm-code cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
(defmulti mk-code-distributor cluster-mode)
(defprotocol SupervisorDaemon
(get-id [this])
(get-conf [this])
(shutdown-all-workers [this])
)
(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
(let [storm-ids (.assignments storm-cluster-state callback)]
(let [new-assignments
(->>
(dofor [sid storm-ids]
(let [recorded-version (:version (get assignment-versions sid))]
(if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
(if (= assignment-version recorded-version)
{sid (get assignment-versions sid)}
{sid (.assignment-info-with-version storm-cluster-state sid callback)})
{sid nil})))
(apply merge)
(filter-val not-nil?))]
{:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
:versions new-assignments})))
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (get assignments-snapshot storm-id)
my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
(:executor->node+port assignment))
port-executors (apply merge-with
concat
(for [[executor [_ port]] my-executors]
{port [executor]}
))]
(into {} (for [[port executors] port-executors]
;; need to cast to int b/c it might be a long (due to how yaml parses things)
;; doall is to avoid serialization/deserialization problems with lazy seqs
[(Integer. port) (mk-local-assignment storm-id (doall executors))]
))))
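;; Illustrative sketch, never evaluated: hypothetical topology id, node ids and
;; ports, showing how read-my-executors groups this node's executors by port.
(comment
  (read-my-executors
    {"topo-1" {:executor->node+port {[1 1] ["node-a" 6700]
                                     [2 2] ["node-a" 6700]
                                     [3 3] ["node-b" 6701]}}}
    "topo-1"
    "node-a")
  ;; => roughly {6700 (mk-local-assignment "topo-1" [[1 1] [2 2]])}
  )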
(defn- read-assignments
"Returns map from port to struct containing :storm-id and :executors"
([assignments-snapshot assignment-id]
(->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
(apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
([assignments-snapshot assignment-id existing-assignment retries]
(try (let [assignments (read-assignments assignments-snapshot assignment-id)]
(reset! retries 0)
assignments)
(catch RuntimeException e
(if (> @retries 2) (throw e) (swap! retries inc))
(log-warn (.getMessage e) ": retrying " @retries " of 3")
existing-assignment))))
(defn- read-storm-code-locations
[assignments-snapshot]
(map-val :master-code-dir assignments-snapshot))
(defn- read-downloaded-storm-ids [conf]
(map #(url-decode %) (read-dir-contents (supervisor-stormdist-root conf)))
)
(defn read-worker-heartbeat [conf id]
(let [local-state (worker-state conf id)]
(try
(ls-worker-heartbeat local-state)
(catch Exception e
(log-warn e "Failed to read local heartbeat for workerId : " id ",Ignoring exception.")
nil))))
(defn my-worker-ids [conf]
(read-dir-contents (worker-root conf)))
(defn read-worker-heartbeats
"Returns map from worker id to heartbeat"
[conf]
(let [ids (my-worker-ids conf)]
(into {}
(dofor [id ids]
[id (read-worker-heartbeat conf id)]))
))
(defn matches-an-assignment? [worker-heartbeat assigned-executors]
(let [local-assignment (assigned-executors (:port worker-heartbeat))]
(and local-assignment
(= (:storm-id worker-heartbeat) (:storm-id local-assignment))
(= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
(set (:executors local-assignment))))))
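;; Illustrative sketch, never evaluated: hypothetical heartbeat and assignment;
;; the system executor id is dropped from the heartbeat before comparing executor sets.
(comment
  (matches-an-assignment?
    {:storm-id "topo-1" :port 6700 :executors [[1 1] [2 2] Constants/SYSTEM_EXECUTOR_ID]}
    {6700 {:storm-id "topo-1" :executors [[1 1] [2 2]]}})
  ;; => true
  )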
(let [dead-workers (atom #{})]
(defn get-dead-workers []
@dead-workers)
(defn add-dead-worker [worker]
(swap! dead-workers conj worker))
(defn remove-dead-worker [worker]
(swap! dead-workers disj worker)))
(defn is-worker-hb-timed-out? [now hb conf]
(> (- now (:time-secs hb))
(conf SUPERVISOR-WORKER-TIMEOUT-SECS)))
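;; Illustrative sketch, never evaluated, with hypothetical numbers: a heartbeat
;; times out once its :time-secs is older than SUPERVISOR-WORKER-TIMEOUT-SECS.
(comment
  (is-worker-hb-timed-out? 130 {:time-secs 90} {SUPERVISOR-WORKER-TIMEOUT-SECS 30})
  ;; => true, since 130 - 90 = 40 > 30
  )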
(defn read-allocated-workers
"Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
[supervisor assigned-executors now]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
approved-ids (set (keys (ls-approved-workers local-state)))]
(into
{}
(dofor [[id hb] id->heartbeat]
(let [state (cond
(not hb)
:not-started
(or (not (contains? approved-ids id))
(not (matches-an-assignment? hb assigned-executors)))
:disallowed
(or
(when (get (get-dead-workers) id)
(log-message "Worker Process " id " has died!")
true)
(is-worker-hb-timed-out? now hb conf))
:timed-out
true
:valid)]
(log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
[id [state hb]]
))
)))
(defn- wait-for-worker-launch [conf id start-time]
(let [state (worker-state conf id)]
(loop []
(let [hb (ls-worker-heartbeat state)]
(when (and
(not hb)
(<
(- (current-time-secs) start-time)
(conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
))
(log-message id " still hasn't started")
(Time/sleep 500)
(recur)
)))
(when-not (ls-worker-heartbeat state)
(log-message "Worker " id " failed to start")
)))
(defn- wait-for-workers-launch [conf ids]
(let [start-time (current-time-secs)]
(doseq [id ids]
(wait-for-worker-launch conf id start-time))
))
(defn generate-supervisor-id []
(uuid))
(defnk worker-launcher [conf user args :environment {} :log-prefix nil :exit-code-callback nil]
(let [_ (when (clojure.string/blank? user)
(throw (java.lang.IllegalArgumentException.
"User cannot be blank when calling worker-launcher.")))
wl-initial (conf SUPERVISOR-WORKER-LAUNCHER)
storm-home (System/getProperty "storm.home")
wl (if wl-initial wl-initial (str storm-home "/bin/worker-launcher"))
command (concat [wl user] args)]
(log-message "Running as user:" user " command:" (pr-str command))
(launch-process command :environment environment :log-prefix log-prefix :exit-code-callback exit-code-callback)
))
(defnk worker-launcher-and-wait [conf user args :environment {} :log-prefix nil]
(let [process (worker-launcher conf user args :environment environment)]
(if log-prefix
(read-and-log-stream log-prefix (.getInputStream process)))
(try
(.waitFor process)
(catch InterruptedException e
(log-message log-prefix " interrupted.")))
(.exitValue process)))
(defn- rmr-as-user
"Launches a process owned by the given user that deletes the given path
recursively. Throws RuntimeException if the directory is not removed."
[conf id user path]
(worker-launcher-and-wait conf
user
["rmr" path]
:log-prefix (str "rmr " id))
(if (exists-file? path)
(throw (RuntimeException. (str path " was not deleted")))))
(defn try-cleanup-worker [conf id user]
(try
(if (.exists (File. (worker-root conf id)))
(do
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(rmr-as-user conf id user (worker-root conf id))
(do
(rmr (worker-heartbeats-root conf id))
;; this avoids a race condition with the worker or a subprocess writing the pid at around the same time
(rmpath (worker-pids-root conf id))
(rmpath (worker-root conf id))))
(remove-worker-user! conf id)
(remove-dead-worker id)
))
;; java.io.FileNotFoundException is a subclass of IOException, so it must be
;; caught first or this clause would never run
(catch java.io.FileNotFoundException e (log-message (.getMessage e)))
(catch IOException e
(log-warn-error e "Failed to clean up worker " id ". Will retry later"))
(catch RuntimeException e
(log-warn-error e "Failed to clean up worker " id ". Will retry later"))
))
(defn shutdown-worker [supervisor id]
(log-message "Shutting down " (:supervisor-id supervisor) ":" id)
(let [conf (:conf supervisor)
pids (read-dir-contents (worker-pids-root conf id))
thread-pid (@(:worker-thread-pids-atom supervisor) id)
shutdown-sleep-secs (conf SUPERVISOR-WORKER-SHUTDOWN-SLEEP-SECS)
as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
user (get-worker-user conf id)]
(when thread-pid
(psim/kill-process thread-pid))
(doseq [pid pids]
(if as-user
(worker-launcher-and-wait conf user ["signal" pid "15"] :log-prefix (str "kill -15 " pid))
(kill-process-with-sig-term pid)))
(when-not (empty? pids)
(log-message "Sleep " shutdown-sleep-secs " seconds for execution of cleanup threads on worker.")
(sleep-secs shutdown-sleep-secs))
(doseq [pid pids]
(if as-user
(worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
(force-kill-process pid))
(if as-user
(rmr-as-user conf id user (worker-pid-path conf id pid))
(try
(rmpath (worker-pid-path conf id pid))
(catch Exception e)))) ;; on windows, the supervisor may still hold the lock on the worker directory
(try-cleanup-worker conf id user))
(log-message "Shut down " (:supervisor-id supervisor) ":" id))
(def SUPERVISOR-ZK-ACLS
[(first ZooDefs$Ids/CREATOR_ALL_ACL)
(ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
{:conf conf
:shared-context shared-context
:isupervisor isupervisor
:active (atom true)
:uptime (uptime-computer)
:version (str (VersionInfo/getVersion))
:worker-thread-pids-atom (atom {})
:storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
(Utils/isZkAuthenticationConfiguredStormServer
conf)
SUPERVISOR-ZK-ACLS))
:local-state (supervisor-state conf)
:supervisor-id (.getSupervisorId isupervisor)
:assignment-id (.getAssignmentId isupervisor)
:my-hostname (hostname conf)
:curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
:heartbeat-timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")
))
:event-timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")
))
:assignment-versions (atom {})
:sync-retry (atom 0)
:code-distributor (mk-code-distributor conf)
:download-lock (Object.)
})
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
download-lock (:download-lock supervisor)
^LocalState local-state (:local-state supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
assigned-executors (defaulted (ls-local-assignments local-state) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
keepers (filter-val
(fn [[state _]] (= state :valid))
allocated)
keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
new-worker-ids (into
{}
(for [port (keys reassign-executors)]
[port (uuid)]))
]
;; 1. workers to kill are those in allocated that are dead or disallowed
;; 2. kill the ones that should be dead
;;    - read pids, kill -9 and individually remove file
;;    - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
;; 3. of the rest, figure out what assignments aren't yet satisfied
;; 4. generate new worker ids, write new "approved workers" to LS
;; 5. create local dirs for the new worker ids
;; 6. launch new workers (give worker-id, port, and supervisor-id)
;; 7. wait for workers to launch
(log-debug "Syncing processes")
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Allocated: " allocated)
(doseq [[id [state heartbeat]] allocated]
(when (not= :valid state)
(log-message
"Shutting down and clearing state for id " id
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)
(if (:code-distributor supervisor)
(.cleanup (:code-distributor supervisor) id))
))
(doseq [id (vals new-worker-ids)]
(local-mkdirs (worker-pids-root conf id))
(local-mkdirs (worker-heartbeats-root conf id)))
(ls-approved-workers! local-state
(merge
(select-keys (ls-approved-workers local-state)
(keys keepers))
(zipmap (vals new-worker-ids) (keys new-worker-ids))
))
;; check that the storm topology code dir exists before launching workers
(doseq [[port assignment] reassign-executors]
(let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
storm-id (:storm-id assignment)
cached-assignment-info @(:assignment-versions supervisor)
assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
(get cached-assignment-info storm-id)
(.assignment-info-with-version storm-cluster-state storm-id nil))
storm-code-map (read-storm-code-locations assignment-info)
master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
stormroot (supervisor-stormdist-root conf storm-id)]
(if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
(download-storm-code conf storm-id master-code-dir supervisor download-lock))
))
(wait-for-workers-launch
conf
(dofor [[port assignment] reassign-executors]
(let [id (new-worker-ids port)]
(try
(log-message "Launching worker with assignment "
(pr-str assignment)
" for this supervisor "
(:supervisor-id supervisor)
" on port "
port
" with id "
id
)
(launch-worker supervisor
(:storm-id assignment)
port
id)
(catch java.io.FileNotFoundException e
(log-message "Unable to launch worker due to "
(.getMessage e)))
(catch java.io.IOException e
(log-message "Unable to launch worker due to "
(.getMessage e))))
id)))
))
(defn assigned-storm-ids-from-port-assignments [assignment]
(->> assignment
vals
(map :storm-id)
set))
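;; Illustrative sketch, never evaluated: hypothetical port->assignment map
;; reduced to the set of assigned storm ids.
(comment
  (assigned-storm-ids-from-port-assignments
    {6700 {:storm-id "topo-1" :executors [[1 1]]}
     6701 {:storm-id "topo-2" :executors [[2 2]]}})
  ;; => #{"topo-1" "topo-2"}
  )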
(defn shutdown-disallowed-workers [supervisor]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
assigned-executors (defaulted (ls-local-assignments local-state) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
disallowed (keys (filter-val
(fn [[state _]] (= state :disallowed))
allocated))]
(log-debug "Allocated workers " allocated)
(log-debug "Disallowed workers " disallowed)
(doseq [id disallowed]
(shutdown-worker supervisor id))
))
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
download-lock (:download-lock supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
assignment-versions @(:assignment-versions supervisor)
{assignments-snapshot :assignments versions :versions} (assignments-snapshot
storm-cluster-state sync-callback
assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
existing-assignment (ls-local-assignments local-state)
all-assignment (read-assignments assignments-snapshot
(:assignment-id supervisor)
existing-assignment
(:sync-retry supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
(log-debug "All assignment: " all-assignment)
(log-debug "New assignment: " new-assignment)
;; download code first
;; This might take a while
;;   - should this be done separately from usual monitoring?
;;   - should we only download when the topology is assigned to this supervisor?
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
(download-storm-code conf storm-id master-code-dir supervisor download-lock)))
(log-debug "Writing new assignment "
(pr-str new-assignment))
(doseq [p (set/difference (set (keys existing-assignment))
(set (keys new-assignment)))]
(.killedWorker isupervisor (int p)))
(.assigned isupervisor (keys new-assignment))
(ls-local-assignments! local-state
new-assignment)
(reset! (:assignment-versions supervisor) versions)
(reset! (:curr-assignment supervisor) new-assignment)
;; remove any downloaded code that's no longer assigned or active
;; important that this happens after setting the local assignment so that
;; synchronize-supervisor doesn't try to launch workers for which the
;; resources don't exist
(if on-windows? (shutdown-disallowed-workers supervisor))
(doseq [storm-id downloaded-storm-ids]
(when-not (storm-code-map storm-id)
(log-message "Removing code for storm id "
storm-id)
(try
(rmr (supervisor-stormdist-root conf storm-id))
(catch Exception e (log-message (.getMessage e))))
))
(.add processes-event-manager sync-processes)
)))
;; in local state, the supervisor stores what its current assignments are
;; another thread launches events to restart any dead processes if necessary
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
(log-message "Starting Supervisor with conf " conf)
(.prepare isupervisor conf (supervisor-isupervisor-dir conf))
(FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
(let [supervisor (supervisor-data conf shared-context isupervisor)
[event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
sync-processes (partial sync-processes supervisor)
synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
heartbeat-fn (fn [] (.supervisor-heartbeat!
(:storm-cluster-state supervisor)
(:supervisor-id supervisor)
(->SupervisorInfo (current-time-secs)
(:my-hostname supervisor)
(:assignment-id supervisor)
(keys @(:curr-assignment supervisor))
;; used ports
(.getMetadata isupervisor)
(conf SUPERVISOR-SCHEDULER-META)
((:uptime supervisor))
(:version supervisor))))]
(heartbeat-fn)
;; should synchronize the supervisor first so it doesn't launch anything after having been down (optimization)
(schedule-recurring (:heartbeat-timer supervisor)
0
(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
heartbeat-fn)
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
(schedule-recurring (:event-timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
(schedule-recurring (:event-timer supervisor)
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
(fn [] (.add processes-event-manager sync-processes))))
(log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
(reify
Shutdownable
(shutdown [this]
(log-message "Shutting down supervisor " (:supervisor-id supervisor))
(reset! (:active supervisor) false)
(cancel-timer (:heartbeat-timer supervisor))
(cancel-timer (:event-timer supervisor))
(.shutdown event-manager)
(.shutdown processes-event-manager)
(.disconnect (:storm-cluster-state supervisor)))
SupervisorDaemon
(get-conf [this]
conf)
(get-id [this]
(:supervisor-id supervisor))
(shutdown-all-workers [this]
(let [ids (my-worker-ids conf)]
(doseq [id ids]
(shutdown-worker supervisor id)
)))
DaemonCommon
(waiting? [this]
(or (not @(:active supervisor))
(and
(timer-waiting? (:heartbeat-timer supervisor))
(timer-waiting? (:event-timer supervisor))
(every? (memfn waiting?) managers)))
))))
(defn kill-supervisor [supervisor]
(.shutdown supervisor)
)
(defn setup-storm-code-dir [conf storm-conf dir]
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))
;; distributed implementation
(defmethod download-storm-code
:distributed [conf storm-id master-code-dir supervisor download-lock]
;; Downloading to the permanent location is atomic: code is downloaded into a tmp dir and then moved into place
(let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
stormroot (supervisor-stormdist-root conf storm-id)
master-meta-file-path (master-storm-metafile-path master-code-dir)
supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
(locking download-lock
(log-message "Downloading code for storm id " storm-id " from " master-code-dir)
(FileUtils/forceMkdir (File. tmproot))
(Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path)
(if (:code-distributor supervisor)
(.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path)))
(extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
(if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot)))
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
(setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
(log-message "Finished downloading code for storm id " storm-id " from " master-code-dir))))
(defn write-log-metadata-to-yaml-file! [storm-id port data conf]
(let [file (get-log-metadata-file storm-id port)]
;; running the worker as a different user requires the directory to have
;; special permissions, or it is insecure
(when (not (.exists (.getParentFile file)))
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(do (FileUtils/forceMkdir (.getParentFile file))
(setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) (.getCanonicalPath (.getParentFile file))))
(.mkdirs (.getParentFile file))))
(let [writer (java.io.FileWriter. file)
yaml (Yaml.)]
(try
(.dump yaml data writer)
(finally
(.close writer))))))
(defn write-log-metadata! [storm-conf user worker-id storm-id port conf]
(let [data {TOPOLOGY-SUBMITTER-USER user
"worker-id" worker-id
LOGS-GROUPS (sort (distinct (remove nil?
(concat
(storm-conf LOGS-GROUPS)
(storm-conf TOPOLOGY-GROUPS)))))
LOGS-USERS (sort (distinct (remove nil?
(concat
(storm-conf LOGS-USERS)
(storm-conf TOPOLOGY-USERS)))))}]
(write-log-metadata-to-yaml-file! storm-id port data conf)))
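;; Illustrative sketch, never evaluated (it would write a file): hypothetical
;; topology conf and ids, showing roughly what metadata ends up in the yaml file.
(comment
  (write-log-metadata! {TOPOLOGY-SUBMITTER-USER "alice"
                        TOPOLOGY-USERS ["alice" "bob"]
                        LOGS-USERS ["ops"]}
                       "alice" "worker-uuid-1" "topo-1" 6700 {})
  ;; writes a map roughly like
  ;; {TOPOLOGY-SUBMITTER-USER "alice", "worker-id" "worker-uuid-1",
  ;;  LOGS-GROUPS (), LOGS-USERS ("alice" "bob" "ops")}
  )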
(defmethod mk-code-distributor :distributed [conf]
(let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
(.prepare code-distributor conf)
code-distributor))
(defn jlp [stormroot conf]
(let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
arch (System/getProperty "os.arch")
arch-resource-root (str resource-root File/separator os "-" arch)]
(str arch-resource-root File/pathSeparator resource-root File/pathSeparator (conf JAVA-LIBRARY-PATH))))
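;; Illustrative sketch, never evaluated: hypothetical storm root and
;; JAVA-LIBRARY-PATH on a Linux/amd64 host; the real value depends on the
;; os.name and os.arch system properties.
(comment
  (jlp "/var/storm/supervisor/stormdist/topo-1" {JAVA-LIBRARY-PATH "/usr/local/lib"})
  ;; => something like
  ;; "/var/storm/supervisor/stormdist/topo-1/resources/Linux-amd64:/var/storm/supervisor/stormdist/topo-1/resources:/usr/local/lib"
  )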
(defn substitute-childopts
"Generates runtime childopts by replacing keys with topology-id, worker-id, port"
[value worker-id topology-id port]
(let [replacement-map {"%ID%" (str port)
"%WORKER-ID%" (str worker-id)
"%TOPOLOGY-ID%" (str topology-id)
"%WORKER-PORT%" (str port)}
sub-fn #(reduce (fn [string entry]
(apply clojure.string/replace string entry))
%
replacement-map)]
(cond
(nil? value) nil
(list? value) (map sub-fn value)
:else (-> value sub-fn (clojure.string/split #"\s+")))))
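;; Illustrative sketch, never evaluated: hypothetical worker/topology ids; a
;; string value has its placeholders substituted and is then split on whitespace.
(comment
  (substitute-childopts "-Xmx768m -Dworker.log=worker-%WORKER-PORT%.log"
                        "worker-uuid-1" "topo-1" 6700)
  ;; => ["-Xmx768m" "-Dworker.log=worker-6700.log"]
  )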
(defn java-cmd []
(let [java-home (.get (System/getenv) "JAVA_HOME")]
(if (nil? java-home)
"java"
(str java-home file-path-separator "bin" file-path-separator "java")
)))
(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
run-worker-as-user (conf SUPERVISOR-RUN-WORKER-AS-USER)
storm-home (System/getProperty "storm.home")
storm-options (System/getProperty "storm.options")
storm-conf-file (System/getProperty "storm.conf.file")
storm-log-dir (or (System/getProperty "storm.log.dir") (str storm-home file-path-separator "logs"))
storm-log-conf-dir (conf STORM-LOG4J2-CONF-DIR)
storm-log4j2-conf-dir (or storm-log-conf-dir (str storm-home file-path-separator "log4j2"))
stormroot (supervisor-stormdist-root conf storm-id)
jlp (jlp stormroot conf)
stormjar (supervisor-stormjar-path stormroot)
storm-conf (read-supervisor-storm-conf conf storm-id)
topo-classpath (if-let [cp (storm-conf TOPOLOGY-CLASSPATH)]
[cp]
[])
classpath (-> (worker-classpath)
(add-to-classpath [stormjar])
(add-to-classpath topo-classpath))
top-gc-opts (storm-conf TOPOLOGY-WORKER-GC-CHILDOPTS)
gc-opts (substitute-childopts (if top-gc-opts top-gc-opts (conf WORKER-GC-CHILDOPTS)) worker-id storm-id port)
topo-worker-logwriter-childopts (storm-conf TOPOLOGY-WORKER-LOGWRITER-CHILDOPTS)
user (storm-conf TOPOLOGY-SUBMITTER-USER)
logging-sensitivity (storm-conf TOPOLOGY-LOGGING-SENSITIVITY "S3")
logfilename (logs-filename storm-id port)
worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
(substitute-childopts s worker-id storm-id port))
topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
(substitute-childopts s worker-id storm-id port))
topology-worker-environment (if-let [env (storm-conf TOPOLOGY-ENVIRONMENT)]
(merge env {"LD_LIBRARY_PATH" jlp})
{"LD_LIBRARY_PATH" jlp})
command (concat
[(java-cmd) "-cp" classpath
topo-worker-logwriter-childopts
(str "-Dlogfile.name=" logfilename)
(str "-Dstorm.home=" storm-home)
(str "-Dstorm.id=" storm-id)
(str "-Dworker.id=" worker-id)
(str "-Dworker.port=" port)
(str "-Dstorm.log.dir=" storm-log-dir)
(str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml")
"backtype.storm.LogWriter"]
[(java-cmd) "-server"]
worker-childopts
topo-worker-childopts
gc-opts
[(str "-Djava.library.path=" jlp)
(str "-Dlogfile.name=" logfilename)
(str "-Dstorm.home=" storm-home)
(str "-Dstorm.conf.file=" storm-conf-file)
(str "-Dstorm.options=" storm-options)
(str "-Dstorm.log.dir=" storm-log-dir)
(str "-Dlogging.sensitivity=" logging-sensitivity)
(str "-Dlog4j.configurationFile=" storm-log4j2-conf-dir file-path-separator "worker.xml")
(str "-Dstorm.id=" storm-id)
(str "-Dworker.id=" worker-id)
(str "-Dworker.port=" port)
"-cp" classpath
"backtype.storm.daemon.worker"
storm-id
(:assignment-id supervisor)
port
worker-id])
command (->> command (map str) (filter (complement empty?)))]
(log-message "Launching worker with command: " (shell-cmd command))
(write-log-metadata! storm-conf user worker-id storm-id port conf)
(set-worker-user! conf worker-id user)
(let [log-prefix (str "Worker Process " worker-id)
callback (fn [exit-code]
(log-message log-prefix " exited with code: " exit-code)
(add-dead-worker worker-id))]
(remove-dead-worker worker-id)
(if run-worker-as-user
(let [worker-dir (worker-root conf worker-id)]
(worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback))
(launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback)
))))
;; local implementation
(defn resources-jar []
(->> (.split (current-classpath) File/pathSeparator)
(filter #(.endsWith % ".jar"))
(filter #(zip-contains-dir? % RESOURCES-SUBDIR))
first ))
(defmethod download-storm-code
:local [conf storm-id master-code-dir supervisor download-lock]
(let [stormroot (supervisor-stormdist-root conf storm-id)]
(locking download-lock
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
(let [classloader (.getContextClassLoader (Thread/currentThread))
resources-jar (resources-jar)
url (.getResource classloader RESOURCES-SUBDIR)
target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
(cond
resources-jar
(do
(log-message "Extracting resources from jar at " resources-jar " to " target-dir)
(extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
url
(do
(log-message "Copying resources at " (URI. (str url)) " to " target-dir)
(if (= (.getProtocol url) "jar" )
(extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
(FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
)
)
)
)))
(defmethod mk-code-distributor :local [conf] nil)
(defmethod launch-worker
:local [supervisor storm-id port worker-id]
(let [conf (:conf supervisor)
pid (uuid)
worker (worker/mk-worker conf
(:shared-context supervisor)
storm-id
(:assignment-id supervisor)
port
worker-id)]
(set-worker-user! conf worker-id "")
(psim/register-process pid worker)
(swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)
))
(defn -launch [supervisor]
(let [conf (read-storm-config)]
(validate-distributed-mode! conf)
(let [supervisor (mk-supervisor conf nil supervisor)]
(add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown supervisor)))))
(defn standalone-supervisor []
(let [conf-atom (atom nil)
id-atom (atom nil)]
(reify ISupervisor
(prepare [this conf local-dir]
(reset! conf-atom conf)
(let [state (LocalState. local-dir)
curr-id (if-let [id (ls-supervisor-id state)]
id
(generate-supervisor-id))]
(ls-supervisor-id! state curr-id)
(reset! id-atom curr-id))
)
(confirmAssigned [this port]
true)
(getMetadata [this]
(doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
(getSupervisorId [this]
@id-atom)
(getAssignmentId [this]
@id-atom)
(killedWorker [this port]
)
(assigned [this ports]
))))
(defn -main []
(setup-default-uncaught-exception-handler)
(-launch (standalone-supervisor)))