beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Add templateRunner option to Dataflow runner
Date Wed, 02 Nov 2016 04:32:06 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8883877ae -> 2c0d0f476


Add templateRunner option to Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be4c0256
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be4c0256
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be4c0256

Branch: refs/heads/master
Commit: be4c0256f9be0813692674ba931579a72f9cc15c
Parents: 8883877
Author: Sam McVeety <sgmc@google.com>
Authored: Tue Nov 1 09:27:44 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Nov 1 21:31:55 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 45 ++++++++++---
 .../options/DataflowPipelineOptions.java        |  8 +++
 .../dataflow/util/DataflowTemplateJob.java      | 70 ++++++++++++++++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 45 +++++++++++++
 4 files changed, 158 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ce126db..841b13f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -49,7 +50,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -58,6 +58,8 @@ import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -86,6 +88,7 @@ import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.Pipeline;
@@ -140,6 +143,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.PathValidator;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -550,16 +554,37 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
     }
 
-    if (!isNullOrEmpty(options.getDataflowJobFile())) {
-      try (PrintWriter printWriter = new PrintWriter(
-          new File(options.getDataflowJobFile()))) {
-        String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
+    if (!isNullOrEmpty(options.getDataflowJobFile())
+        || !isNullOrEmpty(options.getTemplateLocation())) {
+      boolean isTemplate = !isNullOrEmpty(options.getTemplateLocation());
+      if (isTemplate) {
+        checkArgument(isNullOrEmpty(options.getDataflowJobFile()),
+            "--dataflowJobFile and --templateLocation are mutually exclusive.");
+      }
+      String fileLocation = firstNonNull(
+          options.getTemplateLocation(), options.getDataflowJobFile());
+      checkArgument(fileLocation.startsWith("/") || fileLocation.startsWith("gs://"),
+          String.format(
+              "Location must be local or on Cloud Storage, got {}.", fileLocation));
+      String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
+      try (
+          WritableByteChannel writer =
+              IOChannelUtils.create(fileLocation, MimeTypes.TEXT);
+          PrintWriter printWriter = new PrintWriter(Channels.newOutputStream(writer))) {
         printWriter.print(workSpecJson);
-        LOG.info("Printed workflow specification to {}", options.getDataflowJobFile());
-      } catch (IllegalStateException ex) {
-        LOG.warn("Cannot translate workflow spec to json for debug.");
-      } catch (FileNotFoundException ex) {
-        LOG.warn("Cannot create workflow spec output file.");
+        LOG.info("Printed job specification to {}", fileLocation);
+      } catch (IOException ex) {
+        String error =
+            String.format("Cannot create output file at {}", fileLocation);
+        if (isTemplate) {
+          throw new RuntimeException(error, ex);
+        } else {
+          LOG.warn(error, ex);
+        }
+      }
+      if (isTemplate) {
+        LOG.info("Template successfully created.");
+        return new DataflowTemplateJob();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index e853f22..66632ad 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -82,6 +82,14 @@ public interface DataflowPipelineOptions
   void setUpdate(boolean value);
 
   /**
+   * Where the runner should generate a template file. Must either be local or Cloud Storage.
+   */
+  @Description("Where the runner should generate a template file. "
+      + "Must either be local or Cloud Storage.")
+  String getTemplateLocation();
+  void setTemplateLocation(String value);
+
+  /**
    * Run the job as a specific service account, instead of the default GCE robot.
    */
   @Hidden

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
new file mode 100644
index 0000000..2937184
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.joda.time.Duration;
+
+/**
+ * A {@link DataflowPipelineJob} that is returned when {@code --templateLocation} is set.
+ */
+public class DataflowTemplateJob extends DataflowPipelineJob {
+  private static final String ERROR =
+      "The result of template creation should not be used.";
+
+  public DataflowTemplateJob() {
+    super(null, null, null, null);
+  }
+
+  @Override
+  public String getJobId() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public String getProjectId() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public DataflowPipelineJob getReplacedByJob() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Nullable
+  @VisibleForTesting
+  State waitUntilFinish(
+      Duration duration,
+      MonitoringUtil.JobMessagesHandler messageHandler,
+      Sleeper sleeper,
+      NanoClock nanoClock) {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public State cancel() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public State getState() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ddb7cf8..3925ed4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -133,6 +134,8 @@ public class DataflowRunnerTest {
   public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule
   public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
 
   // Asserts that the given Job has all expected fields set.
   private static void assertValidJob(Job job) {
@@ -1442,4 +1445,46 @@ public class DataflowRunnerTest {
     assertEquals(1, outputMap.size());
     assertThat(outputMap.get(4L), containsInAnyOrder(41L));
   }
+
+  /**
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
+   * when the runner is successfully run.
+   */
+  @Test
+  public void testTemplateRunnerFullCompletion() throws Exception {
+    File existingFile = tmpFolder.newFile();
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setProject("test-project");
+    options.setRunner(DataflowRunner.class);
+    options.setTemplateLocation(existingFile.getPath());
+    options.setTempLocation(tmpFolder.getRoot().getPath());
+    Pipeline p = Pipeline.create(options);
+
+    p.run();
+    expectedLogs.verifyInfo("Template successfully created");
+  }
+
+  /**
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate
+   * exception when an output file is not writable.
+   */
+  @Test
+  public void testTemplateRunnerLoggedErrorForFile() throws Exception {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setRunner(DataflowRunner.class);
+    options.setTemplateLocation("//bad/path");
+    options.setProject("test-project");
+    options.setTempLocation(tmpFolder.getRoot().getPath());
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    Pipeline p = Pipeline.create(options);
+
+    thrown.expectMessage("Cannot create output file at");
+    thrown.expect(RuntimeException.class);
+    p.run();
+  }
 }


Mime
View raw message