beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/2] incubator-beam git commit: Fail to split in FileBasedSource if filePattern expands to empty.
Date Thu, 15 Dec 2016 00:45:29 GMT
Fail to split in FileBasedSource if filePattern expands to empty.

Typically, input file patterns are validated during Pipeline
construction, but standard Read transforms include an option to disable
validation. This is generally useful but can lead to cases where a
Pipeline executes successfully with empty inputs.

This changes the behavior to fail execution on empty file-based inputs
even when validation is disabled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf4229a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf4229a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf4229a6

Branch: refs/heads/master
Commit: cf4229a6d7e1b79416a1be4e78f5c90c38dd77b0
Parents: 46566fc
Author: Scott Wegner <swegner@google.com>
Authored: Wed Dec 14 14:52:34 2016 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Dec 14 16:35:59 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/FileBasedSource.java    |  6 +++++-
 .../org/apache/beam/sdk/io/FileBasedSourceTest.java     | 12 ++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf4229a6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d835f9b..5659d5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -331,7 +331,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       try {
         checkState(fileOrPatternSpec.isAccessible(),
                    "Bundle splitting should only happen at execution time.");
-        for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec.get())) {
+        Collection<String> expandedFiles =
+            FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+        checkArgument(!expandedFiles.isEmpty(),
+            "Unable to find any files matching %s", fileOrPatternSpec.get());
+        for (final String file : expandedFiles) {
           futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
         }
         List<? extends FileBasedSource<T>> splitResults =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cf4229a6/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index a065191..f4b8574 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -73,6 +74,7 @@ public class FileBasedSourceTest {
   Random random = new Random(0L);
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
 
   /**
    * If {@code splitHeader} is null, this is just a simple line-based reader. Otherwise, the file is
@@ -418,6 +420,16 @@ public class FileBasedSourceTest {
   }
 
   @Test
+  public void testSplittingFailsOnEmptyFileExpansion() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    String missingFilePath = tempFolder.newFolder().getAbsolutePath() + "/missing.txt";
+    TestFileBasedSource source = new TestFileBasedSource(missingFilePath, Long.MAX_VALUE, null);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Unable to find any files matching %s", missingFilePath));
+    source.splitIntoBundles(1234, options);
+  }
+
+  @Test
   public void testFractionConsumedWhenReadingFilepattern() throws IOException {
     List<String> data1 = createStringDataset(3, 1000);
     File file1 = createFileWithData("file1", data1);


Mime
View raw message