blob: f778d238c5daff057c3e92702a1d9c287c94c262 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.thrift
(:import [java.util HashMap])
(:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology
StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
GlobalStreamId ComponentObject ComponentObject$_Fields
ShellComponent])
(:import [backtype.storm.utils Utils])
(:import [backtype.storm Constants])
(:import [backtype.storm.grouping CustomStreamGrouping])
(:import [backtype.storm.topology TopologyBuilder])
(:import [backtype.storm.clojure RichShellBolt RichShellSpout])
(:import [org.apache.thrift.protocol TBinaryProtocol TProtocol])
(:import [org.apache.thrift.transport TTransport TFramedTransport TSocket])
(:use [backtype.storm util config log]))
(defn instantiate-java-object
  "Constructs an instance of the class named by a Thrift JavaObject.
  The class is looked up from (.get_full_class_name obj) and the
  constructor is called with the getFieldValue of each element of
  (.get_args_list obj), in order."
  [^JavaObject obj]
  (let [class-sym (symbol (.get_full_class_name obj))
        ctor-args (map #(.getFieldValue %) (.get_args_list obj))
        ctor-form (list* 'new class-sym ctor-args)]
    ;; NOTE(review): this evals a form built from data carried in the
    ;; JavaObject, so it can construct any class on the classpath. The
    ;; argument values are embedded in the compiled form as constants;
    ;; values that cannot be embedded in code may fail here -- confirm which
    ;; argument types callers actually send.
    (eval ctor-form)))
;; Lookup table from the Grouping union field that is set
;; (a Grouping$_Fields value) to the keyword used for it in Clojure code.
;; Used by grouping-type.
(def grouping-constants
  {Grouping$_Fields/ALL               :all
   Grouping$_Fields/CUSTOM_OBJECT     :custom-object
   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
   Grouping$_Fields/DIRECT            :direct
   Grouping$_Fields/FIELDS            :fields
   Grouping$_Fields/LOCAL_OR_SHUFFLE  :local-or-shuffle
   Grouping$_Fields/NONE              :none
   Grouping$_Fields/SHUFFLE           :shuffle})
(defn grouping-type
  "Returns the keyword (:fields, :shuffle, :all, ...) for whichever field
  of the Grouping union is set, looked up in grouping-constants. Returns
  nil if the set field has no entry in that table."
  [^Grouping grouping]
  (-> grouping .getSetField grouping-constants))
(defn field-grouping
  "Returns the field list of a fields grouping.
  Throws IllegalArgumentException if the grouping is of any other type."
  [^Grouping grouping]
  (if (= :fields (grouping-type grouping))
    (.get_fields grouping)
    (throw (IllegalArgumentException. "Tried to get grouping fields from non fields grouping"))))
(defn global-grouping?
  "True when grouping is a fields grouping with an empty field list; this
  is how mk-global-grouping represents a global grouping. False for every
  other grouping type."
  [^Grouping grouping]
  (let [fields-grouping? (= (grouping-type grouping) :fields)]
    ;; field-grouping throws on non-fields groupings, so it must only be
    ;; reached after the type check short-circuits.
    (and fields-grouping?
         (empty? (field-grouping grouping)))))
(defn parallelism-hint
  "Returns the parallelism hint stored on a ComponentCommon, defaulting to
  1 when the hint field is not set."
  [^ComponentCommon component-common]
  (if (.is_set_parallelism_hint component-common)
    (.get_parallelism_hint component-common)
    1))
(defn nimbus-client-and-conn
  "Logs and opens a Thrift connection to Nimbus at host:port.
  The socket is wrapped in a TFramedTransport, spoken to with
  TBinaryProtocol, and opened before returning.
  Returns a two-element vector [client transport]; the caller owns the
  transport and is responsible for closing it."
  [host port]
  (log-message "Connecting to Nimbus at " host ":" port)
  (let [transport (-> (TSocket. host port) (TFramedTransport.))
        client (-> transport (TBinaryProtocol.) (Nimbus$Client.))]
    (.open transport)
    [client transport]))
(defmacro with-nimbus-connection
  "Evaluates body with client-sym bound to a Nimbus$Client connected to
  host:port, obtained from nimbus-client-and-conn.
  The transport returned alongside the client is closed in a finally
  clause, so it is released even if body throws. Expands to the value of
  the last body form."
  [[client-sym host port] & body]
  ;; conn# is an auto-gensym, so the transport binding cannot capture or
  ;; shadow any name used in the caller's body. The type hints let the
  ;; caller's interop calls on client-sym and the .close call resolve
  ;; without reflection.
  `(let [[^Nimbus$Client ~client-sym ^TTransport conn#]
         (nimbus-client-and-conn ~host ~port)]
     (try
       ~@body
       (finally (.close conn#)))))
(defmacro with-configured-nimbus-connection
  "Like with-nimbus-connection, but takes host and port from the Storm
  config returned by read-storm-config (the NIMBUS-HOST and
  NIMBUS-THRIFT-PORT entries). The config is read each time the expanded
  code runs."
  [client-sym & body]
  `(let [conf# (read-storm-config)
         host# (conf# NIMBUS-HOST)
         port# (conf# NIMBUS-THRIFT-PORT)]
     (with-nimbus-connection [~client-sym host# port#]
       ~@body)))
(defn direct-output-fields
  "Returns a StreamInfo over fields whose boolean flag is true, i.e. a
  direct stream declaration (compare output-fields, which passes false)."
  [fields]
  (new StreamInfo fields true))
(defn output-fields
  "Returns a StreamInfo over fields whose boolean flag is false, i.e. a
  regular (non-direct) stream declaration."
  [fields]
  (new StreamInfo fields false))
(defn mk-output-spec
  "Normalizes an output declaration into a map of stream id -> StreamInfo.
  A non-map output-spec is treated as the declaration of the single
  default stream (Utils/DEFAULT_STREAM_ID). Values that are already
  StreamInfo instances are kept unchanged; any other value is wrapped
  with output-fields (a non-direct StreamInfo)."
  [output-spec]
  (let [by-stream (if-not (map? output-spec)
                    {Utils/DEFAULT_STREAM_ID output-spec}
                    output-spec)
        ->stream-info #(if (instance? StreamInfo %) % (output-fields %))]
    (map-val ->stream-info by-stream)))
;; Builds a ComponentCommon from an inputs map and an output declaration
;; (normalized by mk-output-spec). Both are copied into java.util.HashMaps.
;; The parallelism hint is set only when non-nil, and :conf, when given, is
;; stored on the component as JSON produced by to-json.
(defnk mk-plain-component-common
  [inputs output-spec parallelism-hint :conf nil]
  (let [input-map (HashMap. inputs)
        output-map (HashMap. (mk-output-spec output-spec))
        common (ComponentCommon. input-map output-map)]
    (when-not (nil? parallelism-hint)
      (.set_parallelism_hint common parallelism-hint))
    (when-not (or (nil? conf) (false? conf))
      (.set_json_conf common (to-json conf)))
    common))
;; Builds a Thrift SpoutSpec directly: the spout object is Java-serialized
;; into a ComponentObject, and its common part has no inputs, the given
;; output declaration, optional parallelism :p and optional :conf.
(defnk mk-spout-spec*
  [spout outputs :p nil :conf nil]
  (let [spout-object (ComponentObject/serialized_java (Utils/serialize spout))
        common (mk-plain-component-common {} outputs p :conf conf)]
    (SpoutSpec. spout-object common)))
(defn mk-shuffle-grouping
  "Returns a shuffle Grouping (carries an empty NullStruct payload)."
  []
  (-> (NullStruct.) Grouping/shuffle))
(defn mk-local-or-shuffle-grouping
  "Returns a local-or-shuffle Grouping (carries an empty NullStruct payload)."
  []
  (-> (NullStruct.) Grouping/local_or_shuffle))
(defn mk-fields-grouping
  "Returns a fields Grouping over the given list of field names.
  An empty list is how a global grouping is represented."
  [fields]
  (-> fields Grouping/fields))
(defn mk-global-grouping
  "Returns a global grouping, encoded as a fields grouping with no fields
  (the shape global-grouping? recognizes)."
  []
  (let [no-fields []]
    (mk-fields-grouping no-fields)))
(defn mk-direct-grouping
  "Returns a direct Grouping (carries an empty NullStruct payload)."
  []
  (-> (NullStruct.) Grouping/direct))
(defn mk-all-grouping
  "Returns an all Grouping (carries an empty NullStruct payload)."
  []
  (-> (NullStruct.) Grouping/all))
(defn mk-none-grouping
  "Returns a none Grouping (carries an empty NullStruct payload)."
  []
  (-> (NullStruct.) Grouping/none))
(defn deserialized-component-object
  "Java-deserializes the payload of a ComponentObject whose set field is
  serialized_java. Throws RuntimeException for any other kind of
  ComponentObject."
  [^ComponentObject obj]
  (if (= ComponentObject$_Fields/SERIALIZED_JAVA (.getSetField obj))
    (Utils/deserialize (.get_serialized_java obj))
    (throw (RuntimeException. "Cannot deserialize non-java-serialized object"))))
(defn serialize-component-object
  "Java-serializes obj with Utils/serialize and wraps the bytes in a
  serialized_java ComponentObject (the inverse of
  deserialized-component-object)."
  [obj]
  (-> obj Utils/serialize ComponentObject/serialized_java))
(defn- mk-grouping
  "Converts a user-facing grouping spec into a Thrift Grouping:
    nil                    -> none grouping
    Grouping               -> returned unchanged
    CustomStreamGrouping   -> custom_serialized (Java-serialized)
    JavaObject             -> custom_object
    sequential collection  -> fields grouping over those fields
    :shuffle, :local-or-shuffle, :none, :all, :global, :direct
                           -> the corresponding built-in grouping
  Anything else throws IllegalArgumentException."
  [grouping-spec]
  (cond
    (nil? grouping-spec) (mk-none-grouping)
    (instance? Grouping grouping-spec) grouping-spec
    (instance? CustomStreamGrouping grouping-spec) (Grouping/custom_serialized (Utils/serialize grouping-spec))
    (instance? JavaObject grouping-spec) (Grouping/custom_object grouping-spec)
    (sequential? grouping-spec) (mk-fields-grouping grouping-spec)
    :else
    (case grouping-spec
      :shuffle (mk-shuffle-grouping)
      :local-or-shuffle (mk-local-or-shuffle-grouping)
      :none (mk-none-grouping)
      :all (mk-all-grouping)
      :global (mk-global-grouping)
      :direct (mk-direct-grouping)
      (throw (IllegalArgumentException.
              (str grouping-spec " is not a valid grouping"))))))
(defn- mk-inputs
  "Converts an inputs map of stream-ref -> grouping-spec into a map of
  GlobalStreamId -> Grouping. A stream-ref that is sequential is read as
  [component-id stream-id]; any other value is a component id subscribed
  on Utils/DEFAULT_STREAM_ID. Grouping specs go through mk-grouping."
  [inputs]
  (let [->stream-id (fn [ref]
                      (if (sequential? ref)
                        (GlobalStreamId. (first ref) (second ref))
                        (GlobalStreamId. ref Utils/DEFAULT_STREAM_ID)))]
    (into {}
          (map (fn [[ref spec]] [(->stream-id ref) (mk-grouping spec)])
               inputs))))
;; Builds a Thrift Bolt directly: inputs are converted with mk-inputs, the
;; common part carries the output declaration, optional parallelism :p and
;; optional :conf, and the bolt object is Java-serialized into a
;; ComponentObject.
(defnk mk-bolt-spec*
  [inputs bolt outputs :p nil :conf nil]
  (let [thrift-inputs (mk-inputs inputs)
        common (mk-plain-component-common thrift-inputs outputs p :conf conf)
        bolt-object (serialize-component-object bolt)]
    (Bolt. bolt-object common)))
;; Describes a spout for mk-topology as a map with :obj, :p and :conf keys.
;; :p and :parallelism-hint are aliases; a truthy :p wins over
;; :parallelism-hint.
(defnk mk-spout-spec
  [spout :parallelism-hint nil :p nil :conf nil]
  {:obj spout
   :p (or p parallelism-hint)
   :conf conf})
(defn- shell-component-params
  "Interprets the two calling conventions of the mk-shell-*-spec helpers.
  Returns [argv output-spec remaining-kwargs], where argv is a String array:
    - script-or-output-spec is a string: argv is [command script], the
      output spec is the first of kwargs and the rest are returned;
    - otherwise: command is already a sequence of argv strings,
      script-or-output-spec is the output spec and kwargs are returned
      unchanged."
  [command script-or-output-spec kwargs]
  (if-not (string? script-or-output-spec)
    [(into-array String command) script-or-output-spec kwargs]
    (let [argv (into-array String [command script-or-output-spec])
          output-spec (first kwargs)
          remaining (rest kwargs)]
      [argv output-spec remaining])))
;; Describes a bolt for mk-topology as a map with :obj, :inputs, :p and
;; :conf keys. :p and :parallelism-hint are aliases; a truthy :p wins over
;; :parallelism-hint.
(defnk mk-bolt-spec
  [inputs bolt :parallelism-hint nil :p nil :conf nil]
  {:obj bolt
   :inputs inputs
   :p (or p parallelism-hint)
   :conf conf})
(defn mk-shell-bolt-spec
  "Returns a bolt spec (via mk-bolt-spec) for a RichShellBolt.
  command / script-or-output-spec / kwargs are split by
  shell-component-params into the process argv, the output declaration
  (normalized with mk-output-spec) and the keyword args forwarded to
  mk-bolt-spec."
  [inputs command script-or-output-spec & kwargs]
  (let [[argv output-spec bolt-opts] (shell-component-params command script-or-output-spec kwargs)
        shell-bolt (RichShellBolt. argv (mk-output-spec output-spec))]
    (apply mk-bolt-spec inputs shell-bolt bolt-opts)))
(defn mk-shell-spout-spec
  "Returns a spout spec (via mk-spout-spec) for a RichShellSpout.
  command / script-or-output-spec / kwargs are split by
  shell-component-params into the process argv, the output declaration
  (normalized with mk-output-spec) and the keyword args forwarded to
  mk-spout-spec."
  [command script-or-output-spec & kwargs]
  (let [[argv output-spec spout-opts] (shell-component-params command script-or-output-spec kwargs)
        shell-spout (RichShellSpout. argv (mk-output-spec output-spec))]
    (apply mk-spout-spec shell-spout spout-opts)))
(defn- add-inputs
  "Subscribes declarer to every input in inputs (converted by mk-inputs)
  by calling (.grouping declarer stream-id grouping) once per entry.
  Returns nil."
  [declarer inputs]
  (doseq [entry (mk-inputs inputs)]
    (.grouping declarer (key entry) (val entry))))
(defn mk-topology
  "Builds a topology with TopologyBuilder from two maps of
  component id -> spec (as produced by mk-spout-spec / mk-bolt-spec) and
  returns the result of .createTopology.
  A non-nil :p is coerced with int before being passed as the parallelism
  hint; :conf is applied with addConfigurations, and bolt :inputs are then
  declared on the value addConfigurations returns, via add-inputs.
  The three-argument arity accepts state-spout-map but ignores it."
  ([spout-map bolt-map]
   (let [builder (TopologyBuilder.)
         ->hint (fn [p] (if (nil? p) p (int p)))]
     (doseq [[id {:keys [obj p conf]}] spout-map]
       (.addConfigurations (.setSpout builder id obj (->hint p)) conf))
     (doseq [[id {:keys [obj p conf inputs]}] bolt-map]
       (add-inputs (.addConfigurations (.setBolt builder id obj (->hint p)) conf)
                   inputs))
     (.createTopology builder)))
  ([spout-map bolt-map state-spout-map]
   ;; state-spout-map is accepted for call compatibility only; it is not used.
   (mk-topology spout-map bolt-map)))
;; The keys of StormTopology/metaDataMap (presumably its StormTopology$_Fields
;; ids -- confirm against the generated class).
;; The map is copied with clojurify-structure before taking its keys: without
;; that copy, every element of the key sequence was observed to become the
;; same after successive calls. The root cause was never pinned down;
;; presumably the underlying Java map's iterator reuses a single mutable
;; entry object. Verify that before removing the copy.
(def STORM-TOPOLOGY-FIELDS
  (keys (clojurify-structure StormTopology/metaDataMap)))
;; The StormTopology fields whose names identify spout collections:
;; SPOUTS and STATE_SPOUTS, in that order.
(def SPOUT-FIELDS
  (vector StormTopology$_Fields/SPOUTS
          StormTopology$_Fields/STATE_SPOUTS))