;; Namespace of shared helpers: retrying, remote file listing, runit service
;; supervision and jstack dumps. Logging fns (info, warn) are referred from
;; clojure.tools.logging; remote commands go through jepsen.control (alias c).
(ns jepsen.flink.utils
  (:require [clojure.string]
            [clojure.tools.logging :refer :all]
            [jepsen
             [control :as c]
             [util :refer [meh]]]
            [jepsen.os.debian :as debian]))
(defn retry
  "Runs the zero-argument function op and retries it when it throws an Exception.

  The following options are supported:

  :on-retry - A function called before every retry with the exception and the
              number of retries that are still left (counting the one about to
              start). Defaults to logging a warning with the exception message.
  :success  - A function called with the result of op; its return value is the
              return value of retry. Defaults to identity.
  :fallback - A function with an exception as the first argument that is called
              if all retries are exhausted. Its return value is handed to
              :success. Defaults to rethrowing the exception.
  :retries  - Number of total retries. Defaults to 10.
  :delay    - The time between retries in milliseconds. Defaults to 2000."
  ([op & {:keys [on-retry success fallback retries delay]
          :or   {on-retry (fn [exception attempt]
                            (warn "Retryable operation failed:" (.getMessage exception)))
                 success  identity
                 fallback #(throw %)
                 retries  10
                 delay    2000}
          :as   keys}]
   (let [r (try
             (op)
             (catch Exception e
               (if (< 0 retries)
                 ;; Namespaced marker key so that a legitimate result of op
                 ;; containing :exception is not mistaken for a failure.
                 {::exception e}
                 (fallback e))))]
     (if-let [e (::exception r)]
       (do
         (on-retry e retries)
         (Thread/sleep delay)
         ;; recur binds the rest parameter directly, so the options are passed
         ;; as one map; only the remaining retry budget changes.
         (recur op (assoc keys :retries (dec retries))))
       (success r)))))
(defn join-space
  "Returns the string representations of tokens concatenated with a single
  space between neighbouring tokens. Without tokens the result is \"\"."
  [& tokens]
  (->> tokens
       (interpose " ")
       (apply str)))
(defn find-files!
  "Lists files recursively given a directory. If the directory does not exist, an empty collection
  is returned.

  dir - path of the directory to search, handed to `find` through c/exec.

  Returns a lazy sequence of the non-blank output lines of `find dir -type f`.
  Any failure other than a missing directory is rethrown."
  [dir]
  (let [files (try
                (c/exec :find dir :-type :f)
                (catch Exception e
                  ;; Only the missing-directory failure means \"no files\";
                  ;; str guards against an exception without a message.
                  (if (clojure.string/includes? (str (.getMessage e))
                                                "No such file or directory")
                    ""
                    (throw e))))]
    (->> (clojure.string/split files #"\n")
         (remove clojure.string/blank?))))
;;; runit process supervisor (http://smarden.org/runit/)
;; Version requested for the :runit package in the debian/install call below.
(def runit-version "2.1.2-9.2")
;; Version requested for the :runit-systemd package in the same call.
(def runit-systemd-version "2.1.2-9.2")
(defn- install-process-supervisor!
  "Installs the process supervisor.

  Takes no arguments. Asks debian/install for the runit and runit-systemd
  packages at the versions pinned in runit-version and runit-systemd-version."
  []
  (debian/install {:runit         runit-version
                   :runit-systemd runit-systemd-version}))
(defn create-supervised-service!
  "Registers a service with the process supervisor and starts it.

  service-name - name of the service; used as the directory name under /etc/sv
                 and as the link name under /etc/service.
  cmd          - command line the generated run script exec's.

  Writes /etc/sv/<service-name>/run (a /bin/sh script that merges stderr into
  stdout and exec's cmd), makes it executable and links the service directory
  into /etc/service."
  [service-name cmd]
  (let [service-dir (str "/etc/sv/" service-name)
        run-script  (str service-dir "/run")]
    (info "Create supervised service" service-name)
    ;; Everything below modifies paths under /etc, hence the privileged scope.
    (c/su
      (c/exec :mkdir :-p service-dir)
      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
                                               "exec 2>&1"
                                               (str "exec " cmd)]) :> run-script)
      (c/exec :chmod :+x run-script)
      (c/exec :mkdir :-p "/etc/service")
      ;; -T treats the link name as a plain file, so re-registering replaces
      ;; the existing link instead of creating one inside the linked directory.
      (c/exec :ln :-sfT service-dir (str "/etc/service/" service-name)))))
(defn stop-supervised-service!
  "Stops a service and removes it from supervision.

  service-name - name of the service as passed to create-supervised-service!.

  Deletes the /etc/service/<service-name> link; -f makes this a no-op when the
  link does not exist."
  [service-name]
  (info "Stop supervised service" service-name)
  ;; /etc/service is modified, hence the privileged scope.
  (c/su
    (c/exec :rm :-f (str "/etc/service/" service-name))))
(defn stop-all-supervised-services!
  "Stops and removes all services from supervision if any.

  Takes no arguments. Failures of the underlying find command are swallowed by
  meh, so this is best effort."
  []
  (info "Stop all supervised services.")
  (c/su
    ;; HACK:
    ;; Remove all symlinks in /etc/service except sshd.
    ;; This is only relevant when tests are run in Docker because there sshd is started using runit.
    (meh (c/exec :find (c/lit "/etc/service -mindepth 1 -maxdepth 1 -type l -not -name 'sshd' -delete")))))
;;; jstack
(defn- includes-any?
  "Returns true when the string s contains at least one of the strings in
  substrs, otherwise nil."
  [s substrs]
  (some (fn [candidate]
          (clojure.string/includes? s candidate))
        substrs))
(defn- jps!
  "Runs `jps` through c/exec and returns one vector of whitespace-separated
  fields per output line.

  []                    - returns an entry for every output line.
  [class-name-patterns] - keeps only entries with exactly two fields whose
                          second field (the class name) contains at least one
                          of the given substrings."
  ([]
   (map #(clojure.string/split % #"\s")
        (-> (c/exec :jps)
            (clojure.string/split #"\n"))))
  ([class-name-patterns]
   (->> (jps!)
        ;; Drop lines that do not split into exactly two fields so the
        ;; destructuring below always sees a class name.
        (filter #(= 2 (count %)))
        (filter (fn [[_ class-name]] (includes-any? class-name class-name-patterns))))))
(defn- write-jstack!
  "Invokes `jstack -l` for the JVM with the given pid via c/exec, directing the
  output to out-path (`:>` is presumably rendered as a shell redirect by
  jepsen.control - confirm)."
  [pid out-path]
  (c/exec :jstack :-l pid :> out-path))
(defn dump-jstack-by-pattern!
  "Writes one jstack dump per JVM whose class name matches any of
  class-name-patterns. Each dump goes to <out-dir>/jstack_<pid>_<class-name>.
  Returns nil."
  [out-dir & class-name-patterns]
  (doseq [[pid class-name] (jps! class-name-patterns)
          :let [out-path (str out-dir "/jstack_" pid "_" class-name)]]
    (write-jstack! pid out-path)))