blob: ea7b8dc0f5a1aba16085e320741ca88baefee6e4 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.messaging.netty-unit-test
(:use [clojure test])
(:import [backtype.storm.messaging TransportFactory])
(:use [backtype.storm bootstrap testing util]))
;; Invoke `bootstrap`, which is referred in through the ns form's :use clause.
;; NOTE(review): presumably this brings the STORM-MESSAGING-* config keys and
;; the ArrayList import used by the tests below into scope -- confirm against
;; backtype.storm.bootstrap.
(bootstrap)
;; Port passed to every .bind and .connect call in this file; all tests share it.
(def port 6700)
;; Task id passed to every .send call and compared against (.task resp) in the
;; tests that inspect the received message.
(def task 1)
(deftest test-basic
  ;; Sends one short message from a client to a server bound on `port` and
  ;; checks that the task id and the payload bytes come back unchanged.
  (let [req_msg "0123456789abcdefghijklmnopqrstuvwxyz"
        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
        context (TransportFactory/makeContext storm-conf)]
    (try
      ;; with-open only requires a no-arg .close method. It closes in reverse
      ;; binding order (client first, then server) and does so even when a
      ;; send/recv call throws, so the shared port is not left bound for the
      ;; tests that run afterwards.
      (with-open [server (.bind context nil port)
                  client (.connect context nil "localhost" port)]
        (.send client task (.getBytes req_msg))
        (let [iter (.recv server 0 0)
              resp (.next iter)]
          (is (= task (.task resp)))
          (is (= req_msg (String. (.message resp))))))
      (finally
        ;; Always terminate the context, after both connections are closed.
        (.term context)))))
(deftest test-large-msg
  ;; Sends a single 2,048,000-character message, far larger than the configured
  ;; 102400 buffer size, and checks that it is received intact.
  (let [;; \c is the character literal. The previous 'c' was read as the quoted
        ;; symbol c' and produced the string "c'c'c'..." at twice the length.
        req_msg (apply str (repeat 2048000 \c))
        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                    STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
        context (TransportFactory/makeContext storm-conf)]
    (try
      ;; with-open only requires a no-arg .close method. It closes the client,
      ;; then the server, even when send/recv throws, so the shared port is
      ;; released for the remaining tests.
      (with-open [server (.bind context nil port)
                  client (.connect context nil "localhost" port)]
        (.send client task (.getBytes req_msg))
        (let [iter (.recv server 0 0)
              resp (.next iter)]
          (is (= task (.task resp)))
          (is (= req_msg (String. (.message resp))))))
      (finally
        ;; Always terminate the context, after both connections are closed.
        (.term context)))))
(deftest test-server-delayed
  ;; The client connects and sends before any server exists; the server is
  ;; bound on a background thread one second later and must still receive the
  ;; message.
  (let [req_msg "0123456789abcdefghijklmnopqrstuvwxyz"
        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
        context (TransportFactory/makeContext storm-conf)
        client (.connect context nil "localhost" port)
        ;; Carries either the received message or the Throwable raised on the
        ;; server thread back to the test thread.
        outcome (promise)
        server (Thread.
                 (fn []
                   (try
                     (Thread/sleep 1000)
                     (let [server (.bind context nil port)]
                       (try
                         (deliver outcome (.next (.recv server 0 0)))
                         (finally
                           (.close server))))
                     (catch Throwable t
                       ;; No-op if a message was already delivered.
                       (deliver outcome t)))))]
    (try
      (.start server)
      (.send client task (.getBytes req_msg))
      (.close client)
      (.join server)
      ;; Assert on the test thread: `is` called on a bare Thread runs outside
      ;; clojure.test's thread-local bindings, so failures there would not be
      ;; counted against this test and exceptions would be lost.
      (let [resp @outcome]
        (when (instance? Throwable resp)
          (throw resp))
        (is (= task (.task resp)))
        (is (= req_msg (String. (.message resp)))))
      (finally
        (.term context)))))
(deftest test-batch
  ;; Sends the decimal strings "1" .. "99999" as separate messages, drains the
  ;; server until that many messages have arrived, and checks that the i-th
  ;; received payload equals the i-th one sent.
  (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1}
        ;; Exclusive upper bound of the message numbers; (dec limit) messages
        ;; are sent in total.
        limit 100000
        context (TransportFactory/makeContext storm-conf)]
    (try
      ;; with-open only requires a no-arg .close method. It closes the client,
      ;; then the server, even when send/recv throws, so the shared port is
      ;; released for the remaining tests.
      (with-open [server (.bind context nil port)
                  client (.connect context nil "localhost" port)]
        (doseq [num (range 1 limit)]
          (.send client task (.getBytes (str num))))
        (let [resp (ArrayList.)
              expected (dec limit)]
          ;; Each .recv call yields an iterator over some of the messages; keep
          ;; draining until everything that was sent has been collected.
          (while (< (.size resp) expected)
            (let [iter (.recv server 0 0)]
              (while (.hasNext iter)
                (.add resp (.next iter)))))
          (doseq [num (range 1 limit)]
            (let [req_msg (str num)
                  resp_msg (String. (.message (.get resp (- num 1))))]
              (is (= req_msg resp_msg))))))
      (finally
        ;; Always terminate the context, after both connections are closed.
        (.term context)))))