datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mha...@apache.org
Subject [2/2] git commit: DATAFU-2 UDFs for entropy and weighted sampling algorithms
Date Wed, 22 Jan 2014 18:45:09 GMT
DATAFU-2 UDFs for entropy and weighted sampling algorithms

https://issues.apache.org/jira/browse/DATAFU-2

Signed-off-by: Matt Hayes <mhayes@linkedin.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/e8084146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/e8084146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/e8084146

Branch: refs/heads/master
Commit: e8084146843bd546f9af51bffa32f29fbf7633db
Parents: 9d7dffd
Author: Jian J. Wang <wjian@dogfavorshot-lm.peking.corp.yahoo.com>
Authored: Wed Jan 22 10:41:37 2014 -0800
Committer: Matt Hayes <mhayes@linkedin.com>
Committed: Wed Jan 22 10:41:37 2014 -0800

----------------------------------------------------------------------
 .../datafu/pig/sampling/ReservoirSample.java    |  36 +-
 src/java/datafu/pig/sampling/ScoredTuple.java   |  17 +-
 .../pig/sampling/WeightedReservoirSample.java   | 265 +++++++++
 .../stats/entropy/ChaoShenEntropyEstimator.java | 230 ++++++++
 .../entropy/EmpiricalEntropyEstimator.java      |  71 +++
 src/java/datafu/pig/stats/entropy/Entropy.java  | 428 ++++++++++++++
 .../pig/stats/entropy/EntropyEstimator.java     |  87 +++
 .../datafu/pig/stats/entropy/EntropyUtil.java   |  51 ++
 .../pig/stats/entropy/StreamingCondEntropy.java | 269 +++++++++
 .../pig/stats/entropy/StreamingEntropy.java     | 223 +++++++
 .../WeightedReservoirSamplingTests.java         | 293 ++++++++++
 .../pig/stats/entropy/AbstractEntropyTests.java |  46 ++
 .../test/pig/stats/entropy/EntropyTests.java    | 585 +++++++++++++++++++
 .../entropy/StreamingChaoShenEntropyTests.java  | 373 ++++++++++++
 .../StreamingEmpiricalCondEntropyTests.java     | 412 +++++++++++++
 .../entropy/StreamingEmpiricalEntropyTests.java | 429 ++++++++++++++
 16 files changed, 3809 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/sampling/ReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/ReservoirSample.java b/src/java/datafu/pig/sampling/ReservoirSample.java
index 403bfaf..747b28c 100644
--- a/src/java/datafu/pig/sampling/ReservoirSample.java
+++ b/src/java/datafu/pig/sampling/ReservoirSample.java
@@ -56,9 +56,12 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
 @Nondeterministic
 public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Algebraic
 {
-  Integer numSamples;
+  protected Integer numSamples;
+  
   private Reservoir reservoir;
   
+  protected ScoredTuple.ScoreGenerator scoreGen;
+  
   private Reservoir getReservoir()
   {
     if (reservoir == null) {
@@ -71,13 +74,23 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
   {
     this.numSamples = Integer.parseInt(numSamples);    
   }
-
+  
+  protected ScoredTuple.ScoreGenerator getScoreGenerator()
+  {
+      if(this.scoreGen == null)
+      {
+          this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+      }
+      return this.scoreGen;
+  }
+  
   @Override
   public void accumulate(Tuple input) throws IOException
   {
     DataBag samples = (DataBag) input.get(0);
+    ScoredTuple.ScoreGenerator scoreGen = getScoreGenerator();
     for (Tuple sample : samples) {
-      getReservoir().consider(new ScoredTuple(Math.random(), sample));
+      getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
     }  
   }
 
@@ -85,6 +98,7 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
   public void cleanup()
   {
     this.reservoir = null;
+    this.scoreGen = null;
   }
 
   @Override
@@ -159,6 +173,7 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
   {
     int numSamples;
     private Reservoir reservoir;
+    protected ScoredTuple.ScoreGenerator scoreGen;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
     public Initial(){}
@@ -175,11 +190,22 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
       }
       return reservoir;
     }
+    
+    protected ScoredTuple.ScoreGenerator getScoreGenerator()
+    {
+        if(this.scoreGen == null)
+        {
+            this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+        }
+        return this.scoreGen;
+    }
 
     @Override
     public Tuple exec(Tuple input) throws IOException {
       DataBag output = BagFactory.getInstance().newDefaultBag();
       
+      ScoredTuple.ScoreGenerator scoreGen = getScoreGenerator();
+      
       DataBag samples = (DataBag) input.get(0);
       if (samples == null)
       {
@@ -189,11 +215,11 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
         // no need to construct a reservoir, so just emit intermediate tuples
         for (Tuple sample : samples) {
           // add the score on to the intermediate tuple
-          output.add(new ScoredTuple(Math.random(), sample).getIntermediateTuple(tupleFactory));
+          output.add(new ScoredTuple(scoreGen.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
         }
       } else {     
         for (Tuple sample : samples) {
-          getReservoir().consider(new ScoredTuple(Math.random(), sample));
+          getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
         }    
         
         for (ScoredTuple scoredTuple : getReservoir()) {

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/sampling/ScoredTuple.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/ScoredTuple.java b/src/java/datafu/pig/sampling/ScoredTuple.java
index 293cf3d..c793584 100644
--- a/src/java/datafu/pig/sampling/ScoredTuple.java
+++ b/src/java/datafu/pig/sampling/ScoredTuple.java
@@ -88,5 +88,20 @@ class ScoredTuple implements Comparable<ScoredTuple>
       else return -1;
     }
     return score.compareTo(o.score);
-  }    
+  }
+  
+  static interface ScoreGenerator
+  {      
+      double generateScore(Tuple sample) throws ExecException;
+  }
+  
+  static class PureRandomScoreGenerator implements ScoreGenerator
+  {
+      public PureRandomScoreGenerator(){}
+      
+      public double generateScore(Tuple sample)
+      {
+          return Math.random();
+      }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/sampling/WeightedReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/WeightedReservoirSample.java b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
new file mode 100644
index 0000000..0792411
--- /dev/null
+++ b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * <p>
+ * Performs a weighted random sample using an in-memory reservoir to produce
+ * a weighted random sample of a given size based on the A-Res algorithm described in 
+ * {@link <a href="http://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf" target="_blank">paper</a>}. 
+ * </p>
+ * <p>
+ * Items with a larger weight have a higher probability of being selected into the final sample set.
+ * </p>
+ * <p>
+ * This UDF inherits from {@link ReservoirSample} and it is guaranteed to produce
+ * a sample of the given size.  Similarly it comes at the cost of scalability,
+ * since it uses internal storage with a size equal to the desired sample size in order to guarantee the exact sample size.
+ * </p>
+ * <p>
+ * Its constructor takes 2 arguments. 
+ * <ul>
+ *     <li>The 1st argument specifies the sample size, which should be a string representing a positive integer.
+ *     <li>The 2nd argument specifies the index of the weight field in the input tuple, 
+ *     which should be a string representing a non-negative integer that is less than the input tuple size. 
+ * </ul>
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+ * input = LOAD 'input' AS (v1:chararray, v2:INT);
+ * input_g = GROUP input ALL;
+ * sampled = FOREACH input_g GENERATE WeightedSample(input);
+ * }
+ * </pre>
+ * </p>
+ * @author wjian
+ */
+
+@Nondeterministic
+public class WeightedReservoirSample extends ReservoirSample {
+    
+    private Integer weightIdx;
+    
+    public WeightedReservoirSample(String strNumSamples, String strWeightIdx)
+    {
+        super(strNumSamples);
+        this.weightIdx = Integer.parseInt(strWeightIdx);
+        if(this.weightIdx < 0) {
+            throw new IllegalArgumentException("Invalid negative index of weight field argument for WeightedReserviorSample constructor: " 
+                                     + strWeightIdx);
+        }
+    }
+    
+    @Override
+    protected ScoredTuple.ScoreGenerator getScoreGenerator()
+    {
+        if(super.scoreGen == null)
+        {
+            super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+        }
+        return this.scoreGen;
+    }
+    
+    @Override
+    public Schema outputSchema(Schema input) {
+      try {
+        Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+        if (inputFieldSchema.type != DataType.BAG) {
+          throw new RuntimeException("Expected a BAG as input");
+        }
+        
+        Schema inputBagSchema = inputFieldSchema.schema;
+        
+        if (inputBagSchema.getField(0).type != DataType.TUPLE)
+        {
+            throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                   DataType.findTypeName(inputBagSchema.getField(0).type)));
+        }
+        
+        Schema tupleSchema = inputBagSchema.getField(0).schema;
+        
+        if(tupleSchema == null) {
+            throw new RuntimeException("The tuple of input bag has no schema");
+        }
+        
+        List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+        
+        if(fieldSchemaList == null || fieldSchemaList.size() <= Math.max(0, this.weightIdx)) {
+            throw new RuntimeException("The field schema of the input tuple is null " +
+            		                   "or the tuple size is no more than the weight field index: "
+                                       + this.weightIdx);
+        }
+        
+        if(fieldSchemaList.get(this.weightIdx).type != DataType.INTEGER &&
+           fieldSchemaList.get(this.weightIdx).type != DataType.LONG &&
+           fieldSchemaList.get(this.weightIdx).type != DataType.FLOAT &&
+           fieldSchemaList.get(this.weightIdx).type != DataType.DOUBLE)
+        {
+            String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
+                                                   DataType.findTypeName(DataType.LONG),
+                                                   DataType.findTypeName(DataType.FLOAT),
+                                                   DataType.findTypeName(DataType.DOUBLE)};
+            throw new RuntimeException("Expect the type of the weight field of the input tuple to be of (" +
+                    java.util.Arrays.toString(expectedTypes) + "), but instead found (" + 
+                    DataType.findTypeName(fieldSchemaList.get(this.weightIdx).type) + "), weight field: " + 
+                    this.weightIdx);
+        } 
+        
+        return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                                 inputFieldSchema.schema, DataType.BAG));    
+      } catch (FrontendException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+    }
+    
+    String param = null;
+    
+    private String getParam()
+    {
+      if (this.param == null) {
+          if(super.numSamples != null && this.weightIdx != null) {
+              this.param = String.format("('%d','%d')", 
+                                       super.numSamples,
+                                       this.weightIdx);
+          } else {
+              this.param = "";
+          }
+      }
+      
+      return this.param;
+    }
+
+   
+    @Override
+    public String getInitial() 
+    {
+      return Initial.class.getName() + getParam();
+    }
+  
+    @Override
+    public String getIntermed() 
+    {
+      return Intermediate.class.getName() + getParam();
+    }
+    
+    @Override
+    public String getFinal() 
+    {
+      return Final.class.getName() + getParam();
+    }
+   
+    static public class Initial extends ReservoirSample.Initial
+    {
+      private Integer weightIdx; 
+        
+      public Initial()
+      {
+          super();
+          this.weightIdx = null;
+      }
+      
+      public Initial(String strNumSamples, String strWeightIdx)
+      {
+          super(strNumSamples);
+          this.weightIdx = Integer.parseInt(strWeightIdx);
+          if(this.weightIdx < 0) {
+              throw new IllegalArgumentException("Invalid negative index of weight field for WeightedReserviorSample.Initial constructor: " 
+                                       + strWeightIdx);
+          }
+      }
+      
+      @Override
+      protected ScoredTuple.ScoreGenerator getScoreGenerator()
+      {
+          if(super.scoreGen == null)
+          {
+              super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+          }
+          return super.scoreGen;
+      }
+    }
+    
+    static public class Intermediate extends ReservoirSample.Intermediate 
+    {        
+        public Intermediate()
+        {
+            super();
+        }
+        
+        public Intermediate(String strNumSamples, String strWeightIdx)
+        {
+            super(strNumSamples);
+        }        
+    }
+    
+    static public class Final extends ReservoirSample.Final 
+    {        
+        public Final()
+        {
+            super();
+        }
+        
+        public Final(String strNumSamples, String strWeightIdx)
+        {
+            super(strNumSamples);
+        }        
+    }
+
+    static class InverseWeightScoreGenerator implements ScoredTuple.ScoreGenerator
+    {        
+        //index of the weight field of the input tuple
+        private int weightIdx;
+        
+        InverseWeightScoreGenerator(Integer weightIdx) 
+        {
+            if(weightIdx == null || weightIdx < 0) {
+                throw new IllegalArgumentException("Invalid null or negative weight index input: " + weightIdx);
+            }
+            this.weightIdx = weightIdx;
+        }
+        
+        @Override
+        public double generateScore(Tuple sample) throws ExecException
+        {
+            double weight = ((Number)sample.get(this.weightIdx)).doubleValue();
+            if(Double.compare(weight, 0.0) <= 0)
+            {
+                //non-positive weight should be avoided
+                throw new ExecException(String.format("Invalid sample weight [%f]. It should be a positive real number", weight));
+            }
+            //a different approach to try: u^(1/w) could be computed as exp(log(u)/w)?
+            return Math.pow(Math.random(), 1/weight);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java b/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
new file mode 100644
index 0000000..bfb7398
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+
+import java.util.Collections;
+
+
+
+/*
+ * This class implements the Chao-Shen entropy estimator based on this  
+ * {@link <a href="http://www.researchgate.net/publication/226041346_Nonparametric_estimation_of_Shannons_index_of_diversity_when_there_are_unseen_species_in_sample/file/d912f510a79b8aae74.pdf" target="_blank">paper</a>} 
+ * <p>
+ * It combines the {@link <a href="http://en.wikipedia.org/wiki/Horvitz%E2%80%93Thompson_estimator" target="_blank">Horvitz-Thompson estimator</a>}
+ * to adjust for missing species and the concept of sample coverage to estimate the relative abundances of species in the sample. 
+ * The entropy calculation formula is as follows:
+ * <pre>
+ * {@code
+ *   H(X) = - SUM( ( C * c(x) / N * log(C * c(x) / N) ) / ( 1 - (1 - C * c(x) / N) ^ N ) )
+ *   c(x) is the occurrence frequency of sample x, N is the sum of c(x)
+ *   C = 1 - N1 / N, N1 is the number of samples whose c(x) equals 1.
+ * }
+ * </pre>
+ * </p>
+ */
+class ChaoShenEntropyEstimator extends EntropyEstimator {
+    //sample frequency 
+    private AccumulativeSampleFrequencyMap freqBaggedMap;
+    
+    //sum of frequency of all samples
+    private long N;
+    
+    //number of samples whose occurrence frequency is 1
+    private long N1;
+    
+    ChaoShenEntropyEstimator(String base){
+        super(base);
+        reset();
+    }
+
+    @Override
+    public void accumulate(long cx) throws ExecException {
+        if(cx > 0) {
+            freqBaggedMap.accumulate(cx);  
+            N += cx;
+            if(cx == 1) {
+               N1++;
+            }
+        }
+    }
+
+    @Override
+    public double getEntropy() {
+        double h = 0.0;
+        
+        if(N > 0) {
+
+            if(N1 == N) {
+               //avoid c == 0
+               N1 = N - 1;
+            }
+
+            //sample coverage estimation
+            double c = 1 - (double)N1 / N;
+            
+            try {
+                 //from in-memory frequency map
+                 for(Iterator<Map.Entry<Long, MutableLong>> iter = this.freqBaggedMap.getInternalMap().entrySet().iterator();
+                         iter.hasNext(); ) {
+                     Map.Entry<Long, MutableLong> entry = iter.next();
+                     long cx = entry.getKey();
+                     long cnt = entry.getValue().longValue();
+                     h += accumlateEntropy(cx, this.N, c, cnt);
+                 }
+                 //from backup databag
+                 for(Iterator<Tuple> iter = this.freqBaggedMap.getInternalBag().iterator();
+                          iter.hasNext(); ) {
+                    Tuple t = iter.next();
+                    long cx = (Long)t.get(0);
+                    long cnt = (Long)t.get(1);
+                    h += accumlateEntropy(cx, this.N, c, cnt);
+                 }
+            } catch(ExecException ex) {
+                 throw new RuntimeException(
+                         "Error while computing chao-shen entropy, exception: " + ex);
+            }
+        }
+
+        return EntropyUtil.logTransform(h, super.base);
+    }
+    
+    private double accumlateEntropy(long cx, 
+                                    long N,
+                                    double c,
+                                    long cnt) {
+        //sample proportion
+        double p = (double)cx / N;
+        //sample coverage adjusted probability
+        double pa = c * p;
+        //probability to see an individual in the sample
+        double la = 1 - Math.pow((1 - pa), N);
+        return -(cnt * pa * Math.log(pa) / la);
+    }
+
+    @Override
+    public void reset() {
+        this.N = 0;
+        this.N1 = 0;
+        if(this.freqBaggedMap != null) {
+            this.freqBaggedMap.clear();
+        }
+        this.freqBaggedMap = new AccumulativeSampleFrequencyMap();
+    }
+    
+    /*
+     * A map backed by a data bag to record the sample occurrence frequencies which might be repeated
+ * The purpose of this class is that, in real applications, the same sample occurrence frequency
+ * is likely to repeat, and putting all of these repeated numbers into a databag may be space inefficient.
+     * So we have an in-memory map to record the repetitions and use a databag to backup when the size of the
+     * in-memory map exceeds a pre-defined threshold. At present we use 5M as the default threshold to control
+ * the number of entries in the in-memory map, which is approximately 80M bytes. 
+     */
+    private class AccumulativeSampleFrequencyMap {
+        
+        private long spillBytesThreshold;
+        
+        /* key is the sample occurrence frequency
+         * value is the number of repetitions of this frequency
+         */
+        private Map<Long, MutableLong> countMap;
+        
+        /* the backed databag
+         * each tuple has 2 elements
+         * the 1st element is sample occurrence frequency
+         * the 2nd element is the number of repetitions
+         */
+        private DataBag countBag;
+           
+        AccumulativeSampleFrequencyMap(){
+            this(1024 * 1024 * 5 * (Long.SIZE + Long.SIZE) / 8);
+        }
+        
+        AccumulativeSampleFrequencyMap(long spillBytesThreshold) {
+            this.spillBytesThreshold = spillBytesThreshold;
+            clear();
+        }
+        
+        void accumulate(long key) throws ExecException {
+            MutableLong val = this.countMap.get(key);
+            if(val == null) {
+                this.countMap.put(key, new MutableLong(1));
+            } else {
+                val.add(1);
+            }
+            
+            if(this.countMap.size() * (Long.SIZE + Long.SIZE) / 8 > spillBytesThreshold) {
+                spillFromMap2Bag();
+            }
+        }
+        
+        private void spillFromMap2Bag() throws ExecException {
+            for(Map.Entry<Long, MutableLong> entry : this.countMap.entrySet()) {
+                Tuple t = TupleFactory.getInstance().newTuple(2);
+                t.set(0, entry.getKey());
+                t.set(1, entry.getValue().longValue());
+                this.countBag.add(t);
+            }
+            this.countMap.clear();
+        }
+        
+        Map<Long, MutableLong> getInternalMap() {
+            return Collections.unmodifiableMap(this.countMap);
+        }
+        
+        DataBag getInternalBag() {
+            return this.countBag;
+        }
+        
+        void clear() {
+            countMap = new HashMap<Long, MutableLong>();
+            countBag = BagFactory.getInstance().newDefaultBag();
+        }
+    }
+    
+    private class MutableLong {
+        private long val;
+        
+        MutableLong(){
+            this(0L);
+        }
+        
+        MutableLong(long val) {
+            this.val = val;
+        }
+        
+        long longValue(){
+            return val;
+        }
+        
+        void add(long additive){
+            this.val += additive;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java b/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
new file mode 100644
index 0000000..3f0acab
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+/*
+ * This entropy estimator calculates the empirical entropy of given species
+ * using the occurrence frequency of an individual in the sample as an estimate of its probability. 
+ * The empirical estimator is also called the maximum likelihood estimator (MLE). 
+ * <p>
+ * It applies the following formula to compute entropy:
+ * <pre>
+ * {@code
+ * H(X) = log(N) - 1 / N * SUM(c(x) * log(c(x)) )
+ * c(x) is the occurrence frequency of sample x, N is the sum of c(x)
+ * }
+ * </pre>
+ * </p>
+ * <p>
+ * This entropy estimator is widely used when the number of species is known and small.
+ * But it is biased since it does not cover the rare species with zero frequency in the sample.
+ * </p>
+ */
+class EmpiricalEntropyEstimator extends EntropyEstimator {
+    
+    //sum of frequency cx of all input samples
+    private long N;
+    
+    //sum of cx * log(cx) of all input samples
+    private double M;
+    
+    EmpiricalEntropyEstimator(String base) throws IllegalArgumentException {
+        super(base);
+        reset();
+    }
+
+    @Override
+    public void accumulate(long cx) {
+        if(cx > 0) {
+           N += cx;
+           M += cx * Math.log(cx);
+        }
+    }
+
+    @Override
+    public double getEntropy() {
+        return N > 0 ? EntropyUtil.logTransform(Math.log(N) - M / N, super.base) : 0;
+    }
+    
+    @Override
+    public void reset(){
+        N = 0;
+        M = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/Entropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/Entropy.java b/src/java/datafu/pig/stats/entropy/Entropy.java
new file mode 100644
index 0000000..a074bc0
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/Entropy.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.stats.entropy.EntropyUtil;
+
+
+/**
+ * Calculate the empirical entropy given a bag of data sample counts following entropy's
+ * {@link <a href="http://en.wikipedia.org/wiki/Entropy_%28information_theory%29" target="_blank">wiki definition</a>} 
+ * <p>
+ * It supports entropy calculation both in a streaming way and in a distributed way using combiner.
+ * </p>
+ * <p>
+ * This UDF's constructor accepts the logarithm base as its single argument. 
+ * The definition of supported logarithm base is the same as {@link datafu.pig.stats.entropy.StreamingEntropy}
+ * </p>
+ * <p>
+ * Note: 
+ * <ul>
+ *     <li>the input to the UDF is a bag of data sample's occurrence frequency, 
+ *     which is different from * {StreamingEntropy}, whose input is a sorted bag of raw data samples.
+ *     <li>the UDF accepts int or long number. Other data types will be rejected and an exception will be thrown
+ *     <li>the input int or long number should be non-negative,
+ *     negative input will be silently discarded and a warning message will be logged.
+ *     <li>the returned entropy value is of double type.
+ * </ul>
+ * </p>
+ * <p>
+ * How to use: 
+ * </p>
+ * <p>
+ * This UDF is suitable to calculate entropy on the whole data set when we 
+ * could easily get each sample's frequency using an outer GROUP BY. 
+ * </p>
+ * <p>
+ * Then we could use another outer GROUP BY on the sample frequencies to get the entropy. 
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * 
+ * define Entropy datafu.pig.stats.entropy.Entropy();
+ *
+ * input = LOAD 'input' AS (val: double);
+ *
+ * -- calculate the occurrence of each sample
+ * counts_g = GROUP input BY val;
+ * counts = FOREACH counts_g GENERATE COUNT(input) AS cnt;
+ * 
+ * -- calculate entropy 
+ * input_counts_g = GROUP counts ALL;
+ * entropy = FOREACH input_counts_g GENERATE Entropy(counts) AS entropy;
+ * }
+ * </pre>
+ * </p>
+ * Use case to calculate mutual information using Entropy:
+ * <p>
+ * <pre>
+ * {@code
+ * 
+ * define Entropy datafu.pig.stats.entropy.Entropy();
+ * 
+ * input = LOAD 'input' AS (valX: double, valY: double);
+ * 
+ * ------------
+ * -- calculate mutual information I(X, Y) using entropy
+ * -- I(X, Y) = H(X) + H(Y) -  H(X, Y)
+ * ------------
+ * 
+ * input_x_y_g = GROUP input BY (valX, valY);
+ * input_x_y_cnt = FOREACH input_x_y_g GENERATE flatten(group) as (valX, valY), COUNT(input) AS cnt;
+ * 
+ * input_x_g = GROUP input_x_y_cnt BY valX;
+ * input_x_cnt = FOREACH input_x_g GENERATE flatten(group) as valX, SUM(input_x_y_cnt.cnt) AS cnt;
+ * 
+ * input_y_g = GROUP input_x_y_cnt BY valY;
+ * input_y_cnt = FOREACH input_y_g GENERATE flatten(group) as valY, SUM(input_x_y_cnt.cnt) AS cnt;
+ * 
+ * input_x_y_entropy_g = GROUP input_x_y_cnt ALL;
+ * input_x_y_entropy = FOREACH input_x_y_entropy_g {
+ *                         input_x_y_entropy_cnt = input_x_y_cnt.cnt;
+ *                         GENERATE Entropy(input_x_y_entropy_cnt) AS x_y_entropy;
+ *                     }
+ *                         
+ * input_x_entropy_g = GROUP input_x_cnt ALL;
+ * input_x_entropy = FOREACH input_x_entropy_g {
+ *                         input_x_entropy_cnt = input_x_cnt.cnt;
+ *                         GENERATE Entropy(input_x_entropy_cnt) AS x_entropy;
+ *                   }
+ *                       
+ * input_y_entropy_g = GROUP input_y_cnt ALL;
+ * input_y_entropy = FOREACH input_y_entropy_g {
+ *                         input_y_entropy_cnt = input_y_cnt.cnt;
+ *                         GENERATE Entropy(input_y_entropy_cnt) AS y_entropy;
+ *                   }
+ *
+ * input_mi_cross = CROSS input_x_y_entropy, input_x_entropy, input_y_entropy;
+ * input_mi = FOREACH input_mi_cross GENERATE (input_x_entropy::x_entropy +
+ *                                             input_y_entropy::y_entropy - 
+ *                                             input_x_y_entropy::x_y_entropy) AS mi;
+ * }
+ * </pre>
+ * </p>
+ * @see datafu.pig.stats.entropy.StreamingEntropy
+ */
+
+public class Entropy extends AccumulatorEvalFunc<Double> implements Algebraic {
+    
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    
+    //entropy estimator used by the Accumulator interface;
+    //re-uses the same empirical estimator implementation as StreamingEntropy
+    private EntropyEstimator streamEstimator;
+    
+    //logarithm base name, one of EntropyUtil.LOG / LOG2 / LOG10
+    private String base;
+    
+    public Entropy() throws ExecException {
+        //empirical estimator using Euler's number as logarithm base
+        this(EntropyUtil.LOG);
+    }
+    
+    //@param base logarithm base; an unsupported base is reported as ExecException
+    public Entropy(String base) throws ExecException {
+        try {
+            this.streamEstimator = EntropyEstimator.createEstimator(EntropyEstimator.EMPIRICAL_ESTIMATOR, base);
+        } catch (IllegalArgumentException ex) {
+            throw new ExecException(
+                    String.format("Fail to initialize Entropy with logarithm base: (%s), exception: (%s)", base, ex));
+        }
+        this.base = base;
+    }
+    
+    /*
+     * Algebraic implementation part
+     */
+    
+    //lazily-built constructor argument string, e.g. "('log2')", appended to the
+    //Initial/Intermediate/Final class names so all stages share the same base
+    private String param = null;
+    private String getParam()
+    {
+      if (param == null) {
+        if (this.base != null) {
+          param = String.format("('%s')", this.base);
+        } else {
+          param = "";
+        }
+      }
+      return param;
+    }
+    
+    @Override
+    public String getFinal() {
+        return Final.class.getName() + getParam();
+    }
+
+    @Override
+    public String getInitial() {
+       return Initial.class.getName() + getParam();
+    }
+
+    @Override
+    public String getIntermed() {
+        return Intermediate.class.getName() + getParam();
+    }
+    
+    //Initial stage: maps one input frequency c(x) to the pair of partial
+    //aggregates (c(x)*log(c(x)), c(x)); emits (null, null) for invalid input
+    static public class Initial extends EvalFunc<Tuple> {
+                
+        public Initial(){}
+        
+        //base is unused at this stage; the constructor exists so Pig can
+        //instantiate the class with the argument string from getInitial()
+        public Initial(String base){}
+        
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            Tuple t = mTupleFactory.newTuple(2);
+
+            try{
+                //input is a bag with one tuple containing
+                //the sample's occurrence frequency
+                //NOTE(review): bg.iterator() is called twice; each call returns
+                //a fresh iterator, so next() still reads the first tuple
+                DataBag bg = (DataBag) input.get(0);
+                Long cxl = null;
+                if(bg.iterator().hasNext()) {
+                   Tuple tp = bg.iterator().next();
+                   cxl = ((Number)(tp.get(0))).longValue();
+                }
+                
+                if(cxl == null || cxl.longValue() < 0) {
+                    //emit null in case of invalid input frequency
+                    //NOTE(review): only null or negative frequencies reach here
+                    //(zero is accepted below), so "Non-positive" in the warning
+                    //message is slightly misleading
+                    t.set(0, null);
+                    t.set(1, null);
+                    warn("Non-positive input frequency number: " + cxl, PigWarning.UDF_WARNING_1);
+                } else {
+                    long cx = cxl.longValue();
+                    //guard against log(0); a zero frequency contributes 0
+                    double logcx = (cx > 0 ? Math.log(cx) : 0);
+                    double cxlogcx = cx * logcx;
+                    
+                    //1st element of the returned tuple is freq * log(freq)
+                    t.set(0, cxlogcx);
+                    
+                    //2nd element of the returned tuple is freq
+                    t.set(1, cxl);
+                }
+                
+                return t;
+            } catch (ExecException ee) {
+                throw ee;
+            } catch(Exception e) {
+                int errCode = 10080;
+                String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+            
+        }
+        
+    }
+    
+    //Intermediate stage (combiner): sums partial aggregates from Initial
+    //or earlier Intermediate invocations via combine()
+    static public class Intermediate extends EvalFunc<Tuple> {
+        
+        public Intermediate(){}
+        
+        //base is unused at this stage; see Initial(String)
+        public Intermediate(String base){}
+        
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                DataBag b = (DataBag)input.get(0);
+                return combine(b);
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 10081;
+                String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+    
+    //Final stage: folds all partial aggregates and converts them into the
+    //entropy value in the requested logarithm base; returns null when any
+    //invalid input was seen upstream
+    static public class Final extends EvalFunc<Double> {
+        private String base;
+        
+        public Final()
+        {
+            this(EntropyUtil.LOG);
+        }
+        
+        public Final(String base)
+        {
+            this.base = base;
+        }
+        
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            try {
+                DataBag b = (DataBag)input.get(0);
+                Tuple combined = combine(b);
+                
+                Double sumOfCxLogCx = (Double)combined.get(0);
+                Long sumOfCx = (Long)combined.get(1);
+                
+                if(sumOfCxLogCx == null || sumOfCx == null) {
+                    //emit null if there is at least 1 invalid input
+                    warn("Invalid null field output from combine(), " +
+                    		"1st field: " + sumOfCxLogCx + ", 2nd field: " + sumOfCx, PigWarning.UDF_WARNING_1);
+                    return null;
+                }
+                
+                Double entropy = null;
+                
+                double scxlogcx = sumOfCxLogCx.doubleValue();
+                long scx = sumOfCx.longValue();
+                                
+                if (scx > 0) {
+                    //H(X) = log(N) - 1 / N * SUM(c(x) * log(c(x)) )
+                    entropy = EntropyUtil.logTransform(Math.log(scx) - scxlogcx / scx, this.base);
+                }
+
+                return entropy;
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 10082;
+                //NOTE(review): message says "average" but this UDF computes
+                //entropy; looks like a copy-paste slip — should say "entropy"
+                String msg = "Error while computing average in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+
+        }        
+    }
+    
+    //sum the (freq*log(freq), freq) pairs over a bag of partial-aggregate
+    //tuples; null fields (from invalid input) are skipped
+    static protected Tuple combine(DataBag values) throws ExecException { 
+        Tuple output = mTupleFactory.newTuple(2);
+        
+        boolean sawNonNull = false;
+        double sumOfCxLogCx = 0;
+        long sumOfCx = 0;
+        
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            Double scxlogcx = (Double)t.get(0);
+            Long scx = (Long)t.get(1);
+            
+            if(scxlogcx != null && scx != null) {
+                sumOfCxLogCx += scxlogcx.doubleValue();
+                sumOfCx += scx.longValue();
+                sawNonNull = true;
+            }
+        }
+        
+        if (sawNonNull) {
+            output.set(0, sumOfCxLogCx);
+            output.set(1, sumOfCx);
+        } else {
+            //emit null if there was no valid input
+            output.set(0, null);
+            output.set(1, null);
+        }
+        
+        return output;
+    }
+    
+    /*
+     * Accumulator implementation part
+     */
+
+    //feed each tuple's frequency to the streaming estimator; the estimator
+    //itself silently ignores non-positive frequencies
+    @Override
+    public void accumulate(Tuple input) throws IOException
+    {
+        for (Tuple t : (DataBag) input.get(0)) {
+            long cx = ((Number)(t.get(0))).longValue();
+            this.streamEstimator.accumulate(cx);
+        }
+    }
+
+    @Override
+    public Double getValue()
+    {
+      return this.streamEstimator.getEntropy();
+    }
+
+    @Override
+    public void cleanup()
+    {
+        this.streamEstimator.reset();
+    }
+    
+    //validate that the input is a bag of single-field tuples of int or long;
+    //the UDF's output is a single double
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.BAG)
+            {
+                throw new RuntimeException("Expected a BAG as input");
+            }
+            
+            Schema inputBagSchema = inputFieldSchema.schema;
+            
+            if (inputBagSchema.getField(0).type != DataType.TUPLE)
+            {
+                throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                       DataType.findTypeName(inputBagSchema.getField(0).type)));
+            }
+            
+            Schema tupleSchema = inputBagSchema.getField(0).schema;
+            
+            if(tupleSchema == null) {
+                throw new RuntimeException("The tuple of input bag has no schema");
+            }
+            
+            List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+            
+            if(fieldSchemaList == null || fieldSchemaList.size() != 1) {
+                throw new RuntimeException("The field schema of the input tuple is null or its size is not 1");
+            }
+            
+            if(fieldSchemaList.get(0).type != DataType.INTEGER &&
+               fieldSchemaList.get(0).type != DataType.LONG )
+            {
+                String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
+                                                       DataType.findTypeName(DataType.LONG)};
+                throw new RuntimeException("Expect the type of the input tuple to be of (" +
+                        java.util.Arrays.toString(expectedTypes) + "), but instead found " + 
+                        DataType.findTypeName(fieldSchemaList.get(0).type));
+            } 
+            
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                                   .getName()
+                                                                   .toLowerCase(), input),
+                                                 DataType.DOUBLE));
+          } catch (FrontendException e) {
+            throw new RuntimeException(e);
+          }
+     }    
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EntropyEstimator.java b/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
new file mode 100644
index 0000000..336ef86
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+/*
+* Entropy estimator which exposes a unified interface to the application
+* and hides the implementation details in its subclasses
+*/
+public abstract class EntropyEstimator {
+
+    //estimator type name for the empirical (maximum likelihood) estimator
+    public static final String EMPIRICAL_ESTIMATOR = "empirical";
+    
+    //estimator type name for the Chao-Shen estimator
+    public static final String CHAOSHEN_ESTIMATOR = "chaosh";
+    
+    /*
+     * logarithm base
+     * by default, we use Euler's number as the default logarithm base
+     */ 
+    protected String base;
+    
+    protected EntropyEstimator(String base) throws IllegalArgumentException {
+        if(!EntropyUtil.isValidLogBase(base)) {
+            throw new IllegalArgumentException("Invalid input logarithm base. " + 
+                    "Please refer to StreamingEntropy's javadoc for supported logarithm base");
+        }
+        this.base = base;
+    }
+    
+    /* 
+     * create estimator instance based on the input type
+     * @param type  type of entropy estimator, either EMPIRICAL_ESTIMATOR or
+     *              CHAOSHEN_ESTIMATOR (matched case-insensitively)
+     * @param base logarithm base
+     * @throws IllegalArgumentException if the estimator type or the logarithm
+     *         base is not supported (no null estimator is ever returned)
+     */
+    public static EntropyEstimator createEstimator(String type, 
+                                                   String base) throws IllegalArgumentException 
+    {
+        //empirical estimator
+        if(EmpiricalEntropyEstimator.EMPIRICAL_ESTIMATOR.equalsIgnoreCase(type)) {
+            return new EmpiricalEntropyEstimator(base);
+        }
+        //chao-shen estimator
+        if(EmpiricalEntropyEstimator.CHAOSHEN_ESTIMATOR.equalsIgnoreCase(type)) {
+           return new ChaoShenEntropyEstimator(base);
+        }
+        //unsupported estimator type
+        throw new IllegalArgumentException("invalid input entropy estimator type. " +
+                    "Please refer to StreamingEntropy's javadoc for the supported estimator types");        
+    }
+    
+    /*
+     * accumulate occurrence frequency from input stream of samples
+     * @param cx the occurrence frequency of the last input sample
+     */
+    public abstract void accumulate(long cx) throws ExecException;
+
+    /*
+     * calculate the output entropy value
+     * @return entropy value as a double type number, in the logarithm base
+     *         given at construction time
+     */
+    public abstract double getEntropy();
+
+    /*
+     * cleanup and reset internal states so the estimator can be reused
+     */
+    public abstract void reset();
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/EntropyUtil.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EntropyUtil.java b/src/java/datafu/pig/stats/entropy/EntropyUtil.java
new file mode 100644
index 0000000..685f3e2
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/EntropyUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+public class EntropyUtil {
+    
+    //natural logarithm (base e, Euler's number)
+    public static final String LOG = "log";
+    
+    //logarithm base 2 (entropy in bits)
+    public static final String LOG2 = "log2";
+    
+    //logarithm base 10
+    public static final String LOG10 = "log10";
+
+    //NOTE(review): utility class — a private constructor would prevent
+    //accidental instantiation
+
+    /*
+     * Transform the input entropy to that in the input logarithm base,
+     * using the change-of-base rule log_b(x) = ln(x) / ln(b).
+     * The input entropy is assumed to be calculated in the Euler's number
+     * logarithm base; any unrecognized base falls through to natural log.
+     * */
+    public static double logTransform(double h, String base) {
+        if(LOG2.equalsIgnoreCase(base)) {
+            return h / Math.log(2);
+        }
+
+        if(LOG10.equalsIgnoreCase(base)) {
+            return h / Math.log(10);
+        }
+        
+        return h;
+    }
+    
+    //a base string is valid iff it matches one of the three supported
+    //names above, case-insensitively
+    public static boolean isValidLogBase(String base) {
+        return LOG.equalsIgnoreCase(base) ||
+               LOG2.equalsIgnoreCase(base) ||
+               LOG10.equalsIgnoreCase(base);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java b/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java
new file mode 100644
index 0000000..67ad4d3
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import org.apache.pig.AccumulatorEvalFunc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+
+/**
+ * Calculate the conditional entropy H(Y|X) of random variables X and Y according to its 
+ * {@link <a href="http://en.wikipedia.org/wiki/Conditional_entropy" target="_blank">wiki definition</a>}, 
+ * X is the conditional variable and Y is the variable that conditions on X.
+ * <p>
+ * The input to this UDF is a bag of 2 field tuple.
+ * <ul>
+ *     <li>the 1st field of the tuple is an instance of variable X.
+ *     <li>the 2nd field of the tuple is an instance of variable Y.
+ * </ul>
+ * </p>
+ * <p>
+ * An exception will be thrown if the input tuple does not have 2 fields.
+ * </p> 
+ * <p>
+ * This UDF's constructor definition and parameters are the same as that of {@link datafu.pig.stats.entropy.StreamingEntropy}
+ * </p>
+ * <p>
+ * Note:
+ * <ul>
+ *     <li>input bag to the UDF must be sorted on X and Y, with X in the first order.
+ *     <li>Entropy value is returned as double type.
+ * </ul>
+ * </p>
+ * <p>
+ * How to use: 
+ * </p>
+ * <p>
+ * This UDF is suitable to calculate conditional entropy in a nested FOREACH after a GROUP BY,
+ * where we sort the inner bag and use the sorted bag as the input to this UDF.
+ * </p>
+ * <p>
+ * This is a scenario in which we would like to get 2 variables' conditional entropy in different constraint groups.
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * --define empirical conditional entropy with Euler's number as the logarithm base
+ * define CondEntropy datafu.pig.stats.entropy.StreamingCondEntropy();
+ *
+ * input = LOAD 'input' AS (grp: chararray, valX: double, valY: double);
+ *
+ * -- calculate conditional entropy H(Y|X) in each group
+ * input_group_g = GROUP input BY grp;
+ * entropy_group = FOREACH input_group_g {
+ *   input_val = input.(valX, valY);
+ *   input_ordered = ORDER input_val BY $0, $1;
+ *   GENERATE FLATTEN(group) AS group, CondEntropy(input_ordered) AS cond_entropy; 
+ * }
+ * }
+ * </pre>
+ * </p>
+ * Use case to calculate mutual information:
+ * <p>
+ * <pre>
+ * {@code
+ * ------------
+ * -- calculate mutual information I(X, Y) using streaming conditional entropy and streaming entropy
+ * -- I(X, Y) = H(Y) - H(Y|X)
+ * ------------
+ * 
+ * define CondEntropy datafu.pig.stats.entropy.StreamingCondEntropy();
+ * define Entropy datafu.pig.stats.entropy.StreamingEntropy();
+ * 
+ * input = LOAD 'input' AS (grp: chararray, valX: double, valY: double);
+ * 
+ * -- calculate the I(X,Y) in each group
+ * input_group_g = GROUP input BY grp;
+ * mutual_information = FOREACH input_group_g {
+ *      input_val_x_y = input.(valX, valY);
+ *      input_val_x_y_ordered = ORDER input_val_x_y BY $0,$1;
+ *      input_val_y = input.valY;
+ *      input_val_y_ordered = ORDER input_val_y BY $0;
+ *      cond_h_x_y = CondEntropy(input_val_x_y_ordered);
+ *      h_y = Entropy(input_val_y_ordered);
+ *      GENERATE FLATTEN(group), h_y - cond_h_x_y;
+ * }
+ * }
+ * </pre>
+ * </p>
+ * @see StreamingEntropy
+ */
+public class StreamingCondEntropy extends AccumulatorEvalFunc<Double> {
+    //last visited tuple of <x,y>
+    private Tuple xy;
+    
+    //number of occurrence of last visited <x,y>
+    private long cxy;
+    
+    //number of occurrence of last visited x
+    private long cx;
+    
+    //comparison result between the present tuple and the last visited tuple,
+    //used to verify the input bag arrives in a consistent sort order
+    private int lastCmp;
+    
+    //entropy estimator for H(x,y)
+    private EntropyEstimator combEstimator;
+    
+    //entropy estimator for H(x)
+    private EntropyEstimator condXEstimator;
+    
+    public StreamingCondEntropy() throws ExecException
+    {
+      this(EntropyEstimator.EMPIRICAL_ESTIMATOR);
+    }
+    
+    //@param type estimator type, see EntropyEstimator; defaults to natural-log base
+    public StreamingCondEntropy(String type) throws ExecException 
+    {
+      this(type, EntropyUtil.LOG);
+    }
+
+    //@param type estimator type  @param base logarithm base;
+    //an unsupported combination is reported as ExecException
+    public StreamingCondEntropy(String type, String base) throws ExecException
+    {
+      try {
+          this.combEstimator = EntropyEstimator.createEstimator(type, base);
+          this.condXEstimator = EntropyEstimator.createEstimator(type, base);
+      } catch (IllegalArgumentException ex) {
+          throw new ExecException(String.format(
+                  "Fail to initialize StreamingCondEntropy with entropy estimator of type (%s), base: (%s). Exception: (%s)",
+                  type, base, ex)); 
+      }
+      cleanup();
+    }
+    
+    /*
+     * Accumulate occurrence frequency of <x,y> and x
+     * as we stream through the input bag; requires the bag to be
+     * sorted on (x, y) so equal tuples arrive in runs
+     */
+    @Override
+    public void accumulate(Tuple input) throws IOException
+    {
+      for (Tuple t : (DataBag) input.get(0)) {
+
+        if (this.xy != null)
+        {
+            int cmp = t.compareTo(this.xy);
+            
+            //check if the comparison result is different from previous compare result;
+            //a sign flip means the bag is not sorted, which would corrupt the counts
+            //NOTE(review): "comparsion" in the message below is a typo for "comparison"
+            if ((cmp < 0 && this.lastCmp > 0)
+                || (cmp > 0 && this.lastCmp < 0)) {
+                throw new ExecException("Out of order! previous tuple: " + this.xy + ", present tuple: " + t
+                                        + ", comparsion: " + cmp + ", previous comparsion: " + this.lastCmp);
+            }
+            if (cmp != 0) {
+               //different <x,y>: close out the finished run
+               this.combEstimator.accumulate(this.cxy);
+               this.cxy = 0;
+               this.lastCmp = cmp;
+               if(DataType.compare(this.xy.get(0), t.get(0)) != 0) {
+                  //different x: close out the finished x-run as well
+                   this.condXEstimator.accumulate(this.cx);
+                   this.cx = 0;
+               }
+            } 
+        }
+
+        //set tuple t as the next tuple for comparison
+        this.xy = t;
+
+        //accumulate cx
+        this.cx++;
+        
+        //accumulate cxy
+        this.cxy++;
+      }
+    }
+    
+    //NOTE(review): this method flushes the trailing run counts into the
+    //estimators without zeroing them, so calling getValue() twice without an
+    //intervening cleanup() would double-count the last run — fine under Pig's
+    //accumulate/getValue/cleanup lifecycle, but worth confirming
+    @Override
+    public Double getValue()
+    {
+      //do not miss the last tuple
+      try {
+          this.combEstimator.accumulate(this.cxy);
+          this.condXEstimator.accumulate(this.cx);
+      } catch (ExecException ex) {
+          throw new RuntimeException("Error while accumulating sample frequency: " + ex);
+      }
+      
+      //Chain rule: H(Y|X) = H(X, Y) - H(X)
+      return this.combEstimator.getEntropy() - this.condXEstimator.getEntropy();
+    }
+    
+    @Override
+    public void cleanup()
+    {
+      this.xy = null;
+      this.cxy = 0;
+      this.cx = 0;
+      this.lastCmp = 0;
+      this.combEstimator.reset();
+      this.condXEstimator.reset();
+    }
+    
+    //validate that the input is a bag of 2-field tuples; the output is a double
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.BAG)
+            {
+              throw new RuntimeException("Expected a BAG as input");
+            }
+            
+            Schema inputBagSchema = inputFieldSchema.schema;
+            
+            if (inputBagSchema.getField(0).type != DataType.TUPLE)
+            {
+              throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                       DataType.findTypeName(inputBagSchema.getField(0).type)));
+            }
+            
+            Schema tupleSchema = inputBagSchema.getField(0).schema;
+            
+            if(tupleSchema == null) {
+                throw new RuntimeException("The tuple of the input bag has no schema");
+            }
+            
+            List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+            
+            if(fieldSchemaList == null || fieldSchemaList.size() != 2) {
+                throw new RuntimeException("The field schema of the input tuple is null or its size is not 2");
+            }
+            
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                                   .getName()
+                                                                   .toLowerCase(), input),
+                                                 DataType.DOUBLE));
+          } catch (FrontendException e) {
+            throw new RuntimeException(e);
+          }
+     }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/StreamingEntropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/StreamingEntropy.java b/src/java/datafu/pig/stats/entropy/StreamingEntropy.java
new file mode 100644
index 0000000..707e690
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/StreamingEntropy.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+
+/**
+ * Calculate entropy of a given stream of raw data samples according to entropy's 
+ * {@link <a href="http://en.wikipedia.org/wiki/Entropy_%28information_theory%29" target="_blank">wiki definition</a>}
+ * <p>
+ * Its constructor takes 2 arguments. 
+ * </p>
+ * <p>
+ * The 1st argument, the type of entropy estimator algorithm we currently support, includes:
+ * <ul>
+ *     <li>empirical (empirical entropy estimator)
+ *     <li>chaosh (Chao-Shen entropy estimator) 
+ * </ul>
+ * </p>
+ * <p>
+ * The default estimation algorithm is empirical.
+ * </p>
+ * <p>
+ * The 2nd argument, the logarithm base we currently support, includes:
+ * </p>
+ * <p>
+ * <ul>
+ *     <li>log (use Euler's number as the logarithm base)
+ *     <li>log2 (use 2 as the logarithm base)
+ *     <li>log10 (use 10 as the logarithm base) 
+ * </ul>
+ * </p>
+ * <p>
+ * The default logarithm base is log.
+ * </p> 
+ * <p>
+ * Note:
+ * <ul>
+ *     <li>The input bag to the UDF must be sorted. 
+ *     <li>The entropy value is returned as double type.
+ * </ul>
+ * </p>
+ * <p>
+ * How to use: 
+ * </p>
+ * <p>
+ * This UDF is suitable to calculate entropy in a nested FOREACH after a GROUP BY,
+ * where we sort the inner bag and use the sorted bag as the input to this UDF.
+ * </p>
+ * <p>
+ * This is a scenario in which we would like to get a variable's entropy in different constraint groups.
+ * </p>
+ * Example:
+ * <p>
+ * <pre>
+ * {@code
+ * --calculate empirical entropy with Euler's number as the logarithm base
+ * define Entropy datafu.pig.stats.entropy.stream.StreamingEntropy();
+ *
+ * input = LOAD 'input' AS (grp: chararray, val: double);
+ *
+ * -- calculate the input samples' entropy in each group
+ * input_group_g = GROUP input BY grp;
+ * entropy_group = FOREACH input_group_g {
+ *   input_val = input.val;
+ *   input_ordered = ORDER input_val BY $0;
+ *   GENERATE FLATTEN(group) AS group, Entropy(input_ordered) AS entropy; 
+ * }
+ * }
+ * </pre>
+ * </p>
+ * @see StreamingCondEntropy
+ */
+public class StreamingEntropy extends AccumulatorEvalFunc<Double>
+{ 
+  //last visited tuple
+  private Tuple x;
+  
+  //number of occurrence of last visited tuple
+  private long cx;
+  
+  //comparison result between the present tuple and the last visited tuple
+  private int lastCmp;
+  
+  //entropy estimator that accumulates sample's occurrence frequency to
+  //calculates the actual entropy
+  private EntropyEstimator estimator;
+  
+  public StreamingEntropy() throws ExecException
+  {
+    this(EntropyEstimator.EMPIRICAL_ESTIMATOR);
+  }
+  
+  public StreamingEntropy(String type) throws ExecException 
+  {
+    this(type, EntropyUtil.LOG);
+  }
+
+  public StreamingEntropy(String type, String base) throws ExecException
+  {
+    try {
+        this.estimator = EntropyEstimator.createEstimator(type, base);
+    } catch (IllegalArgumentException ex) {
+        throw new ExecException(
+                String.format("Fail to initialize StreamingEntropy with entropy estimator of type (%s), base: (%s), exception: (%s)",
+                       type, base, ex) 
+              ); 
+    }
+    cleanup();
+  }
+
+  /*
+   * Accumulate occurrence frequency of each tuple as we stream through the input bag
+   */
+  @Override
+  public void accumulate(Tuple input) throws IOException
+  {
+    for (Tuple t : (DataBag) input.get(0)) {
+
+      if (this.x != null)
+      {
+          int cmp = t.compareTo(this.x);
+          
+          //check if the comparison result is different from previous compare result
+          if ((cmp < 0 && this.lastCmp > 0)
+              || (cmp > 0 && this.lastCmp < 0)) {
+              throw new ExecException("Out of order! previous tuple: " + this.x + ", present tuple: " + t
+                                      + ", comparsion: " + cmp + ", previous comparsion: " + this.lastCmp);
+          }
+
+          if (cmp != 0) {
+             //different tuple
+             this.estimator.accumulate(this.cx);
+             this.cx = 0;
+             this.lastCmp = cmp;
+          } 
+      }
+
+      //set tuple t as the next tuple for comparison
+      this.x = t;
+
+      //accumulate cx
+      this.cx++;
+    }
+  }
+
+  @Override
+  public Double getValue()
+  {
+    //do not miss the last tuple
+    try {
+        this.estimator.accumulate(this.cx);
+    } catch (ExecException ex) {
+        throw new RuntimeException("Error while accumulating sample frequency: " + ex);
+    }
+
+    return this.estimator.getEntropy();
+  }
+
+  @Override
+  public void cleanup()
+  {
+    this.x = null;
+    this.cx = 0;
+    this.lastCmp = 0;
+    this.estimator.reset();
+  }
+  
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+      try {
+          Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+          if (inputFieldSchema.type != DataType.BAG)
+          {
+            throw new RuntimeException("Expected a BAG as input");
+          }
+          
+          Schema inputBagSchema = inputFieldSchema.schema;
+          
+          if (inputBagSchema.getField(0).type != DataType.TUPLE)
+          {
+            throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                     DataType.findTypeName(inputBagSchema.getField(0).type)));
+          }
+          
+          return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                                 .getName()
+                                                                 .toLowerCase(), input),
+                                               DataType.DOUBLE));
+        } catch (FrontendException e) {
+          throw new RuntimeException(e);
+        }
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
new file mode 100644
index 0000000..f507cbd
--- /dev/null
+++ b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import java.io.IOException;
+import java.util.*;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.WeightedReservoirSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link WeightedReservoirSample}.
+ *
+ * @author wjian 
+ *
+ */
+public class WeightedReservoirSamplingTests extends PigTests
+{
+ /** 
+  register $JAR_PATH 
+ 
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
+
+  data = LOAD 'input' AS (v1:chararray,v2:INT);
+  data_g = group data all;
+  sampled = FOREACH data_g GENERATE WeightedSample(data);
+  --describe sampled; 
+   
+  STORE sampled INTO 'output'; 
+ 
+  */ 
+  @Multiline 
+  private String weightedSampleTest; 
+   
+  @Test 
+  public void weightedSampleTest() throws Exception 
+  {
+     Map<String, Integer> count = new HashMap<String, Integer>();
+
+     count.put("a", 0);
+     count.put("b", 0);
+     count.put("c", 0);
+     count.put("d", 0);
+
+     PigTest test = createPigTestFromString(weightedSampleTest); 
+ 
+     writeLinesToFile("input",  
+                "a\t100","b\t1","c\t5","d\t2");
+
+     for(int i = 0; i < 10; i++) {
+        test.runScript();
+
+        List<Tuple> output = this.getLinesForAlias(test, "sampled");
+
+        Tuple t = output.get(0);
+
+        DataBag sampleBag = (DataBag)t.get(0);
+
+        for(Iterator<Tuple> sampleIter = sampleBag.iterator(); sampleIter.hasNext();) {
+           Tuple st = sampleIter.next();
+           String key = (String)st.get(0);
+           count.put(key, count.get(key) + 1);
+        }              
+     }
+
+     String maxKey = "";
+     int maxCount = 0;
+     for(String key : count.keySet()) {
+        if(maxCount < count.get(key)) {
+           maxKey = key; 
+           maxCount = count.get(key);
+        } 
+     }
+
+     Assert.assertEquals(maxKey, "a");
+  }
+
+  @Test
+  public void weightedReservoirSampleAccumulateTest() throws IOException
+  {
+     WeightedReservoirSample sampler = new WeightedReservoirSample("10", "1");
+
+     for (int i=0; i<100; i++)
+     {
+       Tuple t = TupleFactory.getInstance().newTuple(2);
+       t.set(0, i);
+       t.set(1, i + 1);
+       DataBag bag = BagFactory.getInstance().newDefaultBag();
+       bag.add(t);
+       Tuple input = TupleFactory.getInstance().newTuple(bag);
+       sampler.accumulate(input);
+     }
+
+     DataBag result = sampler.getValue();
+     verifyNoRepeatAllFound(result, 10, 0, 100); 
+  }
+
+  @Test
+  public void weightedReservoirSampleAlgebraicTest() throws IOException
+  {
+    WeightedReservoirSample.Initial initialSampler = new WeightedReservoirSample.Initial("10", "1");
+    WeightedReservoirSample.Intermediate intermediateSampler = new WeightedReservoirSample.Intermediate("10", "1");
+    WeightedReservoirSample.Final finalSampler = new WeightedReservoirSample.Final("10", "1");
+    
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    for (int i=0; i<100; i++)
+    {
+      Tuple t = TupleFactory.getInstance().newTuple(2);
+      t.set(0, i);
+      t.set(1, i + 1);
+      bag.add(t);
+    }
+    
+    Tuple input = TupleFactory.getInstance().newTuple(bag);
+    
+    Tuple intermediateTuple = initialSampler.exec(input);  
+    DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
+    intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));  
+    intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
+    DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
+    verifyNoRepeatAllFound(result, 10, 0, 100); 
+   }
+
+  private void verifyNoRepeatAllFound(DataBag result,
+                                      int expectedResultSize,
+                                      int left,
+                                      int right) throws ExecException
+  {
+    Assert.assertEquals(expectedResultSize, result.size());
+    
+    // all must be found, no repeats
+    Set<Integer> found = new HashSet<Integer>();
+    for (Tuple t : result)
+    {
+      Integer i = (Integer)t.get(0);
+      Assert.assertTrue(i>=left && i<right);
+      Assert.assertFalse(String.format("Found duplicate of %d",i), found.contains(i));
+      found.add(i);
+    }
+  }
+
+  @Test
+  public void weightedReservoirSampleLimitExecTest() throws IOException
+  {
+    WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
+   
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    for (int i=0; i<10; i++)
+    {
+      Tuple t = TupleFactory.getInstance().newTuple(2);
+      t.set(0, i);
+      t.set(1, 1); // score is equal for all
+      bag.add(t);
+    }
+   
+    Tuple input = TupleFactory.getInstance().newTuple(1);
+    input.set(0, bag);
+   
+    DataBag result = sampler.exec(input);
+   
+    verifyNoRepeatAllFound(result, 10, 0, 10); 
+
+    Set<Integer> found = new HashSet<Integer>();
+    for (Tuple t : result)
+    {
+      Integer i = (Integer)t.get(0);
+      found.add(i);
+    }
+
+    for(int i = 0; i < 10; i++)
+    {
+      Assert.assertTrue(found.contains(i));
+    }
+  }
+
+  @Test
+  public void invalidConstructorArgTest() throws Exception
+  {
+    try {
+         WeightedReservoirSample sampler = new WeightedReservoirSample("1", "-1");
+         Assert.fail( "Testcase should fail");
+    } catch (Exception ex) {
+         Assert.assertTrue(ex.getMessage().indexOf("Invalid negative index of weight field argument for WeightedReserviorSample constructor: -1") >= 0);
+    }
+  }
+
+  @Test
+  public void invalidWeightTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(weightedSampleTest);
+
+    writeLinesToFile("input",  
+                "a\t100","b\t1","c\t0","d\t2");
+    try {
+         test.runScript();
+         List<Tuple> output = this.getLinesForAlias(test, "sampled");
+         Assert.fail( "Testcase should fail");
+    } catch (Exception ex) {
+    }
+  }
+
+ /** 
+  register $JAR_PATH 
+ 
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
+
+  data = LOAD 'input' AS (v1:chararray);
+  data_g = group data all;
+  sampled = FOREACH data_g GENERATE WeightedSample(data);
+  describe sampled; 
+   
+  STORE sampled INTO 'output'; 
+ 
+  */ 
+  @Multiline 
+  private String invalidInputTupleSizeTest; 
+ 
+  @Test
+  public void invalidInputTupleSizeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
+
+    writeLinesToFile("input",  
+                "a","b","c","d");
+    try {
+         test.runScript();
+         List<Tuple> output = this.getLinesForAlias(test, "sampled");
+         Assert.fail( "Testcase should fail");
+    } catch (Exception ex) {
+         Assert.assertTrue(ex.getMessage().indexOf("The field schema of the input tuple is null or the tuple size is no more than the weight field index: 1") >= 0);
+    }
+  }
+
+ /** 
+  register $JAR_PATH 
+ 
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0'); 
+
+  data = LOAD 'input' AS (v1:chararray, v2:INT);
+  data_g = group data all;
+  sampled = FOREACH data_g GENERATE WeightedSample(data);
+  describe sampled; 
+   
+  STORE sampled INTO 'output'; 
+ 
+  */ 
+  @Multiline 
+  private String invalidWeightFieldSchemaTest; 
+ 
+  @Test
+  public void invalidWeightFieldSchemaTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
+
+    writeLinesToFile("input",  
+                "a\t100","b\t1","c\t5","d\t2");
+    try {
+         test.runScript();
+         List<Tuple> output = this.getLinesForAlias(test, "sampled");
+         Assert.fail( "Testcase should fail");
+    } catch (Exception ex) {
+         Assert.assertTrue(ex.getMessage().indexOf("Expect the type of the weight field of the input tuple to be of ([int, long, float, double]), but instead found (chararray), weight field: 0") >= 0);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java b/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java
new file mode 100644
index 0000000..6e512d4
--- /dev/null
+++ b/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.stats.entropy;
+
+import static org.testng.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+import datafu.test.pig.PigTests;
+
+public abstract class AbstractEntropyTests extends PigTests
+{
+  protected void verifyEqualEntropyOutput(List<Double> expectedOutput, List<Tuple> output, int digits) throws ExecException {
+    assertEquals(expectedOutput.size(), output.size());
+    Iterator<Double> expectationIterator = expectedOutput.iterator();
+    String formatDigits = "%." + digits + "f";
+    for (Tuple t : output)
+    {
+      Double entropy = (Double)t.get(0);
+      assertEquals(String.format(formatDigits,entropy),String.format(formatDigits, expectationIterator.next()));
+    }
+  }
+}


Mime
View raw message