blob: 390d2aa406555dd16eeae7ee8bdb40fb587bbf3e [file] [log] [blame]
(ns backtype.storm.clojure
(:use backtype.storm.bootstrap)
(:import [backtype.storm.generated StreamInfo])
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.task OutputCollector])
(:import backtype.storm.clojure.ClojureBolt)
(:require [backtype.storm [thrift :as thrift]]))
(defn direct-stream [fields]
(StreamInfo. fields true))
(defn clojure-bolt* [output-spec fn-var & args]
(let [m (meta fn-var)]
(ClojureBolt. (str (:ns m)) (str (:name m)) args (thrift/mk-output-spec output-spec))
))
(defmacro clojure-bolt [output-spec fn-sym & args]
`(clojure-bolt* ~output-spec (var ~fn-sym) ~@args))
(defmacro defbolt [name output-spec [tuple-sym collector-sym] & body]
(let [worker-name (symbol (str name "__"))]
`(do
(defn ~worker-name []
(fn [^Tuple ~tuple-sym ^OutputCollector ~collector-sym]
~@body
))
(def ~name (clojure-bolt ~output-spec ~worker-name))
)))
(defn hint [sym class-sym]
(with-meta sym {:tag class-sym})
)
(defmulti hinted-args (fn [kw args] kw))
(defmethod hinted-args :prepare [_ [conf context collector]]
[(hint conf 'java.util.Map)
(hint context 'backtype.storm.task.TopologyContext)
(hint collector 'backtype.storm.bolt.OutputCollector)]
)
(defmethod hinted-args :execute [_ [tuple collector]]
[(hint tuple 'backtype.storm.tuple.Tuple)
(hint collector 'backtype.storm.task.OutputCollector)]
)
(defmethod hinted-args :cleanup [_ [collector]]
[(hint collector 'backtype.storm.task.OutputCollector)]
)
(defmacro defboltfull [name output-spec & kwargs]
(let [opts (apply hash-map kwargs)
worker-name (symbol (str name "__"))
let-bindings (:let opts)
hof-args (:params opts)
definer (if hof-args
`(defn ~name [& args#]
(apply clojure-bolt* ~output-spec (var ~worker-name) args#))
`(def ~name (clojure-bolt ~output-spec ~worker-name)))
fns (select-keys opts [:prepare :execute :cleanup])
fns (into {}
(for [[fn-kw [args & impl]] fns]
[fn-kw `(fn ~(hinted-args fn-kw args) ~@impl)]
))]
`(do
(defn ~worker-name [~@hof-args]
(let [~@let-bindings]
~fns
))
~definer
)))