kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From granthe...@apache.org
Subject [kudu] 03/03: KUDU-2812: Fix error reporting in KuduRestore
Date Mon, 06 May 2019 18:09:25 GMT
This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 81d715d555355d7af928c24e164a626f75713afe
Author: Grant Henke <granthenke@apache.org>
AuthorDate: Mon May 6 08:52:15 2019 -0500

    KUDU-2812: Fix error reporting in KuduRestore
    
    Before this patch, the call to
    `session.getPendingErrors.getRowErrors.length`
    cleared the errors buffer, so the second call made
    to report the errors returned nothing and the reported
    sample of errors was empty.
    
    This patch ensures we capture the result of
    `session.getPendingErrors` before checking the length.
    
    Note: This patch also updates the error message format
    for both kudu-backup and kudu-spark.
    
    Change-Id: Iadf3941614a4879a9f35d1df9ee0cea274711c94
    Reviewed-on: http://gerrit.cloudera.org:8080/13244
    Reviewed-by: Will Berkeley <wdberkeley@gmail.com>
    Tested-by: Kudu Jenkins
---
 .../scala/org/apache/kudu/backup/KuduRestore.scala   | 20 ++++++++++++++------
 .../org/apache/kudu/spark/kudu/KuduContext.scala     | 17 +++++++++++------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
index 77bddeb..3b6a537 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduRestore.scala
@@ -131,12 +131,20 @@ object KuduRestore {
             session.close()
           }
           // Fail the task if there are any errors.
-          val errorCount = session.getPendingErrors.getRowErrors.length
-          if (errorCount > 0) {
-            val errors =
-              session.getPendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
-            throw new RuntimeException(
-              s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
+          // It is important to capture all of the errors via getRowErrors and then check
+          // the length because each call to session.getPendingErrors clears the ErrorCollector.
+          val pendingErrors = session.getPendingErrors
+          if (pendingErrors.getRowErrors.nonEmpty) {
+            val errors = pendingErrors.getRowErrors
+            val sample = errors.take(5).map(_.getErrorStatus).mkString
+            if (pendingErrors.isOverflowed) {
+              throw new RuntimeException(
+                s"PendingErrors overflowed. Failed to write at least ${errors.length} rows " +
+                  s"to Kudu; Sample errors: $sample")
+            } else {
+              throw new RuntimeException(
+                s"Failed to write ${errors.length} rows to Kudu; Sample errors: $sample")
+            }
           }
         }
       }
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 28aec32..a27a1e7 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -355,12 +355,17 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
         operation,
         lastPropagatedTimestamp,
         writeOptions)
-      val errorCount = pendingErrors.getRowErrors.length
-      if (errorCount > 0) {
-        val errors =
-          pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
-        throw new RuntimeException(
-          s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
+      if (pendingErrors.getRowErrors.nonEmpty) {
+        val errors = pendingErrors.getRowErrors
+        val sample = errors.take(5).map(_.getErrorStatus).mkString
+        if (pendingErrors.isOverflowed) {
+          throw new RuntimeException(
+            s"PendingErrors overflowed. Failed to write at least ${errors.length} rows " +
+              s"to Kudu; Sample errors: $sample")
+        } else {
+          throw new RuntimeException(
+            s"Failed to write ${errors.length} rows to Kudu; Sample errors: $sample")
+        }
       }
     })
     log.info(s"completed $operation ops: duration histogram: $durationHistogram")


Mime
View raw message