beam-commits mailing list archives

From dhalp...@apache.org
Subject [4/9] incubator-beam git commit: Rename DataflowPipelineRunner to DataflowRunner
Date Fri, 17 Jun 2016 22:13:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java
new file mode 100644
index 0000000..05297ec
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import com.google.api.services.dataflow.model.Environment;
+
+/**
+ * An instance of this class can be passed to the
+ * {@link DataflowRunner} to add user defined hooks to be
+ * invoked at various times during pipeline execution.
+ */
+@Experimental
+public class DataflowRunnerHooks {
+  /**
+   * Allows the user to modify the environment of their job before their job is submitted
+   * to the service for execution.
+   *
+   * @param environment The environment of the job. Users can make changes to this instance
+   *     in order to change the environment with which their job executes on the service.
+   */
+  public void modifyEnvironmentBeforeSubmission(Environment environment) {}
+}
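
For context on how these new hooks are meant to be used, here is a minimal sketch (not part of this commit). It assumes DataflowRunner exposes a setHooks(DataflowRunnerHooks) setter, and the experiment flag passed to the Environment is purely a placeholder.

// Sketch only, not part of this commit. Assumes DataflowRunner exposes setHooks(...);
// the experiment string below is a placeholder chosen for illustration.
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.DataflowRunnerHooks;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import com.google.api.services.dataflow.model.Environment;

import java.util.Arrays;

public class DataflowHooksSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);

    DataflowRunner runner = DataflowRunner.fromOptions(options);
    runner.setHooks(new DataflowRunnerHooks() {
      @Override
      public void modifyEnvironmentBeforeSubmission(Environment environment) {
        // Adjust the job Environment before it is sent to the Dataflow service.
        environment.setExperiments(Arrays.asList("example_experiment_flag"));
      }
    });

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms to the pipeline here ...
    runner.run(pipeline);
  }
}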

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
index f2e8459..ea83bfb 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Implementation of the {@link org.apache.beam.runners.dataflow.DataflowPipelineRunner}.
+ * Implementation of the {@link org.apache.beam.runners.dataflow.DataflowRunner}.
  */
 package org.apache.beam.runners.dataflow.internal;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
index 7fa5ad6..809df35 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
@@ -29,9 +29,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.PrintStream;
 
 /**
- * Options that are used to configure the {@link BlockingDataflowPipelineRunner}.
+ * Options that are used to configure the {@link BlockingDataflowRunner}.
  */
-@Description("Configure options on the BlockingDataflowPipelineRunner.")
+@Description("Configure options on the BlockingDataflowRunner.")
 public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
   /**
    * Output stream for job status messages.
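
As a quick, hedged illustration of how these options select the renamed runner, the sketch below constructs them programmatically; the project and temp location values are placeholders.

// Sketch only: programmatic construction of BlockingDataflowPipelineOptions targeting the
// renamed BlockingDataflowRunner. Project and temp location are placeholder values.
import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BlockingOptionsSketch {
  public static void main(String[] args) {
    BlockingDataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(BlockingDataflowPipelineOptions.class);
    options.setRunner(BlockingDataflowRunner.class);
    options.setProject("my-project-id");              // placeholder
    options.setTempLocation("gs://my-bucket/temp");   // placeholder

    Pipeline pipeline = Pipeline.create(options);
    // With the blocking runner, pipeline.run() returns only once the job reaches a terminal state.
  }
}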

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 6e6ad96..f665a08 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.Default;
@@ -38,7 +38,7 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 /**
- * Options that can be used to configure the {@link DataflowPipelineRunner}.
+ * Options that can be used to configure the {@link DataflowRunner}.
  */
 @Description("Options that configure the Dataflow pipeline.")
 public interface DataflowPipelineOptions

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index a29b328..e3a1a0f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -131,9 +131,9 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions {
     public String create(PipelineOptions options) {
       DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
       if (dataflowOptions.isStreaming()) {
-        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+        return DataflowRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
       } else {
-        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+        return DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
       }
     }
   }
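
The hunk above sits inside a DefaultValueFactory that picks the worker harness image from DataflowRunner constants. For readers unfamiliar with the pattern, a self-contained sketch follows; the option interface, property, and image names are invented for illustration.

// Sketch of the DefaultValueFactory pattern used above: a factory computes a default value
// from other options at access time. The interface, property, and image names are invented.
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public interface ExampleWorkerOptions extends PipelineOptions {

  /** Picks a container image depending on the value of another option. */
  class ContainerImageFactory implements DefaultValueFactory<String> {
    @Override
    public String create(PipelineOptions options) {
      ExampleWorkerOptions exampleOptions = options.as(ExampleWorkerOptions.class);
      return exampleOptions.isExperimentalMode()
          ? "gcr.io/example/experimental-harness"   // placeholder image
          : "gcr.io/example/stable-harness";        // placeholder image
    }
  }

  boolean isExperimentalMode();
  void setExperimentalMode(boolean value);

  @Default.InstanceFactory(ContainerImageFactory.class)
  String getContainerImage();
  void setContainerImage(String value);
}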

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index c940e9a..f83a139 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -21,7 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 import org.apache.beam.runners.dataflow.DataflowJobExecutionException;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
 import org.apache.beam.sdk.Pipeline;
@@ -54,7 +54,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a
- * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}.
+ * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
  *
  * @see TestPipeline
  */
@@ -63,12 +63,12 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
   private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
 
   private final TestDataflowPipelineOptions options;
-  private final DataflowPipelineRunner runner;
+  private final DataflowRunner runner;
   private int expectedNumberOfAssertions = 0;
 
   TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
     this.options = options;
-    this.runner = DataflowPipelineRunner.fromOptions(options);
+    this.runner = DataflowRunner.fromOptions(options);
   }
 
   /**
@@ -89,7 +89,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     return run(pipeline, runner);
   }
 
-  DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
+  DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
 
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
     final DataflowPipelineJob job;
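
TestDataflowPipelineRunner is a wrapper that delegates to the renamed DataflowRunner. The general shape of such a delegating runner is sketched below; it is simplified, the class name is invented, and the PipelineRunner import path reflects the SDK layout at the time of this commit, so it may differ in later versions.

// Simplified sketch of a runner that wraps another runner and delegates run(), as
// TestDataflowPipelineRunner does with DataflowRunner above. Error handling, option
// plumbing, and result inspection are omitted; the class name is invented.
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PipelineRunner;  // package may differ in newer SDKs

public class DelegatingDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
  private final DataflowRunner delegate;

  private DelegatingDataflowRunner(TestDataflowPipelineOptions options) {
    this.delegate = DataflowRunner.fromOptions(options);
  }

  public static DelegatingDataflowRunner fromOptions(TestDataflowPipelineOptions options) {
    return new DelegatingDataflowRunner(options);
  }

  @Override
  public DataflowPipelineJob run(Pipeline pipeline) {
    // Pre-submission work could go here (e.g. counting assertions, as the test runner does).
    DataflowPipelineJob job = delegate.run(pipeline);
    // Post-submission monitoring could go here (e.g. waiting for success or failure).
    return job;
  }
}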

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
deleted file mode 100644
index 55b4027..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.TestCredential;
-
-import org.hamcrest.Description;
-import org.hamcrest.Factory;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Tests for BlockingDataflowPipelineRunner.
- */
-@RunWith(JUnit4.class)
-public class BlockingDataflowPipelineRunnerTest {
-
-  @Rule
-  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowPipelineRunner.class);
-
-  @Rule
-  public ExpectedException expectedThrown = ExpectedException.none();
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
-   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
-   */
-  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
-        this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with job matching ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobException> Matcher<T> expectJob(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new DataflowJobExceptionMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
-   * to the return value of {@link DataflowPipelineJob#getJobId()}.
-   */
-  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
-
-    private final Matcher<String> matcher;
-
-    public JobIdMatcher(Matcher<String> matcher) {
-        this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T job) {
-      return matcher.matches(job.getJobId());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("jobId ");
-        matcher.describeMismatch(item.getJobId(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("job with jobId ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
-      return new JobIdMatcher<T>(equalTo(jobId));
-    }
-  }
-
-  /**
-   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
-   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
-   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
-   */
-  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
-      extends TypeSafeMatcher<T> {
-
-    private final Matcher<DataflowPipelineJob> matcher;
-
-    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
-        this.matcher = matcher;
-    }
-
-    @Override
-    public boolean matchesSafely(T ex) {
-      return matcher.matches(ex.getReplacedByJob());
-    }
-
-    @Override
-    protected void describeMismatchSafely(T item, Description description) {
-        description.appendText("job ");
-        matcher.describeMismatch(item.getMessage(), description);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("exception with replacedByJob() ");
-      description.appendDescriptionOf(matcher);
-    }
-
-    @Factory
-    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
-        Matcher<DataflowPipelineJob> matcher) {
-      return new ReplacedByJobMatcher<T>(matcher);
-    }
-  }
-
-  /**
-   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
-   * that will immediately terminate in the provided {@code terminalState}.
-   *
-   * <p>The return value may be further mocked.
-   */
-  private DataflowPipelineJob createMockJob(
-      String projectId, String jobId, State terminalState) throws Exception {
-    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
-    when(mockJob.getProjectId()).thenReturn(projectId);
-    when(mockJob.getJobId()).thenReturn(jobId);
-    when(mockJob.waitToFinish(
-        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
-        .thenReturn(terminalState);
-    return mockJob;
-  }
-
-  /**
-   * Returns a {@link BlockingDataflowPipelineRunner} that will return the provided job.
-   * Some {@link PipelineOptions} will be extracted from the job, such as the project ID.
-   */
-  private BlockingDataflowPipelineRunner createMockRunner(DataflowPipelineJob job)
-      throws Exception {
-    DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
-    TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setRunner(BlockingDataflowPipelineRunner.class);
-    options.setProject(job.getProjectId());
-
-    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
-
-    return new BlockingDataflowPipelineRunner(mockRunner, options);
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} returns normally when a job terminates in
-   * the {@link State#DONE DONE} state.
-   */
-  @Test
-  public void testJobDoneComplete() throws Exception {
-    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
-        .run(TestPipeline.create());
-    expectedLogs.verifyInfo("Job finished with status DONE");
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#FAILED FAILED} state.
-   */
-  @Test
-  public void testFailedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobExecutionException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testFailedJob-jobId")));
-    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
-   */
-  @Test
-  public void testCancelledJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobCancelledException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
-    createMockRunner(
-            createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UPDATED UPDATED} state.
-   */
-  @Test
-  public void testUpdatedJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowJobUpdatedException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
-    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
-        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
-    DataflowPipelineJob job =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
-    DataflowPipelineJob replacedByJob =
-        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
-    when(job.getReplacedByJob()).thenReturn(replacedByJob);
-    createMockRunner(job).run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
-   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
-   * is an old SDK relative to the service).
-   */
-  @Test
-  public void testUnknownJobThrowsException() throws Exception {
-    expectedThrown.expect(IllegalStateException.class);
-    createMockRunner(
-            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
-        .run(TestPipeline.create());
-  }
-
-  /**
-   * Tests that the {@link BlockingDataflowPipelineRunner} throws the appropriate exception
-   * when a job returns a {@code null} state, indicating that it failed to contact the service,
-   * despite all of its built-in resilience logic.
-   */
-  @Test
-  public void testNullJobThrowsException() throws Exception {
-    expectedThrown.expect(DataflowServiceException.class);
-    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
-        JobIdMatcher.expectJobId("testNullJob-jobId")));
-    createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
-        .run(TestPipeline.create());
-  }
-
-  @Test
-  public void testToString() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setJobName("TestJobName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setGcpCredential(new TestCredential());
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setRunner(BlockingDataflowPipelineRunner.class);
-    assertEquals("BlockingDataflowPipelineRunner#testjobname",
-        BlockingDataflowPipelineRunner.fromOptions(options).toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
new file mode 100644
index 0000000..7be074e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowRunnerTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+
+import org.hamcrest.Description;
+import org.hamcrest.Factory;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for BlockingDataflowRunner.
+ */
+@RunWith(JUnit4.class)
+public class BlockingDataflowRunnerTest {
+
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(BlockingDataflowRunner.class);
+
+  @Rule
+  public ExpectedException expectedThrown = ExpectedException.none();
+
+  /**
+   * A {@link Matcher} for a {@link DataflowJobException} that applies an underlying {@link Matcher}
+   * to the {@link DataflowPipelineJob} returned by {@link DataflowJobException#getJob()}.
+   */
+  private static class DataflowJobExceptionMatcher<T extends DataflowJobException>
+      extends TypeSafeMatcher<T> {
+
+    private final Matcher<DataflowPipelineJob> matcher;
+
+    public DataflowJobExceptionMatcher(Matcher<DataflowPipelineJob> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T ex) {
+      return matcher.matches(ex.getJob());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("job ");
+        matcher.describeMismatch(item.getMessage(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("exception with job matching ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowJobException> Matcher<T> expectJob(
+        Matcher<DataflowPipelineJob> matcher) {
+      return new DataflowJobExceptionMatcher<T>(matcher);
+    }
+  }
+
+  /**
+   * A {@link Matcher} for a {@link DataflowPipelineJob} that applies an underlying {@link Matcher}
+   * to the return value of {@link DataflowPipelineJob#getJobId()}.
+   */
+  private static class JobIdMatcher<T extends DataflowPipelineJob> extends TypeSafeMatcher<T> {
+
+    private final Matcher<String> matcher;
+
+    public JobIdMatcher(Matcher<String> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T job) {
+      return matcher.matches(job.getJobId());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("jobId ");
+        matcher.describeMismatch(item.getJobId(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("job with jobId ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowPipelineJob> Matcher<T> expectJobId(final String jobId) {
+      return new JobIdMatcher<T>(equalTo(jobId));
+    }
+  }
+
+  /**
+   * A {@link Matcher} for a {@link DataflowJobUpdatedException} that applies an underlying
+   * {@link Matcher} to the {@link DataflowPipelineJob} returned by
+   * {@link DataflowJobUpdatedException#getReplacedByJob()}.
+   */
+  private static class ReplacedByJobMatcher<T extends DataflowJobUpdatedException>
+      extends TypeSafeMatcher<T> {
+
+    private final Matcher<DataflowPipelineJob> matcher;
+
+    public ReplacedByJobMatcher(Matcher<DataflowPipelineJob> matcher) {
+        this.matcher = matcher;
+    }
+
+    @Override
+    public boolean matchesSafely(T ex) {
+      return matcher.matches(ex.getReplacedByJob());
+    }
+
+    @Override
+    protected void describeMismatchSafely(T item, Description description) {
+        description.appendText("job ");
+        matcher.describeMismatch(item.getMessage(), description);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("exception with replacedByJob() ");
+      description.appendDescriptionOf(matcher);
+    }
+
+    @Factory
+    public static <T extends DataflowJobUpdatedException> Matcher<T> expectReplacedBy(
+        Matcher<DataflowPipelineJob> matcher) {
+      return new ReplacedByJobMatcher<T>(matcher);
+    }
+  }
+
+  /**
+   * Creates a mocked {@link DataflowPipelineJob} with the given {@code projectId} and {@code jobId}
+   * that will immediately terminate in the provided {@code terminalState}.
+   *
+   * <p>The return value may be further mocked.
+   */
+  private DataflowPipelineJob createMockJob(
+      String projectId, String jobId, State terminalState) throws Exception {
+    DataflowPipelineJob mockJob = mock(DataflowPipelineJob.class);
+    when(mockJob.getProjectId()).thenReturn(projectId);
+    when(mockJob.getJobId()).thenReturn(jobId);
+    when(mockJob.waitToFinish(
+        anyLong(), isA(TimeUnit.class), isA(MonitoringUtil.JobMessagesHandler.class)))
+        .thenReturn(terminalState);
+    return mockJob;
+  }
+
+  /**
+   * Returns a {@link BlockingDataflowRunner} that will return the provided job.
+   * Some {@link PipelineOptions} will be extracted from the job, such as the project ID.
+   */
+  private BlockingDataflowRunner createMockRunner(DataflowPipelineJob job)
+      throws Exception {
+    DataflowRunner mockRunner = mock(DataflowRunner.class);
+    TestDataflowPipelineOptions options =
+        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setRunner(BlockingDataflowRunner.class);
+    options.setProject(job.getProjectId());
+
+    when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
+
+    return new BlockingDataflowRunner(mockRunner, options);
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowRunner} returns normally when a job terminates in
+   * the {@link State#DONE DONE} state.
+   */
+  @Test
+  public void testJobDoneComplete() throws Exception {
+    createMockRunner(createMockJob("testJobDone-projectId", "testJobDone-jobId", State.DONE))
+        .run(TestPipeline.create());
+    expectedLogs.verifyInfo("Job finished with status DONE");
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#FAILED FAILED} state.
+   */
+  @Test
+  public void testFailedJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobExecutionException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testFailedJob-jobId")));
+    createMockRunner(createMockJob("testFailedJob-projectId", "testFailedJob-jobId", State.FAILED))
+        .run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#CANCELLED CANCELLED} state.
+   */
+  @Test
+  public void testCancelledJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobCancelledException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testCancelledJob-jobId")));
+    createMockRunner(
+            createMockJob("testCancelledJob-projectId", "testCancelledJob-jobId", State.CANCELLED))
+        .run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#UPDATED UPDATED} state.
+   */
+  @Test
+  public void testUpdatedJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowJobUpdatedException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testUpdatedJob-jobId")));
+    expectedThrown.expect(ReplacedByJobMatcher.expectReplacedBy(
+        JobIdMatcher.expectJobId("testUpdatedJob-replacedByJobId")));
+    DataflowPipelineJob job =
+        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-jobId", State.UPDATED);
+    DataflowPipelineJob replacedByJob =
+        createMockJob("testUpdatedJob-projectId", "testUpdatedJob-replacedByJobId", State.DONE);
+    when(job.getReplacedByJob()).thenReturn(replacedByJob);
+    createMockRunner(job).run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
+   * when a job terminates in the {@link State#UNKNOWN UNKNOWN} state, indicating that the
+   * Dataflow service returned a state that the SDK is unfamiliar with (possibly because it
+   * is an old SDK relative to the service).
+   */
+  @Test
+  public void testUnknownJobThrowsException() throws Exception {
+    expectedThrown.expect(IllegalStateException.class);
+    createMockRunner(
+            createMockJob("testUnknownJob-projectId", "testUnknownJob-jobId", State.UNKNOWN))
+        .run(TestPipeline.create());
+  }
+
+  /**
+   * Tests that the {@link BlockingDataflowRunner} throws the appropriate exception
+   * when a job returns a {@code null} state, indicating that it failed to contact the service,
+   * despite all of its built-in resilience logic.
+   */
+  @Test
+  public void testNullJobThrowsException() throws Exception {
+    expectedThrown.expect(DataflowServiceException.class);
+    expectedThrown.expect(DataflowJobExceptionMatcher.expectJob(
+        JobIdMatcher.expectJobId("testNullJob-jobId")));
+    createMockRunner(createMockJob("testNullJob-projectId", "testNullJob-jobId", null))
+        .run(TestPipeline.create());
+  }
+
+  @Test
+  public void testToString() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setRunner(BlockingDataflowRunner.class);
+    assertEquals("BlockingDataflowRunner#testjobname",
+        BlockingDataflowRunner.fromOptions(options).toString());
+  }
+}
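
The custom matchers in the test above follow a standard Hamcrest TypeSafeMatcher recipe: match on a derived property of the object and describe mismatches readably. A tiny self-contained example of the same recipe (names invented) is shown below.

// Minimal example of the TypeSafeMatcher pattern used in the test above. The class,
// property, and values here are invented for illustration.
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;

public class StringLengthMatcher extends TypeSafeMatcher<String> {
  private final Matcher<Integer> lengthMatcher;

  public StringLengthMatcher(Matcher<Integer> lengthMatcher) {
    this.lengthMatcher = lengthMatcher;
  }

  @Override
  protected boolean matchesSafely(String item) {
    // Delegate to the wrapped matcher on a derived property, as JobIdMatcher does with getJobId().
    return lengthMatcher.matches(item.length());
  }

  @Override
  public void describeTo(Description description) {
    description.appendText("a string with length ").appendDescriptionOf(lengthMatcher);
  }

  public static Matcher<String> hasLength(int expected) {
    return new StringLengthMatcher(equalTo(expected));
  }

  public static void main(String[] args) {
    assertThat("beam", StringLengthMatcher.hasLength(4));
  }
}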

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3841f411/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
index cf9a95a..388a85a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrarTest.java
@@ -46,8 +46,8 @@ public class DataflowPipelineRegistrarTest {
 
   @Test
   public void testCorrectRunnersAreReturned() {
-    assertEquals(ImmutableList.of(DataflowPipelineRunner.class,
-                                  BlockingDataflowPipelineRunner.class),
+    assertEquals(ImmutableList.of(DataflowRunner.class,
+                                  BlockingDataflowRunner.class),
         new DataflowPipelineRegistrar.Runner().getPipelineRunners());
   }
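
For context, the registrar being tested advertises runner classes through Java's ServiceLoader mechanism, which is what allows --runner=DataflowRunner (or BlockingDataflowRunner) to be resolved by simple class name. A hedged sketch of such a registrar follows; the class name is invented and the import paths reflect the SDK layout at the time of this commit, so they may differ in later versions.

// Sketch of a PipelineRunnerRegistrar like the DataflowPipelineRegistrar exercised above.
// @AutoService generates the META-INF/services entry scanned during runner resolution.
// The registrar class name here is invented.
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;

import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.runners.PipelineRunner;           // package may differ in newer SDKs
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;  // package may differ in newer SDKs

@AutoService(PipelineRunnerRegistrar.class)
public class ExampleRunnerRegistrar implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
    return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
        DataflowRunner.class, BlockingDataflowRunner.class);
  }
}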
 


