beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [beam] branch master updated: [BEAM-9779] Patch HL7v2IOWriteIT Flakiness (#11450)
Date Mon, 27 Apr 2020 21:48:20 GMT
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81d7cbe  [BEAM-9779] Patch HL7v2IOWriteIT Flakiness (#11450)
81d7cbe is described below

commit 81d7cbe53364da65329069152d9d7a66b21df278
Author: Jacob Ferriero <jferriero@google.com>
AuthorDate: Mon Apr 27 14:48:08 2020 -0700

    [BEAM-9779] Patch HL7v2IOWriteIT Flakiness (#11450)
    
    * Patches for HL7v2IO
    
    * Use TestPipeline in ITs
    * Drop schematized data before calling message ingest (should be output only) to help
pipelines that read/write from/to two HL7v2 stores
    * Make HL7v2MessageCoder constructor public
    
    * block on run
    
    * add sleep to avoid flakiness due to asynchronous HL7v2 indexing
    
    * E2E integration test
    
    * fix merge issue
    
    * reconcile double sleeping
    
    * improve error handling
    
    * improve error handling
    
    * fix docs typo
    
    * add latency distribution metrics
    
    * remove unused imports
    
    * ingest only data and labels
    
    * fix comment
    
    * call spliterator directly, use page size 1000
    
    * output elements more eagerly in ListHL72MessageFn
    
    * eagerly emit data from early pages
    
    * Optimization of Listing and Stabilization of ITs
    
    * allow HL7v2 Message listing to emit early panes rather than waiting on pagination of
all list results
    * add EBO on HL7v2 Message listing reaching a certain expected length in ITs to account
for async indexing BEAM-9779
    
    * revert unrelated changes
    
    * add back test
    
    * Add constant for HL7v2 indexing timeout minutes
    
    * Add constant for HL7v2 indexing timeout minutes
    
    * fix checkstyle
---
 .../beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java  | 63 +-------------------
 .../sdk/io/gcp/healthcare/HL7v2IOTestUtil.java     | 67 ++++++++++++++++++----
 .../beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java | 33 +++++++----
 3 files changed, 79 insertions(+), 84 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
index 1b41223..fb51b08 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
@@ -27,10 +27,8 @@ import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
 import java.security.SecureRandom;
-import java.util.Collections;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.ListHL7v2MessageIDs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -39,6 +37,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -54,6 +53,7 @@ public class HL7v2IOReadIT {
           + "_"
           + (new SecureRandom().nextInt(32))
           + "_read_it";
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void createHL7v2tore() throws IOException {
@@ -87,36 +87,6 @@ public class HL7v2IOReadIT {
   }
 
   @Test
-  public void testHL7v2IORead() throws Exception {
-    // Should read all messages.
-    Pipeline pipeline = Pipeline.create();
-    HL7v2IO.Read.Result result =
-        pipeline
-            .apply(
-                new ListHL7v2MessageIDs(
-                    Collections.singletonList(
-                        healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME)))
-            .apply(HL7v2IO.getAll());
-    PCollection<Long> numReadMessages =
-        result.getMessages().setCoder(new HL7v2MessageCoder()).apply(Count.globally());
-    PAssert.thatSingleton(numReadMessages).isEqualTo((long) MESSAGES.size());
-    PAssert.that(result.getFailedReads()).empty();
-
-    PAssert.that(result.getMessages())
-        .satisfies(
-            input -> {
-              for (HL7v2Message elem : input) {
-                assertFalse(elem.getName().isEmpty());
-                assertFalse(elem.getData().isEmpty());
-                assertFalse(elem.getMessageType().isEmpty());
-              }
-              return null;
-            });
-
-    pipeline.run();
-  }
-
-  @Test
   public void testHL7v2IO_ListHL7v2Messages() throws Exception {
     // Should read all messages.
     Pipeline pipeline = Pipeline.create();
@@ -164,33 +134,4 @@ public class HL7v2IOReadIT {
 
     pipeline.run();
   }
-
-  @Test
-  public void testHL7v2IORead_filtered() throws Exception {
-    final String adtFilter = "messageType = \"ADT\"";
-    // Should read only messages matching the filter.
-    Pipeline pipeline = Pipeline.create();
-    HL7v2IO.Read.Result result =
-        pipeline
-            .apply(
-                new ListHL7v2MessageIDs(
-                    Collections.singletonList(
-                        healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME),
-                    adtFilter))
-            .apply(HL7v2IO.getAll());
-    PCollection<Long> numReadMessages =
-        result.getMessages().setCoder(new HL7v2MessageCoder()).apply(Count.globally());
-    PAssert.thatSingleton(numReadMessages).isEqualTo(NUM_ADT);
-    PAssert.that(result.getFailedReads()).empty();
-
-    PAssert.that(result.getMessages())
-        .satisfies(
-            input -> {
-              for (HL7v2Message elem : input) {
-                assertEquals("ADT", elem.getMessageType());
-              }
-              return null;
-            });
-    pipeline.run();
-  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
index c42d6b1..fc56280 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
@@ -18,19 +18,25 @@
 package org.apache.beam.sdk.io.gcp.healthcare;
 
 import com.google.api.client.util.Base64;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.healthcare.v1beta1.model.Message;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HL7v2MessagePages;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 class HL7v2IOTestUtil {
+  public static final long HL7V2_INDEXING_TIMEOUT_MINUTES = 10L;
   /** Google Cloud Healthcare Dataset in Apache Beam integration test project. */
   public static final String HEALTHCARE_DATASET_TEMPLATE =
       "projects/%s/locations/us-central1/datasets/apache-beam-integration-testing";
@@ -81,20 +87,58 @@ class HL7v2IOTestUtil {
   /** Clear all messages from the HL7v2 store. */
   static void deleteAllHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
       throws IOException {
-    for (String msgId :
-        client
-            .getHL7v2MessageStream(hl7v2Store)
-            .map(HL7v2Message::getName)
-            .collect(Collectors.toList())) {
-      client.deleteHL7v2Message(msgId);
+    for (List<HL7v2Message> page : new HL7v2MessagePages(client, hl7v2Store)) {
+      for (String msgId : page.stream().map(HL7v2Message::getName).collect(Collectors.toList()))
{
+        client.deleteHL7v2Message(msgId);
+      }
+    }
+  }
+
+  /** Utility for waiting on HL7v2 Store indexing to be complete; see BEAM-9779. */
+  public static void waitForHL7v2Indexing(
+      HealthcareApiClient client, String hl7v2Store, long expectedNumMessages, Duration timeout)
+      throws InterruptedException, TimeoutException {
+
+    Instant start = Instant.now();
+    long sleepMs = 50;
+    long numListedMessages = 0;
+    while (new Duration(start, Instant.now()).isShorterThan(timeout)) {
+      numListedMessages = 0;
+      // count messages in HL7v2 Store.
+      for (List<HL7v2Message> page :
+          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store)) {
+        numListedMessages += page.size();
+      }
+      if (numListedMessages == expectedNumMessages) {
+        return;
+      }
+      // exponential backoff.
+      sleepMs *= 2;
+      // exit if next sleep will violate timeout
+      if (new Duration(start, Instant.now()).plus(sleepMs).isShorterThan(timeout)) {
+        Sleeper.DEFAULT.sleep(sleepMs);
+      } else {
+        throw new TimeoutException(
+            String.format(
+                "Timed out waiting for %s to reach %s messages. last list request returned
%s messages.",
+                hl7v2Store, expectedNumMessages, numListedMessages));
+      }
     }
   }
 
   /** Populate the test messages into the HL7v2 store. */
-  static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store) throws IOException
{
+  static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
+      throws IOException, InterruptedException, TimeoutException {
     for (HL7v2Message msg : MESSAGES) {
       client.createHL7v2Message(hl7v2Store, msg.toModel());
     }
+    // [BEAM-9779] HL7v2 indexing is asynchronous. Block until indexing completes to stabilize
this
+    // IT.
+    HL7v2IOTestUtil.waitForHL7v2Indexing(
+        client,
+        hl7v2Store,
+        MESSAGES.size(),
+        Duration.standardMinutes(HL7V2_INDEXING_TIMEOUT_MINUTES));
   }
 
   /**
@@ -170,10 +214,11 @@ class HL7v2IOTestUtil {
     public void listMessages(ProcessContext context) throws IOException {
       String hl7v2Store = context.element();
       // Output all elements of all pages.
-      this.client
-          .getHL7v2MessageStream(hl7v2Store, this.filter)
-          .map(HL7v2Message::getName)
-          .forEach(context::output);
+      HttpHealthcareApiClient.HL7v2MessagePages pages =
+          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+      for (List<HL7v2Message> page : pages) {
+        page.stream().map(HL7v2Message::getName).forEach(context::output);
+      }
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
index d261d96..f52d607 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
@@ -18,22 +18,25 @@
 package org.apache.beam.sdk.io.gcp.healthcare;
 
 import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HL7V2_INDEXING_TIMEOUT_MINUTES;
 import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.MESSAGES;
 import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.deleteAllHL7v2Messages;
-import static org.junit.Assert.assertEquals;
 
+import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
 import java.io.IOException;
 import java.security.SecureRandom;
-import org.apache.beam.sdk.Pipeline;
+import java.util.concurrent.TimeoutException;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -46,12 +49,15 @@ public class HL7v2IOWriteIT {
   private static final String HL7V2_STORE_NAME =
       "hl7v2_store_write_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));
 
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
   @BeforeClass
   public static void createHL7v2tore() throws IOException {
     String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
     healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
     HealthcareApiClient client = new HttpHealthcareApiClient();
-    client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
+    Hl7V2Store store = client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
+    store.getParserConfig();
   }
 
   @AfterClass
@@ -65,7 +71,6 @@ public class HL7v2IOWriteIT {
     if (client == null) {
       client = new HttpHealthcareApiClient();
     }
-    PipelineOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
   }
 
   @After
@@ -74,8 +79,7 @@ public class HL7v2IOWriteIT {
   }
 
   @Test
-  public void testHL7v2IOWrite() throws IOException {
-    Pipeline pipeline = Pipeline.create();
+  public void testHL7v2IOWrite() throws Exception {
     HL7v2IO.Write.Result result =
         pipeline
             .apply(Create.of(MESSAGES).withCoder(new HL7v2MessageCoder()))
@@ -84,10 +88,15 @@ public class HL7v2IOWriteIT {
     PAssert.that(result.getFailedInsertsWithErr()).empty();
 
     pipeline.run().waitUntilFinish();
-    long numWrittenMessages =
-        client
-            .getHL7v2MessageStream(healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME)
-            .count();
-    assertEquals(MESSAGES.size(), numWrittenMessages);
+
+    try {
+      HL7v2IOTestUtil.waitForHL7v2Indexing(
+          client,
+          healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME,
+          MESSAGES.size(),
+          Duration.standardMinutes(HL7V2_INDEXING_TIMEOUT_MINUTES));
+    } catch (TimeoutException e) {
+      Assert.fail(e.getMessage());
+    }
   }
 }


Mime
View raw message