blob: 441360a96f7937e076a2fc0260451d1cdf9dcd7b [file] [log] [blame]
(ns backtype.storm.messaging.netty-unit-test
(:use [clojure test])
(:import [backtype.storm.messaging TransportFactory])
(:use [backtype.storm bootstrap testing util]))
(bootstrap)
(def port 6700)
(def task 1)
(deftest test-basic
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 }
context (TransportFactory/makeContext storm-conf)
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
resp (.recv server 0)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
(.term context)))
(deftest test-large-msg
(let [req_msg (apply str (repeat 2048000 'c'))
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 }
context (TransportFactory/makeContext storm-conf)
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
resp (.recv server 0)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
(.term context)))
(deftest test-server-delayed
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000 }
context (TransportFactory/makeContext storm-conf)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
_ (Thread/sleep 1000)
server (.bind context nil port)
resp (.recv server 0)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
(.term context)))
(deftest test-batch
(let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000}
context (TransportFactory/makeContext storm-conf)
server (.bind context nil port)
client (.connect context nil "localhost" port)]
(doseq [num (range 1 100000)]
(let [req_msg (str num)]
(.send client task (.getBytes req_msg))))
(doseq [num (range 1 100000)]
(let [req_msg (str num)
resp (.recv server 0)
resp_msg (String. (.message resp))]
(is (= req_msg resp_msg))))
(.close client)
(.close server)
(.term context)))