beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-292] Write: always produce at least 1 WriteT
Date Tue, 07 Jun 2016 18:05:07 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master aaab15766 -> b9845a700


[BEAM-292] Write: always produce at least 1 WriteT

Write has a degenerate case: if the written PCollection contains no elements,
the finalize step runs with no write results to finalize. This often prevents
correct operation, for example when a FileBasedSink produces no files instead of one
empty file.

Catch and handle this case in Write by opening a Writer and closing it without
writing any elements, which produces a single WriteT.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f97c38d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f97c38d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f97c38d1

Branch: refs/heads/master
Commit: f97c38d1c5d85e8ad0d94229c47e9e619be93632
Parents: aaab157
Author: Dan Halperin <dhalperi@google.com>
Authored: Mon Jun 6 21:46:37 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Jun 7 11:04:39 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/Write.java     |  9 +++++++++
 .../test/java/org/apache/beam/sdk/io/TextIOTest.java    | 12 ++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f97c38d1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 66fef84..c7b3f5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.values.PDone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.UUID;
 
 /**
@@ -230,6 +231,14 @@ public class Write {
               LOG.info("Finalizing write operation {}", writeOperation);
               Iterable<WriteT> results = c.sideInput(resultsView);
               LOG.debug("Side input initialized to finalize write operation {}", writeOperation);
+              if (!results.iterator().hasNext()) {
+                LOG.info("No write results, creating a single empty output.");
+                Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                writer.open(UUID.randomUUID().toString());
+                WriteT emptyWrite = writer.close();
+                results = Collections.singleton(emptyWrite);
+                LOG.debug("Done creating a single empty output.");
+              }
               writeOperation.finalize(results, c.getPipelineOptions());
               LOG.debug("Done finalizing write operation {}", writeOperation);
             }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f97c38d1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 73aeda9..724a113 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -202,15 +202,17 @@ public class TextIOTest {
     }
     if (numShards == 1) {
       write = write.withoutSharding();
-    } else {
+    } else if (numShards > 0) {
       write = write.withNumShards(numShards).withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX);
     }
+    int numOutputShards = (numShards > 0) ? numShards : 1;
 
     input.apply(write);
 
     p.run();
 
-    assertOutputFiles(elems, coder, numShards, tmpFolder, outputName, write.getShardNameTemplate());
+    assertOutputFiles(elems, coder, numOutputShards, tmpFolder, outputName,
+        write.getShardNameTemplate());
   }
 
   public static <T> void assertOutputFiles(
@@ -261,6 +263,12 @@ public class TextIOTest {
 
   @Test
   @Category(NeedsRunner.class)
+  public void testWriteEmptyStringsNoSharding() throws Exception {
+    runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of(), 0);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
   public void testWriteEmptyStrings() throws Exception {
     runTestWrite(NO_LINES_ARRAY, StringUtf8Coder.of());
   }


Mime
View raw message