(ns org.apache.storm.clojure
(:use [org.apache.storm util])
(:import [org.apache.storm StormSubmitter])
(:import [org.apache.storm.generated StreamInfo])
(:import [org.apache.storm.tuple Tuple])
(:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
(:import [org.apache.storm.spout SpoutOutputCollector ISpout])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
(:import [java.util Collection List])
(:require [org.apache.storm [thrift :as thrift]]))
(defn direct-stream [fields]
(StreamInfo. fields true))
(defn to-spec [avar]
(let [m (meta avar)]
[(str (:ns m)) (str (:name m))]))
(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
(ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))
(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
`(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))
(defn clojure-spout* [output-spec fn-var conf-var args]
(let [m (meta fn-var)]
(ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
(defmacro clojure-spout [output-spec fn-sym conf-sym args]
`(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))
(defn normalize-fns [body]
(for [[name args & impl] body
:let [args (-> "this"
(cons args)
(concat [name args] impl)
(defmacro bolt [& body]
(let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
fns (normalize-fns bolt-fns)]
`(reify IBolt
(defmacro bolt-execute [& body]
(~'execute ~@body)))
(defmacro spout [& body]
(let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
fns (normalize-fns spout-fns)]
`(reify ISpout
(defmacro defbolt [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defbolt ~name ~output-spec {} ~@all)
(let [worker-name (symbol (str name "__"))
conf-fn-name (symbol (str name "__conf__"))
params (:params opts)
conf-code (:conf opts)
fn-body (if (:prepare opts)
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (nth args 1)
args (vec (take 1 args))
prepargs [(gensym "conf") (gensym "context") coll-sym]]
`(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
(clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
`(def ~name
(clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
(defn ~conf-fn-name ~(if params params [])
(defn ~worker-name ~(if params params [])
(defmacro defspout [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defspout ~name ~output-spec {} ~@all)
(let [worker-name (symbol (str name "__"))
conf-fn-name (symbol (str name "__conf__"))
params (:params opts)
conf-code (:conf opts)
prepare? (:prepare opts)
prepare? (if (nil? prepare?) true prepare?)
fn-body (if prepare?
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (first args)
prepargs [(gensym "conf") (gensym "context") coll-sym]]
`(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
(clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
`(def ~name
(clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
(defn ~conf-fn-name ~(if params params [])
(defn ~worker-name ~(if params params [])
(defprotocol TupleValues
(tuple-values [values collector stream]))
(extend-protocol TupleValues
(tuple-values [this collector ^String stream]
(let [^TopologyContext context (:context collector)
fields (.. context (getThisOutputFields stream) toList) ]
(vec (map (into
(empty this) (for [[k v] this]
[(if (keyword? k) (name k) k) v]))
(tuple-values [this collector stream]
(defn- collectify
(if (or (sequential? obj) (instance? Collection obj))
(defnk emit-bolt! [collector values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)
values (tuple-values values collector stream) ]
(.emit ^OutputCollector (:output-collector collector) stream anchor values)
(defnk emit-direct-bolt! [collector task values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)
values (tuple-values values collector stream) ]
(.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
(defn ack! [collector ^Tuple tuple]
(.ack ^OutputCollector (:output-collector collector) tuple))
(defn fail! [collector ^Tuple tuple]
(.fail ^OutputCollector (:output-collector collector) tuple))
(defn reset-timeout! [collector ^Tuple tuple]
(.resetTimeout ^OutputCollector (:output-collector collector) tuple))
(defn report-error! [collector ^Tuple tuple]
(.reportError ^OutputCollector (:output-collector collector) tuple))
(defnk emit-spout! [collector values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(let [values (tuple-values values collector stream)]
(.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
(defnk emit-direct-spout! [collector task values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(let [values (tuple-values values collector stream)]
(.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
(defalias spout-spec thrift/mk-spout-spec)
(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
(defalias shell-spout-spec thrift/mk-shell-spout-spec)
(defn submit-remote-topology [name conf topology]
(StormSubmitter/submitTopology name conf topology))
(defn local-cluster []
;; do this to avoid a cyclic dependency of
;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
(eval '(new org.apache.storm.LocalCluster)))