blob: fdae7cb1f826bddd0bd5516f9c5958d7bbff4aea [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.clojure
(:use [backtype.storm bootstrap util])
(:import [backtype.storm StormSubmitter])
(:import [backtype.storm.generated StreamInfo])
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.task OutputCollector IBolt TopologyContext])
(:import [backtype.storm.spout SpoutOutputCollector ISpout])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.clojure ClojureBolt ClojureSpout])
(:import [java.util List])
(:require [backtype.storm [thrift :as thrift]]))
(defn direct-stream [fields]
(StreamInfo. fields true))
(defn to-spec [avar]
(let [m (meta avar)]
[(str (:ns m)) (str (:name m))]))
(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
(ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))
(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
`(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))
(defn clojure-spout* [output-spec fn-var conf-var args]
(let [m (meta fn-var)]
(ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
))
(defmacro clojure-spout [output-spec fn-sym conf-sym args]
`(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))
(defn normalize-fns [body]
(for [[name args & impl] body
:let [args (-> "this"
gensym
(cons args)
vec)]]
(concat [name args] impl)
))
(defmacro bolt [& body]
(let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
fns (normalize-fns bolt-fns)]
`(reify IBolt
~@fns
~@other-fns)))
(defmacro bolt-execute [& body]
`(bolt
(~'execute ~@body)))
(defmacro spout [& body]
(let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
fns (normalize-fns spout-fns)]
`(reify ISpout
~@fns
~@other-fns)))
(defmacro defbolt [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defbolt ~name ~output-spec {} ~@all)
(let [worker-name (symbol (str name "__"))
conf-fn-name (symbol (str name "__conf__"))
params (:params opts)
conf-code (:conf opts)
fn-body (if (:prepare opts)
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (nth args 1)
args (vec (take 1 args))
prepargs [(gensym "conf") (gensym "context") coll-sym]]
`(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
(clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
`(def ~name
(clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
)
]
`(do
(defn ~conf-fn-name ~(if params params [])
~conf-code
)
(defn ~worker-name ~(if params params [])
~fn-body
)
~definer
))))
(defmacro defspout [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defspout ~name ~output-spec {} ~@all)
(let [worker-name (symbol (str name "__"))
conf-fn-name (symbol (str name "__conf__"))
params (:params opts)
conf-code (:conf opts)
prepare? (:prepare opts)
prepare? (if (nil? prepare?) true prepare?)
fn-body (if prepare?
(cons 'fn impl)
(let [[args & impl-body] impl
coll-sym (first args)
prepargs [(gensym "conf") (gensym "context") coll-sym]]
`(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
definer (if params
`(defn ~name [& args#]
(clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
`(def ~name
(clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
)
]
`(do
(defn ~conf-fn-name ~(if params params [])
~conf-code
)
(defn ~worker-name ~(if params params [])
~fn-body
)
~definer
))))
(defprotocol TupleValues
(tuple-values [values collector stream]))
(extend-protocol TupleValues
java.util.Map
(tuple-values [this collector ^String stream]
(let [^TopologyContext context (:context collector)
fields (.. context (getThisOutputFields stream) toList) ]
(vec (map (into
(empty this) (for [[k v] this]
[(if (keyword? k) (name k) k) v]))
fields))))
java.util.List
(tuple-values [this collector stream]
this))
(defnk emit-bolt! [collector values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)
values (tuple-values values collector stream) ]
(.emit ^OutputCollector (:output-collector collector) stream anchor values)
))
(defnk emit-direct-bolt! [collector task values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)
values (tuple-values values collector stream) ]
(.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
))
(defn ack! [collector ^Tuple tuple]
(.ack ^OutputCollector (:output-collector collector) tuple))
(defn fail! [collector ^Tuple tuple]
(.fail ^OutputCollector (:output-collector collector) tuple))
(defn report-error! [collector ^Tuple tuple]
(.reportError ^OutputCollector (:output-collector collector) tuple))
(defnk emit-spout! [collector values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(let [values (tuple-values values collector stream)]
(.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
(defnk emit-direct-spout! [collector task values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(let [values (tuple-values values collector stream)]
(.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
(defalias topology thrift/mk-topology)
(defalias bolt-spec thrift/mk-bolt-spec)
(defalias spout-spec thrift/mk-spout-spec)
(defalias shell-bolt-spec thrift/mk-shell-bolt-spec)
(defalias shell-spout-spec thrift/mk-shell-spout-spec)
(defn submit-remote-topology [name conf topology]
(StormSubmitter/submitTopology name conf topology))
(defn local-cluster []
;; do this to avoid a cyclic dependency of
;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
(eval '(new backtype.storm.LocalCluster)))