blob: 06de2b92a49561fbefbb82ce2343ab0cf4d82c11 [file] [log] [blame]
(ns backtype.storm.daemon.common
(:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
)
;; Ids reserved for the acker component and its streams. All of them are
;; negative, which is what system-component? tests for.
;; NOTE(review): ACKER-COMPONENT-ID and ACKER-INIT-STREAM-ID are both -1;
;; presumably component ids and stream ids are separate id spaces — confirm.
(def ACKER-COMPONENT-ID -1)
(def ACKER-INIT-STREAM-ID -1)
(def ACKER-ACK-STREAM-ID -2)
(def ACKER-FAIL-STREAM-ID -3)
(defn system-component?
  "Returns true when the numeric id is negative, false otherwise."
  [id]
  (neg? id))
;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment
;; this avoids the situation where a node goes down and a task doesn't know what to do, information-wise
;; Assignment: carries the master code dir, a node->host map, a
;; task->node+port map (its keys are what storm-task-ids returns) and a
;; task->start-time-secs map.
(defrecord Assignment [master-code-dir node->host task->node+port task->start-time-secs])
;; StormBase: storm name plus launch time in seconds.
;; NOTE(review): presumably this is the value .storm-base returns, since
;; get-storm-id reads :storm-name from it — confirm.
(defrecord StormBase [storm-name launch-time-secs])
;; SupervisorInfo: a timestamp, the hostname, worker ports and uptime in seconds.
(defrecord SupervisorInfo [time-secs hostname worker-ports uptime-secs])
;; TaskInfo: the component id of a task (storm-task-info reads :component-id
;; from the value returned by .task-info).
(defrecord TaskInfo [component-id])
;; Protocol with a single one-argument method, waiting?.
;; NOTE(review): what "waiting" means is defined by the implementers, which
;; are not visible in this file — confirm before relying on it.
(defprotocol DaemonCommon
(waiting? [this]))
;; String key for the worker heartbeat.
;; NOTE(review): presumably also a LocalState key like the ones below,
;; although it is declared above that heading — confirm.
(def LS-WORKER-HEARTBEAT "worker-heartbeat")
;; LocalState constants
(def LS-ID "supervisor-id")
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")
;; WorkerHeartbeat: a timestamp in seconds, the storm id, the task ids and a port.
;; NOTE(review): presumably the value stored under LS-WORKER-HEARTBEAT — confirm.
(defrecord WorkerHeartbeat [time-secs storm-id task-ids port])
;; should include stats in here
;; TODO: want to know how many it has processed from every source
;; component/stream pair
;; TODO: want to know how many it has emitted to every stream
;; TaskStats: five counters for a task, each field hinted ^long.
;; new-task-stats builds an instance with every counter set to 0.
(defrecord TaskStats [^long processed
^long acked
^long emitted
^long transferred
^long failed])
;; TaskHeartbeat: a timestamp and uptime (both in seconds) plus a stats value.
;; NOTE(review): stats is presumably a TaskStats record — confirm against the writer.
(defrecord TaskHeartbeat [time-secs uptime-secs stats])
(defn new-task-stats
  "Builds a TaskStats record with every counter (processed, acked, emitted,
  transferred, failed) set to 0."
  []
  (new TaskStats 0 0 0 0 0))
;; technically this returns only the active task ids
(defn storm-task-ids
  "Returns the keys of the :task->node+port map of the assignment obtained
  from .assignment-info for storm-id (called with a nil third argument).
  Returns nil when there is no assignment or the map is missing or empty."
  [storm-cluster-state storm-id]
  (-> (.assignment-info storm-cluster-state storm-id nil)
      :task->node+port
      keys))
(defn storm-task-info
  "Returns a map from task id to component id. The task ids come from
  .task-ids for storm-id; each component id is the :component-id of the value
  returned by .task-info for that task."
  [storm-cluster-state storm-id]
  (let [component-of (fn [task-id]
                       (:component-id
                         (.task-info storm-cluster-state storm-id task-id)))]
    (into {}
          (dofor [task-id (.task-ids storm-cluster-state storm-id)]
            [task-id (component-of task-id)]))))
(defn get-storm-id
  "Searches the ids returned by .active-storms and yields the first one whose
  .storm-base value has a :storm-name equal to storm-name."
  [storm-cluster-state storm-name]
  (letfn [(has-requested-name? [candidate-id]
            (let [base (.storm-base storm-cluster-state candidate-id nil)]
              (= storm-name (:storm-name base))))]
    (find-first has-requested-name?
                (.active-storms storm-cluster-state))))
(defn topology-bases
  "Returns a map from every id returned by .active-storms to the value that
  .storm-base yields for that id (called with a nil third argument)."
  [storm-cluster-state]
  (let [lookup-base (fn [topology-id]
                      (.storm-base storm-cluster-state topology-id nil))]
    (into {}
          (dofor [topology-id (.active-storms storm-cluster-state)]
            [topology-id (lookup-base topology-id)]))))
(defn validate-distributed-mode!
  "Throws IllegalArgumentException when (local-mode? conf) is truthy;
  otherwise returns nil."
  [conf]
  (when (local-mode? conf)
    (throw (IllegalArgumentException. "Cannot start server in local mode!"))))
;; Defines a variadic function called `name` that delegates to (fn ~@body).
;; If the call throws any Throwable, the error is passed to log-error together
;; with the server's name, and halt-process! is then invoked with code 13.
(defmacro defserverfn [name & body]
  (let [server-name (str name)]
    `(let [impl# (fn ~@body)]
       (defn ~name [& params#]
         (try
           (apply impl# params#)
           (catch Throwable err#
             (log-error err# "Error on initialization of server " ~server-name)
             (halt-process! 13 "Error on initialization")))))))