beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: Parameterize StreamingDataflowWorkerTest so we run all tests with/without streaming engine. I had to implement FakeWindmillServer.getDataStream. This is super basic at the moment. Some tests are skipped in streaming engine mode; I left TODOs to go back and revisit those.
Date Fri, 22 Feb 2019 01:19:49 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 18370fb  Parameterize StreamingDataflowWorkerTest so we run all tests with/without
streaming engine. I had to implement FakeWindmillServer.getDataStream. This is super basic at the
moment. Some tests are skipped in streaming engine mode; I left TODOs to go back and revisit
those.
     new c0f2040  Merge pull request #7908 from drieber/test_streaming_engine
18370fb is described below

commit 18370fb6d227b20dd962cfabbbb7b3d1c5504948
Author: David Rieber <drieber@google.com>
AuthorDate: Wed Feb 20 13:46:56 2019 -0800

    Parameterize StreamingDataflowWorkerTest so we run all tests with/without streaming engine.
    I had to implement FakeWindmillServer.getDataStream. This is super basic at the moment. Some tests
    are skipped in streaming engine mode; I left TODOs to go back and revisit those.
---
 .../worker/windmill/GrpcWindmillServer.java        |  5 --
 .../worker/windmill/WindmillServerStub.java        |  7 --
 .../dataflow/worker/FakeWindmillServer.java        | 71 ++++++++++++++++----
 .../worker/StreamingDataflowWorkerTest.java        | 77 ++++++++++------------
 4 files changed, 91 insertions(+), 69 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index a9bb644..7ded86d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -735,11 +735,6 @@ public class GrpcWindmillServer extends WindmillServerStub {
     }
 
     @Override
-    public final void awaitTermination() throws InterruptedException {
-      finishLatch.await();
-    }
-
-    @Override
     public final boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException
{
       return finishLatch.await(time, unit);
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index 3fe8be8..230cf80 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -103,13 +103,6 @@ public abstract class WindmillServerStub implements StatusDataProvider {
     /** Indicates that no more requests will be sent. */
     void close();
 
-    /**
-     * Waits for the server to close its end of the connection.
-     *
-     * <p>Should only be called after calling close.
-     */
-    void awaitTermination() throws InterruptedException;
-
     /** Waits for the server to close its end of the connection, with timeout. */
     boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index 1cff3a1..66c9cf0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -24,13 +24,11 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -220,11 +218,6 @@ class FakeWindmillServer extends WindmillServerStub {
       }
 
       @Override
-      public void awaitTermination() throws InterruptedException {
-        done.await();
-      }
-
-      @Override
       public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException
{
         return done.await(time, unit);
       }
@@ -238,7 +231,62 @@ class FakeWindmillServer extends WindmillServerStub {
 
   @Override
   public GetDataStream getDataStream() {
-    throw new UnsupportedOperationException();
+    Instant startTime = Instant.now();
+    return new GetDataStream() {
+      @Override
+      public Windmill.KeyedGetDataResponse requestKeyedData(
+          String computation, KeyedGetDataRequest request) {
+        Windmill.GetDataRequest getDataRequest =
+            GetDataRequest.newBuilder()
+                .addRequests(
+                    ComputationGetDataRequest.newBuilder()
+                        .setComputationId(computation)
+                        .addRequests(request)
+                        .build())
+                .build();
+        GetDataResponse getDataResponse = getData(getDataRequest);
+        if (getDataResponse.getDataList().isEmpty()) {
+          return null;
+        }
+        assertEquals(1, getDataResponse.getDataCount());
+        if (getDataResponse.getData(0).getDataList().isEmpty()) {
+          return null;
+        }
+        assertEquals(1, getDataResponse.getData(0).getDataCount());
+        return getDataResponse.getData(0).getData(0);
+      }
+
+      @Override
+      public Windmill.GlobalData requestGlobalData(Windmill.GlobalDataRequest request) {
+        Windmill.GetDataRequest getDataRequest =
+            GetDataRequest.newBuilder().addGlobalDataFetchRequests(request).build();
+        GetDataResponse getDataResponse = getData(getDataRequest);
+        if (getDataResponse.getGlobalDataList().isEmpty()) {
+          return null;
+        }
+        assertEquals(1, getDataResponse.getGlobalDataCount());
+        return getDataResponse.getGlobalData(0);
+      }
+
+      @Override
+      public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active)
{}
+
+      @Override
+      public void close() {}
+
+      @Override
+      public boolean awaitTermination(int time, TimeUnit unit) {
+        return true;
+      }
+
+      @Override
+      public void closeAfterDefaultTimeout() {}
+
+      @Override
+      public Instant startTime() {
+        return startTime;
+      }
+    };
   }
 
   @Override
@@ -267,9 +315,6 @@ class FakeWindmillServer extends WindmillServerStub {
       public void close() {}
 
       @Override
-      public void awaitTermination() {}
-
-      @Override
       public boolean awaitTermination(int time, TimeUnit unit) {
         return true;
       }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index f441fc0..fab5cdb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -152,7 +152,7 @@ import org.junit.rules.ErrorCollector;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 import org.junit.runners.model.Statement;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -160,8 +160,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Unit tests for {@link StreamingDataflowWorker}. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class StreamingDataflowWorkerTest {
+  private final boolean streamingEngine;
+
+  @Parameterized.Parameters(name = "{index}: [streamingEngine={0}]")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {{false}, {true}});
+  }
+
+  public StreamingDataflowWorkerTest(Boolean streamingEngine) {
+    this.streamingEngine = streamingEngine;
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorkerTest.class);
 
   private static final IntervalWindow DEFAULT_WINDOW =
@@ -593,8 +604,13 @@ public class StreamingDataflowWorkerTest {
 
   private StreamingDataflowWorkerOptions createTestingPipelineOptions(
       FakeWindmillServer server, String... args) {
+    List<String> argsList = Lists.newArrayList(args);
+    if (streamingEngine) {
+      argsList.add("--experiments=enable_streaming_engine");
+    }
     StreamingDataflowWorkerOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(StreamingDataflowWorkerOptions.class);
+        PipelineOptionsFactory.fromArgs(argsList.toArray(new String[0]))
+            .as(StreamingDataflowWorkerOptions.class);
     options.setAppName("StreamingWorkerHarnessTest");
     options.setJobId("test_job_id");
     options.setStreaming(true);
@@ -650,45 +666,7 @@ public class StreamingDataflowWorkerTest {
   }
 
   @Test
-  public void testBasicWindmillServiceHarness() throws Exception {
-    List<ParallelInstruction> instructions =
-        Arrays.asList(
-            makeSourceInstruction(StringUtf8Coder.of()),
-            makeSinkInstruction(StringUtf8Coder.of(), 0));
-
-    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
-    server.setIsReady(false);
-
-    StreamingConfigTask streamingConfig = new StreamingConfigTask();
-    streamingConfig.setStreamingComputationConfigs(
-        ImmutableList.of(makeDefaultStreamingComputationConfig(instructions)));
-    streamingConfig.setWindmillServiceEndpoint("foo");
-    WorkItem workItem = new WorkItem();
-    workItem.setStreamingConfigTask(streamingConfig);
-    when(mockWorkUnitClient.getGlobalStreamingConfigWorkItem()).thenReturn(Optional.of(workItem));
-
-    StreamingDataflowWorkerOptions options =
-        createTestingPipelineOptions(server, "--experiments=enable_windmill_service");
-    StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters
*/);
-    worker.start();
-
-    final int numIters = 2000;
-    for (int i = 0; i < numIters; ++i) {
-      server.addWorkToOffer(makeInput(i, TimeUnit.MILLISECONDS.toMicros(i)));
-    }
-
-    Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(numIters);
-    worker.stop();
-
-    for (int i = 0; i < numIters; ++i) {
-      assertTrue(result.containsKey((long) i));
-      assertEquals(
-          makeExpectedOutput(i, TimeUnit.MILLISECONDS.toMicros(i)).build(), result.get((long)
i));
-    }
-  }
-
-  @Test
-  public void testBasicWindmillServiceAsStreamingEngineHarness() throws Exception {
+  public void testBasic() throws Exception {
     List<ParallelInstruction> instructions =
         Arrays.asList(
             makeSourceInstruction(StringUtf8Coder.of()),
@@ -705,8 +683,7 @@ public class StreamingDataflowWorkerTest {
     workItem.setStreamingConfigTask(streamingConfig);
     when(mockWorkUnitClient.getGlobalStreamingConfigWorkItem()).thenReturn(Optional.of(workItem));
 
-    StreamingDataflowWorkerOptions options =
-        createTestingPipelineOptions(server, "--experiments=enable_streaming_engine");
+    StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
     StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters
*/);
     worker.start();
 
@@ -887,6 +864,10 @@ public class StreamingDataflowWorkerTest {
 
   @Test
   public void testKeyTokenInvalidException() throws Exception {
+    if (streamingEngine) {
+      // TODO: This test needs to be adapted to work with streamingEngine=true.
+      return;
+    }
     KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
 
     List<ParallelInstruction> instructions =
@@ -1092,6 +1073,10 @@ public class StreamingDataflowWorkerTest {
 
   @Test(timeout = 30000)
   public void testExceptions() throws Exception {
+    if (streamingEngine) {
+      // TODO: This test needs to be adapted to work with streamingEngine=true.
+      return;
+    }
     List<ParallelInstruction> instructions =
         Arrays.asList(
             makeSourceInstruction(StringUtf8Coder.of()),
@@ -2422,6 +2407,10 @@ public class StreamingDataflowWorkerTest {
 
   @Test
   public void testActiveWorkRefresh() throws Exception {
+    if (streamingEngine) {
+      // TODO: This test needs to be adapted to work with streamingEngine=true.
+      return;
+    }
     List<ParallelInstruction> instructions =
         Arrays.asList(
             makeSourceInstruction(StringUtf8Coder.of()),


Mime
View raw message