blob: c1dfb1b2a2f8e2512a0feed120e5b263093adc7d [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.event
(:use [backtype.storm log util])
(:import [backtype.storm.utils Time Utils])
(:import [java.io InterruptedIOException])
(:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))
(defprotocol EventManager
(add [this event-fn])
(waiting? [this])
(shutdown [this]))
(defn event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
[daemon?]
(let [added (atom 0)
processed (atom 0)
^LinkedBlockingQueue queue (LinkedBlockingQueue.)
running (atom true)
runner (Thread.
(fn []
(try-cause
(while @running
(let [r (.take queue)]
(r)
(swap! processed inc)))
(catch InterruptedIOException t
(log-message "Event manager interrupted while doing IO"))
(catch InterruptedException t
(log-message "Event manager interrupted"))
(catch Throwable t
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")))))]
(.setDaemon runner daemon?)
(.start runner)
(reify
EventManager
(add
[this event-fn]
;; should keep track of total added and processed to know if this is finished yet
(when-not @running
(throw (RuntimeException. "Cannot add events to a shutdown event manager")))
(swap! added inc)
(.put queue event-fn))
(waiting?
[this]
(or (Time/isThreadWaiting runner)
(= @processed @added)))
(shutdown
[this]
(reset! running false)
(.interrupt runner)
(.join runner)))))