blob: ad927830417e942e95bf3056cdeff7ccaf86c9f4 [file] [log] [blame]
(ns backtype.storm.daemon.task
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap])
(:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
(:require [backtype.storm [tuple :as tuple]]))
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields num-tasks]
(fn [^Tuple tuple]
(mod (tuple/list-hash-code (.select out-fields group-fields (.getValues tuple)))
(defn- mk-grouper
"Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
[^Fields out-fields thrift-grouping num-tasks]
(let [random (Random.)]
(condp = (thrift/grouping-type thrift-grouping)
(if (thrift/global-grouping? thrift-grouping)
(fn [^Tuple tuple]
;; It's possible for target to have multiple tasks if it reads multiple sources
0 )
(let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
(mk-fields-grouper out-fields group-fields num-tasks)
(fn [^Tuple tuple]
(range num-tasks))
(let [choices (rotating-random-range num-tasks)]
(fn [^Tuple tuple]
(acquire-random-range-id choices num-tasks)
(fn [^Tuple tuple]
(mod (.nextInt random) num-tasks))
(defn- update-ack [curr-entry val]
(let [old (get curr-entry :val 0)]
(assoc curr-entry :val (bit-xor old val))
(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^Integer stream ^List values]
(.emitDirect collector task stream values)
(defn mk-acker-bolt []
(let [output-collector (atom nil)
pending (atom nil)]
(reify IBolt
(^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
(reset! output-collector collector)
(reset! pending (TimeCacheMap. (int (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS))))
(^void execute [this ^Tuple tuple]
(let [id (.getValue tuple 0)
^TimeCacheMap pending @pending
curr (.get pending id)
curr (condp = (.getSourceStreamId tuple)
(update-ack id)
(assoc :spout-task (.getValue tuple 1)))
ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
(.put pending id curr)
(when (and curr
(:spout-task curr))
(cond (= 0 (:val curr))
(.remove pending id)
(acker-emit-direct @output-collector
(:spout-task curr)
(:failed curr)
(.remove pending id)
(acker-emit-direct @output-collector
(:spout-task curr)
(.ack ^OutputCollector @output-collector tuple)
(^void cleanup [this]
(defn- get-task-object [topology component-id]
(if (= ACKER-COMPONENT-ID component-id)
(let [spouts (.get_spouts topology)
bolts (.get_bolts topology)
obj (Utils/getSetComponentObject
(contains? spouts component-id) (.get_spout_object (get spouts component-id))
(contains? bolts component-id) (.get_bolt_object (get bolts component-id))
true (throw (RuntimeException. (str "Could not find " component-id " in " topology)))))
obj (if (instance? ShellComponent obj)
(if (contains? spouts component-id)
(ShellSpout. obj)
(ShellBolt. obj))
obj )]
(defn outbound-components
"Returns map of stream id to component id to grouper"
(let [output-groupings (clojurify-structure (.getThisTargets topology-context))
acker-task-amt (count (.getComponentTasks topology-context ACKER-COMPONENT-ID))]
{ACKER-COMPONENT-ID (mk-fields-grouper (Fields. ["id" "spout-task"])
(Fields. ["id"])
{ACKER-COMPONENT-ID (mk-fields-grouper (Fields. ["id" "ack-val"])
(Fields. ["id"])
{ACKER-COMPONENT-ID (mk-fields-grouper (Fields. ["id"]) ;; TODO: add failure msg here later...
(Fields. ["id"])
(into {}
(for [[stream-id component->grouping] output-groupings
:let [out-fields (.getThisOutputFields topology-context stream-id)]]
(into {}
(for [[component tgrouping] component->grouping]
[component (mk-grouper out-fields
(count (.getComponentTasks topology-context component))
(defmulti mk-executors class-selector)
(defmulti close-component class-selector)
(defmulti mk-task-stats class-selector)
(defn- get-readable-name [topology-context]
(let [component-id (.getThisComponentId topology-context)]
(if (system-component? component-id)
({ACKER-COMPONENT-ID "Acker"} component-id)
;; TODO: change this so that can get better name for nested bolts
(str (class (get-task-object (.getRawTopology topology-context) component-id)))
(defn- send-ack [^TopologyContext topology-context ^Tuple input-tuple
^List generated-ids send-fn]
(let [ack-val (bit-xor-vals generated-ids)]
(doseq [[anchor id] (.. input-tuple getMessageId getAnchorsToIds)]
(send-fn (Tuple. topology-context
[anchor (bit-xor ack-val id)]
(.getThisTaskId topology-context)
(defn mk-task [conf storm-conf topology-context storm-id mq-context cluster-state storm-active-atom transfer-fn]
(let [task-id (.getThisTaskId topology-context)
component-id (.getThisComponentId topology-context)
task-info (.getTaskToComponent topology-context)
active (atom true)
uptime (uptime-computer)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
task-object (get-task-object (.getRawTopology topology-context)
(.getThisComponentId topology-context))
task-stats (mk-task-stats task-object (sampling-rate storm-conf))
report-error (fn [error]
(.report-task-error storm-cluster-state storm-id task-id error))
report-error-and-die (fn [error]
(report-error error)
(halt-process! 1 "Task died"))
;; heartbeat ASAP so nimbus doesn't reassign
heartbeat-thread (async-loop
(fn []
(.task-heartbeat! storm-cluster-state storm-id task-id
(TaskHeartbeat. (current-time-secs)
(stats/render-stats! task-stats)))
(when @active (storm-conf TASK-HEARTBEAT-FREQUENCY-SECS))
:priority Thread/MAX_PRIORITY
:kill-fn report-error-and-die)
stream->component->grouper (outbound-components topology-context)
component->tasks (reverse-map task-info)
;; important it binds to virtual port before function returns
puller (msg/bind mq-context task-id)
;; TODO: consider DRYing things up and moving stats / tuple -> multiple components code here
task-transfer-fn (fn [task ^Tuple tuple]
(transfer-fn task tuple)
task-readable-name (get-readable-name topology-context)
emit-sampler (mk-stats-sampler storm-conf)
send-fn (fn this
([^Integer out-task-id ^Tuple tuple]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Emitting direct: " out-task-id "; " task-readable-name " " tuple))
(let [target-component (.getComponentId topology-context out-task-id)
component->grouping (stream->component->grouper (.getSourceStreamId tuple))
grouping (get component->grouping target-component)
out-task-id (if (or
;; This makes streams to/from system
;; component (like ackers) implicit
(system-component? component-id)
(system-component? target-component)
(when (and (not-nil? grouping) (not= :direct grouping))
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(when out-task-id
(task-transfer-fn out-task-id tuple))
(when (emit-sampler)
(stats/emitted-tuple! task-stats (.getSourceStreamId tuple))
(stats/transferred-tuples! task-stats (.getSourceStreamId tuple) 1)
([^Tuple tuple]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Emitting: " task-readable-name " " tuple))
(let [stream (.getSourceStreamId tuple)
;; TODO: this doesn't seem to be very fast
;; and seems to be the current bottleneck
out-tasks (mapcat
(fn [[out-component grouper]]
(when (= :direct grouper)
;; TODO: this is wrong, need to check how the stream was declared
(throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
(let [tasks (component->tasks out-component)
indices (collectify (grouper tuple))]
(for [i indices] (tasks i))))
(stream->component->grouper stream))
num-out-tasks (count out-tasks)]
(when (emit-sampler)
(stats/emitted-tuple! task-stats (.getSourceStreamId tuple))
(stats/transferred-tuples! task-stats (.getSourceStreamId tuple) num-out-tasks)
(if (= num-out-tasks 1)
(task-transfer-fn (first out-tasks) tuple)
;;TODO: optimize the out-tasks = 0 case by
;; not including this tuple in the ack list
;; for previous tuple
;; TODO: need to create the new ids, and then create the tuples, and then ack
(let [out-ids (repeatedly (count out-tasks) #(MessageId/generateId))]
(map (fn [id t]
(task-transfer-fn t (.copyWithNewId tuple id)))
(send-ack topology-context
executor-threads (dofor
[exec (mk-executors task-object storm-conf puller send-fn
storm-active-atom topology-context
task-stats report-error)]
(async-loop (fn [] (exec) (when @active 0))
:kill-fn report-error-and-die))
system-threads [heartbeat-thread]
all-threads (concat executor-threads system-threads)]
(log-message "Shutting down task " storm-id ":" task-id)
(reset! active false)
;; empty messages are skip messages (this unblocks the socket)
(msg/send-local-task-empty mq-context task-id)
(doseq [t all-threads]
(.interrupt t)
(.join t))
(.remove-task-heartbeat! storm-cluster-state storm-id task-id)
(.disconnect storm-cluster-state)
(.close puller)
(close-component task-object)
(log-message "Shut down task " storm-id ":" task-id))
(waiting? [this]
;; executor threads are independent since they don't sleep
;; -> they block on zeromq
(every? (memfn sleeping?) system-threads)
(defn- fail-spout-msg [^ISpout spout storm-conf msg-id ^Tuple tuple time-delta task-stats]
(log-message "Failing message " msg-id ": " tuple)
(.fail spout msg-id)
(when time-delta
(stats/spout-failed-tuple! task-stats (.getSourceStreamId tuple) time-delta)
(defn- ack-spout-msg [^ISpout spout storm-conf msg-id ^Tuple tuple time-delta task-stats]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Acking message " msg-id))
(.ack spout msg-id)
(when time-delta
(stats/spout-acked-tuple! task-stats (.getSourceStreamId tuple) time-delta)
(defmethod mk-executors ISpout [^ISpout spout storm-conf puller send-fn storm-active-atom
^TopologyContext topology-context task-stats report-error-fn]
(let [wait-fn (fn [] @storm-active-atom)
max-spout-pending (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)
deserializer (TupleDeserializer. storm-conf topology-context)
event-queue (ConcurrentLinkedQueue.)
sampler (mk-stats-sampler storm-conf)
pending (TimeCacheMap.
(reify TimeCacheMap$ExpiredCallback
(expire [this msg-id [spout-id tuple start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(.add event-queue #(fail-spout-msg spout storm-conf spout-id tuple time-delta task-stats)))
send-spout-msg (fn [out-stream-id values message-id out-task-id]
(let [task-id (.getThisTaskId topology-context)
gen-id (MessageId/generateId)
tuple-id (if message-id
(MessageId/makeRootId gen-id)
tuple (Tuple. topology-context
out-tasks (if out-task-id
(send-fn out-task-id tuple)
(send-fn tuple))]
(if (= 0 (storm-conf TOPOLOGY-ACKERS))
(.add event-queue #(ack-spout-msg spout storm-conf message-id tuple nil task-stats))
(when message-id
(.put pending gen-id [message-id
(if (sampler) (System/currentTimeMillis))])
(send-fn (Tuple. topology-context
[gen-id task-id]
output-collector (reify ISpoutOutputCollector
(^List emit [this ^int stream-id ^List tuple ^Object message-id]
(send-spout-msg stream-id tuple message-id nil)
(^void emitDirect [this ^int out-task-id ^int stream-id
^List tuple ^Object message-id]
(send-spout-msg stream-id tuple message-id out-task-id)
(.open spout storm-conf topology-context (SpoutOutputCollector. output-collector))
[(fn []
;; This design requires that spouts be non-blocking
(loop []
(when-let [event (.poll event-queue)]
(if (or (not max-spout-pending)
(< (.size pending) max-spout-pending))
(if (wait-fn)
(.nextTuple spout)
(Time/sleep 100))
;; TODO: log that it's getting throttled
(fn []
(let [^bytes ser-msg (msg/recv puller)]
;; skip empty messages (used during shutdown)
(when-not (empty? ser-msg)
(let [tuple (.deserialize deserializer ser-msg)
id (.getValue tuple 0)
[spout-id tuple-finished start-time-ms] (.remove pending id)
time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(when spout-id
(condp = (.getSourceStreamId tuple)
ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout storm-conf spout-id
tuple-finished time-delta task-stats))
ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout storm-conf spout-id
tuple-finished time-delta task-stats))
;; TODO: on failure, emit tuple to failure stream
(defn- tuple-time-delta! [^Map start-times ^Tuple tuple]
(let [start-time (.remove start-times tuple)]
(if start-time
(time-delta-ms start-time))
(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller send-fn storm-active-atom
^TopologyContext topology-context task-stats report-error-fn]
(let [deserializer (TupleDeserializer. storm-conf topology-context)
task-id (.getThisTaskId topology-context)
component-id (.getThisComponentId topology-context)
tuple-start-times (ConcurrentHashMap.)
sampler (mk-stats-sampler storm-conf)
output-collector (reify IInternalOutputCollector
(^List emit [this ^Tuple output]
(send-fn output)
(^void emitDirect [this ^int task-id ^Tuple output]
(send-fn task-id output)
(^void ack [this ^Tuple input-tuple ^List generated-ids]
(send-ack topology-context
(let [delta (tuple-time-delta! tuple-start-times input-tuple)]
(when delta
(stats/bolt-acked-tuple! task-stats
(.getSourceComponent input-tuple)
(.getSourceStreamId input-tuple)
(^void fail [this ^Tuple input-tuple]
(doseq [anchor (.. input-tuple getMessageId getAnchors)]
(send-fn (Tuple. topology-context
(let [delta (tuple-time-delta! tuple-start-times input-tuple)]
(when delta
(stats/bolt-failed-tuple! task-stats
(.getSourceComponent input-tuple)
(.getSourceStreamId input-tuple)
(^void reportError [this ^Throwable error]
(report-error-fn error)
(.prepare bolt
(OutputCollectorImpl. topology-context output-collector))
;; TODO: can get any SubscribedState objects out of the context now
[(fn []
;; synchronization needs to be done with a key provided by this bolt, otherwise:
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
;; buffer other tuples until fully synchronized, then process all of those tuples
;; then go into normal loop
;; spill to disk?
;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
;; or just timeout the sync messages that are coming in until full sync is hit from that task
;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
(let [^bytes ser (msg/recv puller)]
(when-not (empty? ser) ; skip empty messages (used during shutdown)
(log-debug "Processing message")
(let [tuple (.deserialize deserializer ser)]
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
(log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
(when (sampler)
(.put tuple-start-times tuple (System/currentTimeMillis)))
(.execute bolt tuple)
(defmethod close-component ISpout [spout]
(.close spout))
(defmethod close-component IBolt [bolt]
(.cleanup bolt))
(defmethod mk-task-stats ISpout [_ rate]
(stats/mk-spout-stats rate))
(defmethod mk-task-stats IBolt [_ rate]
(stats/mk-bolt-stats rate))