blob: 4aa67abec5b45345f50784b3634ef288035aa330 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.messaging.local
(:refer-clojure :exclude [send])
(:use [backtype.storm log])
(:import [backtype.storm.messaging IContext IConnection TaskMessage])
(:import [java.util.concurrent LinkedBlockingQueue])
(:import [java.util Map Iterator])
(:import [java.util Iterator ArrayList])
(:gen-class))
(defn add-queue! [queues-map lock storm-id port]
(let [id (str storm-id "-" port)]
(locking lock
(when-not (contains? @queues-map id)
(swap! queues-map assoc id (LinkedBlockingQueue.))))
(@queues-map id)))
(deftype LocalConnection [storm-id port queues-map lock queue]
IConnection
(^Iterator recv [this ^int flags ^int clientId]
(when-not queue
(throw (IllegalArgumentException. "Cannot receive on this socket")))
(let [ret (ArrayList.)
msg (if (= flags 1) (.poll queue) (.take queue))]
(if msg
(do
(.add ret msg)
(.iterator ret))
nil)))
(^void send [this ^int taskId ^bytes payload]
(let [send-queue (add-queue! queues-map lock storm-id port)]
(.put send-queue (TaskMessage. taskId payload))
))
(^void send [this ^Iterator iter]
(let [send-queue (add-queue! queues-map lock storm-id port)]
(while (.hasNext iter)
(.put send-queue (.next iter)))
))
(^void close [this]
))
(deftype LocalContext [^{:unsynchronized-mutable true} queues-map
^{:unsynchronized-mutable true} lock]
IContext
(^void prepare [this ^Map storm-conf]
(set! queues-map (atom {}))
(set! lock (Object.)))
(^IConnection bind [this ^String storm-id ^int port]
(LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port)))
(^IConnection connect [this ^String storm-id ^String host ^int port]
(LocalConnection. storm-id port queues-map lock nil))
(^void term [this]
))
(defn mk-context []
(let [context (LocalContext. nil nil)]
(.prepare ^IContext context nil)
context))