beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/3] beam git commit: [BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get()
Date Mon, 08 May 2017 18:48:46 GMT
Repository: beam
Updated Branches:
  refs/heads/master 2b0e699b8 -> e1791c3f8


[BEAM-2212] FileBasedSource: refactor to remove uses of fileOrPatternSpec.get()

Makes it less likely to have errors from printing ValueProviders instead of runtime values


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9423babd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9423babd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9423babd

Branch: refs/heads/master
Commit: 9423babd8f827e843723c218441e9a91aaa7b361
Parents: 5bac40e
Author: Dan Halperin <dhalperi@google.com>
Authored: Mon May 8 09:59:16 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon May 8 11:48:39 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSource.java | 39 ++++++++++++--------
 1 file changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9423babd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 4e07342..d4413c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -196,19 +196,20 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
     // we perform the size estimation of files and file patterns using the interface provided by
     // FileSystem.
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
-      checkState(fileOrPatternSpec.isAccessible(),
-                 "Size estimation should be done at execution time.");
-      String pattern = fileOrPatternSpec.get();
       long totalSize = 0;
-      List<MatchResult> inputs =
-          FileSystems.match(Collections.singletonList(pattern));
+      List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
       MatchResult result = Iterables.getOnlyElement(inputs);
       checkArgument(
           result.status() == Status.OK,
           "Error matching the pattern or glob %s: status %s",
-          pattern,
+          fileOrPattern,
           result.status());
       List<Metadata> allMatches = result.metadata();
       for (Metadata metadata : allMatches) {
@@ -216,7 +217,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
       LOG.info(
           "Filepattern {} matched {} files with total size {}",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           allMatches.size(),
           totalSize);
       return totalSize;
@@ -245,14 +246,17 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // split a FileBasedSource based on a file pattern to FileBasedSources based on full single
     // files. For files that can be efficiently seeked, we further split FileBasedSources based on
     // those files to FileBasedSources based on sub ranges of single files.
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot split a FileBasedSource without access to the file or pattern specification: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      checkState(fileOrPatternSpec.isAccessible(),
-                 "Bundle splitting should only happen at execution time.");
-      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
       checkArgument(!expandedFiles.isEmpty(),
-          "Unable to find any files matching %s", fileOrPatternSpec.get());
+          "Unable to find any files matching %s", fileOrPattern);
       List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
       for (Metadata metadata : expandedFiles) {
         FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -268,7 +272,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       LOG.info(
           "Splitting filepattern {} into bundles of size {} took {} ms "
               + "and produced {} files and {} bundles",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           desiredBundleSizeBytes,
           System.currentTimeMillis() - startTime,
           expandedFiles.size(),
@@ -283,7 +287,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       } else {
         LOG.debug("The source for file {} is not split into sub-range based sources since "
             + "the file is not seekable",
-            fileOrPatternSpec);
+            fileOrPattern);
         return ImmutableList.of(this);
       }
     }
@@ -315,10 +319,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
     // Validate the current source prior to creating a reader for it.
     this.validate();
+    checkState(
+        fileOrPatternSpec.isAccessible(),
+        "Cannot create a file reader without access to the file or pattern specification: {}.",
+        fileOrPatternSpec);
+    String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (Metadata metadata : fileMetadata) {
         long endOffset = metadata.sizeBytes();
@@ -327,7 +336,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T>
{
       }
       LOG.debug(
           "Creating a reader for file pattern {} took {} ms",
-          fileOrPatternSpec.get(),
+          fileOrPattern,
           System.currentTimeMillis() - startTime);
       if (fileReaders.size() == 1) {
         return fileReaders.get(0);


Mime
View raw message