;; blob: 17dfded7857bc97cecc251b4935cc9212b42ac29 [file] [log] [blame]
(ns backtype.storm.daemon.drpc
(:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [org.apache.thrift TException])
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor])
(:import [java.util.concurrent Semaphore])
(:import [backtype.storm.drpc SpoutAdder])
(:import [java.net InetAddress])
(:use [backtype.storm bootstrap])
(:gen-class))
;; `bootstrap` is referred in from backtype.storm.bootstrap by the ns form above.
;; NOTE(review): presumably it pulls the shared Storm helpers used below
;; (async-loop, time-delta, current-time-secs, to-json, log-message) into this
;; namespace -- confirm against backtype.storm.bootstrap.
(bootstrap)
;; Port used when launch-server! is called without an explicit port.
;; 3772 spells "drpc" on a telephone keypad.
(def DEFAULT-PORT 3772) ; "drpc"
;; Age threshold compared against (time-delta start) in the timeout sweep;
;; requests older than this are released and forgotten.
(def REQUEST-TIMEOUT-SECS 600)
;; Value returned by the sweep function handed to async-loop.
;; NOTE(review): presumably async-loop sleeps this many seconds between runs -- confirm.
(def TIMEOUT-CHECK-SECS 60)
;; TODO: change this to use TimeCacheMap
(defn service-handler
  "Builds the DistributedRPC$Iface implementation used by the DRPC server.

  Arguments:
    spout-adder - SpoutAdder that receives each request via
                  (.add spout-adder function args return-info).
    port        - port number embedded in the JSON return-info so the result
                  can be routed back to this server.

  Per-request state lives in three atoms keyed by the request id (a string):
    id->sem    - Semaphore the calling thread blocks on,
    id->result - result string stored by the `result` method,
    id->start  - value of (current-time-secs) when the request was registered.

  A background function is handed to async-loop; on each run it releases and
  forgets every request whose (time-delta start) exceeds REQUEST-TIMEOUT-SECS.
  A request released this way makes `execute` return nil, since no result was
  ever stored for it.

  Returns the reified DistributedRPC$Iface."
  [^SpoutAdder spout-adder port]
  (let [ctr (atom 0)
        id->sem (atom {})
        id->result (atom {})
        id->start (atom {})
        ;; Drops every trace of a request from the three bookkeeping maps.
        cleanup (fn [id]
                  (swap! id->sem dissoc id)
                  (swap! id->result dissoc id)
                  (swap! id->start dissoc id))
        my-ip (.getHostAddress (InetAddress/getLocalHost))]
    ;; Timeout sweep. The returned TIMEOUT-CHECK-SECS is consumed by async-loop
    ;; (NOTE(review): presumably as the sleep interval in seconds -- confirm).
    (async-loop
      (fn []
        (doseq [[id start] @id->start]
          (when (> (time-delta start) REQUEST-TIMEOUT-SECS)
            ;; Wake the blocked caller (if it is still registered) before
            ;; discarding the request's state.
            (when-let [^Semaphore sem (@id->sem id)]
              (.release sem))
            (cleanup id)))
        TIMEOUT-CHECK-SECS))
    (reify DistributedRPC$Iface
      (^String execute [this ^String function ^String args]
        (let [;; Ids wrap around at 1e9 so the counter never grows unbounded.
              id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
              ^Semaphore sem (Semaphore. 0)
              return-info (to-json {"ip" my-ip "port" port "id" id})]
          (swap! id->start assoc id (current-time-secs))
          (swap! id->sem assoc id sem)
          ;; try/finally guarantees the bookkeeping entries are removed even
          ;; when .add throws or .acquire is interrupted; previously they
          ;; lingered until the timeout sweep caught them.
          (try
            (.add spout-adder function args return-info)
            ;; Block until `result` or the timeout sweep releases the permit.
            (.acquire sem)
            ;; Read the result before `finally` wipes it.
            (@id->result id)
            (finally
              (cleanup id)))))
      (^void result [this ^String id ^String result]
        (let [^Semaphore sem (@id->sem id)]
          ;; No semaphore means the request is unknown or already timed out;
          ;; the late result is silently dropped.
          (when sem
            (swap! id->result assoc id result)
            (.release sem)))))))
(defn launch-server!
  "Starts the DRPC Thrift server and serves requests on the calling thread.

  Arities:
    [spout-adder]      - listens on DEFAULT-PORT.
    [port spout-adder] - listens on the given port.

  The server is a THsHaServer with maxWorkerThreads set to 64, speaking
  TBinaryProtocol over a TNonblockingServerSocket. A JVM shutdown hook is
  registered that calls .stop on the server."
  ([spout-adder]
    (launch-server! DEFAULT-PORT spout-adder))
  ([port spout-adder]
    (let [opts (THsHaServer$Options.)]
      ;; Configure the options object before it is handed to the server
      ;; constructor below.
      (set! (. opts maxWorkerThreads) 64)
      (let [handler (service-handler spout-adder port)
            processor (DistributedRPC$Processor. handler)
            transport (TNonblockingServerSocket. port)
            protocol-factory (TBinaryProtocol$Factory.)
            server (THsHaServer. processor transport protocol-factory opts)
            stop-hook (Thread. (fn [] (.stop server)))]
        (.addShutdownHook (Runtime/getRuntime) stop-hook)
        (log-message "Starting Distributed RPC server...")
        (.serve server)))))
(defn -main
  "Command-line entry point.

  spout-adder-class is the name of the class to instantiate; any remaining
  arguments are passed to its constructor as given. The resulting object is
  handed to launch-server!, which listens on DEFAULT-PORT."
  [spout-adder-class & args]
  (let [adder-class (symbol spout-adder-class)
        ;; Builds the form (new <class> arg1 arg2 ...) and evaluates it, so the
        ;; class is resolved at run time from its name.
        ctor-form (list* 'new adder-class args)]
    (launch-server! (eval ctor-form))))