blob: e5678657cab4d51ea2b8805b43ef6c3296647d9d [file] [log] [blame]
(ns zilch.virtual-port
(:use [clojure.contrib.def :only [defnk]])
(:use [backtype.storm util log])
(:require [zilch [mq :as mq]])
(:import [java.nio ByteBuffer])
(:import [java.util.concurrent Semaphore]))
(mq/zeromq-imports)
(defn mk-packet
  "Builds the wire representation of a message addressed to virtual-port:
  a 2-byte short holding the port, followed by the raw bytes of message.
  Returns the backing byte array of the freshly allocated buffer."
  [virtual-port ^bytes message]
  (let [total-size (+ 2 (count message))
        buffer (doto (ByteBuffer/allocate total-size)
                 ;; header: the destination port, coerced to a short
                 (.putShort (short virtual-port))
                 ;; body: the payload, copied verbatim
                 (.put message))]
    (.array buffer)))
(defn parse-packet
  "Splits a packet produced by mk-packet back into its parts.
  Returns a vector [port msg] where port is the leading 2-byte short and
  msg is a new byte array holding every byte that follows it."
  [^bytes packet]
  (let [buffer (ByteBuffer/wrap packet)
        ;; reading the short advances the buffer past the 2-byte header
        port (.getShort buffer)
        ;; whatever is left after the header is the payload
        payload (byte-array (.remaining buffer))]
    (.get buffer payload)
    [port payload]))
(defn virtual-url
  "Returns the inproc:// endpoint string used for the given virtual port."
  [port]
  (let [scheme "inproc://"]
    (str scheme port)))
(defn- get-virtual-socket!
  "Returns the socket stored in mapping-atom for port. When the port has no
  entry yet, a push socket is created from context, connected to the port's
  virtual url, and stored in mapping-atom before the lookup."
  [context mapping-atom port]
  (let [known? (contains? @mapping-atom port)]
    (when (not known?)
      (log-message "Connecting to virtual port " port)
      (let [push-socket (mq/connect (mq/socket context mq/push)
                                    (virtual-url port))]
        (swap! mapping-atom assoc port push-socket)))
    ;; look the socket up again so both paths return the stored value
    (get @mapping-atom port)))
(defn close-virtual-sockets!
  "Calls .close on every socket stored in mapping-atom, then resets the atom
  to an empty map. Returns the new (empty) map."
  [mapping-atom]
  (doseq [virtual-socket (vals @mapping-atom)]
    (.close virtual-socket))
  (reset! mapping-atom {}))
(defn virtual-send
  "Wraps message in a packet addressed to virtual-port (see mk-packet) and
  sends it over socket with mq/send. When flags is omitted, ZMQ/NOBLOCK is
  used."
  ([^ZMQ$Socket socket virtual-port ^bytes message]
     (virtual-send socket virtual-port message ZMQ/NOBLOCK))
  ([^ZMQ$Socket socket virtual-port ^bytes message flags]
     (let [packet (mk-packet virtual-port message)]
       (mq/send socket packet flags))))
(defnk launch-virtual-port!
  "Starts a background loop (via async-loop) that binds a pull socket to url,
  reads packets from it, and forwards each payload to the push socket for the
  virtual port named in the packet header.

  Positional arguments:
    context - the messaging context used to create every socket
    url     - the endpoint the pull socket binds to

  Keyword arguments:
    :daemon, :kill-fn, :priority - passed straight through to async-loop
    :valid-ports - collection of ports that may receive messages; nil (the
                   default) means every port is accepted

  A packet addressed to port -1 is the shutdown notice: the loop closes all
  virtual sockets and the pull socket, then stops.

  Returns a zero-argument function that sends the shutdown notice to url and
  joins the loop thread."
  [context url :daemon true
               :kill-fn (fn [] (System/exit 1))
               :priority Thread/NORM_PRIORITY
               :valid-ports nil]
  (let [;; Keep nil as nil: (set (map short nil)) would be #{}, which is not
        ;; nil and would therefore reject every port in the check below.
        valid-ports (when-not (nil? valid-ports)
                      (set (map short valid-ports)))
        vthread (async-loop
                  (fn [^ZMQ$Socket socket virtual-mapping]
                    (let [[port msg] (parse-packet (mq/recv socket))]
                      (cond
                        ;; shutdown notice: release everything and return nil
                        ;; NOTE(review): presumably async-loop stops when the
                        ;; function returns nil - confirm against async-loop
                        (= port -1)
                        (do
                          (log-message "Virtual port " url " received shutdown notice")
                          (close-virtual-sockets! virtual-mapping)
                          (.close socket)
                          nil)

                        (or (nil? valid-ports) (contains? valid-ports port))
                        (let [^ZMQ$Socket virtual-socket (get-virtual-socket! context virtual-mapping port)]
                          ;; TODO: probably need to handle multi-part messages here or something
                          (mq/send virtual-socket msg)
                          0)

                        ;; Dropped message: return 0 like the forwarding branch
                        ;; so the return value never depends on what
                        ;; log-message happens to return (a nil here would be
                        ;; indistinguishable from the shutdown branch).
                        :else
                        (do
                          (log-message "Received invalid message directed at port " port ". Dropping...")
                          0))))
                  :args-fn (fn [] [(-> context (mq/socket mq/pull) (mq/bind url)) (atom {})])
                  :daemon daemon
                  :kill-fn kill-fn
                  :priority priority)]
    (fn []
      (let [kill-socket (-> context (mq/socket mq/push) (mq/connect url))]
        (log-message "Shutting down virtual port at url: " url)
        ;; close the temporary socket even when sending the notice throws
        (try
          (virtual-send kill-socket
                        -1
                        (mq/barr 1))
          (finally
            (.close kill-socket)))
        (log-message "Waiting for virtual port at url " url " to die")
        (.join vthread)
        (log-message "Shutdown virtual port at url: " url)))))
(defn virtual-bind
  "Binds socket to the endpoint returned by virtual-url for virtual-port.
  Returns whatever mq/bind returns."
  [^ZMQ$Socket socket virtual-port]
  (let [endpoint (virtual-url virtual-port)]
    (mq/bind socket endpoint)))
(defn virtual-connect
  "Connects socket to the endpoint returned by virtual-url for virtual-port.
  Returns whatever mq/connect returns."
  [^ZMQ$Socket socket virtual-port]
  (let [endpoint (virtual-url virtual-port)]
    (mq/connect socket endpoint)))