beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] branch spark-runner_structured-streaming updated: Improve exception flow
Date Tue, 11 Dec 2018 15:02:24 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 8bcfa5f  Improve exception flow
8bcfa5f is described below

commit 8bcfa5feb2cdb3fad59bddaee19b619d333716d8
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Tue Dec 11 16:00:26 2018 +0100

    Improve exception flow
---
 .../spark/structuredstreaming/translation/io/DatasetSource.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
index 75cdd5d..d23ecf3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
@@ -137,6 +136,8 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
                   try {
                     reader = source.createReader(options);
                   } catch (IOException e) {
+                    throw new RuntimeException(
+                        "Error creating BoundedReader " + reader.getClass().getCanonicalName(), e);
                   }
                   return new DatasetMicroBatchPartitionReader(reader);
                 }
@@ -145,9 +146,9 @@ public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
         return result;
 
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new RuntimeException(
+            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
       }
-      return result;
     }
   }
 


Mime
View raw message