datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [17/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java
new file mode 100644
index 0000000..48deaad
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Performs a simple random sample using an in-memory reservoir to produce
+ * a uniformly random sample of a given size.
+ * 
+ * <p>
+ * This is similar to {@link SimpleRandomSample}, however it is guaranteed to produce
+ * a sample of the given size.  This comes at the cost of scalability.
+ * {@link SimpleRandomSample} produces a sample of the desired size with likelihood of 99.99%,
+ * while using less internal storage.  ReservoirSample on the other hand uses internal storage
+ * with size equaling the desired sample to guarantee the exact sample size.
+ * </p>
+ * 
+ * <p>
+ * This algebraic implementation is backed by a heap and maintains the original roll in order
+ * to compensate for skew.
+ * </p>
+ * 
+ * @author wvaughan
+ *
+ */
+@Nondeterministic
+public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Algebraic
+{
+  /** Requested sample size, parsed from the constructor argument. */
+  protected Integer numSamples;
+
+  /** Reservoir used by the accumulator code path; created on first use. */
+  private Reservoir reservoir;
+
+  /** Score generator used by the accumulator code path; created on first use. */
+  protected ScoredTuple.ScoreGenerator scoreGen;
+
+  /** Returns the reservoir, constructing it from {@code numSamples} on first use. */
+  private Reservoir getReservoir()
+  {
+    if (reservoir == null) {
+      reservoir = new Reservoir(this.numSamples);
+    }
+    return reservoir;
+  }
+
+  /**
+   * Creates the UDF.
+   *
+   * @param numSamples the sample size, as a decimal integer string
+   * @throws NumberFormatException if the string cannot be parsed as an integer
+   */
+  public ReservoirSample(String numSamples)
+  {
+    this.numSamples = Integer.parseInt(numSamples);
+  }
+
+  /**
+   * Returns the score generator, defaulting to a
+   * {@link ScoredTuple.PureRandomScoreGenerator} when none has been set.
+   */
+  protected ScoredTuple.ScoreGenerator getScoreGenerator()
+  {
+    if (this.scoreGen == null)
+    {
+      this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+    }
+    return this.scoreGen;
+  }
+
+  /**
+   * Scores every tuple of the bag in field 0 and offers it to the reservoir.
+   * A null bag is skipped, matching how {@link Initial} treats a null bag.
+   *
+   * @param input tuple whose first field is the bag of candidates
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    DataBag samples = (DataBag) input.get(0);
+    if (samples == null) {
+      return;
+    }
+
+    ScoredTuple.ScoreGenerator generator = getScoreGenerator();
+    Reservoir target = getReservoir();
+    for (Tuple sample : samples) {
+      target.consider(new ScoredTuple(generator.generateScore(sample), sample));
+    }
+  }
+
+  /** Clears the reservoir, if one has been created, so the instance can be reused. */
+  @Override
+  public void cleanup()
+  {
+    // The reservoir is created lazily; it does not exist if nothing was accumulated.
+    if (this.reservoir != null) {
+      this.reservoir.clear();
+    }
+  }
+
+  /**
+   * Returns a new bag holding the original tuples currently in the reservoir.
+   */
+  @Override
+  public DataBag getValue()
+  {
+    DataBag output = BagFactory.getInstance().newDefaultBag();
+    for (ScoredTuple sample : getReservoir()) {
+      output.add(sample.getTuple());
+    }
+    return output;
+  }
+
+  /**
+   * Samples the bag in field 0 of the input.
+   *
+   * <p>
+   * When the bag has no more than {@code numSamples} tuples it is returned as is.
+   * Otherwise (including a null bag) the call is delegated to the inherited
+   * {@code exec}; a null bag is ignored by {@link #accumulate(Tuple)}.
+   * </p>
+   *
+   * @param input tuple whose first field is the bag of candidates
+   */
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    DataBag samples = (DataBag) input.get(0);
+    if (samples != null && samples.size() <= numSamples) {
+      // Nothing to discard, so the input already is the sample.
+      return samples;
+    }
+    return super.exec(input);
+  }
+
+  /**
+   * Declares the output as a bag with the same inner schema as the input bag.
+   *
+   * @throws RuntimeException if the first input field is not a bag, or if the
+   *         schema lookup fails (the {@link FrontendException} is kept as cause)
+   */
+  @Override
+  public Schema outputSchema(Schema input) {
+    try {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG) {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               inputFieldSchema.schema, DataType.BAG));
+    } catch (FrontendException e) {
+      // The exception is rethrown with its cause; no separate stack dump is needed.
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Cached constructor-argument suffix; see {@link #getParam()}. */
+  String param = null;
+
+  /**
+   * Returns the constructor-argument suffix appended to the nested class names,
+   * e.g. {@code ('10')}, or an empty string when no sample size is set.
+   */
+  private String getParam()
+  {
+    if (param == null) {
+      if (numSamples != null) {
+        param = String.format("('%d')", numSamples);
+      } else {
+        param = "";
+      }
+    }
+    return param;
+  }
+
+  @Override
+  public String getInitial() {
+    return Initial.class.getName()+getParam();
+  }
+
+  @Override
+  public String getIntermed() {
+    return Intermediate.class.getName()+getParam();
+  }
+
+  @Override
+  public String getFinal() {
+    return Final.class.getName()+getParam();
+  }
+
+  /**
+   * First algebraic stage: scores the input tuples and emits at most
+   * what the reservoir retains, each wrapped as an intermediate (score, tuple) pair.
+   */
+  static public class Initial extends EvalFunc<Tuple>
+  {
+    int numSamples;
+    private Reservoir reservoir;
+    protected ScoredTuple.ScoreGenerator scoreGen;
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    public Initial(){}
+
+    /** @param numSamples the sample size, as a decimal integer string */
+    public Initial(String numSamples)
+    {
+      this.numSamples = Integer.parseInt(numSamples);
+    }
+
+    /** Returns the reservoir, constructing it from {@code numSamples} on first use. */
+    private Reservoir getReservoir()
+    {
+      if (reservoir == null) {
+        reservoir = new Reservoir(this.numSamples);
+      }
+      return reservoir;
+    }
+
+    /** Returns the score generator, defaulting to a pure random one. */
+    protected ScoredTuple.ScoreGenerator getScoreGenerator()
+    {
+      if (this.scoreGen == null)
+      {
+        this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+      }
+      return this.scoreGen;
+    }
+
+    /**
+     * @param input tuple whose first field is the bag of candidates (may be null)
+     * @return a one-field tuple holding a bag of intermediate (score, tuple) pairs
+     */
+    @Override
+    public Tuple exec(Tuple input) throws IOException {
+      DataBag output = BagFactory.getInstance().newDefaultBag();
+
+      ScoredTuple.ScoreGenerator generator = getScoreGenerator();
+
+      DataBag samples = (DataBag) input.get(0);
+      if (samples == null)
+      {
+        // do nothing: a null bag yields an empty output bag
+      }
+      else if (samples.size() <= numSamples) {
+        // no need to construct a reservoir, so just emit intermediate tuples
+        for (Tuple sample : samples) {
+          // add the score on to the intermediate tuple
+          output.add(new ScoredTuple(generator.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
+        }
+      } else {
+        // the reservoir is reused across calls, so start from a clean state
+        Reservoir target = getReservoir();
+        target.clear();
+
+        for (Tuple sample : samples) {
+          target.consider(new ScoredTuple(generator.generateScore(sample), sample));
+        }
+
+        for (ScoredTuple scoredTuple : target) {
+          // add the score on to the intermediate tuple
+          output.add(scoredTuple.getIntermediateTuple(tupleFactory));
+        }
+      }
+
+      return tupleFactory.newTuple(output);
+    }
+
+  }
+
+  /**
+   * Middle algebraic stage: merges bags of intermediate (score, tuple) pairs
+   * through a reservoir and re-emits the retained pairs with their scores.
+   */
+  static public class Intermediate extends EvalFunc<Tuple>
+  {
+    int numSamples;
+    private Reservoir reservoir;
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    public Intermediate(){}
+
+    /** @param numSamples the sample size, as a decimal integer string */
+    public Intermediate(String numSamples)
+    {
+      this.numSamples = Integer.parseInt(numSamples);
+    }
+
+    /** Returns the reservoir, constructing it from {@code numSamples} on first use. */
+    private Reservoir getReservoir()
+    {
+      if (reservoir == null) {
+        reservoir = new Reservoir(this.numSamples);
+      }
+      return reservoir;
+    }
+
+    /**
+     * @param input tuple whose first field is a bag of one-field tuples, each
+     *        holding a bag of intermediate pairs
+     * @return a one-field tuple holding a bag of the retained intermediate pairs
+     */
+    @Override
+    public Tuple exec(Tuple input) throws IOException {
+      Reservoir target = getReservoir();
+      target.clear();
+
+      DataBag bagOfSamples = (DataBag) input.get(0);
+      for (Tuple innerTuple : bagOfSamples) {
+        DataBag samples = (DataBag) innerTuple.get(0);
+
+        for (Tuple sample : samples) {
+          // use the same score as previously generated
+          target.consider(ScoredTuple.fromIntermediateTuple(sample));
+        }
+      }
+
+      DataBag output = BagFactory.getInstance().newDefaultBag();
+      for (ScoredTuple scoredTuple : target) {
+        // add the score on to the intermediate tuple
+        output.add(scoredTuple.getIntermediateTuple(tupleFactory));
+      }
+
+      return tupleFactory.newTuple(output);
+    }
+
+  }
+
+  /**
+   * Last algebraic stage: merges bags of intermediate (score, tuple) pairs
+   * through a reservoir and emits the retained original tuples without scores.
+   */
+  static public class Final extends EvalFunc<DataBag>
+  {
+    int numSamples;
+    private Reservoir reservoir;
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    public Final(){}
+
+    /** @param numSamples the sample size, as a decimal integer string */
+    public Final(String numSamples)
+    {
+      this.numSamples = Integer.parseInt(numSamples);
+    }
+
+    /** Returns the reservoir, constructing it from {@code numSamples} on first use. */
+    private Reservoir getReservoir()
+    {
+      if (reservoir == null) {
+        reservoir = new Reservoir(this.numSamples);
+      }
+      return reservoir;
+    }
+
+    /**
+     * @param input tuple whose first field is a bag of one-field tuples, each
+     *        holding a bag of intermediate pairs
+     * @return a bag of the retained original tuples
+     */
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+      Reservoir target = getReservoir();
+      target.clear();
+
+      DataBag bagOfSamples = (DataBag) input.get(0);
+      for (Tuple innerTuple : bagOfSamples) {
+        DataBag samples = (DataBag) innerTuple.get(0);
+
+        for (Tuple sample : samples) {
+          // use the same score as previously generated
+          target.consider(ScoredTuple.fromIntermediateTuple(sample));
+        }
+      }
+
+      DataBag output = BagFactory.getInstance().newDefaultBag();
+      for (ScoredTuple scoredTuple : target) {
+        // output the original tuple
+        output.add(scoredTuple.getTuple());
+      }
+
+      return output;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java b/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java
new file mode 100644
index 0000000..90ea576
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SampleByKey.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Provides a way of sampling tuples based on certain fields.
+ * This is essentially equivalent to grouping on the fields, applying SAMPLE,
+ * and then flattening.  It is much more efficient though because it does not require
+ * a reduce step.
+ * 
+ * <p>
+ * The method of sampling is to convert the key to a hash, derive a double value
+ * from this, and then test this against a supplied probability.  The double value
+ * derived from a key is uniformly distributed between 0 and 1.
+ * </p>
+ * 
+ * <p>
+ * The only required parameter is the sampling probability.  This may be followed
+ * by an optional seed value to control the random number generation.  
+ * </p>
+ * 
+ * <p>
+ * SampleByKey will work deterministically as long as the same seed is provided.  
+ * </p>
+ * 
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * DEFINE SampleByKey datafu.pig.sampling.SampleByKey('0.5');
+ * 
+ *-- input: (A,1), (A,2), (A,3), (B,1), (B,3)
+ * 
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ * output = FILTER data BY SampleByKey(A_id);
+ * 
+ * --output: (B,1), (B,3)
+ * } 
+ * 
+ * </pre>
+ * </p>
+ * @author evion 
+ * 
+ */
+
+public class SampleByKey extends FilterFunc
+{
+  /** Multiplier used when folding the field hash codes into a single key hash. */
+  final static int PRIME_NUMBER = 31;
+
+  /** Seed mixed into every digest; taken from the salt or, failing that, the UDF signature. */
+  Integer seed = null;
+  /** A key passes the filter when its derived value is at most this probability. */
+  double probability;
+
+  /**
+   * Creates the filter with a seed that is derived later from the UDF signature.
+   *
+   * @param probability the sampling probability, as a decimal string
+   * @throws NumberFormatException if the string cannot be parsed as a double
+   */
+  public SampleByKey(String probability) {
+    this.probability = Double.parseDouble(probability);
+  }
+
+  /**
+   * Creates the filter with an explicit salt.
+   *
+   * @param probability the sampling probability, as a decimal string
+   * @param salt string whose hash code becomes the seed
+   */
+  public SampleByKey(String probability, String salt) {
+    this(probability);
+    this.seed = salt.hashCode();
+  }
+
+  /**
+   * Uses the hash code of the UDF signature as seed when no salt was supplied.
+   */
+  @Override
+  public void setUDFContextSignature(String signature)
+  {
+    if (this.seed == null)
+      this.seed = signature.hashCode();
+    super.setUDFContextSignature(signature);
+  }
+
+  /**
+   * Decides whether the key formed by all fields of the input is sampled.
+   *
+   * @param input tuple whose fields together form the key; null fields are allowed
+   * @return true when the value derived from the key is at most the probability
+   * @throws RuntimeException if the derived value cannot be computed; the
+   *         underlying exception is attached as the cause
+   */
+  @Override
+  public Boolean exec(Tuple input) throws IOException
+  {
+    int hashCode = 0;
+    for (int i = 0; i < input.size(); i++) {
+      Object each = input.get(i);
+      // A null field contributes 0 rather than raising a NullPointerException.
+      hashCode = hashCode * PRIME_NUMBER + (each == null ? 0 : each.hashCode());
+    }
+
+    try {
+      return intToRandomDouble(hashCode) <= probability;
+    }
+    catch (Exception e) {
+      // Keep the cause so the real failure is not lost.
+      throw new RuntimeException("Exception on intToRandomDouble", e);
+    }
+  }
+
+  /**
+   * Maps an int to a double by hashing the seed and the int with SHA-1 and
+   * rescaling the first four digest bytes.
+   *
+   * @param input the key hash
+   * @return {@code (v / Integer.MAX_VALUE + 1) / 2}, where {@code v} is the first
+   *         four digest bytes read as a big-endian int
+   */
+  private Double intToRandomDouble(int input) throws Exception
+  {
+    MessageDigest hasher = MessageDigest.getInstance("sha-1");
+
+    ByteBuffer keyBytes = ByteBuffer.allocate(4 + 4);
+    keyBytes.putInt(seed);
+    keyBytes.putInt(input);
+    byte[] digest = hasher.digest(keyBytes.array());
+
+    // Only the leading four bytes of the digest are used.
+    int leading = ByteBuffer.wrap(digest).getInt();
+    double result = (((double) leading) / Integer.MAX_VALUE + 1) / 2;
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java b/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java
new file mode 100644
index 0000000..c793584
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/ScoredTuple.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * Pairs a tuple with a score and orders instances by that score.
+ * Also converts to and from the two-field (score, tuple) intermediate form.
+ */
+class ScoredTuple implements Comparable<ScoredTuple>
+{
+  Double score;
+  private Tuple tuple;
+
+  public ScoredTuple()
+  {
+
+  }
+
+  /**
+   * @param score the score attached to the tuple
+   * @param tuple the original tuple
+   */
+  public ScoredTuple(Double score, Tuple tuple)
+  {
+    this.score = score;
+    this.setTuple(tuple);
+  }
+
+  public Double getScore() {
+    return score;
+  }
+
+  public void setScore(Double score) {
+    this.score = score;
+  }
+
+  public Tuple getTuple() {
+    return tuple;
+  }
+
+  public void setTuple(Tuple tuple) {
+    this.tuple = tuple;
+  }
+
+  /**
+   * Builds the intermediate form: a two-field tuple of (score, original tuple).
+   *
+   * @param tupleFactory factory used to allocate the intermediate tuple
+   * @throws RuntimeException if a field cannot be set (cause is the ExecException)
+   */
+  public Tuple getIntermediateTuple(TupleFactory tupleFactory)
+  {
+    Tuple intermediateTuple = tupleFactory.newTuple(2);
+    try {
+      intermediateTuple.set(0, score);
+      intermediateTuple.set(1, tuple);
+    }
+    catch (ExecException e) {
+      throw new RuntimeException(e);
+    }
+
+    return intermediateTuple;
+  }
+
+  /**
+   * Rebuilds a ScoredTuple from the intermediate form produced by
+   * {@link #getIntermediateTuple(TupleFactory)}.
+   *
+   * @param intermediateTuple two-field tuple of (Double score, Tuple original)
+   * @throws RuntimeException if the tuple cannot be read or has the wrong field types
+   */
+  public static ScoredTuple fromIntermediateTuple(Tuple intermediateTuple) throws ExecException
+  {
+    try {
+      Double score = (Double)intermediateTuple.get(0);
+      Tuple originalTuple = (Tuple)intermediateTuple.get(1);
+      return new ScoredTuple(score, originalTuple);
+    } catch (Exception e) {
+      // String concatenation tolerates a null tuple, unlike an explicit toString().
+      throw new RuntimeException("Cannot deserialize intermediate tuple: "+intermediateTuple, e);
+    }
+  }
+
+  /**
+   * Orders by score, with a missing (null) score sorting before any real score.
+   *
+   * @throws NullPointerException if {@code o} is null
+   */
+  @Override
+  public int compareTo(ScoredTuple o) {
+    if (score == null) {
+      return (o.score == null) ? 0 : -1;
+    }
+    if (o.score == null) {
+      // Keeps the ordering antisymmetric with the branch above.
+      return 1;
+    }
+    return score.compareTo(o.score);
+  }
+
+  /** Produces the score attached to a sampled tuple. */
+  static interface ScoreGenerator
+  {
+      double generateScore(Tuple sample) throws ExecException;
+  }
+
+  /** Scores every tuple with {@link Math#random()}, ignoring the tuple itself. */
+  static class PureRandomScoreGenerator implements ScoreGenerator
+  {
+      public PureRandomScoreGenerator(){}
+
+      public double generateScore(Tuple sample)
+      {
+          return Math.random();
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java
new file mode 100644
index 0000000..8e8debf
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSample.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Scalable simple random sampling (ScaSRS).
+ * <p/>
+ * This UDF implements a scalable simple random sampling algorithm described in
+ * 
+ * <pre>
+ * X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
+ * </pre>
+ * 
+ * It takes a bag of n items and a sampling probability p as the inputs, and outputs a
+ * simple random sample of size exactly ceil(p*n) in a bag, with probability at least
+ * 99.99%. For example, the following script generates a simple random sample with
+ * sampling probability 0.01:
+ * 
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ * 
+ * item    = LOAD 'input' AS (x:double); 
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01));
+ * </pre>
+ * 
+ * Optionally, user can provide a good lower bound of n as the third argument to help
+ * reduce the size of intermediate data, for example:
+ * 
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ * 
+ * item    = LOAD 'input' AS (x:double); 
+ * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01, summary.count));
+ * </pre>
+ * 
+ * This UDF is very useful for stratified sampling. For example, the following script
+ * keeps all positive examples while downsampling negatives with probability 0.1:
+ * 
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ * 
+ * item    = LOAD 'input' AS (x:double, label:int);
+ * grouped = FOREACH (GROUP item BY label) GENERATE item, (group == 1 ? 1.0 : 0.1) AS p; 
+ * sampled = FOREACH grouped GENERATE FLATTEN(SRS(item, p));
+ * </pre>
+ * 
+ * In a Java Hadoop MapReduce job, we can output selected items directly using
+ * MultipleOutputs. However, this feature is not available in a Pig UDF. So we still let
+ * selected items go through the sort phase. However, as long as the sample size is not
+ * huge, this should not be a big problem.
+ * 
+ * In the first version, the sampling probability is specified in the constructor. This 
+ * method is deprecated now and will be removed in the next release.
+ * 
+ * @author ximeng
+ * 
+ */
+public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
+{
+  /**
+   * Prefix for the output bag name.
+   */
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRS";
+
+  private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
+
+  public SimpleRandomSample()
+  {
+    // empty
+  }
+
+  /**
+   * Constructs this UDF with a sampling probability.
+   *
+   * <p>
+   * The value is only validated here; it is not stored on this instance.
+   * </p>
+   *
+   * @param samplingProbability the sampling probability, as a decimal string
+   * @throws IllegalArgumentException if the value is not inside [0, 1]
+   * @deprecated Should specify the sampling probability in the function call.
+   */
+  @Deprecated
+  public SimpleRandomSample(String samplingProbability)
+  {
+    // NOTE(review): how this value reaches Initial depends on AlgebraicEvalFunc,
+    // which is not visible here - confirm the constructor-argument path still works.
+    double p = Double.parseDouble(samplingProbability);
+    verifySamplingProbability(p);
+  }
+
+  @Override
+  public String getInitial()
+  {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed()
+  {
+    return Intermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal()
+  {
+    return Final.class.getName();
+  }
+
+  /**
+   * Declares the output as a bag with the same inner schema as the input bag.
+   *
+   * @throws RuntimeException if the first input field is not a bag, or if the
+   *         schema lookup fails
+   */
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      return new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                   input),
+                                               inputFieldSchema.schema,
+                                               DataType.BAG));
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * First stage: draws a uniform score per item, pre-selects items scoring below
+   * the lower threshold, waitlists items scoring below the upper threshold, and
+   * drops the rest.
+   */
+  static public class Initial extends EvalFunc<Tuple>
+  {
+    // Should avoid creating many random number generator instances.
+    private static final RandomDataImpl _RNG = new RandomDataImpl();
+
+    private boolean _first = true;
+    private double _p = -1.0d; // the sampling probability
+    private long _n1 = 0L; // the input lower bound of the size of the population
+    private long _localCount = 0L; // number of items processed by this instance
+
+    synchronized private static double nextDouble()
+    {
+      return _RNG.nextUniform(0.0d, 1.0d);
+    }
+
+    public Initial()
+    {
+      // empty
+    }
+
+    @Deprecated
+    public Initial(String samplingProbability)
+    {
+      _p = Double.parseDouble(samplingProbability);
+    }
+
+    /**
+     * @param input (bag of items, sampling probability[, lower bound of the
+     *        population size]); a single-field input is accepted only when the
+     *        probability was given to the constructor
+     * @return (probability, number of items in this call, lower bound used,
+     *         bag of selected items, bag of waitlisted items with scores)
+     * @throws IllegalArgumentException on a wrong field count, an invalid
+     *         probability, or a probability/lower bound that changes between calls
+     */
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+      int numArgs = input.size();
+
+      // The first if clause is for backward compatibility, which should be removed
+      // after we remove specifying sampling probability in the constructor.
+      if (numArgs == 1)
+      {
+        if (_p < 0.0d)
+        {
+          throw new IllegalArgumentException("Sampling probability is not given.");
+        }
+      }
+      else if (numArgs < 2 || numArgs > 3)
+      {
+        throw new IllegalArgumentException("The input tuple should have either two or three fields: "
+            + "a bag of items, the sampling probability, "
+            + "and optionally a good lower bound of the size of the population or the exact number.");
+      }
+
+      DataBag items = (DataBag) input.get(0);
+      // A null bag is treated as an empty one.
+      long numItems = (items == null) ? 0L : items.size();
+      _localCount += numItems;
+
+      // This is also for backward compatibility. Should change to
+      // double p = ((Number) input.get(1)).doubleValue();
+      // after we remove specifying sampling probability in the constructor.
+      double p = numArgs == 1 ? _p : ((Number) input.get(1)).doubleValue();
+      if (_first)
+      {
+        _p = p;
+        verifySamplingProbability(p);
+      }
+      else
+      {
+        if (p != _p)
+        {
+          throw new IllegalArgumentException("The sampling probability must be a scalar, but found two different values: "
+              + _p + " and " + p + ".");
+        }
+      }
+
+      long n1 = 0L;
+      if (numArgs > 2)
+      {
+        n1 = ((Number) input.get(2)).longValue();
+
+        if (_first)
+        {
+          _n1 = n1;
+        }
+        else
+        {
+          if (n1 != _n1)
+          {
+            throw new IllegalArgumentException("The lower bound of the population size must be a scalar, but found two different values: "
+                + _n1 + " and " + n1 + ".");
+          }
+        }
+      }
+
+      _first = false;
+
+      // Use the local count if the input lower bound is smaller.
+      n1 = Math.max(n1, _localCount);
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag waiting = _BAG_FACTORY.newDefaultBag();
+
+      if (n1 > 0L && items != null)
+      {
+        double q1 = getQ1(n1, p);
+        double q2 = getQ2(n1, p);
+
+        for (Tuple t : items)
+        {
+          double x = nextDouble();
+          if (x < q1)
+          {
+            selected.add(t);
+          }
+          else if (x < q2)
+          {
+            waiting.add(new ScoredTuple(x, t).getIntermediateTuple(_TUPLE_FACTORY));
+          }
+        }
+      }
+
+      /*
+       * The output tuple contains the following fields: sampling probability (double),
+       * number of processed items in this tuple (long), a good lower bound of the size of
+       * the population or the exact number (long), a bag of selected items (bag), and a
+       * bag of waitlisted items with scores (bag).
+       */
+      Tuple output = _TUPLE_FACTORY.newTuple();
+
+      output.append(p);
+      output.append(numItems);
+      output.append(n1);
+      output.append(selected);
+      output.append(waiting);
+
+      return output;
+    }
+  }
+
+  /**
+   * Middle stage: merges partial results and re-applies the thresholds to the
+   * merged waitlist using the best lower bound known so far.
+   */
+  public static class Intermediate extends EvalFunc<Tuple>
+  {
+    public Intermediate()
+    {
+      // empty
+    }
+
+    @Deprecated
+    public Intermediate(String samplingProbability)
+    {
+      // empty
+    }
+
+    /**
+     * @param input tuple whose first field is a bag of partial results, each laid
+     *        out like the output of {@link Initial}
+     * @return a merged partial result with the same five-field layout
+     */
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+      DataBag bag = (DataBag) input.get(0);
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag aggWaiting = _BAG_FACTORY.newDefaultBag();
+
+      boolean first = true;
+      double p = 0.0d;
+      long numItems = 0L; // number of items processed, including rejected
+      long n1 = 0L;
+
+      for (Tuple tuple : bag)
+      {
+        if (first)
+        {
+          p = (Double) tuple.get(0);
+          first = false;
+        }
+
+        numItems += (Long) tuple.get(1);
+        // Every partial bound and the running count are valid lower bounds of the
+        // population size, so keep the largest seen instead of only the latest.
+        n1 = Math.max(n1, Math.max((Long) tuple.get(2), numItems));
+
+        selected.addAll((DataBag) tuple.get(3));
+        aggWaiting.addAll((DataBag) tuple.get(4));
+      }
+
+      DataBag waiting = _BAG_FACTORY.newDefaultBag();
+
+      if (n1 > 0L)
+      {
+        double q1 = getQ1(n1, p);
+        double q2 = getQ2(n1, p);
+
+        for (Tuple t : aggWaiting)
+        {
+          ScoredTuple scored = ScoredTuple.fromIntermediateTuple(t);
+
+          if (scored.getScore() < q1)
+          {
+            selected.add(scored.getTuple());
+          }
+          else if (scored.getScore() < q2)
+          {
+            waiting.add(t);
+          }
+        }
+      }
+
+      Tuple output = _TUPLE_FACTORY.newTuple();
+
+      output.append(p);
+      output.append(numItems);
+      output.append(n1);
+      output.append(selected);
+      output.append(waiting);
+
+      return output;
+    }
+  }
+
+  /**
+   * Last stage: merges the partial results and tops up the pre-selected items
+   * from the waitlist, lowest score first, until ceil(p * n) items are selected.
+   */
+  static public class Final extends EvalFunc<DataBag>
+  {
+    public Final()
+    {
+      // empty
+    }
+
+    @Deprecated
+    public Final(String samplingProbability)
+    {
+      // empty
+    }
+
+    /**
+     * @param input tuple whose first field is a bag of partial results, each laid
+     *        out like the output of {@link Initial}
+     * @return the bag of selected items; its size may differ from ceil(p * n)
+     *         when too many items were pre-selected or the waitlist runs out,
+     *         in which case a message is written to standard error
+     */
+    @Override
+    public DataBag exec(Tuple input) throws IOException
+    {
+      DataBag bag = (DataBag) input.get(0);
+
+      boolean first = true;
+      double p = 0.0d; // the sampling probability
+      long n = 0L; // the size of the population (total number of items)
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag waiting = _BAG_FACTORY.newSortedBag(ScoredTupleComparator.getInstance());
+
+      for (Tuple tuple : bag)
+      {
+        if (first)
+        {
+          p = (Double) tuple.get(0);
+          first = false;
+        }
+
+        n += (Long) tuple.get(1);
+        selected.addAll((DataBag) tuple.get(3));
+        waiting.addAll((DataBag) tuple.get(4));
+      }
+
+      long numSelected = selected.size();
+      long numWaiting = waiting.size();
+
+      long s = (long) Math.ceil(p * n); // sample size
+
+      System.out.println("To sample " + s + " items from " + n + ", we pre-selected "
+          + numSelected + ", and waitlisted " + numWaiting + ".");
+
+      long numNeeded = s - numSelected;
+
+      if (numNeeded < 0)
+      {
+        System.err.println("Pre-selected " + numSelected + " items, but only needed " + s
+            + ".");
+      }
+
+      // The waitlist is a sorted bag, so items are taken in comparator order.
+      for (Tuple scored : waiting)
+      {
+        if (numNeeded <= 0)
+        {
+          break;
+        }
+        selected.add(ScoredTuple.fromIntermediateTuple(scored).getTuple());
+        numNeeded--;
+      }
+
+      if (numNeeded > 0)
+      {
+        System.err.println("The waiting list only has " + numWaiting
+            + " items, but needed " + numNeeded + " more.");
+      }
+
+      return selected;
+    }
+  }
+
+  // computes a threshold to select items
+  private static double getQ1(long n, double p)
+  {
+    double t1 = 20.0d / (3.0d * n);
+    double q1 = p + t1 - Math.sqrt(t1 * t1 + 3.0d * t1 * p);
+    return q1;
+  }
+
+  // computes a threshold to reject items
+  private static double getQ2(long n, double p)
+  {
+    double t2 = 10.0d / n;
+    double q2 = p + t2 + Math.sqrt(t2 * t2 + 2.0d * t2 * p);
+    return q2;
+  }
+
+  /**
+   * Rejects sampling probabilities outside [0, 1], including NaN.
+   *
+   * @throws IllegalArgumentException if {@code p} is not inside [0, 1]
+   */
+  private static void verifySamplingProbability(double p)
+  {
+    // Written as a negated range test so that NaN, which fails every comparison, is rejected.
+    if (!(p >= 0.0 && p <= 1.0))
+    {
+      throw new IllegalArgumentException("Sampling probability must be inside [0, 1].");
+    }
+  }
+
+  /** Orders intermediate (score, tuple) tuples by ascending score. */
+  static class ScoredTupleComparator implements Comparator<Tuple>
+  {
+    public static final ScoredTupleComparator getInstance()
+    {
+      return _instance;
+    }
+
+    private static final ScoredTupleComparator _instance = new ScoredTupleComparator();
+
+    @Override
+    public int compare(Tuple o1, Tuple o2)
+    {
+      try
+      {
+        ScoredTuple t1 = ScoredTuple.fromIntermediateTuple(o1);
+        ScoredTuple t2 = ScoredTuple.fromIntermediateTuple(o2);
+        return t1.getScore().compareTo(t2.getScore());
+      }
+      catch (Exception e)
+      {
+        // Errors such as OutOfMemoryError are deliberately not wrapped.
+        throw new RuntimeException("Cannot compare " + o1 + " and " + o2 + ".", e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java
new file mode 100644
index 0000000..a59816a
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementElect.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Select the candidate with the smallest score for each position from the candidates
+ * proposed by {@link SimpleRandomSampleWithReplacementVote}.
+ * <p>
+ * Each candidate is read as a tuple whose field 0 is the position (Integer), field 1 is
+ * the score (Double), and field 2 is the candidate item (Tuple). The election is
+ * algebraic: the intermediate stage keeps only the lowest-scoring candidate per
+ * position, and the final stage emits the winning items themselves.
+ * 
+ * @see SimpleRandomSampleWithReplacementVote
+ * 
+ * @author ximeng
+ * 
+ */
+public class SimpleRandomSampleWithReplacementElect extends AlgebraicEvalFunc<DataBag>
+{
+  /**
+   * Prefix for the output bag name.
+   */
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRSWR_ELECT";
+
+  public static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  public static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /**
+   * Orders candidates by position first and by score second, so that in a sorted
+   * sequence the first candidate seen for a position is the one with the smallest score.
+   */
+  static class CandidateComparator implements Comparator<Tuple>
+  {
+    private static final CandidateComparator _instance = new CandidateComparator();
+
+    public static CandidateComparator get()
+    {
+      return _instance;
+    }
+
+    private CandidateComparator()
+    {
+      // singleton
+    }
+
+    @Override
+    public int compare(Tuple o1, Tuple o2)
+    {
+      try
+      {
+        // first by position
+        int c1 = ((Integer) o1.get(0)).compareTo((Integer) o2.get(0));
+        if (c1 != 0)
+        {
+          return c1;
+        }
+        else
+        {
+          // then by score
+          return ((Double) o1.get(1)).compareTo((Double) o2.get(1));
+        }
+      }
+      catch (ExecException e)
+      {
+        throw new RuntimeException("Error comparing tuples " + o1 + " and " + o2, e);
+      }
+    }
+  }
+
+  /**
+   * Merges the candidate bags produced by the previous algebraic stage into a single
+   * bag sorted by position and then by score.
+   * 
+   * @param tuple stage input whose field 0 is a bag of tuples, each of which carries a
+   *          bag of candidates in its field 0
+   * @return all candidates, sorted with {@link CandidateComparator}
+   * @throws ExecException if a field cannot be read from a tuple
+   */
+  private static DataBag mergeCandidates(Tuple tuple) throws ExecException
+  {
+    DataBag candidates = bagFactory.newSortedBag(CandidateComparator.get());
+    for (Tuple previousStageOutput : (DataBag) tuple.get(0))
+    {
+      candidates.addAll((DataBag) previousStageOutput.get(0));
+    }
+    return candidates;
+  }
+
+  @Override
+  public String getInitial()
+  {
+    return Initial.class.getName();
+  }
+
+  @Override
+  public String getIntermed()
+  {
+    return Intermediate.class.getName();
+  }
+
+  @Override
+  public String getFinal()
+  {
+    return Final.class.getName();
+  }
+
+  /**
+   * Initial stage: passes the input through unchanged.
+   */
+  static public class Initial extends EvalFunc<Tuple>
+  {
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+      // output each input candidate
+      return input;
+    }
+  }
+
+  /**
+   * Intermediate stage: keeps, for every position, only the candidate with the
+   * smallest score and outputs the survivors as full candidate tuples.
+   */
+  static public class Intermediate extends EvalFunc<Tuple>
+  {
+    @Override
+    public Tuple exec(Tuple tuple) throws IOException
+    {
+      DataBag outputBag = bagFactory.newDefaultBag();
+      int i = -1;
+      // candidates arrive sorted first by position, then by score, so the first
+      // candidate seen for a new position is the one with the smallest score
+      for (Tuple candidate : mergeCandidates(tuple))
+      {
+        int pos = (Integer) candidate.get(0);
+        if (pos > i)
+        {
+          outputBag.add(candidate);
+          i = pos;
+        }
+      }
+
+      return tupleFactory.newTuple(outputBag);
+    }
+
+  }
+
+  /**
+   * Final stage: elects the candidate with the smallest score for every position and
+   * outputs only the elected items (field 2 of each winning candidate).
+   */
+  static public class Final extends EvalFunc<DataBag>
+  {
+    @Override
+    public DataBag exec(Tuple tuple) throws IOException
+    {
+      DataBag outputBag = bagFactory.newDefaultBag();
+      int i = -1;
+      // same election as in the intermediate stage, but position and score are dropped
+      for (Tuple candidate : mergeCandidates(tuple))
+      {
+        int pos = (Integer) candidate.get(0);
+        if (pos > i)
+        {
+          outputBag.add((Tuple) candidate.get(2));
+          i = pos;
+        }
+      }
+      return outputBag;
+    }
+  }
+
+  /**
+   * Derives the output schema: a bag whose element schema is taken from field 2 (the
+   * candidate item) of the tuples in the input bag.
+   * 
+   * @throws RuntimeException if the first input field is not a bag or the schema cannot
+   *           be derived
+   */
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      // the output is a bag of selected items
+      Schema outputSchema =
+          new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                input),
+                                            inputFieldSchema.schema.getField(0).schema.getField(2).schema,
+                                            DataType.BAG));
+
+      return outputSchema;
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException("Error deriving output schema.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java
new file mode 100644
index 0000000..598e58c
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/SimpleRandomSampleWithReplacementVote.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.math.MathException;
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+
+/**
+ * Scalable simple random sampling with replacement (ScaSRSWR).
+ * <p/>
+ * This UDF together with {@link SimpleRandomSampleWithReplacementElect} implement a
+ * scalable algorithm for simple random sampling with replacement (SRSWR), which is a
+ * randomized algorithm with a failure rate less than {@value #FAILURE_RATE}.
+ * <p/>
+ * Let s be the desired sample size. To compute an SRSWR sample of size s, for each output
+ * position in {0, 1, ..., s-1}, we want to select an item from the population uniformly
+ * at random. This algorithm consists of two stages: vote and election. In the vote stage,
+ * this UDF {@link SimpleRandomSampleWithReplacementVote} votes items, called candidates,
+ * for each position. In the election stage, the paired UDF
+ * {@link SimpleRandomSampleWithReplacementElect} elects one candidate for each position.
+ * The algorithm succeeds if we have at least one candidate for each position.
+ * <p/>
+ * To use this UDF pair, user needs to provide: 1) the desired sample size, 2) a good
+ * lower bound of the population size or the exact size. The input to the vote UDF
+ * {@link SimpleRandomSampleWithReplacementVote} is a tuple that consists of a bag of
+ * items, the desired sample size (int), and the population size (long) or a good lower
+ * bound of it, where the latter two must be scalars. The output from the vote UDF is a
+ * bag of tuples, each consisting of position:int, score:double, and candidate. The input to the
+ * elect UDF {@link SimpleRandomSampleWithReplacementElect} is a tuple that contains all
+ * candidates voted by the vote UDF for some positions. The output from the elect UDF is a
+ * bag of sampled items.
+ * <p/>
+ * For example, the following script generates a sample of size 100000 with replacement:
+ * 
+ * <pre>
+ * DEFINE SRSWR_VOTE  datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
+ * DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
+ * 
+ * item       = LOAD 'input' AS (x:double); 
+ * summary    = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+ * candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
+ * sampled    = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE FLATTEN(SRSWR_ELECT(candidates));
+ * </pre>
+ * 
+ * Because for election we only need to group candidates voted for the same position, this
+ * algorithm can use many reducers to consume the candidates. See the "PARALLEL 10"
+ * statement above. If the item to sample is the entire row, use TOBAG(TOTUPLE(*)).
+ * <p/>
+ * SRSWR is heavily used in bootstrapping. Bootstrapping can be done easily with this UDF
+ * pair. For example, the following script generates 100 bootstrap samples, computes the
+ * mean value for each sample, and then outputs the bootstrap estimates.
+ * 
+ * <pre>
+ * summary    = FOREACH (GROUP item ALL) GENERATE AVG(item.x) AS mean, COUNT(item) AS count;
+ * candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
+ * sampled    = FOREACH (GROUP candidates BY (position % 100) PARALLEL 10) GENERATE AVG(SRSWR_ELECT(candidates)) AS mean;
+ * bootstrap  = FOREACH (GROUP sampled ALL) GENERATE summary.mean AS mean, sampled.mean AS bootstrapMeans;
+ * </pre>
+ * 
+ * Another usage of this UDF pair is to generate random pairs or tuples without computing
+ * the cross product, where each pair or tuple consist of items from different input
+ * sources. Let s be the number of random tuples we want to generate. For each input
+ * source, simply use the vote UDF to propose candidates, then join the candidates from
+ * different sources by their positions and for each position use the elect UDF to select
+ * one candidate from each source to form the pair or tuple for that position.
+ * <p/>
+ * The algorithm is a simple extension to the work
+ * 
+ * <pre>
+ * X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
+ * </pre>
+ * 
+ * Basically, for each output position, it performs a random sort on the population
+ * (associates each item with a random score independently drawn from the uniform
+ * distribution and then sorts items based on the scores), and picks the one that has the
+ * smallest score. However, a probabilistic threshold is used to avoid sorting the entire
+ * population. For example, if the population size is one billion and the random score
+ * generated for an item is 0.9, very likely it won't become the smallest and hence we do
+ * not need to propose it as a candidate.
+ * <p/>
+ * More precisely, let n be the population size, n1 be a good lower bound of n, s be the
+ * sample size, delta be the failure rate, and q be the threshold. For each output
+ * position the probability of all random scores being greater than q is (1-q)^n. Thus, if
+ * we throw away items with associated scores greater than q, with probability at least 1
+ * - s*(1-q)^n, we can still capture the item with the smallest score for each position.
+ * Fixing delta = s*(1-q)^n and solving for q, we get q = 1-exp(log(delta/s)/n). Note that
+ * replacing n by n1 &lt; n can only decrease the failure rate, though at the cost of an
+ * increased number of candidates. The expected number of candidates is (1 -
+ * exp(log(delta/s)/n1))*s*n. When n1 equals n, this number is approximately
+ * s*log(s/delta).
+ * <p/>
+ * Generating a random score for each (item, position) pair is very expensive and
+ * unnecessary. For each item, the number of positions for which it gets voted follows a
+ * binomial distribution B(s,q). We can simply draw a number from this distribution,
+ * determine the positions by sampling without replacement, and then generate random
+ * scores for those positions. This reduces the running time significantly.
+ * <p/>
+ * Since for each position we only need the candidate with the smallest score, we
+ * implement a combiner to reduce the size of intermediate data in the elect UDF
+ * {@link SimpleRandomSampleWithReplacementElect}.
+ * 
+ * @see SimpleRandomSampleWithReplacementElect
+ * @see <a href="http://en.wikipedia.org/wiki/Bootstrapping_(statistics)" target="_blank">Bootstrapping
+ *      (Wikipedia)</a>
+ * 
+ * @author ximeng
+ * 
+ */
+public class SimpleRandomSampleWithReplacementVote extends EvalFunc<DataBag>
+{
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRSWR_VOTE";
+  public static final String CANDIDATE_FIELD_NAME = "candidate";
+  public static final String POSITION_FIELD_NAME = "position";
+  public static final String SCORE_FIELD_NAME = "score";
+  public static final double FAILURE_RATE = 1e-4;
+
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  // source of all random draws (positions, binomial counts, scores)
+  private final RandomDataImpl _rdg = new RandomDataImpl();
+
+  /**
+   * Samples k integers from [0, n) without replacement efficiently.
+   * 
+   * If k is small, we can repeatedly draw integers from [0, n) until there are k distinct
+   * values. For each trial, with probability at least (n-k)/n, we can draw a new value.
+   * So the expected number of trials is smaller than (k*n)/(n-k), which is a very rough
+   * bound. If k is large, we use the selection-rejection sampling algorithm. Basically,
+   * we want the running time to be O(k).
+   * 
+   * @param n exclusive upper bound of the range to sample from
+   * @param k number of distinct integers to draw
+   * @return the sampled integers; an empty array when k is 0
+   */
+  private int[] sampleWithoutReplacement(int n, int k)
+  {
+    if (k == 0)
+    {
+      return new int[] {};
+    }
+
+    if (k < n / 3L)
+    {
+      Set<Integer> sample = Sets.newHashSetWithExpectedSize(k);
+
+      // The expected number of iterations is less than 1.5*k
+      while (sample.size() < k)
+      {
+        // draws from [0, n - 1]; duplicates are absorbed by the set
+        sample.add(_rdg.nextInt(0, n - 1));
+      }
+
+      return Ints.toArray(sample);
+    }
+    else
+    {
+      int[] sample = new int[k];
+
+      // Scan [0, n) once and accept j with probability
+      // (number still needed) / (number of values still left to scan).
+      int i = 0;
+      for (int j = 0; j < n && i < k; ++j)
+      {
+        if (_rdg.nextUniform(0.0d, 1.0d) < 1.0d * (k - i) / (n - j))
+        {
+          sample[i] = j;
+          i++;
+        }
+      }
+
+      return sample;
+    }
+  }
+
+  /**
+   * Votes candidates for output positions.
+   * 
+   * @param tuple a bag of items, the desired sample size (int), and the population size
+   *          (long) or a good lower bound of it
+   * @return a bag of (position, score, item) candidate tuples; empty when the desired
+   *         sample size is 0
+   * @throws IllegalArgumentException if the tuple does not have exactly three fields, or
+   *           if the sample size or the population size is negative
+   */
+  @Override
+  public DataBag exec(Tuple tuple) throws IOException
+  {
+    if (tuple.size() != 3)
+    {
+      throw new IllegalArgumentException("The input arguments are: "
+          + "a bag of items, the desired sample size (int), and the population size (long) or a good lower bound of it");
+    }
+
+    DataBag items = (DataBag) tuple.get(0);
+    int sampleSize = ((Number) tuple.get(1)).intValue();
+    long count = ((Number) tuple.get(2)).longValue();
+
+    // Negative arguments would turn the threshold below into NaN or a value outside
+    // [0, 1], which only surfaces later as an obscure error from the binomial sampler.
+    if (sampleSize < 0)
+    {
+      throw new IllegalArgumentException("The desired sample size must be non-negative, but found "
+          + sampleSize);
+    }
+    if (count < 0L)
+    {
+      throw new IllegalArgumentException("The population size must be non-negative, but found "
+          + count);
+    }
+
+    DataBag candidates = bagFactory.newDefaultBag();
+
+    if (sampleSize == 0)
+    {
+      // there are no output positions to vote for
+      return candidates;
+    }
+
+    /*
+     * The following threshold is to guarantee that each output position contains at least
+     * one candidate with high probability.
+     */
+    double threshold = 1.0d - Math.exp(Math.log(FAILURE_RATE / sampleSize) / count);
+
+    for (Tuple item : items)
+    {
+      // Should be able to support long sample size if nextBinomial supports long.
+      int numOutputPositions;
+      try
+      {
+        numOutputPositions = _rdg.nextBinomial(sampleSize, threshold);
+      }
+      catch (MathException e)
+      {
+        throw new RuntimeException("Failed to generate a binomial value with n = "
+            + sampleSize + " and p = " + threshold, e);
+      }
+      for (int outputPosition : sampleWithoutReplacement(sampleSize, numOutputPositions))
+      {
+        Tuple candidate = tupleFactory.newTuple();
+        candidate.append(outputPosition);
+        candidate.append(_rdg.nextUniform(0.0d, 1.0d)); // generate a random score
+        candidate.append(item);
+        candidates.add(candidate);
+      }
+    }
+
+    return candidates;
+  }
+
+  /**
+   * Derives the output schema: a bag of tuples with the fields position (int), score
+   * (double), and candidate, where the candidate schema is taken from the elements of
+   * the input bag.
+   * 
+   * @throws RuntimeException if the first input field is not a bag or the schema cannot
+   *           be derived
+   */
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      List<Schema.FieldSchema> fieldSchemas = Lists.newArrayList();
+
+      fieldSchemas.add(new Schema.FieldSchema(POSITION_FIELD_NAME, DataType.INTEGER));
+      fieldSchemas.add(new Schema.FieldSchema(SCORE_FIELD_NAME, DataType.DOUBLE));
+      fieldSchemas.add(new Schema.FieldSchema(CANDIDATE_FIELD_NAME,
+                                              inputFieldSchema.schema.getField(0).schema));
+
+      Schema outputSchema =
+          new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                input),
+                                            new Schema(fieldSchemas),
+                                            DataType.BAG));
+
+      return outputSchema;
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException("Error deriving output schema.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java
new file mode 100644
index 0000000..92af6a3
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * <p>
+ * Performs a weighted random sample using an in-memory reservoir to produce
+ * a weighted random sample of a given size based on the A-Res algorithm described in
+ * <a href="http://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf" target="_blank">this paper</a>.
+ * </p>
+ * <p>
+ * Items with larger weights have a higher probability of being selected in the final sample set.
+ * </p>
+ * <p>
+ * This UDF inherits from {@link ReservoirSample} and it is guaranteed to produce
+ * a sample of the given size.  Similarly, this comes at the cost of scalability,
+ * since it uses internal storage with size equaling the desired sample to guarantee the exact sample size.
+ * </p>
+ * <p>
+ * Its constructor takes 2 arguments. 
+ * <ul>
+ *     <li>The 1st argument specifies the sample size, which should be a string holding a positive integer.
+ *     <li>The 2nd argument specifies the index of the weight field in the input tuple,
+ *     which should be a string holding a non-negative integer that is less than the input tuple size.
+ * </ul>
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+ * input = LOAD 'input' AS (v1:chararray, v2:INT);
+ * input_g = GROUP input ALL;
+ * sampled = FOREACH input_g GENERATE WeightedSample(input);
+ * }
+ * </pre>
+ * </p>
+ * @author wjian
+ */
+
+@Nondeterministic
+public class WeightedReservoirSample extends ReservoirSample {
+    
+    // index of the weight field inside each input tuple
+    private Integer weightIdx;
+    
+    /**
+     * @param strNumSamples the sample size, passed on to the {@link ReservoirSample} constructor
+     * @param strWeightIdx the index of the weight field, as a string holding a non-negative integer
+     * @throws IllegalArgumentException if the weight index is negative
+     */
+    public WeightedReservoirSample(String strNumSamples, String strWeightIdx)
+    {
+        super(strNumSamples);
+        this.weightIdx = Integer.parseInt(strWeightIdx);
+        if(this.weightIdx < 0) {
+            throw new IllegalArgumentException("Invalid negative index of weight field argument for WeightedReservoirSample constructor: " 
+                                     + strWeightIdx);
+        }
+    }
+    
+    /**
+     * Lazily creates the weight-based score generator and stores it in the inherited field.
+     */
+    @Override
+    protected ScoredTuple.ScoreGenerator getScoreGenerator()
+    {
+        if(super.scoreGen == null)
+        {
+            super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+        }
+        return super.scoreGen;
+    }
+    
+    /**
+     * Checks that the input is a bag of tuples whose weight field exists and is numeric
+     * (int, long, float or double) and returns a bag schema with the same element schema.
+     *
+     * @throws RuntimeException if the input schema does not satisfy these requirements
+     *                          or cannot be inspected
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+      try {
+        Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+        if (inputFieldSchema.type != DataType.BAG) {
+          throw new RuntimeException("Expected a BAG as input");
+        }
+        
+        Schema inputBagSchema = inputFieldSchema.schema;
+        Schema.FieldSchema bagElementSchema = inputBagSchema.getField(0);
+        
+        if (bagElementSchema.type != DataType.TUPLE)
+        {
+            throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                   DataType.findTypeName(bagElementSchema.type)));
+        }
+        
+        Schema tupleSchema = bagElementSchema.schema;
+        
+        if(tupleSchema == null) {
+            throw new RuntimeException("The tuple of input bag has no schema");
+        }
+        
+        List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+        
+        // the constructor guarantees a non-negative weight index, so only the upper bound is checked
+        if(fieldSchemaList == null || fieldSchemaList.size() <= this.weightIdx) {
+            throw new RuntimeException("The field schema of the input tuple is null " +
+                                       "or the tuple size is no more than the weight field index: "
+                                       + this.weightIdx);
+        }
+        
+        byte weightType = fieldSchemaList.get(this.weightIdx).type;
+        
+        if(weightType != DataType.INTEGER &&
+           weightType != DataType.LONG &&
+           weightType != DataType.FLOAT &&
+           weightType != DataType.DOUBLE)
+        {
+            String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
+                                                   DataType.findTypeName(DataType.LONG),
+                                                   DataType.findTypeName(DataType.FLOAT),
+                                                   DataType.findTypeName(DataType.DOUBLE)};
+            throw new RuntimeException("Expect the type of the weight field of the input tuple to be of (" +
+                    java.util.Arrays.toString(expectedTypes) + "), but instead found (" + 
+                    DataType.findTypeName(weightType) + "), weight field: " + 
+                    this.weightIdx);
+        } 
+        
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                                 inputFieldSchema.schema, DataType.BAG));    
+      } catch (FrontendException e) {
+        // the cause is preserved in the rethrown exception, so nothing is printed here
+        throw new RuntimeException(e);
+      }
+    }
+    
+    // cached constructor-argument suffix that is appended to the algebraic class names
+    String param = null;
+    
+    /**
+     * Builds, once, the suffix "('numSamples','weightIdx')" that is appended to the
+     * names of the algebraic stage classes; the suffix is empty when either value is missing.
+     */
+    private String getParam()
+    {
+      if (this.param == null) {
+          if(super.numSamples != null && this.weightIdx != null) {
+              // plain concatenation keeps the digits independent of the default locale
+              this.param = "('" + super.numSamples + "','" + this.weightIdx + "')";
+          } else {
+              this.param = "";
+          }
+      }
+      
+      return this.param;
+    }
+
+   
+    @Override
+    public String getInitial() 
+    {
+      return Initial.class.getName() + getParam();
+    }
+  
+    @Override
+    public String getIntermed() 
+    {
+      return Intermediate.class.getName() + getParam();
+    }
+    
+    @Override
+    public String getFinal() 
+    {
+      return Final.class.getName() + getParam();
+    }
+   
+    /**
+     * Initial algebraic stage; scores tuples with an {@link InverseWeightScoreGenerator}.
+     */
+    static public class Initial extends ReservoirSample.Initial
+    {
+      // index of the weight field; null when created through the no-argument constructor
+      private Integer weightIdx; 
+        
+      public Initial()
+      {
+          super();
+          this.weightIdx = null;
+      }
+      
+      /**
+       * @throws IllegalArgumentException if the weight index is negative
+       */
+      public Initial(String strNumSamples, String strWeightIdx)
+      {
+          super(strNumSamples);
+          this.weightIdx = Integer.parseInt(strWeightIdx);
+          if(this.weightIdx < 0) {
+              throw new IllegalArgumentException("Invalid negative index of weight field for WeightedReservoirSample.Initial constructor: " 
+                                       + strWeightIdx);
+          }
+      }
+      
+      /**
+       * Lazily creates the weight-based score generator and stores it in the inherited field.
+       */
+      @Override
+      protected ScoredTuple.ScoreGenerator getScoreGenerator()
+      {
+          if(super.scoreGen == null)
+          {
+              super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+          }
+          return super.scoreGen;
+      }
+    }
+    
+    /**
+     * Intermediate algebraic stage; the weight index argument is accepted but not used.
+     */
+    static public class Intermediate extends ReservoirSample.Intermediate 
+    {        
+        public Intermediate()
+        {
+            super();
+        }
+        
+        public Intermediate(String strNumSamples, String strWeightIdx)
+        {
+            super(strNumSamples);
+        }        
+    }
+    
+    /**
+     * Final algebraic stage; the weight index argument is accepted but not used.
+     */
+    static public class Final extends ReservoirSample.Final 
+    {        
+        public Final()
+        {
+            super();
+        }
+        
+        public Final(String strNumSamples, String strWeightIdx)
+        {
+            super(strNumSamples);
+        }        
+    }
+
+    /**
+     * Generates the score u^(1/w) for a tuple, where u is a fresh value from
+     * {@link Math#random()} and w is the weight stored in the tuple.
+     */
+    static class InverseWeightScoreGenerator implements ScoredTuple.ScoreGenerator
+    {        
+        //index of the weight field of the input tuple
+        private int weightIdx;
+        
+        /**
+         * @throws IllegalArgumentException if the weight index is null or negative
+         */
+        InverseWeightScoreGenerator(Integer weightIdx) 
+        {
+            if(weightIdx == null || weightIdx < 0) {
+                throw new IllegalArgumentException("Invalid null or negative weight index input: " + weightIdx);
+            }
+            this.weightIdx = weightIdx;
+        }
+        
+        /**
+         * @param sample the tuple to score; its weight field must hold a positive number
+         * @return the generated score
+         * @throws ExecException if the weight index is outside the tuple, or the weight is
+         *                       null, NaN, zero or negative
+         */
+        @Override
+        public double generateScore(Tuple sample) throws ExecException
+        {
+            if(this.weightIdx >= sample.size())
+            {
+                throw new ExecException(String.format("Weight index %d is outside tuple bounds", this.weightIdx));
+            }
+            Object rawWeight = sample.get(this.weightIdx);
+            if (rawWeight == null)
+            { 
+                throw new ExecException(String.format("null value for weight at index %d",this.weightIdx));
+            }
+            double weight = ((Number)rawWeight).doubleValue();
+            // negated comparison so that NaN is rejected together with zero and negative weights
+            if(!(weight > 0.0))
+            {
+                //non-positive weight should be avoided
+                throw new ExecException(String.format("Invalid sample weight [%f]. It should be a positive real number", weight));
+            }
+            //a different approach to try: u^(1/w) could be exp(log(u)/w) ?
+            return Math.pow(Math.random(), 1/weight);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java
new file mode 100644
index 0000000..a77164d
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/WeightedSample.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Performs weighted random sampling on a bag.
+ *
+ * <p>
+ * Create a new bag by performing a weighted sampling without replacement
+ * from the input bag. Sampling is biased according to a weight that
+ * is part of the inner tuples in the bag.  That is, tuples with relatively
+ * high weights are more likely to be chosen over tuples with low weights.
+ * Optionally, a limit on the number of items to return may be specified.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define WeightedSample datafu.pig.sampling.WeightedSample();
+ *
+ * -- input:
+ * -- ({(a, 100),(b, 1),(c, 5),(d, 2)})
+ * input = LOAD 'input' AS (A: bag{T: tuple(name:chararray,score:int)});
+ *
+ * output1 = FOREACH input GENERATE WeightedSample(A,1);
+ * -- output1:
+ * -- uses the field indexed by 1 as a score
+ * -- ({(a,100),(c,5),(b,1),(d,2)}) -- example of random
+ *
+ * -- sample using the second column (index 1) and keep only the top 3
+ * output2 = FOREACH input GENERATE WeightedSample(A,1,3);
+ * -- output2:
+ * -- ({(a,100),(c,5),(b,1)})
+ * }
+ * </pre>
+ */
+@Nondeterministic
+public class WeightedSample extends EvalFunc<DataBag>
+{
+  BagFactory bagFactory = BagFactory.getInstance();
+  Long seed = null;
+
+  /**
+   * Creates a sampler whose random number generator is not explicitly seeded.
+   */
+  public WeightedSample() {
+  }
+
+  /**
+   * Creates a sampler whose random number generator is seeded with the given value
+   * on every call to {@link #exec(Tuple)}.
+   *
+   * @param seed seed, parsed with {@link Long#parseLong(String)}
+   */
+  public WeightedSample(String seed) {
+    this.seed = Long.parseLong(seed);
+  }
+
+  /**
+   * Samples tuples from the input bag without replacement, biased by score.
+   *
+   * @param input tuple of (bag, index of the score field, optional sample limit)
+   * @return the sampled tuples in the order they were drawn; an empty bag when the
+   *         input bag is null or empty, or when the limit is zero or negative; the
+   *         input bag itself when it holds exactly one tuple and the limit allows it
+   * @throws IOException if a field cannot be read from the input
+   */
+  @Override
+  public DataBag exec(Tuple input) throws IOException {
+    DataBag output = bagFactory.newDefaultBag();
+
+    DataBag samples = (DataBag) input.get(0);
+    if (samples == null || samples.size() == 0) {
+      return output; // if we are given null we will return an empty bag
+    }
+    int numSamples = (int) samples.size();
+
+    // Resolve the limit before the single-element shortcut so that a limit of zero
+    // is honored for a bag of one tuple as well.
+    int limitSamples = sampleLimit(input, numSamples);
+    if (limitSamples <= 0) return output;
+    if (numSamples == 1) return samples;
+
+    // Copy the tuples and their scores into arrays with a single pass over the bag.
+    int scoreIndex = ((Number)input.get(1)).intValue();
+    Tuple[] tuples = new Tuple[numSamples];
+    double[] scores = new double[numSamples];
+    int tupleIndex = 0;
+    for (Tuple tuple : samples) {
+      tuples[tupleIndex] = tuple;
+      double score = ((Number)tuple.get(scoreIndex)).doubleValue();
+      scores[tupleIndex] = Math.max(score, Double.MIN_NORMAL); // negative scores cause problems
+      tupleIndex++;
+    }
+
+    /*
+     * Here's how the algorithm works:
+     *
+     * 1. Create a cumulative distribution of the scores
+     * 2. Draw a random number
+     * 3. Find the interval in which the drawn number falls into
+     * 4. Select the element encompassing that interval
+     * 5. Remove the selected element from consideration
+     * 6. Repeat 1-5 k times
+     *
+     * However, rather than removing the element (#5), which is expensive for an array,
+     * this function overwrites the selected slot with the element at the front of the
+     * remaining range and then advances the front by one.
+     *
+     * This is an O(k*n) algorithm, where k is the number of elements to sample and n is
+     * the number of scores.
+     */
+    Random rng = (seed == null) ? new Random() : new Random(seed);
+
+    for (int k = 0; k < limitSamples; k++) {
+      double val = rng.nextDouble();
+      int idx = find_cumsum_interval(scores, val, k, numSamples);
+      // no interval matched; fall back to a uniform pick among the remaining elements
+      if (idx == numSamples)
+        idx = rng.nextInt(numSamples - k) + k;
+
+      output.add(tuples[idx]);
+
+      // move the front element into the freed slot; the next round starts at k+1
+      scores[idx] = scores[k];
+      tuples[idx] = tuples[k];
+    }
+
+    return output;
+  }
+
+  /**
+   * Determines how many tuples to draw: the optional third input capped at the bag
+   * size, or the bag size when no limit is supplied or the supplied limit is null.
+   */
+  private static int sampleLimit(Tuple input, int numSamples) throws IOException {
+    if (input.size() != 3) {
+      return numSamples;
+    }
+    // accept any type of number for sample size, but convert to int
+    Object limit = input.get(2);
+    if (limit == null) {
+      return numSamples;
+    }
+    return Math.min(((Number)limit).intValue(), numSamples);
+  }
+
+  /**
+   * Finds the first index in {@code [begin, end)} at which the running sum of scores,
+   * divided by the total of the scores in that range, exceeds {@code val}.
+   *
+   * @param scores score array
+   * @param val threshold to compare the normalized running sum against
+   * @param begin first index to consider (inclusive)
+   * @param end last index to consider (exclusive)
+   * @return the matching index, or {@code end} if no index matches
+   */
+  public int find_cumsum_interval(double[] scores, double val, int begin, int end) {
+    double sum = 0.0;
+    double cumsum = 0.0;
+    for (int i = begin; i < end; i++) {
+      sum += scores[i];
+    }
+
+    for (int i = begin; i < end; i++) {
+      cumsum += scores[i];
+      if ((cumsum / sum) > val)
+        return i;
+    }
+    return end;
+  }
+
+  /**
+   * Validates the input schema (a bag, an INT score index and an optional INT or LONG
+   * limit) and returns a bag schema with the same inner schema as the input bag.
+   *
+   * @throws RuntimeException if the input schema does not have the expected shape
+   */
+  @Override
+  public Schema outputSchema(Schema input) {
+    try {
+      if (!(input.size() == 2 || input.size() == 3))
+      {
+        throw new RuntimeException("Expected input to have two or three fields");
+      }
+
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG) {
+        throw new RuntimeException("Expected a BAG as first input, got: "+inputFieldSchema.type);
+      }
+
+      if (input.getField(1).type != DataType.INTEGER) {
+        throw new RuntimeException("Expected an INT as second input, got: "+input.getField(1).type);
+      }
+
+      if (input.size() == 3 && !(input.getField(2).type == DataType.INTEGER || input.getField(2).type == DataType.LONG)) {
+        throw new RuntimeException("Expected an INT or LONG as third input, got: "+input.getField(2).type);
+      }
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               inputFieldSchema.schema, DataType.BAG));
+    } catch (FrontendException e) {
+      // the cause is carried by the rethrown exception, so no separate stack trace dump
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java b/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java
new file mode 100644
index 0000000..0c7d8bd
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Sampling UDFs, including weighted sample, reservoir sampling, sampling by key, etc.
+ */
+package datafu.pig.sampling;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
new file mode 100644
index 0000000..fe68888
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/SessionCount.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.sessions;
+
+import java.io.IOException;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Performs a count of events, ignoring events which occur within the
+ * same time window.
+ * <p>
+ * This is useful for tasks such as counting the number of page views per user since it:
+ *  a) prevents reloads and go-backs from overcounting actual views
+ *  b) captures the notion that views across multiple sessions are more meaningful
+ * <p>
+ * Input <b>must</b> be sorted ascendingly by time for this UDF to work.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * 
+ * %declare TIME_WINDOW  10m
+ * 
+ * define SessionCount datafu.pig.sessions.SessionCount('$TIME_WINDOW');
+ * 
+ * views = LOAD 'views' as (user_id:int, page_id:int, time:chararray);
+ * views_grouped = GROUP views by (user_id, page_id);
+ * view_counts = FOREACH views_grouped { 
+ *   views = order views by time;
+ *   generate group.user_id as user_id, 
+ *            group.page_id as page_id, 
+ *            SessionCount(views.(time)) as count; }
+ * }
+ * </pre>
+ * 
+ */
+public class SessionCount extends AccumulatorEvalFunc<Long>
+{
+  // length of the time window in milliseconds
+  private final long millis;
+  // time of the most recently seen event, or null before the first event
+  private DateTime last_date;
+  // number of events counted so far
+  private long sum;
+
+  /**
+   * @param timeSpec time window; it is upper-cased and prefixed with "PT" to form a
+   *                 period string (e.g. "10m" becomes "PT10M")
+   */
+  public SessionCount(String timeSpec)
+  {
+    Period p = new Period("PT" + timeSpec.toUpperCase());
+    // getSeconds() returns an int; multiplying by 1000L keeps the arithmetic in long so
+    // windows longer than roughly 24.8 days do not overflow
+    this.millis = p.toStandardSeconds().getSeconds() * 1000L;
+    cleanup();
+  }
+
+  /**
+   * Consumes a bag of events whose first field is the event time. The first event is
+   * counted, and so is every later event that occurs more than the time window after
+   * the event immediately before it.
+   *
+   * @throws IOException if an event is earlier than the one before it
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    for (Tuple t : (DataBag) input.get(0)) {
+      DateTime date = new DateTime(t.get(0));
+
+      if (last_date == null) {
+        last_date = date;
+        sum = 1;
+      } else if (date.isAfter(last_date.plus(this.millis))) {
+        sum += 1;
+      } else if (date.isBefore(last_date)) {
+        throw new IOException("input time series is not sorted");
+      }
+
+      // the window is always measured from the previous event, counted or not
+      last_date = date;
+    }
+  }
+
+  /**
+   * @return the number of events counted since the last {@link #cleanup()}
+   */
+  @Override
+  public Long getValue()
+  {
+    return sum;
+  }
+
+  /**
+   * Resets the counter and forgets the last seen event time.
+   */
+  @Override
+  public void cleanup()
+  {
+    this.last_date = null;
+    this.sum = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
new file mode 100644
index 0000000..52d159b
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/Sessionize.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sessions;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * Sessionizes an input stream, appending a session ID to each tuple.
+ *
+ * <p>
+ * This UDF takes a constructor argument which is the session timeout (an idle
+ * period of this amount indicates that a new session has started) and assumes
+ * the first element of the input bag is an ISO8601 timestamp. The input bag
+ * must be sorted by this timestamp. It returns the input bag with a new field,
+ * session_id, that is a GUID indicating the session of the request.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * 
+ * %declare TIME_WINDOW  30m
+ * 
+ * define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
+ *
+ * views = LOAD 'views.tsv' AS (visit_date:chararray, member_id:int, url:chararray);
+ *
+ * -- sessionize the visit stream
+ * views = GROUP views BY member_id;
+ * sessions = FOREACH views {
+ *   visits = ORDER views BY visit_date;
+ *   GENERATE FLATTEN(Sessionize(visits)) AS (visit_date,member_id,url,session_id); 
+ * }
+ *
+ * -- count the number of sessions hitting the url
+ * rollup = GROUP sessions BY url;
+ * result = FOREACH rollup GENERATE group AS url, COUNT(sessions) AS session_cnt;
+ * }
+ * </pre>
+ * </p>
+ */
+@Nondeterministic
+public class Sessionize extends AccumulatorEvalFunc<DataBag>
+{
+  // session timeout in milliseconds
+  private final long millis;
+
+  // tuples emitted so far, each with the session id appended
+  private DataBag outputBag;
+  // time of the most recently seen tuple, or null before the first tuple
+  private DateTime last_date;
+  // id of the current session
+  private String id;
+
+  /**
+   * @param timeSpec session timeout; it is upper-cased and prefixed with "PT" to form
+   *                 a period string (e.g. "30m" becomes "PT30M")
+   */
+  public Sessionize(String timeSpec)
+  {
+    Period p = new Period("PT" + timeSpec.toUpperCase());
+    // getSeconds() returns an int; multiplying by 1000L keeps the arithmetic in long so
+    // timeouts longer than roughly 24.8 days do not overflow
+    this.millis = p.toStandardSeconds().getSeconds() * 1000L;
+
+    cleanup();
+  }
+
+  /**
+   * Consumes a bag of tuples whose first field is the time (a String or a Long) and
+   * adds a copy of each tuple, with the current session id appended, to the output bag.
+   * A new session id is generated whenever a tuple occurs more than the timeout after
+   * the tuple immediately before it.
+   *
+   * @throws IOException if a tuple is earlier than the one before it
+   * @throws RuntimeException if the time field is neither a String nor a Long
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    for (Tuple t : (DataBag) input.get(0)) {
+      Object timeObj = t.get(0);
+
+      DateTime date;
+      if (timeObj instanceof String)
+      {
+        date = new DateTime((String)timeObj);
+      }
+      else if (timeObj instanceof Long)
+      {
+        date = new DateTime((Long)timeObj);
+      }
+      else
+      {
+        throw new RuntimeException("Time must either be a String or Long");
+      }
+
+      if (this.last_date == null)
+      {
+        this.last_date = date;
+      }
+      else if (date.isAfter(this.last_date.plus(this.millis)))
+      {
+        // idle period exceeded the timeout: start a new session
+        this.id = UUID.randomUUID().toString();
+      }
+      else if (date.isBefore(last_date))
+      {
+        throw new IOException(String.format("input time series is not sorted (%s < %s)", date, last_date));
+      }
+
+      Tuple t_new = TupleFactory.getInstance().newTuple(t.getAll());
+      t_new.append(this.id);
+      outputBag.add(t_new);
+
+      this.last_date = date;
+    }
+  }
+
+  /**
+   * @return the bag of tuples accumulated since the last {@link #cleanup()}
+   */
+  @Override
+  public DataBag getValue()
+  {
+    return outputBag;
+  }
+
+  /**
+   * Resets the state: forgets the last seen time, starts a fresh output bag and
+   * generates a new session id.
+   */
+  @Override
+  public void cleanup()
+  {
+    this.last_date = null;
+    this.outputBag = BagFactory.getInstance().newDefaultBag();
+    this.id = UUID.randomUUID().toString();
+  }
+
+  /**
+   * Validates that the input is a bag of tuples whose first field is a CHARARRAY or
+   * LONG, and returns a bag schema whose tuple is the input tuple schema with a
+   * CHARARRAY field named session_id added.
+   *
+   * @throws RuntimeException if the input schema does not have the expected shape
+   */
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }
+
+      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+      if (inputTupleSchema.getField(0).type != DataType.CHARARRAY
+          && inputTupleSchema.getField(0).type != DataType.LONG)
+      {
+        throw new RuntimeException(String.format("Expected first element of tuple to be a CHARARRAY or LONG, but instead found %s",
+                                                 DataType.findTypeName(inputTupleSchema.getField(0).type)));
+      }
+
+      Schema outputTupleSchema = inputTupleSchema.clone();
+      outputTupleSchema.add(new Schema.FieldSchema("session_id", DataType.CHARARRAY));
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                             .getName()
+                                                             .toLowerCase(), input),
+                                           outputTupleSchema,
+                                           DataType.BAG));
+    }
+    catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+    catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java b/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java
new file mode 100644
index 0000000..686fbf0
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sessions/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for sessionizing data.
+ */
+package datafu.pig.sessions;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java b/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java
new file mode 100644
index 0000000..b6469e9
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sets/SetDifference.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sets;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Computes the set difference of two or more bags.  <b>The input bags must be sorted.</b>
+ * Duplicates are eliminated, except that when exactly two bags are given and the second
+ * is null or empty the first bag is returned as it is.
+ *
+ * <p>
+ * Given bags A and B the result is A-B: every element of A that does not appear in B.
+ * Given bags A, B and C the result is A-B-C: every element of A that appears in neither
+ * B nor C.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define SetDifference datafu.pig.sets.SetDifference();
+ *
+ * -- input:
+ * -- ({(1),(2),(3),(4),(5),(6)},{(3),(4)})
+ * input = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
+ *
+ * input = FOREACH input {
+ *   B1 = ORDER B1 BY val ASC;
+ *   B2 = ORDER B2 BY val ASC;
+ *
+ *   -- output:
+ *   -- ({(1),(2),(5),(6)})
+ *   GENERATE SetDifference(B1,B2);
+ * }
+ * }</pre>
+ */
+public class SetDifference extends SetOperationsBase
+{
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+
+  /**
+   * Builds the priority queue that drives the merge: one {@link Pair} for every
+   * non-null, non-empty bag of the input tuple.
+   *
+   * <p>
+   * Each {@link Pair} compares on the tuple its iterator currently points at and, for
+   * equal tuples, on the position of its bag within the input tuple.
+   * </p>
+   *
+   * @param input tuple of bags
+   * @return queue holding one entry per bag that has data
+   * @throws IOException if a field cannot be read from the input tuple
+   */
+  private PriorityQueue<Pair> loadBags(Tuple input) throws IOException
+  {
+    int bagCount = input.size();
+    PriorityQueue<Pair> queue = new PriorityQueue<Pair>(bagCount);
+
+    for (int position = 0; position < bagCount; position++)
+    {
+      Object field = input.get(position);
+      if (field == null)
+      {
+        continue;
+      }
+
+      Iterator<Tuple> tuples = ((DataBag)field).iterator();
+      if (tuples.hasNext())
+      {
+        queue.add(new Pair(tuples, position));
+      }
+    }
+    return queue;
+  }
+
+  /**
+   * Counts the other queue entries whose current tuple equals that of the entry at the
+   * head of the queue. The head entry must belong to the first bag.
+   *
+   * @param pq priority queue
+   * @return number of matches, not counting the head entry itself
+   * @throws RuntimeException if the head entry does not belong to the bag at index 0
+   */
+  public int countMatches(PriorityQueue<Pair> pq)
+  {
+    Pair head = pq.peek();
+
+    // sanity check
+    if (!head.index.equals(0))
+    {
+      throw new RuntimeException("Expected next bag to have index 0");
+    }
+
+    Tuple headData = head.data;
+
+    // start below zero because the head entry is part of the queue and equals itself
+    int others = -1;
+    for (Pair candidate : pq)
+    {
+      if (headData.equals(candidate.data))
+      {
+        others++;
+      }
+    }
+    return others;
+  }
+
+  /**
+   * Computes the difference between the first bag and all remaining bags.
+   *
+   * @param input tuple of two or more sorted bags (null entries are allowed)
+   * @return the elements of the first bag that occur in no other bag; an empty bag if
+   *         the first bag is null or empty; the first bag itself if there are exactly
+   *         two inputs and the second is null or empty
+   * @throws IOException if a field cannot be read from the input tuple
+   * @throws RuntimeException if fewer than two inputs are given, an input is not a bag,
+   *         or a bag turns out not to be sorted
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    if (input.size() < 2)
+    {
+      throw new RuntimeException("Expected at least two inputs, but found " + input.size());
+    }
+
+    for (Object field : input)
+    {
+      boolean isBagOrNull = (field == null) || (field instanceof DataBag);
+      if (!isBagOrNull)
+      {
+        throw new RuntimeException("Inputs must be bags");
+      }
+    }
+
+    DataBag outputBag = bagFactory.newDefaultBag();
+
+    DataBag bag1 = (DataBag)input.get(0);
+    DataBag bag2 = (DataBag)input.get(1);
+
+    if (bag1 == null || bag1.size() == 0)
+    {
+      return outputBag;
+    }
+
+    // optimization: nothing to subtract
+    boolean secondBagEmpty = (bag2 == null || bag2.size() == 0);
+    if (input.size() == 2 && secondBagEmpty)
+    {
+      return bag1;
+    }
+
+    PriorityQueue<Pair> pq = loadBags(input);
+
+    Tuple lastData = null;
+    boolean firstBagExhausted = false;
+
+    do
+    {
+      Pair head = pq.peek();
+
+      // Data that was already emitted is ignored. Otherwise the data is taken only
+      // when it comes from the first bag and no other bag currently holds the same data.
+      boolean unseen = head.data.compareTo(lastData) != 0;
+      if (unseen && head.index.equals(0) && countMatches(pq) == 0)
+      {
+        outputBag.add(head.data);
+        lastData = head.data;
+      }
+
+      Pair polled = pq.poll();
+
+      if (polled.hasNext())
+      {
+        // the bag still has data, so advance it and put it back into the queue
+        polled.next();
+        pq.offer(polled);
+      }
+      else if (polled.index.equals(0))
+      {
+        // stop once every element of the first bag has been consumed
+        firstBagExhausted = true;
+      }
+    }
+    while (!firstBagExhausted);
+
+    return outputBag;
+  }
+
+  /**
+   * Couples a bag's tuple iterator with the tuple it currently points at, and is
+   * comparable so that it can live in the priority queue.
+   *
+   * <p>
+   * Ordering is by the current tuple first and, for equal tuples, by the index of the
+   * bag within the input tuple.
+   * </p>
+   * @author mhayes
+   *
+   */
+  private static class Pair implements Comparable<Pair>
+  {
+    private final Iterator<Tuple> it;
+    private final Integer index;
+    private Tuple data;
+
+    /**
+     * Constructs the {@link Pair} and reads the first tuple from the iterator.
+     *
+     * @param it tuple iterator
+     * @param index index within the tuple that the bag came from
+     */
+    public Pair(Iterator<Tuple> it, int index)
+    {
+      this.it = it;
+      this.index = index;
+      this.data = it.next();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public int compareTo(Pair o)
+    {
+      int byData = this.data.compareTo(o.data);
+      return (byData != 0) ? byData : index.compareTo(o.index);
+    }
+
+    /**
+     * @return true if the underlying iterator has more tuples
+     */
+    public boolean hasNext()
+    {
+      return it.hasNext();
+    }
+
+    /**
+     * Advances to the next tuple of the bag.
+     *
+     * @return the new current tuple
+     * @throws RuntimeException if the new tuple sorts before the previous one
+     */
+    @SuppressWarnings("unchecked")
+    public Tuple next()
+    {
+      Tuple upcoming = it.next();
+      // algorithm assumes data is in order
+      if (data.compareTo(upcoming) > 0)
+      {
+        throw new RuntimeException("Out of order!");
+      }
+      this.data = upcoming;
+      return upcoming;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("[%s within %d]",data,index);
+    }
+  }
+}
+
+


Mime
View raw message