blob: ba43a14d80486c40d593b33f61d230f10d1acf65 [file] [log] [blame]
(ns backtype.storm.config
(:import [org.jvyaml YAML])
(:import [java.io FileReader File])
(:import [backtype.storm Config])
(:import [backtype.storm.utils Utils LocalState])
(:import [org.apache.commons.io FileUtils])
(:require [clojure.contrib [str-utils2 :as str]])
(:use [backtype.storm util])
)
(def RESOURCES-SUBDIR "resources")
;; define clojure constants for every configuration parameter
(doseq [f (seq (.getFields Config))]
(let [name (.getName f)
new-name (.replace (.toUpperCase name) "_" "-")]
(eval
`(def ~(symbol new-name) (. Config ~(symbol name))))
))
(defn cluster-mode [conf & args]
(keyword (conf STORM-CLUSTER-MODE)))
(defn local-mode? [conf]
(let [mode (conf STORM-CLUSTER-MODE)]
(condp = mode
"local" true
"distributed" false
(throw (IllegalArgumentException.
(str "Illegal cluster mode in conf: " mode)))
)))
(defn sampling-rate [conf]
(->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
(/ 1)
int))
(defn mk-stats-sampler [conf]
(even-sampler (sampling-rate conf)))
; storm.zookeeper.servers:
; - "server1"
; - "server2"
; - "server3"
; nimbus.host: "master"
;
; ########### These all have default values as shown
;
; ### storm.* configs are general configurations
; # the local dir is where jars are kept
; storm.local.dir: "/mnt/storm"
; storm.zookeeper.port: 2181
; storm.zookeeper.root: "/storm"
(defn mk-zk-connect-string [conf]
(let [servers (conf STORM-ZOOKEEPER-SERVERS)
port (conf STORM-ZOOKEEPER-PORT)
root (conf STORM-ZOOKEEPER-ROOT)]
(str
(str/join ","
(for [s servers]
(str s ":" port)))
root)
))
(defn read-default-config []
(clojurify-structure (Utils/readDefaultConfig)))
(defn read-storm-config []
(clojurify-structure (Utils/readStormConfig)))
(defn read-yaml-config [name]
(clojurify-structure (Utils/findAndReadConfigFile name true)))
(defn master-stormdist-root [conf storm-id]
(str (conf STORM-LOCAL-DIR) "/stormdist/" storm-id))
(defn master-stormjar-path [stormroot]
(str stormroot "/stormjar.jar"))
(defn master-stormcode-path [stormroot]
(str stormroot "/stormcode.ser"))
(defn master-stormconf-path [stormroot]
(str stormroot "/stormconf.ser"))
(defn master-inbox [conf]
(let [ret (str (conf STORM-LOCAL-DIR) "/inbox")]
(FileUtils/forceMkdir (File. ret))
ret ))
(defn supervisor-stormdist-root
([conf] (str (conf STORM-LOCAL-DIR) "/stormdist"))
([conf storm-id]
(str (supervisor-stormdist-root conf) "/" storm-id)))
(defn supervisor-stormjar-path [stormroot]
(str stormroot "/stormjar.jar"))
(defn supervisor-stormcode-path [stormroot]
(str stormroot "/stormcode.ser"))
(defn supervisor-stormconf-path [stormroot]
(str stormroot "/stormconf.ser"))
(defn supervisor-tmp-dir [conf]
(let [ret (str (conf STORM-LOCAL-DIR) "/tmp")]
(FileUtils/forceMkdir (File. ret))
ret ))
(defn supervisor-storm-resources-path [stormroot]
(str stormroot "/" RESOURCES-SUBDIR))
(defn ^LocalState supervisor-state [conf]
(LocalState. (str (conf STORM-LOCAL-DIR) "/localstate")))
(defn worker-root
([conf]
(str (conf STORM-LOCAL-DIR) "/workers"))
([conf id]
(str (worker-root conf) "/" id)))
(defn worker-pids-root
[conf id]
(str (worker-root conf id) "/pids"))
(defn worker-pid-path [conf id pid]
(str (worker-pids-root conf id) "/" pid))
(defn worker-heartbeats-root
[conf id]
(str (worker-root conf id) "/heartbeats"))
;; workers heartbeat here with pid and timestamp
;; if supervisor stops receiving heartbeat, it kills and restarts the process
;; in local mode, keep a global map of ids to threads for simulating process management
(defn ^LocalState worker-state [conf id]
(LocalState. (worker-heartbeats-root conf id)))