| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| |
| (ns jepsen.flink.utils |
| (:require [clojure.tools.logging :refer :all] |
| [jepsen |
| [control :as c] |
| [util :refer [meh]]] |
| [jepsen.os.debian :as debian])) |
| |
(defn retry
  "Calls the zero-argument function op, retrying it while it throws an Exception.

  The following options are supported:

  :on-retry - A function called before every retry with the exception and the number of
              retries still remaining (counting down from :retries to 1) as arguments.
              Defaults to logging a warning with the exception message.
  :success  - A function called with the result of op; its return value is the return
              value of retry. Defaults to identity.
  :fallback - A function called with the last exception once all retries are exhausted.
              Its return value is in turn passed to :success. Defaults to rethrowing the
              exception.
  :retries  - Maximum number of retries after the first attempt (default 10), i.e. op is
              called at most (inc retries) times.
  :delay    - Milliseconds to sleep before each retry (default 2000)."
  [op & {:keys [on-retry success fallback retries delay]
         :or   {on-retry (fn [exception _remaining]
                           (warn "Retryable operation failed:" (.getMessage exception)))
                success  identity
                fallback #(throw %)
                retries  10
                delay    2000}
         :as   opts}]
  ;; Tag the outcome explicitly rather than returning a {:exception e} sentinel map, so
  ;; that an op which legitimately returns a map with an :exception key is not mistaken
  ;; for a failure.
  (let [[failed? value] (try
                          [false (op)]
                          (catch Exception e
                            (if (pos? retries)
                              [true e]
                              [false (fallback e)])))]
    (if failed?
      (do
        (on-retry value retries)
        (Thread/sleep (long delay))
        ;; recur passes the options map directly as the rest argument; map destructuring
        ;; of a non-seq rest argument uses it as-is. opts is nil when no options were
        ;; given, and (assoc nil ...) yields a fresh map.
        (recur op (assoc opts :retries (dec retries))))
      (success value))))
| |
(defn join-space
  "Returns the string forms of tokens concatenated with a single space between each.
  nil tokens contribute empty strings; calling it with no tokens returns \"\"."
  [& tokens]
  (apply str (interpose " " tokens)))
| |
(defn find-files!
  "Recursively lists the regular files under dir on the remote node (via
  jepsen.control/exec), restricted to files whose base name matches the find(1) glob
  `name` (default \"*\"). Returns a seq of path strings.

  If the command fails with a message containing \"No such file or directory\" (e.g. dir
  does not exist), an empty seq is returned; any other failure is rethrown."
  ([dir] (find-files! dir "*"))
  ([dir name]
   (let [output (try
                  ;; Pass the pattern as an ordinary argument so that c/exec shell-escapes
                  ;; it. This quotes glob characters such as * (which the old hand-rolled
                  ;; double-quote wrapping inside c/lit also did) and additionally escapes
                  ;; $, backtick, double quote and backslash, which that wrapping left
                  ;; exposed to the remote shell.
                  (c/exec :find dir :-type :f :-name name)
                  (catch Exception e
                    ;; getMessage may return nil; (str nil) avoids replacing e with an NPE.
                    (if (clojure.string/includes? (str (.getMessage e))
                                                  "No such file or directory")
                      ""
                      (throw e))))]
     (->> (clojure.string/split output #"\n")
          (remove clojure.string/blank?)))))
| |
| ;;; runit process supervisor (http://smarden.org/runit/) |
| |
(def runit-version
  "Package version of runit passed to debian/install by install-process-supervisor!."
  "2.1.2-9.2")

(def runit-systemd-version
  "Package version of runit-systemd passed to debian/install by install-process-supervisor!."
  "2.1.2-9.2")
| |
(defn- install-process-supervisor!
  "Installs the runit and runit-systemd packages, pinned to runit-version and
  runit-systemd-version, using jepsen.os.debian/install."
  []
  (let [pinned-packages {:runit         runit-version
                         :runit-systemd runit-systemd-version}]
    (debian/install pinned-packages)))
| |
(defn create-supervised-service!
  "Registers service-name with the runit process supervisor.

  As root: installs the supervisor, writes /etc/sv/<service-name>/run as an executable
  /bin/sh script that redirects stderr to stdout and execs cmd, then symlinks the service
  directory to /etc/service/<service-name>. The symlink is created last, after the run
  script is executable. Starting the service is left to runit picking up the new
  /etc/service entry (NOTE(review): runit behavior, not performed here)."
  [service-name cmd]
  (info "Create supervised service" service-name)
  (let [sv-dir       (str "/etc/sv/" service-name)
        run-path     (str sv-dir "/run")
        enabled-link (str "/etc/service/" service-name)
        script-lines ["#!/bin/sh"
                      "exec 2>&1"
                      (str "exec " cmd)]]
    (c/su
      (install-process-supervisor!)
      (c/exec :mkdir :-p sv-dir)
      (c/exec :echo (clojure.string/join "\n" script-lines) :> run-path)
      (c/exec :chmod :+x run-path)
      (c/exec :mkdir :-p "/etc/service")
      (c/exec :ln :-sfT sv-dir enabled-link))))
| |
(defn stop-supervised-service!
  "Takes service-name out of supervision by deleting /etc/service/<service-name> as root
  (rm -f, so a missing entry is not an error). Stopping the running process is left to
  runit reacting to the removed entry (NOTE(review): runit behavior, not done here)."
  [service-name]
  (info "Stop supervised service" service-name)
  (let [enabled-link (str "/etc/service/" service-name)]
    (c/su (c/exec :rm :-f enabled-link))))
| |
(defn stop-all-supervised-services!
  "Takes every service out of supervision by deleting, as root, each symlink directly
  inside /etc/service except the one named sshd. Failures are swallowed via meh, so this
  is a best-effort cleanup."
  []
  (info "Stop all supervised services.")
  (c/su
    ;; HACK:
    ;; sshd is spared because, when tests run in Docker, sshd itself is started by runit.
    (meh (c/exec :find "/etc/service"
                 :-mindepth 1 :-maxdepth 1
                 :-type :l
                 :-not :-name "sshd"
                 :-delete))))
| |
| ;;; jstack |
| |
(defn- includes-any?
  "Returns true when string s contains at least one element of substrs as a substring;
  returns nil otherwise (including when substrs is empty or nil)."
  [s substrs]
  (loop [[candidate & more :as remaining] (seq substrs)]
    (when remaining
      (if (clojure.string/includes? s candidate)
        true
        (recur more)))))
| |
(defn- jps!
  "Runs `jps` through jepsen.control/exec.

  With no arguments: returns a lazy seq with one vector per output line, each line split
  on single whitespace characters.
  With class-name-patterns: keeps only the two-element [pid class-name] entries whose
  class-name contains at least one of the patterns."
  ([]
   (let [output (c/exec :jps)
         lines  (clojure.string/split (clojure.string/trim output) #"\n")]
     (map (fn [line] (clojure.string/split line #"\s")) lines)))
  ([class-name-patterns]
   (filter (fn [entry]
             ;; Lines that are not exactly "pid class-name" are dropped before matching.
             (and (= 2 (count entry))
                  (includes-any? (second entry) class-name-patterns)))
           (jps!))))
| |
(defn- write-jstack!
  "Redirects the output of `jstack -l <pid>` into out-path on the remote node. Any
  Exception from the command is logged as a warning and swallowed (the result is then nil)."
  [pid out-path]
  (let [jstack-cmd [:jstack :-l pid :> out-path]]
    (try
      (apply c/exec jstack-cmd)
      (catch Exception ex
        (warn ex "Failed to invoke jstack on pid" pid)))))
| |
(defn dump-jstack-by-pattern!
  "For every JVM reported by jps whose class name contains one of class-name-patterns,
  writes `jstack -l` output to <out-dir>/jstack_<pid>_<class-name>. Per-pid failures are
  logged and skipped by write-jstack!. Returns nil.
  NOTE(review): out-dir is not created here; presumably the caller ensures it exists."
  [out-dir & class-name-patterns]
  (run! (fn [[pid class-name]]
          (write-jstack! pid (str out-dir "/jstack_" pid "_" class-name)))
        (jps! class-name-patterns)))