blob: f7d3802d35d5cec2f408f2cacd47950c5b3302bb [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.messaging.netty-unit-test
(:use [clojure test])
(:import [backtype.storm.messaging TransportFactory])
(:use [backtype.storm testing util config log])
(:use [backtype.storm.daemon.worker :only [is-connection-ready]])
(:import [java.util ArrayList]))
(def port (available-port))
(def task 1)
;; In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be
;; activated once all the workers' connections are ready. The tests in this file however launch Netty servers and
;; clients directly, and thus we must ensure manually that the server and the client connections are ready before we
;; commence testing. If we don't do this, then we will lose the first messages being sent between the client and the
;; server, which will fail the tests.
(defn- wait-until-ready
([connections]
(do (log-message "Waiting until all Netty connections are ready...")
(wait-until-ready connections 0)))
([connections waited-ms]
(let [interval-ms 10
max-wait-ms 5000]
(if-not (every? is-connection-ready connections)
(if (<= waited-ms max-wait-ms)
(do
(Thread/sleep interval-ms)
(wait-until-ready connections (+ waited-ms interval-ms)))
(throw (RuntimeException. (str "Netty connections were not ready within " max-wait-ms " ms"))))
(log-message "All Netty connections are ready")))))
(deftest test-basic
(log-message "Should send and receive a basic message")
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
}
context (TransportFactory/makeContext storm-conf)
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])
_ (.send client task (.getBytes req_msg))
iter (.recv server 0 0)
resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
(.term context)))
(deftest test-large-msg
(log-message "Should send and receive a large message")
(let [req_msg (apply str (repeat 2048000 'c'))
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
}
context (TransportFactory/makeContext storm-conf)
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])
_ (.send client task (.getBytes req_msg))
iter (.recv server 0 0)
resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
(.term context)))
(deftest test-batch
(let [num-messages 100000
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-AUTHENTICATION false
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
}
_ (log-message "Should send and receive many messages (testing with " num-messages " messages)")
context (TransportFactory/makeContext storm-conf)
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (wait-until-ready [server client])]
(doseq [num (range 1 num-messages)]
(let [req_msg (str num)]
(.send client task (.getBytes req_msg))))
(let [resp (ArrayList.)
received (atom 0)]
(while (< @received (- num-messages 1))
(let [iter (.recv server 0 0)]
(while (.hasNext iter)
(let [msg (.next iter)]
(.add resp msg)
(swap! received inc)
))))
(doseq [num (range 1 num-messages)]
(let [req_msg (str num)
resp_msg (String. (.message (.get resp (- num 1))))]
(is (= req_msg resp_msg)))))
(.close client)
(.close server)
(.term context)))