;; blob: 08fb12e5d019e9e7cd620a7b9a770e555d009a66 [file] [log] [blame]
(ns backtype.storm.event
(:use [backtype.storm log util])
(:import [backtype.storm.utils Time Utils])
(:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
;; Protocol for an object that accepts event functions, reports whether it is
;; idle, and can be stopped. The descriptions below reflect what the
;; `event-manager` implementation in this file visibly does.
(defprotocol EventManager
;; Submit `event-fn` for processing. `event-manager` throws a RuntimeException
;; if the manager is no longer running; otherwise it increments its `added`
;; counter and puts `event-fn` on its LinkedBlockingQueue.
;; NOTE(review): `event-fn` is presumably a no-argument function -- confirm
;; against the code that consumes the queue.
(add [this event-fn])
;; Report whether the manager is idle. `event-manager` returns truthy when
;; Time/isThreadWaiting reports its runner thread as waiting, or when its
;; `processed` counter equals its `added` counter.
(waiting? [this])
;; Stop the manager. `event-manager` sets its `running` flag to false,
;; interrupts the runner thread, and then joins it, so the call blocks until
;; that thread has exited.
(shutdown [this]))
(defn event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
(let [added (atom 0)
processed (atom 0)
^LinkedBlockingQueue queue (LinkedBlockingQueue.)
running (atom true)
runner (Thread.
(fn []
(while @running
(let [r (.take queue)]
(swap! processed inc)
(catch InterruptedException t
(throw t))
(catch Throwable t
(log-error t "Error when processing event " r)
(halt-process! 20 "Error when processing an event"))
(catch InterruptedException t
(log-message "Event manager interrupted")))
(.setDaemon runner daemon?)
(.start runner)
(add [this event-fn]
;; should keep track of total added and processed to know if this is finished yet
(when-not @running
(throw (RuntimeException. "Cannot add events to a shutdown event manager")))
(swap! added inc)
(.put queue event-fn)
(waiting? [this]
(or (Time/isThreadWaiting runner)
(= @processed @added)
(shutdown [this]
(reset! running false)
(.interrupt runner)
(.join runner)