hadoop-common-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [hadoop] rdblue commented on a change in pull request #1442: HADOOP-16570. S3A committers encounter scale issues
Date Tue, 01 Oct 2019 16:23:20 GMT
rdblue commented on a change in pull request #1442: HADOOP-16570. S3A committers encounter
scale issues
URL: https://github.com/apache/hadoop/pull/1442#discussion_r330151306
 
 

 ##########
 File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
 ##########
 @@ -430,28 +445,151 @@ protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context)
   }
 
   /**
-   * Commit a list of pending uploads.
+   * Commit all the pending uploads.
+   * Each file listed in the ActiveCommit instance is queued for processing
+   * in a separate thread; its contents are loaded and then (sequentially)
+   * committed.
+   * On a failure or abort of a single file's commit, all its uploads are
+   * aborted.
+   * The revert operation lists the files already committed and deletes them.
    * @param context job context
-   * @param pending list of pending uploads
+   * @param pending  pending uploads
    * @throws IOException on any failure
    */
-  protected void commitPendingUploads(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+  protected void commitPendingUploads(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
     if (pending.isEmpty()) {
       LOG.warn("{}: No pending uploads to commit", getRole());
     }
-    LOG.debug("{}: committing the output of {} task(s)",
-        getRole(), pending.size());
-    try(CommitOperations.CommitContext commitContext
+    try (DurationInfo ignored = new DurationInfo(LOG,
+        "committing the output of %s task(s)", pending.size());
+        CommitOperations.CommitContext commitContext
             = initiateCommitOperation()) {
-      Tasks.foreach(pending)
+
+      Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .executeWith(buildThreadPool(context))
+          .onFailure((path, exception) ->
+              loadAndAbort(commitContext, pending, path, true, false))
+          .abortWith(path ->
+              loadAndAbort(commitContext, pending, path, true, false))
+          .revertWith(path ->
+              loadAndRevert(commitContext, pending, path))
+          .run(path ->
+              loadAndCommit(commitContext, pending, path));
+    }
+  }
+
+  /**
+   * Run a precommit check that all files are loadable.
+   * This check avoids the situation where the inability to read
+   * a file only surfaces partway through the job commit, so
+   * results in the destination being tainted.
+   * @param context job context
+   * @param pending the pending operations
+   * @throws IOException any failure
+   */
+  protected void precommitCheckPendingFiles(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
+
+    FileSystem sourceFS = pending.getSourceFS();
+    try (DurationInfo ignored =
+             new DurationInfo(LOG, "Preflight Load of pending files")) {
+
+      Tasks.foreach(pending.getSourceFiles())
+          .stopOnFailure()
 
 Review comment:
   Should this explicitly call `throwFailureWhenFinished()`? I typically call either that
one or `suppressFailures()` so that it is obvious to the reader how the error is handled.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org


Mime
View raw message