blob: d32114b8fa5062b8812283ba7fa4a589d4f7db96 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns jepsen.flink.kafka
(:require [clojure.tools.logging :refer :all]
[jepsen
[db :as db]
[control :as c]
[util :refer [meh]]]
[jepsen.control.util :as cu]
[jepsen.flink.zookeeper :as fzk]
[jepsen.flink.utils :as fu]))
;; Root directory the Kafka distribution is unpacked into on each node.
(def install-dir "/opt/kafka")

;; Directory collected as log files by the DB's log-files implementation.
(def application-log-dir (str install-dir "/logs"))

;; Value passed to the broker as its `log.dirs` override.
(def log-dirs (str install-dir "/kafka-logs"))

;; Broker configuration file handed to the start script.
(def server-properties (str install-dir "/config/server.properties"))

;; Scripts inside the Kafka distribution.
(def start-script (str install-dir "/bin/kafka-server-start.sh"))
(def topic-script (str install-dir "/bin/kafka-topics.sh"))
(def stop-script (str install-dir "/bin/kafka-server-stop.sh"))
(defn- broker-id
  "Returns the zero-based position of `node` within the sorted `nodes`
  collection, or -1 when `node` is not one of `nodes`."
  [nodes node]
  (let [ordered (sort nodes)]
    (.indexOf ordered node)))
(defn- override-property
  "Builds the command-line fragment `--override <name>=<value>`."
  [name value]
  (let [assignment (str name "=" value)]
    (str "--override " assignment)))
(defn- start-server-command
  "Builds the shell command that starts a Kafka broker as a daemon on `node`.

  The command runs the start script against the distribution's
  server.properties and overrides the ZooKeeper connection string, the
  broker id (derived from the node's position among the sorted test nodes),
  the data directory, and the log retention time."
  [{:keys [nodes] :as test} node]
  (fu/join-space
    start-script
    "-daemon"
    server-properties
    (override-property "zookeeper.connect" (fzk/zookeeper-quorum test))
    (override-property "broker.id" (broker-id nodes node))
    (override-property "log.dirs" log-dirs)
    ;; `retention.ms` is a topic-level key; the broker-level setting that
    ;; supplies the default for topics is `log.retention.ms`. Using the
    ;; topic-level name here leaves the broker on its default retention.
    (override-property "log.retention.ms" "1800000")))
(defn- start-server!
  "Executes the broker start command for `node` on the current control
  session."
  [test node]
  (-> (start-server-command test node)
      c/lit
      c/exec))
(defn- stop-server!
  "Logs the shutdown and kills processes matching \"kafka\" via grepkill!."
  []
  (let [pattern "kafka"]
    (info "Stopping Kafka")
    (cu/grepkill! pattern)))
(defn- create-topic-command
  "Builds the shell command that creates the topic `kafka-test-topic` with
  one partition per test node and a replication factor of 1, addressing the
  test's ZooKeeper quorum."
  [{:keys [nodes] :as test}]
  (let [partitions-arg (str "--partitions " (count nodes))
        quorum         (fzk/zookeeper-quorum test)]
    (fu/join-space
      topic-script
      "--create"
      "--topic kafka-test-topic"
      partitions-arg
      "--replication-factor 1"
      "--zookeeper"
      quorum)))
(defn- create-topic!
  "Runs the topic creation command, wrapped in fu/retry."
  [test]
  (info "Attempting to create Kafka topic")
  (let [attempt #(c/exec (c/lit (create-topic-command test)))]
    (fu/retry attempt)))
(defn- delete-kafka!
  "Recursively removes the Kafka installation directory."
  []
  (let [target install-dir]
    (info "Deleting Kafka distribution and logs")
    (c/exec :rm :-rf target)))
(defn db
  "Returns a Jepsen DB that installs Kafka from `kafka-dist-url`, starts a
  broker on every node, and creates the test topic from the node whose
  broker id is 0. Teardown stops the broker and removes the installation.
  Also exposes the files under the application log directory as log files."
  [kafka-dist-url]
  (reify
    db/DB
    (setup! [_ test node]
      (c/su
        (cu/install-archive! kafka-dist-url install-dir)
        (start-server! test node)
        ;; Only one node (the one with broker id 0) creates the topic.
        (let [id (broker-id (:nodes test) node)]
          (when (zero? id)
            (create-topic! test)))))

    (teardown! [_ _ _]
      (c/su
        (stop-server!)
        (delete-kafka!)))

    db/LogFiles
    (log-files [_ _ _]
      (fu/find-files! application-log-dir))))