| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| (ns backtype.storm.daemon.logviewer |
| (:use compojure.core) |
| (:use [clojure.set :only [difference intersection]]) |
| (:use [clojure.string :only [blank?]]) |
| (:use [hiccup core page-helpers]) |
| (:use [backtype.storm config util log timer]) |
| (:use [backtype.storm.ui helpers]) |
| (:import [org.slf4j LoggerFactory]) |
| (:import [java.io File FileFilter FileInputStream]) |
| (:import [org.apache.logging.log4j LogManager]) |
| (:import [org.apache.logging.log4j.core Appender LoggerContext]) |
| (:import [org.apache.logging.log4j.core.appender RollingFileAppender]) |
| (:import [org.yaml.snakeyaml Yaml] |
| [org.yaml.snakeyaml.constructor SafeConstructor]) |
| (:import [backtype.storm.ui InvalidRequestException] |
| [backtype.storm.security.auth AuthUtils]) |
| (:require [compojure.route :as route] |
| [compojure.handler :as handler] |
| [ring.middleware.keyword-params] |
| [ring.util.response :as resp]) |
| (:require [backtype.storm.daemon common [supervisor :as supervisor]]) |
| (:import [java.io File FileFilter]) |
| (:require [compojure.route :as route] |
| [compojure.handler :as handler] |
| [ring.util.response :as resp] |
| [clojure.string :as string]) |
| (:gen-class)) |
| |
| (def ^:dynamic *STORM-CONF* (read-storm-config)) |
| |
| (defn cleanup-cutoff-age-millis [conf now-millis] |
| (- now-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000))) |
| |
| ;TODO: handle cleanup of old event log files |
| (defn mk-FileFilter-for-log-cleanup [conf now-millis] |
| (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)] |
| (reify FileFilter (^boolean accept [this ^File file] |
| (boolean (and |
| (.isFile file) |
| (re-find worker-log-filename-pattern (.getName file)) |
| (<= (.lastModified file) cutoff-age-millis))))))) |
| |
| (defn select-files-for-cleanup [conf now-millis root-dir] |
| (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)] |
| (.listFiles (File. root-dir) file-filter))) |
| |
| (defn get-metadata-file-for-log-root-name [root-name root-dir] |
| (let [metaFile (clojure.java.io/file root-dir "metadata" |
| (str root-name ".yaml"))] |
| (if (.exists metaFile) |
| metaFile |
| (do |
| (log-warn "Could not find " (.getCanonicalPath metaFile) |
| " to clean up for " root-name) |
| nil)))) |
| |
| (defn get-worker-id-from-metadata-file [metaFile] |
| (get (clojure-from-yaml-file metaFile) "worker-id")) |
| |
| (defn get-topo-owner-from-metadata-file [metaFile] |
| (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER)) |
| |
| (defn get-log-root->files-map [log-files] |
| "Returns a map of \"root name\" to a the set of files in log-files having the |
| root name. The \"root name\" of a log file is the part of the name preceding |
| the extension." |
| (reduce #(assoc %1 ;; The accumulated map so far |
| (first %2) ;; key: The root name of the log file |
| (conj (%1 (first %2) #{}) (second %2))) ;; val: The set of log files with the root name |
| {} ;; initial (empty) map |
| (map #(list |
| (second (re-find worker-log-filename-pattern (.getName %))) ;; The root name of the log file |
| %) ;; The log file |
| log-files))) |
| |
| (defn identify-worker-log-files [log-files root-dir] |
| (into {} (for [log-root-entry (get-log-root->files-map log-files) |
| :let [metaFile (get-metadata-file-for-log-root-name |
| (key log-root-entry) root-dir) |
| log-root (key log-root-entry) |
| files (val log-root-entry)] |
| :when metaFile] |
| {(get-worker-id-from-metadata-file metaFile) |
| {:owner (get-topo-owner-from-metadata-file metaFile) |
| :files |
| ;; If each log for this root name is to be deleted, then |
| ;; include the metadata file also. |
| (if (empty? (difference |
| (set (filter #(re-find (re-pattern log-root) %) |
| (read-dir-contents root-dir))) |
| (set (map #(.getName %) files)))) |
| (conj files metaFile) |
| ;; Otherwise, keep the list of files as it is. |
| files)}}))) |
| |
| (defn get-dead-worker-files-and-owners [conf now-secs log-files root-dir] |
| (if (empty? log-files) |
| {} |
| (let [id->heartbeat (supervisor/read-worker-heartbeats conf) |
| alive-ids (keys (remove |
| #(or (not (val %)) |
| (supervisor/is-worker-hb-timed-out? now-secs (val %) conf)) |
| id->heartbeat)) |
| id->entries (identify-worker-log-files log-files root-dir)] |
| (for [[id {:keys [owner files]}] id->entries |
| :when (not (contains? (set alive-ids) id))] |
| {:owner owner |
| :files files})))) |
| |
| (defn cleanup-fn! [log-root-dir] |
| (let [now-secs (current-time-secs) |
| old-log-files (select-files-for-cleanup *STORM-CONF* (* now-secs 1000) log-root-dir) |
| dead-worker-files (get-dead-worker-files-and-owners *STORM-CONF* now-secs old-log-files log-root-dir)] |
| (log-debug "log cleanup: now=" now-secs |
| " old log files " (pr-str (map #(.getName %) old-log-files)) |
| " dead worker files " (->> dead-worker-files |
| (mapcat (fn [{l :files}] l)) |
| (map #(.getName %)) |
| (pr-str))) |
| (dofor [{:keys [owner files]} dead-worker-files |
| file files] |
| (let [path (.getCanonicalPath file)] |
| (log-message "Cleaning up: Removing " path) |
| (try |
| (if (or (blank? owner) (re-matches #".*\.yaml$" path)) |
| (rmr path) |
| ;; worker-launcher does not actually launch a worker process. It |
| ;; merely executes one of a prescribed set of commands. In this case, we ask it |
| ;; to delete a file as the owner of that file. |
| (supervisor/worker-launcher *STORM-CONF* owner (str "rmr " path))) |
| (catch Exception ex |
| (log-error ex))))))) |
| |
| (defn start-log-cleaner! [conf log-root-dir] |
| (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)] |
| (when interval-secs |
| (log-debug "starting log cleanup thread at interval: " interval-secs) |
| (schedule-recurring (mk-timer :thread-name "logviewer-cleanup") |
| 0 ;; Start immediately. |
| interval-secs |
| (fn [] (cleanup-fn! log-root-dir)))))) |
| |
| (defn page-file |
| ([path tail] |
| (let [flen (.length (clojure.java.io/file path)) |
| skip (- flen tail)] |
| (page-file path skip tail))) |
| ([path start length] |
| (with-open [input (FileInputStream. path) |
| output (java.io.ByteArrayOutputStream.)] |
| (if (>= start (.length (clojure.java.io/file path))) |
| (throw |
| (InvalidRequestException. "Cannot start past the end of the file"))) |
| (if (> start 0) |
| ;; FileInputStream#skip may not work the first time. |
| (loop [skipped 0] |
| (let [skipped (+ skipped (.skip input (- start skipped)))] |
| (if (< skipped start) (recur skipped))))) |
| (let [buffer (make-array Byte/TYPE 1024)] |
| (loop [] |
| (when (< (.size output) length) |
| (let [size (.read input buffer 0 (min 1024 (- length (.size output))))] |
| (when (pos? size) |
| (.write output buffer 0 size) |
| (recur))))) |
| (.toString output))))) |
| |
| (defn get-log-user-group-whitelist [fname] |
| (let [wl-file (get-log-metadata-file fname) |
| m (clojure-from-yaml-file wl-file)] |
| (if (not-nil? m) |
| (do |
| (let [user-wl (.get m LOGS-USERS) |
| user-wl (if user-wl user-wl []) |
| group-wl (.get m LOGS-GROUPS) |
| group-wl (if group-wl group-wl [])] |
| [user-wl group-wl])) |
| nil))) |
| |
| (def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*)) |
| (defn user-groups |
| [user] |
| (if (blank? user) [] (.getGroups igroup-mapper user))) |
| |
| (defn authorized-log-user? [user fname conf] |
| (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname))) |
| nil |
| (let [groups (user-groups user) |
| [user-wl group-wl] (get-log-user-group-whitelist fname) |
| logs-users (concat (conf LOGS-USERS) |
| (conf NIMBUS-ADMINS) |
| user-wl) |
| logs-groups (concat (conf LOGS-GROUPS) |
| group-wl)] |
| (or (some #(= % user) logs-users) |
| (< 0 (.size (intersection (set groups) (set group-wl)))))))) |
| |
| (defn log-root-dir |
| "Given an appender name, as configured, get the parent directory of the appender's log file. |
| |
| Note that if anything goes wrong, this will throw an Error and exit." |
| [appender-name] |
| (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)] |
| (if (and appender-name appender (instance? RollingFileAppender appender)) |
| (.getParent (File. (.getFileName appender))) |
| (throw |
| (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j2 agree."))))) |
| |
| (defnk to-btn-link |
| "Create a link that is formatted like a button" |
| [url text :enabled true] |
| [:a {:href (java.net.URI. url) |
| :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text]) |
| |
| (defn pager-links [fname start length file-size] |
| (let [prev-start (max 0 (- start length)) |
| next-start (if (> file-size 0) |
| (min (max 0 (- file-size length)) (+ start length)) |
| (+ start length))] |
| [[:div |
| (concat |
| [(to-btn-link (url "/log" |
| {:file fname |
| :start (max 0 (- start length)) |
| :length length}) |
| "Prev" :enabled (< prev-start start))] |
| [(to-btn-link (url "/log" |
| {:file fname |
| :start 0 |
| :length length}) "First")] |
| [(to-btn-link (url "/log" |
| {:file fname |
| :length length}) |
| "Last")] |
| [(to-btn-link (url "/log" |
| {:file fname |
| :start (min (max 0 (- file-size length)) |
| (+ start length)) |
| :length length}) |
| "Next" :enabled (> next-start start))])]])) |
| |
| (defn- download-link [fname] |
| [[:p (link-to (url-format "/download/%s" fname) "Download Full Log")]]) |
| |
| (defn log-page [fname start length grep user root-dir] |
| (if (or (blank? (*STORM-CONF* UI-FILTER)) |
| (authorized-log-user? user fname *STORM-CONF*)) |
| (let [file (.getCanonicalFile (File. root-dir fname)) |
| file-length (.length file) |
| path (.getCanonicalPath file)] |
| (if (and (= (.getCanonicalFile (File. root-dir)) |
| (.getParentFile file)) |
| (.exists file)) |
| (let [default-length 51200 |
| length (if length |
| (min 10485760 length) |
| default-length) |
| log-string (escape-html |
| (if start |
| (page-file path start length) |
| (page-file path length))) |
| start (or start (- file-length length))] |
| (if grep |
| (html [:pre#logContent |
| (if grep |
| (->> (.split log-string "\n") |
| (filter #(.contains % grep)) |
| (string/join "\n")) |
| log-string)]) |
| (let [pager-data (pager-links fname start length file-length)] |
| (html (concat pager-data |
| (download-link fname) |
| [[:pre#logContent log-string]] |
| pager-data))))) |
| (-> (resp/response "Page not found") |
| (resp/status 404)))) |
| (if (nil? (get-log-user-group-whitelist fname)) |
| (-> (resp/response "Page not found") |
| (resp/status 404)) |
| (unauthorized-user-html user)))) |
| |
| (defn download-log-file [fname req resp user ^String root-dir] |
| (let [file (.getCanonicalFile (File. root-dir fname))] |
| (if (= (File. root-dir) (.getParentFile file)) |
| (if (or (blank? (*STORM-CONF* UI-FILTER)) |
| (authorized-log-user? user fname *STORM-CONF*)) |
| (-> (resp/response file) |
| (resp/content-type "application/octet-stream")) |
| (unauthorized-user-html user)) |
| (-> (resp/response "Page not found") |
| (resp/status 404))))) |
| |
| (defn log-template |
| ([body] (log-template body nil nil)) |
| ([body fname user] |
| (html4 |
| [:head |
| [:title (str (escape-html fname) " - Storm Log Viewer")] |
| (include-css "/css/bootstrap-3.3.1.min.css") |
| (include-css "/css/jquery.dataTables.1.10.4.min.css") |
| (include-css "/css/style.css") |
| ] |
| [:body |
| (concat |
| (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]]) |
| [[:h3 (escape-html fname)]] |
| (seq body)) |
| ]))) |
| |
| (def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*)) |
| |
| (defn- parse-long-from-map [m k] |
| (try |
| (Long/parseLong (k m)) |
| (catch NumberFormatException ex |
| (throw (InvalidRequestException. |
| (str "Could not make an integer out of the query parameter '" |
| (name k) "'") |
| ex))))) |
| |
| (defroutes log-routes |
| (GET "/log" [:as req & m] |
| (try |
| (let [servlet-request (:servlet-request req) |
| log-root (:log-root req) |
| user (.getUserName http-creds-handler servlet-request) |
| start (if (:start m) (parse-long-from-map m :start)) |
| length (if (:length m) (parse-long-from-map m :length))] |
| (log-template (log-page (:file m) start length (:grep m) user log-root) |
| (:file m) user)) |
| (catch InvalidRequestException ex |
| (log-error ex) |
| (ring-response-from-exception ex)))) |
| (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m] |
| (try |
| (let [user (.getUserName http-creds-handler servlet-request)] |
| (download-log-file file servlet-request servlet-response user log-root)) |
| (catch InvalidRequestException ex |
| (log-error ex) |
| (ring-response-from-exception ex)))) |
| (route/resources "/") |
| (route/not-found "Page not found")) |
| |
| (defn conf-middleware |
| "For passing the storm configuration with each request." |
| [app log-root] |
| (fn [req] |
| (app (assoc req :log-root log-root)))) |
| |
| (defn start-logviewer! [conf log-root-dir] |
| (try |
| (let [header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES)) |
| filter-class (conf UI-FILTER) |
| filter-params (conf UI-FILTER-PARAMS) |
| logapp (handler/api log-routes) ;; query params as map |
| middle (conf-middleware logapp log-root-dir) |
| filters-confs (if (conf UI-FILTER) |
| [{:filter-class filter-class |
| :filter-params (or (conf UI-FILTER-PARAMS) {})}] |
| []) |
| filters-confs (concat filters-confs |
| [{:filter-class "org.eclipse.jetty.servlets.GzipFilter" |
| :filter-name "Gzipper" |
| :filter-params {}}]) |
| https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0)) |
| keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH) |
| keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD) |
| keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE) |
| key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD) |
| truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH) |
| truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD) |
| truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE) |
| want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH) |
| need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)] |
| (storm-run-jetty {:port (int (conf LOGVIEWER-PORT)) |
| :configurator (fn [server] |
| (config-ssl server |
| https-port |
| keystore-path |
| keystore-pass |
| keystore-type |
| key-password |
| truststore-path |
| truststore-password |
| truststore-type |
| want-client-auth |
| need-client-auth) |
| (config-filter server middle filters-confs))})) |
| (catch Exception ex |
| (log-error ex)))) |
| |
| (defn -main [] |
| (let [conf (read-storm-config) |
| log-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))] |
| (setup-default-uncaught-exception-handler) |
| (start-log-cleaner! conf log-root) |
| (start-logviewer! conf log-root))) |