(ns backtype.storm.supervisor-test
  (:use [clojure test])
  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
  (:use [backtype.storm bootstrap testing])
  (:use [backtype.storm.daemon common])
  (:require [backtype.storm.daemon [worker :as worker] [supervisor :as supervisor]])
  )

(bootstrap)


(defn worker-assignment
  "Return [storm-id taskids] for the worker slot [supervisor-id port].

  Walks every storm id listed in the cluster state and looks the slot up in
  the reversed :task->node+port map of that storm's assignment. Throws a
  RuntimeException when no assignment references the slot."
  [cluster supervisor-id port]
  (let [state (:storm-cluster-state cluster)
        slot [supervisor-id port]
        ;; yields [storm-id tasks] when the storm has tasks on `slot`, else nil
        assignment-on-slot (fn [storm-id]
                             (let [info (.assignment-info state storm-id nil)
                                   node+port->tasks (reverse-map (:task->node+port info))]
                               (when-let [tasks (get node+port->tasks slot)]
                                 [storm-id tasks])))
        found (some assignment-on-slot (.assignments state nil))]
    (or found
        (throw (RuntimeException. "Could not find assignment for worker")))))

(defn heartbeat-worker
  "Write a worker heartbeat for the worker on `port` of `supervisor`,
  using the supervisor's conf to look up the worker id."
  [supervisor port storm-id task-ids]
  (let [conf (.get-conf supervisor)
        worker-id (find-worker-id conf port)]
    (worker/do-heartbeat conf worker-id port storm-id task-ids)))

(defn heartbeat-workers
  "Heartbeat the worker on each of `ports` of the supervisor `supervisor-id`,
  using the storm id and task ids currently assigned to that slot."
  [cluster supervisor-id ports]
  (let [sup (get-supervisor cluster supervisor-id)]
    (doseq [port ports
            :let [[storm-id task-ids] (worker-assignment cluster supervisor-id port)]]
      (heartbeat-worker sup port storm-id task-ids))))

(defn validate-launched-once
  "Assert that every launched slot was launched exactly once and that the
  launched slots match `supervisor->ports`.

  launched          - map keyed by [supervisor-id port]; each value is a
                      collection whose count is the number of launches.
  supervisor->ports - expected map of supervisor-id to a collection of ports.
  storm-id          - accepted for call-site compatibility; not used.

  Ports are compared as sets, so the result does not depend on the iteration
  order of `launched`. An empty `launched` matches an empty expectation.
  Returns the result of the final `is` check."
  [launched supervisor->ports storm-id]
  (let [counts (map count (vals launched))
        ;; seed with {} so that nothing launched yields {} rather than nil
        launched-supervisor->ports (apply merge-with into {}
                                     (for [[s p] (keys launched)]
                                       {s #{p}}))
        expected-supervisor->ports (into {}
                                     (for [[s ports] supervisor->ports]
                                       [s (set ports)]))]
    (is (every? (partial = 1) counts))
    (is (= launched-supervisor->ports expected-supervisor->ports))))

(deftest launches-assignment
  ;; Checks that a supervisor launches one worker per assigned port, leaves
  ;; heartbeating workers alone, and replaces a worker that stops heartbeating.
  (with-simulated-time-local-cluster [cluster :supervisors 0
    :daemon-conf {NIMBUS-REASSIGN false
                  SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
                  SUPERVISOR-WORKER-TIMEOUT-SECS 15
                  SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
    (letlocals
      (bind topology (thrift/mk-topology
                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
                       {}))
      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
      ;; Tasks 3 and 4 share port 3, so only ports 1-3 of sup1 are assigned;
      ;; port 4 stays unused.
      (bind changed (capture-changed-workers
                        (submit-mocked-assignment
                          (:nimbus cluster)
                          "test"
                          {TOPOLOGY-WORKERS 3}
                          topology
                          {1 "1"
                           2 "1"
                           3 "1"
                           4 "1"}
                          {1 ["sup1" 1]
                           2 ["sup1" 2]
                           3 ["sup1" 3]
                           4 ["sup1" 3]
                           })
                        (advance-cluster-time cluster 2)
                        (heartbeat-workers cluster "sup1" [1 2 3])
                        (advance-cluster-time cluster 10)))
      (bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
      ;; Each assigned port must have been launched exactly once, with no shutdowns.
      (is (empty? (:shutdown changed)))
      (validate-launched-once (:launched changed) {"sup1" [1 2 3]} storm-id)
      ;; While all three workers keep heartbeating, nothing should change.
      (bind changed (capture-changed-workers
                        (doseq [i (range 10)]
                          (heartbeat-workers cluster "sup1" [1 2 3])
                          (advance-cluster-time cluster 10))
                        ))
      (is (empty? (:shutdown changed)))
      (is (empty? (:launched changed)))
      ;; Stop heartbeating port 3 only: that worker is expected to be shut down
      ;; once and launched again.
      (bind changed (capture-changed-workers
                      (heartbeat-workers cluster "sup1" [1 2])
                      (advance-cluster-time cluster 10)
                      ))
      (validate-launched-once (:launched changed) {"sup1" [3]} storm-id)
      (is (= {["sup1" 3] 1} (:shutdown changed)))
      )))

(deftest test-multiple-active-storms-multiple-supervisors
  ;; Runs two topologies across two supervisors and checks that killing one
  ;; topology shuts down only that topology's workers.
  (with-simulated-time-local-cluster [cluster :supervisors 0
    :daemon-conf {NIMBUS-REASSIGN false
                  SUPERVISOR-WORKER-START-TIMEOUT-SECS 5
                  SUPERVISOR-WORKER-TIMEOUT-SECS 15
                  SUPERVISOR-MONITOR-FREQUENCY-SECS 3}]
    (letlocals
      (bind topology (thrift/mk-topology
                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
                       {}))
      (bind topology2 (thrift/mk-topology
                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
                       {}))
      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
      (bind sup2 (add-supervisor cluster :id "sup2" :ports [1 2]))
      ;; Topology "test": tasks 1 and 2 on sup1 ports 1 and 2, tasks 3 and 4
      ;; both on sup2 port 1.
      (bind changed (capture-changed-workers
                        (submit-mocked-assignment
                          (:nimbus cluster)
                          "test"
                          {TOPOLOGY-WORKERS 3 TOPOLOGY-MESSAGE-TIMEOUT-SECS 40}
                          topology
                          {1 "1"
                           2 "1"
                           3 "1"
                           4 "1"}
                          {1 ["sup1" 1]
                           2 ["sup1" 2]
                           3 ["sup2" 1]
                           4 ["sup2" 1]
                           })
                        (advance-cluster-time cluster 2)
                        (heartbeat-workers cluster "sup1" [1 2])
                        (heartbeat-workers cluster "sup2" [1])
                        ))
      (bind storm-id (get-storm-id (:storm-cluster-state cluster) "test"))
      (is (empty? (:shutdown changed)))
      (validate-launched-once (:launched changed) {"sup1" [1 2] "sup2" [1]} storm-id)
      ;; Topology "test2": tasks 1 and 2 on sup1 port 3, task 3 on sup2 port 2.
      (bind changed (capture-changed-workers
                        (submit-mocked-assignment
                          (:nimbus cluster)
                          "test2"
                          {TOPOLOGY-WORKERS 2}
                          topology2
                          {1 "1"
                           2 "1"
                           3 "1"}
                          {1 ["sup1" 3]
                           2 ["sup1" 3]
                           3 ["sup2" 2]
                           })
                        (advance-cluster-time cluster 2)
                        (heartbeat-workers cluster "sup1" [3])
                        (heartbeat-workers cluster "sup2" [2])
                        ))
      (bind storm-id2 (get-storm-id (:storm-cluster-state cluster) "test2"))
      (is (empty? (:shutdown changed)))
      (validate-launched-once (:launched changed) {"sup1" [3] "sup2" [2]} storm-id2)
      ;; Kill "test", then advance 4 x 8 = 32 seconds with every worker still
      ;; heartbeating; no worker is expected to change during this window.
      ;; NOTE(review): presumably the kill is delayed by the 40 second
      ;; TOPOLOGY-MESSAGE-TIMEOUT-SECS set above -- confirm against nimbus.
      (bind changed (capture-changed-workers
        (.killTopology (:nimbus cluster) "test")
        (doseq [i (range 4)]
          (advance-cluster-time cluster 8)
          (heartbeat-workers cluster "sup1" [1 2 3])
          (heartbeat-workers cluster "sup2" [1 2])
          )))
      (is (empty? (:shutdown changed)))
      (is (empty? (:launched changed)))
      ;; 12 more seconds (44 in total since the kill): the three workers of
      ;; "test" are expected to be shut down once each, with nothing relaunched.
      (bind changed (capture-changed-workers
        (advance-cluster-time cluster 12)
        ))
      (is (empty? (:launched changed)))
      (is (= {["sup1" 1] 1 ["sup1" 2] 1 ["sup2" 1] 1} (:shutdown changed)))
      ;; Only the workers of "test2" keep heartbeating; they must be left alone.
      (bind changed (capture-changed-workers
        (doseq [i (range 10)]
          (heartbeat-workers cluster "sup1" [3])
          (heartbeat-workers cluster "sup2" [2])
          (advance-cluster-time cluster 10)
          )))
      (is (empty? (:shutdown changed)))
      (is (empty? (:launched changed)))
      ;; TODO: check that downloaded code is cleaned up only for the killed storm
      )))

(defn get-heartbeat
  "Fetch the supervisor-info record stored in the cluster state for
  `supervisor-id`."
  [cluster supervisor-id]
  (-> cluster
      :storm-cluster-state
      (.supervisor-info supervisor-id)))

(defn check-heartbeat
  "Assert that the :time-secs of the supervisor's latest heartbeat is not in
  the future and is at most `within-secs` seconds old."
  [cluster supervisor-id within-secs]
  (let [last-beat-secs (:time-secs (get-heartbeat cluster supervisor-id))
        ;; age of the heartbeat relative to the current (simulated) time
        delta (- (current-time-secs) last-beat-secs)]
    (is (>= delta 0))
    (is (<= delta within-secs))))

(deftest heartbeats-to-nimbus
  ;; The supervisor heartbeat must stay fresh (at most 3 seconds old) as
  ;; simulated time advances, both before and after a topology is submitted.
  (with-simulated-time-local-cluster [cluster :supervisors 0
    :daemon-conf {SUPERVISOR-WORKER-START-TIMEOUT-SECS 15
                  SUPERVISOR-HEARTBEAT-FREQUENCY-SECS 3}]
    (letlocals
      (bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
      (advance-cluster-time cluster 4)
      (bind hb (get-heartbeat cluster "sup"))
      ;; the heartbeat advertises every port the supervisor was given
      (is (= #{5 6 7} (set (:worker-ports hb))))
      (check-heartbeat cluster "sup" 3)
      (doseq [step-secs [3 3 15]]
        (advance-cluster-time cluster step-secs)
        (check-heartbeat cluster "sup" 3))
      (bind topology (thrift/mk-topology
                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 4)}
                       {}))
      ;; prevent the workers from launching by capturing them
      (capture-changed-workers
        (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology))
      (doseq [step-secs [3 3 3 20]]
        (advance-cluster-time cluster step-secs)
        (check-heartbeat cluster "sup" 3)))))

(deftest test-workers-go-bananas
  ;; TODO: test that multiple workers are started for a port, and test that
  ;; the supervisor shuts down properly (doesn't shut down the most
  ;; recently launched one, checks heartbeats correctly, etc.)
  )

(deftest downloads-code
  ;; TODO: not implemented yet; this test has no body and asserts nothing.
  )

(deftest test-stateless
  ;; TODO: not implemented yet; this test has no body and asserts nothing.
  )

(deftest cleans-up-on-unassign
  ;; TODO: do a reassign and check that worker state is cleaned up after the
  ;; workers are killed, but that the downloaded code is not removed.
  )

