beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject [1/2] beam git commit: jstorm-runner: handle UserCodeException in TestJStormRunner, and wraps in PipelineExecutionException if receives a checked Exception.
Date Thu, 07 Sep 2017 06:13:42 GMT
Repository: beam
Updated Branches:
  refs/heads/jstorm-runner 7a28bf1af -> 80bd7f8be


jstorm-runner: handle UserCodeException in TestJStormRunner, and wraps in PipelineExecutionException
if receives a checked Exception.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34bf5af9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34bf5af9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34bf5af9

Branch: refs/heads/jstorm-runner
Commit: 34bf5af9b19c2fb90f2301823b698d06f21879a1
Parents: 7a28bf1
Author: Pei He <pei@apache.org>
Authored: Mon Sep 4 15:02:14 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Sep 7 14:12:51 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/TestJStormRunner.java   | 32 +++++++++++++++++---
 1 file changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/34bf5af9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index 9d2e2f1..b637b7c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,23 +85,23 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult>
{
                result.getTopologyName(), numberOfAssertions);
       if (numberOfAssertions == 0) {
         result.waitUntilFinish(Duration.millis(RESULT_WAITING_TIME_MS));
-        Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
+        Throwable taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
         if (taskExceptionRec != null) {
           LOG.info("Exception was found.", taskExceptionRec);
-          throw new RuntimeException(taskExceptionRec.getCause());
+          handleTaskException(taskExceptionRec);
         }
         return result;
       } else {
         for (int waitTime = 0; waitTime <= ASSERTION_WAITING_TIME_MS;) {
           Optional<Boolean> success = checkForPAssertSuccess(result.metrics(), numberOfAssertions);
-          Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
+          Throwable taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
           if (success.isPresent() && success.get()) {
             return result;
           } else if (success.isPresent() && !success.get()) {
             throw new AssertionError("Failed assertion checks.");
           } else if (taskExceptionRec != null) {
             LOG.info("Exception was found.", taskExceptionRec);
-            throw new RuntimeException(taskExceptionRec.getCause());
+            handleTaskException(taskExceptionRec);
           } else {
             JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS);
             waitTime += RESULT_CHECK_INTERVAL_MS;
@@ -116,6 +117,29 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult>
{
     }
   }
 
+  private void handleTaskException(Throwable taskExceptionRec) {
+    Throwable cause;
+    if (taskExceptionRec.getCause() != null) {
+      cause = taskExceptionRec.getCause();
+    } else {
+      cause = taskExceptionRec;
+    }
+
+    UserCodeException innermostUserCodeException = null;
+    for (Throwable current = cause; current.getCause() != null; current = current.getCause())
{
+      if (current instanceof UserCodeException) {
+        innermostUserCodeException = ((UserCodeException) current);
+      }
+    }
+    if (innermostUserCodeException != null) {
+      cause = innermostUserCodeException.getCause();
+    }
+    if (cause instanceof AssertionError) {
+      throw (AssertionError) cause;
+    }
+    throw new Pipeline.PipelineExecutionException(cause);
+  }
+
   private Optional<Boolean> checkForPAssertSuccess(
       MetricResults metricResults,
       int expectedNumberOfAssertions) {


Mime
View raw message