(ns backtype.storm.clojure
(:use [clojure.contrib.def :only [defnk defalias]])
(:use [backtype.storm bootstrap util])
(:import [backtype.storm StormSubmitter])
(:import [backtype.storm.generated StreamInfo])
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.task OutputCollector IBolt])
(:import [backtype.storm.spout SpoutOutputCollector ISpout])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.clojure ClojureBolt ClojureSpout])
(:require [backtype.storm [thrift :as thrift]]))
(defn hint [sym class-sym]
(with-meta sym {:tag class-sym})
(defmulti hinted-args (fn [m args] m))
(defmethod hinted-args 'prepare [_ [conf context collector]]
[(hint conf 'java.util.Map)
(hint context 'backtype.storm.task.TopologyContext)
(hint collector 'backtype.storm.task.OutputCollector)]
(defmethod hinted-args 'execute [_ [tuple]]
[(hint tuple 'backtype.storm.tuple.Tuple)]
(defmethod hinted-args 'cleanup [_ []]
(defmethod hinted-args 'close [_ []]
(defmethod hinted-args 'open [_ [conf context collector]]
[(hint conf 'java.util.Map)
(hint context 'backtype.storm.task.TopologyContext)
(hint collector 'backtype.storm.spout.SpoutOutputCollector)]
(defmethod hinted-args 'nextTuple [_ []]
(defmethod hinted-args 'ack [_ [id]]
[(hint id 'java.lang.Object)]
(defmethod hinted-args 'fail [_ [id]]
[(hint id 'java.lang.Object)]
(defn direct-stream [fields]
(StreamInfo. fields true))
(defn clojure-bolt* [output-spec fn-var args]
(let [m (meta fn-var)]
(ClojureBolt. (str (:ns m)) (str (:name m)) args (thrift/mk-output-spec output-spec))
(defmacro clojure-bolt [output-spec fn-sym args]
`(clojure-bolt* ~output-spec (var ~fn-sym) ~args))
(defn clojure-spout* [output-spec distributed? fn-var args]
(let [m (meta fn-var)]
(ClojureSpout. (str (:ns m)) (str (:name m)) args (thrift/mk-output-spec output-spec) distributed?)
(defmacro clojure-spout [output-spec distributed? fn-sym args]
`(clojure-spout* ~output-spec ~distributed? (var ~fn-sym) ~args))
(defn hint-fns [body]
(for [[name args & impl] body
:let [name (hint name 'void)
args (hinted-args name args)
args (-> "this"
(cons args)
(concat [name args] impl)
(defmacro bolt [& body]
(let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
fns (hint-fns bolt-fns)]
`(reify IBolt
(defmacro bolt-execute [& body]
(~'execute ~@body)))
(defmacro spout [& body]
(let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
fns (hint-fns spout-fns)]
`(reify ISpout
(defmacro defbolt [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defbolt ~name ~output-spec {} ~@all)
(let [worker-name (symbol (str name "__"))
params (:params opts)
fn-body (if (:prepare opts)
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (nth args 1)
args (vec (take 1 args))
prepargs (hinted-args 'prepare [(gensym "conf") (gensym "context") coll-sym])]
`(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
(clojure-bolt ~output-spec ~worker-name args#))
`(def ~name
(clojure-bolt ~output-spec ~worker-name []))
(defn ~worker-name ~(if params params [])
(defmacro defspout [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defspout ~name ~output-spec {} ~@all)
(let [worker-name (symbol (str name "__"))
params (:params opts)
distributed? (get opts :distributed true)
prepare? (:prepare opts)
prepare? (if (nil? prepare?) true prepare?)
fn-body (if prepare?
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (first args)
prepargs (hinted-args 'open [(gensym "conf") (gensym "context") coll-sym])]
`(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
(clojure-spout ~output-spec ~distributed? ~worker-name args#))
`(def ~name
(clojure-spout ~output-spec ~distributed? ~worker-name []))
(defn ~worker-name ~(if params params [])
(defnk emit-bolt! [^OutputCollector collector ^List values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)]
(.emit collector stream anchor values)
(defnk emit-direct-bolt! [^OutputCollector collector task ^List values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)]
(.emitDirect collector task stream anchor values)
(defn ack! [^OutputCollector collector ^Tuple tuple]
(.ack collector tuple))
(defn fail! [^OutputCollector collector ^Tuple tuple]
(.fail collector tuple))
(defnk emit-spout! [^SpoutOutputCollector collector ^List values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(.emit collector stream values id))
(defnk emit-direct-spout! [^SpoutOutputCollector collector task ^List values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(.emitDirect collector task stream values id))
(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
(defalias spout-spec thrift/mk-spout-spec)
(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
(defn submit-remote-topology [name conf topology]
(StormSubmitter/submitTopology name conf topology))
(defn local-cluster []
;; do this to avoid a cyclic dependency of
;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
(eval '(new backtype.storm.LocalCluster)))