(ns backtype.storm.daemon.worker
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap])
(:import [java.util.concurrent LinkedBlockingQueue])
(:require [backtype.storm.daemon [task :as task]])
;; Multimethod producing the URL string this worker's virtual port is launched on.
;; Dispatches on the value returned by `cluster-mode`; this file supplies
;; implementations for the :local and :distributed dispatch values.
(defmulti virtual-port-url cluster-mode)
;; Multimethod producing the URL string used to connect an outbound socket to
;; another worker. Dispatches on the value returned by `cluster-mode`; this file
;; supplies implementations for the :local and :distributed dispatch values.
(defmulti connect-url cluster-mode)
;; Looks up the assignment for `storm-id` in the cluster state and walks its
;; :task->node+port map, testing each task's location against
;; [supervisor-id port].
;; NOTE(review): the branches of the `if` and the closing delimiters are not
;; visible in this view, so what is emitted per task cannot be confirmed here;
;; presumably the matching task ids are returned -- verify against the full source.
(defn read-worker-task-ids [storm-cluster-state storm-id supervisor-id port]
;; nil is passed in the callback position of .assignment-info (no watch is registered here)
(let [assignment (:task->node+port (.assignment-info storm-cluster-state storm-id nil))]
(mapcat (fn [[task-id loc]]
;; loc is compared as a [node port] pair
(if (= loc [supervisor-id port])
;; Loads the locally stored storm conf and topology for `storm-id`.
;; Evaluates to a two-element vector:
;;   [conf merged with the deserialized storm conf file, deserialized topology]
;; Values from the storm conf file take precedence over `conf` (it is the
;; right-hand argument to merge).
;; NOTE(review): the closing delimiters of the let/defn- are not visible in this view.
(defn- read-storm-cache [conf storm-id]
;; both file paths are derived from the supervisor's stormdist root for this storm
(let [stormroot (supervisor-stormdist-root conf storm-id)
conf-path (supervisor-stormconf-path stormroot)
topology-path (supervisor-stormcode-path stormroot)]
[(merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
(Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))]
;; Writes a heartbeat record into the state obtained from (worker-state conf worker-id).
;; NOTE(review): the arguments to .put (the key and the heartbeat value built
;; from port / storm-id / task-ids) are not visible in this view -- confirm
;; against the full source.
(defn do-heartbeat [conf worker-id port storm-id task-ids]
(.put (worker-state conf worker-id)
;; NOTE(review): several lines of this form are missing from this view (the
;; delimiters do not balance); the comments below describe only what is visible.
(defn worker-outbound-tasks
"Returns seq of task-ids that receive messages from this worker"
;; if this is an acker, needs to talk to the spouts
[task->component mk-topology-context task-ids]
;; a context built for the first task id is used for component-id lookups
(let [topology-context (mk-topology-context (first task-ids))
spout-components (-> topology-context
;; `some?` is called with a predicate fn here; presumably a project helper
;; rather than clojure.core/some? -- TODO confirm
contains-acker? (some? (fn [tid]
(.getComponentId topology-context tid)))
;; target components; spout components are included only when this worker
;; contains an acker (the `if` yields nil otherwise, which concat treats as empty)
components (concat
(if contains-acker? spout-components)
;; per task: build that task's own context and flatten the keys of its targets
(fn [task-id]
(let [context (mk-topology-context task-id)]
(->> (.getThisTargets context)
(map keys)
(apply concat))
;; translate the selected components back into task ids by reversing
;; task->component and concatenating the task-id collections
(apply concat
;; fix this
(-> (reverse-map task->component) (select-keys components) vals)))
;; TODO: should worker even take the storm-id as input? this should be
;; deducible from cluster state (by searching through assignments)
;; what about if there's inconsistency in assignments? -> but nimbus
;; should guarantee this consistency
;; TODO: consider doing worker heartbeating rather than task heartbeating to reduce the load on zookeeper
;; Builds and starts a worker for `storm-id` on `supervisor-id`:`port`.
;; Visible steps: connect to cluster state, read this worker's task ids,
;; heartbeat once, load the cached storm conf/topology, open outbound sockets,
;; start the heartbeat / refresh / transfer loops, create the tasks, launch the
;; virtual port, and define a shutdown routine.
;; NOTE(review): many lines of this form are missing from this view (delimiters
;; do not balance); the comments below describe only what is visible.
(defserverfn mk-worker [conf storm-id supervisor-id port worker-id]
(log-message "Launching worker for " storm-id " on " supervisor-id ":" port " with id " worker-id)
;; `active` is flipped to false by shutdown* and is checked by the loops below
(let [active (atom true)
storm-active-atom (atom false)
cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port)
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
_ (when (= :distributed (cluster-mode conf))
(touch (worker-pid-path conf worker-id (process-pid))))
heartbeat-fn #(do-heartbeat conf worker-id port storm-id task-ids)
;; do this here so that the worker process dies if this fails
;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
_ (heartbeat-fn)
;; storm-conf is `conf` merged with the topology-specific conf read from disk
[storm-conf topology] (read-storm-cache conf storm-id)
event-manager (event/event-manager true)
task->component (storm-task-info storm-cluster-state storm-id)
;; factory for a TopologyContext; some constructor arguments are not visible in this view
mk-topology-context #(TopologyContext. topology
(supervisor-stormdist-root conf storm-id))
(worker-pids-root conf worker-id)
zmq-context (mq/context (storm-conf ZMQ-THREADS))
outbound-tasks (worker-outbound-tasks task->component mk-topology-context task-ids)
;; guards node+port->socket / task->node+port: written under write-locked in
;; refresh-connections, read under read-locked in the transfer loop
endpoint-socket-lock (mk-rw-lock)
node+port->socket (atom {})
task->node+port (atom {})
transfer-queue (LinkedBlockingQueue.) ; possibly bound the size of it
;; tasks hand outgoing tuples to the worker by enqueueing [task tuple] pairs
transfer-fn (fn [task ^Tuple tuple]
(.put transfer-queue [task tuple])
;; re-reads the assignment and reconciles open sockets with the endpoints
;; needed by outbound-tasks; the callback variant re-queues itself on the
;; event manager
refresh-connections (fn this
(this (fn [& ignored] (.add event-manager this))))
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
needed-connections (set (vals my-assignment))
current-connections (set (keys @node+port->socket))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
;; open a push socket (with the configured linger) for each new endpoint
(swap! node+port->socket
(into {}
(dofor [[node port :as endpoint] new-connections]
(-> zmq-context
(mq/socket mq/push)
(mq/set-linger (storm-conf ZMQ-LINGER-MILLIS))
(connect-url conf
((:node->host assignment) node)
;; publish the new routing table under the write lock
(write-locked endpoint-socket-lock
(reset! task->node+port my-assignment))
;; close sockets for endpoints that are no longer needed
(doseq [endpoint remove-connections]
(.close (@node+port->socket endpoint)))
(apply swap!
;; checks whether a storm-base entry exists for this storm; presumably the
;; result is stored in storm-active-atom -- the assignment is not visible here
refresh-storm-active (fn this
(this (fn [& ignored] (.add event-manager this))))
(not-nil? (.storm-base storm-cluster-state storm-id callback)))
;; run both refreshes once synchronously before starting any loops
_ (refresh-connections nil)
_ (refresh-storm-active nil)
;; when active, the loop fn heartbeats and evaluates to the configured
;; WORKER-HEARTBEAT-FREQUENCY-SECS; presumably async-loop treats that as
;; the sleep interval -- TODO confirm
heartbeat-thread (async-loop
(fn []
;; this @active check handles the case where it's started after shutdown* joins to the thread
;; if the thread is started after the join, then @active must be false. So there's no risk
;; of writing heartbeat after it's been shut down.
(when @active (heartbeat-fn) (conf WORKER-HEARTBEAT-FREQUENCY-SECS))
:priority Thread/MAX_PRIORITY)
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) storm-id zmq-context cluster-state storm-active-atom transfer-fn))
;; background loops: a periodic refresh that queues both refresh fns on the
;; event manager, and the transfer loop that drains the queue to the sockets
threads [(async-loop
(fn []
(.add event-manager refresh-connections)
(.add event-manager refresh-storm-active)
(when @active (storm-conf TASK-REFRESH-POLL-SECS))
(fn [^ArrayList drainer ^TupleSerializer serializer]
;; block for the first element, then drain whatever else is queued
(let [felem (.take transfer-queue)]
(.add drainer felem)
(.drainTo transfer-queue drainer))
;; snapshot the routing maps under the read lock and send the batch
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
(doseq [[task ^Tuple tuple] drainer]
(let [socket (node+port->socket (task->node+port task))
ser-tuple (.serialize serializer tuple)]
(mqvp/virtual-send socket task ser-tuple)
(.clear drainer)
0 )
;; per-loop state: a reusable drain buffer and a serializer
:args-fn (fn [] [(ArrayList.) (TupleSerializer. storm-conf)]))
_ (log-message "Launching virtual port for " supervisor-id ":" port)
;; the kill-fn halts the whole process with exit code 11
virtual-port-shutdown (mqvp/launch-virtual-port! zmq-context
(virtual-port-url conf port)
:kill-fn (fn [] (halt-process! 11))
:valid-ports task-ids)
_ (log-message "Launched virtual port for " supervisor-id ":" port)
;; shutdown order as visible here: mark inactive, shut down tasks, close
;; sockets, terminate the zmq context, interrupt and join `threads`, then
;; shut down the event manager and disconnect from cluster state
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)
(reset! active false)
(doseq [task tasks] (.shutdown task))
(doseq [[_ socket] @node+port->socket]
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
(log-message "Terminating zmq context")
(.term zmq-context)
(log-message "Disconnecting from storm cluster state context")
(log-message "Waiting for heartbeat thread to die")
(doseq [t threads]
(.interrupt t)
(.join t))
(.shutdown event-manager)
(.disconnect storm-cluster-state)
(.close cluster-state)
(log-message "Shut down worker " storm-id " " supervisor-id " " port))
;; returned handle; the protocol/interface names and any shutdown method are
;; not visible in this view
ret (reify
;; checks the event manager, every task, and the heartbeat thread; how the
;; three results are combined is not visible in this view
(waiting? [this]
(.waiting? event-manager)
(every? (memfn waiting?) tasks)
(.sleeping? heartbeat-thread)))
(log-message "Worker " worker-id " for storm " storm-id " on " supervisor-id ":" port " has finished loading")
;; :local mode -- the virtual port is addressed through an ipc endpoint named
;; after the port; `conf` only participates in dispatch.
(defmethod virtual-port-url :local [conf port]
  (let [ipc-name (str port ".ipc")]
    (str "ipc://" ipc-name)))
;; :distributed mode -- the virtual port is a tcp endpoint with "*" as the host
;; part; `conf` only participates in dispatch.
(defmethod virtual-port-url :distributed [conf port]
  (let [wildcard-host "*"]
    (str "tcp://" wildcard-host ":" port)))
;; :local mode -- connect through the ipc endpoint named after the port.
;; `host` is accepted for signature parity with :distributed but is not used.
(defmethod connect-url :local [conf host port]
  (let [ipc-name (str port ".ipc")]
    (str "ipc://" ipc-name)))
;; :distributed mode -- connect over tcp to host:port.
(defmethod connect-url :distributed [conf host port]
  (let [host+port (str host ":" port)]
    (str "tcp://" host+port)))
(defn -main
  "Process entry point. Reads the storm config, validates it with
  validate-distributed-mode!, then starts a worker via mk-worker.
  All arguments arrive as strings; port-str is parsed with Integer/parseInt
  (which throws NumberFormatException on non-numeric input)."
  [storm-id supervisor-id port-str worker-id]
  (let [conf (read-storm-config)
        ;; validation runs before the port is parsed, as in a plain sequential body
        _    (validate-distributed-mode! conf)]
    (mk-worker conf
               storm-id
               supervisor-id
               (Integer/parseInt port-str)
               worker-id)))