beam-commits mailing list archives

From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-2211] DataflowRunner: remove validation of file read/write paths
Date Tue, 09 May 2017 03:13:46 GMT
Repository: beam
Updated Branches:
  refs/heads/master 88e044fda -> 23731fe7a


[BEAM-2211] DataflowRunner: remove validation of file read/write paths

Now that users can implement and register custom FileSystems,
we can no longer effectively validate the filesystems they
read files from or write files to. For example, a user could
even register file:// to point to an HDFS path.
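
As a minimal sketch of why scheme-based validation no longer works
(hypothetical paths and names; assumes a FileSystem for the hdfs
scheme is registered on the classpath), a pipeline like the following
is now legal on the DataflowRunner even though its input is not a
gs:// path:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CustomFileSystemExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // The scheme alone no longer determines readability: "hdfs://..."
    // resolves through whichever FileSystem is registered for "hdfs".
    // Cluster and bucket names here are placeholders.
    p.apply("ReadFromHdfs", TextIO.read().from("hdfs://namenode/logs/part-*"))
        .apply("WriteToGcs", TextIO.write().to("gs://example-bucket/output"));

    p.run();
  }
}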


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16f355f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16f355f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16f355f7

Branch: refs/heads/master
Commit: 16f355f7e481ebf029c0edb878742f6fea57b6cd
Parents: 88e044f
Author: Dan Halperin <dhalperi@google.com>
Authored: Mon May 8 15:37:29 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon May 8 20:13:35 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 60 --------------------
 .../beam/runners/dataflow/ReadTranslator.java   | 12 ----
 .../DataflowPipelineTranslatorTest.java         | 26 ---------
 .../runners/dataflow/DataflowRunnerTest.java    | 56 ------------------
 4 files changed, 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5278a4a..250c064 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -84,13 +84,10 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.PathValidator;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -98,7 +95,6 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
@@ -343,10 +339,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                   PTransformMatchers.stateOrTimerParDoSingle(),
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
 
-          // WriteFiles uses views internally
-          .add(
-              PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(WriteFiles.class), new BatchWriteFactory(this)))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
@@ -805,58 +797,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
   }
 
-  private class BatchWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
-    private final DataflowRunner runner;
-    private BatchWriteFactory(DataflowRunner dataflowRunner) {
-      this.runner = dataflowRunner;
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          new BatchWrite<>(runner, transform.getTransform()));
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
-      return Collections.emptyMap();
-    }
-  }
-
-  /**
-   * Specialized implementation which overrides
-   * {@link WriteFiles WriteFiles} to provide Google
-   * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
-   */
-  private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
-    private final DataflowRunner runner;
-    private final WriteFiles<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {
-      this.runner = runner;
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone expand(PCollection<T> input) {
-      ValueProvider<ResourceId> outputDirectory =
-          transform.getSink().getBaseOutputDirectoryProvider();
-      if (outputDirectory.isAccessible()) {
-        PathValidator validator = runner.options.getPathValidator();
-        validator.validateOutputResourceSupported(
-            outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE));
-      }
-      return transform.expand(input);
-    }
-  }
-
   // ================================================================================
   // PubsubIO translations
   // ================================================================================

http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index 396c305..30ecbf5 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -26,10 +26,8 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
-import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 
@@ -46,16 +44,6 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
       PTransform<?, ? extends PValue> transform,
       TranslationContext context) {
     try {
-      if (source instanceof FileBasedSource) {
-        ValueProvider<String> filePatternOrSpec =
-            ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
-        if (filePatternOrSpec.isAccessible()) {
-          context.getPipelineOptions()
-              .getPathValidator()
-              .validateInputFilePatternSupported(filePatternOrSpec.get());
-        }
-      }
-
       StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
       stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
       stepContext.addInput(

http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 48cb79f..9040f8f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -20,13 +20,10 @@ package org.apache.beam.runners.dataflow;
 import static org.apache.beam.runners.dataflow.util.Structs.addObject;
 import static org.apache.beam.runners.dataflow.util.Structs.getDictionary;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -109,7 +106,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -809,28 +805,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     pipeline.apply("Read(" + path + ")", TextIO.read().from(path));
   }
 
-  /**
-   * Recursive wildcards are not supported.
-   * This tests "**".
-   */
-  @Test
-  public void testBadWildcardRecursive() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    Pipeline pipeline = Pipeline.create(options);
-    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
-
-    pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz"));
-
-    // Check that translation does fail.
-    thrown.expectCause(allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
-    t.translate(
-        pipeline,
-        DataflowRunner.fromOptions(options),
-        Collections.<DataflowPackage>emptyList());
-  }
-
  private static class TestValueProvider implements ValueProvider<String>, Serializable {
     @Override
     public boolean isAccessible() {

http://git-wip-us.apache.org/repos/asf/beam/blob/16f355f7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 3d47726..6848b85 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
@@ -97,7 +96,6 @@ import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -588,60 +586,6 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testNonGcsFilePathInReadFailure() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    p.apply("ReadMyNonGcsFile", TextIO.read().from(tmpFolder.newFile().getPath()));
-
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(
-            containsString("Expected a valid 'gs://' path but was given"))));
-    p.run();
-
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testNonGcsFilePathInWriteFailure() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-
-    p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
-        .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file"));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
-    p.run();
-  }
-
-  @Test
-  public void testMultiSlashGcsFileReadPath() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file"));
-
-    thrown.expectCause(Matchers.allOf(
-        instanceOf(IllegalArgumentException.class),
-        ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes"))));
-    p.run();
-
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
-    assertValidJob(jobCaptor.getValue());
-  }
-
-  @Test
-  public void testMultiSlashGcsFileWritePath() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
-    pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file"));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("consecutive slashes");
-    p.run();
-  }
-
-  @Test
   public void testInvalidGcpTempLocation() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     options.setGcpTempLocation("file://temp/location");

