storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: STORM-1642: Rethrow exception on serialization error and kill worker
Date Fri, 09 Jun 2017 20:38:59 GMT
Repository: storm
Updated Branches:
  refs/heads/master f47875195 -> ca10da2fd


STORM-1642: Rethrow exception on serialization error and kill worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4f9d5c01
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4f9d5c01
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4f9d5c01

Branch: refs/heads/master
Commit: 4f9d5c013da12284353a2411ef386c07ef210214
Parents: f478751
Author: Govind Menon <govindappumenon@gmail.com>
Authored: Fri Apr 28 16:11:19 2017 -0500
Committer: Govind Menon <govindappumenon@gmail.com>
Committed: Fri Jun 9 12:42:17 2017 -0500

----------------------------------------------------------------------
 .../netty/NettyUncaughtExceptionHandler.java    | 13 +++++++---
 .../messaging/netty/StormServerHandler.java     | 21 +++++++++++++---
 .../src/jvm/org/apache/storm/utils/Utils.java   | 25 +++++++++++---------
 3 files changed, 42 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4f9d5c01/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
index fd48bdc..87fae1e 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java
@@ -26,10 +26,17 @@ public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHa
     @Override
     public void uncaughtException(Thread t, Throwable e) {
         try {
+            LOG.error("Uncaught exception in netty " + e.getCause());
+        } catch (Throwable err) {
+            // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException
will handle it
+        }
+
+        try {
             Utils.handleUncaughtException(e);
-        } catch (Error error) {
-            LOG.info("Received error in netty thread.. terminating server...");
-            Runtime.getRuntime().exit(1);
+        } catch (Throwable throwable) {
+            LOG.error("Exception thrown while handling uncaught exception " + throwable.getCause());
         }
+        LOG.info("Received error in netty thread.. terminating server...");
+        Runtime.getRuntime().exit(1);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4f9d5c01/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index fbec965..1e060ad 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -26,10 +26,16 @@ import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class StormServerHandler extends SimpleChannelUpstreamHandler  {
     private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
+    private static final Set<Class> allowedExceptions = new HashSet<>(Arrays.asList(new
Class[] {IOException.class}));
     IServer server;
     private AtomicInteger failure_count; 
     private Channel channel;
@@ -67,8 +73,17 @@ public class StormServerHandler extends SimpleChannelUpstreamHandler  {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-        LOG.error("server errors in handling the request", e.getCause());
-        Utils.handleUncaughtException(e.getCause());
-        server.closeChannel(e.getChannel());
+        try {
+            LOG.error("server errors in handling the request", e.getCause());
+        } catch (Throwable err) {
+            // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException
will handle it
+        }
+        try {
+            Utils.handleUncaughtException(e.getCause(), allowedExceptions);
+        } catch (Error error) {
+            LOG.info("Received error in netty thread.. terminating server...");
+            Runtime.getRuntime().exit(1);
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4f9d5c01/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 26043d1..d9b6684 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -99,7 +99,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class Utils {
     public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
-
+    private static final Set<Class> defaultAllowedExceptions = new HashSet<>();
     public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator");
 
     private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
@@ -528,7 +528,11 @@ public class Utils {
     }
 
     public static void handleUncaughtException(Throwable t) {
-        if (t != null && t instanceof Error) {
+        handleUncaughtException(t, defaultAllowedExceptions);
+    }
+
+    public static void handleUncaughtException(Throwable t, Set<Class> allowedExceptions)
{
+        if (t != null) {
             if (t instanceof OutOfMemoryError) {
                 try {
                     System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
@@ -536,17 +540,16 @@ public class Utils {
                     //Again we don't want to exit because of logging issues.
                 }
                 Runtime.getRuntime().halt(-1);
-            } else {
-                //Running in daemon mode, we would pass Error to calling thread.
-                throw (Error) t;
             }
-        } else if (t instanceof Exception) {
-            System.err.println("Uncaught Exception detected. Leave error log and ignore...
Exception: " + t);
-            System.err.println("Stack trace:");
-            StringWriter sw = new StringWriter();
-            t.printStackTrace(new PrintWriter(sw));
-            System.err.println(sw.toString());
         }
+
+        if(allowedExceptions.contains(t.getClass())) {
+            LOG.info("Swallowing {} {}", t.getClass(), t);
+            return;
+        }
+
+        //Running in daemon mode, we would pass Error to calling thread.
+        throw new Error(t);
     }
 
     public static byte[] thriftSerialize(TBase t) {


Mime
View raw message