blob: b8e5f0fa600b74edcc50707daed9673edff791a5 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.worker-test
(:use [clojure test])
(:require [backtype.storm.daemon [worker :as worker]])
(:require [backtype.storm [util :as util]])
(:require [conjure.core])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:import [backtype.storm.generated LogConfig LogLevel LogLevelAction])
(:import [org.apache.logging.log4j Level LogManager])
(:import [org.slf4j Logger])
(:use [conjure core])
(:use [backtype.storm testing log])
(:use [backtype.storm.daemon common])
(:use [clojure.string :only [join]])
(:import [backtype.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status])
(:import [org.mockito Mockito])
)
(deftest test-log-reset-should-not-trigger-for-future-time
(with-local-cluster [cluster]
(let [worker (:worker cluster)
present (time/now)
the-future (coerce/to-long (time/plus present (time/secs 1)))
mock-config {"foo" {:timeout the-future}}
mock-config-atom (atom mock-config)]
(stubbing [time/now present]
(worker/reset-log-levels mock-config-atom)
;; if the worker doesn't reset log levels, the atom should not be nil
(is (not(= @mock-config-atom nil)))))))
(deftest test-log-reset-triggers-for-past-time
(with-local-cluster [cluster]
(let [worker (:worker cluster)
present (time/now)
past (time/plus present (time/secs -1))
mock-config {"foo" { :timeout (coerce/to-long past)
:target-log-level Level/INFO
:reset-log-level Level/WARN}}
mock-config-atom (atom mock-config)]
(stubbing [time/now present]
(worker/reset-log-levels mock-config-atom)
;; the logger config is removed from atom
(is (= @mock-config-atom {}))))))
(deftest test-log-reset-resets-does-nothing-for-empty-log-config
(with-local-cluster [cluster]
(let [worker (:worker cluster)
present (time/now)
past (coerce/to-long (time/plus present (time/secs -1)))
mock-config {}
mock-config-atom (atom mock-config)]
(stubbing [worker/set-logger-level nil
time/now present]
(worker/reset-log-levels mock-config-atom)
;; if the worker resets log level, the atom is nil'ed out
(is (= @mock-config-atom {}))
;; test that the set-logger-level function was not called
(verify-call-times-for worker/set-logger-level 0)))))
(deftest test-log-reset-resets-root-logger-if-set
(with-local-cluster [cluster]
(let [worker (:worker cluster)
present (time/now)
past (coerce/to-long (time/plus present (time/secs -1)))
mock-config {LogManager/ROOT_LOGGER_NAME {:timeout past
:target-log-level Level/DEBUG
:reset-log-level Level/WARN}}
mock-config-atom (atom mock-config)]
(stubbing [worker/set-logger-level nil
time/now present]
(worker/reset-log-levels mock-config-atom)
;; if the worker resets log level, the atom is reset to {}
(is (= @mock-config-atom {}))
;; ensure we reset back to WARN level
(verify-call-times-for worker/set-logger-level 1)
(verify-first-call-args-for-indices worker/set-logger-level [1 2] LogManager/ROOT_LOGGER_NAME Level/WARN)))))
;;This should be removed when it goes into conjure
(defmacro verify-nth-call-args-for-indices
"Asserts that the function was called at least once, and the nth call was
passed the args specified, into the indices of the arglist specified. In
other words, it checks only the particular args you care about."
[n fn-name indices & args]
`(do
(assert-in-fake-context "verify-first-call-args-for-indices")
(assert-conjurified-fn "verify-first-call-args-for-indices" ~fn-name)
(is (< ~n (count (get @call-times ~fn-name)))
(str "(verify-nth-call-args-for-indices " ~n " " ~fn-name " " ~indices " " ~(join " " args) ")"))
(let [nth-call-args# (nth (get @call-times ~fn-name) ~n)
indices-in-range?# (< (apply max ~indices) (count nth-call-args#))]
(if indices-in-range?#
(is (= ~(vec args) (map #(nth nth-call-args# %) ~indices))
(str "(verify-first-call-args-for-indices " ~n " " ~fn-name " " ~indices " " ~(join " " args) ")"))
(is (= :fail (format "indices %s are out of range for the args, %s" ~indices ~(vec args)))
(str "(verify-first-call-args-for-indices " ~n " " ~fn-name " " ~indices " " ~(join " " args) ")"))))))
(deftest test-log-resets-named-loggers-with-past-timeout
(with-local-cluster [cluster]
(let [worker (:worker cluster)
present (time/now)
past (coerce/to-long (time/plus present (time/secs -1)))
mock-config {"my_debug_logger" {:timeout past
:target-log-level Level/DEBUG
:reset-log-level Level/INFO}
"my_info_logger" {:timeout past
:target-log-level Level/INFO
:reset-log-level Level/WARN}
"my_error_logger" {:timeout past
:target-log-level Level/ERROR
:reset-log-level Level/INFO}}
result (atom {})
mock-config-atom (atom mock-config)]
(stubbing [worker/set-logger-level nil
time/now present]
(worker/reset-log-levels mock-config-atom)
;; if the worker resets log level, the atom is reset to {}
(is (= @mock-config-atom {}))
(verify-call-times-for worker/set-logger-level 3)
(verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] "my_debug_logger" Level/INFO)
(verify-nth-call-args-for-indices 1 worker/set-logger-level [1 2] "my_error_logger" Level/INFO)
(verify-nth-call-args-for-indices 2 worker/set-logger-level [1 2] "my_info_logger" Level/WARN)))))
(deftest test-process-root-log-level-to-debug-sets-logger-and-timeout-2
(with-local-cluster [cluster]
(let [worker (:worker cluster)
mock-config (LogConfig.)
root-level (LogLevel.)
mock-config-atom (atom nil)
orig-levels (atom {})
present (time/now)
in-thirty-seconds (coerce/to-long (time/plus present (time/secs 30)))]
;; configure the root logger to be debug
(.set_reset_log_level_timeout_epoch root-level in-thirty-seconds)
(.set_target_log_level root-level "DEBUG")
(.set_action root-level LogLevelAction/UPDATE)
(.put_to_named_logger_level mock-config "ROOT" root-level)
(stubbing [worker/set-logger-level nil
time/now present]
(worker/process-log-config-change mock-config-atom orig-levels mock-config)
;; test that the set-logger-level function was not called
(log-message "Tests " @mock-config-atom)
(verify-call-times-for worker/set-logger-level 1)
(verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] "" Level/DEBUG)
(let [root-result (get @mock-config-atom LogManager/ROOT_LOGGER_NAME)]
(is (= (:action root-result) LogLevelAction/UPDATE))
(is (= (:target-log-level root-result) Level/DEBUG))
;; defaults to INFO level when the logger isn't found previously
(is (= (:reset-log-level root-result) Level/INFO))
(is (= (:timeout root-result) in-thirty-seconds)))))))
(deftest test-process-root-log-level-to-debug-sets-logger-and-timeout
(with-local-cluster [cluster]
(let [worker (:worker cluster)
mock-config (LogConfig.)
root-level (LogLevel.)
orig-levels (atom {})
present (time/now)
in-thirty-seconds (coerce/to-long (time/plus present (time/secs 30)))
mock-config-atom (atom {})]
;; configure the root logger to be debug
(doseq [named {"ROOT" "DEBUG"
"my_debug_logger" "DEBUG"
"my_info_logger" "INFO"
"my_error_logger" "ERROR"}]
(let [level (LogLevel.)]
(.set_action level LogLevelAction/UPDATE)
(.set_reset_log_level_timeout_epoch level in-thirty-seconds)
(.set_target_log_level level (val named))
(.put_to_named_logger_level mock-config (key named) level)))
(log-message "Tests " mock-config)
(stubbing [worker/set-logger-level nil
time/now present]
(worker/process-log-config-change mock-config-atom orig-levels mock-config)
(verify-call-times-for worker/set-logger-level 4)
(verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] "" Level/DEBUG)
(verify-nth-call-args-for-indices 1 worker/set-logger-level [1 2] "my_debug_logger" Level/DEBUG)
(verify-nth-call-args-for-indices 2 worker/set-logger-level [1 2] "my_error_logger" Level/ERROR)
(verify-nth-call-args-for-indices 3 worker/set-logger-level [1 2] "my_info_logger" Level/INFO)))))
(deftest test-worker-is-connection-ready
(let [connection (Mockito/mock ConnectionWithStatus)]
(. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Ready)
(is (= true (worker/is-connection-ready connection)))
(. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Connecting)
(is (= false (worker/is-connection-ready connection)))
(. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Closed)
(is (= false (worker/is-connection-ready connection)))
))