beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/5] incubator-beam git commit: Move getPathValidator to GcsOptions and use it in temp/staging locations
Date Tue, 02 Aug 2016 18:07:33 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master d73e614b7 -> 63c5d19b3


Move getPathValidator to GcsOptions and use it in temp/staging locations


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70f394b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70f394b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70f394b8

Branch: refs/heads/master
Commit: 70f394b8f238811b9f237b0c401f5388e12955bd
Parents: 724c88d
Author: Dan Halperin <dhalperi@google.com>
Authored: Fri Jul 29 16:34:37 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   | 43 -------------------
 .../options/DataflowPipelineOptions.java        |  5 ++-
 .../runners/dataflow/DataflowRunnerTest.java    |  4 +-
 .../org/apache/beam/sdk/options/GcpOptions.java | 11 +++--
 .../org/apache/beam/sdk/options/GcsOptions.java | 44 ++++++++++++++++++++
 5 files changed, 57 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index bc92a5f..ac2e0b7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.GcsStager;
 import org.apache.beam.runners.dataflow.util.Stager;
@@ -28,7 +27,6 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
 
 import com.google.api.services.dataflow.Dataflow;
 
@@ -101,31 +99,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   void setDataflowJobFile(String value);
 
   /**
-   * The class of the validator that should be created and used to validate paths.
-   * If pathValidator has not been set explicitly, an instance of this class will be
-   * constructed and used as the path validator.
-   */
-  @Description("The class of the validator that should be created and used to validate paths. "
-      + "If pathValidator has not been set explicitly, an instance of this class will be "
-      + "constructed and used as the path validator.")
-  @Default.Class(GcsPathValidator.class)
-  Class<? extends PathValidator> getPathValidatorClass();
-  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
-  /**
-   * The path validator instance that should be used to validate paths.
-   * If no path validator has been set explicitly, the default is to use the instance factory that
-   * constructs a path validator based upon the currently set pathValidatorClass.
-   */
-  @JsonIgnore
-  @Description("The path validator instance that should be used to validate paths. "
-      + "If no path validator has been set explicitly, the default is to use the instance factory "
-      + "that constructs a path validator based upon the currently set pathValidatorClass.")
-  @Default.InstanceFactory(PathValidatorFactory.class)
-  PathValidator getPathValidator();
-  void setPathValidator(PathValidator validator);
-
-  /**
    * The class responsible for staging resources to be accessible by workers
    * during job execution. If stager has not been set explicitly, an instance of this class
    * will be created and used as the resource stager.
@@ -226,22 +199,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
 
   /**
-   * Creates a {@link PathValidator} object using the class specified in
-   * {@link #getPathValidatorClass()}.
-   */
-  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
-      @Override
-      public PathValidator create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(PathValidator.class)
-          .fromClass(debugOptions.getPathValidatorClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-
-  /**
    * Creates a {@link Stager} object using the class specified in
    * {@link #getStagerClass()}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index b69b6f9..0c5bf47 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -138,13 +138,14 @@ public interface DataflowPipelineOptions
 
     @Override
     public String create(PipelineOptions options) {
-      String gcpTempLocation = options.as(GcpOptions.class).getGcpTempLocation();
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      String gcpTempLocation = gcsOptions.getGcpTempLocation();
       checkArgument(!isNullOrEmpty(gcpTempLocation),
           "Error constructing default value for stagingLocation: gcpTempLocation is missing."
           + "Either stagingLocation must be set explicitly or a valid value must be provided"
           + "for gcpTempLocation.");
       try {
-        GcsPath.fromUri(gcpTempLocation);
+        gcsOptions.getPathValidator().validateOutputFilePrefixSupported(gcpTempLocation);
       } catch (Exception e) {
         throw new IllegalArgumentException(String.format(
             "Error constructing default value for stagingLocation: gcpTempLocation is not"

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d3f2f75..a29cdc9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -229,7 +229,7 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject(PROJECT_ID);
-    options.setTempLocation("gs://somebucket/some/path");
+    options.setGcpTempLocation("gs://somebucket/some/path");
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
     options.setFilesToStage(new LinkedList<String>());
     options.setDataflowClient(buildMockDataflow(jobCaptor));
@@ -601,7 +601,7 @@ public class DataflowRunnerTest {
         buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
     options.setGcsUtil(mockGcsUtil);
-    options.setTempLocation("gs://non-existent-bucket/location");
+    options.setGcpTempLocation("gs://non-existent-bucket/location");
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 1bf4dd6..de3f133 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.options;
 import org.apache.beam.sdk.util.CredentialFactory;
 import org.apache.beam.sdk.util.GcpCredentialFactory;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.PathValidator;
 
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
@@ -42,6 +42,8 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.annotation.Nullable;
+
 /**
  * Options used to configure Google Cloud Platform project and credentials.
  *
@@ -300,7 +302,7 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
    */
   @Description("A GCS path for storing temporary files in GCP.")
   @Default.InstanceFactory(GcpTempLocationFactory.class)
-  String getGcpTempLocation();
+  @Nullable String getGcpTempLocation();
   void setGcpTempLocation(String value);
 
   /**
@@ -309,11 +311,14 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
   public static class GcpTempLocationFactory implements DefaultValueFactory<String> {
 
     @Override
+    @Nullable
     public String create(PipelineOptions options) {
       String tempLocation = options.getTempLocation();
       if (!Strings.isNullOrEmpty(tempLocation)) {
         try {
-          GcsPath.fromUri(tempLocation);
+          PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+          System.err.println(validator);
+          validator.validateOutputFilePrefixSupported(tempLocation);
         } catch (Exception e) {
           // Ignore the temp location because it is not a valid 'gs://' path.
           return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index bae4742..1b3436b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -18,7 +18,10 @@
 package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.util.AppEngineEnvironment;
+import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
 
 import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -85,6 +88,31 @@ public interface GcsOptions extends
   void setGcsUploadBufferSizeBytes(Integer bytes);
 
   /**
+   * The class of the validator that should be created and used to validate paths.
+   * If pathValidator has not been set explicitly, an instance of this class will be
+   * constructed and used as the path validator.
+   */
+  @Description("The class of the validator that should be created and used to validate paths. "
+      + "If pathValidator has not been set explicitly, an instance of this class will be "
+      + "constructed and used as the path validator.")
+  @Default.Class(GcsPathValidator.class)
+  Class<? extends PathValidator> getPathValidatorClass();
+  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+  /**
+   * The path validator instance that should be used to validate paths.
+   * If no path validator has been set explicitly, the default is to use the instance factory that
+   * constructs a path validator based upon the currently set pathValidatorClass.
+   */
+  @JsonIgnore
+  @Description("The path validator instance that should be used to validate paths. "
+      + "If no path validator has been set explicitly, the default is to use the instance factory "
+      + "that constructs a path validator based upon the currently set pathValidatorClass.")
+  @Default.InstanceFactory(PathValidatorFactory.class)
+  PathValidator getPathValidator();
+  void setPathValidator(PathValidator validator);
+
+  /**
    * Returns the default {@link ExecutorService} to use within the Dataflow SDK. The
    * {@link ExecutorService} is compatible with AppEngine.
    */
@@ -112,4 +140,20 @@ public interface GcsOptions extends
           threadFactoryBuilder.build());
     }
   }
+
+  /**
+   * Creates a {@link PathValidator} object using the class specified in
+   * {@link #getPathValidatorClass()}.
+   */
+  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+    @Override
+    public PathValidator create(PipelineOptions options) {
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      return InstanceBuilder.ofType(PathValidator.class)
+          .fromClass(gcsOptions.getPathValidatorClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
 }


Mime
View raw message