beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [beam] branch master updated: [BEAM-3247] fix Sample.any performance
Date Tue, 28 Nov 2017 23:38:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f309e4  [BEAM-3247] fix Sample.any performance
     new 1ef170e  This closes #4175: [BEAM-3247] fix Sample.any performance
6f309e4 is described below

commit 6f309e462e48cd2b928564b2d3ed70f53a26b76b
Author: Neville Li <neville@spotify.com>
AuthorDate: Fri Nov 24 16:50:30 2017 -0500

    [BEAM-3247] fix Sample.any performance
---
 .../org/apache/beam/sdk/transforms/Sample.java     | 63 ++++++++++------
 .../org/apache/beam/sdk/transforms/SampleTest.java | 87 +++++++++++++++++++++-
 2 files changed, 125 insertions(+), 25 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index f3bd07a..2eb12d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -27,12 +28,10 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 
 /**
  * {@code PTransform}s for taking samples of the elements in a
@@ -57,10 +56,6 @@ public class Sample {
    * <p>If limit is greater than or equal to the size of the input
    * {@code PCollection}, then all the input's elements will be selected.
    *
-   * <p>All of the elements of the output {@code PCollection} should fit into
-   * main memory of a single worker machine.  This operation does not
-   * run in parallel.
-   *
    * <p>Example of use:
    * <pre> {@code
    * PCollection<String> input = ...;
@@ -149,11 +144,9 @@ public class Sample {
 
     @Override
     public PCollection<T> expand(PCollection<T> in) {
-      PCollectionView<Iterable<T>> iterableView = in.apply(View.<T>asIterable());
-      return in.getPipeline()
-          .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo.of(new SampleAnyDoFn<>(limit, iterableView)).withSideInputs(iterableView))
-          .setCoder(in.getCoder());
+      return in
+          .apply(Combine.globally(new SampleAnyCombineFn<T>(limit)).withoutDefaults())
+          .apply(Flatten.<T>iterables());
     }
 
     @Override
@@ -209,25 +202,49 @@ public class Sample {
   }
 
   /**
-   * A {@link DoFn} that returns up to limit elements from the side input PCollection.
+   * A {@link CombineFn} that combines into a {@link List} of up to limit elements.
    */
-  private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
-    long limit;
-    final PCollectionView<Iterable<T>> iterableView;
+  private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, Iterable<T>> {
+    private final long limit;
 
-    public SampleAnyDoFn(long limit, PCollectionView<Iterable<T>> iterableView) {
+    private SampleAnyCombineFn(long limit) {
       this.limit = limit;
-      this.iterableView = iterableView;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      for (T i : c.sideInput(iterableView)) {
-        if (limit-- <= 0) {
-          break;
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>((int) limit);
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      if (accumulator.size() < limit) {
+        accumulator.add(input);
+      }
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      Iterator<List<T>> iter = accumulators.iterator();
+      if (!iter.hasNext()) {
+        return createAccumulator();
+      }
+      List<T> res = iter.next();
+      while (iter.hasNext()) {
+        for (T t : iter.next()) {
+          if (res.size() >= limit) { // check before adding so res never exceeds limit
+            return res;
+          }
+          res.add(t);
         }
-        c.output(i);
       }
+      return res;
+    }
+
+    @Override
+    public Iterable<T> extractOutput(List<T> accumulator) {
+      return accumulator;
     }
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 80f361f..357f256 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -40,7 +41,13 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -185,7 +192,7 @@ public class SampleTest {
     @SuppressWarnings("rawtypes")
     public static class VerifyCorrectSample<T extends Comparable>
         implements SerializableFunction<Iterable<T>, Void> {
-      private T[] expectedValues;
+      private Object[] expectedValues;
       private int expectedSize;
 
       /**
@@ -203,7 +210,7 @@ public class SampleTest {
        * of elements that the sample may contain.
        */
       VerifyCorrectSample(int expectedSize, Collection<T> expected) {
-        this.expectedValues = (T[]) expected.toArray();
+        this.expectedValues = expected.toArray();
         this.expectedSize = expectedSize;
       }
 
@@ -234,6 +241,82 @@ public class SampleTest {
       }
     }
 
+    private static TimestampedValue<Integer> tv(int i) {
+      return TimestampedValue.of(i, new Instant(i * 1000));
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAny() {
+      PCollection<Integer> input =
+          pipeline
+              .apply(
+                  Create.timestamped(ImmutableList.of(tv(0), tv(1), tv(2), tv(3), tv(4), tv(5)))
+                      .withCoder(BigEndianIntegerCoder.of()))
+              .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(3))));
+      PCollection<Integer> output = input.apply(Sample.<Integer>any(2));
+
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(0), Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(2, Arrays.asList(0, 1, 2)));
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(3000), Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(2, Arrays.asList(3, 4, 5)));
+      pipeline.run();
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAnyEmpty() {
+      PCollection<Integer> input = pipeline.apply(Create.empty(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(3))))
+          .apply(Sample.<Integer>any(2));
+
+      PAssert.that(output).satisfies(new VerifyCorrectSample<>(0, EMPTY));
+      pipeline.run();
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAnyZero() {
+      PCollection<Integer> input =
+          pipeline.apply(
+              Create.timestamped(ImmutableList.of(tv(0), tv(1), tv(2), tv(3), tv(4), tv(5)))
+                  .withCoder(BigEndianIntegerCoder.of()));
+      PCollection<Integer> output = input
+          .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(3))))
+          .apply(Sample.<Integer>any(0));
+
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(0), Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(0, EMPTY));
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(3000), Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(0, EMPTY));
+      pipeline.run();
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testSampleAnyInsufficientElements() {
+      PCollection<Integer> output = pipeline
+          .apply(Create.timestamped(ImmutableList.of(tv(0), tv(1), tv(2)))
+              .withCoder(BigEndianIntegerCoder.of()))
+          .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(3))))
+          .apply(Sample.<Integer>any(10));
+      PAssert.that(output)
+          .inWindow(new IntervalWindow(new Instant(0), Duration.standardSeconds(3)))
+          .satisfies(new VerifyCorrectSample<>(3, Arrays.asList(0, 1, 2)));
+      pipeline.run();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSampleAnyNegative() {
+      pipeline.enableAbandonedNodeEnforcement(false);
+      pipeline.apply(Create.empty(BigEndianIntegerCoder.of())).apply(Sample.<Integer>any(-10));
+    }
+
     @Test
     @Category(ValidatesRunner.class)
     public void testSample() {

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.

Mime
View raw message