(ns backtype.storm.daemon.common
(:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm log config util])
(defn system-component? [id]
(< id 0))
;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment
;; this avoid situation where node goes down and task doesn't know what to do information-wise
(defrecord Assignment [master-code-dir node->host task->node+port task->start-time-secs])
(defrecord StormBase [storm-name launch-time-secs])
(defrecord SupervisorInfo [time-secs hostname worker-ports uptime-secs])
(defrecord TaskInfo [component-id])
(defprotocol DaemonCommon
(waiting? [this]))
(def LS-WORKER-HEARTBEAT "worker-heartbeat")
;; LocalState constants
(def LS-ID "supervisor-id")
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")
(defrecord WorkerHeartbeat [time-secs storm-id task-ids port])
;; should include stats in here
;; TODO: want to know how many it has processed from every source
;; component/stream pair
;; TODO: want to know how many it has emitted to every stream
(defrecord TaskStats [^long processed
^long acked
^long emitted
^long transferred
^long failed])
(defrecord TaskHeartbeat [time-secs uptime-secs stats])
(defn new-task-stats []
(TaskStats. 0 0 0 0 0))
;technically this is only active task ids
(defn storm-task-ids [storm-cluster-state storm-id]
(keys (:task->node+port (.assignment-info storm-cluster-state storm-id nil))))
(defn storm-task-info
"Returns map from task -> component id"
[storm-cluster-state storm-id]
(let [task-ids (.task-ids storm-cluster-state storm-id)]
(into {}
(dofor [id task-ids]
[id (:component-id (.task-info storm-cluster-state storm-id id))]
(defn get-storm-id [storm-cluster-state storm-name]
(let [active-storms (.active-storms storm-cluster-state)]
#(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
(defn topology-bases [storm-cluster-state]
(let [active-topologies (.active-storms storm-cluster-state)]
(into {}
(dofor [id active-topologies]
[id (.storm-base storm-cluster-state id nil)]
(defn validate-distributed-mode! [conf]
(if (local-mode? conf)
(IllegalArgumentException. "Cannot start server in local mode!"))))
(defmacro defserverfn [name & body]
`(let [exec-fn# (fn ~@body)]
(defn ~name [& args#]
(apply exec-fn# args#)
(catch Throwable t#
(log-error t# "Error on initialization of server " ~(str name))
(halt-process! 13 "Error on initialization")