Merge branch 'storm-537-infinite-reconnection' of https://github.com/Sergeant007/storm into STORM-537
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index fed684e..c516b63 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -142,6 +142,8 @@
}
int tried = 0;
+ //setting channel to null to make sure we throw an exception when reconnection fails
+ channel = null;
while (tried <= max_retries) {
LOG.info("Reconnect started for {}... [{}]", name(), tried);
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index ea7b8dc..ed5797d 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -20,15 +20,15 @@
(bootstrap)
-(def port 6700)
-(def task 1)
+(def port 6700)
+(def task 1)
(deftest test-basic
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@ -43,14 +43,14 @@
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
- (.term context)))
+ (.term context)))
(deftest test-large-msg
- (let [req_msg (apply str (repeat 2048000 'c'))
+ (let [req_msg (apply str (repeat 2048000 'c'))
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@ -65,21 +65,21 @@
(is (= req_msg (String. (.message resp))))
(.close client)
(.close server)
- (.term context)))
-
+ (.term context)))
+
(deftest test-server-delayed
(let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
}
context (TransportFactory/makeContext storm-conf)
client (.connect context nil "localhost" port)
-
+
server (Thread.
(fn []
(Thread/sleep 1000)
@@ -88,7 +88,7 @@
resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
- (.close server)
+ (.close server)
)))
_ (.start server)
_ (.send client task (.getBytes req_msg))
@@ -101,7 +101,7 @@
(let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
STORM-MESSAGING-NETTY-MAX-RETRIES 10
- STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+ STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@ -112,7 +112,7 @@
(doseq [num (range 1 100000)]
(let [req_msg (str num)]
(.send client task (.getBytes req_msg))))
-
+
(let [resp (ArrayList.)
received (atom 0)]
(while (< @received (- 100000 1))
@@ -126,7 +126,7 @@
(let [req_msg (str num)
resp_msg (String. (.message (.get resp (- num 1))))]
(is (= req_msg resp_msg)))))
-
+
(.close client)
(.close server)
(.term context)))