blob: cc6632a2a1599b7eb9510dd7c6331ff2f210dcc5 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.ui.core
(:use compojure.core)
(:use [clojure.java.shell :only [sh]])
(:use ring.middleware.reload
ring.middleware.multipart-params)
(:use [ring.middleware.json :only [wrap-json-params]])
(:use [hiccup core page-helpers])
(:use [backtype.storm config util log stats tuple zookeeper converter])
(:use [backtype.storm.ui helpers])
(:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID ACKER-INIT-STREAM-ID ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID mk-authorization-handler
start-metrics-reporters]]])
(:import [backtype.storm.utils Utils]
[backtype.storm.generated NimbusSummary])
(:use [clojure.string :only [blank? lower-case trim split]])
(:import [backtype.storm.generated ExecutorSpecificStats
ExecutorStats ExecutorSummary ExecutorInfo TopologyInfo SpoutStats BoltStats
ErrorInfo ClusterSummary SupervisorSummary TopologySummary
Nimbus$Client StormTopology GlobalStreamId RebalanceOptions
KillOptions GetInfoOptions NumErrorsChoice DebugOptions TopologyPageInfo
TopologyStats CommonAggregateStats ComponentAggregateStats
ComponentType BoltAggregateStats SpoutAggregateStats
ExecutorAggregateStats SpecificAggregateStats ComponentPageInfo
LogConfig LogLevel LogLevelAction])
(:import [backtype.storm.security.auth AuthUtils ReqContext])
(:import [backtype.storm.generated AuthorizationException ProfileRequest ProfileAction NodeInfo])
(:import [backtype.storm.security.auth AuthUtils])
(:import [backtype.storm.utils VersionInfo])
(:import [backtype.storm Config])
(:import [java.io File])
(:require [compojure.route :as route]
[compojure.handler :as handler]
[ring.util.response :as resp]
[backtype.storm [thrift :as thrift]])
(:require [metrics.meters :refer [defmeter mark!]])
(:import [org.apache.commons.lang StringEscapeUtils])
(:import [org.apache.logging.log4j Level])
(:gen-class))
(def ^:dynamic *STORM-CONF* (read-storm-config))
(def ^:dynamic *UI-ACL-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-AUTHORIZER) *STORM-CONF*))
(def ^:dynamic *UI-IMPERSONATION-HANDLER* (mk-authorization-handler (*STORM-CONF* NIMBUS-IMPERSONATION-AUTHORIZER) *STORM-CONF*))
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
(defmeter ui:num-cluster-configuration-http-requests)
(defmeter ui:num-cluster-summary-http-requests)
(defmeter ui:num-nimbus-summary-http-requests)
(defmeter ui:num-supervisor-summary-http-requests)
(defmeter ui:num-all-topologies-summary-http-requests)
(defmeter ui:num-topology-page-http-requests)
(defmeter ui:num-build-visualization-http-requests)
(defmeter ui:num-mk-visualization-data-http-requests)
(defmeter ui:num-component-page-http-requests)
(defmeter ui:num-log-config-http-requests)
(defmeter ui:num-activate-topology-http-requests)
(defmeter ui:num-deactivate-topology-http-requests)
(defmeter ui:num-debug-topology-http-requests)
(defmeter ui:num-component-op-response-http-requests)
(defmeter ui:num-topology-op-response-http-requests)
(defmeter ui:num-topology-op-response-http-requests)
(defmeter ui:num-topology-op-response-http-requests)
(defmeter ui:num-main-page-http-requests)
(defn assert-authorized-user
([op]
(assert-authorized-user op nil))
([op topology-conf]
(let [context (ReqContext/context)]
(if (.isImpersonating context)
(if *UI-IMPERSONATION-HANDLER*
(if-not (.permit *UI-IMPERSONATION-HANDLER* context op topology-conf)
(let [principal (.principal context)
real-principal (.realPrincipal context)
user (if principal (.getName principal) "unknown")
real-user (if real-principal (.getName real-principal) "unknown")
remote-address (.remoteAddress context)]
(throw (AuthorizationException.
(str "user '" real-user "' is not authorized to impersonate user '" user "' from host '" remote-address "'. Please
see SECURITY.MD to learn how to configure impersonation ACL.")))))
(log-warn " principal " (.realPrincipal context) " is trying to impersonate " (.principal context) " but "
NIMBUS-IMPERSONATION-AUTHORIZER " has no authorizer configured. This is a potential security hole.
Please see SECURITY.MD to learn how to configure an impersonation authorizer.")))
(if *UI-ACL-HANDLER*
(if-not (.permit *UI-ACL-HANDLER* context op topology-conf)
(let [principal (.principal context)
user (if principal (.getName principal) "unknown")]
(throw (AuthorizationException.
(str "UI request '" op "' for '" user "' user is not authorized")))))))))
(defn assert-authorized-profiler-action
[op]
(if-not (*STORM-CONF* WORKER-PROFILER-ENABLED)
(throw (AuthorizationException.
(str "UI request for profiler action '" op "' is disabled.")))))
(defn executor-summary-type
[topology ^ExecutorSummary s]
(component-type topology (.get_component_id s)))
(defn is-ack-stream
[stream]
(let [acker-streams
[ACKER-INIT-STREAM-ID
ACKER-ACK-STREAM-ID
ACKER-FAIL-STREAM-ID]]
(every? #(not= %1 stream) acker-streams)))
(defn spout-summary?
[topology s]
(= :spout (executor-summary-type topology s)))
(defn bolt-summary?
[topology s]
(= :bolt (executor-summary-type topology s)))
(defn group-by-comp
[summs]
(let [ret (group-by #(.get_component_id ^ExecutorSummary %) summs)]
(into (sorted-map) ret )))
(defn logviewer-link [host fname secure?]
(if (and secure? (*STORM-CONF* LOGVIEWER-HTTPS-PORT))
(url-format "https://%s:%s/log?file=%s"
host
(*STORM-CONF* LOGVIEWER-HTTPS-PORT)
fname)
(url-format "http://%s:%s/log?file=%s"
host
(*STORM-CONF* LOGVIEWER-PORT)
fname)))
(defn event-log-link
[topology-id component-id host port secure?]
(logviewer-link host (event-logs-filename topology-id port) secure?))
(defn worker-log-link [host port topology-id secure?]
(let [fname (logs-filename topology-id port)]
(logviewer-link host fname secure?)))
(defn nimbus-log-link [host port]
(url-format "http://%s:%s/daemonlog?file=nimbus.log" host (*STORM-CONF* LOGVIEWER-PORT) port))
(defn get-error-time
[error]
(if error
(time-delta (.get_error_time_secs ^ErrorInfo error))))
(defn get-error-data
[error]
(if error
(error-subset (.get_error ^ErrorInfo error))
""))
(defn get-error-port
[error]
(if error
(.get_port ^ErrorInfo error)
""))
(defn get-error-host
[error]
(if error
(.get_host ^ErrorInfo error)
""))
(defn get-error-time
[error]
(if error
(.get_error_time_secs ^ErrorInfo error)
""))
(defn worker-dump-link [host port topology-id]
(url-format "http://%s:%s/dumps/%s/%s"
(url-encode host)
(*STORM-CONF* LOGVIEWER-PORT)
(url-encode topology-id)
(str (url-encode host) ":" (url-encode port))))
(defn stats-times
[stats-map]
(sort-by #(Integer/parseInt %)
(-> stats-map
clojurify-structure
(dissoc ":all-time")
keys)))
(defn window-hint
[window]
(if (= window ":all-time")
"All time"
(pretty-uptime-sec window)))
(defn sanitize-stream-name
[name]
(let [sym-regex #"(?![A-Za-z_\-:\.])."]
(str
(if (re-find #"^[A-Za-z]" name)
(clojure.string/replace name sym-regex "_")
(clojure.string/replace (str \s name) sym-regex "_"))
(hash name))))
(defn sanitize-transferred
[transferred]
(into {}
(for [[time, stream-map] transferred]
[time, (into {}
(for [[stream, trans] stream-map]
[(sanitize-stream-name stream), trans]))])))
(defn visualization-data
[spout-bolt spout-comp-summs bolt-comp-summs window storm-id]
(let [components (for [[id spec] spout-bolt]
[id
(let [inputs (.get_inputs (.get_common spec))
bolt-summs (get bolt-comp-summs id)
spout-summs (get spout-comp-summs id)
bolt-cap (if bolt-summs
(compute-bolt-capacity bolt-summs)
0)]
{:type (if bolt-summs "bolt" "spout")
:capacity bolt-cap
:latency (if bolt-summs
(get-in
(bolt-streams-stats bolt-summs true)
[:process-latencies window])
(get-in
(spout-streams-stats spout-summs true)
[:complete-latencies window]))
:transferred (or
(get-in
(spout-streams-stats spout-summs true)
[:transferred window])
(get-in
(bolt-streams-stats bolt-summs true)
[:transferred window]))
:stats (let [mapfn (fn [dat]
(map (fn [^ExecutorSummary summ]
{:host (.get_host summ)
:port (.get_port summ)
:uptime_secs (.get_uptime_secs summ)
:transferred (if-let [stats (.get_stats summ)]
(sanitize-transferred (.get_transferred stats)))})
dat))]
(if bolt-summs
(mapfn bolt-summs)
(mapfn spout-summs)))
:link (url-format "/component.html?id=%s&topology_id=%s" id storm-id)
:inputs (for [[global-stream-id group] inputs]
{:component (.get_componentId global-stream-id)
:stream (.get_streamId global-stream-id)
:sani-stream (sanitize-stream-name (.get_streamId global-stream-id))
:grouping (clojure.core/name (thrift/grouping-type group))})})])]
(into {} (doall components))))
(defn stream-boxes [datmap]
(let [filter-fn (mk-include-sys-fn true)
streams
(vec (doall (distinct
(apply concat
(for [[k v] datmap]
(for [m (get v :inputs)]
{:stream (get m :stream)
:sani-stream (get m :sani-stream)
:checked (is-ack-stream (get m :stream))}))))))]
(map (fn [row]
{:row row}) (partition 4 4 nil streams))))
(defn- get-topology-info
([^Nimbus$Client nimbus id]
(.getTopologyInfo nimbus id))
([^Nimbus$Client nimbus id options]
(.getTopologyInfoWithOpts nimbus id options)))
(defn mk-visualization-data
[id window include-sys?]
(thrift/with-configured-nimbus-connection
nimbus
(let [window (if window window ":all-time")
topology (.getTopology ^Nimbus$Client nimbus id)
spouts (.get_spouts topology)
bolts (.get_bolts topology)
summ (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
execs (.get_executors summ)
spout-summs (filter (partial spout-summary? topology) execs)
bolt-summs (filter (partial bolt-summary? topology) execs)
spout-comp-summs (group-by-comp spout-summs)
bolt-comp-summs (group-by-comp bolt-summs)
bolt-comp-summs (filter-key (mk-include-sys-fn include-sys?)
bolt-comp-summs)]
(visualization-data
(merge (hashmap-to-persistent spouts)
(hashmap-to-persistent bolts))
spout-comp-summs bolt-comp-summs window id))))
(defn validate-tplg-submit-params [params]
(let [tplg-jar-file (params :topologyJar)
tplg-config (if (not-nil? (params :topologyConfig)) (from-json (params :topologyConfig)))]
(cond
(nil? tplg-jar-file) {:valid false :error "missing topology jar file"}
(nil? tplg-config) {:valid false :error "missing topology config"}
(nil? (tplg-config "topologyMainClass")) {:valid false :error "topologyMainClass missing in topologyConfig"}
:else {:valid true})))
(defn run-tplg-submit-cmd [tplg-jar-file tplg-config user]
(let [tplg-main-class (if (not-nil? tplg-config) (trim (tplg-config "topologyMainClass")))
tplg-main-class-args (if (not-nil? tplg-config) (tplg-config "topologyMainClassArgs"))
storm-home (System/getProperty "storm.home")
storm-conf-dir (str storm-home file-path-separator "conf")
storm-log-dir (if (not-nil? (*STORM-CONF* "storm.log.dir")) (*STORM-CONF* "storm.log.dir")
(str storm-home file-path-separator "logs"))
storm-libs (str storm-home file-path-separator "lib" file-path-separator "*")
java-cmd (str (System/getProperty "java.home") file-path-separator "bin" file-path-separator "java")
storm-cmd (str storm-home file-path-separator "bin" file-path-separator "storm")
tplg-cmd-response (apply sh
(flatten
[storm-cmd "jar" tplg-jar-file tplg-main-class
(if (not-nil? tplg-main-class-args) tplg-main-class-args [])
(if (not= user "unknown") (str "-c storm.doAsUser=" user) [])]))]
(log-message "tplg-cmd-response " tplg-cmd-response)
(cond
(= (tplg-cmd-response :exit) 0) {"status" "success"}
(and (not= (tplg-cmd-response :exit) 0)
(not-nil? (re-find #"already exists on cluster" (tplg-cmd-response :err)))) {"status" "failed" "error" "Topology with the same name exists in cluster"}
(not= (tplg-cmd-response :exit) 0) {"status" "failed" "error" (clojure.string/trim-newline (tplg-cmd-response :err))}
:else {"status" "success" "response" "topology deployed"}
)))
(defn cluster-configuration []
(thrift/with-configured-nimbus-connection nimbus
(.getNimbusConf ^Nimbus$Client nimbus)))
(defn topology-history-info
([user]
(thrift/with-configured-nimbus-connection nimbus
(topology-history-info (.getTopologyHistory ^Nimbus$Client nimbus user) user)))
([history user]
{"topo-history"
(into [] (.get_topo_ids history))}))
(defn cluster-summary
([user]
(thrift/with-configured-nimbus-connection nimbus
(cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user)))
([^ClusterSummary summ user]
(let [sups (.get_supervisors summ)
used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))
total-slots (reduce + (map #(.get_num_workers ^SupervisorSummary %) sups))
free-slots (- total-slots used-slots)
topologies (.get_topologies_size summ)
total-tasks (->> (.get_topologies summ)
(map #(.get_num_tasks ^TopologySummary %))
(reduce +))
total-executors (->> (.get_topologies summ)
(map #(.get_num_executors ^TopologySummary %))
(reduce +))]
{"user" user
"stormVersion" (str (VersionInfo/getVersion))
"supervisors" (count sups)
"topologies" topologies
"slotsTotal" total-slots
"slotsUsed" used-slots
"slotsFree" free-slots
"executorsTotal" total-executors
"tasksTotal" total-tasks })))
(defn convert-to-nimbus-summary[nimbus-seed]
(let [[host port] (.split nimbus-seed ":")]
{
"host" host
"port" port
"nimbusLogLink" (nimbus-log-link host port)
"status" "Offline"
"version" "Not applicable"
"nimbusUpTime" "Not applicable"
"nimbusUptimeSeconds" "Not applicable"}
))
(defn nimbus-summary
([]
(thrift/with-configured-nimbus-connection nimbus
(nimbus-summary
(.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus)))))
([nimbuses]
(let [nimbus-seeds (set (map #(str %1 ":" (*STORM-CONF* NIMBUS-THRIFT-PORT)) (set (*STORM-CONF* NIMBUS-SEEDS))))
alive-nimbuses (set (map #(str (.get_host %1) ":" (.get_port %1)) nimbuses))
offline-nimbuses (clojure.set/difference nimbus-seeds alive-nimbuses)
offline-nimbuses-summary (map #(convert-to-nimbus-summary %1) offline-nimbuses)]
{"nimbuses"
(concat offline-nimbuses-summary
(for [^NimbusSummary n nimbuses
:let [uptime (.get_uptime_secs n)]]
{
"host" (.get_host n)
"port" (.get_port n)
"nimbusLogLink" (nimbus-log-link (.get_host n) (.get_port n))
"status" (if (.is_isLeader n) "Leader" "Not a Leader")
"version" (.get_version n)
"nimbusUpTime" (pretty-uptime-sec uptime)
"nimbusUpTimeSeconds" uptime}))})))
(defn supervisor-summary
([]
(thrift/with-configured-nimbus-connection nimbus
(supervisor-summary
(.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus)))))
([summs]
{"supervisors"
(for [^SupervisorSummary s summs]
{"id" (.get_supervisor_id s)
"host" (.get_host s)
"uptime" (pretty-uptime-sec (.get_uptime_secs s))
"uptimeSeconds" (.get_uptime_secs s)
"slotsTotal" (.get_num_workers s)
"slotsUsed" (.get_num_used_workers s)
"totalMem" (get (.get_total_resources s) Config/SUPERVISOR_MEMORY_CAPACITY_MB)
"totalCpu" (get (.get_total_resources s) Config/SUPERVISOR_CPU_CAPACITY)
"usedMem" (.get_used_mem s)
"usedCpu" (.get_used_cpu s)
"version" (.get_version s)})
"schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
(defn all-topologies-summary
([]
(thrift/with-configured-nimbus-connection
nimbus
(all-topologies-summary
(.get_topologies (.getClusterInfo ^Nimbus$Client nimbus)))))
([summs]
{"topologies"
(for [^TopologySummary t summs]
{
"id" (.get_id t)
"encodedId" (url-encode (.get_id t))
"owner" (.get_owner t)
"name" (.get_name t)
"status" (.get_status t)
"uptime" (pretty-uptime-sec (.get_uptime_secs t))
"uptimeSeconds" (.get_uptime_secs t)
"tasksTotal" (.get_num_tasks t)
"workersTotal" (.get_num_workers t)
"executorsTotal" (.get_num_executors t)
"replicationCount" (.get_replication_count t)
"schedulerInfo" (.get_sched_status t)
"requestedMemOnHeap" (.get_requested_memonheap t)
"requestedMemOffHeap" (.get_requested_memoffheap t)
"requestedTotalMem" (+ (.get_requested_memonheap t) (.get_requested_memoffheap t))
"requestedCpu" (.get_requested_cpu t)
"assignedMemOnHeap" (.get_assigned_memonheap t)
"assignedMemOffHeap" (.get_assigned_memoffheap t)
"assignedTotalMem" (+ (.get_assigned_memonheap t) (.get_assigned_memoffheap t))
"assignedCpu" (.get_assigned_cpu t)})
"schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
(defn topology-stats [window stats]
(let [times (stats-times (:emitted stats))
display-map (into {} (for [t times] [t pretty-uptime-sec]))
display-map (assoc display-map ":all-time" (fn [_] "All time"))]
(for [w (concat times [":all-time"])
:let [disp ((display-map w) w)]]
{"windowPretty" disp
"window" w
"emitted" (get-in stats [:emitted w])
"transferred" (get-in stats [:transferred w])
"completeLatency" (float-str (get-in stats [:complete-latencies w]))
"acked" (get-in stats [:acked w])
"failed" (get-in stats [:failed w])})))
(defn build-visualization [id window include-sys?]
(thrift/with-configured-nimbus-connection nimbus
(let [window (if window window ":all-time")
topology-info (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/ONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus
id))
storm-topology (.getTopology ^Nimbus$Client nimbus id)
spout-executor-summaries (filter (partial spout-summary? storm-topology) (.get_executors topology-info))
bolt-executor-summaries (filter (partial bolt-summary? storm-topology) (.get_executors topology-info))
spout-comp-id->executor-summaries (group-by-comp spout-executor-summaries)
bolt-comp-id->executor-summaries (group-by-comp bolt-executor-summaries)
bolt-comp-id->executor-summaries (filter-key (mk-include-sys-fn include-sys?) bolt-comp-id->executor-summaries)
id->spout-spec (.get_spouts storm-topology)
id->bolt (.get_bolts storm-topology)
visualizer-data (visualization-data (merge (hashmap-to-persistent id->spout-spec)
(hashmap-to-persistent id->bolt))
spout-comp-id->executor-summaries
bolt-comp-id->executor-summaries
window
id)]
{"visualizationTable" (stream-boxes visualizer-data)})))
(defn- get-error-json
[topo-id error-info secure?]
(let [host (get-error-host error-info)
port (get-error-port error-info)]
{"lastError" (get-error-data error-info)
"errorTime" (get-error-time error-info)
"errorHost" host
"errorPort" port
"errorLapsedSecs" (get-error-time error-info)
"errorWorkerLogLink" (worker-log-link host port topo-id secure?)}))
(defn- common-agg-stats-json
"Returns a JSON representation of a common aggregated statistics."
[^CommonAggregateStats common-stats]
{"executors" (.get_num_executors common-stats)
"tasks" (.get_num_tasks common-stats)
"emitted" (.get_emitted common-stats)
"transferred" (.get_transferred common-stats)
"acked" (.get_acked common-stats)
"failed" (.get_failed common-stats)})
(defmulti comp-agg-stats-json
"Returns a JSON representation of aggregated statistics."
(fn [_ _ [id ^ComponentAggregateStats s]] (.get_type s)))
(defmethod comp-agg-stats-json ComponentType/SPOUT
[topo-id secure? [id ^ComponentAggregateStats s]]
(let [^SpoutAggregateStats ss (.. s get_specific_stats get_spout)
cs (.get_common_stats s)]
(merge
(common-agg-stats-json cs)
(get-error-json topo-id (.get_last_error s) secure?)
{"spoutId" id
"encodedSpoutId" (url-encode id)
"completeLatency" (float-str (.get_complete_latency_ms ss))})))
(defmethod comp-agg-stats-json ComponentType/BOLT
[topo-id secure? [id ^ComponentAggregateStats s]]
(let [^BoltAggregateStats ss (.. s get_specific_stats get_bolt)
cs (.get_common_stats s)]
(merge
(common-agg-stats-json cs)
(get-error-json topo-id (.get_last_error s) secure?)
{"boltId" id
"encodedBoltId" (url-encode id)
"capacity" (float-str (.get_capacity ss))
"executeLatency" (float-str (.get_execute_latency_ms ss))
"executed" (.get_executed ss)
"processLatency" (float-str (.get_process_latency_ms ss))})))
(defn- unpack-topology-page-info
"Unpacks the serialized object to data structures"
[^TopologyPageInfo topo-info window secure?]
(let [id (.get_id topo-info)
^TopologyStats topo-stats (.get_topology_stats topo-info)
stat->window->number
{:emitted (.get_window_to_emitted topo-stats)
:transferred (.get_window_to_transferred topo-stats)
:complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
:acked (.get_window_to_acked topo-stats)
:failed (.get_window_to_failed topo-stats)}
topo-stats (topology-stats window stat->window->number)
[debugEnabled
samplingPct] (if-let [debug-opts (.get_debug_options topo-info)]
[(.is_enable debug-opts)
(.get_samplingpct debug-opts)])
uptime (.get_uptime_secs topo-info)]
{"id" id
"encodedId" (url-encode id)
"owner" (.get_owner topo-info)
"name" (.get_name topo-info)
"status" (.get_status topo-info)
"uptime" (pretty-uptime-sec uptime)
"uptimeSeconds" uptime
"tasksTotal" (.get_num_tasks topo-info)
"workersTotal" (.get_num_workers topo-info)
"executorsTotal" (.get_num_executors topo-info)
"schedulerInfo" (.get_sched_status topo-info)
"requestedMemOnHeap" (.get_requested_memonheap topo-info)
"requestedMemOffHeap" (.get_requested_memoffheap topo-info)
"requestedCpu" (.get_requested_cpu topo-info)
"assignedMemOnHeap" (.get_assigned_memonheap topo-info)
"assignedMemOffHeap" (.get_assigned_memoffheap topo-info)
"assignedTotalMem" (+ (.get_assigned_memonheap topo-info) (.get_assigned_memoffheap topo-info))
"assignedCpu" (.get_assigned_cpu topo-info)
"topologyStats" topo-stats
"spouts" (map (partial comp-agg-stats-json id secure?)
(.get_id_to_spout_agg_stats topo-info))
"bolts" (map (partial comp-agg-stats-json id secure?)
(.get_id_to_bolt_agg_stats topo-info))
"configuration" (.get_topology_conf topo-info)
"debug" (or debugEnabled false)
"samplingPct" (or samplingPct 10)
"replicationCount" (.get_replication_count topo-info)}))
(defn exec-host-port
[executors]
(for [^ExecutorSummary e executors]
{"host" (.get_host e)
"port" (.get_port e)}))
(defn worker-host-port
"Get the set of all worker host/ports"
[id]
(thrift/with-configured-nimbus-connection nimbus
(distinct (exec-host-port (.get_executors (get-topology-info nimbus id))))))
(defn topology-page [id window include-sys? user secure?]
(thrift/with-configured-nimbus-connection nimbus
(let [window (if window window ":all-time")
window-hint (window-hint window)
topo-page-info (.getTopologyPageInfo ^Nimbus$Client nimbus
id
window
include-sys?)
topology-conf (from-json (.get_topology_conf topo-page-info))
msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)]
(merge
(unpack-topology-page-info topo-page-info window secure?)
{"user" user
"window" window
"windowHint" window-hint
"msgTimeout" msg-timeout
"configuration" topology-conf
"visualizationTable" []
"schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))))
(defn component-errors
[errors-list topology-id secure?]
(let [errors (->> errors-list
(sort-by #(.get_error_time_secs ^ErrorInfo %))
reverse)]
{"componentErrors"
(for [^ErrorInfo e errors]
{"time" (* 1000 (long (.get_error_time_secs e)))
"errorHost" (.get_host e)
"errorPort" (.get_port e)
"errorWorkerLogLink" (worker-log-link (.get_host e)
(.get_port e)
topology-id
secure?)
"errorLapsedSecs" (get-error-time e)
"error" (.get_error e)})}))
(defmulti unpack-comp-agg-stat
(fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
(defmethod unpack-comp-agg-stat ComponentType/BOLT
[[window ^ComponentAggregateStats s]]
(let [^CommonAggregateStats comm-s (.get_common_stats s)
^SpecificAggregateStats spec-s (.get_specific_stats s)
^BoltAggregateStats bolt-s (.get_bolt spec-s)]
{"window" window
"windowPretty" (window-hint window)
"emitted" (.get_emitted comm-s)
"transferred" (.get_transferred comm-s)
"acked" (.get_acked comm-s)
"failed" (.get_failed comm-s)
"executeLatency" (float-str (.get_execute_latency_ms bolt-s))
"processLatency" (float-str (.get_process_latency_ms bolt-s))
"executed" (.get_executed bolt-s)
"capacity" (float-str (.get_capacity bolt-s))}))
(defmethod unpack-comp-agg-stat ComponentType/SPOUT
[[window ^ComponentAggregateStats s]]
(let [^CommonAggregateStats comm-s (.get_common_stats s)
^SpecificAggregateStats spec-s (.get_specific_stats s)
^SpoutAggregateStats spout-s (.get_spout spec-s)]
{"window" window
"windowPretty" (window-hint window)
"emitted" (.get_emitted comm-s)
"transferred" (.get_transferred comm-s)
"acked" (.get_acked comm-s)
"failed" (.get_failed comm-s)
"completeLatency" (float-str (.get_complete_latency_ms spout-s))}))
(defn- unpack-bolt-input-stat
[[^GlobalStreamId s ^ComponentAggregateStats stats]]
(let [^SpecificAggregateStats sas (.get_specific_stats stats)
^BoltAggregateStats bas (.get_bolt sas)
^CommonAggregateStats cas (.get_common_stats stats)
comp-id (.get_componentId s)]
{"component" comp-id
"encodedComponentId" (url-encode comp-id)
"stream" (.get_streamId s)
"executeLatency" (float-str (.get_execute_latency_ms bas))
"processLatency" (float-str (.get_process_latency_ms bas))
"executed" (nil-to-zero (.get_executed bas))
"acked" (nil-to-zero (.get_acked cas))
"failed" (nil-to-zero (.get_failed cas))}))
(defmulti unpack-comp-output-stat
(fn [[_ ^ComponentAggregateStats s]] (.get_type s)))
(defmethod unpack-comp-output-stat ComponentType/BOLT
[[stream-id ^ComponentAggregateStats stats]]
(let [^CommonAggregateStats cas (.get_common_stats stats)]
{"stream" stream-id
"emitted" (nil-to-zero (.get_emitted cas))
"transferred" (nil-to-zero (.get_transferred cas))}))
(defmethod unpack-comp-output-stat ComponentType/SPOUT
[[stream-id ^ComponentAggregateStats stats]]
(let [^CommonAggregateStats cas (.get_common_stats stats)
^SpecificAggregateStats spec-s (.get_specific_stats stats)
^SpoutAggregateStats spout-s (.get_spout spec-s)]
{"stream" stream-id
"emitted" (nil-to-zero (.get_emitted cas))
"transferred" (nil-to-zero (.get_transferred cas))
"completeLatency" (float-str (.get_complete_latency_ms spout-s))
"acked" (nil-to-zero (.get_acked cas))
"failed" (nil-to-zero (.get_failed cas))}))
(defmulti unpack-comp-exec-stat
(fn [_ _ ^ComponentAggregateStats cas] (.get_type (.get_stats ^ExecutorAggregateStats cas))))
(defmethod unpack-comp-exec-stat ComponentType/BOLT
[topology-id secure? ^ExecutorAggregateStats eas]
(let [^ExecutorSummary summ (.get_exec_summary eas)
^ExecutorInfo info (.get_executor_info summ)
^ComponentAggregateStats stats (.get_stats eas)
^SpecificAggregateStats ss (.get_specific_stats stats)
^BoltAggregateStats bas (.get_bolt ss)
^CommonAggregateStats cas (.get_common_stats stats)
host (.get_host summ)
port (.get_port summ)
exec-id (pretty-executor-info info)
uptime (.get_uptime_secs summ)]
{"id" exec-id
"encodedId" (url-encode exec-id)
"uptime" (pretty-uptime-sec uptime)
"uptimeSeconds" uptime
"host" host
"port" port
"emitted" (nil-to-zero (.get_emitted cas))
"transferred" (nil-to-zero (.get_transferred cas))
"capacity" (float-str (nil-to-zero (.get_capacity bas)))
"executeLatency" (float-str (.get_execute_latency_ms bas))
"executed" (nil-to-zero (.get_executed bas))
"processLatency" (float-str (.get_process_latency_ms bas))
"acked" (nil-to-zero (.get_acked cas))
"failed" (nil-to-zero (.get_failed cas))
"workerLogLink" (worker-log-link host port topology-id secure?)}))
(defmethod unpack-comp-exec-stat ComponentType/SPOUT
[topology-id secure? ^ExecutorAggregateStats eas]
(let [^ExecutorSummary summ (.get_exec_summary eas)
^ExecutorInfo info (.get_executor_info summ)
^ComponentAggregateStats stats (.get_stats eas)
^SpecificAggregateStats ss (.get_specific_stats stats)
^SpoutAggregateStats sas (.get_spout ss)
^CommonAggregateStats cas (.get_common_stats stats)
host (.get_host summ)
port (.get_port summ)
exec-id (pretty-executor-info info)
uptime (.get_uptime_secs summ)]
{"id" exec-id
"encodedId" (url-encode exec-id)
"uptime" (pretty-uptime-sec uptime)
"uptimeSeconds" uptime
"host" host
"port" port
"emitted" (nil-to-zero (.get_emitted cas))
"transferred" (nil-to-zero (.get_transferred cas))
"completeLatency" (float-str (.get_complete_latency_ms sas))
"acked" (nil-to-zero (.get_acked cas))
"failed" (nil-to-zero (.get_failed cas))
"workerLogLink" (worker-log-link host port topology-id secure?)}))
(defmulti unpack-component-page-info
"Unpacks component-specific info to clojure data structures"
(fn [^ComponentPageInfo info & _]
(.get_component_type info)))
(defmethod unpack-component-page-info ComponentType/BOLT
[^ComponentPageInfo info topology-id window include-sys? secure?]
(merge
{"boltStats" (map unpack-comp-agg-stat (.get_window_to_stats info))
"inputStats" (map unpack-bolt-input-stat (.get_gsid_to_input_stats info))
"outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
"executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
(.get_exec_stats info))}
(-> info .get_errors (component-errors topology-id secure?))))
(defmethod unpack-component-page-info ComponentType/SPOUT
[^ComponentPageInfo info topology-id window include-sys? secure?]
(merge
{"spoutSummary" (map unpack-comp-agg-stat (.get_window_to_stats info))
"outputStats" (map unpack-comp-output-stat (.get_sid_to_output_stats info))
"executorStats" (map (partial unpack-comp-exec-stat topology-id secure?)
(.get_exec_stats info))}
(-> info .get_errors (component-errors topology-id secure?))))
(defn get-active-profile-actions
[nimbus topology-id component]
(let [profile-actions (.getComponentPendingProfileActions nimbus
topology-id
component
ProfileAction/JPROFILE_STOP)
latest-profile-actions (map clojurify-profile-request profile-actions)
active-actions (map (fn [profile-action]
{"host" (:host profile-action)
"port" (str (:port profile-action))
"dumplink" (worker-dump-link (:host profile-action) (str (:port profile-action)) topology-id)
"timestamp" (str (- (:timestamp profile-action) (System/currentTimeMillis)))})
latest-profile-actions)]
(log-message "Latest-active actions are: " (pr active-actions))
active-actions))
(defn component-page
[topology-id component window include-sys? user secure?]
(thrift/with-configured-nimbus-connection nimbus
(let [window (or window ":all-time")
window-hint (window-hint window)
comp-page-info (.getComponentPageInfo ^Nimbus$Client nimbus
topology-id
component
window
include-sys?)
topology-conf (from-json (.getTopologyConf ^Nimbus$Client nimbus
topology-id))
msg-timeout (topology-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)
[debugEnabled
samplingPct] (if-let [debug-opts (.get_debug_options comp-page-info)]
[(.is_enable debug-opts)
(.get_samplingpct debug-opts)])]
(assoc
(unpack-component-page-info comp-page-info
topology-id
window
include-sys?
secure?)
"user" user
"id" component
"encodedId" (url-encode component)
"name" (.get_topology_name comp-page-info)
"executors" (.get_num_executors comp-page-info)
"tasks" (.get_num_tasks comp-page-info)
"topologyId" topology-id
"topologyStatus" (.get_topology_status comp-page-info)
"encodedTopologyId" (url-encode topology-id)
"window" window
"componentType" (-> comp-page-info .get_component_type str lower-case)
"windowHint" window-hint
"debug" (or debugEnabled false)
"samplingPct" (or samplingPct 10)
"eventLogLink" (event-log-link topology-id
component
(.get_eventlog_host comp-page-info)
(.get_eventlog_port comp-page-info)
secure?)
"profileActionEnabled" (*STORM-CONF* WORKER-PROFILER-ENABLED)
"profilerActive" (if (*STORM-CONF* WORKER-PROFILER-ENABLED)
(get-active-profile-actions nimbus topology-id component)
[])))))
(defn- level-to-dict [level]
(if level
(let [timeout (.get_reset_log_level_timeout_secs level)
timeout-epoch (.get_reset_log_level_timeout_epoch level)
target-level (.get_target_log_level level)
reset-level (.get_reset_log_level level)]
{"target_level" (.toString (Level/toLevel target-level))
"reset_level" (.toString (Level/toLevel reset-level))
"timeout" timeout
"timeout_epoch" timeout-epoch})))
(defn log-config [topology-id]
(thrift/with-configured-nimbus-connection
nimbus
(let [log-config (.getLogConfig ^Nimbus$Client nimbus topology-id)
named-logger-levels (into {}
(for [[key val] (.get_named_logger_level log-config)]
[(str key) (level-to-dict val)]))]
{"namedLoggerLevels" named-logger-levels})))
(defn topology-config [topology-id]
(thrift/with-configured-nimbus-connection nimbus
(from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id))))
(defn topology-op-response [topology-id op]
{"topologyOperation" op,
"topologyId" topology-id,
"status" "success"
})
(defn component-op-response [topology-id component-id op]
{"topologyOperation" op,
"topologyId" topology-id,
"componentId" component-id,
"status" "success"
})
(defn check-include-sys?
[sys?]
(if (or (nil? sys?) (= "false" sys?)) false true))
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
(defn populate-context!
"Populate the Storm RequestContext from an servlet-request. This should be called in each handler"
[servlet-request]
(when http-creds-handler
(.populateContext http-creds-handler (ReqContext/context) servlet-request)))
(defn get-user-name
[servlet-request]
(.getUserName http-creds-handler servlet-request))
(defroutes main-routes
(GET "/api/v1/cluster/configuration" [& m]
(mark! ui:num-cluster-configuration-http-requests)
(json-response (cluster-configuration)
(:callback m) :serialize-fn identity))
(GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
(mark! ui:num-cluster-summary-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getClusterInfo")
(let [user (get-user-name servlet-request)]
(json-response (assoc (cluster-summary user)
"bugtracker-url" (*STORM-CONF* UI-PROJECT-BUGTRACKER-URL)
"central-log-url" (*STORM-CONF* UI-CENTRAL-LOGGING-URL)) (:callback m))))
(GET "/api/v1/nimbus/summary" [:as {:keys [cookies servlet-request]} & m]
(mark! ui:num-nimbus-summary-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getClusterInfo")
(json-response (nimbus-summary) (:callback m)))
(GET "/api/v1/history/summary" [:as {:keys [cookies servlet-request]} & m]
(let [user (.getUserName http-creds-handler servlet-request)]
(json-response (topology-history-info user) (:callback m))))
(GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m]
(mark! ui:num-supervisor-summary-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getClusterInfo")
(json-response (assoc (supervisor-summary)
"logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)) (:callback m)))
(GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
(mark! ui:num-all-topologies-summary-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getClusterInfo")
(json-response (all-topologies-summary) (:callback m)))
(GET "/api/v1/topology-workers/:id" [:as {:keys [cookies servlet-request]} id & m]
(let [id (url-decode id)]
(json-response {"hostPortList" (worker-host-port id)
"logviewerPort" (*STORM-CONF* LOGVIEWER-PORT)} (:callback m))))
(GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request scheme]} id & m]
(mark! ui:num-topology-page-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getTopology" (topology-config id))
(let [user (get-user-name servlet-request)]
(json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
(GET "/api/v1/topology/:id/visualization-init" [:as {:keys [cookies servlet-request]} id & m]
(mark! ui:num-build-visualization-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getTopology" (topology-config id))
(json-response (build-visualization id (:window m) (check-include-sys? (:sys m))) (:callback m)))
(GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
(mark! ui:num-mk-visualization-data-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getTopology" (topology-config id))
(json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
(GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m]
(mark! ui:num-component-page-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getTopology" (topology-config id))
(let [user (get-user-name servlet-request)]
(json-response
(component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https))
(:callback m))))
(GET "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id & m]
(mark! ui:num-log-config-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "getTopology" (topology-config id))
(json-response (log-config id) (:callback m)))
(POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
(mark! ui:num-activate-topology-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "activate" (topology-config id))
(thrift/with-configured-nimbus-connection nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)]
(.activate nimbus name)
(log-message "Activating topology '" name "'")))
(json-response (topology-op-response id "activate") (m "callback")))
(POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m]
(mark! ui:num-deactivate-topology-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "deactivate" (topology-config id))
(thrift/with-configured-nimbus-connection nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)]
(.deactivate nimbus name)
(log-message "Deactivating topology '" name "'")))
(json-response (topology-op-response id "deactivate") (m "callback")))
(POST "/api/v1/topology/:id/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id action spct & m]
(mark! ui:num-debug-topology-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "debug" (topology-config id))
(thrift/with-configured-nimbus-connection nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)
enable? (= "enable" action)]
(.debug nimbus name "" enable? (Integer/parseInt spct))
(log-message "Debug topology [" name "] action [" action "] sampling pct [" spct "]")))
(json-response (topology-op-response id (str "debug/" action)) (m "callback")))
(POST "/api/v1/topology/:id/component/:component/debug/:action/:spct" [:as {:keys [cookies servlet-request]} id component action spct & m]
(mark! ui:num-component-op-response-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "debug" (topology-config id))
(thrift/with-configured-nimbus-connection nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)
enable? (= "enable" action)]
(.debug nimbus name component enable? (Integer/parseInt spct))
(log-message "Debug topology [" name "] component [" component "] action [" action "] sampling pct [" spct "]")))
(json-response (component-op-response id component (str "/debug/" action)) (m "callback")))
(POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
(mark! ui:num-topology-op-response-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "rebalance" (topology-config id))
(thrift/with-configured-nimbus-connection nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)
rebalance-options (m "rebalanceOptions")
options (RebalanceOptions.)]
(.set_wait_secs options (Integer/parseInt wait-time))
(if (and (not-nil? rebalance-options) (contains? rebalance-options "numWorkers"))
(.set_num_workers options (Integer/parseInt (.toString (rebalance-options "numWorkers")))))
(if (and (not-nil? rebalance-options) (contains? rebalance-options "executors"))
(doseq [keyval (rebalance-options "executors")]
(.put_to_num_executors options (key keyval) (Integer/parseInt (.toString (val keyval))))))
(.rebalance nimbus name options)
(log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
(json-response (topology-op-response id "rebalance") (m "callback")))
(POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
(mark! ui:num-topology-op-response-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "killTopology" (topology-config id))
(thrift/with-configured-nimbus-connection nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)
options (KillOptions.)]
(.set_wait_secs options (Integer/parseInt wait-time))
(.killTopologyWithOpts nimbus name options)
(log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
(json-response (topology-op-response id "kill") (m "callback")))
(POST "/api/v1/topology/:id/logconfig" [:as {:keys [cookies servlet-request]} id namedLoggerLevels & m]
(mark! ui:num-topology-op-response-http-requests)
(populate-context! servlet-request)
(assert-authorized-user "setLogConfig" (topology-config id))
(thrift/with-configured-nimbus-connection
nimbus
(let [new-log-config (LogConfig.)]
(doseq [[key level] namedLoggerLevels]
(let [logger-name (str key)
target-level (.get level "target_level")
timeout (or (.get level "timeout") 0)
named-logger-level (LogLevel.)]
;; if target-level is nil, do not set it, user wants to clear
(log-message "The target level for " logger-name " is " target-level)
(if (nil? target-level)
(do
(.set_action named-logger-level LogLevelAction/REMOVE)
(.unset_target_log_level named-logger-level))
(do
(.set_action named-logger-level LogLevelAction/UPDATE)
;; the toLevel here ensures the string we get is valid
(.set_target_log_level named-logger-level (.name (Level/toLevel target-level)))
(.set_reset_log_level_timeout_secs named-logger-level timeout)))
(log-message "Adding this " logger-name " " named-logger-level " to " new-log-config)
(.put_to_named_logger_level new-log-config logger-name named-logger-level)))
(log-message "Setting topology " id " log config " new-log-config)
(.setLogConfig nimbus id new-log-config)
(json-response (log-config id) (m "callback")))))
(GET "/api/v1/topology/:id/profiling/start/:host-port/:timeout"
[:as {:keys [servlet-request]} id host-port timeout & m]
(thrift/with-configured-nimbus-connection nimbus
(let [user (.getUserName http-creds-handler servlet-request)
topology-conf (from-json
(.getTopologyConf ^Nimbus$Client nimbus id))]
(assert-authorized-user "setWorkerProfiler" (topology-config id))
(assert-authorized-profiler-action "start"))
(let [[host, port] (split host-port #":")
nodeinfo (NodeInfo. host (set [(Long. port)]))
timestamp (+ (System/currentTimeMillis) (* 60000 (Long. timeout)))
request (ProfileRequest. nodeinfo
ProfileAction/JPROFILE_STOP)]
(.set_time_stamp request timestamp)
(.setWorkerProfiler nimbus id request)
(json-response {"status" "ok"
"id" host-port
"timeout" timeout
"dumplink" (worker-dump-link
host
port
id)}
(m "callback")))))
(GET "/api/v1/topology/:id/profiling/stop/:host-port"
[:as {:keys [servlet-request]} id host-port & m]
(thrift/with-configured-nimbus-connection nimbus
(let [user (.getUserName http-creds-handler servlet-request)
topology-conf (from-json
(.getTopologyConf ^Nimbus$Client nimbus id))]
(assert-authorized-user "setWorkerProfiler" (topology-config id))
(assert-authorized-profiler-action "stop"))
(let [[host, port] (split host-port #":")
nodeinfo (NodeInfo. host (set [(Long. port)]))
timestamp 0
request (ProfileRequest. nodeinfo
ProfileAction/JPROFILE_STOP)]
(.set_time_stamp request timestamp)
(.setWorkerProfiler nimbus id request)
(json-response {"status" "ok"
"id" host-port}
(m "callback")))))
(GET "/api/v1/topology/:id/profiling/dumpprofile/:host-port"
[:as {:keys [servlet-request]} id host-port & m]
(thrift/with-configured-nimbus-connection nimbus
(let [user (.getUserName http-creds-handler servlet-request)
topology-conf (from-json
(.getTopologyConf ^Nimbus$Client nimbus id))]
(assert-authorized-user "setWorkerProfiler" (topology-config id))
(assert-authorized-profiler-action "dumpprofile"))
(let [[host, port] (split host-port #":")
nodeinfo (NodeInfo. host (set [(Long. port)]))
timestamp (System/currentTimeMillis)
request (ProfileRequest. nodeinfo
ProfileAction/JPROFILE_DUMP)]
(.set_time_stamp request timestamp)
(.setWorkerProfiler nimbus id request)
(json-response {"status" "ok"
"id" host-port}
(m "callback")))))
(GET "/api/v1/topology/:id/profiling/dumpjstack/:host-port"
[:as {:keys [servlet-request]} id host-port & m]
(thrift/with-configured-nimbus-connection nimbus
(let [user (.getUserName http-creds-handler servlet-request)
topology-conf (from-json
(.getTopologyConf ^Nimbus$Client nimbus id))]
(assert-authorized-user "setWorkerProfiler" (topology-config id))
(assert-authorized-profiler-action "dumpjstack"))
(let [[host, port] (split host-port #":")
nodeinfo (NodeInfo. host (set [(Long. port)]))
timestamp (System/currentTimeMillis)
request (ProfileRequest. nodeinfo
ProfileAction/JSTACK_DUMP)]
(.set_time_stamp request timestamp)
(.setWorkerProfiler nimbus id request)
(json-response {"status" "ok"
"id" host-port}
(m "callback")))))
(GET "/api/v1/topology/:id/profiling/restartworker/:host-port"
[:as {:keys [servlet-request]} id host-port & m]
(thrift/with-configured-nimbus-connection nimbus
(let [user (.getUserName http-creds-handler servlet-request)
topology-conf (from-json
(.getTopologyConf ^Nimbus$Client nimbus id))]
(assert-authorized-user "setWorkerProfiler" (topology-config id))
(assert-authorized-profiler-action "restartworker"))
(let [[host, port] (split host-port #":")
nodeinfo (NodeInfo. host (set [(Long. port)]))
timestamp (System/currentTimeMillis)
request (ProfileRequest. nodeinfo
ProfileAction/JVM_RESTART)]
(.set_time_stamp request timestamp)
(.setWorkerProfiler nimbus id request)
(json-response {"status" "ok"
"id" host-port}
(m "callback")))))
(GET "/api/v1/topology/:id/profiling/dumpheap/:host-port"
[:as {:keys [servlet-request]} id host-port & m]
(thrift/with-configured-nimbus-connection nimbus
(let [user (.getUserName http-creds-handler servlet-request)
topology-conf (from-json
(.getTopologyConf ^Nimbus$Client nimbus id))]
(assert-authorized-user "setWorkerProfiler" (topology-config id))
(assert-authorized-profiler-action "dumpheap"))
(let [[host, port] (split host-port #":")
nodeinfo (NodeInfo. host (set [(Long. port)]))
timestamp (System/currentTimeMillis)
request (ProfileRequest. nodeinfo
ProfileAction/JMAP_DUMP)]
(.set_time_stamp request timestamp)
(.setWorkerProfiler nimbus id request)
(json-response {"status" "ok"
"id" host-port}
(m "callback")))))
(GET "/" [:as {cookies :cookies}]
(mark! ui:num-main-page-http-requests)
(resp/redirect "/index.html"))
(route/resources "/")
(route/not-found "Page not found"))
(defn catch-errors
[handler]
(fn [request]
(try
(handler request)
(catch Exception ex
(json-response (exception->json ex) ((:query-params request) "callback") :status 500)))))
(def app
(handler/site (-> main-routes
(wrap-json-params)
(wrap-multipart-params)
(wrap-reload '[backtype.storm.ui.core])
requests-middleware
catch-errors)))
(defn start-server!
[]
(try
(let [conf *STORM-CONF*
header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
filters-confs [{:filter-class (conf UI-FILTER)
:filter-params (conf UI-FILTER-PARAMS)}]
https-port (if (not-nil? (conf UI-HTTPS-PORT)) (conf UI-HTTPS-PORT) 0)
https-ks-path (conf UI-HTTPS-KEYSTORE-PATH)
https-ks-password (conf UI-HTTPS-KEYSTORE-PASSWORD)
https-ks-type (conf UI-HTTPS-KEYSTORE-TYPE)
https-key-password (conf UI-HTTPS-KEY-PASSWORD)
https-ts-path (conf UI-HTTPS-TRUSTSTORE-PATH)
https-ts-password (conf UI-HTTPS-TRUSTSTORE-PASSWORD)
https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
(start-metrics-reporters)
(storm-run-jetty {:port (conf UI-PORT)
:host (conf UI-HOST)
:https-port https-port
:configurator (fn [server]
(config-ssl server
https-port
https-ks-path
https-ks-password
https-ks-type
https-key-password
https-ts-path
https-ts-password
https-ts-type
https-need-client-auth
https-want-client-auth)
(doseq [connector (.getConnectors server)]
(.setRequestHeaderSize connector header-buffer-size))
(config-filter server app filters-confs))}))
(catch Exception ex
(log-error ex))))
(defn -main [] (start-server!))