blob: 6f4ca469f402ff419841f9ab466c98d3cd0202c0 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns jepsen.flink.checker-test
(:require [clojure.test :refer :all]
[jepsen
[checker :as checker]]
[jepsen.flink.checker :refer :all]))
(deftest all-jobs-running?-history-test
  ;; Pins the boolean sequence that all-jobs-running?-history derives from a
  ;; mixed history: one nemesis event, two :invoke/:ok polling rounds and a
  ;; trailing :info poll. The three expected entries line up with the two :ok
  ;; results (false, then true) and, presumably, the indeterminate :info poll
  ;; (false) -- confirm against all-jobs-running?-history.
  (let [job-id "3886d6b547969c3f15c53896bb496b8f"
        ops [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
             {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127443701575}
             {:type :ok, :f :jobs-running?, :value {job-id false}, :process 0, :time 127453553462}
             {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127453553463}
             {:type :ok, :f :jobs-running?, :value {job-id true}, :process 0, :time 127453553464}
             {:type :info, :f :jobs-running?, :value nil, :process 0, :time 127453553465}]]
    (is (= [false true false] (all-jobs-running?-history ops)))))
(deftest job-running-checker-test
  ;; Drives (job-running-checker 3 60 10) through checker/check with
  ;; hand-written histories and asserts on :valid? and on the :msg of the first
  ;; final model. :time values such as 60000000001 sit just past 60 s when read
  ;; as nanoseconds, and the test names treat them as "after the grace period"
  ;; -- presumably the 60 argument; confirm against job-running-checker.
  (let [job-checker (job-running-checker 3 60 10)
        run-check (fn [history] (checker/check job-checker {} history {}))
        history-valid? (fn [history] (:valid? (run-check history)))
        first-msg (fn [result] (-> result :final-models first :msg))
        job-id "3886d6b547969c3f15c53896bb496b8f"
        running {job-id true}
        not-running {job-id false}]
    (testing "Model should be inconsistent if job is not running after grace period."
      (let [result (run-check
                     [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
                      {:type :ok, :f :jobs-running?, :value not-running, :process 0, :time 60000000001}])]
        (is (= false (:valid? result)))
        (is (= "Job is not running." (first-msg result)))))
    (testing "Model should be consistent if job is running after grace period."
      (is (= true (history-valid?
                    [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000001}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000002}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000003}]))))
    (testing "Should tolerate non-running job during failures."
      ;; Each nemesis action appears twice (without and then with :value),
      ;; presumably mirroring how the nemesis records start and completion.
      (is (= true (history-valid?
                    [{:type :info, :f :partition-start, :process :nemesis, :time -1}
                     {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
                     {:type :ok, :f :jobs-running?, :value not-running, :process 0, :time 60000000001}
                     {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
                     {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000004}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000005}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000006}]))))
    (testing "Should not tolerate non-running job without a cause."
      (let [result (run-check
                     [{:type :ok, :f :jobs-running?, :value running, :process 0, :time 0}
                      {:type :ok, :f :jobs-running?, :value running, :process 0, :time 1}
                      {:type :ok, :f :jobs-running?, :value not-running, :process 0, :time 60000000001}
                      {:type :ok, :f :jobs-running?, :value running, :process 0, :time 60000000002}])]
        (is (= false (:valid? result)))
        (is (= "Job is not running." (first-msg result)))))
    (testing "Model should be inconsistent if job submission was unsuccessful."
      (let [result (run-check
                     [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 239150413307}
                      {:type :info, :f :jobs-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
        (is (= false (:valid? result)))))
    (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
      (let [result (run-check
                     [{:type :fail, :f :jobs-running?, :value running, :process 0, :time 0, :error "Error"}
                      {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001, :error "Error"}
                      {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000002, :error "Error"}])]
        (is (= false (:valid? result)))
        (is (= "Cluster is not running." (first-msg result)))))
    (testing "Should tolerate non-running job after cancellation."
      (is (= true (history-valid?
                    [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
                     {:type :ok, :f :cancel-jobs, :value nil, :process 0, :time 1}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 2}
                     {:type :ok, :f :jobs-running?, :value not-running, :process 0, :time 3}]))))
    (testing "Model should be inconsistent if job is running after cancellation."
      (let [result (run-check
                     [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
                      {:type :ok, :f :cancel-jobs, :value true, :process 0, :time 1}
                      {:type :ok, :f :jobs-running?, :value running, :process 0, :time 10000000002}])]
        (is (= false (:valid? result)))
        (is (= "Job is running after cancellation." (first-msg result)))))
    (testing "Model should be inconsistent if Flink cluster is not available at the end."
      (let [result (run-check
                     [{:type :ok, :f :jobs-running?, :value running, :process 0, :time 0}
                      {:type :ok, :f :jobs-running?, :value running, :process 0, :time 1}
                      {:type :ok, :f :jobs-running?, :value running, :process 0, :time 2}
                      {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
        (is (= false (:valid? result)))
        (is (= "Cluster is not running." (first-msg result)))))
    (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
      (let [result (run-check
                     [{:type :ok, :f :jobs-running?, :value running, :process 0, :time 0}
                      {:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 1}
                      {:type :ok, :f :cancel-jobs, :value running, :process 0, :time 2}
                      {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
        (is (= false (:valid? result)))
        (is (= "Cluster is not running." (first-msg result)))))
    (testing "Should throw AssertionError if job cancelling operation failed."
      (is (thrown-with-msg? AssertionError
                            #":cancel-job must not fail"
                            (run-check
                              [{:type :ok, :f :jobs-running?, :value running, :process 0, :time 0}
                               {:type :fail, :f :cancel-jobs, :value nil, :process 0, :time 1}]))))
    (testing "Should tolerate non-running job if grace period has not passed."
      ;; Every :time here is far below the 60000000001 that the cases above
      ;; treat as "after grace period".
      (is (= true (history-valid?
                    [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 0}
                     {:type :ok, :f :jobs-running?, :value not-running, :process 0, :time 1}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 2}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 3}
                     {:type :ok, :f :jobs-running?, :value running, :process 0, :time 4}]))))))
(deftest safe-inc-test
  ;; safe-inc must treat nil as zero and otherwise add one.
  (are [expected input] (= expected (safe-inc input))
    1 nil
    2 1))
(deftest nemeses-active?-test
  ;; The argument maps nemesis operation keywords to numbers (presumably counts
  ;; of currently active faults -- confirm against nemeses-active?). A map with
  ;; one positive entry is reported active; a map holding only zero is not.
  (let [one-active {:partition-start 2 :fail-name-node-start 0}
        none-active {:partition-start 0}]
    (is (= true (nemeses-active? one-active)))
    (is (= false (nemeses-active? none-active)))))
(deftest dissoc-if-test
  ;; dissoc-if should drop the entries for which the predicate is truthy and
  ;; keep the rest. The predicate is applied to something whose `first` is the
  ;; key (presumably the [k v] map entry), so this one matches only :b.
  (let [key-is-b? #(-> (first %) (= :b))
        result (dissoc-if key-is-b? {:a 1 :b 2})]
    ;; The non-matching entry survives unchanged.
    (is (= 1 (:a result)))
    ;; The matching entry is removed; previously this was never asserted, so a
    ;; dissoc-if that ignored its predicate would still have passed.
    (is (nil? (:b result)))))
(deftest zero-value?-test
  ;; For pairs sharing the same first element, zero-value? must be true when
  ;; the second element is 0 and false when it is 1.
  (are [expected pair] (= expected (zero-value? pair))
    true  [:test 0]
    false [:test 1]))