blob: 80ee01ad5662f47aaa877a428e2961165cc61c55 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.logviewer
(:use compojure.core)
(:use [clojure.set :only [difference intersection]])
(:use [clojure.string :only [blank? split]])
(:use [hiccup core page-helpers form-helpers])
(:use [backtype.storm config util log timer])
(:use [backtype.storm.ui helpers])
(:import [backtype.storm.utils Utils])
(:import [org.slf4j LoggerFactory])
(:import [java.util Arrays])
(:import [java.util.zip GZIPInputStream])
(:import [org.apache.logging.log4j LogManager])
(:import [org.apache.logging.log4j.core Appender LoggerContext])
(:import [org.apache.logging.log4j.core.appender RollingFileAppender])
(:import [java.io BufferedInputStream File FileFilter FileInputStream
InputStream InputStreamReader])
(:import [java.nio ByteBuffer])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
(:import [backtype.storm.ui InvalidRequestException]
[backtype.storm.security.auth AuthUtils])
(:require [backtype.storm.daemon common [supervisor :as supervisor]])
(:require [compojure.route :as route]
[compojure.handler :as handler]
[ring.middleware.keyword-params]
[ring.util.codec :as codec]
[ring.util.response :as resp]
[clojure.string :as string])
(:require [metrics.meters :refer [defmeter mark!]])
(:use [backtype.storm.daemon.common :only [start-metrics-reporters]])
(:gen-class))
;; Storm configuration map, read once at namespace load. Dynamic so tests can rebind.
(def ^:dynamic *STORM-CONF* (read-storm-config))

;; Request-rate meters, one per HTTP endpoint served by the logviewer.
(defmeter logviewer:num-log-page-http-requests)
(defmeter logviewer:num-daemonlog-page-http-requests)
(defmeter logviewer:num-download-log-file-http-requests)
(defmeter logviewer:num-download-log-daemon-file-http-requests)
(defmeter logviewer:num-list-logs-http-requests)
(defn cleanup-cutoff-age-millis
  "Return the epoch-millis timestamp before which worker log dirs are old
  enough to delete, per LOGVIEWER-CLEANUP-AGE-MINS."
  [conf now-millis]
  (let [max-age-millis (* (conf LOGVIEWER-CLEANUP-AGE-MINS) 60 1000)]
    (- now-millis max-age-millis)))
(defn- last-modifiedtime-worker-logdir
  "Return the last modified time for all log files in a worker's log dir:
  the max over the directory itself and the files directly inside it."
  [log-dir]
  (reduce max
          (.lastModified log-dir)
          (map (fn [^File f] (.lastModified f))
               (.listFiles log-dir))))
(defn mk-FileFilter-for-log-cleanup
  "Return a FileFilter accepting worker log *directories* (not plain files)
  whose most recently modified contents are older than the cleanup cutoff."
  [conf now-millis]
  (let [cutoff-age-millis (cleanup-cutoff-age-millis conf now-millis)]
    (reify FileFilter
      (^boolean accept [this ^File file]
        (boolean (and
                   ;; directories only: each worker's logs live in their own dir
                   (not (.isFile file))
                   ;; and everything in the dir must be older than the cutoff
                   (<= (last-modifiedtime-worker-logdir file) cutoff-age-millis)))))))
(defn select-dirs-for-cleanup
  "Return the worker log directories (across all topology dirs under
  root-dir) that are old enough to be cleaned up."
  [conf now-millis root-dir]
  (let [file-filter (mk-FileFilter-for-log-cleanup conf now-millis)]
    ;; NOTE(review): clojure.set/union uses the *larger* argument as the
    ;; base collection, so unioning the accumulated set with a larger vector
    ;; can yield a non-set -- confirm callers rely only on the elements,
    ;; not on the collection type or on de-duplication.
    (reduce clojure.set/union
            (sorted-set)
            (for [^File topo-dir (.listFiles (File. root-dir))]
              (into [] (.listFiles topo-dir file-filter))))))
(defn get-topo-port-workerlog
  "Return the path of the worker log with the format of topoId/port/worker.log.*"
  [^File file]
  (let [path-parts (split (.getCanonicalPath file)
                          (re-pattern file-path-separator))]
    ;; keep only the trailing topoId/port/filename components
    (clojure.string/join file-path-separator (take-last 3 path-parts))))
(defn get-metadata-file-for-log-root-name
  "Locate the metadata yaml file for the given log root name under root-dir,
  or nil (after logging a warning) when it does not exist."
  [root-name root-dir]
  (let [metaFile (clojure.java.io/file root-dir "metadata"
                                       (str root-name ".yaml"))]
    (if-not (.exists metaFile)
      (do
        (log-warn "Could not find " (.getCanonicalPath metaFile)
                  " to clean up for " root-name)
        nil)
      metaFile)))
;; Name retains the historical "wroker" misspelling for caller compatibility.
(defn get-metadata-file-for-wroker-logdir
  "Locate the worker.yaml metadata file inside a worker log dir, or nil
  (after logging a warning) when it does not exist."
  [logdir]
  (let [metaFile (clojure.java.io/file logdir "worker.yaml")]
    (if-not (.exists metaFile)
      (do
        (log-warn "Could not find " (.getCanonicalPath metaFile)
                  " to clean up for " logdir)
        nil)
      metaFile)))
(defn get-worker-id-from-metadata-file
  "Read the \"worker-id\" entry from a worker metadata yaml file."
  [metaFile]
  (-> metaFile clojure-from-yaml-file (get "worker-id")))
(defn get-topo-owner-from-metadata-file
  "Read the topology submitter user from a worker metadata yaml file."
  [metaFile]
  (-> metaFile clojure-from-yaml-file (get TOPOLOGY-SUBMITTER-USER)))
(defn identify-worker-log-dirs
  "Return the worker-id -> worker-log-dir map for the given log dirs.
  Dirs without a readable worker.yaml metadata file are skipped."
  ;; FIX: the docstring was previously placed *after* the argument vector,
  ;; where Clojure treats it as a discarded body expression, not docs.
  [log-dirs]
  (into {} (for [logdir log-dirs
                 :let [metaFile (get-metadata-file-for-wroker-logdir logdir)]
                 :when metaFile]
             {(get-worker-id-from-metadata-file metaFile) logdir})))
(defn get-alive-ids
  "Return the set of worker ids whose heartbeat is present and not timed out."
  [conf now-secs]
  (->> (supervisor/read-worker-heartbeats conf)
       (filter (fn [[_ hb]]
                 (and hb
                      (not (supervisor/is-worker-hb-timed-out? now-secs
                                                               hb
                                                               conf)))))
       (map key)
       set))
(defn get-dead-worker-dirs
  "Return a sorted set of java.io.Files that were written by workers that are
  now dead"
  [conf now-secs log-dirs]
  (if (empty? log-dirs)
    (sorted-set)
    (let [alive-ids (get-alive-ids conf now-secs)
          dead-dirs (->> (identify-worker-log-dirs log-dirs)
                         (remove (fn [[id _]] (contains? alive-ids id)))
                         (map val))]
      (apply sorted-set dead-dirs))))
(defn get-all-worker-dirs
  "Return every worker (port) directory under every topology directory in
  root-dir."
  [^File root-dir]
  ;; NOTE(review): as in select-dirs-for-cleanup, clojure.set/union may
  ;; return a non-set when the right-hand vector is larger than the
  ;; accumulated set -- callers appear to treat the result as a plain seq.
  (reduce clojure.set/union
          (sorted-set)
          (for [^File topo-dir (.listFiles root-dir)]
            (into [] (.listFiles topo-dir)))))
(defn get-alive-worker-dirs
  "Return a sorted set of canonical paths of worker log dirs whose workers
  are currently alive."
  [conf root-dir]
  (let [alive-ids (get-alive-ids conf (current-time-secs))
        id->dir (identify-worker-log-dirs (get-all-worker-dirs root-dir))
        alive-paths (for [[id dir] id->dir
                          :when (contains? alive-ids id)]
                      (.getCanonicalPath dir))]
    (apply sorted-set alive-paths)))
(defn get-all-logs-for-rootdir
  "Return every file inside every worker (port) dir under the log root."
  [^File log-dir]
  (mapcat #(into [] (.listFiles ^File %))
          (get-all-worker-dirs log-dir)))
(defn is-active-log
  "Truthy (the regex match) when the file looks like a live log artifact:
  *.log, *.err, *.out, *.current, *.yaml or *.pid."
  [^File file]
  (re-find #"\.(log|err|out|current|yaml|pid)$" (.getName file)))
(defn filter-candidate-files
  "Filter candidate files for global cleanup: keep everything except
  worker.yaml metadata files and the still-active logs of currently-alive
  workers."
  [logs log-dir]
  (let [alive-worker-dirs (get-alive-worker-dirs *STORM-CONF* log-dir)]
    (filter #(and (not= (.getName %) "worker.yaml") ; exclude metadata file
                  (not (and (contains? alive-worker-dirs (.getCanonicalPath (.getParentFile %)))
                            (is-active-log %)))) ; exclude active workers' active logs
            logs)))
(defn sorted-worker-logs
  "Collect the worker log files recursively, sorted by decreasing age
  (oldest last-modified time first)."
  [^File root-dir]
  (let [files (get-all-logs-for-rootdir root-dir)
        logs (filter-candidate-files files root-dir)]
    (sort-by #(.lastModified %) logs)))
(defn sum-file-size
  "Given a sequence of Files, sum their sizes."
  [files]
  (reduce + 0 (map (fn [^File f] (.length f)) files)))
(defn delete-oldest-while-logs-too-large
  "Delete files from the front of logs_ (assumed sorted oldest-first) until
  the remaining files fit within `size` bytes. Returns the undeleted tail.
  A failed deletion is logged and the loop moves on to the next file."
  [logs_ size]
  (loop [logs logs_]
    (if (> (sum-file-size logs) size)
      (do
        (log-message "Log sizes too high. Going to delete: " (.getName (first logs)))
        (try (rmr (.getCanonicalPath (first logs)))
          (catch Exception ex (log-error ex)))
        (recur (rest logs)))
      logs)))
(defn per-workerdir-cleanup
  "Delete the oldest files in each overloaded worker log dir"
  [^File root-dir size]
  (dofor [worker-dir (get-all-worker-dirs root-dir)]
    ;; active logs (currently written by a live worker) are never deleted here
    (let [filtered-logs (filter #(not (is-active-log %)) (.listFiles worker-dir))
          sorted-logs (sort-by #(.lastModified %) filtered-logs)]
      (delete-oldest-while-logs-too-large sorted-logs size))))
(defn cleanup-empty-topodir
  "Delete the topo dir if it contains zero port dirs"
  [^File dir]
  (let [parent (.getParentFile dir)
        children (.listFiles parent)]
    (when (empty? children)
      (rmr (.getCanonicalPath parent)))))
(defn cleanup-fn!
  "Delete old log dirs for which the workers are no longer alive"
  [log-root-dir]
  (let [now-secs (current-time-secs)
        old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
                                              (* now-secs 1000)
                                              log-root-dir)
        total-size (*STORM-CONF* LOGVIEWER-MAX-SUM-WORKER-LOGS-SIZE-MB)
        per-dir-size (*STORM-CONF* LOGVIEWER-MAX-PER-WORKER-LOGS-SIZE-MB)
        ;; never let one worker dir consume more than half the total budget
        per-dir-size (min per-dir-size (* total-size 0.5))
        dead-worker-dirs (get-dead-worker-dirs *STORM-CONF*
                                               now-secs
                                               old-log-dirs)]
    (log-debug "log cleanup: now=" now-secs
               " old log dirs " (pr-str (map #(.getName %) old-log-dirs))
               " dead worker dirs " (pr-str
                                      (map #(.getName %) dead-worker-dirs)))
    ;; Pass 1: remove dirs of dead workers, then their topo dir if now empty.
    (dofor [dir dead-worker-dirs]
      (let [path (.getCanonicalPath dir)]
        (log-message "Cleaning up: Removing " path)
        (try (rmr path)
          (cleanup-empty-topodir dir)
          (catch Exception ex (log-error ex)))))
    ;; Pass 2: trim each worker dir to the per-dir byte budget (MB -> bytes).
    (per-workerdir-cleanup (File. log-root-dir) (* per-dir-size (* 1024 1024)))
    ;; Pass 3: trim globally to the total byte budget, oldest logs first.
    (let [all-logs (sorted-worker-logs (File. log-root-dir))
          size (* total-size (* 1024 1024))]
      (delete-oldest-while-logs-too-large all-logs size))))
(defn start-log-cleaner!
  "Schedule the recurring log cleanup task when
  LOGVIEWER-CLEANUP-INTERVAL-SECS is configured; otherwise do nothing.
  A cleanup failure kills the process (exit code 20)."
  [conf log-root-dir]
  (let [interval-secs (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
    (when interval-secs
      (log-debug "starting log cleanup thread at interval: " interval-secs)
      (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
                                    :kill-fn (fn [t]
                                               (log-error t "Error when doing logs cleanup")
                                               (exit-process! 20 "Error when doing log cleanup")))
                          0 ;; Start immediately.
                          interval-secs
                          (fn [] (cleanup-fn! log-root-dir))))))
(defn- skip-bytes
  "FileInputStream#skip may not work the first time, so ensure it successfully
  skips the given number of bytes. When skip makes no progress, fall back to
  a single-byte read to distinguish EOF from a stream that refuses to skip;
  stop at EOF instead of looping forever (the previous version could spin
  indefinitely when .skip returned 0 at end-of-stream, e.g. GZIPInputStream)."
  [^InputStream stream n]
  (loop [skipped 0]
    (when (< skipped n)
      (let [step (.skip stream (- n skipped))]
        (if (pos? step)
          (recur (+ skipped step))
          ;; skip made no progress: read one byte; -1 means EOF, so give up
          (when-not (neg? (.read stream))
            (recur (inc skipped))))))))
(defn logfile-matches-filter?
  "True when the file name is a worker log (worker.log, worker.log.1,
  worker.log.1.gz, ...)."
  [log-file-name]
  ;; FIX: the dot in \"worker.log\" is now escaped; the previous unescaped
  ;; pattern also matched unrelated names such as \"workerXlogY\".
  (let [regex-pattern (re-pattern "worker\\.log.*")]
    (some? (re-seq regex-pattern (.toString log-file-name)))))
(defn page-file
  "Read a page of a (possibly gzipped) log file and return it as a String.
  With [path tail]: read the last `tail` bytes. With [path start length]:
  read `length` bytes starting at byte offset `start`. Throws
  InvalidRequestException when `start` is at or past the end of the file."
  ([path tail]
    (let [zip-file? (.endsWith path ".gz")
          ;; for gzip files the relevant length is the uncompressed size
          flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
          skip (- flen tail)]
      (page-file path skip tail)))
  ([path start length]
    (let [zip-file? (.endsWith path ".gz")
          flen (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))]
      (with-open [input (if zip-file? (GZIPInputStream. (FileInputStream. path)) (FileInputStream. path))
                  output (java.io.ByteArrayOutputStream.)]
        (if (>= start flen)
          (throw
            (InvalidRequestException. "Cannot start past the end of the file")))
        (if (> start 0) (skip-bytes input start))
        (let [buffer (make-array Byte/TYPE 1024)]
          ;; read up to `length` bytes in 1K chunks; stops early at EOF
          (loop []
            (when (< (.size output) length)
              (let [size (.read input buffer 0 (min 1024 (- length (.size output))))]
                (when (pos? size)
                  (.write output buffer 0 size)
                  (recur)))))
          ;; NOTE(review): decoded with the platform default charset --
          ;; presumably UTF-8 is expected; verify behavior on other JVMs.
          (.toString output))))))
(defn get-log-user-group-whitelist
  "Read the per-log metadata file for fname and return [users groups]
  whitelists (each defaulting to []), or nil when no metadata was readable."
  [fname]
  (let [m (clojure-from-yaml-file (get-log-metadata-file fname))]
    (when (not-nil? m)
      [(or (.get m LOGS-USERS) [])
       (or (.get m LOGS-GROUPS) [])])))
;; Group-mapping plugin used to resolve a user's groups for log authorization.
(def igroup-mapper (AuthUtils/GetGroupMappingServiceProviderPlugin *STORM-CONF*))
(defn user-groups
  "Return the groups the given user belongs to, or [] for a blank user."
  [user]
  (if-not (blank? user)
    (.getGroups igroup-mapper user)
    []))
(defn authorized-log-user?
  "True when `user` may view log `fname`: the user appears in the combined
  user whitelist (LOGS-USERS + NIMBUS-ADMINS + per-log users), or one of the
  user's groups appears in the combined group whitelist (LOGS-GROUPS plus the
  per-log groups). Returns nil when user/fname is blank or no whitelist
  metadata exists for the file."
  [user fname conf]
  (if (or (blank? user) (blank? fname) (nil? (get-log-user-group-whitelist fname)))
    nil
    (let [groups (user-groups user)
          [user-wl group-wl] (get-log-user-group-whitelist fname)
          logs-users (concat (conf LOGS-USERS)
                             (conf NIMBUS-ADMINS)
                             user-wl)
          logs-groups (concat (conf LOGS-GROUPS)
                              group-wl)]
      (or (some #(= % user) logs-users)
          ;; FIX: previously intersected against only the per-log group
          ;; whitelist (group-wl), silently ignoring the configured
          ;; LOGS-GROUPS even though logs-groups was computed above.
          (< 0 (.size (intersection (set groups) (set logs-groups))))))))
(defn log-root-dir
  "Given an appender name, as configured, get the parent directory of the appender's log file.
  Note that if anything goes wrong, this will throw an Error and exit."
  [appender-name]
  (let [appender (.getAppender (.getConfiguration (LogManager/getContext)) appender-name)]
    ;; only a RollingFileAppender exposes a file name we can resolve
    (if (and appender-name appender (instance? RollingFileAppender appender))
      (.getParent (File. (.getFileName appender)))
      (throw
        (RuntimeException. "Log viewer could not find configured appender, or the appender is not a FileAppender. Please check that the appender name configured in storm and log4j agree.")))))
(defnk to-btn-link
  "Create a link that is formatted like a button"
  [url text :enabled true]
  ;; a disabled button keeps its href; only the CSS class conveys the state
  [:a {:href (java.net.URI. url)
       :class (str "btn btn-default " (if enabled "enabled" "disabled"))} text])
(defn search-file-form
  "Hiccup form for searching within the currently viewed log file."
  [fname]
  [[:form {:action "logviewer_search.html" :id "search-box"}
    "Search this file:"
    [:input {:type "text" :name "search"}]
    [:input {:type "hidden" :name "file" :value fname}]
    [:input {:type "submit" :value "Search"}]]])
(defn log-file-selection-form
  "Hiccup drop-down form for switching between log files; `type` is the form
  action (\"log\" or \"daemonlog\")."
  [log-files type]
  [[:form {:action type :id "list-of-files"}
    (drop-down "file" log-files )
    [:input {:type "submit" :value "Switch file"}]]])
(defn pager-links
  "Build the Prev/First/Last/Next button row for paging through a log file.
  Prev/Next are disabled when they would not move the view."
  [fname start length file-size]
  (let [prev-start (max 0 (- start length))
        next-start (if (> file-size 0)
                     (min (max 0 (- file-size length)) (+ start length))
                     (+ start length))]
    [[:div
      (concat
        [(to-btn-link (url "/log" {:file fname
                                   :start prev-start
                                   :length length})
                      "Prev" :enabled (< prev-start start))]
        [(to-btn-link (url "/log" {:file fname
                                   :start 0
                                   :length length})
                      "First")]
        [(to-btn-link (url "/log" {:file fname
                                   :length length})
                      "Last")]
        [(to-btn-link (url "/log" {:file fname
                                   :start next-start
                                   :length length})
                      "Next" :enabled (> next-start start))])]]))
;; Link to download the complete worker log file.
(defn- download-link [fname]
  [[:p (link-to (url-format "/download/%s" fname) "Download Full File")]])

;; Link to download the complete daemon log file.
(defn- daemon-download-link [fname]
  [[:p (link-to (url-format "/daemondownload/%s" fname) "Download Full File")]])

;; Only files with these extensions are rendered inline as text.
(defn- is-txt-file [fname]
  (re-find #"\.(log.*|txt|yaml|pid)$" fname))

;; Default page size in bytes when no explicit length is requested (50 KB).
(def default-bytes-per-page 51200)
(defn log-page
  "Render one page of a worker log as HTML, or a grep-filtered view when
  `grep` is given. Responds 404 when the file is missing or resolves outside
  root-dir, and an unauthorized page when the ACL check fails."
  [fname start length grep user root-dir]
  (if (or (blank? (*STORM-CONF* UI-FILTER))
          (authorized-log-user? user fname *STORM-CONF*))
    (let [file (.getCanonicalFile (File. root-dir fname))
          path (.getCanonicalPath file)
          zip-file? (.endsWith path ".gz")
          topo-dir (.getParentFile (.getParentFile file))]
      ;; path-traversal guard: the canonical file must live exactly two
      ;; levels (topoId/port) below root-dir
      (if (and (.exists file)
               (= (.getCanonicalFile (File. root-dir))
                  (.getParentFile topo-dir)))
        (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
              log-files (reduce clojure.set/union
                                (sorted-set)
                                (for [^File port-dir (.listFiles topo-dir)]
                                  (into [] (filter #(.isFile %) (.listFiles port-dir))))) ;all types of files included
              files-str (for [file log-files]
                          (get-topo-port-workerlog file))
              ;; put the current file first in the selection drop-down
              reordered-files-str (conj (filter #(not= fname %) files-str) fname)
              length (if length
                       (min 10485760 length) ; cap a requested page at 10 MB
                       default-bytes-per-page)
              log-string (escape-html
                           (if (is-txt-file fname)
                             (if start
                               (page-file path start length)
                               (page-file path length))
                             "This is a binary file and cannot display! You may download the full file."))
              ;; default to showing the tail of the file
              start (or start (- file-length length))]
          (if grep
            (html [:pre#logContent
                   (if grep
                     (->> (.split log-string "\n")
                          (filter #(.contains % grep))
                          (string/join "\n"))
                     log-string)])
            (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
              (html (concat (search-file-form fname)
                            (log-file-selection-form reordered-files-str "log") ; list all files for this topology
                            pager-data
                            (download-link fname)
                            [[:pre#logContent log-string]]
                            pager-data)))))
        (-> (resp/response "Page not found")
            (resp/status 404))))
    (if (nil? (get-log-user-group-whitelist fname))
      (-> (resp/response "Page not found")
          (resp/status 404))
      (unauthorized-user-html user))))
(defn daemonlog-page
  "Render one page of a daemon log (a file directly under root-dir) as HTML.
  Mirrors log-page but without the topology/port directory structure."
  [fname start length grep user root-dir]
  (if (or (blank? (*STORM-CONF* UI-FILTER))
          (authorized-log-user? user fname *STORM-CONF*))
    (let [file (.getCanonicalFile (File. root-dir fname))
          ;; NOTE(review): this binding is shadowed by the gz-aware
          ;; file-length below; it is only a fallback for the 404 branch.
          file-length (.length file)
          path (.getCanonicalPath file)
          zip-file? (.endsWith path ".gz")]
      ;; path-traversal guard: the canonical file must sit directly in root-dir
      (if (and (= (.getCanonicalFile (File. root-dir))
                  (.getParentFile file))
               (.exists file))
        (let [file-length (if zip-file? (Utils/zipFileSize (clojure.java.io/file path)) (.length (clojure.java.io/file path)))
              length (if length
                       (min 10485760 length) ; cap a requested page at 10 MB
                       default-bytes-per-page)
              log-files (into [] (filter #(.isFile %) (.listFiles (File. root-dir)))) ;all types of files included
              files-str (for [file log-files]
                          (.getName file))
              ;; put the current file first in the selection drop-down
              reordered-files-str (conj (filter #(not= fname %) files-str) fname)
              log-string (escape-html
                           (if (is-txt-file fname)
                             (if start
                               (page-file path start length)
                               (page-file path length))
                             "This is a binary file and cannot display! You may download the full file."))
              ;; default to showing the tail of the file
              start (or start (- file-length length))]
          (if grep
            (html [:pre#logContent
                   (if grep
                     (->> (.split log-string "\n")
                          (filter #(.contains % grep))
                          (string/join "\n"))
                     log-string)])
            (let [pager-data (if (is-txt-file fname) (pager-links fname start length file-length) nil)]
              (html (concat (log-file-selection-form reordered-files-str "daemonlog") ; list all daemon logs
                            pager-data
                            (daemon-download-link fname)
                            [[:pre#logContent log-string]]
                            pager-data)))))
        (-> (resp/response "Page not found")
            (resp/status 404))))
    (if (nil? (get-log-user-group-whitelist fname))
      (-> (resp/response "Page not found")
          (resp/status 404))
      (unauthorized-user-html user))))
(defn download-log-file
  "Serve the requested worker log as an octet-stream download, enforcing the
  same authorization rules as the log page. 404 when the file is missing."
  [fname req resp user ^String root-dir]
  (let [file (.getCanonicalFile (File. root-dir fname))]
    (cond
      (not (.exists file))
      (-> (resp/response "Page not found")
          (resp/status 404))

      (or (blank? (*STORM-CONF* UI-FILTER))
          (authorized-log-user? user fname *STORM-CONF*))
      (-> (resp/response file)
          (resp/content-type "application/octet-stream"))

      :else
      (unauthorized-user-html user))))
;; Maximum size (bytes) of a UTF-8 encoded search string.
(def grep-max-search-size 1024)
;; Size (bytes) of the sliding search buffer: two search-size halves.
(def grep-buf-size 2048)
;; Bytes of context captured before and after each match.
(def grep-context-size 128)

(defn logviewer-port
  "The port this logviewer is configured to listen on."
  []
  (int (*STORM-CONF* LOGVIEWER-PORT)))
(defn url-to-match-centered-in-log-page
  "Build a /log URL whose page is roughly centered on the match at `offset`
  in `fname`. NOTE(review): the `port` parameter is immediately shadowed by
  (logviewer-port) below, so the argument is effectively ignored -- confirm
  whether any caller depends on passing a different port."
  [needle fname offset port]
  (let [host (local-hostname)
        port (logviewer-port)
        ;; reduce the path to topoId/port/filename, as the /log route expects
        fname (clojure.string/join file-path-separator (take-last 3 (split fname (re-pattern file-path-separator))))]
    (url (str "http://" host ":" port "/log")
         {:file fname
          :start (max 0
                      (- offset
                         (int (/ default-bytes-per-page 2))
                         (int (/ (alength needle) -2)))) ;; Addition
          :length default-bytes-per-page})))
(defnk mk-match-data
  "Build the result map for one grep match: its byte offset in the file, the
  UTF-8 context strings before and after the match, the matched string, and
  a logviewer URL centered on the match. before-bytes/after-bytes supply
  context that lies outside the current haystack buffer."
  [^bytes needle ^ByteBuffer haystack haystack-offset file-offset fname
   :before-bytes nil :after-bytes nil]
  (let [url (url-to-match-centered-in-log-page needle
                                               fname
                                               file-offset
                                               (*STORM-CONF* LOGVIEWER-PORT))
        haystack-bytes (.array haystack)
        ;; Preceding context: taken wholly from the buffer when the match is
        ;; deep enough in, otherwise stitched from before-bytes + buffer head.
        before-string (if (>= haystack-offset grep-context-size)
                        (String. haystack-bytes
                                 (- haystack-offset grep-context-size)
                                 grep-context-size
                                 "UTF-8")
                        (let [num-desired (max 0 (- grep-context-size
                                                    haystack-offset))
                              before-size (if before-bytes
                                            (alength before-bytes)
                                            0)
                              num-expected (min before-size num-desired)]
                          (if (pos? num-expected)
                            (str (String. before-bytes
                                          (- before-size num-expected)
                                          num-expected
                                          "UTF-8")
                                 (String. haystack-bytes
                                          0
                                          haystack-offset
                                          "UTF-8"))
                            ;; no prior-buffer context available
                            (String. haystack-bytes
                                     0
                                     haystack-offset
                                     "UTF-8"))))
        ;; Following context: analogous, stitched from the buffer tail and
        ;; after-bytes when the match is near the end of the buffer.
        after-string (let [needle-size (alength needle)
                           after-offset (+ haystack-offset needle-size)
                           haystack-size (.limit haystack)]
                       (if (< (+ after-offset grep-context-size) haystack-size)
                         (String. haystack-bytes
                                  after-offset
                                  grep-context-size
                                  "UTF-8")
                         (let [num-desired (- grep-context-size
                                              (- haystack-size after-offset))
                               after-size (if after-bytes
                                            (alength after-bytes)
                                            0)
                               num-expected (min after-size num-desired)]
                           (if (pos? num-expected)
                             (str (String. haystack-bytes
                                           after-offset
                                           (- haystack-size after-offset)
                                           "UTF-8")
                                  (String. after-bytes 0 num-expected "UTF-8"))
                             ;; no read-ahead context available
                             (String. haystack-bytes
                                      after-offset
                                      (- haystack-size after-offset)
                                      "UTF-8")))))]
    {"byteOffset" file-offset
     "beforeString" before-string
     "afterString" after-string
     "matchString" (String. needle "UTF-8")
     "logviewerURL" url}))
(defn- try-read-ahead!
  "Tries once to read ahead in the stream to fill the context and resets the
  stream to its position before the call."
  [^BufferedInputStream stream haystack offset file-len bytes-read]
  (let [num-expected (min (- file-len bytes-read)
                          grep-context-size)
        after-bytes (byte-array num-expected)]
    (.mark stream num-expected)
    ;; Only try reading once.
    ;; NOTE(review): .read may return fewer than num-expected bytes, in
    ;; which case the tail of after-bytes remains zero-filled.
    (.read stream after-bytes 0 num-expected)
    (.reset stream)
    after-bytes))
(defn offset-of-bytes
  "Searches a given byte array for a match of a sub-array of bytes. Returns
  the offset to the byte that matches, or -1 if no match was found."
  [^bytes buf ^bytes value init-offset]
  {:pre [(> (alength value) 0)
         (not (neg? init-offset))]}
  (let [buf-len (alength buf)
        val-len (alength value)
        ;; true when `value` occurs in full at position `candidate` of buf
        matches-at? (fn [candidate]
                      (loop [i 0]
                        (cond
                          (= i val-len) true
                          (>= (+ candidate i) buf-len) false
                          (not= (aget value i) (aget buf (+ candidate i))) false
                          :else (recur (inc i)))))]
    (loop [candidate init-offset]
      (cond
        (>= candidate buf-len) -1
        (matches-at? candidate) candidate
        :else (recur (inc candidate))))))
(defn- buffer-substring-search!
  "As the file is read into a buffer, 1/2 the buffer's size at a time, we
  search the buffer for matches of the substring and return a list of zero or
  more matches. Returns [matches next-byte-offset before-bytes-for-next-call]."
  [file file-len offset-to-buf init-buf-offset stream bytes-skipped
   bytes-read ^ByteBuffer haystack ^bytes needle initial-matches num-matches
   ^bytes before-bytes]
  (loop [buf-offset init-buf-offset
         matches initial-matches]
    (let [offset (offset-of-bytes (.array haystack) needle buf-offset)]
      (if (and (< (count matches) num-matches) (not (neg? offset)))
        ;; Found another match and still need more: record it with context.
        (let [file-offset (+ offset-to-buf offset)
              bytes-needed-after-match (- (.limit haystack)
                                          grep-context-size
                                          (alength needle))
              ;; context from before this buffer, if match is near its start
              before-arg (if (< offset grep-context-size) before-bytes)
              ;; context from past this buffer, if match is near its end
              after-arg (if (> offset bytes-needed-after-match)
                          (try-read-ahead! stream
                                           haystack
                                           offset
                                           file-len
                                           bytes-read))]
          (recur (+ offset (alength needle))
                 (conj matches
                       (mk-match-data needle
                                      haystack
                                      offset
                                      file-offset
                                      (.getCanonicalPath file)
                                      :before-bytes before-arg
                                      :after-bytes after-arg))))
        ;; Done with this buffer: save the tail as before-context and compute
        ;; where the next search should resume.
        (let [before-str-to-offset (min (.limit haystack)
                                        grep-max-search-size)
              before-str-from-offset (max 0 (- before-str-to-offset
                                               grep-context-size))
              new-before-bytes (Arrays/copyOfRange (.array haystack)
                                                   before-str-from-offset
                                                   before-str-to-offset)
              ;; It's OK if new-byte-offset is negative. This is normal if
              ;; we are out of bytes to read from a small file.
              new-byte-offset (if (>= (count matches) num-matches)
                                (+ (get (last matches) "byteOffset")
                                   (alength needle))
                                (+ bytes-skipped
                                   bytes-read
                                   (- grep-max-search-size)))]
          [matches new-byte-offset new-before-bytes])))))
(defn- mk-grep-response
  "This response data only includes a next byte offset if there is more of the
  file to read."
  [search-bytes offset matches next-byte-offset]
  (cond-> {"searchString" (String. search-bytes "UTF-8")
           "startByteOffset" offset
           "matches" matches}
    next-byte-offset (assoc "nextByteOffset" next-byte-offset)))
(defn rotate-grep-buffer!
  "Slide the search window forward: copy the 2nd half of the buffer over the
  1st half, zero the 2nd half, then refill it from the stream. Updates the
  buffer's limit and the total-bytes-read atom."
  [^ByteBuffer buf ^BufferedInputStream stream total-bytes-read file file-len]
  (let [buf-arr (.array buf)]
    ;; Copy the 2nd half of the buffer to the first half.
    (System/arraycopy buf-arr
                      grep-max-search-size
                      buf-arr
                      0
                      grep-max-search-size)
    ;; Zero-out the 2nd half to prevent accidental matches.
    (Arrays/fill buf-arr
                 grep-max-search-size
                 (count buf-arr)
                 (byte 0))
    ;; Fill the 2nd half with new bytes from the stream.
    ;; NOTE(review): .read returns -1 at EOF, which would shrink the limit
    ;; and decrement total-bytes-read -- the caller appears to guard via its
    ;; file-len comparison and a negative-total check; verify.
    (let [bytes-read (.read stream
                            buf-arr
                            grep-max-search-size
                            (min file-len grep-max-search-size))]
      (.limit buf (+ grep-max-search-size bytes-read))
      (swap! total-bytes-read + bytes-read))))
(defnk substring-search
  "Searches for a substring in a log file, starting at the given offset,
  returning the given number of matches, surrounded by the given number of
  context lines. Other information is included to be useful for progressively
  searching through a file for display in a UI. The search string must
  grep-max-search-size bytes or fewer when decoded with UTF-8."
  [file ^String search-string :num-matches 10 :start-byte-offset 0]
  {:pre [(not (empty? search-string))
         (<= (count (.getBytes search-string "UTF-8")) grep-max-search-size)]}
  (let [zip-file? (.endsWith (.getName file) ".gz")
        f-input-steam (FileInputStream. file)
        gzipped-input-stream (if zip-file?
                               (GZIPInputStream. f-input-steam)
                               f-input-steam)
        ;; NOTE(review): the stream is never explicitly closed here --
        ;; presumably reclaimed by GC; verify whether with-open is warranted.
        stream ^BufferedInputStream (BufferedInputStream.
                                      gzipped-input-stream)
        ;; for gzip, search against the uncompressed length
        file-len (if zip-file? (Utils/zipFileSize file) (.length file))
        buf ^ByteBuffer (ByteBuffer/allocate grep-buf-size)
        buf-arr ^bytes (.array buf)
        string nil ; unused binding, kept as-is
        total-bytes-read (atom 0)
        matches []
        search-bytes ^bytes (.getBytes search-string "UTF-8")
        num-matches (or num-matches 10)
        start-byte-offset (or start-byte-offset 0)]
    ;; Start at the part of the log file we are interested in.
    ;; Allow searching when start-byte-offset == file-len so it doesn't blow up on 0-length files
    (if (> start-byte-offset file-len)
      (throw
        (InvalidRequestException. "Cannot search past the end of the file")))
    (when (> start-byte-offset 0)
      (skip-bytes stream start-byte-offset))
    (java.util.Arrays/fill buf-arr (byte 0))
    ;; Prime the buffer with the first chunk of the file.
    (let [bytes-read (.read stream buf-arr 0 (min file-len grep-buf-size))]
      (.limit buf bytes-read)
      (swap! total-bytes-read + bytes-read))
    (loop [initial-matches []
           init-buf-offset 0
           byte-offset start-byte-offset
           before-bytes nil]
      (let [[matches new-byte-offset new-before-bytes]
            (buffer-substring-search! file
                                      file-len
                                      byte-offset
                                      init-buf-offset
                                      stream
                                      start-byte-offset
                                      @total-bytes-read
                                      buf
                                      search-bytes
                                      initial-matches
                                      num-matches
                                      before-bytes)]
        (if (and (< (count matches) num-matches)
                 (< (+ @total-bytes-read start-byte-offset) file-len))
          (let [;; The start index is positioned to find any possible
                ;; occurrence search string that did not quite fit in the
                ;; buffer on the previous read.
                new-buf-offset (- (min (.limit ^ByteBuffer buf)
                                       grep-max-search-size)
                                  (alength search-bytes))]
            (rotate-grep-buffer! buf stream total-bytes-read file file-len)
            (when (< @total-bytes-read 0)
              (throw (InvalidRequestException. "Cannot search past the end of the file")))
            (recur matches
                   new-buf-offset
                   new-byte-offset
                   new-before-bytes))
          ;; Enough matches found or file exhausted; include nextByteOffset
          ;; only when more of the file remains to search.
          (mk-grep-response search-bytes
                            start-byte-offset
                            matches
                            (if-not (and (< (count matches) num-matches)
                                         (>= @total-bytes-read file-len))
                              (let [next-byte-offset (+ (get (last matches)
                                                             "byteOffset")
                                                        (alength search-bytes))]
                                (if (> file-len next-byte-offset)
                                  next-byte-offset)))))))))
(defn- try-parse-int-param
  "Parse `value` as an Integer; throws InvalidRequestException naming the
  query parameter `nam` when the value is not a valid integer."
  [nam value]
  (try
    (Integer/parseInt value)
    (catch java.lang.NumberFormatException e
      (throw (InvalidRequestException.
               (str "Could not parse " nam " to an integer")
               e)))))
(defn search-log-file
  "Search one worker log file for `search` and return a JSON response of
  matches. Enforces the log ACL, 404s for missing files, and limits the
  search string to grep-max-search-size UTF-8 bytes."
  [fname user ^String root-dir search num-matches offset callback origin]
  (let [file (.getCanonicalFile (File. root-dir fname))]
    (if (.exists file)
      (if (or (blank? (*STORM-CONF* UI-FILTER))
              (authorized-log-user? user fname *STORM-CONF*))
        (let [num-matches-int (if num-matches
                                (try-parse-int-param "num-matches"
                                                     num-matches))
              offset-int (if offset
                           (try-parse-int-param "start-byte-offset" offset))]
          (try
            ;; FIX: the size check previously read `<= (count ...) grep-max-
            ;; search-size` as three separate truthy values inside `and` (the
            ;; `<=` call was missing its parentheses), so any non-empty
            ;; search string passed regardless of length.
            (if (and (not (empty? search))
                     (<= (count (.getBytes search "UTF-8")) grep-max-search-size))
              (json-response
                (substring-search file
                                  search
                                  :num-matches num-matches-int
                                  :start-byte-offset offset-int)
                callback
                :headers {"Access-Control-Allow-Origin" origin
                          "Access-Control-Allow-Credentials" "true"})
              (throw
                (InvalidRequestException.
                  (str "Search substring must be between 1 and 1024 UTF-8 "
                       "bytes in size (inclusive)"))))
            (catch Exception ex
              (json-response (exception->json ex) callback :status 500))))
        (json-response (unauthorized-user-json user) callback :status 401))
      (json-response {"error" "Not Found"
                      "errorMessage" "The file was not found on this node."}
                     callback
                     :status 404))))
(defn find-n-matches
  "Search `logs` in order (starting from index file-offset; `offset` is the
  byte offset within the first file searched) until `n` matches are found or
  the logs run out. Returns a map of fileOffset, searchString and the
  accumulated per-file match maps."
  [logs n file-offset offset search]
  (let [logs (drop file-offset logs)
        ;; NOTE(review): this closure captures the *initial* file-offset, so
        ;; the loop's incremented file-offset never reaches the response --
        ;; confirm whether callers expect the final offset here.
        wrap-matches-fn (fn [matches]
                          {"fileOffset" file-offset
                           "searchString" search
                           "matches" matches})]
    (loop [matches []
           logs logs
           offset offset
           file-offset file-offset
           match-count 0]
      (if (empty? logs)
        (wrap-matches-fn matches)
        (let [these-matches (try
                              (log-debug "Looking through " (first logs))
                              (substring-search (first logs)
                                                search
                                                :num-matches (- n match-count)
                                                :start-byte-offset offset)
                              (catch InvalidRequestException e
                                (log-error e "Can't search past end of file.")
                                {}))
              file-name (get-topo-port-workerlog (first logs))
              new-matches (conj matches
                                (merge these-matches
                                       { "fileName" file-name
                                        "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern file-path-separator))))}))
              new-count (+ match-count (count (these-matches "matches")))]
          ;; an empty map means this file's search failed: skip it entirely
          (if (empty? these-matches)
            (recur matches (rest logs) 0 (+ file-offset 1) match-count)
            (if (>= new-count n)
              (wrap-matches-fn new-matches)
              (recur new-matches (rest logs) 0 (+ file-offset 1) new-count))))))))
(defn logs-for-port
  "Get the filtered, authorized, sorted log files for a port."
  [user port-dir]
  (let [authorized? (fn [log]
                      (or (blank? (*STORM-CONF* UI-FILTER))
                          (authorized-log-user? user
                                                (get-topo-port-workerlog log)
                                                *STORM-CONF*)))
        worker-logs (filter #(re-find worker-log-filename-pattern (.getName %))
                            (.listFiles port-dir))]
    ;; newest first
    (sort-by #(.lastModified %) #(compare %2 %1)
             (filter authorized? worker-logs))))
(defn deep-search-logs-for-topology
  "Search the worker logs of one topology for `search`, across all ports
  (port nil or \"*\") or one specific port, returning a JSON response of
  per-file matches. With search-archived? false only the newest log per port
  is searched."
  [topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin]
  (json-response
    (if (or (not search) (not (.exists (File. (str root-dir file-path-separator topology-id)))))
      []
      (let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
            offset (if offset (Integer/parseInt offset) 0)
            ;; FIX: previously `(or (Integer/parseInt num-matches) 1)`, which
            ;; throws when num-matches is nil -- `or` cannot guard an argument
            ;; that is evaluated before it. Parse conditionally instead.
            num-matches (if num-matches (Integer/parseInt num-matches) 1)
            port-dirs (vec (.listFiles (File. (str root-dir file-path-separator topology-id))))
            logs-for-port-fn (partial logs-for-port user)]
        (if (or (not port) (= "*" port))
          ;; Check for all ports
          (let [filtered-logs (filter (comp not empty?) (map logs-for-port-fn port-dirs))]
            (if search-archived?
              (map #(find-n-matches % num-matches 0 0 search)
                   filtered-logs)
              (map #(find-n-matches % num-matches 0 0 search)
                   (map (comp vector first) filtered-logs))))
          ;; Check just the one port
          (if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port))
            []
            (let [port-dir (File. (str root-dir file-path-separator topology-id file-path-separator port))]
              (if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir)))
                []
                (let [filtered-logs (logs-for-port user port-dir)]
                  (if search-archived?
                    (find-n-matches filtered-logs num-matches file-offset offset search)
                    (find-n-matches [(first filtered-logs)] num-matches 0 offset search)))))))))
    callback
    :headers {"Access-Control-Allow-Origin" origin
              "Access-Control-Allow-Credentials" "true"}))
(defn log-template
  "Wrap `body` in the standard logviewer HTML page: title, stylesheets, an
  optional user banner, and the file name as a heading."
  ([body] (log-template body nil nil))
  ([body fname user]
    (html4
      [:head
       [:title (str (escape-html fname) " - Storm Log Viewer")]
       (include-css "/css/bootstrap-3.3.1.min.css")
       (include-css "/css/jquery.dataTables.1.10.4.min.css")
       (include-css "/css/style.css")
       ]
      [:body
       (concat
         ;; show the authenticated user only when one is known
         (when (not (blank? user)) [[:div.ui-user [:p "User: " user]]])
         [[:h3 (escape-html fname)]]
         (seq body))
       ])))
;; Plugin used to extract the authenticated user name from HTTP requests.
(def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*))
(defn- parse-long-from-map
  "Look up key `k` in map `m` and parse its value as a Long; throws
  InvalidRequestException naming the query parameter when parsing fails."
  [m k]
  (try
    (Long/parseLong (get m k))
    (catch NumberFormatException ex
      (throw
        (InvalidRequestException.
          (str "Could not make an integer out of the query parameter '"
               (name k) "'")
          ex)))))
(defn list-log-files
  "Return a JSON listing of worker log files, optionally narrowed by
  topology id and/or worker port. Paths are relative: topoId/port/file."
  [user topoId port log-root callback origin]
  (let [file-results
        (if (nil? topoId)
          (if (nil? port)
            ;; no filters: every log under every topo/port dir
            (get-all-logs-for-rootdir (File. log-root))
            ;; port only: logs from matching port dirs across all topologies
            (reduce concat
                    (for [topo-dir (.listFiles (File. log-root))]
                      (reduce concat
                              (for [port-dir (.listFiles topo-dir)]
                                (if (= (str port) (.getName port-dir))
                                  (into [] (.listFiles port-dir))))))))
          (if (nil? port)
            ;; topology only: all logs for that topology
            (let [topo-dir (File. (str log-root file-path-separator topoId))]
              (if (.exists topo-dir)
                (reduce concat
                        (for [port-dir (.listFiles topo-dir)]
                          (into [] (.listFiles port-dir))))
                []))
            ;; both topology and port
            (let [port-dir (get-worker-dir-from-root log-root topoId port)]
              (if (.exists port-dir)
                (into [] (.listFiles port-dir))
                []))))
        file-strs (sort (for [file file-results]
                          (get-topo-port-workerlog file)))]
    (json-response file-strs
                   callback
                   :headers {"Access-Control-Allow-Origin" origin
                             "Access-Control-Allow-Credentials" "true"})))
(defn get-profiler-dump-files
  "List the names of profiler dump files (.txt, .jfr, .bin) directly in dir."
  [dir]
  (let [dump-exts [".txt" ".jfr" ".bin"]]
    (for [f (.listFiles dir)
          :let [fname (.getName f)]
          :when (some #(.endsWith fname %) dump-exts)]
      fname)))
(defn- contained-in-dir?
  "True when `f` canonicalizes (resolving '..' and symlinks) to a path
   strictly inside directory `dir`.  Used to reject path-traversal
   attempts in URL path segments before a file is served."
  [^File dir ^File f]
  (.startsWith (.getCanonicalPath f)
               (str (.getCanonicalPath dir) File/separator)))

(defroutes log-routes
  ;; Worker log page: optional ?start/?length paging and ?grep filtering.
  (GET "/log" [:as req & m]
       (try
         (mark! logviewer:num-log-page-http-requests)
         (let [servlet-request (:servlet-request req)
               log-root (:log-root req)
               user (.getUserName http-creds-handler servlet-request)
               start (if (:start m) (parse-long-from-map m :start))
               length (if (:length m) (parse-long-from-map m :length))
               file (url-decode (:file m))]
           (log-template (log-page file start length (:grep m) user log-root)
                         file user))
         (catch InvalidRequestException ex
           (log-error ex)
           (ring-response-from-exception ex))))
  ;; Serve one profiler dump file for a worker.
  (GET "/dumps/:topo-id/:host-port/:filename"
       [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port filename & m]
       (let [user (.getUserName http-creds-handler servlet-request)
             port (second (split host-port #":"))
             dir (File. (str log-root
                             file-path-separator
                             topo-id
                             file-path-separator
                             port))
             file (File. (str log-root
                              file-path-separator
                              topo-id
                              file-path-separator
                              port
                              file-path-separator
                              filename))]
         ;; SECURITY: topo-id, host-port and filename come straight from
         ;; the URL; require the resolved paths to stay inside the log
         ;; root (dir) and the dump dir (file) so "../" segments cannot
         ;; reach arbitrary files readable by this daemon.
         (if (and (.exists dir)
                  (.exists file)
                  (contained-in-dir? (File. log-root) dir)
                  (contained-in-dir? dir file))
           (if (or (blank? (*STORM-CONF* UI-FILTER))
                   (authorized-log-user? user
                                         (str topo-id file-path-separator port file-path-separator "worker.log")
                                         *STORM-CONF*))
             (-> (resp/response file)
                 (resp/content-type "application/octet-stream"))
             (unauthorized-user-html user))
           (-> (resp/response "Page not found")
               (resp/status 404)))))
  ;; List the profiler dump files available for a worker.
  (GET "/dumps/:topo-id/:host-port"
       [:as {:keys [servlet-request servlet-response log-root]} topo-id host-port & m]
       (let [user (.getUserName http-creds-handler servlet-request)
             port (second (split host-port #":"))
             dir (File. (str log-root
                             file-path-separator
                             topo-id
                             file-path-separator
                             port))]
         ;; SECURITY: same traversal guard as the file route above.
         (if (and (.exists dir)
                  (contained-in-dir? (File. log-root) dir))
           (if (or (blank? (*STORM-CONF* UI-FILTER))
                   (authorized-log-user? user
                                         (str topo-id file-path-separator port file-path-separator "worker.log")
                                         *STORM-CONF*))
             (html4
              [:head
               [:title "File Dumps - Storm Log Viewer"]
               (include-css "/css/bootstrap-3.3.1.min.css")
               (include-css "/css/jquery.dataTables.1.10.4.min.css")
               (include-css "/css/style.css")]
              [:body
               [:ul
                (for [file (get-profiler-dump-files dir)]
                  [:li
                   [:a {:href (str "/dumps/" topo-id "/" host-port "/" file)} file ]])]])
             (unauthorized-user-html user))
           (-> (resp/response "Page not found")
               (resp/status 404)))))
  ;; Daemon (nimbus/supervisor/...) log page; mirrors /log.
  (GET "/daemonlog" [:as req & m]
       (try
         (mark! logviewer:num-daemonlog-page-http-requests)
         (let [servlet-request (:servlet-request req)
               daemonlog-root (:daemonlog-root req)
               user (.getUserName http-creds-handler servlet-request)
               start (if (:start m) (parse-long-from-map m :start))
               length (if (:length m) (parse-long-from-map m :length))
               file (url-decode (:file m))]
           (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
                         file user))
         (catch InvalidRequestException ex
           (log-error ex)
           (ring-response-from-exception ex))))
  ;; Download a whole worker log file.
  (GET "/download/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
       (try
         (mark! logviewer:num-download-log-file-http-requests)
         (let [user (.getUserName http-creds-handler servlet-request)]
           (download-log-file file servlet-request servlet-response user log-root))
         (catch InvalidRequestException ex
           (log-error ex)
           (ring-response-from-exception ex))))
  ;; Download a whole daemon log file.
  (GET "/daemondownload/:file" [:as {:keys [servlet-request servlet-response daemonlog-root]} file & m]
       (try
         (mark! logviewer:num-download-log-daemon-file-http-requests)
         (let [user (.getUserName http-creds-handler servlet-request)]
           (download-log-file file servlet-request servlet-response user daemonlog-root))
         (catch InvalidRequestException ex
           (log-error ex)
           (ring-response-from-exception ex))))
  ;; Substring search within a single log file (JSON/JSONP).
  (GET "/search/:file" [:as {:keys [servlet-request servlet-response log-root]} file & m]
       ;; We do not use servlet-response here, but do not remove it from the
       ;; :keys list, or this rule could stop working when an authentication
       ;; filter is configured.
       (try
         (let [user (.getUserName http-creds-handler servlet-request)]
           (search-log-file (url-decode file)
                            user
                            log-root
                            (:search-string m)
                            (:num-matches m)
                            (:start-byte-offset m)
                            (:callback m)
                            (.getHeader servlet-request "Origin")))
         (catch InvalidRequestException ex
           (log-error ex)
           (json-response (exception->json ex) (:callback m) :status 400))))
  ;; Search across all logs of one topology (JSON/JSONP).
  (GET "/deepSearch/:topo-id" [:as {:keys [servlet-request servlet-response log-root]} topo-id & m]
       ;; We do not use servlet-response here, but do not remove it from the
       ;; :keys list, or this rule could stop working when an authentication
       ;; filter is configured.
       (try
         (let [user (.getUserName http-creds-handler servlet-request)]
           (deep-search-logs-for-topology topo-id
                                          user
                                          log-root
                                          (:search-string m)
                                          (:num-matches m)
                                          (:port m)
                                          (:start-file-offset m)
                                          (:start-byte-offset m)
                                          (:search-archived m)
                                          (:callback m)
                                          (.getHeader servlet-request "Origin")))
         (catch InvalidRequestException ex
           (log-error ex)
           (json-response (exception->json ex) (:callback m) :status 400))))
  ;; List log files matching topoId/port (JSON/JSONP).
  (GET "/searchLogs" [:as req & m]
       (try
         (let [servlet-request (:servlet-request req)
               user (.getUserName http-creds-handler servlet-request)]
           (list-log-files user
                           (:topoId m)
                           (:port m)
                           (:log-root req)
                           (:callback m)
                           (.getHeader servlet-request "Origin")))
         (catch InvalidRequestException ex
           (log-error ex)
           (json-response (exception->json ex) (:callback m) :status 400))))
  ;; Same listing as /searchLogs but metered separately.
  (GET "/listLogs" [:as req & m]
       (try
         (mark! logviewer:num-list-logs-http-requests)
         (let [servlet-request (:servlet-request req)
               user (.getUserName http-creds-handler servlet-request)]
           (list-log-files user
                           (:topoId m)
                           (:port m)
                           (:log-root req)
                           (:callback m)
                           (.getHeader servlet-request "Origin")))
         (catch InvalidRequestException ex
           (log-error ex)
           (json-response (exception->json ex) (:callback m) :status 400))))
  (route/resources "/")
  (route/not-found "Page not found"))
(defn conf-middleware
  "Ring middleware that attaches the worker (:log-root) and daemon
   (:daemonlog-root) log directories to every request map before the
   request reaches `app`."
  [app log-root daemonlog-root]
  (fn [req]
    (-> req
        (assoc :log-root log-root)
        (assoc :daemonlog-root daemonlog-root)
        app)))
(defn start-logviewer!
  "Start the logviewer web server on LOGVIEWER-PORT, serving worker logs
   from `log-root-dir` and daemon logs from `daemonlog-root-dir`.
   Installs the configured UI filter (if any) followed by a gzip filter,
   and enables SSL when LOGVIEWER-HTTPS-PORT is set."
  [conf log-root-dir daemonlog-root-dir]
  (try
    (let [filter-class (conf UI-FILTER)
          filter-params (conf UI-FILTER-PARAMS)
          logapp (handler/api (-> log-routes
                                  requests-middleware)) ;; query params as map
          middle (conf-middleware logapp log-root-dir daemonlog-root-dir)
          ;; Optional authentication filter first (reuse the bindings
          ;; above instead of re-reading the conf), then gzip everything.
          filters-confs (if filter-class
                          [{:filter-class filter-class
                            :filter-params (or filter-params {})}]
                          [])
          filters-confs (concat filters-confs
                                [{:filter-class "org.eclipse.jetty.servlets.GzipFilter"
                                  :filter-name "Gzipper"
                                  :filter-params {}}])
          https-port (int (or (conf LOGVIEWER-HTTPS-PORT) 0))
          keystore-path (conf LOGVIEWER-HTTPS-KEYSTORE-PATH)
          keystore-pass (conf LOGVIEWER-HTTPS-KEYSTORE-PASSWORD)
          keystore-type (conf LOGVIEWER-HTTPS-KEYSTORE-TYPE)
          key-password (conf LOGVIEWER-HTTPS-KEY-PASSWORD)
          truststore-path (conf LOGVIEWER-HTTPS-TRUSTSTORE-PATH)
          truststore-password (conf LOGVIEWER-HTTPS-TRUSTSTORE-PASSWORD)
          truststore-type (conf LOGVIEWER-HTTPS-TRUSTSTORE-TYPE)
          want-client-auth (conf LOGVIEWER-HTTPS-WANT-CLIENT-AUTH)
          need-client-auth (conf LOGVIEWER-HTTPS-NEED-CLIENT-AUTH)]
      (storm-run-jetty {:port (int (conf LOGVIEWER-PORT))
                        :configurator (fn [server]
                                        (config-ssl server
                                                    https-port
                                                    keystore-path
                                                    keystore-pass
                                                    keystore-type
                                                    key-password
                                                    truststore-path
                                                    truststore-password
                                                    truststore-type
                                                    want-client-auth
                                                    need-client-auth)
                                        (config-filter server middle filters-confs))}))
    ;; NOTE(review): a startup failure is only logged here, so the
    ;; process keeps running without a web server — confirm that is the
    ;; intended behavior before tightening this catch.
    (catch Exception ex
      (log-error ex))))
(defn -main
  "Logviewer daemon entry point: loads the storm configuration, starts
   the periodic log cleaner and the logviewer web server, then starts
   the metrics reporters."
  []
  (let [conf (read-storm-config)
        ;; root of per-worker artifact/log dirs served by this daemon
        log-root (worker-artifacts-root conf)
        ;; directory the configured log4j2 appender writes daemon logs to
        daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
    (setup-default-uncaught-exception-handler)
    (start-log-cleaner! conf log-root)
    (start-logviewer! conf log-root daemonlog-root)
    (start-metrics-reporters)))