| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| |
| (ns jepsen.flink.client |
| (:require [clj-http.client :as http] |
| [clojure.tools.logging :refer :all] |
| [jepsen.client :as client] |
| [jepsen.flink.zookeeper :as fz] |
| [jepsen.flink.utils :as fu] |
| [zookeeper :as zk]) |
| (:import (java.io ByteArrayInputStream ObjectInputStream))) |
| |
(defn connect-zk-client!
  "Connects to ZooKeeper at connection-string, passing a :timeout-msec of
  60000 to zk/connect, and returns whatever zk/connect returns."
  [connection-string]
  (let [timeout-msec 60000]
    (zk/connect connection-string :timeout-msec timeout-msec)))
| |
(defn read-url
  "Decodes a URL from a byte array holding a Java object stream whose first
  entry was written with writeUTF. Returns the decoded string."
  [bytes]
  (with-open [byte-stream (ByteArrayInputStream. bytes)
              object-stream (ObjectInputStream. byte-stream)]
    (.readUTF object-stream)))
| |
(defn wait-for-zk-operation
  "Calls (operation zk-client path :watcher f) and returns a promise that is
  delivered with the first truthy result. While the result is falsey, the
  watcher f passed to the operation re-runs the same call when it is invoked."
  [zk-client operation path]
  (let [result (promise)
        attempt (fn attempt [_]
                  ;; The function passes itself as the watcher so that the
                  ;; next notification triggers another attempt.
                  (when-let [value (operation zk-client path :watcher attempt)]
                    (deliver result value)))]
    (attempt nil)
    result))
| |
(defn wait-for-path-to-exist
  "Logs that it is waiting for path, then returns a promise that is delivered
  with the first truthy result of zk/exists for that path."
  [zk-client path]
  (info "Waiting for path" path "in ZK.")
  (let [exists-promise (wait-for-zk-operation zk-client zk/exists path)]
    exists-promise))
| |
(defn get-only-application-id
  "Returns the single element of coll.

  Throws an AssertionError (while assertions are enabled) if coll does not
  contain exactly one element. The message includes the elements of coll."
  [coll]
  ;; pr-str is used instead of str so that lazy sequences are rendered by
  ;; their elements; str on a LazySeq only yields \"clojure.lang.LazySeq@<hash>\",
  ;; which hides the offending application ids from the error message.
  (assert (= 1 (count coll)) (str "Expected 1 application id, got " (pr-str coll) ". "
                                  "Failed to deploy the Flink cluster, or there are lingering Flink clusters."))
  (first coll))
| |
(defn wait-for-children-to-exist
  "Returns a promise that is delivered with the first truthy result of
  zk/children for path."
  [zk-client path]
  (let [children-promise (wait-for-zk-operation zk-client zk/children path)]
    children-promise))
| |
(defn find-application-id
  "Blocks until \"/flink\" exists in ZooKeeper and zk/children yields a truthy
  result for it, then returns the only child. Fails the assertion in
  get-only-application-id unless there is exactly one child."
  [zk-client]
  (let [flink-root "/flink"]
    ;; First block until the root node is present ...
    @(wait-for-path-to-exist zk-client flink-root)
    ;; ... then block until it has children and pick the single one.
    (get-only-application-id
      @(wait-for-children-to-exist zk-client flink-root))))
| |
(defn watch-node-bytes
  "Invokes callback with the :data of the node at path, if the node exists,
  and registers watchers so that this is repeated whenever a watcher fires.

  Returns the callback's result, or nil when the node does not exist (in
  which case only the zk/exists watcher is registered)."
  [zk-client path callback]
  ;; Two watchers are registered per call (one via zk/exists, one via zk/data).
  ;; If each of them re-entered this function independently, a single
  ;; ZooKeeper event would run the callback twice and double the number of
  ;; registered watchers on every change. Both watchers therefore share a
  ;; one-shot latch: whichever fires first re-registers, the other is a no-op.
  (let [fired? (atom false)
        rewatch (fn [_]
                  (when (compare-and-set! fired? false true)
                    (watch-node-bytes zk-client path callback)))]
    (when (zk/exists zk-client path :watcher rewatch)
      (->>
        (zk/data zk-client path :watcher rewatch)
        :data
        (callback)))))
| |
(defn make-job-manager-url
  "Opens a ZooKeeper client for the quorum of test and starts tracking the
  leading REST URL in the background.

  Returns a map with:
    :rest-url-atom  atom that is reset to the URL read from the
                    rest_server_lock node each time the watch callback runs
    :closer         zero-argument function that closes the ZooKeeper client
    :init-future    future that finishes once the first watch-node-bytes
                    call has returned"
  [test]
  (let [rest-url-atom (atom nil)
        zk-client (connect-zk-client! (fz/zookeeper-quorum test))
        on-leader-bytes (fn [bytes]
                          (let [url (read-url bytes)]
                            (info "Leading REST url changed to" url)
                            (reset! rest-url-atom url)))
        init-future (future
                      (let [application-id (find-application-id zk-client)
                            path (str "/flink/" application-id "/leader/rest_server_lock")]
                        ;; Block until the leader node is present before watching it.
                        @(wait-for-path-to-exist zk-client path)
                        (info "Determined application id to be" application-id)
                        (watch-node-bytes zk-client path on-leader-bytes)))
        closer (fn [] (zk/close zk-client))]
    {:rest-url-atom rest-url-atom
     :closer closer
     :init-future init-future}))
| |
(defn list-jobs!
  "Issues GET <base-url>/jobs, parsing the response as JSON, and returns a
  lazy sequence of the :id of every entry under :jobs in the response body."
  [base-url]
  (let [response (http/get (str base-url "/jobs") {:as :json})
        jobs (get-in response [:body :jobs])]
    (map :id jobs)))
| |
(defn job-running?
  "Issues GET <base-url>/jobs/<job-id> and reports whether every vertex of
  the job has status \"RUNNING\".

  Returns false if http/missing? holds for the response. Throws ex-info when
  the response is otherwise unsuccessful, and fails an assertion when the
  body has no :vertices."
  [base-url job-id]
  (let [response (http/get (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
        body (:body response)]
    (if (http/missing? response)
      false
      (do
        (when-not (http/success? response)
          (throw (ex-info "Could not determine if job is running" {:job-id job-id :error (:errors body)})))
        (assert (:vertices body) "Job does not have vertices")
        (let [statuses (map :status (:vertices body))]
          (every? (fn [status] (= "RUNNING" status)) statuses))))))
| |
(defn jobs-running?
  "Queries job-running? for each of job-ids against base-url. Returns a map
  from job id to the boolean result for that job."
  [base-url job-ids]
  (into {}
        (for [job-id job-ids]
          [job-id (job-running? base-url job-id)])))
| |
(defn- cancel-job!
  "Issues PATCH <base-url>/jobs/<job-id> to cancel the job.

  Returns true when the request succeeded and false when http/missing? holds
  for the response. Throws ex-info for any other unsuccessful response."
  [base-url job-id]
  (let [response (http/patch (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})]
    (if (http/missing? response)
      false
      (do
        (when-not (http/success? response)
          (throw (ex-info "Job cancellation unsuccessful"
                          {:job-id job-id :error (get-in response [:body :errors])})))
        true))))
| |
(defn- cancel-jobs!
  "Calls cancel-job! for each of job-ids in order, stopping at the first
  exception. Returns nil."
  [base-url job-ids]
  (run! (fn [job-id] (cancel-job! base-url job-id)) job-ids))
| |
(defmacro dispatch-operation
  "Evaluates body and returns op with :type :ok and :value set to the result
  of the last body form. If body throws an Exception, logs a warning that
  includes the body source and returns op with :type :fail and :error set to
  the exception message.

  op is evaluated exactly once. body may consist of several forms; they are
  evaluated in order as in an implicit do."
  [op & body]
  ;; For a single body form the logged source is that form itself; several
  ;; forms are shown wrapped in do so none of them is dropped from the log.
  (let [logged-form (if (next body) (cons 'do body) (first body))]
    `(let [op# ~op]
       (try
         ;; Wrapping in do keeps extra body forms from being spliced in as
         ;; additional key/value arguments to assoc.
         (assoc op# :type :ok :value (do ~@body))
         (catch Exception e#
           (warn e# "An exception occurred while running" (quote ~logged-form))
           (assoc op# :type :fail :error (.getMessage e#)))))))
| |
(defmacro dispatch-operation-or-fatal
  "Runs body through fu/retry and returns op with :type :ok and :value set
  to the outcome. The fallback handed to fu/retry logs the exception at
  fatal level together with the body source and then exits the JVM with
  status 1."
  [op & body]
  `(let [op# ~op
         on-exhausted# (fn [e#]
                         (fatal e# "Required operation did not succeed" (quote ~@body))
                         (System/exit 1))
         value# (fu/retry (fn [] ~@body) :fallback on-exhausted#)]
     (assoc op# :type :ok :value value#)))
| |
(defn- dispatch-rest-operation!
  "Executes the operation selected by (:f op) against rest-url for job-ids
  and returns the completed op.

  Asserts that job-ids is truthy. When rest-url is falsey the op is returned
  with :type :fail. Supported values of :f are :jobs-running? and
  :cancel-jobs; any other value makes case throw."
  [rest-url job-ids op]
  (assert job-ids)
  (if rest-url
    (case (:f op)
      :jobs-running? (dispatch-operation op (fu/retry
                                              (partial jobs-running? rest-url job-ids)
                                              :retries 3
                                              :fallback #(throw %)))
      :cancel-jobs (dispatch-operation-or-fatal op (cancel-jobs! rest-url job-ids)))
    (assoc op :type :fail :error "Have not determined REST URL yet.")))
| |
;; Jepsen client that talks to the Flink REST endpoint.
;;
;; Fields:
;;   closer        - zero-argument function that closes the ZK client
;;   rest-url      - atom holding the current REST URL
;;   init-future   - future that is dereferenced in setup! before jobs are listed
;;   job-ids       - atom holding the job ids found during setup!
;;   client-setup? - atom flag; flipped from false to true by the first setup!
;;                   so that later setup! calls skip the one-time work
(defrecord Client
  [closer rest-url init-future job-ids client-setup?]
  client/Client
  (open! [this test _]
    (info "Open client.")
    (let [job-manager (make-job-manager-url test)]
      (assoc this
             :closer (:closer job-manager)
             :rest-url (:rest-url-atom job-manager)
             :init-future (:init-future job-manager))))

  (setup! [_ _]
    (info "Setup client.")
    ;; Only the call that wins the compare-and-set performs the setup.
    (when (compare-and-set! client-setup? false true)
      @init-future
      (let [on-failure (fn [e]
                         (fatal e "Could not get running jobs.")
                         (System/exit 1))
            jobs (fu/retry (fn [] (list-jobs! @rest-url)) :fallback on-failure)
            num-jobs (count jobs)]
        (assert (pos? num-jobs) (str "Expected at least 1 job, was " num-jobs))
        (info "Submitted jobs" jobs)
        (reset! job-ids jobs))))

  (invoke! [_ _ op]
    (let [current-url @rest-url
          current-job-ids @job-ids]
      (dispatch-rest-operation! current-url current-job-ids op)))

  ;; Nothing to tear down.
  (teardown! [_ _])

  (close! [_ _]
    (info "Closing client.")
    (closer)))
| |
(defn create-client
  "Returns a Client with fresh job-ids and client-setup? atoms (nil and
  false respectively); the remaining fields are nil until open! fills
  them in."
  []
  (map->Client {:closer nil
                :rest-url nil
                :init-future nil
                :job-ids (atom nil)
                :client-setup? (atom false)}))