From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Wed, 02 Nov 2016 04:32:06 -0000
Message-Id: <2d9356f8cab245fdae0cbaa10d93a8cd@git.apache.org>
Subject: [1/2] incubator-beam git commit: Add templateRunner option to Dataflow runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 8883877ae -> 2c0d0f476


Add templateRunner option to Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be4c0256
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be4c0256
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be4c0256

Branch: refs/heads/master
Commit: be4c0256f9be0813692674ba931579a72f9cc15c
Parents: 8883877
Author: Sam McVeety
Authored: Tue Nov 1 09:27:44 2016 -0700
Committer: Dan Halperin
Committed: Tue Nov 1 21:31:55 2016 -0700
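For context, a usage sketch (not part of this commit): with the new templateLocation option set,
DataflowRunner.run() writes the job specification to the given location and returns a
DataflowTemplateJob instead of launching a job. The class name, project id, and gs:// paths below
are hypothetical placeholders.

    // Usage sketch only; placeholder names throughout.
    package example;

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class CreateTemplateExample {
      public static void main(String[] args) {
        // Flags are parsed from args, e.g.
        //   --runner=DataflowRunner --project=my-project \
        //   --tempLocation=gs://my-bucket/temp \
        //   --templateLocation=gs://my-bucket/templates/my-job
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);

        Pipeline p = Pipeline.create(options);
        // ... apply the pipeline's transforms here ...

        // With --templateLocation set, run() serializes the job specification to the
        // given location and returns a DataflowTemplateJob rather than submitting a job.
        p.run();
      }
    }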
----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 45 ++++++++++---
 .../options/DataflowPipelineOptions.java        |  8 +++
 .../dataflow/util/DataflowTemplateJob.java      | 70 ++++++++++++++++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 45 +++++++++++++
 4 files changed, 158 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ce126db..841b13f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -49,7 +50,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -58,6 +58,8 @@ import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -86,6 +88,7 @@ import org.apache.beam.runners.dataflow.internal.ReadTranslator;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.sdk.Pipeline;
@@ -140,6 +143,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.PathValidator;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -550,16 +554,37 @@ public class DataflowRunner extends PipelineRunner {
       hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
     }
 
-    if (!isNullOrEmpty(options.getDataflowJobFile())) {
-      try (PrintWriter printWriter = new PrintWriter(
-          new File(options.getDataflowJobFile()))) {
-        String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
+    if (!isNullOrEmpty(options.getDataflowJobFile())
+        || !isNullOrEmpty(options.getTemplateLocation())) {
+      boolean isTemplate = !isNullOrEmpty(options.getTemplateLocation());
+      if (isTemplate) {
+        checkArgument(isNullOrEmpty(options.getDataflowJobFile()),
+            "--dataflowJobFile and --templateLocation are mutually exclusive.");
+      }
+      String fileLocation = firstNonNull(
+          options.getTemplateLocation(), options.getDataflowJobFile());
+      checkArgument(fileLocation.startsWith("/") || fileLocation.startsWith("gs://"),
+          String.format(
+              "Location must be local or on Cloud Storage, got %s.", fileLocation));
+      String workSpecJson = DataflowPipelineTranslator.jobToString(newJob);
+      try (
+          WritableByteChannel writer =
+              IOChannelUtils.create(fileLocation, MimeTypes.TEXT);
+          PrintWriter printWriter = new PrintWriter(Channels.newOutputStream(writer))) {
         printWriter.print(workSpecJson);
-        LOG.info("Printed workflow specification to {}", options.getDataflowJobFile());
-      } catch (IllegalStateException ex) {
-        LOG.warn("Cannot translate workflow spec to json for debug.");
-      } catch (FileNotFoundException ex) {
-        LOG.warn("Cannot create workflow spec output file.");
+        LOG.info("Printed job specification to {}", fileLocation);
+      } catch (IOException ex) {
+        String error =
+            String.format("Cannot create output file at %s", fileLocation);
+        if (isTemplate) {
+          throw new RuntimeException(error, ex);
+        } else {
+          LOG.warn(error, ex);
+        }
+      }
+      if (isTemplate) {
+        LOG.info("Template successfully created.");
+        return new DataflowTemplateJob();
       }
     }
 


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index e853f22..66632ad 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -82,6 +82,14 @@ public interface DataflowPipelineOptions
   void setUpdate(boolean value);
 
   /**
+   * Where the runner should generate a template file. Must either be local or on Cloud Storage.
+   */
+  @Description("Where the runner should generate a template file. "
+      + "Must either be local or on Cloud Storage.")
+  String getTemplateLocation();
+  void setTemplateLocation(String value);
+
+  /**
    * Run the job as a specific service account, instead of the default GCE robot.
    */
   @Hidden


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
new file mode 100644
index 0000000..2937184
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.api.client.util.NanoClock;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.joda.time.Duration;
+
+/**
+ * A {@link DataflowPipelineJob} that is returned when {@code --templateLocation} is set.
+ */
+public class DataflowTemplateJob extends DataflowPipelineJob {
+  private static final String ERROR =
+      "The result of template creation should not be used.";
+
+  public DataflowTemplateJob() {
+    super(null, null, null, null);
+  }
+
+  @Override
+  public String getJobId() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public String getProjectId() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public DataflowPipelineJob getReplacedByJob() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Nullable
+  @VisibleForTesting
+  State waitUntilFinish(
+      Duration duration,
+      MonitoringUtil.JobMessagesHandler messageHandler,
+      Sleeper sleeper,
+      NanoClock nanoClock) {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public State cancel() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+
+  @Override
+  public State getState() {
+    throw new UnsupportedOperationException(ERROR);
+  }
+}


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be4c0256/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ddb7cf8..3925ed4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -133,6 +134,8 @@ public class DataflowRunnerTest {
   public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule
   public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
 
   // Asserts that the given Job has all expected fields set.
   private static void assertValidJob(Job job) {
@@ -1442,4 +1445,46 @@ public class DataflowRunnerTest {
     assertEquals(1, outputMap.size());
     assertThat(outputMap.get(4L), containsInAnyOrder(41L));
   }
+
+  /**
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
+   * when the runner is successfully run.
+   */
+  @Test
+  public void testTemplateRunnerFullCompletion() throws Exception {
+    File existingFile = tmpFolder.newFile();
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setProject("test-project");
+    options.setRunner(DataflowRunner.class);
+    options.setTemplateLocation(existingFile.getPath());
+    options.setTempLocation(tmpFolder.getRoot().getPath());
+    Pipeline p = Pipeline.create(options);
+
+    p.run();
+    expectedLogs.verifyInfo("Template successfully created");
+  }
+
+  /**
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} throws the appropriate
+   * exception when an output file is not writable.
+   */
+  @Test
+  public void testTemplateRunnerLoggedErrorForFile() throws Exception {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setRunner(DataflowRunner.class);
+    options.setTemplateLocation("//bad/path");
+    options.setProject("test-project");
+    options.setTempLocation(tmpFolder.getRoot().getPath());
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    Pipeline p = Pipeline.create(options);
+
+    thrown.expectMessage("Cannot create output file at");
+    thrown.expect(RuntimeException.class);
+    p.run();
+  }
 }
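A note on consuming the result (again a sketch, not part of this change): every job-control method
on DataflowTemplateJob throws UnsupportedOperationException, so code that sometimes creates
templates and sometimes submits real jobs may want to branch on the result type. The helper name
below is hypothetical and assumes the imports from the earlier sketch, plus
org.apache.beam.runners.dataflow.util.DataflowTemplateJob and org.apache.beam.sdk.PipelineResult.

    // Hypothetical helper: tolerate both template creation and a normal run.
    static void runOrCreateTemplate(Pipeline p) {
      PipelineResult result = p.run();
      if (result instanceof DataflowTemplateJob) {
        // --templateLocation was set: the job spec was written out and no job was
        // launched. Calls such as getState() or cancel() on this result would throw.
        System.out.println("Template created; nothing to monitor.");
      } else {
        // A real submission; the result can be monitored as usual.
        System.out.println("Job submitted; current state: " + result.getState());
      }
    }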