;; blob: ff934e1de6e34ef394ff37ab5acc3a400cb7997e [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns jepsen.flink.db
(:require [clj-http.client :as http]
[clojure.java.io]
[clojure.string :as str]
[clojure.tools.logging :refer :all]
[jepsen
[control :as c]
[db :as db]
[util :refer [meh]]
[zookeeper :as zk]]
[jepsen.control.util :as cu]
[jepsen.flink.hadoop :as hadoop]
[jepsen.flink.mesos :as mesos]
[jepsen.flink.utils :as fu]
[jepsen.flink.zookeeper :refer :all]))
;; Directory on each node into which the Flink archive is installed.
(def install-dir "/opt/flink")
;; Directory on each node that receives the uploaded job jar before it is moved into install-dir.
(def upload-dir "/tmp")
;; Directory whose contents are reported as this DB's log files.
(def log-dir (str install-dir "/log"))
;; Path of the flink-conf.yaml written by write-configuration!.
(def conf-file (str install-dir "/conf/flink-conf.yaml"))
;; Path of the masters file written by write-configuration!.
(def masters-file (str install-dir "/conf/masters"))
;; Not referenced in this file; presumably the fallback for the test's :tarball option -- confirm against callers.
(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
;; Hadoop distribution archive handed to hadoop/db.
(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
;; Package version strings handed to zk/db and mesos/db respectively.
(def deb-zookeeper-package "3.4.9-3+deb8u1")
(def deb-mesos-package "1.5.0-2.0.2")
(def deb-marathon-package "1.6.322")
;; Value written as taskmanager.numberOfTaskSlots in the generated configuration.
(def taskmanager-slots 1)
;; Number of nodes (taken from the sorted node list) that are written to the masters file.
(def master-count 1)
(defn flink-configuration
  "Returns the map of settings that write-configuration! renders into
  flink-conf.yaml. Keys are keywords whose names are the option names.

  The ZooKeeper quorum is obtained from zookeeper-quorum, and both the HA
  storage directory and the savepoint directory are derived from the
  :ha-storage-dir entry of the test map."
  [test]
  (let [storage-root (:ha-storage-dir test)
        quorum       (zookeeper-quorum test)]
    {:high-availability "zookeeper"
     :high-availability.zookeeper.quorum quorum
     :high-availability.storageDir (str storage-root "/ha")
     :state.savepoints.dir (str storage-root "/savepoints")
     :web.port 8081
     :rest.bind-address "0.0.0.0"
     :taskmanager.numberOfTaskSlots taskmanager-slots
     :yarn.application-attempts 99999
     :slotmanager.taskmanager-timeout 10000
     :state.backend.local-recovery "true"
     :taskmanager.registration.timeout "30 s"}))
(defn master-nodes
  "Returns the first master-count entries of the test's :nodes after sorting."
  [test]
  (->> (:nodes test)
       sort
       (take master-count)))
(defn write-configuration!
  "Writes the flink-conf.yaml and masters file to the flink conf directory.

  Every entry of (flink-configuration test) becomes one `name: value` line in
  conf-file, and every node returned by master-nodes becomes one line in
  masters-file. Afterwards the log4j.rootLogger line of conf/log4j.properties
  is rewritten in place with sed (a `.bak` suffix is passed to sed -i)."
  [test]
  (let [render-entry (fn [[option value]] (str (name option) ": " value))
        conf-text    (->> (flink-configuration test)
                          seq
                          (map render-entry)
                          (str/join "\n"))
        masters-text (str/join "\n" (master-nodes test))
        log4j-file   (str install-dir "/conf/log4j.properties")
        sed-command  (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' "
                          log4j-file)]
    (c/exec :echo conf-text :> conf-file)
    (c/exec :echo masters-text :> masters-file)
    ;; TODO: write log4j.properties properly
    (c/exec (c/lit sed-command))))
(defn install-flink!
  "Installs Flink on the current node.

  Steps, in order: install the archive named by (:tarball test) into
  install-dir, move any opt/flink-s3-fs-hadoop* files into lib, upload the
  job jar named by (:job-jar test) to upload-dir and move it into
  install-dir, then write the configuration files."
  [test]
  (let [url        (:tarball test)
        job-jar    (:job-jar test)
        enable-s3  (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} "
                        install-dir "/lib")]
    (info "Installing Flink from" url)
    (cu/install-archive! url install-dir)
    (info "Enable S3 FS")
    (c/exec (c/lit enable-s3))
    (c/upload job-jar upload-dir)
    ;; The jar's file name is derived only after the upload, as before.
    (let [jar-name (.getName (clojure.java.io/file job-jar))
          uploaded (str upload-dir "/" jar-name)]
      (c/exec :mv uploaded install-dir))
    (write-configuration! test)))
(defn teardown-flink!
  "Removes install-dir and any /tmp/.yarn-properties* files from the current
  node. Each removal is wrapped in meh individually."
  []
  (info "Tearing down Flink")
  (let [yarn-properties (c/lit "/tmp/.yarn-properties*")]
    (meh (c/exec :rm :-rf install-dir))
    (meh (c/exec :rm :-rf yarn-properties))))
(defn get-log-files!
  "Returns the result of cu/ls-full on log-dir, or an empty vector when
  cu/exists? reports that log-dir is absent."
  []
  (if-not (cu/exists? log-dir)
    []
    (cu/ls-full log-dir)))
(defn flink-db
  "Returns an object implementing db/DB and db/LogFiles for Flink.

  setup! installs Flink and teardown! removes it, both under c/su;
  log-files lazily lists log-dir. The outer `test` argument is not used:
  every method receives its own `test` parameter, which shadows it."
  [test]
  (reify
    db/DB
    (setup! [_this test _node]
      (c/su (install-flink! test)))
    (teardown! [_this _test _node]
      (c/su (teardown-flink!)))

    db/LogFiles
    (log-files [_this _test _node]
      ;; Same laziness as a single-argument concat.
      (lazy-seq (get-log-files!)))))
(defn combined-db
  "Returns a db/DB and db/LogFiles implementation that delegates to every
  element of dbs, in the order given.

  setup! and teardown! eagerly invoke the corresponding method of each
  element under c/su; log-files flattens the per-element results into one
  sequence."
  [dbs]
  (reify
    db/DB
    (setup! [_ test node]
      (c/su
        (doall (for [component dbs]
                 (db/setup! component test node)))))
    (teardown! [_ test node]
      (c/su
        (doall (for [component dbs]
                 (db/teardown! component test node)))))

    db/LogFiles
    (log-files [_ test node]
      (->> dbs
           (map (fn [component] (db/log-files component test node)))
           flatten))))
;;; YARN
(defn flink-yarn-db
  "Returns a combined DB for the YARN deployment. It is built from the
  Hadoop, ZooKeeper and Flink DBs, in that order."
  []
  (let [zookeeper-db (zk/db deb-zookeeper-package)
        hadoop-db    (hadoop/db hadoop-dist-url)
        ;; There is no test map in scope here. The previous code passed the
        ;; symbol `test`, which silently resolved to clojure.core/test.
        ;; flink-db never reads its argument, so nil is passed explicitly.
        flink        (flink-db nil)]
    (combined-db [hadoop-db zookeeper-db flink])))
(defn exec-flink!
  "Runs `bin/flink <cmd> <args>` from install-dir under c/su.

  HADOOP_CLASSPATH (taken from the output of `hadoop classpath`) and
  HADOOP_CONF_DIR are set on the same command line. cmd and args are
  inserted into the shell command verbatim. The test argument is not read."
  [test cmd args]
  (let [hadoop-env   (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
                          "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " ")
        flink-binary (str install-dir "/bin/flink ")
        command-line (str hadoop-env flink-binary cmd " " args)]
    (c/su
      (c/exec (c/lit command-line)))))
(defn flink-run-cli-args
  "Returns the CLI args that should be passed to 'flink run'.

  The result always starts with -d. A `-c <class>` entry follows when the
  test has a :main-class, and the yarn-cluster options follow when
  :deployment-mode is :yarn-job."
  [test]
  (let [main-class (:main-class test)
        yarn-job?  (= :yarn-job (:deployment-mode test))]
    (concat
      ["-d"]
      (when main-class
        [(str "-c " main-class)])
      (when yarn-job?
        ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
(defn submit-job!
  "Submits the test's job through `flink run`.

  The argument string is, space-separated: cli-args (empty by default), the
  result of flink-run-cli-args, the path of the job jar inside install-dir
  (the last `/`-separated segment of :job-jar), and (:job-args test)."
  ([test] (submit-job! test []))
  ([test cli-args]
   (let [jar-name (last (str/split (:job-jar test) #"/"))
         jar-path (str install-dir "/" jar-name)
         run-args (concat cli-args
                          (flink-run-cli-args test)
                          [jar-path (:job-args test)])]
     (exec-flink! test "run" (str/join " " run-args)))))
(defn first-node
  "Returns the first of the test's :nodes in sorted order, or nil when there
  are none."
  [test]
  (first (sort (:nodes test))))
(defn start-yarn-session!
  "On the first node (see first-node), starts a detached Flink YARN session
  with yarn-session.sh and then submits the test's job, both under c/su."
  [test]
  (let [node        (first-node test)
        hadoop-env  (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
                         "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir)
        session-cmd (str hadoop-env
                         " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")]
    (c/on node
      (info "Starting YARN session from" node)
      (c/su
        (c/exec (c/lit session-cmd))
        (submit-job! test)))))
(defn start-yarn-job!
  "Submits the test's job from the first node (see first-node) under c/su."
  [test]
  (let [node (first-node test)]
    (c/on node
      (c/su
        (submit-job! test)))))
;;; Mesos
(defn flink-mesos-db
  "Returns a combined DB for the Mesos deployment. It is built from the
  Hadoop, ZooKeeper, Mesos/Marathon and Flink DBs, in that order."
  []
  (let [zookeeper-db (zk/db deb-zookeeper-package)
        hadoop-db    (hadoop/db hadoop-dist-url)
        mesos-db     (mesos/db deb-mesos-package deb-marathon-package)
        ;; There is no test map in scope here. The previous code passed the
        ;; symbol `test`, which silently resolved to clojure.core/test.
        ;; flink-db never reads its argument, so nil is passed explicitly.
        flink        (flink-db nil)]
    (combined-db [hadoop-db zookeeper-db mesos-db flink])))
(defn submit-job-with-retry!
  "Submits the test's job through fu/retry. The :fallback handler logs the
  error at fatal level and exits the JVM with status 1."
  [test]
  (let [submit  (partial submit-job! test)
        give-up (fn [cause]
                  (fatal cause "Could not submit job.")
                  (System/exit 1))]
    (fu/retry submit :fallback give-up)))
(defn start-mesos-session!
  "Starts Flink on Mesos and submits the test's job.

  An app with id flink is POSTed as JSON to Marathon's /v2/apps endpoint; its
  :cmd launches mesos-appmaster.sh with the Hadoop environment set. The POST
  goes through fu/retry. The response is logged, then the job is submitted
  with retry from the first node in sorted order."
  [test]
  (c/su
    (let [;; Built on every attempt, inside the retried operation.
          appmaster-cmd (fn []
                          (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
                               "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
                               install-dir "/bin/mesos-appmaster.sh "
                               "-Dmesos.master=" (zookeeper-uri test mesos/zk-namespace) " "
                               "-Djobmanager.rpc.address=$(hostname -f) "
                               "-Djobmanager.heap.mb=2048 "
                               "-Djobmanager.rpc.port=6123 "
                               "-Djobmanager.web.port=8081 "
                               "-Dmesos.resourcemanager.tasks.mem=2048 "
                               "-Dtaskmanager.heap.mb=2048 "
                               "-Dtaskmanager.numberOfTaskSlots=2 "
                               "-Dmesos.resourcemanager.tasks.cpus=1 "
                               "-Drest.bind-address=$(hostname -f) "))
          post-app!     (fn []
                          (http/post
                            (str (mesos/marathon-base-url test) "/v2/apps")
                            {:form-params  {:id   "flink"
                                            :cmd  (appmaster-cmd)
                                            :cpus 1.0
                                            :mem  2048}
                             :content-type :json}))
          response      (fu/retry post-app!)]
      (info "Submitted Flink Application via Marathon" response)
      (c/on (first (sort (:nodes test)))
        (submit-job-with-retry! test)))))