blob: 2a8a40c30c118efb9b6e83c6a279adbefcc70def [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.logviewer
(:use compojure.core)
(:use [clojure.set :only [difference intersection]])
(:use [clojure.string :only [blank?]])
(:use [hiccup core page-helpers])
(:use [backtype.storm config util log timer])
(:use [backtype.storm.ui helpers])
(:import [org.slf4j LoggerFactory])
(:import [java.io File FileFilter FileInputStream])
(:import [org.apache.logging.log4j LogManager])
(:import [org.apache.logging.log4j.core Appender LoggerContext])
(:import [org.apache.logging.log4j.core.appender RollingFileAppender])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
(:import [backtype.storm.ui InvalidRequestException]
[backtype.storm.security.auth AuthUtils])
(:require [compojure.route :as route]
[compojure.handler :as handler]
[ring.middleware.keyword-params]
[ring.util.response :as resp])
(:require [backtype.storm.daemon common [supervisor :as supervisor]])
(:import [java.io File FileFilter])
(:require [compojure.route :as route]
[compojure.handler :as handler]
[ring.util.response :as resp]
[clojure.string :as string])
(:gen-class))
(def ^:dynamic *STORM-CONF* (read-storm-config))
(defn cleanup-cutoff-age-millis [conf now-millis]
(- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)))
;TODO: handle cleanup of old event log files
(defn mk-FileFilter-for-log-cleanup [conf now-millis]
(let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
(reify FileFilter (^boolean accept [this ^File file]
(boolean (and
(.isFile file)
(re-find worker-log-filename-pattern (.getName file))
(<= (.lastModified file) cutoff-age-millis)))))))
(defn select-files-for-cleanup [conf now-millis root-dir]
(let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
(.listFiles (File. root-dir) file-filter)))
(defn get-metadata-file-for-log-root-name [root-name root-dir]
(let [metaFile (clojure.java.io/file root-dir "metadata"
(str root-name ".yaml"))]
(if (.exists metaFile)
metaFile
(do
(log-warn "Could not find " (.getCanonicalPath metaFile)
" to clean up for " root-name)
nil))))
(defn get-worker-id-from-metadata-file [metaFile]
(get (clojure-from-yaml-file metaFile) "worker-id"))
(defn get-topo-owner-from-metadata-file [metaFile]
(get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
(defn get-log-root->files-map [log-files]
"Returns a map of \"root name\" to a the set of files in log-files having the
root name. The \"root name\" of a log file is the part of the name preceding
the extension."
(reduce #(assoc %1 ;; The accumulated map so far
(first %2) ;; key: The root name of the log file
(conj (%1 (first %2) #{}) (second %2))) ;; val: The set of log files with the root name
{} ;; initial (empty) map
(map #(list
(second (re-find worker-log-filename-pattern (.getName %))) ;; The root name of the log file
%) ;; The log file
log-files)))
(defn identify-worker-log-files [log-files root-dir]
(into {} (for [log-root-entry (get-log-root->files-map log-files)
:let [metaFile (get-metadata-file-for-log-root-name
(key log-root-entry) root-dir)
log-root (key log-root-entry)
files (val log-root-entry)]
:when metaFile]
{(get-worker-id-from-metadata-file metaFile)
{:owner (get-topo-owner-from-metadata-file metaFile)
:files
;; If each log for this root name is to be deleted, then
;; include the metadata file also.
(if (empty? (difference
(set (filter #(re-find (re-pattern log-root) %)
(read-dir-contents root-dir)))
(set (map #(.getName %) files))))
(conj files metaFile)
;; Otherwise, keep the list of files as it is.
files)}})))
(defn get-dead-worker-files-and-owners [conf now-secs log-files root-dir]
(if (empty? log-files)
{}
(let [id->heartbeat (supervisor/read-worker-heartbeats conf)
alive-ids (keys (remove
#(or (not (val %))
(supervisor/is-worker-hb-timed-out? now-secs (val %) conf))
id->heartbeat))
id->entries (identify-worker-log-files log-files root-dir)]
(for [[id {:keys [owner files]}] id->entries
:when (not (contains? (set alive-ids) id))]
{:owner owner
:files files}))))
(defn cleanup-fn! [log-root-dir]
(let [now-secs (current-time-secs)
old-log-files (select-files-for-cleanup *STORM-CONF* (* now-secs 1000) log-root-dir)
dead-worker-files (get-dead-worker-files-and-owners *STORM-CONF* now-secs old-log-files log-root-dir)]
(log-debug "log cleanup: now=" now-secs
" old log files " (pr-str (map #(.getName %) old-log-files))
" dead worker files " (->> dead-worker-files
(mapcat (fn [{l :files}] l))
(map #(.getName %))
(pr-str)))
(dofor [{:keys [owner files]} dead-worker-files
file files]
(let [path (.getCanonicalPath file)]
(log-message "Cleaning up: Removing " path)
(try
(if (or (blank? owner) (re-matches #".*\.yaml$" path))
(rmr path)
;; worker-launcher does not actually launch a worker process. It
;; merely executes one of a prescribed set of commands. In this case, we ask it
;; to delete a file as the owner of that file.
(supervisor/worker-launcher *STORM-CONF* owner (str "rmr " path)))
(catch Exception ex
(log-error ex)))))))
(defn start-log-cleaner! [conf log-root-dir]
(let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
(when interval-secs
(log-debug "starting log cleanup thread at interval: " interval-secs)
(schedule-recurring (mk-timer :thread-name "logviewer-cleanup")
0 ;; Start immediately.
interval-secs
(fn [] (cleanup-fn! log-root-dir))))))
(defn page-file
([path tail]
(let [flen (.length (clojure.java.io/file path))
skip (- flen tail)]
(page-file path skip tail)))
([path start length]
(with-open [input (FileInputStream. path)
output (java.io.ByteArrayOutputStream.)]
(if (>= start (.length (clojure.java.io/file path)))
(throw
(InvalidRequestException. "Cannot start past the end of the file")))
(if (> start 0)
;; FileInputStream#skip may not work the first time.
(loop [skipped 0]
(let [skipped (+ skipped (.skip input (- start skipped)))]
(if (< skipped start) (recur skipped)))))
(let [buffer (make-array Byte/TYPE 1024)]
(loop []
(when (< (.size output) length)
(let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
(when (pos? size)
(.write output buffer 0 size)
(recur)))))
(.toString output)))))
(defn get-log-user-group-whitelist [fname]
(let [wl-file (get-log-metadata-file fname)
m (clojure-from-yaml-file wl-file)]
(if (not-nil? m)
(do
(let [user-wl (.get m LOGS-USERS)
user-wl (if user-wl user-wl [])
group-wl (.get m LOGS-GROUPS)
group-wl (if group-wl group-wl [])]
[user-wl group-wl]))
nil)))
(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*))
(defn user-groups
[user]
(if (blank? user) [] (.getGroups igroup-mapper user)))
(defn authorized-log-user? [user fname conf]
(if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname)))
nil
(let [groups (user-groups user)
[user-wl group-wl] (get-log-user-group-whitelist fname)
logs-users (concat (conf LOGS-USERS)
(conf NIMBUS-ADMINS)
user-wl)
logs-groups (concat (conf LOGS-GROUPS)
group-wl)]
(or (some #(= % user) logs-users)
(< 0 (.size (intersection (set groups) (set group-wl))))))))
(defn log-root-dir
"Given an appender name, as configured, get the parent directory of the appender's log file.
Note that if anything goes wrong, this will throw an Error and exit."
[appender-name]
(let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)]
(if (and appender-name appender (instance? RollingFileAppender appender))
(.getParent (File. (.getFileName appender)))
(throw
(RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j2 agree.")))))
(defnk to-btn-link
"Create a link that is formatted like a button"
[url text :enabled true]
[:a {:href (java.net.URI. url)
:class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
(defn pager-links [fname start length file-size]
(let [prev-start (max 0 (- start length))
next-start (if (> file-size 0)
(min (max 0 (- file-size length)) (+ start length))
(+ start length))]
[[:div
(concat
[(to-btn-link (url "/log"
{:file fname
:start (max 0 (- start length))
:length length})
"Prev" :enabled (< prev-start start))]
[(to-btn-link (url "/log"
{:file fname
:start 0
:length length}) "First")]
[(to-btn-link (url "/log"
{:file fname
:length length})
"Last")]
[(to-btn-link (url "/log"
{:file fname
:start (min (max 0 (- file-size length))
(+ start length))
:length length})
"Next" :enabled (> next-start start))])]]))
(defn- download-link [fname]
[[:p (link-to (url-format "/download/%s" fname) "Download Full Log")]])
(defn log-page [fname start length grep user root-dir]
(if (or (blank? (*STORM-CONF* UI-FILTER))
(authorized-log-user? user fname *STORM-CONF*))
(let [file (.getCanonicalFile (File. root-dir fname))
file-length (.length file)
path (.getCanonicalPath file)]
(if (and (= (.getCanonicalFile (File. root-dir))
(.getParentFile file))
(.exists file))
(let [default-length 51200
length (if length
(min 10485760 length)
default-length)
log-string (escape-html
(if start
(page-file path start length)
(page-file path length)))
start (or start (- file-length length))]
(if grep
(html [:pre#logContent
(if grep
(->> (.split log-string "\n")
(filter #(.contains % grep))
(string/join "\n"))
log-string)])
(let [pager-data (pager-links fname start length file-length)]
(html (concat pager-data
(download-link fname)
[[:pre#logContent log-string]]
pager-data)))))
(-> (resp/response "Page not found")
(resp/status 404))))
(if (nil? (get-log-user-group-whitelist fname))
(-> (resp/response "Page not found")
(resp/status 404))
(unauthorized-user-html user))))
(defn download-log-file [fname req resp user ^String root-dir]
(let [file (.getCanonicalFile (File. root-dir fname))]
(if (= (File. root-dir) (.getParentFile file))
(if (or (blank? (*STORM-CONF* UI-FILTER))
(authorized-log-user? user fname *STORM-CONF*))
(-> (resp/response file)
(resp/content-type "application/octet-stream"))
(unauthorized-user-html user))
(-> (resp/response "Page not found")
(resp/status 404)))))
(defn log-template
([body] (log-template body nil nil))
([body fname user]
(html4
[:head
[:title (str (escape-html fname) " - Storm Log Viewer")]
(include-css "/css/bootstrap-3.3.1.min.css")
(include-css "/css/jquery.dataTables.1.10.4.min.css")
(include-css "/css/style.css")
]
[:body
(concat
(when (not (blank? user)) [[:div.ui-user [:p "User: " user]]])
[[:h3 (escape-html fname)]]
(seq body))
])))
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
(defn- parse-long-from-map [m k]
(try
(Long/parseLong (k m))
(catch NumberFormatException ex
(throw (InvalidRequestException.
(str "Could not make an integer out of the query parameter '"
(name k) "'")
ex)))))
(defroutes log-routes
(GET "/log" [:as req & m]
(try
(let [servlet-request (:servlet-request req)
log-root (:log-root req)
user (.getUserName http-creds-handler servlet-request)
start (if (:start m) (parse-long-from-map m :start))
length (if (:length m) (parse-long-from-map m :length))]
(log-template (log-page (:file m) start length (:grep m) user log-root)
(:file m) user))
(catch InvalidRequestException ex
(log-error ex)
(ring-response-from-exception ex))))
(GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
(try
(let [user (.getUserName http-creds-handler servlet-request)]
(download-log-file file servlet-request servlet-response user log-root))
(catch InvalidRequestException ex
(log-error ex)
(ring-response-from-exception ex))))
(route/resources "/")
(route/not-found "Page not found"))
(defn conf-middleware
"For passing the storm configuration with each request."
[app log-root]
(fn [req]
(app (assoc req :log-root log-root))))
(defn start-logviewer! [conf log-root-dir]
(try
(let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
filter-class (conf UI-FILTER)
filter-params (conf UI-FILTER-PARAMS)
logapp (handler/api log-routes) ;; query params as map
middle (conf-middleware logapp log-root-dir)
filters-confs (if (conf UI-FILTER)
[{:filter-class filter-class
:filter-params (or (conf UI-FILTER-PARAMS) {})}]
[])
filters-confs (concat filters-confs
[{:filter-class "org.eclipse.jetty.servlets.GzipFilter"
:filter-name "Gzipper"
:filter-params {}}])
https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0))
keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH)
keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD)
keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE)
key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD)
truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH)
truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD)
truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE)
want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH)
need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)]
(storm-run-jetty {:port (int (conf LOGVIEWER-PORT))
:configurator (fn [server]
(config-ssl server
https-port
keystore-path
keystore-pass
keystore-type
key-password
truststore-path
truststore-password
truststore-type
want-client-auth
need-client-auth)
(config-filter server middle filters-confs))}))
(catch Exception ex
(log-error ex))))
(defn -main []
(let [conf (read-storm-config)
log-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
(setup-default-uncaught-exception-handler)
(start-log-cleaner! conf log-root)
(start-logviewer! conf log-root)))