hadoop-common-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [hadoop] rdblue commented on a change in pull request #1442: HADOOP-16570. S3A committers encounter scale issues
Date Tue, 01 Oct 2019 16:26:34 GMT
rdblue commented on a change in pull request #1442: HADOOP-16570. S3A committers encounter scale issues
URL: https://github.com/apache/hadoop/pull/1442#discussion_r330152720
 
 

 ##########
 File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
 ##########
 @@ -430,28 +445,151 @@ protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context)
   }
 
   /**
-   * Commit a list of pending uploads.
+   * Commit all the pending uploads.
+   * Each file listed in the ActiveCommit instance is queued for processing
+   * in a separate thread; its contents are loaded and then (sequentially)
+   * committed.
+   * On a failure or abort of a single file's commit, all its uploads are
+   * aborted.
+   * The revert operation lists the files already committed and deletes them.
    * @param context job context
-   * @param pending list of pending uploads
+   * @param pending  pending uploads
    * @throws IOException on any failure
    */
-  protected void commitPendingUploads(JobContext context,
-      List<SinglePendingCommit> pending) throws IOException {
+  protected void commitPendingUploads(
+      final JobContext context,
+      final ActiveCommit pending) throws IOException {
     if (pending.isEmpty()) {
       LOG.warn("{}: No pending uploads to commit", getRole());
     }
-    LOG.debug("{}: committing the output of {} task(s)",
-        getRole(), pending.size());
-    try(CommitOperations.CommitContext commitContext
+    try (DurationInfo ignored = new DurationInfo(LOG,
+        "committing the output of %s task(s)", pending.size());
+        CommitOperations.CommitContext commitContext
             = initiateCommitOperation()) {
-      Tasks.foreach(pending)
+
+      Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .executeWith(buildThreadPool(context))
+          .onFailure((path, exception) ->
+              loadAndAbort(commitContext, pending, path, true, false))
 
 Review comment:
  Failures for each failed pending set are handled in `loadAndCommit`, so I don't think you need to handle failures here, just reverts and aborts.
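The reviewer's point can be illustrated with a small, self-contained sketch. It is loosely modelled on the `Tasks.foreach(...).stopOnFailure()` chain in the diff but does not use Hadoop's `Tasks` class. `PendingCommitSketch`, its bookkeeping lists, and the string "files" are hypothetical stand-ins, and the `loadAndCommit` method here only mimics the behaviour described in the comment: it aborts its own file's uploads when that file fails. Because each task already handles its own failure, the driver registers no per-file failure callback; once any task fails, it only reverts the files already committed and aborts the ones that were skipped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch, not the S3A implementation: each pending-set file is
 * committed in its own task, and that task aborts its own uploads on failure.
 * The driver therefore needs no per-file failure handler; it only reverts
 * committed files and aborts files that were never attempted.
 */
class PendingCommitSketch {

  // Bookkeeping for illustration only; real code would talk to the store.
  final List<String> committed = Collections.synchronizedList(new ArrayList<>());
  final List<String> aborted = Collections.synchronizedList(new ArrayList<>());
  final List<String> reverted = Collections.synchronizedList(new ArrayList<>());
  final List<String> skipped = Collections.synchronizedList(new ArrayList<>());

  /** Stand-in for loadAndCommit; a name starting with "bad" simulates a failure. */
  void loadAndCommit(String file) throws Exception {
    try {
      if (file.startsWith("bad")) {
        throw new Exception("commit failed: " + file);
      }
      committed.add(file);
    } catch (Exception e) {
      aborted.add(file);  // this file's failure is handled here...
      throw e;            // ...but still reported so the driver can stop
    }
  }

  /** Driver: runs one task per file, then reverts and aborts on failure. */
  void commitPendingUploads(List<String> files, ExecutorService pool)
      throws Exception {
    AtomicBoolean failed = new AtomicBoolean(false);
    List<Future<Object>> futures = new ArrayList<>();
    for (String file : files) {
      futures.add(pool.submit(() -> {
        if (failed.get()) {   // similar in spirit to stopOnFailure()
          skipped.add(file);
          return null;
        }
        try {
          loadAndCommit(file);
        } catch (Exception e) {
          failed.set(true);
          throw e;
        }
        return null;
      }));
    }
    Exception first = null;
    for (Future<Object> f : futures) {
      try {
        f.get();
      } catch (ExecutionException e) {
        if (first == null) {
          Throwable cause = e.getCause();
          first = cause instanceof Exception ? (Exception) cause : new Exception(cause);
        }
      }
    }
    if (first != null) {
      // No per-file failure handling here: only revert and abort.
      reverted.addAll(committed);  // revert: delete what was already committed
      aborted.addAll(skipped);     // abort: uploads that were never attempted
      throw first;
    }
  }
}
```

With a single-threaded executor and the inputs `a`, `bad`, `c`, the file `a` is committed and then reverted, `bad` is aborted inside its own task, and `c` is skipped and then aborted by the driver. If the driver also aborted each failed file in a failure callback, `bad` would be aborted twice, which appears to be the duplication the comment is questioning.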

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org

