 Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java

@@ -0,0 +1,105 @@
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli trials. For sampling without replacement,
+ * the decision to keep each element is a single Bernoulli trial.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+
+ private final double fraction;
+ private final Random random;
+
+ /**
+ * Create a Bernoulli sampler with the given sample fraction and a default random number generator.
+ *
+ * @param fraction sample fraction, i.e. the probability of keeping each element.
+ */
+ public BernoulliSampler(double fraction) {
+ this(fraction, new Random());
+ }
+
+ /**
+ * Create a Bernoulli sampler with the given sample fraction and random number generator seed.
+ *
+ * @param fraction sample fraction, i.e. the probability of keeping each element.
+ * @param seed random number generator seed.
+ */
+ public BernoulliSampler(double fraction, long seed) {
+ this(fraction, new Random(seed));
+ }
+
+ /**
+ * Create a Bernoulli sampler with the given sample fraction and random number generator.
+ *
+ * @param fraction sample fraction, i.e. the probability of keeping each element.
+ * @param random the random number generator.
+ */
+ public BernoulliSampler(double fraction, Random random) {
+ Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction must be between [0, 1].");
+ this.fraction = fraction;
+ this.random = random;
+ }
+
+ /**
+ * Sample the input elements: for each input element, run one Bernoulli trial to decide whether to keep it.
+ *
+ * @param input elements to be sampled.
+ * @return the sampled result, which is computed lazily from the input elements.
+ */
+ @Override
+ public Iterator<T> sample(final Iterator<T> input) {
+ if (fraction == 0) {
+ return EMPTY_ITERABLE;
+ }
+
+ return new SampledIterator<T>() {
+ T current;
+
+ @Override
+ public boolean hasNext() {
+ if (current == null) {
+ while (input.hasNext()) {
+ T element = input.next();
+ if (random.nextDouble() <= fraction) {
+ current = element;
+ return true;
+ }
+ }
+ current = null;
+ return false;
+ }
+ return false;
I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it should return
`true` if `current != null`.
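To illustrate the point, here is a minimal stand-alone sketch of an idempotent `hasNext`. The class name `IdempotentBernoulliIterator` is hypothetical and is not part of this PR; it implements plain `java.util.Iterator` rather than `SampledIterator`, and it assumes the input never contains `null` elements, since `null` marks "nothing buffered".

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

/** Hypothetical sketch, not the PR's SampledIterator. Assumes non-null input elements. */
final class IdempotentBernoulliIterator<T> implements Iterator<T> {

    private final Iterator<T> input;
    private final double fraction;
    private final Random random;

    // Element selected by a previous hasNext() call; null means nothing is buffered.
    private T current;

    IdempotentBernoulliIterator(Iterator<T> input, double fraction, Random random) {
        this.input = input;
        this.fraction = fraction;
        this.random = random;
    }

    @Override
    public boolean hasNext() {
        if (current != null) {
            // Already buffered: answer true without consuming more input.
            return true;
        }
        while (input.hasNext()) {
            T element = input.next();
            // Same acceptance test as in the diff above.
            if (random.nextDouble() <= fraction) {
                current = element;
                return true;
            }
        }
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null; // hand out each buffered element exactly once
        return result;
    }
}
```

With this shape, calling `hasNext()` several times in a row neither skips elements nor flips from `true` to `false`; only `next()` clears the buffered element.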
> Create sample operator for Dataset
> 
>
> Key: FLINK1901
> URL: https://issues.apache.org/jira/browse/FLINK1901
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Reporter: Theodore Vasiloudis
> Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.

