| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| |
| (ns backtype.storm.config |
| (:import [java.io FileReader File IOException] |
| [backtype.storm.generated StormTopology]) |
| (:import [backtype.storm Config ConfigValidation$FieldValidator]) |
| (:import [backtype.storm.utils Utils LocalState]) |
| (:import [org.apache.commons.io FileUtils]) |
| (:require [clojure [string :as str]]) |
| (:use [backtype.storm log util])) |
| |
(def RESOURCES-SUBDIR
  "Name of the sub-directory that supervisor-storm-resources-path appends
  to a storm root."
  "resources")
| |
(defn- clojure-config-name
  "Converts a Java-style field name into the Clojure constant name used in
  this namespace: the name is upper-cased and every underscore becomes a
  hyphen."
  [name]
  (-> name
      .toUpperCase
      (.replace "_" "-")))
| |
| ;; define clojure constants for every configuration parameter |
;; For every public field of Config, define a var in this namespace whose
;; name is the hyphenated upper-case form of the field name and whose value
;; is the value of that static field.
(doseq [field (.getFields Config)]
  (let [java-name (.getName field)
        clj-name  (clojure-config-name java-name)]
    (eval
      `(def ~(symbol clj-name) (. Config ~(symbol java-name))))))
| |
(def ALL-CONFIGS
  "The static value of every public field declared on Config, in the order
  returned by reflection."
  (dofor [field (.getFields Config)]
    (.get field nil)))
| |
;; Turns a schema description into a ConfigValidation$FieldValidator.
;; Dispatch goes through class-selector (defined outside this file;
;; presumably it selects on the class of the argument -- confirm in util).
(defmulti get-FieldValidator class-selector)

;; A nil schema cannot be turned into a validator.
(defmethod get-FieldValidator nil
  [_]
  (throw (IllegalArgumentException. "Cannot validate a nil field.")))

;; Something that already is a validator is returned unchanged.
(defmethod get-FieldValidator ConfigValidation$FieldValidator
  [validator]
  validator)
| |
;; Fallback: treat the argument as a class and build a validator that
;; accepts nil values and instances of that class, and rejects everything
;; else with an IllegalArgumentException naming the field and the class.
(defmethod get-FieldValidator Object
  [klass]
  {:pre [(not (nil? klass))]}
  (reify ConfigValidation$FieldValidator
    (validateField [_ name v]
      (when-not (or (nil? v)
                    (instance? klass v))
        (throw (IllegalArgumentException.
                 (str "field " name " '" v "' must be a '" (.getName klass) "'")))))))
| |
| ;; Create a mapping of config-string -> validator |
| ;; Config fields must have a _SCHEMA field defined |
(def CONFIG-SCHEMA-MAP
  (into {}
        (for [field (.getFields Config)
              :let  [field-name (.getName field)]
              ;; the *_SCHEMA fields describe other fields; they are not configs themselves
              :when (not (re-matches #".*_SCHEMA$" field-name))]
          [(.get field nil)
           (get-FieldValidator
             ;; look up the companion <FIELD>_SCHEMA static field and read its value
             (.get (.getField Config (str field-name "_SCHEMA")) nil))])))
| |
(defn cluster-mode
  "Returns the STORM-CLUSTER-MODE entry of conf as a keyword. Any extra
  arguments are accepted and ignored."
  [conf & args]
  (let [mode (conf STORM-CLUSTER-MODE)]
    (keyword mode)))
| |
(defn local-mode?
  "True when conf's STORM-CLUSTER-MODE is \"local\", false when it is
  \"distributed\". Any other value raises IllegalArgumentException."
  [conf]
  (let [mode (conf STORM-CLUSTER-MODE)]
    (cond
      (= "local" mode)       true
      (= "distributed" mode) false
      :else (throw (IllegalArgumentException.
                     (str "Illegal cluster mode in conf: " mode))))))
| |
(defn sampling-rate
  "Returns 1 divided by conf's TOPOLOGY-STATS-SAMPLE-RATE, coerced to an int."
  [conf]
  (let [sample-rate (conf TOPOLOGY-STATS-SAMPLE-RATE)]
    (int (/ 1 sample-rate))))
| |
(defn mk-stats-sampler
  "Builds an even-sampler from the sampling rate derived from conf."
  [conf]
  (-> conf
      sampling-rate
      even-sampler))
| |
| ; storm.zookeeper.servers: |
| ; - "server1" |
| ; - "server2" |
| ; - "server3" |
| ; nimbus.host: "master" |
| ; |
| ; ########### These all have default values as shown |
| ; |
| ; ### storm.* configs are general configurations |
| ; # the local dir is where jars are kept |
| ; storm.local.dir: "/mnt/storm" |
| ; storm.zookeeper.port: 2181 |
| ; storm.zookeeper.root: "/storm" |
| |
(defn read-default-config
  "Reads the default configuration through Utils/readDefaultConfig and
  passes it through clojurify-structure."
  []
  (-> (Utils/readDefaultConfig)
      clojurify-structure))
| |
(defn validate-configs-with-schemas
  "Runs every entry of conf that has a validator in CONFIG-SCHEMA-MAP
  through that validator. Keys without a registered validator are skipped."
  [conf]
  (doseq [[k v] conf]
    (let [validator (CONFIG-SCHEMA-MAP k)]
      (when-not (nil? validator)
        (.validateField validator k v)))))
| |
(defn read-storm-config
  "Reads the configuration through Utils/readStormConfig, validates it
  against the known schemas and returns it."
  []
  (doto (clojurify-structure (Utils/readStormConfig))
    validate-configs-with-schemas))
| |
(defn read-yaml-config
  "Reads the config file `name` through Utils/findAndReadConfigFile,
  validates it against the known schemas and returns it.

  `must-exist` is forwarded to Utils/findAndReadConfigFile; the
  one-argument arity passes true."
  ([name must-exist]
   (let [conf (clojurify-structure (Utils/findAndReadConfigFile name must-exist))]
     (validate-configs-with-schemas conf)
     conf))
  ([name]
   ;; Forward `name` to the two-argument arity. Calling (read-yaml-config true)
   ;; would re-enter this arity with name=true and recurse without end.
   (read-yaml-config name true)))
| |
(defn absolute-storm-local-dir
  "Returns conf's STORM-LOCAL-DIR as is when it is an absolute path;
  otherwise returns it prefixed with the \"storm.home\" system property."
  [conf]
  (let [home      (System/getProperty "storm.home")
        local-dir (conf STORM-LOCAL-DIR)]
    (if-not (is-absolute-path? local-dir)
      (str home file-path-separator local-dir)
      local-dir)))
| |
(defn master-local-dir
  "Returns the \"nimbus\" directory under the storm local dir, creating it
  (with FileUtils/forceMkdir) before returning the path."
  [conf]
  (let [nimbus-dir (str/join file-path-separator
                             [(absolute-storm-local-dir conf) "nimbus"])]
    (-> nimbus-dir File. FileUtils/forceMkdir)
    nimbus-dir))
| |
(defn master-stormdist-root
  "With one argument, returns the \"stormdist\" directory under the master
  local dir. With a storm-id, returns that topology's sub-directory of it."
  ([conf]
   (str/join file-path-separator [(master-local-dir conf) "stormdist"]))
  ([conf storm-id]
   (str/join file-path-separator [(master-stormdist-root conf) storm-id])))
| |
(defn master-tmp-dir
  "Returns the \"tmp\" directory under the master local dir, creating it
  (with FileUtils/forceMkdir) before returning the path."
  [conf]
  (let [tmp-dir (str/join file-path-separator [(master-local-dir conf) "tmp"])]
    (-> tmp-dir File. FileUtils/forceMkdir)
    tmp-dir))
| |
(defn master-storm-metafile-path
  "Path of \"storm-code-distributor.meta\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "storm-code-distributor.meta"]))
| |
(defn master-stormjar-path
  "Path of \"stormjar.jar\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "stormjar.jar"]))
| |
(defn master-stormcode-path
  "Path of \"stormcode.ser\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "stormcode.ser"]))
| |
(defn master-stormconf-path
  "Path of \"stormconf.ser\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "stormconf.ser"]))
| |
(defn master-inbox
  "Returns the \"inbox\" directory under the master local dir, creating it
  (with FileUtils/forceMkdir) before returning the path."
  [conf]
  (let [inbox-dir (str/join file-path-separator [(master-local-dir conf) "inbox"])]
    (-> inbox-dir File. FileUtils/forceMkdir)
    inbox-dir))
| |
(defn master-inimbus-dir
  "Path of the \"inimbus\" directory under the master local dir. The
  directory itself is not created here."
  [conf]
  (str/join file-path-separator [(master-local-dir conf) "inimbus"]))
| |
(defn supervisor-local-dir
  "Returns the \"supervisor\" directory under the storm local dir, creating
  it (with FileUtils/forceMkdir) before returning the path."
  [conf]
  (let [supervisor-dir (str/join file-path-separator
                                 [(absolute-storm-local-dir conf) "supervisor"])]
    (-> supervisor-dir File. FileUtils/forceMkdir)
    supervisor-dir))
| |
(defn supervisor-isupervisor-dir
  "Path of the \"isupervisor\" directory under the supervisor local dir.
  The directory itself is not created here."
  [conf]
  (str/join file-path-separator [(supervisor-local-dir conf) "isupervisor"]))
| |
(defn supervisor-stormdist-root
  "With one argument, returns the \"stormdist\" directory under the
  supervisor local dir. With a storm-id, returns the sub-directory named
  after the url-encoded storm-id."
  ([conf]
   (str/join file-path-separator [(supervisor-local-dir conf) "stormdist"]))
  ([conf storm-id]
   (let [dist-root (supervisor-stormdist-root conf)
         encoded   (url-encode storm-id)]
     (str dist-root file-path-separator encoded))))
| |
(defn supervisor-stormjar-path
  "Path of \"stormjar.jar\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "stormjar.jar"]))
| |
(defn supervisor-storm-metafile-path
  "Path of \"storm-code-distributor.meta\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "storm-code-distributor.meta"]))
| |
(defn supervisor-stormcode-path
  "Path of \"stormcode.ser\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "stormcode.ser"]))
| |
(defn supervisor-stormconf-path
  "Path of \"stormconf.ser\" inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot "stormconf.ser"]))
| |
(defn supervisor-tmp-dir
  "Returns the \"tmp\" directory under the supervisor local dir, creating
  it (with FileUtils/forceMkdir) before returning the path."
  [conf]
  (let [tmp-dir (str/join file-path-separator [(supervisor-local-dir conf) "tmp"])]
    (-> tmp-dir File. FileUtils/forceMkdir)
    tmp-dir))
| |
(defn supervisor-storm-resources-path
  "Path of the RESOURCES-SUBDIR directory inside stormroot."
  [stormroot]
  (str/join file-path-separator [stormroot RESOURCES-SUBDIR]))
| |
(defn ^LocalState supervisor-state
  "Constructs a LocalState on the \"localstate\" directory under the
  supervisor local dir."
  [conf]
  (let [state-dir (str/join file-path-separator
                            [(supervisor-local-dir conf) "localstate"])]
    (LocalState. state-dir)))
| |
(defn read-supervisor-storm-conf
  "Reads the compressed JSON conf stored for storm-id under the supervisor's
  stormdist root and merges it over conf, so entries from the stored conf
  win over entries in conf."
  [conf storm-id]
  (let [stormroot (supervisor-stormdist-root conf storm-id)
        conf-path (supervisor-stormconf-path stormroot)]
    (merge conf
           (clojurify-structure
             (Utils/fromCompressedJsonConf
               (FileUtils/readFileToByteArray (File. conf-path)))))))
| |
(defn read-supervisor-topology
  "Reads the serialized topology stored for storm-id under the supervisor's
  stormdist root and deserializes it as a StormTopology."
  [conf storm-id]
  (let [code-file (-> (supervisor-stormdist-root conf storm-id)
                      supervisor-stormcode-path
                      File.)]
    (Utils/deserialize (FileUtils/readFileToByteArray code-file) StormTopology)))
| |
(defn worker-user-root
  "Path of the \"workers-users\" directory under the storm local dir."
  [conf]
  ;; Use file-path-separator like every other path builder in this namespace
  ;; instead of a hard-coded "/".
  (str (absolute-storm-local-dir conf) file-path-separator "workers-users"))
| |
(defn worker-user-file
  "Path of the file named after worker-id inside the worker-user root."
  [conf worker-id]
  ;; Use file-path-separator like every other path builder in this namespace
  ;; instead of a hard-coded "/".
  (str (worker-user-root conf) file-path-separator worker-id))
| |
(defn get-worker-user
  "Returns the trimmed contents of the worker-user file for worker-id.
  If reading fails with an IOException the failure is logged and nil is
  returned."
  [conf worker-id]
  (log-message "GET worker-user " worker-id)
  (try
    (-> (worker-user-file conf worker-id)
        slurp
        str/trim)
    (catch IOException e
      (log-warn-error e "Failed to get worker user for " worker-id ".")
      nil)))
| |
| |
(defn set-worker-user!
  "Writes user into the worker-user file for worker-id, creating the
  parent directories first."
  [conf worker-id user]
  (log-message "SET worker-user " worker-id " " user)
  (let [path (worker-user-file conf worker-id)]
    (.mkdirs (.getParentFile (File. path)))
    ;; reuse the path computed above rather than building it a second time
    (spit path user)))
| |
(defn remove-worker-user!
  "Deletes the worker-user file for worker-id and returns the result of
  File.delete."
  [conf worker-id]
  (log-message "REMOVE worker-user " worker-id)
  (-> (worker-user-file conf worker-id)
      File.
      .delete))
| |
(defn worker-root
  "With one argument, returns the \"workers\" directory under the storm
  local dir. With an id, returns that worker's sub-directory of it."
  ([conf]
   (str/join file-path-separator [(absolute-storm-local-dir conf) "workers"]))
  ([conf id]
   (str/join file-path-separator [(worker-root conf) id])))
| |
(defn worker-pids-root
  "Path of the \"pids\" directory inside the root of worker id."
  [conf id]
  (str/join file-path-separator [(worker-root conf id) "pids"]))
| |
(defn worker-pid-path
  "Path of the entry named pid inside the pids directory of worker id."
  [conf id pid]
  (str/join file-path-separator [(worker-pids-root conf id) pid]))
| |
(defn worker-heartbeats-root
  "Path of the \"heartbeats\" directory inside the root of worker id."
  [conf id]
  (str/join file-path-separator [(worker-root conf id) "heartbeats"]))
| |
| ;; workers heartbeat here with pid and timestamp |
| ;; if supervisor stops receiving heartbeat, it kills and restarts the process |
| ;; in local mode, keep a global map of ids to threads for simulating process management |
(defn ^LocalState worker-state
  "Constructs a LocalState on the heartbeats directory of worker id."
  [conf id]
  (let [heartbeats-dir (worker-heartbeats-root conf id)]
    (LocalState. heartbeats-dir)))