storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [27/47] storm git commit: [STORM-537] A worker reconnects infinitely to another dead worker
Date Wed, 12 Nov 2014 16:45:28 GMT
[STORM-537] A worker reconnects infinitely to another dead worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1aacccf2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1aacccf2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1aacccf2

Branch: refs/heads/security
Commit: 1aacccf286829e9289d86a6ed10b23cb2b21bc47
Parents: 5a46038
Author: Sergey Tryuber <stryuber@gmail.com>
Authored: Wed Oct 29 18:27:56 2014 +0300
Committer: Sergey Tryuber <stryuber@gmail.com>
Committed: Wed Oct 29 18:36:35 2014 +0300

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  |   1 +
 .../storm/messaging/netty_unit_test.clj         | 179 +++++++++++++++++--
 2 files changed, 165 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1aacccf2/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index fed684e..3e4c2f6 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -153,6 +153,7 @@ public class Client implements IConnection {
                 if (!future.isSuccess()) {
                     if (null != current) {
                         current.close();
+                        channel = null;
                     }
                 } else {
                     channel = current;

http://git-wip-us.apache.org/repos/asf/storm/blob/1aacccf2/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index ea7b8dc..b2269ad 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -15,20 +15,21 @@
 ;; limitations under the License.
 (ns backtype.storm.messaging.netty-unit-test
   (:use [clojure test])
-  (:import [backtype.storm.messaging TransportFactory])
+  (:import [backtype.storm.messaging TransportFactory TaskMessage])
+  (:import [java.util.concurrent ExecutionException])
   (:use [backtype.storm bootstrap testing util]))
 
 (bootstrap)
 
-(def port 6700) 
-(def task 1) 
+(def port 6700)
+(def task 1)
 
 (deftest test-basic
   (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@ -43,14 +44,14 @@
     (is (= req_msg (String. (.message resp))))
     (.close client)
     (.close server)
-    (.term context)))    
+    (.term context)))
 
 (deftest test-large-msg
-  (let [req_msg (apply str (repeat 2048000 'c')) 
+  (let [req_msg (apply str (repeat 2048000 'c'))
         storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 102400
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@ -65,21 +66,21 @@
     (is (= req_msg (String. (.message resp))))
     (.close client)
     (.close server)
-    (.term context)))    
-    
+    (.term context)))
+
 (deftest test-server-delayed
     (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
                     }
         context (TransportFactory/makeContext storm-conf)
         client (.connect context nil "localhost" port)
-        
+
         server (Thread.
                 (fn []
                   (Thread/sleep 1000)
@@ -88,7 +89,7 @@
                         resp (.next iter)]
                     (is (= task (.task resp)))
                     (is (= req_msg (String. (.message resp))))
-                    (.close server) 
+                    (.close server)
                   )))
         _ (.start server)
         _ (.send client task (.getBytes req_msg))
@@ -101,7 +102,7 @@
   (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
                     STORM-MESSAGING-NETTY-MAX-RETRIES 10
-                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000 
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                     STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
@@ -112,7 +113,7 @@
     (doseq [num  (range 1 100000)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
-    
+
     (let [resp (ArrayList.)
           received (atom 0)]
       (while (< @received (- 100000 1))
@@ -126,7 +127,155 @@
       (let [req_msg (str num)
             resp_msg (String. (.message (.get resp (- num 1))))]
         (is (= req_msg resp_msg)))))
-   
+
     (.close client)
     (.close server)
     (.term context)))
+
+
+(deftest test-reconnect-to-permanently-failed-server
+  "Tests that if connection to a server already established and server fails, then
+  Client#connect() throws an exception"
+  (let [dummy_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+        poison_msg (String. "kill_the_server")
+        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+                    STORM-MESSAGING-NETTY-MAX-RETRIES 5 ; just to decrease test duration
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    ;critical for this test
+                    ;                    STORM-NETTY-MESSAGE-BATCH-SIZE 1
+                    }
+        context (TransportFactory/makeContext storm-conf)
+        client (.connect context nil "localhost" port)
+        server_fn (fn []
+                    (let [server (.bind context nil port)
+                          poll (fn []
+                                 (let [iter (.recv server 0 0)
+                                       result ()]
+                                   (if (nil? iter) () (iterator-seq iter))
+                                   ))
+                          process_msg (fn [msg]
+                                        (let [msg_body (String. (.message msg))]
+                                          (if (= poison_msg msg_body)
+                                            (do (print "Received a poison...")
+                                              true)
+                                            (do (is (= dummy_msg msg_body))
+                                              (println (str "Received: " msg_body))
+                                              (Thread/sleep 100)
+                                              false))
+                                          ))
+                          ]
+                      (loop [need_exit false]
+                        (when (or (false? need_exit) (nil? need_exit))
+                          (recur (some true? (map process_msg (poll))))))
+                      ;                          (recur (some true?  (poll)))))
+                      (.close server)
+                      (println "SERVER CLOSED")
+                      ))
+        stop_server (fn [server_future]
+                      (.send client task (.getBytes poison_msg))
+                      (if (= "timeout" (deref server_future 5000 "timeout"))
+                        (do
+                          ;Note, that this does not stop Server thread
+                          ;because of ignoring InterruptedException in Server#recv (what is strange)
+                          (future-cancel server_future)
+                          (throw (RuntimeException. "Error. Server didn't stop as we asked it."))
+                          ))
+                      )
+        server_1 (future (server_fn))
+        _ (println "Let the client connect to a server initially")
+        _ (.send client task (.getBytes dummy_msg))
+        _ (println "Permanently stopping the server")
+        _ (stop_server server_1)
+        _ (println "Sending batch of messages to the dead server")
+        batch (future (.send client (.iterator [(TaskMessage. task (.getBytes dummy_msg))
+                                                (TaskMessage. task (.getBytes dummy_msg))
+                                                (TaskMessage. task (.getBytes dummy_msg))
+                                                (TaskMessage. task (.getBytes dummy_msg))])))
+        _ (is
+            (thrown-cause-with-msg? ExecutionException #".*Remote address is not reachable\. We will close this client.*"
+              (deref batch (* 2 (* (get storm-conf STORM-MESSAGING-NETTY-MAX-RETRIES) (get storm-conf STORM-MESSAGING-NETTY-MIN-SLEEP-MS))) "timeout")))
+        _ (future-cancel batch)
+        ]
+    (.close client)
+    (.term context)))
+
+(deftest test-reconnect-to-temporarily-failed-server
+  "Tests that if connection to a server already established and server TEMPORARILY fails, then
+  Client#connect() succeeds after several re-tries"
+  (let [dummy_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz")
+        poison_msg (String. "kill_the_server")
+        storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024
+                    STORM-MESSAGING-NETTY-MAX-RETRIES 50 ; important for this test
+                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                    ;critical for this test
+                    ;                    STORM-NETTY-MESSAGE-BATCH-SIZE 1
+                    }
+        context (TransportFactory/makeContext storm-conf)
+        client (.connect context nil "localhost" port)
+
+        server_fn (fn []
+                    (let [server (.bind context nil port)
+                          poll (fn []
+                                 (let [iter (.recv server 0 0)
+                                       result ()]
+                                   (if (nil? iter) () (iterator-seq iter))
+                                   ))
+                          process_msg (fn [msg]
+                                        (let [msg_body (String. (.message msg))]
+                                          (if (= poison_msg msg_body)
+                                            (do (print "Received a poison...")
+                                              true)
+                                            (do (is (= dummy_msg msg_body))
+                                              (println (str "Received: " msg_body))
+                                              (Thread/sleep 100)
+                                              false))
+                                          ))
+                          ]
+                      (loop [need_exit false]
+                        (when (or (false? need_exit) (nil? need_exit))
+                          (recur (some true? (map process_msg (poll))))))
+                      ;                          (recur (some true?  (poll)))))
+                      (.close server)
+                      (println "SERVER CLOSED")
+                      ))
+        stop_server (fn [server_future]
+                      (.send client task (.getBytes poison_msg))
+
+                      (if (= "timeout" (deref server_future 5000 "timeout"))
+                        (do
+                          ;Note, that this does not stop Server thread
+                          ;because of ignoring InterruptedException in Server#recv (what is strange)
+                          (future-cancel server_future)
+                          (throw (RuntimeException. "Error. Server didn't stop as we asked it."))
+                          ))
+                      )
+        server_1 (future (server_fn))
+        _ (println "Let the client connect to a server initially")
+        _ (.send client task (.getBytes dummy_msg))
+        _ (println "Closing the server")
+        _ (stop_server server_1)
+        _ (println "Connecting to the temporarily dead server")
+        _ (let [reconnect (future (.send client task (.getBytes dummy_msg)))
+                _ (print "Sleeping for 10 seconds before resuming the server...")
+                _ (Thread/sleep 10000)
+                server_2 (future (server_fn))
+                _ (println "RESUMED. Expecting that client will send the message successfully.")
+                _ (if (= "timeout" (deref reconnect 15000 "timeout"))
+                    (do
+                      (future-cancel reconnect)
+                      "timeout")
+                    )
+                ]
+            (stop_server server_2)
+            )
+        ]
+    (.close client)
+    (.term context)))


Mime
View raw message