http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java
new file mode 100644
index 0000000..48deaad
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Performs a simple random sample using an in-memory reservoir to produce
+ * a uniformly random sample of a given size.
+ *
+ * <p>
+ * This is similar to {@link SimpleRandomSample}, however it is guaranteed to produce
+ * a sample of the given size. This comes at the cost of scalability.
+ * {@link SimpleRandomSample} produces a sample of the desired size with likelihood of 99.99%,
+ * while using less internal storage. ReservoirSample on the other hand uses internal storage
+ * with size equaling the desired sample to guarantee the exact sample size.
+ * </p>
+ *
+ * <p>
+ * This algebraic implementation is backed by a heap and maintains the original roll in order
+ * to compensate for skew.
+ * </p>
+ *
+ * <p>
+ * The single constructor argument is the desired sample size. A null input bag produces an
+ * empty sample, in both the algebraic and the non-algebraic code paths.
+ * </p>
+ *
+ * @author wvaughan
+ *
+ */
+@Nondeterministic
+public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Algebraic
+{
+  /** The desired sample size. */
+  protected Integer numSamples;
+
+  // Created lazily; used by the accumulator and non-algebraic exec paths.
+  private Reservoir reservoir;
+
+  protected ScoredTuple.ScoreGenerator scoreGen;
+
+  private Reservoir getReservoir()
+  {
+    if (reservoir == null) {
+      reservoir = new Reservoir(this.numSamples);
+    }
+    return reservoir;
+  }
+
+  /**
+   * @param numSamples the desired sample size, as a decimal integer string
+   */
+  public ReservoirSample(String numSamples)
+  {
+    this.numSamples = Integer.parseInt(numSamples);
+  }
+
+  /**
+   * Returns the score generator, creating a pure-random generator on first use.
+   */
+  protected ScoredTuple.ScoreGenerator getScoreGenerator()
+  {
+    if (this.scoreGen == null) {
+      this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+    }
+    return this.scoreGen;
+  }
+
+  /**
+   * Scores every tuple of the bag in field 0 and offers it to the reservoir.
+   * A null bag contributes nothing, matching {@link Initial}.
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    DataBag samples = (DataBag) input.get(0);
+    if (samples == null) {
+      return;
+    }
+    ScoredTuple.ScoreGenerator generator = getScoreGenerator();
+    for (Tuple sample : samples) {
+      getReservoir().consider(new ScoredTuple(generator.generateScore(sample), sample));
+    }
+  }
+
+  /**
+   * Clears the reservoir between groups. The reservoir may not have been created yet,
+   * so a null reservoir is tolerated.
+   */
+  @Override
+  public void cleanup()
+  {
+    if (this.reservoir != null) {
+      this.reservoir.clear();
+    }
+  }
+
+  /**
+   * Returns the original tuples currently held by the reservoir.
+   */
+  @Override
+  public DataBag getValue()
+  {
+    DataBag output = BagFactory.getInstance().newDefaultBag();
+    for (ScoredTuple sample : getReservoir()) {
+      output.add(sample.getTuple());
+    }
+    return output;
+  }
+
+  /**
+   * Returns the input bag unchanged when it already fits in the sample; otherwise
+   * delegates to the accumulator path. A null bag yields an empty bag.
+   */
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    DataBag samples = (DataBag) input.get(0);
+    if (samples != null && samples.size() <= numSamples) {
+      return samples;
+    }
+    return super.exec(input);
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG) {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               inputFieldSchema.schema, DataType.BAG));
+    } catch (FrontendException e) {
+      // the cause is attached to the rethrown exception; no need to print it as well
+      throw new RuntimeException(e);
+    }
+  }
+
+  String param = null;
+
+  /**
+   * Builds the constructor-argument suffix, e.g. {@code ('100')}, appended to the
+   * algebraic stage class names so each stage receives the sample size.
+   */
+  private String getParam()
+  {
+    if (param == null) {
+      // plain concatenation keeps the digits ASCII regardless of the default locale
+      param = (numSamples != null) ? "('" + numSamples + "')" : "";
+    }
+    return param;
+  }
+
+  @Override
+  public String getInitial() {
+    return Initial.class.getName() + getParam();
+  }
+
+  @Override
+  public String getIntermed() {
+    return Intermediate.class.getName() + getParam();
+  }
+
+  @Override
+  public String getFinal() {
+    return Final.class.getName() + getParam();
+  }
+
+  /**
+   * Offers every intermediate (score, tuple) entry found in the inner bags of
+   * {@code bagOfSamples} to {@code reservoir}, reusing the scores generated by {@link Initial}.
+   */
+  private static void considerIntermediate(Reservoir reservoir, DataBag bagOfSamples) throws IOException
+  {
+    for (Tuple innerTuple : bagOfSamples) {
+      DataBag samples = (DataBag) innerTuple.get(0);
+      for (Tuple sample : samples) {
+        reservoir.consider(ScoredTuple.fromIntermediateTuple(sample));
+      }
+    }
+  }
+
+  /**
+   * First algebraic stage: scores each input tuple and emits a single-field tuple holding a
+   * bag of intermediate (score, tuple) tuples. Bags larger than {@code numSamples} are first
+   * passed through a reservoir sized to {@code numSamples}.
+   */
+  static public class Initial extends EvalFunc<Tuple>
+  {
+    int numSamples;
+    private Reservoir reservoir;
+    protected ScoredTuple.ScoreGenerator scoreGen;
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    public Initial() {}
+
+    public Initial(String numSamples)
+    {
+      this.numSamples = Integer.parseInt(numSamples);
+    }
+
+    private Reservoir getReservoir()
+    {
+      if (reservoir == null) {
+        reservoir = new Reservoir(this.numSamples);
+      }
+      return reservoir;
+    }
+
+    protected ScoredTuple.ScoreGenerator getScoreGenerator()
+    {
+      if (this.scoreGen == null) {
+        this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+      }
+      return this.scoreGen;
+    }
+
+    @Override
+    public Tuple exec(Tuple input) throws IOException {
+      DataBag output = BagFactory.getInstance().newDefaultBag();
+      ScoredTuple.ScoreGenerator generator = getScoreGenerator();
+
+      DataBag samples = (DataBag) input.get(0);
+      // a null bag contributes no candidates, so the output bag stays empty
+      if (samples != null) {
+        if (samples.size() <= numSamples) {
+          // no need to construct a reservoir, so just emit intermediate tuples
+          for (Tuple sample : samples) {
+            // attach the score so later stages merge with the same roll
+            output.add(new ScoredTuple(generator.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
+          }
+        } else {
+          getReservoir().clear();
+          for (Tuple sample : samples) {
+            getReservoir().consider(new ScoredTuple(generator.generateScore(sample), sample));
+          }
+          for (ScoredTuple scoredTuple : getReservoir()) {
+            output.add(scoredTuple.getIntermediateTuple(tupleFactory));
+          }
+        }
+      }
+
+      return tupleFactory.newTuple(output);
+    }
+  }
+
+  /**
+   * Combiner stage: merges the intermediate bags through a reservoir and re-emits the
+   * surviving entries, still carrying their original scores.
+   */
+  static public class Intermediate extends EvalFunc<Tuple>
+  {
+    int numSamples;
+    private Reservoir reservoir;
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    public Intermediate() {}
+
+    public Intermediate(String numSamples)
+    {
+      this.numSamples = Integer.parseInt(numSamples);
+    }
+
+    private Reservoir getReservoir()
+    {
+      if (reservoir == null) {
+        reservoir = new Reservoir(this.numSamples);
+      }
+      return reservoir;
+    }
+
+    @Override
+    public Tuple exec(Tuple input) throws IOException {
+      getReservoir().clear();
+      considerIntermediate(getReservoir(), (DataBag) input.get(0));
+
+      DataBag output = BagFactory.getInstance().newDefaultBag();
+      for (ScoredTuple scoredTuple : getReservoir()) {
+        // keep the score attached for any further merge stage
+        output.add(scoredTuple.getIntermediateTuple(tupleFactory));
+      }
+
+      return tupleFactory.newTuple(output);
+    }
+  }
+
+  /**
+   * Final stage: merges the intermediate bags through a reservoir and emits the original
+   * (unscored) tuples.
+   */
+  static public class Final extends EvalFunc<DataBag>
+  {
+    int numSamples;
+    private Reservoir reservoir;
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    public Final() {}
+
+    public Final(String numSamples)
+    {
+      this.numSamples = Integer.parseInt(numSamples);
+    }
+
+    private Reservoir getReservoir()
+    {
+      if (reservoir == null) {
+        reservoir = new Reservoir(this.numSamples);
+      }
+      return reservoir;
+    }
+
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+      getReservoir().clear();
+      considerIntermediate(getReservoir(), (DataBag) input.get(0));
+
+      DataBag output = BagFactory.getInstance().newDefaultBag();
+      for (ScoredTuple scoredTuple : getReservoir()) {
+        // output the original tuple
+        output.add(scoredTuple.getTuple());
+      }
+
+      return output;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java b/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java
new file mode 100644
index 0000000..90ea576
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Provides a way of sampling tuples based on certain fields.
+ * This is essentially equivalent to grouping on the fields, applying SAMPLE,
+ * and then flattening. It is much more efficient though because it does not require
+ * a reduce step.
+ *
+ * <p>
+ * The method of sampling is to convert the key to a hash, derive a double value
+ * from this, and then test this against a supplied probability. The double value
+ * derived from a key is (approximately) uniformly distributed between 0 and 1.
+ * </p>
+ *
+ * <p>
+ * The only required parameter is the sampling probability. This may be followed
+ * by an optional salt string whose hash code seeds the key hashing. Without a salt,
+ * the hash code of the UDF context signature is used as the seed.
+ * </p>
+ *
+ * <p>
+ * SampleByKey will work deterministically as long as the same seed is provided.
+ * Null key fields are hashed as 0.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * </p>
+ * <pre>
+ * {@code
+ * DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.5');
+ *
+ * -- input: (A,1), (A,2), (A,3), (B,1), (B,3)
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:int);
+ * output = FILTER data BY SampleByKey(A_id);
+ *
+ * -- output: (B,1), (B,3)
+ * }
+ * </pre>
+ * @author evion
+ *
+ */
+public class SampleByKey extends FilterFunc
+{
+  final static int PRIME_NUMBER = 31;
+
+  Integer seed = null;
+  double probability;
+
+  // SHA-1 digest reused across rows; created lazily, never serialized.
+  private transient MessageDigest hasher;
+
+  /**
+   * @param probability the sampling probability; a key is kept when its derived value is
+   *                    less than or equal to this
+   */
+  public SampleByKey(String probability) {
+    this.probability = Double.parseDouble(probability);
+  }
+
+  /**
+   * @param probability the sampling probability
+   * @param salt        a string whose hash code seeds the key hashing
+   */
+  public SampleByKey(String probability, String salt) {
+    this(probability);
+    this.seed = salt.hashCode();
+  }
+
+  /**
+   * Uses the signature's hash code as the seed unless a salt was given explicitly.
+   */
+  @Override
+  public void setUDFContextSignature(String signature)
+  {
+    if (this.seed == null) {
+      this.seed = signature.hashCode();
+    }
+    super.setUDFContextSignature(signature);
+  }
+
+  /**
+   * Combines the hash codes of all input fields (nulls count as 0) and keeps the tuple
+   * when the derived value is at most {@code probability}.
+   */
+  @Override
+  public Boolean exec(Tuple input) throws IOException
+  {
+    int hashCode = 0;
+    for (int i = 0; i < input.size(); i++) {
+      Object each = input.get(i);
+      hashCode = hashCode * PRIME_NUMBER + (each == null ? 0 : each.hashCode());
+    }
+
+    return intToRandomDouble(hashCode) <= probability;
+  }
+
+  private MessageDigest getHasher()
+  {
+    if (hasher == null) {
+      try {
+        hasher = MessageDigest.getInstance("sha-1");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException("SHA-1 MessageDigest is not available", e);
+      }
+    }
+    return hasher;
+  }
+
+  /**
+   * Hashes (seed, input) with SHA-1 and maps the first four digest bytes (big-endian int)
+   * onto roughly [0, 1]; Integer.MIN_VALUE maps slightly below 0.
+   */
+  private double intToRandomDouble(int input)
+  {
+    if (seed == null) {
+      throw new IllegalStateException("SampleByKey seed is not set: no salt was given and no UDF context signature was received");
+    }
+
+    ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
+    buffer.putInt(seed);
+    buffer.putInt(input);
+    // digest(byte[]) completes the hash and resets the digest for the next call
+    byte[] digest = getHasher().digest(buffer.array());
+
+    int bits = ByteBuffer.wrap(digest).getInt();
+    return (((double) bits) / Integer.MAX_VALUE + 1) / 2;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java b/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java
new file mode 100644
index 0000000..c793584
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+class ScoredTuple implements Comparable<ScoredTuple>
+{
+ Double score;
+ private Tuple tuple;
+
+ public ScoredTuple()
+ {
+
+ }
+
+ public ScoredTuple(Double score, Tuple tuple)
+ {
+ this.score = score;
+ this.setTuple(tuple);
+ }
+
+ public Double getScore() {
+ return score;
+ }
+
+ public void setScore(Double score) {
+ this.score = score;
+ }
+
+ public Tuple getTuple() {
+ return tuple;
+ }
+
+ public void setTuple(Tuple tuple) {
+ this.tuple = tuple;
+ }
+
+ public Tuple getIntermediateTuple(TupleFactory tupleFactory)
+ {
+ Tuple intermediateTuple = tupleFactory.newTuple(2);
+ try {
+ intermediateTuple.set(0, score);
+ intermediateTuple.set(1, tuple);
+ }
+ catch (ExecException e) {
+ throw new RuntimeException(e);
+ }
+
+ return intermediateTuple;
+ }
+
+ public static ScoredTuple fromIntermediateTuple(Tuple intermediateTuple) throws ExecException
+ {
+ //Double score = ((Number)intermediateTuple.get(0)).doubleValue();
+ try {
+ Double score = (Double)intermediateTuple.get(0);
+ Tuple originalTuple = (Tuple)intermediateTuple.get(1);
+ return new ScoredTuple(score, originalTuple);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot deserialize intermediate tuple: "+intermediateTuple.toString(), e);
+ }
+ }
+
+ @Override
+ public int compareTo(ScoredTuple o) {
+ if (score == null) {
+ if (o == null) return 0;
+ else return -1;
+ }
+ return score.compareTo(o.score);
+ }
+
+ static interface ScoreGenerator
+ {
+ double generateScore(Tuple sample) throws ExecException;
+ }
+
+ static class PureRandomScoreGenerator implements ScoreGenerator
+ {
+ public PureRandomScoreGenerator(){}
+
+ public double generateScore(Tuple sample)
+ {
+ return Math.random();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java
new file mode 100644
index 0000000..8e8debf
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Scalable simple random sampling (ScaSRS).
+ * <p>
+ * This UDF implements a scalable simple random sampling algorithm described in
+ *
+ * <pre>
+ * X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
+ * </pre>
+ *
+ * It takes a bag of n items and a sampling probability p as the inputs, and outputs a
+ * simple random sample of size exactly ceil(p*n) in a bag, with probability at least
+ * 99.99%. For example, the following script generates a simple random sample with
+ * sampling probability 0.01:
+ *
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * item = LOAD 'input' AS (x:double);
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01));
+ * </pre>
+ *
+ * Optionally, user can provide a good lower bound of n as the third argument to help
+ * reduce the size of intermediate data, for example:
+ *
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * item = LOAD 'input' AS (x:double);
+ * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01, summary.count));
+ * </pre>
+ *
+ * This UDF is very useful for stratified sampling. For example, the following script
+ * keeps all positive examples while downsampling negatives with probability 0.1:
+ *
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * item = LOAD 'input' AS (x:double, label:int);
+ * grouped = FOREACH (GROUP item BY label) GENERATE item, (group == 1 ? 1.0 : 0.1) AS p;
+ * sampled = FOREACH grouped GENERATE FLATTEN(SRS(item, p));
+ * </pre>
+ *
+ * In a Java Hadoop MapReduce job, we can output selected items directly using
+ * MultipleOutputs. However, this feature is not available in a Pig UDF. So we still let
+ * selected items go through the sort phase. However, as long as the sample size is not
+ * huge, this should not be a big problem.
+ *
+ * In the first version, the sampling probability is specified in the constructor. This
+ * method is deprecated now and will be removed in the next release.
+ *
+ * @author ximeng
+ *
+ */
+public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
+{
+  /**
+   * Prefix for the output bag name.
+   */
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRS";
+
+  private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
+
+  public SimpleRandomSample()
+  {
+    // empty
+  }
+
+  /**
+   * Constructs this UDF with a sampling probability.
+   *
+   * @deprecated Should specify the sampling probability in the function call.
+   */
+  @Deprecated
+  public SimpleRandomSample(String samplingProbability)
+  {
+    // NOTE(review): p is validated here but neither stored nor forwarded; getInitial()
+    // returns the bare Initial class name, so Initial(String) presumably never receives it
+    // and a one-argument call fails with "Sampling probability is not given." -- confirm.
+    double p = Double.parseDouble(samplingProbability);
+    verifySamplingProbability(p);
+  }
+
+  @Override
+  public String getInitial()
+  {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed()
+  {
+    return Intermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal()
+  {
+    return Final.class.getName();
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      return new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                   input),
+                                               inputFieldSchema.schema,
+                                               DataType.BAG));
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * First stage: draws a uniform roll per item, pre-selects items below q1, waitlists items
+   * in [q1, q2) together with their roll, and drops the rest.
+   */
+  static public class Initial extends EvalFunc<Tuple>
+  {
+    // Should avoid creating many random number generator instances.
+    private static final RandomDataImpl _RNG = new RandomDataImpl();
+
+    synchronized private static double nextDouble()
+    {
+      return _RNG.nextUniform(0.0d, 1.0d);
+    }
+
+    public Initial()
+    {
+      // empty
+    }
+
+    @Deprecated
+    public Initial(String samplingProbability)
+    {
+      _p = Double.parseDouble(samplingProbability);
+    }
+
+    private boolean _first = true;
+    private double _p = -1.0d; // the sampling probability
+    private long _n1 = 0L; // the input lower bound of the size of the population
+    private long _localCount = 0L; // number of items processed by this instance
+
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+      int numArgs = input.size();
+
+      // The first if clause is for backward compatibility, which should be removed
+      // after we remove specifying sampling probability in the constructor.
+      if (numArgs == 1)
+      {
+        if (_p < 0.0d)
+        {
+          throw new IllegalArgumentException("Sampling probability is not given.");
+        }
+      }
+      else if (numArgs < 2 || numArgs > 3)
+      {
+        throw new IllegalArgumentException("The input tuple should have either two or three fields: "
+            + "a bag of items, the sampling probability, "
+            + "and optionally a good lower bound of the size of the population or the exact number.");
+      }
+
+      DataBag items = (DataBag) input.get(0);
+      long numItems = items.size();
+      _localCount += numItems;
+
+      // This is also for backward compatibility. Should change to
+      // double p = ((Number) input.get(1)).doubleValue();
+      // after we remove specifying sampling probability in the constructor.
+      double p = numArgs == 1 ? _p : ((Number) input.get(1)).doubleValue();
+      if (_first)
+      {
+        _p = p;
+        verifySamplingProbability(p);
+      }
+      else
+      {
+        if (p != _p)
+        {
+          throw new IllegalArgumentException("The sampling probability must be a scalar, but found two different values: "
+              + _p + " and " + p + ".");
+        }
+      }
+
+      long n1 = 0L;
+      if (numArgs > 2)
+      {
+        n1 = ((Number) input.get(2)).longValue();
+
+        if (_first)
+        {
+          _n1 = n1;
+        }
+        else
+        {
+          if (n1 != _n1)
+          {
+            throw new IllegalArgumentException("The lower bound of the population size must be a scalar, but found two different values: "
+                + _n1 + " and " + n1 + ".");
+          }
+        }
+      }
+
+      _first = false;
+
+      // Use the local count if the input lower bound is smaller.
+      n1 = Math.max(n1, _localCount);
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag waiting = _BAG_FACTORY.newDefaultBag();
+
+      if (n1 > 0L)
+      {
+        double q1 = getQ1(n1, p);
+        double q2 = getQ2(n1, p);
+
+        for (Tuple t : items)
+        {
+          double x = nextDouble();
+          if (x < q1)
+          {
+            selected.add(t);
+          }
+          else if (x < q2)
+          {
+            waiting.add(new ScoredTuple(x, t).getIntermediateTuple(_TUPLE_FACTORY));
+          }
+        }
+      }
+
+      /*
+       * The output tuple contains the following fields: sampling probability (double),
+       * number of processed items in this tuple (long), a good lower bound of the size of
+       * the population or the exact number (long), a bag of selected items (bag), and a
+       * bag of waitlisted items with scores (bag).
+       */
+      Tuple output = _TUPLE_FACTORY.newTuple();
+
+      output.append(p);
+      output.append(numItems);
+      output.append(n1);
+      output.append(selected);
+      output.append(waiting);
+
+      return output;
+    }
+  }
+
+  /**
+   * Combiner stage: merges partial results and re-applies the q1/q2 thresholds, computed
+   * from the best lower bound of the population size seen so far, to the merged waitlist.
+   */
+  public static class Intermediate extends EvalFunc<Tuple>
+  {
+    public Intermediate()
+    {
+      // empty
+    }
+
+    @Deprecated
+    public Intermediate(String samplingProbability)
+    {
+      // empty
+    }
+
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+      DataBag bag = (DataBag) input.get(0);
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag aggWaiting = _BAG_FACTORY.newDefaultBag();
+
+      boolean first = true;
+      double p = 0.0d;
+      long numItems = 0L; // number of items processed, including rejected
+      long n1 = 0L;
+
+      for (Tuple tuple : bag)
+      {
+        if (first)
+        {
+          p = (Double) tuple.get(0);
+          first = false;
+        }
+
+        numItems += (Long) tuple.get(1);
+        // Each incoming bound is itself a valid lower bound of the population size, so keep
+        // the largest seen so far rather than only the current tuple's.
+        n1 = Math.max(n1, Math.max((Long) tuple.get(2), numItems));
+
+        selected.addAll((DataBag) tuple.get(3));
+        aggWaiting.addAll((DataBag) tuple.get(4));
+      }
+
+      DataBag waiting = _BAG_FACTORY.newDefaultBag();
+
+      if (n1 > 0L)
+      {
+        double q1 = getQ1(n1, p);
+        double q2 = getQ2(n1, p);
+
+        for (Tuple t : aggWaiting)
+        {
+          ScoredTuple scored = ScoredTuple.fromIntermediateTuple(t);
+
+          if (scored.getScore() < q1)
+          {
+            selected.add(scored.getTuple());
+          }
+          else if (scored.getScore() < q2)
+          {
+            waiting.add(t);
+          }
+        }
+      }
+
+      Tuple output = _TUPLE_FACTORY.newTuple();
+
+      output.append(p);
+      output.append(numItems);
+      output.append(n1);
+      output.append(selected);
+      output.append(waiting);
+
+      return output;
+    }
+  }
+
+  /**
+   * Final stage: keeps every pre-selected item and fills up to ceil(p * n) from the
+   * waitlist in ascending score order.
+   */
+  static public class Final extends EvalFunc<DataBag>
+  {
+    public Final()
+    {
+      // empty
+    }
+
+    @Deprecated
+    public Final(String samplingProbability)
+    {
+      // empty
+    }
+
+    @Override
+    public DataBag exec(Tuple input) throws IOException
+    {
+      DataBag bag = (DataBag) input.get(0);
+
+      boolean first = true;
+      double p = 0.0d; // the sampling probability
+      long n = 0L; // the size of the population (total number of items)
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag waiting = _BAG_FACTORY.newSortedBag(ScoredTupleComparator.getInstance());
+
+      for (Tuple tuple : bag)
+      {
+        if (first)
+        {
+          p = (Double) tuple.get(0);
+          first = false;
+        }
+
+        n += (Long) tuple.get(1);
+        selected.addAll((DataBag) tuple.get(3));
+        waiting.addAll((DataBag) tuple.get(4));
+      }
+
+      long numSelected = selected.size();
+      long numWaiting = waiting.size();
+
+      long s = (long) Math.ceil(p * n); // sample size
+
+      System.out.println("To sample " + s + " items from " + n + ", we pre-selected "
+          + numSelected + ", and waitlisted " + numWaiting + ".");
+
+      long numNeeded = s - numSelected;
+
+      if (numNeeded < 0)
+      {
+        System.err.println("Pre-selected " + numSelected + " items, but only needed " + s
+            + ".");
+      }
+
+      // the waitlist is sorted by score, so the lowest rolls are taken first
+      for (Tuple scored : waiting)
+      {
+        if (numNeeded <= 0)
+        {
+          break;
+        }
+        selected.add(ScoredTuple.fromIntermediateTuple(scored).getTuple());
+        numNeeded--;
+      }
+
+      if (numNeeded > 0)
+      {
+        System.err.println("The waiting list only has " + numWaiting
+            + " items, but needed " + numNeeded + " more.");
+      }
+
+      return selected;
+    }
+  }
+
+  /**
+   * Computes the threshold below which items are selected outright.
+   *
+   * @param n a lower bound of the population size (must be positive)
+   * @param p the sampling probability
+   */
+  private static double getQ1(long n, double p)
+  {
+    double t1 = 20.0d / (3.0d * n);
+    return p + t1 - Math.sqrt(t1 * t1 + 3.0d * t1 * p);
+  }
+
+  /**
+   * Computes the threshold at or above which items are rejected.
+   *
+   * @param n a lower bound of the population size (must be positive)
+   * @param p the sampling probability
+   */
+  private static double getQ2(long n, double p)
+  {
+    double t2 = 10.0d / n;
+    return p + t2 + Math.sqrt(t2 * t2 + 2.0d * t2 * p);
+  }
+
+  /**
+   * @throws IllegalArgumentException if {@code p} is outside [0, 1]
+   */
+  private static void verifySamplingProbability(double p)
+  {
+    if (p < 0.0 || p > 1.0)
+    {
+      throw new IllegalArgumentException("Sampling probability must be inside [0, 1].");
+    }
+  }
+
+  /**
+   * Orders intermediate (score, tuple) tuples by ascending score.
+   */
+  static class ScoredTupleComparator implements Comparator<Tuple>
+  {
+    public static final ScoredTupleComparator getInstance()
+    {
+      return _instance;
+    }
+
+    private static final ScoredTupleComparator _instance = new ScoredTupleComparator();
+
+    @Override
+    public int compare(Tuple o1, Tuple o2)
+    {
+      try
+      {
+        ScoredTuple t1 = ScoredTuple.fromIntermediateTuple(o1);
+        ScoredTuple t2 = ScoredTuple.fromIntermediateTuple(o2);
+        return t1.getScore().compareTo(t2.getScore());
+      }
+      catch (Exception e)
+      {
+        // Errors (e.g. OutOfMemoryError) are deliberately not wrapped
+        throw new RuntimeException("Cannot compare " + o1 + " and " + o2 + ".", e);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java
new file mode 100644
index 0000000..a59816a
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Select the candidate with the smallest score for each position from the candidates
+ * proposed by {@link SimpleRandomSampleWithReplacementVote}.
+ *
+ * @see SimpleRandomSampleWithReplacementVote
+ *
+ * @author ximeng
+ *
+ */
+public class SimpleRandomSampleWithReplacementElect extends AlgebraicEvalFunc<DataBag>
+{
+  /**
+   * Prefix for the output bag name.
+   */
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRSWR_ELECT";
+
+  public static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  public static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /**
+   * Orders candidate tuples of the form (position:int, score:double, candidate) by position
+   * first and then by score, both ascending. It has no per-instance state, so one shared
+   * instance is used.
+   */
+  static class CandidateComparator implements Comparator<Tuple>
+  {
+    private static final CandidateComparator _instance = new CandidateComparator();
+
+    public static CandidateComparator get()
+    {
+      return _instance;
+    }
+
+    private CandidateComparator()
+    {
+      // singleton
+    }
+
+    @Override
+    public int compare(Tuple o1, Tuple o2)
+    {
+      try
+      {
+        // first by position
+        int c1 = ((Integer) o1.get(0)).compareTo((Integer) o2.get(0));
+        if (c1 != 0)
+        {
+          return c1;
+        }
+        // then by score
+        return ((Double) o1.get(1)).compareTo((Double) o2.get(1));
+      }
+      catch (ExecException e)
+      {
+        throw new RuntimeException("Error comparing tuples " + o1 + " and " + o2, e);
+      }
+    }
+  }
+
+  @Override
+  public String getInitial()
+  {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed()
+  {
+    return Intermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal()
+  {
+    return Final.class.getName();
+  }
+
+  /**
+   * Merges all candidate bags carried by the input and elects, for every position, the
+   * candidate with the smallest score.
+   *
+   * @param tuple a tuple whose first field is a bag; each tuple in that bag carries a bag of
+   *          (position, score, candidate) tuples in its first field
+   * @param unwrap if true, emit only the elected items (field 2 of each candidate tuple);
+   *          otherwise emit the full (position, score, candidate) tuples so that they can be
+   *          merged again by a later stage
+   * @return a bag with one entry per position that received at least one candidate
+   *         (positions are assumed to be non-negative)
+   * @throws IOException if a field cannot be read
+   */
+  private static DataBag electCandidates(Tuple tuple, boolean unwrap) throws IOException
+  {
+    // Sorting by (position, score) puts the smallest-score candidate first in each position's run.
+    DataBag candidates = bagFactory.newSortedBag(CandidateComparator.get());
+    for (Tuple intermediateOutputTuple : (DataBag) tuple.get(0))
+    {
+      candidates.addAll((DataBag) intermediateOutputTuple.get(0));
+    }
+
+    DataBag elected = bagFactory.newDefaultBag();
+    int lastPosition = -1;
+    for (Tuple candidate : candidates)
+    {
+      int position = (Integer) candidate.get(0);
+      // only the first (i.e. smallest-score) candidate of each position is kept
+      if (position > lastPosition)
+      {
+        elected.add(unwrap ? (Tuple) candidate.get(2) : candidate);
+        lastPosition = position;
+      }
+    }
+    return elected;
+  }
+
+  /**
+   * Initial stage: passes every input candidate through unchanged.
+   */
+  static public class Initial extends EvalFunc<Tuple>
+  {
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+      // output each input candidate
+      return input;
+    }
+  }
+
+  /**
+   * Combiner stage: keeps only the best (smallest-score) candidate per position, still in
+   * (position, score, candidate) form so the final stage can merge again.
+   */
+  static public class Intermediate extends EvalFunc<Tuple>
+  {
+    @Override
+    public Tuple exec(Tuple tuple) throws IOException
+    {
+      return tupleFactory.newTuple(electCandidates(tuple, false));
+    }
+  }
+
+  /**
+   * Final stage: elects the best candidate per position and outputs the bag of elected items.
+   */
+  static public class Final extends EvalFunc<DataBag>
+  {
+    @Override
+    public DataBag exec(Tuple tuple) throws IOException
+    {
+      return electCandidates(tuple, true);
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      // the output is a bag of selected items, typed like the candidate field (index 2)
+      Schema outputSchema =
+          new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                input),
+                                            inputFieldSchema.schema.getField(0).schema.getField(2).schema,
+                                            DataType.BAG));
+
+      return outputSchema;
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException("Error deriving output schema.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java
new file mode 100644
index 0000000..598e58c
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.math.MathException;
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+
+/**
+ * Scalable simple random sampling with replacement (ScaSRSWR).
+ * <p/>
+ * This UDF together with {@link SimpleRandomSampleWithReplacementElect} implement a
+ * scalable algorithm for simple random sampling with replacement (SRSWR), which is a
+ * randomized algorithm with a failure rate less than {@value #FAILURE_RATE}.
+ * <p/>
+ * Let s be the desired sample size. To compute an SRSWR sample of size s, for each output
+ * position in {0, 1, ..., s-1}, we want to select an item from the population uniformly
+ * at random. This algorithm consists of two stages: vote and election. In the vote stage,
+ * this UDF {@link SimpleRandomSampleWithReplacementVote} votes items, called candidates,
+ * for each position. In the election stage, the paired UDF
+ * {@link SimpleRandomSampleWithReplacementElect} elects one candidate for each position.
+ * The algorithm succeeds if we have at least one candidate for each position.
+ * <p/>
+ * To use this UDF pair, user needs to provide: 1) the desired sample size, 2) a good
+ * lower bound of the population size or the exact size. The input to the vote UDF
+ * {@link SimpleRandomSampleWithReplacementVote} is a tuple that consists of a bag of
+ * items, the desired sample size (int), and the population size (long) or a good lower
+ * bound of it, where the latter two must be scalars. The output from the vote UDF is a
+ * tuple that consists of position:int, score:double, and candidate. The input to the
+ * elect UDF {@link SimpleRandomSampleWithReplacementElect} is a tuple that contains all
+ * candidates voted by the vote UDF for some positions. The output from the elect UDF is a
+ * bag of sampled items.
+ * <p/>
+ * For example, the following script generates a sample of size 100000 with replacement:
+ *
+ * <pre>
+ * DEFINE SRSWR_VOTE datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
+ * DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
+ *
+ * item = LOAD 'input' AS (x:double);
+ * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+ * candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
+ * sampled = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE FLATTEN(SRSWR_ELECT(candidates));
+ * </pre>
+ *
+ * Because for election we only need to group candidates voted for the same position, this
+ * algorithm can use many reducers to consume the candidates. See the "PARALLEL 10"
+ * statement above. If the item to sample is the entire row, use TOBAG(TOTUPLE(*)).
+ * <p/>
+ * SRSWR is heavily used in bootstrapping. Bootstrapping can be done easily with this UDF
+ * pair. For example, the following script generates 100 bootstrap samples, computes the
+ * mean value for each sample, and then outputs the bootstrap estimates.
+ *
+ * <pre>
+ * summary = FOREACH (GROUP item ALL) GENERATE AVG(item.x) AS mean, COUNT(item) AS count;
+ * candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
+ * sampled = FOREACH (GROUP candidates BY (position % 100) PARALLEL 10) GENERATE AVG(SRSWR_ELECT(candidates)) AS mean;
+ * bootstrap = FOREACH (GROUP sampled ALL) GENERATE summary.mean AS mean, sampled.mean AS bootstrapMeans;
+ * </pre>
+ *
+ * Another usage of this UDF pair is to generate random pairs or tuples without computing
+ * the cross product, where each pair or tuple consists of items from different input
+ * sources. Let s be the number of random tuples we want to generate. For each input
+ * source, simply use the vote UDF to propose candidates, then join the candidates from
+ * different sources by their positions and for each position use the elect UDF to select
+ * one candidate from each source to form the pair or tuple for that position.
+ * <p/>
+ * The algorithm is a simple extension to the work
+ *
+ * <pre>
+ * X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
+ * </pre>
+ *
+ * Basically, for each output position, it performs a random sort on the population
+ * (associates each item with a random score independently drawn from the uniform
+ * distribution and then sorts items based on the scores), and picks the one that has the
+ * smallest score. However, a probabilistic threshold is used to avoid sorting the entire
+ * population. For example, if the population size is one billion and the random score
+ * generated for an item is 0.9, very likely it won't become the smallest and hence we do
+ * not need to propose it as a candidate.
+ * <p/>
+ * More precisely, let n be the population size, n1 be a good lower bound of n, s be the
+ * sample size, delta be the failure rate, and q be the threshold. For each output
+ * position the probability of all random scores being greater than q is (1-q)^n. Thus, if
+ * we throw away items with associated scores greater than q, with probability at least 1
+ * - s*(1-q)^n, we can still capture the item with the smallest score for each position.
+ * Fixing delta = s*(1-q)^n and solving for q gives q = 1-exp(log(delta/s)/n). Note that
+ * replacing n by n1 < n can only decrease the failure rate, though at the cost of an
+ * increased number of candidates. The expected number of candidates is
+ * (1-exp(log(delta/s)/n1))*s*n. When n1 equals n, this number is approximately
+ * s*log(s/delta).
+ * <p/>
+ * Generating a random score for each (item, position) pair is very expensive and
+ * unnecessary. For each item, the number of positions for which it gets voted follows a
+ * binomial distribution B(s,q). We can simply draw a number from this distribution,
+ * determine the positions by sampling without replacement, and then generate random
+ * scores for those positions. This reduces the running time significantly.
+ * <p/>
+ * Since for each position we only need the candidate with the smallest score, we
+ * implement a combiner to reduce the size of intermediate data in the elect UDF
+ * {@link SimpleRandomSampleWithReplacementElect}.
+ *
+ * @see SimpleRandomSampleWithReplacementElect
+ * @see <a href="http://en.wikipedia.org/wiki/Bootstrapping_(statistics)" target="_blank">
+ *      Bootstrapping (Wikipedia)</a>
+ *
+ * @author ximeng
+ *
+ */
+public class SimpleRandomSampleWithReplacementVote extends EvalFunc<DataBag>
+{
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRSWR_VOTE";
+  public static final String CANDIDATE_FIELD_NAME = "candidate";
+  public static final String POSITION_FIELD_NAME = "position";
+  public static final String SCORE_FIELD_NAME = "score";
+  public static final double FAILURE_RATE = 1e-4;
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  private final RandomDataImpl _rdg = new RandomDataImpl();
+
+  /**
+   * Samples k distinct integers from [0, n) without replacement.
+   *
+   * If k is small (k < n/3), integers are drawn repeatedly from [0, n) until k distinct values
+   * have been collected. Each draw yields a new value with probability at least (n-k)/n, so the
+   * expected number of draws is below k*n/(n-k), i.e. below 1.5*k. Otherwise, selection-rejection
+   * sampling is used: each j in [0, n) is kept with probability (k-i)/(n-j), where i is the
+   * number of values kept so far. Since n is then at most about 3k, both branches run in
+   * expected O(k) time.
+   *
+   * @param n size of the range to sample from
+   * @param k number of distinct values to draw; callers pass 0 <= k <= n
+   * @return k distinct integers in [0, n); the order is unspecified
+   */
+  private int[] sampleWithoutReplacement(int n, int k)
+  {
+    if (k == 0)
+    {
+      return new int[] {};
+    }
+
+    if (k < n / 3L)
+    {
+      Set<Integer> sample = Sets.newHashSetWithExpectedSize(k);
+
+      // The expected number of iterations is less than 1.5*k
+      while (sample.size() < k)
+      {
+        // RandomDataImpl.nextInt bounds are both inclusive, hence n - 1
+        sample.add(_rdg.nextInt(0, n - 1));
+      }
+
+      return Ints.toArray(sample);
+    }
+    else
+    {
+      int[] sample = new int[k];
+
+      int i = 0;
+      for (int j = 0; j < n && i < k; ++j)
+      {
+        // keep j with probability (#values still needed) / (#values still available)
+        if (_rdg.nextUniform(0.0d, 1.0d) < 1.0d * (k - i) / (n - j))
+        {
+          sample[i] = j;
+          i++;
+        }
+      }
+
+      return sample;
+    }
+  }
+
+  /**
+   * Votes candidates for output positions.
+   *
+   * @param tuple (items:bag, sampleSize:number, populationSize:number), where populationSize is
+   *          the population size or a lower bound of it
+   * @return a bag of (position:int, score:double, candidate:tuple); empty if the sample size is
+   *         0 or the item bag is null
+   * @throws IllegalArgumentException if the arity is wrong or a size argument is negative
+   */
+  @Override
+  public DataBag exec(Tuple tuple) throws IOException
+  {
+    if (tuple.size() != 3)
+    {
+      throw new IllegalArgumentException("The input arguments are: "
+          + "a bag of items, the desired sample size (int), and the population size (long) or a good lower bound of it");
+    }
+
+    DataBag items = (DataBag) tuple.get(0);
+    int sampleSize = ((Number) tuple.get(1)).intValue();
+    long count = ((Number) tuple.get(2)).longValue();
+
+    // Negative sizes would produce a NaN or negative voting threshold below.
+    if (sampleSize < 0)
+    {
+      throw new IllegalArgumentException("The desired sample size must be non-negative, but got "
+          + sampleSize);
+    }
+    if (count < 0)
+    {
+      throw new IllegalArgumentException("The population size (or its lower bound) must be non-negative, but got "
+          + count);
+    }
+
+    DataBag candidates = bagFactory.newDefaultBag();
+
+    // Nothing to vote for: no output positions, or no items.
+    if (sampleSize == 0 || items == null)
+    {
+      return candidates;
+    }
+
+    /*
+     * The threshold q = 1 - exp(log(delta/s)/n) guarantees that each output position receives
+     * at least one candidate with probability at least 1 - delta. It is computed as
+     * -expm1(...) because 1 - exp(x) loses precision when x is tiny, i.e. when n is large.
+     * With n == 0 the threshold is 1, so every item votes for every position.
+     */
+    double threshold = -Math.expm1(Math.log(FAILURE_RATE / sampleSize) / count);
+
+    for (Tuple item : items)
+    {
+      // Should be able to support long sample size if nextBinomial supports long.
+      int numOutputPositions;
+      try
+      {
+        numOutputPositions = _rdg.nextBinomial(sampleSize, threshold);
+      }
+      catch (MathException e)
+      {
+        throw new RuntimeException("Failed to generate a binomial value with n = "
+            + sampleSize + " and p = " + threshold, e);
+      }
+      for (int outputPosition : sampleWithoutReplacement(sampleSize, numOutputPositions))
+      {
+        Tuple candidate = tupleFactory.newTuple();
+        candidate.append(outputPosition);
+        candidate.append(_rdg.nextUniform(0.0d, 1.0d)); // generate a random score
+        candidate.append(item);
+        candidates.add(candidate);
+      }
+    }
+
+    return candidates;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      List<Schema.FieldSchema> fieldSchemas = Lists.newArrayList();
+
+      fieldSchemas.add(new Schema.FieldSchema(POSITION_FIELD_NAME, DataType.INTEGER));
+      fieldSchemas.add(new Schema.FieldSchema(SCORE_FIELD_NAME, DataType.DOUBLE));
+      fieldSchemas.add(new Schema.FieldSchema(CANDIDATE_FIELD_NAME,
+                                              inputFieldSchema.schema.getField(0).schema));
+
+      Schema outputSchema =
+          new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                input),
+                                            new Schema(fieldSchemas),
+                                            DataType.BAG));
+
+      return outputSchema;
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException("Error deriving output schema.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java
new file mode 100644
index 0000000..92af6a3
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * <p>
+ * Performs a weighted random sample using an in-memory reservoir to produce
+ * a weighted random sample of a given size based on the A-Res algorithm described in
+ * <a href="http://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf" target="_blank">this paper</a>.
+ * </p>
+ * <p>
+ * Species with larger weight have higher probability to be selected in the final sample set.
+ * </p>
+ * <p>
+ * This UDF inherits from {@link ReservoirSample} and is guaranteed to produce
+ * a sample of the given size. Similarly, this comes at the cost of scalability,
+ * since it uses internal storage equal in size to the desired sample to guarantee the exact sample size.
+ * </p>
+ * <p>
+ * Its constructor takes 2 arguments.
+ * <ul>
+ * <li>The 1st argument specifies the sample size, which should be a string representing a positive integer.
+ * <li>The 2nd argument specifies the index of the weight field in the input tuple,
+ * which should be a string representing a non-negative integer that is less than the input tuple size.
+ * </ul>
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+ * input = LOAD 'input' AS (v1:chararray, v2:INT);
+ * input_g = GROUP input ALL;
+ * sampled = FOREACH input_g GENERATE WeightedSample(input);
+ * }
+ * </pre>
+ * </p>
+ * @author wjian
+ */
+
+@Nondeterministic
+public class WeightedReservoirSample extends ReservoirSample {
+
+  // index of the weight field in each input tuple; validated to be non-negative
+  private Integer weightIdx;
+
+  /**
+   * @param strNumSamples the sample size
+   * @param strWeightIdx index of the weight field in the input tuple
+   * @throws NumberFormatException if strWeightIdx is not an integer
+   * @throws IllegalArgumentException if strWeightIdx is negative
+   */
+  public WeightedReservoirSample(String strNumSamples, String strWeightIdx)
+  {
+    super(strNumSamples);
+    this.weightIdx = Integer.parseInt(strWeightIdx);
+    if (this.weightIdx < 0) {
+      throw new IllegalArgumentException("Invalid negative index of weight field argument for WeightedReservoirSample constructor: "
+          + strWeightIdx);
+    }
+  }
+
+  /**
+   * Lazily creates the score generator that turns each tuple's weight into an A-Res key.
+   */
+  @Override
+  protected ScoredTuple.ScoreGenerator getScoreGenerator()
+  {
+    if (super.scoreGen == null)
+    {
+      super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+    }
+    return super.scoreGen;
+  }
+
+  /**
+   * Validates that the input is a bag of tuples whose weight field is numeric, and returns a
+   * bag schema identical to the input bag's schema.
+   */
+  @Override
+  public Schema outputSchema(Schema input) {
+    try {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG) {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }
+
+      Schema tupleSchema = inputBagSchema.getField(0).schema;
+
+      if (tupleSchema == null) {
+        throw new RuntimeException("The tuple of input bag has no schema");
+      }
+
+      List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+
+      // weightIdx is known to be non-negative (checked in the constructor)
+      if (fieldSchemaList == null || fieldSchemaList.size() <= this.weightIdx) {
+        throw new RuntimeException("The field schema of the input tuple is null " +
+                                   "or the tuple size is no more than the weight field index: "
+                                   + this.weightIdx);
+      }
+
+      byte weightType = fieldSchemaList.get(this.weightIdx).type;
+      if (weightType != DataType.INTEGER &&
+          weightType != DataType.LONG &&
+          weightType != DataType.FLOAT &&
+          weightType != DataType.DOUBLE)
+      {
+        String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
+                                               DataType.findTypeName(DataType.LONG),
+                                               DataType.findTypeName(DataType.FLOAT),
+                                               DataType.findTypeName(DataType.DOUBLE)};
+        throw new RuntimeException("Expect the type of the weight field of the input tuple to be of (" +
+            java.util.Arrays.toString(expectedTypes) + "), but instead found (" +
+            DataType.findTypeName(weightType) + "), weight field: " +
+            this.weightIdx);
+      }
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               inputFieldSchema.schema, DataType.BAG));
+    } catch (FrontendException e) {
+      throw new RuntimeException("Error deriving output schema.", e);
+    }
+  }
+
+  // constructor arguments forwarded to the algebraic stages, built lazily by getParam()
+  String param = null;
+
+  /**
+   * @return "('numSamples','weightIdx')" when both are known, otherwise an empty string
+   */
+  private String getParam()
+  {
+    if (this.param == null) {
+      if (super.numSamples != null && this.weightIdx != null) {
+        this.param = String.format("('%d','%d')",
+                                   super.numSamples,
+                                   this.weightIdx);
+      } else {
+        this.param = "";
+      }
+    }
+
+    return this.param;
+  }
+
+
+  @Override
+  public String getInitial()
+  {
+    return Initial.class.getName() + getParam();
+  }
+
+  @Override
+  public String getIntermed()
+  {
+    return Intermediate.class.getName() + getParam();
+  }
+
+  @Override
+  public String getFinal()
+  {
+    return Final.class.getName() + getParam();
+  }
+
+  /**
+   * Initial stage: scores tuples with {@link InverseWeightScoreGenerator}.
+   */
+  static public class Initial extends ReservoirSample.Initial
+  {
+    private Integer weightIdx;
+
+    public Initial()
+    {
+      super();
+      this.weightIdx = null;
+    }
+
+    public Initial(String strNumSamples, String strWeightIdx)
+    {
+      super(strNumSamples);
+      this.weightIdx = Integer.parseInt(strWeightIdx);
+      if (this.weightIdx < 0) {
+        throw new IllegalArgumentException("Invalid negative index of weight field for WeightedReservoirSample.Initial constructor: "
+            + strWeightIdx);
+      }
+    }
+
+    @Override
+    protected ScoredTuple.ScoreGenerator getScoreGenerator()
+    {
+      if (super.scoreGen == null)
+      {
+        super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+      }
+      return super.scoreGen;
+    }
+  }
+
+  /**
+   * Intermediate stage; the weight index is accepted for signature compatibility but unused.
+   */
+  static public class Intermediate extends ReservoirSample.Intermediate
+  {
+    public Intermediate()
+    {
+      super();
+    }
+
+    public Intermediate(String strNumSamples, String strWeightIdx)
+    {
+      super(strNumSamples);
+    }
+  }
+
+  /**
+   * Final stage; the weight index is accepted for signature compatibility but unused.
+   */
+  static public class Final extends ReservoirSample.Final
+  {
+    public Final()
+    {
+      super();
+    }
+
+    public Final(String strNumSamples, String strWeightIdx)
+    {
+      super(strNumSamples);
+    }
+  }
+
+  /**
+   * Generates the A-Res key u^(1/w) for a tuple, where u is uniform in [0, 1) and w is the
+   * tuple's weight.
+   */
+  static class InverseWeightScoreGenerator implements ScoredTuple.ScoreGenerator
+  {
+    // index of the weight field of the input tuple
+    private int weightIdx;
+
+    InverseWeightScoreGenerator(Integer weightIdx)
+    {
+      if (weightIdx == null || weightIdx < 0) {
+        throw new IllegalArgumentException("Invalid null or negative weight index input: " + weightIdx);
+      }
+      this.weightIdx = weightIdx;
+    }
+
+    /**
+     * @throws ExecException if the weight field is missing, null, non-positive or NaN
+     */
+    @Override
+    public double generateScore(Tuple sample) throws ExecException
+    {
+      if (this.weightIdx >= sample.size())
+      {
+        throw new ExecException(String.format("Weight index %d is outside tuple bounds", this.weightIdx));
+      }
+      Object rawWeight = sample.get(this.weightIdx);
+      if (rawWeight == null)
+      {
+        throw new ExecException(String.format("null value for weight at index %d", this.weightIdx));
+      }
+      double weight = ((Number) rawWeight).doubleValue();
+      // Negated check so NaN (which Double.compare orders above every value) is rejected too.
+      if (!(weight > 0.0))
+      {
+        throw new ExecException(String.format("Invalid sample weight [%f]. It should be a positive real number", weight));
+      }
+      // a different approach to try: u^(1/w) could be computed as exp(log(u)/w)
+      return Math.pow(Math.random(), 1 / weight);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java
new file mode 100644
index 0000000..a77164d
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Performs weighted bernoulli sampling on a bag.
+ *
+ * <p>
+ * Create a new bag by performing a weighted sampling without replacement
+ * from the input bag. Sampling is biased according to a weight that
+ * is part of the inner tuples in the bag. That is, tuples with relatively
+ * high weights are more likely to be chosen over tuples with low weights.
+ * Optionally, a limit on the number of items to return may be specified.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define WeightedSample datafu.pig.sampling.WeightedSample();
+ *
+ * -- input:
+ * -- ({(a, 100),(b, 1),(c, 5),(d, 2)})
+ * input = LOAD 'input' AS (A: bag{T: tuple(name:chararray,score:int)});
+ *
+ * output1 = FOREACH input GENERATE WeightedSample(A,1);
+ * -- output1:
+ * -- uses the field indexed by 1 as a score
+ * -- ({(a,100),(c,5),(b,1),(d,2)}) -- example of random
+ *
+ * -- sample using the second column (index 1) and keep only the top 3
+ * output2 = FOREACH input GENERATE WeightedSample(A,1,3);
+ * -- output2:
+ * -- ({(a,100),(c,5),(b,1)})
+ * }
+ * </pre>
+ */
+@Nondeterministic
+public class WeightedSample extends EvalFunc<DataBag>
+{
+  BagFactory bagFactory = BagFactory.getInstance();
+  // optional fixed RNG seed; when set, every call draws the same random sequence
+  Long seed = null;
+
+  public WeightedSample() {
+  }
+
+  public WeightedSample(String seed) {
+    this.seed = Long.parseLong(seed);
+  }
+
+  /**
+   * Draws a weighted sample without replacement.
+   *
+   * @param input (bag, scoreIndex:int[, limit:int|long]); a null limit means "no limit"
+   * @return the sampled tuples in selection order; empty if the bag is null/empty or the
+   *         limit is not positive
+   */
+  @Override
+  public DataBag exec(Tuple input) throws IOException {
+    DataBag output = bagFactory.newDefaultBag();
+
+    DataBag samples = (DataBag) input.get(0);
+    if (samples == null || samples.size() == 0) {
+      return output; // if we are given null we will return an empty bag
+    }
+    int numSamples = (int) samples.size();
+
+    // accept any type of number for the sample limit, but convert to int
+    int limitSamples = numSamples;
+    if (input.size() == 3 && input.get(2) != null) {
+      limitSamples = Math.min(((Number) input.get(2)).intValue(), numSamples);
+    }
+    // the limit must be honored before the single-element shortcut below
+    if (limitSamples <= 0) {
+      return output;
+    }
+    if (numSamples == 1) {
+      return samples;
+    }
+
+    int scoreIndex = ((Number) input.get(1)).intValue();
+    Tuple[] tuples = new Tuple[numSamples];
+    double[] scores = new double[numSamples];
+    int tupleIndex = 0;
+    for (Tuple tuple : samples) {
+      tuples[tupleIndex] = tuple;
+      // non-positive scores would break the cumulative distribution, so clamp them
+      scores[tupleIndex] = Math.max(((Number) tuple.get(scoreIndex)).doubleValue(), Double.MIN_NORMAL);
+      tupleIndex++;
+    }
+
+    /*
+     * Here's how the algorithm works:
+     *
+     * 1. Create a cumulative distribution of the remaining scores.
+     * 2. Draw a random number.
+     * 3. Find the interval the drawn number falls into.
+     * 4. Select the element owning that interval.
+     * 5. Remove the selected element from consideration.
+     * 6. Repeat 1-5 k times.
+     *
+     * Rather than removing the element (#5), which is expensive for an array, the selected
+     * slot is overwritten with the element at the front of the remaining range, and the front
+     * of the range then advances by one. The elements still under consideration are therefore
+     * always those at indices [k, numSamples).
+     *
+     * This is an O(k*n) algorithm, where k is the number of elements to sample and n is
+     * the number of scores.
+     */
+    Random rng = (seed == null) ? new Random() : new Random(seed);
+
+    for (int k = 0; k < limitSamples; k++) {
+      double val = rng.nextDouble();
+      int idx = find_cumsum_interval(scores, val, k, numSamples);
+      if (idx == numSamples) {
+        // rounding left val beyond the last cumulative bound: pick uniformly among the rest
+        idx = rng.nextInt(numSamples - k) + k;
+      }
+
+      output.add(tuples[idx]);
+
+      scores[idx] = scores[k];
+      tuples[idx] = tuples[k];
+    }
+
+    return output;
+  }
+
+  /**
+   * Finds the index whose normalized cumulative-score interval contains {@code val}.
+   *
+   * @param scores the scores; only the range [begin, end) is considered
+   * @param val a value in [0, 1)
+   * @param begin first index of the range (inclusive)
+   * @param end last index of the range (exclusive)
+   * @return the first index i in [begin, end) for which the cumulative score up to i divided
+   *         by the total score of the range exceeds val, or {@code end} if there is none
+   */
+  public int find_cumsum_interval(double[] scores, double val, int begin, int end) {
+    double sum = 0.0;
+    double cumsum = 0.0;
+    for (int i = begin; i < end; i++) {
+      sum += scores[i];
+    }
+
+    for (int i = begin; i < end; i++) {
+      cumsum += scores[i];
+      if ((cumsum / sum) > val)
+        return i;
+    }
+    return end;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    try {
+      if (!(input.size() == 2 || input.size() == 3))
+      {
+        throw new RuntimeException("Expected input to have two or three fields");
+      }
+
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG) {
+        throw new RuntimeException("Expected a BAG as first input, got: " + inputFieldSchema.type);
+      }
+
+      if (input.getField(1).type != DataType.INTEGER) {
+        throw new RuntimeException("Expected an INT as second input, got: " + input.getField(1).type);
+      }
+
+      if (input.size() == 3 && !(input.getField(2).type == DataType.INTEGER || input.getField(2).type == DataType.LONG)) {
+        throw new RuntimeException("Expected an INT or LONG as third input, got: " + input.getField(2).type);
+      }
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               inputFieldSchema.schema, DataType.BAG));
+    } catch (FrontendException e) {
+      throw new RuntimeException("Error deriving output schema.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java b/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java
new file mode 100644
index 0000000..0c7d8bd
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Sampling UDFs, including weighted sampling, reservoir sampling, and sampling by key.
+ */
+package datafu.pig.sampling;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
new file mode 100644
index 0000000..fe68888
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sessions;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Performs a count of events, ignoring events which occur within the
+ * same time window.
+ * <p>
+ * This is useful for tasks such as counting the number of page views per user since it:
+ * a) prevents reloads and go-backs from overcounting actual views
+ * b) captures the notion that views across multiple sessions are more meaningful
+ * <p>
+ * An event is counted when it occurs more than the configured time window after the
+ * previous event, so the window is measured from the most recent event seen.
+ * <p>
+ * Input <b>must</b> be sorted in ascending order by time for this UDF to work. The time
+ * is the first field of each tuple and may be any value Joda-Time's
+ * {@code DateTime(Object)} constructor accepts (for example an ISO8601 string or epoch
+ * milliseconds as a long); null times are rejected.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ *
+ * %declare TIME_WINDOW 10m
+ *
+ * define SessionCount datafu.pig.sessions.SessionCount('$TIME_WINDOW');
+ *
+ * views = LOAD 'views' as (user_id:int, page_id:int, time:chararray);
+ * views_grouped = GROUP views by (user_id, page_id);
+ * view_counts = FOREACH views_grouped {
+ *   views = order views by time;
+ *   generate group.user_id as user_id,
+ *            group.page_id as page_id,
+ *            SessionCount(views.(time)) as count; }
+ * }
+ * </pre>
+ *
+ */
+public class SessionCount extends AccumulatorEvalFunc<Long>
+{
+  private final long millis;
+  private DateTime last_date;
+  private long sum;
+
+  /**
+   * @param timeSpec length of the time window as an ISO8601 time duration without the
+   *        leading "PT" (for example "10m", "1h" or "30s"); case-insensitive
+   */
+  public SessionCount(String timeSpec)
+  {
+    Period p = new Period("PT" + timeSpec.toUpperCase());
+    // Multiply as long: an int product overflows for windows longer than ~24.8 days.
+    this.millis = p.toStandardSeconds().getSeconds() * 1000L;
+    cleanup();
+  }
+
+  /**
+   * Consumes a chunk of the time-sorted input bag and updates the running count.
+   *
+   * @param input tuple whose first field is a bag of tuples carrying the time in field 0;
+   *        a null bag contributes nothing
+   * @throws IOException if a time is null or the times are not in ascending order
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    DataBag bag = (DataBag) input.get(0);
+    if (bag == null) {
+      return;
+    }
+
+    for (Tuple t : bag) {
+      Object timeObj = t.get(0);
+      if (timeObj == null) {
+        // Joda-Time treats a null instant as "now", which would silently distort the count.
+        throw new IOException("input time series contains a null time");
+      }
+      DateTime date = new DateTime(timeObj);
+
+      if (last_date == null) {
+        // first event ever seen opens the first window
+        sum = 1;
+      } else if (date.isAfter(last_date.plus(this.millis))) {
+        sum += 1;
+      } else if (date.isBefore(last_date)) {
+        throw new IOException("input time series is not sorted");
+      }
+
+      last_date = date;
+    }
+  }
+
+  /**
+   * @return the number of windows counted so far (0 if no events have been seen)
+   */
+  @Override
+  public Long getValue()
+  {
+    return sum;
+  }
+
+  /**
+   * Resets the count and the last-seen time. Called from the constructor and,
+   * presumably, by Pig between groups.
+   */
+  @Override
+  public void cleanup()
+  {
+    this.last_date = null;
+    this.sum = 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
new file mode 100644
index 0000000..52d159b
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sessions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * Sessionizes an input stream, appending a session ID to each tuple.
+ *
+ * <p>
+ * This UDF takes a constructor argument which is the session timeout (an idle
+ * period of this amount indicates that a new session has started), written as an
+ * ISO8601 time duration without the leading "PT" (for example "30m"). It assumes the
+ * first field of each tuple in the input bag is the timestamp, either as an ISO8601
+ * string (chararray) or as epoch milliseconds (long). The input bag must be sorted
+ * by this timestamp. It returns the input bag with a new field, session_id, that is
+ * a GUID indicating the session of the request.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ *
+ * %declare TIME_WINDOW 30m
+ *
+ * define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
+ *
+ * views = LOAD 'views.tsv' AS (visit_date:chararray, member_id:int, url:chararray);
+ *
+ * -- sessionize the visit stream
+ * views = GROUP views BY member_id;
+ * sessions = FOREACH views {
+ *   visits = ORDER views BY visit_date;
+ *   GENERATE FLATTEN(Sessionize(visits)) AS (visit_date,member_id,url,session_id);
+ * }
+ *
+ * -- count the number of distinct sessions hitting the url
+ * rollup = GROUP sessions BY url;
+ * result = FOREACH rollup {
+ *   session_ids = DISTINCT sessions.session_id;
+ *   GENERATE group AS url, COUNT(session_ids) AS session_cnt;
+ * }
+ * }
+ * </pre>
+ * </p>
+ */
+@Nondeterministic
+public class Sessionize extends AccumulatorEvalFunc<DataBag>
+{
+  private final long millis;
+
+  private DataBag outputBag;
+  private DateTime last_date;
+  private String id;
+
+  /**
+   * @param timeSpec session timeout as an ISO8601 time duration without the leading
+   *        "PT" (for example "30m"); case-insensitive
+   */
+  public Sessionize(String timeSpec)
+  {
+    Period p = new Period("PT" + timeSpec.toUpperCase());
+    // Multiply as long: an int product overflows for timeouts longer than ~24.8 days.
+    this.millis = p.toStandardSeconds().getSeconds() * 1000L;
+
+    cleanup();
+  }
+
+  /**
+   * Consumes a chunk of the time-sorted input bag, appending the current session id
+   * to a copy of each tuple. A new id is generated whenever the gap since the
+   * previous tuple exceeds the timeout.
+   *
+   * @param input tuple whose first field is a bag of tuples with the time in field 0;
+   *        a null bag contributes nothing
+   * @throws IOException if the times are not in ascending order
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    DataBag bag = (DataBag) input.get(0);
+    if (bag == null) {
+      return;
+    }
+
+    for (Tuple t : bag) {
+      Object timeObj = t.get(0);
+
+      DateTime date;
+      if (timeObj instanceof String)
+      {
+        date = new DateTime((String)timeObj);
+      }
+      else if (timeObj instanceof Long)
+      {
+        date = new DateTime((Long)timeObj);
+      }
+      else
+      {
+        throw new RuntimeException("Time must either be a String or Long");
+      }
+
+      if (this.last_date == null) {
+        // first tuple keeps the id created in cleanup()
+        this.last_date = date;
+      } else if (date.isAfter(this.last_date.plus(this.millis))) {
+        this.id = UUID.randomUUID().toString();
+      } else if (date.isBefore(last_date)) {
+        throw new IOException(String.format("input time series is not sorted (%s < %s)", date, last_date));
+      }
+
+      Tuple t_new = TupleFactory.getInstance().newTuple(t.getAll());
+      t_new.append(this.id);
+      outputBag.add(t_new);
+
+      this.last_date = date;
+    }
+  }
+
+  /**
+   * @return the accumulated tuples, each with its session id appended
+   */
+  @Override
+  public DataBag getValue()
+  {
+    return outputBag;
+  }
+
+  /**
+   * Resets the output bag and last-seen time and starts a fresh session id.
+   * Called from the constructor and, presumably, by Pig between groups.
+   */
+  @Override
+  public void cleanup()
+  {
+    this.last_date = null;
+    this.outputBag = BagFactory.getInstance().newDefaultBag();
+    this.id = UUID.randomUUID().toString();
+  }
+
+  /**
+   * Output schema: a bag of the input tuple schema with a chararray
+   * {@code session_id} field appended.
+   *
+   * @throws RuntimeException if the input is not a bag of tuples whose first field
+   *         is a chararray or long
+   */
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }
+
+      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+      if (inputTupleSchema.getField(0).type != DataType.CHARARRAY
+          && inputTupleSchema.getField(0).type != DataType.LONG)
+      {
+        throw new RuntimeException(String.format("Expected first element of tuple to be a CHARARRAY or LONG, but instead found %s",
+                                                 DataType.findTypeName(inputTupleSchema.getField(0).type)));
+      }
+
+      Schema outputTupleSchema = inputTupleSchema.clone();
+      outputTupleSchema.add(new Schema.FieldSchema("session_id", DataType.CHARARRAY));
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                                 .getName()
+                                                                 .toLowerCase(), input),
+                                               outputTupleSchema,
+                                               DataType.BAG));
+    }
+    catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+    catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java b/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java
new file mode 100644
index 0000000..686fbf0
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for sessionizing data.
+ */
+package datafu.pig.sessions;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java b/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java
new file mode 100644
index 0000000..b6469e9
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sets;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Computes the set difference of two or more bags. Duplicates are eliminated. <b>The input bags must be sorted.</b>
+ *
+ * <p>
+ * If bags A and B are provided, then this computes A-B, i.e. all elements in A that are not in B.
+ * If bags A, B and C are provided, then this computes A-B-C, i.e. all elements in A that are not in B or C.
+ * </p>
+ *
+ * <p>
+ * Null bags are treated as empty. Ordering is checked while the bags are consumed,
+ * and a bag found to be out of order causes a RuntimeException.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define SetDifference datafu.pig.sets.SetDifference();
+ *
+ * -- input:
+ * -- ({(1),(2),(3),(4),(5),(6)},{(3),(4)})
+ * data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
+ *
+ * data = FOREACH data {
+ *   B1 = ORDER B1 BY val ASC;
+ *   B2 = ORDER B2 BY val ASC;
+ *
+ *   -- output:
+ *   -- ({(1),(2),(5),(6)})
+ *   GENERATE SetDifference(B1,B2);
+ * }
+ * }</pre>
+ */
+public class SetDifference extends SetOperationsBase
+{
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /**
+   * Loads the data bags from the input tuple and puts them in a priority queue,
+   * where ordering is determined by the data from the iterator for each bag.
+   *
+   * <p>
+   * The bags are wrapped in a {@link Pair} object that is comparable on the data
+   * currently available from the iterator.
+   * These objects are ordered first by the data, then by the index within the tuple
+   * the bag came from.
+   * </p>
+   *
+   * @param input tuple of bags; null and empty bags are skipped
+   * @return a queue holding one {@link Pair} per non-empty bag
+   * @throws IOException propagated from reading the input tuple
+   */
+  private PriorityQueue<Pair> loadBags(Tuple input) throws IOException
+  {
+    PriorityQueue<Pair> pq = new PriorityQueue<Pair>(input.size());
+
+    for (int i = 0; i < input.size(); i++)
+    {
+      DataBag bag = (DataBag)input.get(i);
+      if (bag != null)
+      {
+        Iterator<Tuple> inputIterator = bag.iterator();
+        if (inputIterator.hasNext())
+        {
+          pq.add(new Pair(inputIterator, i));
+        }
+      }
+    }
+    return pq;
+  }
+
+  /**
+   * Counts how many elements in the priority queue match the
+   * element at the front of the queue, which should be from the first bag.
+   *
+   * @param pq priority queue
+   * @return number of matches (excluding the front element itself)
+   * @throws RuntimeException if the front element does not come from the first bag
+   */
+  public int countMatches(PriorityQueue<Pair> pq)
+  {
+    Pair nextPair = pq.peek();
+    Tuple data = nextPair.data;
+
+    // sanity check
+    if (!nextPair.index.equals(0))
+    {
+      throw new RuntimeException("Expected next bag to have index 0");
+    }
+
+    int matches = 0;
+    for (Pair p : pq) {
+      if (data.equals(p.data))
+        matches++;
+    }
+    // subtract 1 since element matches itself
+    return matches - 1;
+  }
+
+  /**
+   * Returns the distinct elements of the first bag that appear in none of the other bags.
+   *
+   * @param input tuple of at least two bags, each sorted ascending; null bags are treated as empty
+   * @return a new bag holding the set difference
+   * @throws IOException propagated from reading the input tuple
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    if (input.size() < 2)
+    {
+      throw new RuntimeException("Expected at least two inputs, but found " + input.size());
+    }
+
+    for (Object o : input)
+    {
+      if (o != null && !(o instanceof DataBag))
+      {
+        throw new RuntimeException("Inputs must be bags");
+      }
+    }
+
+    DataBag outputBag = bagFactory.newDefaultBag();
+
+    DataBag bag1 = (DataBag)input.get(0);
+
+    if (bag1 == null || bag1.size() == 0)
+    {
+      return outputBag;
+    }
+
+    // Even when every other bag is empty, run the merge below so that bag1 is still
+    // de-duplicated and checked for ordering, exactly as in the general case.
+    PriorityQueue<Pair> pq = loadBags(input);
+
+    Tuple lastData = null;
+
+    while (true)
+    {
+      // bag1's Pair is always queued here: it is only removed by poll() below, after
+      // which it is either re-offered or the loop ends.
+      Pair nextPair = pq.peek();
+
+      // ignore data we've already emitted
+      if (lastData == null || nextPair.data.compareTo(lastData) != 0)
+      {
+        // Only take data from the first bag, where there are no other
+        // bags that have the same data.
+        if (nextPair.index.equals(0) && countMatches(pq) == 0)
+        {
+          outputBag.add(nextPair.data);
+          lastData = nextPair.data;
+        }
+      }
+
+      Pair p = pq.poll();
+
+      // only put the bag back into the queue if it still has data
+      if (p.hasNext())
+      {
+        p.next();
+        pq.offer(p);
+      }
+      else if (p.index.equals(0))
+      {
+        // stop when we exhaust all elements from the first bag
+        break;
+      }
+    }
+
+    return outputBag;
+  }
+
+  /**
+   * A wrapper for the tuple iterator that implements comparable so it can be used in the priority queue.
+   *
+   * <p>
+   * This is compared first on the data, then on the index the bag came from
+   * in the input tuple.
+   * </p>
+   * @author mhayes
+   *
+   */
+  private static class Pair implements Comparable<Pair>
+  {
+    private final Iterator<Tuple> it;
+    private final Integer index;
+    private Tuple data;
+
+    /**
+     * Constructs the {@link Pair}, positioned on the iterator's first element.
+     *
+     * @param it tuple iterator; must have at least one element
+     * @param index index within the tuple that the bag came from
+     */
+    public Pair(Iterator<Tuple> it, int index)
+    {
+      this.index = index;
+      this.it = it;
+      this.data = it.next();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Pair o)
+    {
+      int r = this.data.compareTo(o.data);
+      if (r == 0)
+      {
+        return index.compareTo(o.index);
+      }
+      else
+      {
+        return r;
+      }
+    }
+
+    public boolean hasNext()
+    {
+      return it.hasNext();
+    }
+
+    /**
+     * Advances to the next tuple of the bag.
+     *
+     * @return the new current tuple
+     * @throws RuntimeException if the new tuple sorts before the previous one
+     */
+    @SuppressWarnings("unchecked")
+    public Tuple next()
+    {
+      Tuple nextData = it.next();
+      // algorithm assumes data is in order
+      if (data.compareTo(nextData) > 0)
+      {
+        throw new RuntimeException("Out of order!");
+      }
+      this.data = nextData;
+      return this.data;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("[%s within %d]",data,index);
+    }
+  }
+}
+
+
|