beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [beam] branch master updated: [BEAM-6257] PAssert.thatSingleton: use GBK instead of side inputs
Date Tue, 26 Mar 2019 10:20:17 GMT
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9d5d9f  [BEAM-6257] PAssert.thatSingleton: use GBK instead of side inputs
     new 53fbfae  Merge pull request #8094: [BEAM-6257] PAssert.thatSingleton: use GBK instead of side inputs
e9d5d9f is described below

commit e9d5d9f9cb3c9f06ea6b8afabfaa707ef80039e2
Author: Kyle Weaver <kcweaver@google.com>
AuthorDate: Tue Mar 19 14:23:57 2019 -0700

    [BEAM-6257] PAssert.thatSingleton: use GBK instead of side inputs
---
 .../apache/beam/runners/apex/ApexRunnerTest.java   |   2 +-
 .../java/org/apache/beam/sdk/testing/PAssert.java  | 139 +++++++++++++++++++--
 .../org/apache/beam/sdk/testing/PAssertTest.java   |   8 +-
 3 files changed, 131 insertions(+), 18 deletions(-)

diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
index 69cdf47..539fc75 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
@@ -82,7 +82,7 @@ public class ApexRunnerTest {
     ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
     DAG dag = TestApexRunner.translate(p, options);
 
-    String[] expectedThreadLocal = {"/CreateActual/FilterActuals/Window.Assign"};
+    String[] expectedThreadLocal = {"/GroupGlobally/RewindowActuals/Window.Assign"};
     Set<String> actualThreadLocal = Sets.newHashSet();
     for (DAG.StreamMeta sm : dag.getAllStreamsMeta()) {
       DAG.OutputPortMeta opm = sm.getSource();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 0048ed3..e1016cf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -427,8 +428,7 @@ public class PAssert {
    * PCollection<T>} with the specified reason. The provided PCollection must be a singleton.
    */
   public static <T> SingletonAssert<T> thatSingleton(String reason, PCollection<T> actual) {
-    return new PCollectionViewAssert<>(
-        actual, View.asSingleton(), actual.getCoder(), PAssertionSite.capture(reason));
+    return new PCollectionSingletonAssert<>(actual, PAssertionSite.capture(reason));
   }
 
   /**
@@ -783,6 +783,121 @@ public class PAssert {
   }
 
   /**
+   * A {@link SingletonAssert} about the contents of a {@link PCollection} when it contains a single
+   * value of type {@code T}. This does not require the runner to support side inputs.
+   */
+  private static class PCollectionSingletonAssert<T> implements SingletonAssert<T> {
+    private final PCollection<T> actual;
+    private final Coder<T> coder;
+    private final AssertionWindows rewindowingStrategy;
+    private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor;
+
+    private final PAssertionSite site;
+
+    PCollectionSingletonAssert(PCollection<T> actual, PAssertionSite site) {
+      this(actual, IntoGlobalWindow.of(), PaneExtractors.allPanes(), site);
+    }
+
+    PCollectionSingletonAssert(
+        PCollection<T> actual,
+        AssertionWindows rewindowingStrategy,
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
+        PAssertionSite site) {
+      this.actual = actual;
+      this.coder = actual.getCoder();
+      this.rewindowingStrategy = rewindowingStrategy;
+      this.paneExtractor = paneExtractor;
+      this.site = site;
+    }
+
+    @Override
+    public PCollectionSingletonAssert<T> inFinalPane(BoundedWindow window) {
+      return withPanes(window, PaneExtractors.finalPane());
+    }
+
+    @Override
+    public PCollectionSingletonAssert<T> inOnTimePane(BoundedWindow window) {
+      return withPanes(window, PaneExtractors.onTimePane());
+    }
+
+    @Override
+    public PCollectionSingletonAssert<T> inEarlyPane(BoundedWindow window) {
+      return withPanes(window, PaneExtractors.earlyPanes());
+    }
+
+    @Override
+    public SingletonAssert<T> isEqualTo(T expected) {
+      return satisfies(new AssertIsEqualToRelation<>(), expected);
+    }
+
+    @Override
+    public SingletonAssert<T> notEqualTo(T notExpected) {
+      return satisfies(new AssertNotEqualToRelation<>(), notExpected);
+    }
+
+    @Override
+    public PCollectionSingletonAssert<T> inOnlyPane(BoundedWindow window) {
+      return withPanes(window, PaneExtractors.onlyPane(site));
+    }
+
+    private PCollectionSingletonAssert<T> withPanes(
+        BoundedWindow window,
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) {
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder<BoundedWindow> windowCoder =
+          (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
+      return new PCollectionSingletonAssert<>(
+          actual, IntoStaticWindows.of(windowCoder, window), paneExtractor, site);
+    }
+
+    @Override
+    public PCollectionSingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
+      actual.apply(
+          "PAssert$" + (assertCount++),
+          new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy, paneExtractor, site));
+      return this;
+    }
+
+    /**
+     * Applies an {@link AssertRelation} to check the provided relation against the value of this
+     * assert and the provided expected value.
+     *
+     * <p>Returns this {@code SingletonAssert}.
+     */
+    private PCollectionSingletonAssert<T> satisfies(
+        AssertRelation<T, T> relation, final T expected) {
+      return satisfies(new CheckRelationAgainstExpected<>(relation, expected, coder));
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant
+     *     to test PCollection equality, use {@link #isEqualTo} instead.
+     */
+    @SuppressFBWarnings("EQ_UNUSUAL")
+    @Deprecated
+    @Override
+    public boolean equals(Object o) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "tests for Java equality of the %s object, not the PCollection in question. "
+                  + "Call a test method, such as isEqualTo.",
+              getClass().getSimpleName()));
+    }
+
+    /**
+     * @throws UnsupportedOperationException always.
+     * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects.
+     */
+    @Deprecated
+    @Override
+    public int hashCode() {
+      throw new UnsupportedOperationException(
+          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
+    }
+  }
+
+  /**
    * An assertion about the contents of a {@link PCollection} when it is viewed as a single value of
    * type {@code ViewT}. This requires side input support from the runner.
    */
@@ -1113,22 +1228,20 @@ public class PAssert {
   }
 
   /**
-   * A transform that applies an assertion-checking function to a single iterable contained as the
-   * sole element of a {@link PCollection}.
+   * A transform that applies an assertion-checking function to the sole element of a {@link
+   * PCollection}.
    */
-  public static class GroupThenAssertForSingleton<T>
-      extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
-    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+  public static class GroupThenAssertForSingleton<T> extends PTransform<PCollection<T>, PDone>
+      implements Serializable {
+    private final SerializableFunction<T, Void> checkerFn;
     private final AssertionWindows rewindowingStrategy;
-    private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
-        paneExtractor;
+    private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor;
     private final PAssertionSite site;
 
     private GroupThenAssertForSingleton(
-        SerializableFunction<Iterable<T>, Void> checkerFn,
+        SerializableFunction<T, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
-        SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>>
-            paneExtractor,
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor,
         PAssertionSite site) {
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
@@ -1137,7 +1250,7 @@ public class PAssert {
     }
 
     @Override
-    public PDone expand(PCollection<Iterable<T>> input) {
+    public PDone expand(PCollection<T> input) {
       input
           .apply("GroupGlobally", new GroupGlobally<>(rewindowingStrategy))
           .apply("GetPane", MapElements.via(paneExtractor))
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 408b84b..a6f736b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -301,7 +301,7 @@ public class PAssertTest implements Serializable {
 
   /** Basic test for {@code isEqualTo}. */
   @Test
-  @Category({ValidatesRunner.class, UsesSideInputs.class})
+  @Category({ValidatesRunner.class})
   public void testIsEqualTo() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).isEqualTo(43);
@@ -348,7 +348,7 @@ public class PAssertTest implements Serializable {
 
   /** Basic test for {@code notEqualTo}. */
   @Test
-  @Category({ValidatesRunner.class, UsesSideInputs.class})
+  @Category({ValidatesRunner.class})
   public void testNotEqualTo() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).notEqualTo(42);
@@ -357,7 +357,7 @@ public class PAssertTest implements Serializable {
 
   /** Test that we throw an error for false assertion on singleton. */
   @Test
-  @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class})
+  @Category({ValidatesRunner.class, UsesFailureMessage.class})
   public void testPAssertEqualsSingletonFalse() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44);
@@ -373,7 +373,7 @@ public class PAssertTest implements Serializable {
 
   /** Test that we throw an error for false assertion on singleton. */
   @Test
-  @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class})
+  @Category({ValidatesRunner.class, UsesFailureMessage.class})
   public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception {
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton(pcollection).isEqualTo(44);


Mime
View raw message