(ns backtype.storm.integration-test
  (:use [clojure test])
  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
  (:use [backtype.storm bootstrap testing])
  (:use [backtype.storm.daemon common])
  )

;; Invokes `bootstrap`, which the ns form refers in via backtype.storm.bootstrap.
;; NOTE(review): presumably this is what brings the `thrift` alias, the
;; TOPOLOGY-* config constants and classes such as AckTracker into scope for
;; the tests below -- confirm against backtype.storm.bootstrap.
(bootstrap)

;; (deftest test-counter
;;   (with-local-cluster [cluster :supervisors 4]
;;     (let [state (:storm-cluster-state cluster)
;;           nimbus (:nimbus cluster)
;;           topology (thrift/mk-topology
;;                     {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
;;                     {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
;;                      3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
;;                      4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
;;                      })]
;;         (submit-local-topology nimbus
;;                             "counter"
;;                             {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;;                             topology)
;;         (Thread/sleep 10000)
;;         (.killTopology nimbus "counter")
;;         (Thread/sleep 10000)
;;         )))

;; (deftest test-multilang-fy
;;   (with-local-cluster [cluster :supervisors 4]
;;     (let [nimbus (:nimbus cluster)
;;           topology (thrift/mk-topology
;;                       {1 (thrift/mk-spout-spec (TestWordSpout. false))}
;;                       {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "fancy" "tester.fy" ["word"] :parallelism-hint 1)}
;;                       )]
;;       (submit-local-topology nimbus
;;                           "test"
;;                           {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;;                           topology)
;;       (Thread/sleep 10000)
;;       (.killTopology nimbus "test")
;;       (Thread/sleep 10000)
;;       )))

;; (deftest test-multilang-rb
;;   (with-local-cluster [cluster :supervisors 4]
;;     (let [nimbus (:nimbus cluster)
;;           topology (thrift/mk-topology
;;                       {1 (thrift/mk-spout-spec (TestWordSpout. false))}
;;                       {2 (thrift/mk-shell-bolt-spec {1 :shuffle} "ruby" "tester.rb" ["word"] :parallelism-hint 1)}
;;                       )]
;;       (submit-local-topology nimbus
;;                           "test"
;;                           {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 20 TOPOLOGY-MESSAGE-TIMEOUT-SECS 3 TOPOLOGY-DEBUG true}
;;                           topology)
;;       (Thread/sleep 10000)
;;       (.killTopology nimbus "test")
;;       (Thread/sleep 10000)
;;       )))


;; Smoke test for a shell bolt driven by "python" "tester.py".
;; A TestWordSpout feeds the shell bolt through a shuffle grouping; the
;; topology is submitted as "test", left running for 10 seconds, killed, and
;; then given another 10 seconds before the cluster is torn down.
;; There are no `is` assertions here: the test only fails if something throws.
(deftest test-multilang-py
  (with-local-cluster [cluster :supervisors 4]
    (let [nimbus      (:nimbus cluster)
          spout-specs {1 (thrift/mk-spout-spec (TestWordSpout. false))}
          bolt-specs  {2 (thrift/mk-shell-bolt-spec {1 :shuffle}
                                                    "python"
                                                    "tester.py"
                                                    ["word"]
                                                    :parallelism-hint 1)}
          topology    (thrift/mk-topology spout-specs bolt-specs)
          topo-conf   {TOPOLOGY-OPTIMIZE false
                       TOPOLOGY-WORKERS 20
                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 3
                       TOPOLOGY-DEBUG true}]
      (submit-local-topology nimbus "test" topo-conf topology)
      ;; Let the topology run for a while before killing it.
      (Thread/sleep 10000)
      (.killTopology nimbus "test")
      ;; Wait again after the kill request before leaving the cluster scope.
      (Thread/sleep 10000))))


;; Runs a word-count style topology to completion on a simulated-time cluster
;; and checks the tuples read back for each component:
;;   1 - spout, fed from the mock source
;;   2 - TestWordCounter, fields-grouped on "word" from 1
;;   3 - TestGlobalCount, global-grouped from 1
;;   4 - TestAggregatesCounter, global-grouped from 2
;; Components 1 and 2 are compared with `ms=` (NOTE(review): presumably an
;; order-insensitive comparison -- confirm in backtype.storm.testing), while
;; components 3 and 4 are compared with plain `=`, so order matters there.
(deftest test-basic-topology
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [words          [["nathan"] ["bob"] ["joey"] ["nathan"]]
          running-counts [[1] [2] [3] [4]]
          spout-specs    {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
          bolt-specs     {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint 4)
                          3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
                          4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))}
          topology       (thrift/mk-topology spout-specs bolt-specs)
          results        (complete-topology cluster
                                            topology
                                            :mock-sources {1 words}
                                            :storm-conf {TOPOLOGY-DEBUG true})]
      ;; The spout must replay exactly what the mock source was given.
      (is (ms= words
               (read-tuples results 1)))
      ;; One [word count] tuple per input word.
      (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
               (read-tuples results 2)))
      ;; Both global bolts see four inputs and must emit 1..4 in order.
      (is (= running-counts
             (read-tuples results 3)))
      (is (= running-counts
             (read-tuples results 4))))))

;; Checks shuffle grouping from a spout (parallelism 4) into a TestGlobalCount
;; bolt (parallelism 6). 24 tuples are fed in, and the bolt output is expected
;; to be six copies of [1] [2] [3] [4] in any order.
;; NOTE(review): this reads as "each of the 6 bolt tasks receives exactly 4
;; tuples and counts them 1..4" -- confirm against TestGlobalCount.
(deftest test-shuffle
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    (let [;; important for test that #tuples = multiple of 4 and 6:
          ;; 12 repetitions of the ["a"] ["b"] pair gives 24 tuples.
          inputs      (vec (apply concat (repeat 12 [["a"] ["b"]])))
          expected    (apply concat (repeat 6 [[1] [2] [3] [4]]))
          spout-specs {1 (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
          bolt-specs  {2 (thrift/mk-bolt-spec {1 :shuffle}
                                              (TestGlobalCount.)
                                              :parallelism-hint 6)}
          topology    (thrift/mk-topology spout-specs bolt-specs)
          results     (complete-topology cluster
                                         topology
                                         :mock-sources {1 inputs})]
      (is (ms= expected
               (read-tuples results 2))))))

;; Simple bolt declared with output spec ["word"]: takes value 0 of the
;; incoming tuple, appends "lalala" via `str`, emits the result anchored to
;; the input tuple, then acks the input.
(defbolt lalala-bolt1 ["word"] [tuple collector]
  (let [suffixed (str (.getValue tuple 0) "lalala")]
    (.emit collector tuple [suffixed])
    (.ack collector tuple)))

;; Same output as lalala-bolt1, but the suffix lives in an atom that starts
;; as nil and is only set to "lalala" inside :prepare. The emitted value is
;; therefore only correct if :prepare has run before :execute.
(defboltfull lalala-bolt2 ["word"]
  :let [suffix-ref (atom nil)]
  :prepare ([conf context collector]
              ;; Printed so a prepare call is visible in the test output.
              (println "prepare")
              (reset! suffix-ref "lalala"))
  :execute ([tuple collector]
              (let [suffixed (str (.getValue tuple 0) @suffix-ref)]
                ;; Emit anchored to the input, then ack the input.
                (.emit collector tuple [suffixed])
                (.ack collector tuple))))

;; Parameterised variant of lalala-bolt2: it is invoked with a `prefix`
;; argument (see :params), and :prepare stores (str prefix "lalala") in an
;; atom. :execute appends that stored string to value 0 of each tuple.
(defboltfull lalala-bolt3 ["word"]
  :let [suffix-ref (atom nil)]
  :params [prefix]
  :prepare ([conf context collector]
              (reset! suffix-ref (str prefix "lalala")))
  :execute ([tuple collector]
              (let [suffixed (str (.getValue tuple 0) @suffix-ref)]
                ;; Emit anchored to the input, then ack the input.
                (.emit collector tuple [suffixed])
                (.ack collector tuple))))

;; Exercises the three Clojure-defined bolts above. One spout (component 1)
;; is shuffle-grouped into each of:
;;   2 - lalala-bolt1 (defbolt)
;;   3 - lalala-bolt2 (defboltfull with :prepare)
;;   4 - lalala-bolt3 (defboltfull with :params), built with prefix "_nathan_"
;; Every bolt is expected to emit one suffixed word per input word; results
;; are compared with `ms=`.
(deftest test-clojure-bolt
  (with-simulated-time-local-cluster [cluster :supervisors 4]
    ;; The topology is run through complete-topology, which takes the cluster
    ;; itself, so no separate nimbus handle is bound here.
    (let [topology (thrift/mk-topology
                      {1 (thrift/mk-spout-spec (TestWordSpout. false))}
                      {2 (thrift/mk-bolt-spec {1 :shuffle}
                                              lalala-bolt1)
                       3 (thrift/mk-bolt-spec {1 :shuffle}
                                              lalala-bolt2)
                       4 (thrift/mk-bolt-spec {1 :shuffle}
                                              (lalala-bolt3 "_nathan_"))})
          results (complete-topology cluster
                                     topology
                                     :mock-sources {1 [["david"]
                                                       ["adam"]]})]
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 2)))
      (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results 3)))
      (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results 4))))))

(defn ack-tracking-feeder
  "Builds a feeder spout for `fields` whose ack/fail delegate is a fresh
  AckTracker. Returns a two-element vector [spout checker], where `checker`
  is a one-argument function that asserts (via clojure.test `is`) that the
  tracker's ack count equals its argument and then resets that count."
  [fields]
  (let [tracker    (AckTracker.)
        spout      (feeder-spout fields)
        check-acks (fn [expected]
                     (is (= (.getNumAcks tracker) expected))
                     ;; Reset so the next check only counts acks since this one.
                     (.resetNumAcks tracker))]
    (.setAckFailDelegate spout tracker)
    [spout check-acks]))

;; Fan-out bolt taking an `amt` parameter: for every input tuple it emits
;; `amt` tuples ([0], [1], ... [amt - 1]), each anchored to the input, and
;; then acks the input.
(defboltfull branching-bolt ["num"]
  :params [amt]
  :execute ([tuple collector]
              (doseq [n (range amt)]
                (.emit collector tuple [n]))
              (.ack collector tuple)))

;; Fan-in bolt taking an `amt` parameter. Incoming tuples are buffered,
;; un-acked, in an atom. Once `amt` tuples are buffered it emits a single [1]
;; tuple anchored to the whole buffer, acks every buffered tuple, and clears
;; the buffer. Until then nothing is emitted or acked.
(defboltfull agg-bolt ["num"]
  :let [seen (atom [])]
  :params [amt]
  :execute ([tuple collector]
              ;; swap! returns the buffer including the tuple just added.
              (let [batch (swap! seen conj tuple)]
                (when (= (count batch) amt)
                  (.emit collector batch [1])
                  (doseq [buffered batch]
                    (.ack collector buffered))
                  (reset! seen [])))))

;; Terminal bolt: acks every tuple it receives and emits nothing.
;; It is declared with an empty map ({}) where the other bolts in this file
;; pass a vector of field names.
(defbolt ack-bolt {}
  [tuple collector]
  (.ack collector tuple))

;; Pass-through bolt: re-emits the input tuple's values unchanged, anchored
;; to the input tuple, and then acks the input.
(defbolt identity-bolt ["num"]
  [tuple collector]
  (let [values (.getValues tuple)]
    (.emit collector tuple values)
    (.ack collector tuple)))

;; Verifies that spout tuples are acked only once their whole tuple tree is
;; acked, across three spouts that merge into one aggregating bolt.
;;
;; Topology:
;;   spouts 1, 2, 3 -> branching bolts 4 (x2), 5 (x4), 6 (x1)
;;   bolts 4, 5, 6  -> 7 (agg-bolt 3: holds tuples un-acked until it has 3)
;;   7 -> 8 (branching-bolt 2) -> 9 (ack-bolt)
;;
;; Each checkerN call asserts the number of acks feederN has received since
;; that checker was last called. The step comments below assume bolt 7 runs
;; as a single task, so all branches share one buffer (NOTE(review): confirm
;; the default parallelism used by mk-tracked-topology).
(deftest test-acking
  (with-tracked-cluster [cluster]
    (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
          [feeder2 checker2] (ack-tracking-feeder ["num"])
          [feeder3 checker3] (ack-tracking-feeder ["num"])
          tracked (mk-tracked-topology
                   {1 [feeder1]
                    2 [feeder2]
                    3 [feeder3]}
                   {4 [{1 :shuffle} (branching-bolt 2)]
                    5 [{2 :shuffle} (branching-bolt 4)]
                    6 [{3 :shuffle} (branching-bolt 1)]
                    7 [{4 :shuffle
                        5 :shuffle
                        6 :shuffle} (agg-bolt 3)]
                    8 [{7 :shuffle} (branching-bolt 2)]
                    9 [{8 :shuffle} ack-bolt]})
          ;; Feed a single [1] tuple into `feeder` and wait on the tracked
          ;; topology for that one tuple.
          feed-one! (fn [feeder]
                      (.feed feeder [1])
                      (tracked-wait tracked 1))]
      (submit-local-topology (:nimbus cluster)
                             "test"
                             {}
                             (:topology tracked))

      ;; spout 1 -> 2 tuples buffered in bolt 7; nothing acked yet.
      (feed-one! feeder1)
      (checker1 0)

      ;; spout 2 -> 4 more tuples; the buffer reaches 3 twice, so everything
      ;; from both spouts gets acked.
      (feed-one! feeder2)
      (checker1 1)
      (checker2 1)

      ;; spout 1 -> 2 tuples buffered again; no ack.
      (feed-one! feeder1)
      (checker1 0)

      ;; spout 1 again -> 4 buffered in total; 3 are flushed, which completes
      ;; the earlier spout-1 tuple but leaves one branch of this one pending.
      (feed-one! feeder1)
      (checker1 1)

      ;; spout 3 -> buffer holds 2 tuples; still no ack for 1 or 3.
      (feed-one! feeder3)
      (checker1 0)
      (checker3 0)

      ;; spout 2 -> 4 more tuples flush the buffer twice; all three spouts
      ;; get their outstanding tuple acked.
      (feed-one! feeder2)
      (checker1 1)
      (checker2 1)
      (checker3 1))))

;; Verifies acking when one spout tuple branches into two bolts that merge
;; again. Spout 1 feeds both identity bolts (2 and 3), and both feed bolt 4,
;; an agg-bolt that holds tuples un-acked until it has seen 4 of them.
;; The first fed tuple yields only 2 tuples at bolt 4, so no ack is expected;
;; the second brings the total to 4, after which both spout tuples should be
;; acked. (NOTE(review): assumes bolt 4 runs as a single task -- confirm.)
(deftest test-ack-branching
  (with-tracked-cluster [cluster]
    (let [[feeder checker] (ack-tracking-feeder ["num"])
          tracked (mk-tracked-topology
                   {1 [feeder]}
                   {2 [{1 :shuffle} identity-bolt]
                    3 [{1 :shuffle} identity-bolt]
                    4 [{2 :shuffle
                        3 :shuffle} (agg-bolt 4)]})
          ;; Feed a single [1] tuple and wait on the tracked topology for it.
          feed-one! (fn []
                      (.feed feeder [1])
                      (tracked-wait tracked 1))]
      (submit-local-topology (:nimbus cluster)
                             "test"
                             {}
                             (:topology tracked))
      (feed-one!)
      (checker 0)
      (feed-one!)
      (checker 2))))

;; Emits a single [1] tuple whose anchor list names the input tuple twice,
;; then acks the input. Used to exercise acking when the same tuple appears
;; more than once among a tuple's anchors.
(defbolt dup-anchor ["num"]
  [tuple collector]
  (let [anchors [tuple tuple]]
    (.emit collector anchors [1])
    (.ack collector tuple)))

;; Verifies that a tuple anchored twice to the same input (see dup-anchor)
;; still results in exactly one ack per spout tuple: one tuple fed gives one
;; ack, then three tuples fed give three acks.
(deftest test-acking-self-anchor
  (with-tracked-cluster [cluster]
    (let [[feeder checker] (ack-tracking-feeder ["num"])
          tracked (mk-tracked-topology
                   {1 [feeder]}
                   {2 [{1 :shuffle} dup-anchor]
                    3 [{2 :shuffle} ack-bolt]})]
      (submit-local-topology (:nimbus cluster)
                             "test"
                             {}
                             (:topology tracked))
      ;; Single tuple.
      (.feed feeder [1])
      (tracked-wait tracked 1)
      (checker 1)
      ;; Batch of three tuples, waited on together.
      (dotimes [_ 3]
        (.feed feeder [1]))
      (tracked-wait tracked 3)
      (checker 3))))

;; TODO: placeholder -- this test has no body yet, so it runs no assertions.
(deftest test-acking-branching-complex
  ;; Intended scope: test acking with branching in the topology.
  )


;; TODO: placeholder -- this test has no body yet, so it runs no assertions.
(deftest test-fields-grouping
  ;; Intended scope:
  ;; 1. put a large number of random tuples through it and test that counts are right
  ;; 2. test that different spouts with different parallelism hints group the same way
  )

;; TODO: placeholder -- this test has no body yet, so it runs no assertions.
(deftest test-all-grouping
  )

;; TODO: placeholder -- this test has no body yet, so it runs no assertions.
(deftest test-direct-grouping
  )
