blob: c154ed80ace6002e407dff9cd15718ebf1ce68cb [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.messaging.loader
(:use [backtype.storm util log])
(:import [java.util ArrayList Iterator])
(:import [backtype.storm.messaging IContext IConnection TaskMessage])
(:import [backtype.storm.utils DisruptorQueue MutableObject])
(:require [backtype.storm.messaging [local :as local]])
(:require [backtype.storm [disruptor :as disruptor]]))
(defn mk-local-context []
(local/mk-context))
(defn- mk-receive-thread [storm-id port transfer-local-fn daemon kill-fn priority socket thread-id]
(async-loop
(fn []
(log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id " ]")
(fn []
(let [batched (ArrayList.)
^Iterator iter (.recv ^IConnection socket 0 thread-id)
closed (atom false)]
(when iter
(while (and (not @closed) (.hasNext iter))
(let [packet (.next iter)
task (if packet (.task ^TaskMessage packet))
message (if packet (.message ^TaskMessage packet))]
(if (= task -1)
(do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
(.close socket)
(reset! closed true))
(when packet (.add batched [task message]))))))
(when (not @closed)
(do
(if (> (.size batched) 0)
(transfer-local-fn batched))
0)))))
:factory? true
:daemon daemon
:kill-fn kill-fn
:priority priority
:thread-name (str "worker-receiver-thread-" thread-id)))
(defn- mk-receive-threads [storm-id port transfer-local-fn daemon kill-fn priority socket thread-count]
(into [] (for [thread-id (range thread-count)]
(mk-receive-thread storm-id port transfer-local-fn daemon kill-fn priority socket thread-id))))
(defnk launch-receive-thread!
[context socket storm-id receiver-thread-count port transfer-local-fn
:daemon true
:kill-fn (fn [t] (System/exit 1))
:priority Thread/NORM_PRIORITY]
(let [local-hostname (memoized-local-hostname)
thread-count (if receiver-thread-count receiver-thread-count 1)
vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority socket thread-count)]
(fn []
(let [kill-socket (.connect ^IContext context storm-id local-hostname port)]
(log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
(.send ^IConnection kill-socket
-1 (byte-array []))
(.close ^IConnection kill-socket)
(log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
(for [thread-id (range thread-count)]
(.join (vthreads thread-id)))
(log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
))))