beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: FileBasedSink: Detect bad shard name templates
Date Thu, 09 Jun 2016 16:47:35 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 61d8cf2c4 -> 39da22c76


FileBasedSink: Detect bad shard name templates

This is particularly relevant when TextIO.Write.withoutSharding() is
used: [BEAM-159].


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d036bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d036bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d036bc8

Branch: refs/heads/master
Commit: 4d036bc8a4b8c255d09467d09fe4ed7eae9dd035
Parents: 61d8cf2
Author: Robert Bradshaw <robertwb@google.com>
Authored: Wed Jun 8 14:59:56 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Jun 9 09:47:24 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 16 +++++++++-----
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 23 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d036bc8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9048380..521f54b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 
 /**
@@ -333,13 +334,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options)
         throws IOException {
       int numFiles = filenames.size();
-      List<String> srcFilenames = new ArrayList<>();
+      // Sort files for idempotence.
+      List<String> srcFilenames = Ordering.natural().sortedCopy(filenames);
       List<String> destFilenames = generateDestinationFilenames(numFiles);
 
-      // Sort files for copying.
-      srcFilenames.addAll(filenames);
-      Collections.sort(srcFilenames);
-
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
         FileOperations fileOperations =
@@ -366,6 +364,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
         destFilenames.add(IOChannelUtils.constructName(
             baseOutputFilename, fileNamingTemplate, suffix, i, numFiles));
       }
+
+      int numDistinctShards = new HashSet<String>(destFilenames).size();
+      Preconditions.checkState(numDistinctShards == numFiles,
+          "Shard name template '%s' only generated %s distinct file names for %s files.",
+          fileNamingTemplate, numDistinctShards, numFiles);
+
       return destFilenames;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d036bc8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 0e434fc..d3454da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation.TemporaryFileRetention;
@@ -374,6 +375,28 @@ public class FileBasedSinkTest {
   }
 
   /**
+   * Reject non-distinct output filenames.
+   */
+  @Test
+  public void testCollidingOutputFilenames() {
+    SimpleSink sink = new SimpleSink("output", "test", "-NN");
+    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+
+    // A single shard doesn't need to include the shard number.
+    assertEquals(Arrays.asList("output-01.test"),
+                 writeOp.generateDestinationFilenames(1));
+
+    // More than one shard does.
+    try {
+      writeOp.generateDestinationFilenames(3);
+      fail("Should have failed.");
+    } catch (IllegalStateException exn) {
+      assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.",
+                   exn.getMessage());
+    }
+  }
+
+  /**
    * Output filenames are generated correctly when an extension is not supplied.
    */
   @Test


Mime
View raw message