;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements.  See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership.  The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License.  You may obtain a copy of the License at
;;
;;     http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns jepsen.flink.db
  (:require [clj-http.client :as http]
            [clojure.java.io]
            [clojure.string]
            [clojure.tools.logging :refer :all]
            [jepsen
             [control :as c]
             [db :as db]
             [util :refer [meh]]]
            [jepsen.control.util :as cu]
            [jepsen.flink.hadoop :as hadoop]
            [jepsen.flink.mesos :as mesos]
            [jepsen.flink.utils :as fu]
            [jepsen.flink.zookeeper :refer :all]))

;; Directory on each node that the Flink archive is installed into.
(def install-dir "/opt/flink")
;; Staging directory on each node; job jars are uploaded here before being
;; moved into install-dir.
(def upload-dir "/tmp")
;; Flink log directory; the standalone services append their output here and
;; the Flink DB's log-files implementation searches it.
(def log-dir (str install-dir "/log"))
;; Path of the Flink configuration file written by write-configuration!.
(def conf-file (str install-dir "/conf/flink-conf.yaml"))
;; Path of the masters file inside the Flink conf directory.
(def masters-file (str install-dir "/conf/masters"))

;; Value emitted for taskmanager.numberOfTaskSlots in the generated configuration.
(def taskmanager-slots 3)

(defn flink-configuration
  "Returns the Flink configuration for `node` as a map from option keyword to
  value. The HA storage and savepoint directories are derived from the test's
  :ha-storage-dir, and the JobManager RPC and REST addresses are set to `node`."
  [test node]
  (let [storage-root  (:ha-storage-dir test)
        ha-dir        (str storage-root "/ha")
        savepoint-dir (str storage-root "/savepoints")
        zk-quorum     (zookeeper-quorum test)]
    {:high-availability                  "zookeeper"
     :high-availability.zookeeper.quorum zk-quorum
     :high-availability.storageDir       ha-dir
     :jobmanager.rpc.address             node
     :state.savepoints.dir               savepoint-dir
     :rest.address                       node
     :rest.port                          8081
     :rest.bind-address                  "0.0.0.0"
     :taskmanager.numberOfTaskSlots      taskmanager-slots
     :yarn.application-attempts          99999
     :slotmanager.taskmanager-timeout    10000
     :state.backend.local-recovery       "true"
     :taskmanager.registration.timeout   "30 s"}))

(defn write-configuration!
  "Writes the flink-conf.yaml to the flink conf directory.

  Every entry of (flink-configuration test node) becomes one `key: value`
  line; the joined text is echoed into conf-file on the current node.
  Afterwards the log4j.rootLogger line of conf/log4j.properties is rewritten
  in place (sed keeps a .bak copy) to `DEBUG, file`."
  [test node]
  ;; The locals are deliberately not named `c`, which is this namespace's
  ;; alias for jepsen.control and made the original hard to read.
  (let [conf-lines (for [[k v] (flink-configuration test node)]
                     (str (name k) ": " v))
        conf-text  (clojure.string/join "\n" conf-lines)]
    (c/exec :echo conf-text :> conf-file)
    ;; TODO: write log4j.properties properly
    (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))

(defn- file-name
  "Returns the final name component of `path` (the file name without any
  parent directories)."
  [path]
  (-> path
      clojure.java.io/file
      .getName))

(defn upload-job-jar!
  "Uploads the local file `job-jar` into upload-dir on the current node and
  then moves the uploaded file into install-dir."
  [job-jar]
  (c/upload job-jar upload-dir)
  (let [staged-jar (str upload-dir "/" (file-name job-jar))]
    (c/exec :mv staged-jar install-dir)))

(defn upload-job-jars!
  "Calls upload-job-jar! on each element of `job-jars`, in order. Returns nil."
  [job-jars]
  (run! upload-job-jar! job-jars))

(defn install-flink!
  "Installs Flink on the current node: unpacks the archive named by the
  test's :tarball into install-dir, moves the flink-s3-fs-hadoop files from
  opt/ into lib/, uploads the job jar of every job in the test spec, and
  finally writes the Flink configuration for `node`."
  [test node]
  (let [tarball-url   (:tarball test)
        enable-s3-cmd (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")
        job-jars      (map :job-jar (get-in test [:test-spec :jobs]))]
    (info "Installing Flink from" tarball-url)
    (cu/install-archive! tarball-url install-dir)
    (info "Enable S3 FS")
    (c/exec (c/lit enable-s3-cmd))
    (upload-job-jars! job-jars)
    (write-configuration! test node)))

(defn teardown-flink!
  "Best-effort removal of Flink from the current node: kills processes
  matching \"flink\", deletes install-dir and deletes /tmp/.yarn-properties*.
  Each step is wrapped in `meh` individually."
  []
  (info "Tearing down Flink")
  (let [yarn-properties-glob (c/lit "/tmp/.yarn-properties*")]
    (meh (cu/grepkill! "flink"))
    (meh (c/exec :rm :-rf install-dir))
    (meh (c/exec :rm :-rf yarn-properties-glob))))

(defn combined-db
  "Returns a single db/DB (which also implements db/LogFiles) that delegates
  to every element of `dbs`, in order. Setup and teardown run under c/su."
  [dbs]
  (let [log-capable? #(satisfies? db/LogFiles %)]
    (reify db/DB
      (setup! [_ test node]
        (c/su
          (run! #(db/setup! % test node) dbs)))
      (teardown! [_ test node]
        (c/su
          (try
            (run! #(db/teardown! % test node) dbs)
            ;; Supervised services are stopped even when a teardown throws.
            (finally (fu/stop-all-supervised-services!)))))
      db/LogFiles
      (log-files [_ test node]
        ;; Only members that implement db/LogFiles contribute log files.
        (flatten
          (for [member dbs
                :when (log-capable? member)]
            (db/log-files member test node)))))))

(defn flink-db
  "Returns a combined DB made of a base Flink DB followed by `db`. The base
  DB installs Flink on setup, tears it down on teardown, and reports the
  files found under log-dir as its log files."
  [db]
  (combined-db
    [(reify db/DB
       (setup! [_ test node]
         (c/su
           (install-flink! test node)))

       (teardown! [_ _ _]
         (c/su
           (teardown-flink!)))

       db/LogFiles
       (log-files [_ _ _]
         (fu/find-files! log-dir)))
     db]))

(defn- sorted-nodes
  "Returns the test's :nodes in sorted order."
  [test]
  (sort (:nodes test)))

(defn- select-nodes
  "Applies `selector` to the sorted node list of `test` and returns its result."
  [test selector]
  (selector (sorted-nodes test)))

(defn- first-node
  "Returns the first node of `test` in sorted order."
  [test]
  (first (sorted-nodes test)))

(defn- create-env-vars
  "Turns a map of environment variables into a string of NAME=value
  assignments joined with fu/join-space and followed by one trailing space,
  suitable as a prefix that sets the variables for a command run from Bash.
  For example, the map {:FOO \"bar\"} yields \"FOO=bar \"."
  [m]
  (let [assignments (for [[var-name var-value] m]
                      (str (name var-name) "=" var-value))]
    (str (apply fu/join-space assignments) " ")))

(defn- hadoop-env-vars
  "Returns the environment-variable prefix (see create-env-vars) that sets
  HADOOP_CLASSPATH to the backtick-quoted `hadoop classpath` command and
  HADOOP_CONF_DIR to hadoop/hadoop-conf-dir."
  []
  (let [classpath-cmd (str "`" hadoop/install-dir "/bin/hadoop classpath`")]
    (create-env-vars {:HADOOP_CLASSPATH classpath-cmd
                      :HADOOP_CONF_DIR  hadoop/hadoop-conf-dir})))

(defn exec-flink!
  "Runs `bin/flink cmd args...` on the current node under c/su, prefixed with
  the Hadoop environment variables. `args` is a collection of argument
  strings that are joined with spaces."
  [cmd args]
  (c/su
    (let [flink-bin    (str install-dir "/bin/flink")
          joined-args  (apply fu/join-space args)
          command-line (fu/join-space (hadoop-env-vars) flink-bin cmd joined-args)]
      (c/exec (c/lit command-line)))))

(defn flink-run-cli-args
  "Returns the CLI args that should be passed to 'flink run': always \"-d\",
  followed by \"-c <main-class>\" when `job-spec` has a :main-class."
  [job-spec]
  (concat
    ["-d"]
    (when-let [main-class (:main-class job-spec)]
      [(str "-c " main-class)])))

(defn submit-job!
  "Submits every job listed under [:test-spec :jobs] of `test` via
  `flink run`. For each job the arguments are `cli-args` (default: none), the
  args from flink-run-cli-args, the path of the job jar inside install-dir,
  and the job's :job-args."
  ([test] (submit-job! test []))
  ([test cli-args]
   (doseq [{:keys [job-jar job-args] :as job-spec} (get-in test [:test-spec :jobs])
           :let [jar-path (str install-dir "/" (file-name job-jar))
                 run-args (concat cli-args
                                  (flink-run-cli-args job-spec)
                                  [jar-path job-args])]]
     (exec-flink! "run" run-args))))

(defn- submit-job-with-retry!
  "Like submit-job!, but runs the submission through fu/retry. The :fallback
  handler logs the exception at fatal level and exits the JVM with status 1."
  ([test] (submit-job-with-retry! test []))
  ([test cli-args]
   (let [give-up! (fn [e]
                    (fatal e "Could not submit job.")
                    (System/exit 1))]
     (fu/retry #(submit-job! test cli-args)
               :fallback give-up!))))

;;; Standalone

;; Number of nodes, taken from the front of the sorted node list, that run a
;; standalone Flink master; all remaining nodes run a TaskManager.
(def standalone-master-count 2)

(defn- standalone-master-nodes
  "Returns the first standalone-master-count nodes of `test` in sorted order."
  [test]
  (select-nodes test #(take standalone-master-count %)))

(defn- standalone-taskmanager-nodes
  "Returns the sorted nodes of `test` that remain after dropping the first
  standalone-master-count of them."
  [test]
  (select-nodes test #(drop standalone-master-count %)))

(defn- start-standalone-masters!
  "Creates the supervised service \"flink-master\", whose command runs
  jobmanager.sh in the foreground with the Hadoop environment variables and
  appends its output to jobmanager.log in log-dir."
  []
  (fu/create-supervised-service!
    "flink-master"
    (fu/join-space "env"
                   (hadoop-env-vars)
                   (str install-dir "/bin/jobmanager.sh")
                   "start-foreground"
                   ">>"
                   (str log-dir "/jobmanager.log"))))

(defn- start-standalone-taskmanagers!
  "Creates the supervised service \"flink-taskmanager\", whose command runs
  taskmanager.sh in the foreground with the Hadoop environment variables and
  appends its output to taskmanager.log in log-dir."
  []
  (fu/create-supervised-service!
    "flink-taskmanager"
    (fu/join-space "env"
                   (hadoop-env-vars)
                   (str install-dir "/bin/taskmanager.sh")
                   "start-foreground"
                   ">>"
                   (str log-dir "/taskmanager.log"))))

(defn start-flink-db
  "Returns the DB for a standalone Flink cluster. On setup, master nodes start
  the master service, the remaining nodes start the TaskManager service, and
  the first node submits the jobs (with retry). On teardown, each node stops
  the services belonging to its role."
  []
  (let [master-node?      (fn [test node]
                            (some #{node} (standalone-master-nodes test)))
        taskmanager-node? (fn [test node]
                            (some #{node} (standalone-taskmanager-nodes test)))]
    (flink-db
      (reify db/DB
        (setup! [_ test node]
          (c/su
            (when (master-node? test node)
              (start-standalone-masters!))
            (when (taskmanager-node? test node)
              (start-standalone-taskmanagers!))
            (when (= (first-node test) node)
              (submit-job-with-retry! test))))

        (teardown! [_ test node]
          (c/su
            (when (master-node? test node)
              (fu/stop-supervised-service! "flink-master"))
            (when (taskmanager-node? test node)
              (fu/stop-supervised-service! "flink-taskmanager"))))))))

;;; YARN

(defn- start-yarn-session-cmd
  "Returns the shell command that runs yarn-session.sh with the flags -d,
  -jm 2048m and -tm 2048m, prefixed with the Hadoop environment variables."
  []
  (let [session-script (str install-dir "/bin/yarn-session.sh")]
    (fu/join-space (hadoop-env-vars)
                   session-script
                   "-d"
                   "-jm 2048m"
                   "-tm 2048m")))

(defn- start-yarn-session!
  "Starts a Flink YARN session on the current node by executing the command
  from start-yarn-session-cmd under c/su. The command is run through
  fu/retry with a delay of 4000 between attempts, and each failure is logged
  before the next attempt."
  []
  (info "Starting YARN session")
  (let [exec-start-yarn-session! #(c/su (c/exec (c/lit (start-yarn-session-cmd))))
        log-failure! (fn [exception _] (info "Starting YARN session failed due to"
                                             (.getMessage exception)
                                             "Retrying..."))]
    ;; The failure logger is passed as :on-retry, the same option that
    ;; start-mesos-session! uses for its identically shaped callback. It was
    ;; previously passed as :on-error.
    ;; NOTE(review): assumes fu/retry only recognises :on-retry; confirm
    ;; against its option list.
    (fu/retry exec-start-yarn-session!
              :delay 4000
              :on-retry log-failure!)))

(defn yarn-session-db
  "Returns the DB for Flink running in a YARN session. On setup, the first
  node starts the YARN session and then submits the jobs; teardown of this
  part is a no-op."
  []
  (let [session-db (reify db/DB
                     (setup! [_ test node]
                       (when (= node (first-node test))
                         (start-yarn-session!)
                         (submit-job! test)))
                     (teardown! [_ _ _]))]
    (flink-db session-db)))

(defn- start-yarn-job!
  "Submits the test's jobs (with retry) under c/su, passing the flags
  -m yarn-cluster, -yjm 2048m and -ytm 2048m to `flink run`."
  [test]
  (let [yarn-cli-args ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]]
    (c/su
      (submit-job-with-retry! test yarn-cli-args))))

(defn yarn-job-db
  "Returns the DB for Flink jobs submitted with `-m yarn-cluster`. On setup,
  the first node submits the jobs via start-yarn-job!; teardown of this part
  is a no-op."
  []
  (let [job-db (reify db/DB
                 (setup! [_ test node]
                   (when (= node (first-node test))
                     (start-yarn-job! test)))
                 (teardown! [_ _ _]))]
    (flink-db job-db)))

;;; Mesos

(defn- mesos-appmaster-cmd
  "Returns the command used by Marathon to start Flink's Mesos application
  master: mesos-appmaster.sh prefixed with the Hadoop environment variables
  and followed by the -D options below. mesos.master is the ZooKeeper URI of
  the test under mesos/zk-namespace."
  [test]
  (let [appmaster-script (str install-dir "/bin/mesos-appmaster.sh")
        mesos-master-opt (str "-Dmesos.master=" (zookeeper-uri test mesos/zk-namespace))]
    (fu/join-space
      (hadoop-env-vars)
      appmaster-script
      mesos-master-opt
      "-Djobmanager.rpc.address=$(hostname -f)"
      "-Djobmanager.heap.mb=2048"
      "-Djobmanager.rpc.port=6123"
      "-Dmesos.resourcemanager.tasks.mem=2048"
      "-Dtaskmanager.heap.mb=2048"
      "-Dmesos.resourcemanager.tasks.cpus=1"
      "-Drest.bind-address=$(hostname -f)")))

(defn- start-mesos-session!
  "Registers the Flink application master with Marathon by POSTing an app
  definition (id \"flink\") as JSON to <marathon-base-url>/v2/apps. The
  request goes through fu/retry with a delay of 4000; each failed attempt is
  logged, and the final response is logged as well."
  [test]
  (c/su
    (let [submit-flink! (fn []
                          ;; URL and app definition are rebuilt on every attempt.
                          (let [apps-url       (str (mesos/marathon-base-url test) "/v2/apps")
                                app-definition {:id                    "flink"
                                                :cmd                   (mesos-appmaster-cmd test)
                                                :cpus                  1.0
                                                :mem                   2048
                                                :maxLaunchDelaySeconds 3}]
                            (http/post apps-url
                                       {:form-params  app-definition
                                        :content-type :json})))
          log-submission-failure! (fn [exception _]
                                    (info "Submitting Flink Application via Marathon failed due to"
                                          (.getMessage exception)
                                          "Retrying..."))
          marathon-response (fu/retry submit-flink!
                                      :on-retry log-submission-failure!
                                      :delay 4000)]
      (info "Submitted Flink Application via Marathon" marathon-response))))

(defn flink-mesos-app-master
  "Returns the DB for Flink running on Mesos. On setup, the first node
  submits the Flink application master via Marathon and then submits the jobs
  (with retry); teardown of this part is a no-op."
  []
  (let [mesos-db (reify
                   db/DB
                   (setup! [_ test node]
                     (when (= (first-node test) node)
                       (start-mesos-session! test)
                       (submit-job-with-retry! test)))

                   (teardown! [_ _ _]))]
    (flink-db mesos-db)))
