blob: cfa3a9c0a14cf949eb7f0f5ad623e503e51a4a0f [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns jepsen.flink.db
(:require [clj-http.client :as http]
[clojure.java.io]
[clojure.tools.logging :refer :all]
[jepsen
[control :as c]
[db :as db]
[util :refer [meh]]]
[jepsen.control.util :as cu]
[jepsen.flink.hadoop :as hadoop]
[jepsen.flink.utils :as fu]
[jepsen.flink.zookeeper :refer :all]))
(def install-dir "/opt/flink")
(def upload-dir "/tmp")
(def log-dir (str install-dir "/log"))
(def conf-file (str install-dir "/conf/flink-conf.yaml"))
(def masters-file (str install-dir "/conf/masters"))
(def taskmanager-slots 3)
(defn- default-flink-configuration
[test node]
{:high-availability "zookeeper"
:high-availability.zookeeper.quorum (zookeeper-quorum test)
:high-availability.storageDir "hdfs:///flink/ha"
:jobmanager.memory.process.size "2048m"
:jobmanager.rpc.address node
:state.savepoints.dir "hdfs:///flink/savepoints"
:rest.address node
:rest.port 8081
:rest.bind-address "0.0.0.0"
:taskmanager.numberOfTaskSlots taskmanager-slots
:yarn.application-attempts 99999
:slotmanager.taskmanager-timeout 10000
:state.backend.local-recovery "true"
:taskmanager.memory.process.size "2048m"
:taskmanager.registration.timeout "30 s"})
(defn flink-configuration
[test node]
(let [additional-config (-> test :test-spec :flink-config)]
(merge (default-flink-configuration test node)
additional-config)))
(defn write-configuration!
"Writes the flink-conf.yaml to the flink conf directory"
[test node]
(let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
(seq (flink-configuration test node))))]
(c/exec :echo c :> conf-file)
;; TODO: write log4j.properties properly
(c/exec (c/lit (str "sed -i'.bak' -e '/rootLogger\\.level/ s/=.*/= DEBUG/' " install-dir "/conf/log4j.properties")))))
(defn- file-name
[path]
(.getName (clojure.java.io/file path)))
(defn upload-job-jar!
[job-jar]
(c/upload job-jar upload-dir)
(c/exec :mv (str upload-dir "/" (file-name job-jar)) install-dir))
(defn upload-job-jars!
[job-jars]
(doseq [job-jar job-jars]
(upload-job-jar! job-jar)))
(defn install-flink!
[test node]
(let [url (:tarball test)]
(info "Installing Flink from" url)
(cu/install-archive! url install-dir)
(info "Enable S3 FS")
(c/exec (c/lit (str "mkdir " install-dir "/plugins/s3-fs-hadoop && ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/plugins/s3-fs-hadoop")))
(upload-job-jars! (->> test :test-spec :jobs (map :job-jar)))
(write-configuration! test node)))
(defn teardown-flink!
[]
(info "Tearing down Flink")
(meh (cu/grepkill! "flink"))
(meh (c/exec :rm :-rf install-dir))
(meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
(defn combined-db
[dbs]
(reify db/DB
(setup! [_ test node]
(c/su
(doseq [db dbs] (db/setup! db test node))))
(teardown! [_ test node]
(c/su
(try
(doseq [db (reverse dbs)] (db/teardown! db test node))
(finally (fu/stop-all-supervised-services!)))))
db/LogFiles
(log-files [_ test node]
(->>
(filter (partial satisfies? db/LogFiles) dbs)
(map #(db/log-files % test node))
(flatten)))))
(defn flink-db
[db]
(let [flink-base-db (reify db/DB
(setup! [_ test node]
(c/su
(install-flink! test node)))
(teardown! [_ _ _]
(c/su
(teardown-flink!)))
db/LogFiles
(log-files [_ _ _]
(c/su
(fu/dump-jstack-by-pattern! log-dir
"TaskExecutor"
"TaskManager"
"ClusterEntrypoint")
(fu/find-files! log-dir))))]
(combined-db [flink-base-db db])))
(defn- sorted-nodes
[test]
(-> test :nodes sort))
(defn- select-nodes
[test selector]
(-> (sorted-nodes test)
selector))
(defn- first-node
[test]
(select-nodes test first))
(defn- create-env-vars
"Expects a map containing environment variables, and returns a string that can be used to set
environment variables for a child process using Bash's quick assignment and inheritance trick.
For example, for a map {:FOO \"bar\"}, this function returns \"FOO=bar \"."
[m]
(->>
(map #(str (name (first %)) "=" (second %)) m)
(apply fu/join-space)
(#(str % " "))))
(defn- hadoop-env-vars
[]
(create-env-vars {:HADOOP_CLASSPATH (str "`" hadoop/install-dir "/bin/hadoop classpath`")
:HADOOP_CONF_DIR hadoop/hadoop-conf-dir}))
(defn exec-flink!
[cmd args]
(c/su
(c/exec (c/lit (fu/join-space
(hadoop-env-vars)
(str install-dir "/bin/flink")
cmd
(apply fu/join-space args))))))
(defn flink-run-cli-args
"Returns the CLI args that should be passed to 'flink run'"
[job-spec]
(concat
["-d"]
(if (:main-class job-spec)
[(str "-c " (:main-class job-spec))]
[])))
(defn submit-job!
([test] (submit-job! test []))
([test cli-args]
(doseq [{:keys [job-jar job-args] :as job-spec} (-> test :test-spec :jobs)]
(exec-flink! "run" (concat cli-args
(flink-run-cli-args job-spec)
[(str install-dir "/" (file-name job-jar))
job-args])))))
(defn- submit-job-with-retry!
([test] (submit-job-with-retry! test []))
([test cli-args] (fu/retry
(partial submit-job! test cli-args)
:fallback (fn [e] (do
(fatal e "Could not submit job.")
(System/exit 1))))))
;;; Standalone
(def standalone-master-count 2)
(defn- standalone-master-nodes
[test]
(select-nodes test (partial take standalone-master-count)))
(defn- standalone-taskmanager-nodes
[test]
(select-nodes test (partial drop standalone-master-count)))
(defn- start-standalone-masters!
[]
(let [jobmanager-script (str install-dir "/bin/jobmanager.sh")
jobmanager-log (str log-dir "/jobmanager.log")]
(fu/create-supervised-service!
"flink-master"
(fu/join-space "env" (hadoop-env-vars) jobmanager-script "start-foreground" ">>" jobmanager-log))))
(defn- start-standalone-taskmanagers!
[]
(let [taskmanager-script (str install-dir "/bin/taskmanager.sh")
taskmanager-log (str log-dir "/taskmanager.log")]
(fu/create-supervised-service!
"flink-taskmanager"
(fu/join-space "env" (hadoop-env-vars) taskmanager-script "start-foreground" ">>" taskmanager-log))))
(defn start-flink-db
[]
(flink-db
(reify db/DB
(setup! [_ test node]
(c/su
(when (some #{node} (standalone-master-nodes test))
(start-standalone-masters!))
(when (some #{node} (standalone-taskmanager-nodes test))
(start-standalone-taskmanagers!))
(when (= (first-node test) node)
(submit-job-with-retry! test))))
(teardown! [_ test node]
(c/su
(when (some #{node} (standalone-master-nodes test))
(fu/stop-supervised-service! "flink-master"))
(when (some #{node} (standalone-taskmanager-nodes test))
(fu/stop-supervised-service! "flink-taskmanager")))))))
;;; YARN
(defn- start-yarn-session-cmd
[]
(fu/join-space (hadoop-env-vars)
(str install-dir "/bin/yarn-session.sh")
"-d"))
(defn- start-yarn-session!
[]
(info "Starting YARN session")
(let [exec-start-yarn-session! #(c/su (c/exec (c/lit (start-yarn-session-cmd))))
log-failure! (fn [exception _] (info "Starting YARN session failed due to"
(.getMessage exception)
"Retrying..."))]
(fu/retry exec-start-yarn-session!
:delay 4000
:on-error log-failure!)))
(defn yarn-session-db
[]
(flink-db (reify db/DB
(setup! [_ test node]
(when (= node (first-node test))
(start-yarn-session!)
(submit-job! test)))
(teardown! [_ _ _]))))
(defn- start-yarn-job!
[test]
(c/su
(submit-job-with-retry! test ["-m yarn-cluster"])))
(defn yarn-job-db
[]
(flink-db (reify db/DB
(setup! [_ test node]
(when (= node (first-node test))
(start-yarn-job! test)))
(teardown! [_ _ _]))))