;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements.  See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership.  The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License.  You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns backtype.storm.clojure
  (:use [backtype.storm util])
  (:import [backtype.storm StormSubmitter])
  (:import [backtype.storm.generated StreamInfo])
  (:import [backtype.storm.tuple Tuple])
  (:import [backtype.storm.task OutputCollector IBolt TopologyContext])
  (:import [backtype.storm.spout SpoutOutputCollector ISpout])
  (:import [backtype.storm.utils Utils])
  (:import [backtype.storm.clojure ClojureBolt ClojureSpout])
  (:import [java.util List])
  (:require [backtype.storm [thrift :as thrift]]))

(defn direct-stream [fields]
  (StreamInfo. fields true))

(defn to-spec [avar]
  (let [m (meta avar)]
    [(str (:ns m)) (str (:name m))]))

(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
  (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))

(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
  `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))

(defn clojure-spout* [output-spec fn-var conf-var args]
  (let [m (meta fn-var)]
    (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
    ))

(defmacro clojure-spout [output-spec fn-sym conf-sym args]
  `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))

(defn normalize-fns [body]
  (for [[name args & impl] body
        :let [args (-> "this"
                       gensym
                       (cons args)
                       vec)]]
    (concat [name args] impl)
    ))

(defmacro bolt [& body]
  (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
        fns (normalize-fns bolt-fns)]
    `(reify IBolt
       ~@fns
       ~@other-fns)))

(defmacro bolt-execute [& body]
  `(bolt
     (~'execute ~@body)))

(defmacro spout [& body]
  (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
        fns (normalize-fns spout-fns)]
    `(reify ISpout
       ~@fns
       ~@other-fns)))

(defmacro defbolt [name output-spec & [opts & impl :as all]]
  (if-not (map? opts)
    `(defbolt ~name ~output-spec {} ~@all)
    (let [worker-name (symbol (str name "__"))
          conf-fn-name (symbol (str name "__conf__"))
          params (:params opts)
          conf-code (:conf opts)
          fn-body (if (:prepare opts)
                    (cons 'fn impl)
                    (let [[args & impl-body] impl
                          coll-sym (nth args 1)
                          args (vec (take 1 args))
                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
          definer (if params
                    `(defn ~name [& args#]
                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
                    `(def ~name
                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
                    )
          ]
      `(do
         (defn ~conf-fn-name ~(if params params [])
           ~conf-code
           )
         (defn ~worker-name ~(if params params [])
           ~fn-body
           )
         ~definer
         ))))

(defmacro defspout [name output-spec & [opts & impl :as all]]
  (if-not (map? opts)
    `(defspout ~name ~output-spec {} ~@all)
    (let [worker-name (symbol (str name "__"))
          conf-fn-name (symbol (str name "__conf__"))
          params (:params opts)
          conf-code (:conf opts)
          prepare? (:prepare opts)
          prepare? (if (nil? prepare?) true prepare?)
          fn-body (if prepare?
                    (cons 'fn impl)
                    (let [[args & impl-body] impl
                          coll-sym (first args)
                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
          definer (if params
                    `(defn ~name [& args#]
                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
                    `(def ~name
                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
                    )
          ]
      `(do
         (defn ~conf-fn-name ~(if params params [])
           ~conf-code
           )
         (defn ~worker-name ~(if params params [])
           ~fn-body
           )
         ~definer
         ))))

(defprotocol TupleValues
  (tuple-values [values collector stream]))

(extend-protocol TupleValues
  java.util.Map
  (tuple-values [this collector ^String stream]
    (let [^TopologyContext context (:context collector)
          fields (..  context (getThisOutputFields stream) toList) ]
      (vec (map (into
                  (empty this) (for [[k v] this]
                                   [(if (keyword? k) (name k) k) v]))
                fields))))
  java.util.List
  (tuple-values [this collector stream]
    this))

(defnk emit-bolt! [collector values
                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
  (let [^List anchor (collectify anchor)
        values (tuple-values values collector stream) ]
    (.emit ^OutputCollector (:output-collector collector) stream anchor values)
    ))

(defnk emit-direct-bolt! [collector task values
                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
  (let [^List anchor (collectify anchor)
        values (tuple-values values collector stream) ]
    (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
    ))

(defn ack! [collector ^Tuple tuple]
  (.ack ^OutputCollector (:output-collector collector) tuple))

(defn fail! [collector ^Tuple tuple]
  (.fail ^OutputCollector (:output-collector collector) tuple))

(defn report-error! [collector ^Tuple tuple]
  (.reportError ^OutputCollector (:output-collector collector) tuple))

(defnk emit-spout! [collector values
                    :stream Utils/DEFAULT_STREAM_ID :id nil]
  (let [values (tuple-values values collector stream)]
    (.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))

(defnk emit-direct-spout! [collector task values
                           :stream Utils/DEFAULT_STREAM_ID :id nil]
  (let [values (tuple-values values collector stream)]
    (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))

(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
(defalias spout-spec thrift/mk-spout-spec)
(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
(defalias shell-spout-spec thrift/mk-shell-spout-spec)

(defn submit-remote-topology [name conf topology]
  (StormSubmitter/submitTopology name conf topology))

(defn local-cluster []
  ;; do this to avoid a cyclic dependency of
  ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
  (eval '(new backtype.storm.LocalCluster)))
