datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [06/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/QuantileUtil.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/QuantileUtil.java b/src/java/datafu/pig/stats/QuantileUtil.java
deleted file mode 100644
index c6fd36a..0000000
--- a/src/java/datafu/pig/stats/QuantileUtil.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.util.ArrayList;
-
-/**
- * Methods used by {@link Quantile}.
- * 
- * @author "Matthew Hayes <mhayes@linkedin.com>"
- *
- */
-public class QuantileUtil
-{ 
-  public static ArrayList<Double> getNQuantiles(int numQuantiles)
-  {
-    ArrayList<Double> quantiles = new ArrayList<Double>(numQuantiles);
-    quantiles = new ArrayList<Double>(numQuantiles);
-    int divisor = numQuantiles-1;
-    for (int q = 0; q <= divisor; q++)
-    {
-      double quantile = ((double)q)/divisor;
-      quantiles.add(quantile);
-    }
-    return quantiles;
-  }
-  
-  public static ArrayList<Double> getQuantilesFromParams(String... k)
-  {
-    ArrayList<Double> quantiles = new ArrayList<Double>(k.length);
-    for (String s : k) { 
-      quantiles.add(Double.parseDouble(s));
-    }
-    
-    if (quantiles.size() == 1 && quantiles.get(0) > 1.0)
-    {
-      int numQuantiles = Integer.parseInt(k[0]);
-      if (numQuantiles < 1)
-      {
-        throw new IllegalArgumentException("Number of quantiles must be greater than 1");
-      }
-      
-      quantiles = getNQuantiles(numQuantiles);
-    }
-    else
-    {
-      for (Double d : quantiles)
-      {
-        if (d < 0.0 || d > 1.0)
-        {
-          throw new IllegalArgumentException("Quantile must be between 0.0 and 1.0");
-        }
-      }
-    }
-    
-    return quantiles;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/StreamingMedian.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/StreamingMedian.java b/src/java/datafu/pig/stats/StreamingMedian.java
deleted file mode 100644
index c4c3be4..0000000
--- a/src/java/datafu/pig/stats/StreamingMedian.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-/**
- * Computes the approximate {@link <a href="http://en.wikipedia.org/wiki/Median" target="_blank">median</a>} 
- * for a (not necessarily sorted) input bag, using the Munro-Paterson algorithm.  
- * This is a convenience wrapper around StreamingQuantile.
- *
- * <p>
- * N.B., all the data is pushed to a single reducer per key, so make sure some partitioning is 
- * done (e.g., group by 'day') if the data is too large.  That is, this isn't distributed median.
- * </p>
- * 
- * @see StreamingQuantile
- */
-public class StreamingMedian extends StreamingQuantile
-{
-  public StreamingMedian()
-  {
-    super("0.5");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/StreamingQuantile.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/StreamingQuantile.java b/src/java/datafu/pig/stats/StreamingQuantile.java
deleted file mode 100644
index e4a65b4..0000000
--- a/src/java/datafu/pig/stats/StreamingQuantile.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * Computes approximate {@link <a href="http://en.wikipedia.org/wiki/Quantile" target="_blank">quantiles</a>} 
- * for a (not necessarily sorted) input bag, using the Munro-Paterson algorithm.
- * 
- * <p>
- * The algorithm is described here:
- * {@link <a href="http://www.cs.ucsb.edu/~suri/cs290/MunroPat.pdf" target="_blank">http://www.cs.ucsb.edu/~suri/cs290/MunroPat.pdf</a>}
- * </p>
- * 
- * <p>
- * The implementation is based on the one in Sawzall, available here:
- * {@link <a href="http://szl.googlecode.com/svn-history/r41/trunk/src/emitters/szlquantile.cc">szlquantile.cc</a>}
- * </p>
- * 
- * <p>
- * N.B., all the data is pushed to a single reducer per key, so make sure some partitioning is 
- * done (e.g., group by 'day') if the data is too large.  That is, this isn't distributed quantiles.
- * </p>
- * 
- * <p>
- * Note that unlike datafu's standard Quantile algorithm, the Munro-Paterson algorithm gives
- * <b>approximate</b> quantiles and does not require the input bag to be sorted.  Because it implements
- * accumulate, StreamingQuantile can be much more efficient than Quantile for large amounts of data which
- * do not fit in memory.  Quantile must spill to disk when the input data is too large to fit in memory, 
- * which will contribute to longer runtimes.
- * </p>
- * 
- * <p>The constructor takes a single integer argument that specifies the number of evenly-spaced 
- * quantiles to compute, e.g.,</p>
- * 
- * <ul>
- *   <li>StreamingQuantile('3') yields the min, the median, and the max
- *   <li>StreamingQuantile('5') yields the min, the 25th, 50th, 75th percentiles, and the max
- *   <li>StreamingQuantile('101') yields the min, the max, and all 99 percentiles.
- * </ul>
- * 
- * <p>Alternatively the constructor can take the explicit list of quantiles to compute, e.g.</p>
- *
- * <ul>
- *   <li>StreamingQuantile('0.0','0.5','1.0') yields the min, the median, and the max
- *   <li>StreamingQuantile('0.0','0.25','0.5','0.75','1.0') yields the min, the 25th, 50th, 75th percentiles, and the max
- * </ul>
- *
- * <p>The list of quantiles need not span the entire range from 0.0 to 1.0, nor do they need to be evenly spaced, e.g.</p>
- * 
- * <ul>
- *   <li>StreamingQuantile('0.5','0.90','0.95','0.99') yields the median, the 90th, 95th, and the 99th percentiles
- *   <li>StreamingQuantile('0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987') yields the 0.13th, 2.28th, 15.87th, 50th, 84.13th, 97.72nd, and 99.87th percentiles
- * </ul>
- *
- * <p>Be aware when specifying the list of quantiles in this way that more quantiles may be computed internally than are actually returned.
- * The GCD of the quantiles is found and this determines the number of evenly spaced quantiles to compute.  The requested quantiles
- * are then returned from this set.  For instance:</p>
- * 
- * <ul>
- *   <li>If the quantiles 0.2 and 0.6 are requested then the quantiles 0.0, 0.2, 0.4, 0.6, 0.8, and 1.0 are computed 
- *       because 0.2 is the GCD of 0.2, 0.6, and 1.0.</li>  
- *   <li>If 0.2 and 0.7 are requested then the quantiles 0.0, 0.1, 0.2, ... , 0.9, 1.0 are computed because 0.1 is the 
- *       GCD of 0.2, 0.7, and 1.0.</li>
- *   <li>If 0.999 is requested the quantiles 0.0, 0.001, 0.002, ... , 0.998, 0.999, 1.0 are computed because 0.001 is
- *       the GCD of 0.999 and 1.0.</li> 
- *
- * </ul>
- * 
- * <p>The error on the approximation goes down as the number of buckets computed goes up.</p>
- * 
- * <p>
- * Example:
- * <pre>
- * {@code
- *
- * define Quantile datafu.pig.stats.StreamingQuantile('5');
-
- * -- input: 9,10,2,3,5,8,1,4,6,7
- * input = LOAD 'input' AS (val:int);
- *
- * grouped = GROUP input ALL;
- *
- * -- produces: (1.0,3.0,5.0,8.0,10.0)
- * quantiles = FOREACH grouped generate Quantile(input);
- * }
- * </pre></p>
- *
- * @see StreamingMedian
- * @see Quantile
- */
-public class StreamingQuantile extends AccumulatorEvalFunc<Tuple>
-{
-  private final int numQuantiles;
-  private final QuantileEstimator estimator;
-  private List<Double> quantiles;
- 
- // For the output schema, label the quantiles 0, 1, 2, ... n
- // Otherwise label the quantiles based on the quantile value.
- // e.g. 50% quantile 0.5 will be labeled as 0_5
- private boolean ordinalOutputSchema;
-  
-  public StreamingQuantile(String... k)
-  {
-    if (k.length == 1 && Double.parseDouble(k[0]) > 1.0) 
-    {
-      this.ordinalOutputSchema = true;
-      this.numQuantiles = Integer.parseInt(k[0]);
-    }
-    else
-    {
-      this.quantiles = QuantileUtil.getQuantilesFromParams(k);
-      this.numQuantiles = getNumQuantiles(this.quantiles);
-    }
-    this.estimator = new QuantileEstimator(this.numQuantiles);
-  }
-  
-  private static int getNumQuantiles(List<Double> quantiles)
-  {
-    quantiles = new ArrayList<Double>(quantiles);
-    Collections.sort(quantiles);
-    int start = 0;
-    int end = quantiles.size()-1;
-    while (quantiles.get(start) == 0.0) start++;
-    while (quantiles.get(end) == 1.0) end--;
-    double gcd = 1.0;
-    for (int i=end; i>=start; i--)
-    {
-      gcd = gcd(gcd,quantiles.get(i));
-    }
-    int numQuantiles = (int)(1/gcd) + 1;
-    return numQuantiles;
-  }
-  
-  private static double gcd(double a, double b)
-  {
-    if (round(a) == 0.0)
-    {
-      throw new IllegalArgumentException("Quantiles are smaller than the allowed precision");
-    }
-    if (round(b) == 0.0)
-    {
-      throw new IllegalArgumentException("Quantiles are smaller than the allowed precision");
-    }
-    while (round(b) != 0.0)
-    {
-      double t = b;
-      b = a % b;
-      a = t;
-    }
-    return round(a);
-  }
-  
-  private static double round(double d)
-  {
-    return Math.round(d*100000.0)/100000.0;
-  }
-
-  @Override
-  public void accumulate(Tuple b) throws IOException
-  {
-    DataBag bag = (DataBag) b.get(0);
-    if (bag == null || bag.size() == 0)
-      return;
-
-    for (Tuple t : bag) {
-      Object o = t.get(0);
-      if (!(o instanceof Number)) {
-        throw new IllegalStateException("bag must have numerical values (and be non-null)");
-      }
-      estimator.add(((Number) o).doubleValue());
-    }
-  }
-
-  @Override
-  public void cleanup()
-  {
-    estimator.clear();
-  }
-
-  @Override
-  public Tuple getValue()
-  {
-    Tuple t = TupleFactory.getInstance().newTuple(this.quantiles != null ? this.quantiles.size() : this.numQuantiles);
-    try {
-      if (this.quantiles == null)
-      {
-        int j = 0;
-        for (double quantileValue : estimator.getQuantiles()) 
-        {
-          t.set(j, quantileValue);
-          j++;
-        }
-      }
-      else
-      {
-        HashMap<Double,Double> quantileValues = new HashMap<Double,Double>(this.quantiles.size());
-        double quantileKey = 0.0;
-        for (double quantileValue : estimator.getQuantiles()) {
-          quantileValues.put(round(quantileKey), quantileValue);
-          quantileKey += 1.0/(this.numQuantiles-1);
-        }
-        int j = 0;
-        for (double d : this.quantiles)
-        {
-          Double quantileValue = quantileValues.get(round(d));
-          t.set(j, quantileValue);
-          j++;
-        }
-      }
-    } catch (IOException e) {
-      return null;
-    }
-    return t;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    Schema tupleSchema = new Schema();
-    if (ordinalOutputSchema)
-    {
-      for (int i = 0; i < this.numQuantiles; i++) 
-      {
-        tupleSchema.add(new Schema.FieldSchema("quantile_" + i, DataType.DOUBLE));
-      }
-    }
-    else
-    {
-      for (Double x : this.quantiles)
-        tupleSchema.add(new Schema.FieldSchema("quantile_" + x.toString().replace(".", "_"), DataType.DOUBLE));
-    }
-
-    try {
-      return new Schema(new FieldSchema(null, tupleSchema, DataType.TUPLE));
-    } catch(FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  static class QuantileEstimator
-  {
-    private static final long MAX_TOT_ELEMS = 1024L * 1024L * 1024L * 1024L;
-
-    private final List<List<Double>> buffer = new ArrayList<List<Double>>();
-    private final int numQuantiles;
-    private final int maxElementsPerBuffer;
-    private int totalElements;
-    private double min;
-    private double max;
-    
-    public QuantileEstimator(int numQuantiles)
-    {
-      this.numQuantiles = numQuantiles;
-      this.maxElementsPerBuffer = computeMaxElementsPerBuffer();
-    }
-    
-    private int computeMaxElementsPerBuffer()
-    {
-      double epsilon = 1.0 / (numQuantiles - 1.0);
-      int b = 2;
-      while ((b - 2) * (0x1L << (b - 2)) + 0.5 <= epsilon * MAX_TOT_ELEMS) {
-        ++b;
-      }
-      return (int) (MAX_TOT_ELEMS / (0x1L << (b - 1)));
-    }
-    
-    private void ensureBuffer(int level)
-    {
-      while (buffer.size() < level + 1) {
-        buffer.add(null);
-      }
-      if (buffer.get(level) == null) {
-        buffer.set(level, new ArrayList<Double>());
-      }
-    }
-    
-    private void collapse(List<Double> a, List<Double> b, List<Double> out)
-    {
-      int indexA = 0, indexB = 0, count = 0;
-      Double smaller = null;
-      while (indexA < maxElementsPerBuffer || indexB < maxElementsPerBuffer) {
-        if (indexA >= maxElementsPerBuffer ||
-            (indexB < maxElementsPerBuffer && a.get(indexA) >= b.get(indexB))) {
-          smaller = b.get(indexB++);
-        } else {
-          smaller = a.get(indexA++);
-        }
-        
-        if (count++ % 2 == 0) {
-          out.add(smaller);
-        }
-      }
-      a.clear();
-      b.clear();
-    }
-    
-    private void recursiveCollapse(List<Double> buf, int level)
-    {
-      ensureBuffer(level + 1);
-      
-      List<Double> merged;
-      if (buffer.get(level + 1).isEmpty()) {
-        merged = buffer.get(level + 1);
-      } else {
-        merged = new ArrayList<Double>(maxElementsPerBuffer);
-      }
-      
-      collapse(buffer.get(level), buf, merged);
-      if (buffer.get(level + 1) != merged) {
-        recursiveCollapse(merged, level + 1);
-      }
-    }
-    
-    public void add(double elem)
-    {
-      if (totalElements == 0 || elem < min) {
-        min = elem;
-      }
-      if (totalElements == 0 || max < elem) {
-        max = elem;
-      }
-      
-      if (totalElements > 0 && totalElements % (2 * maxElementsPerBuffer) == 0) {
-        Collections.sort(buffer.get(0));
-        Collections.sort(buffer.get(1));
-        recursiveCollapse(buffer.get(0), 1);
-      }
-      
-      ensureBuffer(0);
-      ensureBuffer(1);
-      int index = buffer.get(0).size() < maxElementsPerBuffer ? 0 : 1;
-      buffer.get(index).add(elem);
-      totalElements++;
-    }
-
-    public void clear()
-    {
-      buffer.clear();
-      totalElements = 0;
-    }
-
-    public List<Double> getQuantiles()
-    {
-      List<Double> quantiles = new ArrayList<Double>();
-      quantiles.add(min);
-      
-      if (buffer.get(0) != null) {
-        Collections.sort(buffer.get(0));
-      }
-      if (buffer.get(1) != null) {
-        Collections.sort(buffer.get(1));
-      }
-      
-      int[] index = new int[buffer.size()];
-      long S = 0;
-      for (int i = 1; i <= numQuantiles - 2; i++) {
-        long targetS = (long) Math.ceil(i * (totalElements / (numQuantiles - 1.0)));
-        
-        while (true) {
-          double smallest = max;
-          int minBufferId = -1;
-          for (int j = 0; j < buffer.size(); j++) {
-            if (buffer.get(j) != null && index[j] < buffer.get(j).size()) {
-              if (!(smallest < buffer.get(j).get(index[j]))) {
-                smallest = buffer.get(j).get(index[j]);
-                minBufferId = j;
-              }
-            }
-          }
-          
-          long incrementS = minBufferId <= 1 ? 1L : (0x1L << (minBufferId - 1));
-          if (S + incrementS >= targetS) {
-            quantiles.add(smallest);
-            break;
-          } else {
-            index[minBufferId]++;
-            S += incrementS;
-          }
-        }
-      }
-      
-      quantiles.add(max);
-      return quantiles;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/VAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/VAR.java b/src/java/datafu/pig/stats/VAR.java
deleted file mode 100644
index 6f22f25..0000000
--- a/src/java/datafu/pig/stats/VAR.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Generates the {@link <a href="http://en.wikipedia.org/wiki/Variance" target="_blank">Variance</a>} 
-* of a set of Values. This UDF uses the fact that variance(x) = average(x^2) - average(x)^2
-* This class implements {@link org.apache.pig.Algebraic}, so if possible the execution will be performed in a distributed fashion.
-* VAR implements the {@link org.apache.pig.Accumulator} interface as well.
-* 
-* Input: Bag of int, long, double, float or bytearray
-* Output: Double
-* 
-* <p>
-* Example:
-* <pre>
-* define VAR datafu.pig.stats.VAR();
-* 
-* -- input: 1,2,3,4,10,5,6,7,8,9
-* input = LOAD 'input' AS (val:int);
-* grouped = GROUP input ALL;
-* variance = FOREACH grouped GENERATE VAR(input.val) AS variance;
-* </pre>
-* </p>
-*/
-public class VAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            Double sum = sum(input);
-            Double sumSquare = sumSquare(input);
-            
-            if(sum == null) {
-                // either we were handed an empty bag or a bag
-                // filled with nulls - return null in this case
-                return null;
-            }
-            long count = count(input);
-
-            Double var = null;
-            if (count > 0){
-                Double avg = new Double(sum / count);
-                Double avgSquare = new Double(sumSquare / count);
-                var = avgSquare - avg*avg;
-            }
-    
-            return var;
-        } catch (ExecException ee) {
-            throw ee;
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            Tuple t = mTupleFactory.newTuple(3);
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to get variance 
-                DataBag bg = (DataBag) input.get(0);
-                DataByteArray dba = null;
-                Iterator<Tuple> iter = bg.iterator();
-                if(iter.hasNext()) {
-                    Tuple tp = iter.next();
-                    dba = (DataByteArray)tp.get(0);
-                }
-                
-                if (iter.hasNext())
-                {
-                  throw new RuntimeException("Expected only one tuple in bag");
-                }
-                
-                Double d = dba !=null ? Double.valueOf(dba.toString()) : null;
-                if (dba == null){
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                }
-                else {
-                    t.set(0, d);
-                    t.set(1, d*d);
-                    t.set(2, 1L);
-                }
-                return t;
-            } catch(NumberFormatException nfe) {
-                nfe.printStackTrace();
-                // invalid input,
-                // treat this input as null
-                try {
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                } catch (ExecException e) {
-                    throw e;
-                }
-                return t;
-            } catch (ExecException ee) {
-                ee.printStackTrace();
-                throw ee;
-            } catch (Exception e) {
-                e.printStackTrace();
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-                
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                return combine(b);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            
-            }
-        }
-    }
-
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                Tuple combined = combine(b);
-
-                Double sum = (Double)combined.get(0);
-                Double sumSquare = (Double)combined.get(1);
-                if(sum == null) {
-                    return null;
-                }
-                Long count = (Long)combined.get(2);
-
-                Double var = null;
-                
-                if (count > 0) {
-                    Double avg = new Double(sum / count);
-                    Double avgSquare = new Double(sumSquare / count);
-                    var = avgSquare - avg*avg;
-                }
-                return var;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Tuple combine(DataBag values) throws ExecException{
-        double sum = 0;
-        double sumSquare = 0;
-        long totalCount = 0;
-
-        // combine is called from Intermediate and Final
-        // In either case, Initial would have been called
-        // before and would have sent in valid tuples
-        // Hence we don't need to check if incoming bag
-        // is empty
-
-        Tuple output = mTupleFactory.newTuple(3);
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            Double d = (Double)t.get(0);
-            Double dSquare = (Double)t.get(1);
-            Long count = (Long)t.get(2);
-            
-            // we count nulls in var as contributing 0
-            // a departure from SQL for performance of
-            // COUNT() which implemented by just inspecting
-            // size of the bag
-            if(d == null) {
-                d = 0.0;
-            } else {
-                sawNonNull = true;
-            }
-            sum += d;
-            sumSquare += dSquare;
-            totalCount += count;
-        }
-        if(sawNonNull) {
-            output.set(0, new Double(sum));
-            output.set(1, new Double(sumSquare));
-        } else {
-            output.set(0, null);
-            output.set(1, null);
-        }
-        output.set(2, Long.valueOf(totalCount));
-        return output;
-    }
-
-    static protected long count(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        long cnt = 0;
-        Iterator<Tuple> it = values.iterator();
-        while (it.hasNext()){
-            Tuple t = (Tuple)it.next();
-            if (t != null && t.size() > 0 && t.get(0) != null)
-                cnt ++;
-        }
-                    
-        return cnt;
-    }
-
-    static protected Double sum(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                DataByteArray dba = (DataByteArray)t.get(0);
-                Double d = dba != null ? Double.valueOf(dba.toString()) : null;
-                if (d == null) continue;
-                sawNonNull = true;
-                sum += d;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-    }
-    
-    static protected Double sumSquare(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sumSquare = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                DataByteArray dba = (DataByteArray)t.get(0);
-                Double d = dba != null ? Double.valueOf(dba.toString()) : null;
-                if (d == null) continue;
-                sawNonNull = true;
-                sumSquare += d*d;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of squared values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(sumSquare);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-    }
-
-    @Override
-    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
-        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
-        funcList.add(new FuncSpec(this.getClass().getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));
-        funcList.add(new FuncSpec(DoubleVAR.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
-        funcList.add(new FuncSpec(FloatVAR.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
-        funcList.add(new FuncSpec(IntVAR.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
-        funcList.add(new FuncSpec(LongVAR.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
-        return funcList;
-    }
-
-    /* Accumulator interface implementation */
-    private Double intermediateSumSquare = null;
-    private Double intermediateSum = null;
-    private Long intermediateCount = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double sum = sum(b);
-            if(sum == null) {
-                return;
-            }
-            
-            Double sumSquare = sumSquare(b);
-            if(sumSquare == null) {
-                return;
-            }
-            
-            // set default values
-            if (intermediateSum == null || intermediateCount == null) {
-                intermediateSumSquare = 0.0;
-                intermediateSum = 0.0;
-                intermediateCount = (long) 0;
-            }
-            
-            long count = (Long)count(b);
-
-            if (count > 0) {
-                intermediateCount += count;
-                intermediateSum += sum;
-                intermediateSumSquare += sumSquare;
-            }
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSumSquare = null;
-        intermediateSum = null;
-        intermediateCount = null;
-    }
-
-    @Override
-    public Double getValue() {
-        Double var = null;
-        if (intermediateCount != null && intermediateCount > 0) {
-            Double avg = new Double(intermediateSum / intermediateCount);
-            Double avgSquare = new Double(intermediateSumSquare / intermediateCount);
-            var = avgSquare - avg*avg;
-        }
-        return var;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/WilsonBinConf.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/WilsonBinConf.java b/src/java/datafu/pig/stats/WilsonBinConf.java
deleted file mode 100644
index 1448611..0000000
--- a/src/java/datafu/pig/stats/WilsonBinConf.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.math.MathException;
-import org.apache.commons.math.distribution.NormalDistribution;
-import org.apache.commons.math.distribution.NormalDistributionImpl;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Computes the {@link <a href="http://en.wikipedia.org/wiki/Binomial_proportion_confidence_interval#Wilson_score_interval" target="_blank">Wilsonian binomial proportion confidence interval</a>}
- * <p>
- * Constructor requires the confidence interval (alpha) parameter, and the
- * parameters are the number of positive (success) outcomes and the total
- * number of observations. The UDF returns the (lower,upper) confidence
- * interval. 
- * <p>
- * Example:
- * <pre>
- * {@code
- * -- the Wilsonian binomial proportion confidence interval for scoring
- * %declare WILSON_ALPHA 0.10
- *
- * define WilsonBinConf      datafu.pig.stats.WilsonBinConf('$WILSON_ALPHA'); 
- *
- * bar = FOREACH foo GENERATE WilsonBinConf(successes, totals).lower as score;
- * quux = ORDER bar BY score DESC;
- * top = LIMIT quux 10;
- * }
- * </pre></p>
- */
-public class WilsonBinConf extends SimpleEvalFunc<Tuple>
-{
-  private static TupleFactory tupleFactory = TupleFactory.getInstance();
-  // Significance level; the computed interval has (1 - alpha) coverage.
-  private final double alpha;
-
-  public WilsonBinConf(double alpha)
-  {
-    this.alpha = alpha;
-  }
-
-  // Pig passes UDF constructor arguments as strings, so parse to double here.
-  public WilsonBinConf(String alpha)
-  {
-    this(Double.parseDouble(alpha));
-  }
-
-  // Entry point invoked by SimpleEvalFunc; null inputs yield a null result.
-  public Tuple call(Number x, Number n) throws IOException
-  {
-    if (x == null || n == null)
-      return null;
-    return binconf(x.longValue(), n.longValue());
-  }
-  
-  /**
-   * Computes the Wilson score confidence interval for a binomial proportion.
-   *
-   * @param x The number of positive (success) outcomes
-   * @param n The number of observations
-   * @return The (lower,upper) confidence interval
-   */
-  public Tuple binconf(Long x, Long n) throws IOException
-  {
-    NormalDistribution normalDist = new NormalDistributionImpl();
-
-    if (x == null || n == null)
-      return null;
-    if (x < 0 || n < 0)
-      throw new IllegalArgumentException("non-negative values expected");
-    if (x > n)
-      throw new IllegalArgumentException("invariant violation: number of successes > number of obs");
-    if (n == 0)
-      // No observations: degenerate (0, 0) interval rather than an error.
-      return tupleFactory.newTuple(Arrays.asList(Double.valueOf(0), Double.valueOf(0)));
-
-    try {
-      // Two-sided critical value of the standard normal at alpha/2.
-      double zcrit = -1.0 * normalDist.inverseCumulativeProbability(alpha/2);
-      double z2 = zcrit * zcrit;
-      // Observed sample proportion.
-      double p = x/(double)n;
-
-      // Wilson score interval: center a, half-width b, normalizer c.
-      double a = p + z2/2/n;
-      double b = zcrit * Math.sqrt((p * (1 - p) + z2/4/n)/n);
-      double c = (1 + z2/n);
-
-      double lower = (a - b) / c;
-      double upper = (a + b) / c;
-
-      // Add corrections for when x is very close to n.  This improves the estimates.
-      // For more info on wilson binomial confidence interval, see paper:
-      // L.D. Brown, T.T. Cai and A. DasGupta, Interval estimation for a binomial proportion (with discussion), 
-      //   _Statistical Science,_*16*:101-133, 2001. 
-      // http://www-stat.wharton.upenn.edu/~tcai/paper/Binomial-StatSci.pdf
-      
-      if (x == 1)
-        lower = -Math.log(1 - alpha)/n;
-      if (x == (n - 1))
-        upper = 1 + Math.log(1 - alpha)/n;
-
-      return tupleFactory.newTuple(Arrays.asList(lower, upper));
-    }
-    catch (MathException e) {
-      // Commons Math signals inverse-CDF failure via checked MathException;
-      // surface it to Pig as an IOException.
-      throw new IOException("math error", e);
-    }
-  }
-
-  // Declares the output schema: a tuple of (lower:double, upper:double).
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      Schema innerSchema =  new  Schema(Arrays.asList(
-              new Schema.FieldSchema("lower", DataType.DOUBLE),
-              new Schema.FieldSchema("upper", DataType.DOUBLE)));
-
-      return new Schema(new FieldSchema(null, innerSchema, DataType.TUPLE));
-    } catch(FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java b/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
deleted file mode 100644
index bfb7398..0000000
--- a/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-
-import java.util.Collections;
-
-
-
-/*
- * This class implements the Chao-Shen entropy estimator based on this  
- * {@link <a href="http://www.researchgate.net/publication/226041346_Nonparametric_estimation_of_Shannons_index_of_diversity_when_there_are_unseen_species_in_sample/file/d912f510a79b8aae74.pdf" target="_blank">paper</a>} 
- * <p>
- * It combines the {@link <a href="http://en.wikipedia.org/wiki/Horvitz%E2%80%93Thompson_estimator" target="_blank">Horvitz-Thompson estimator</a>}
- * to adjust for missing species and the concept of sample coverage to estimate the relative abundances of species in the sample. 
- * The entropy calculation formula is as follows:
- * <pre>
- * {@code
- *   H(X) = - SUM( ( C * c(x) / N * log(C * c(x) / N) ) / ( 1 - (1 - C * c(x) / N) ^ N ) )
- *   c(x) is the occurrence frequency of sample x, N is the sum of c(x)
- *   C = 1 - N1 / N, N1 is the number of samples whose c(x) equals 1.
- * }
- * </pre>
- * </p>
- */
-class ChaoShenEntropyEstimator extends EntropyEstimator {
-    //sample frequency 
-    private AccumulativeSampleFrequencyMap freqBaggedMap;
-    
-    //sum of frequency of all samples
-    private long N;
-    
-    //number of samples whose occurrence frequency is 1
-    private long N1;
-    
-    ChaoShenEntropyEstimator(String base){
-        super(base);
-        reset();
-    }
-
-    // Records one sample's occurrence frequency cx; non-positive
-    // frequencies are silently ignored.
-    @Override
-    public void accumulate(long cx) throws ExecException {
-        if(cx > 0) {
-            freqBaggedMap.accumulate(cx);  
-            N += cx;
-            if(cx == 1) {
-               N1++;
-            }
-        }
-    }
-
-    // Computes the Chao-Shen entropy estimate over all accumulated
-    // frequencies, in the logarithm base configured on the superclass.
-    // NOTE(review): when every sample is a singleton this mutates N1 to
-    // N - 1 to avoid zero coverage; the side effect persists across calls,
-    // though repeated calls still return the same value.
-    @Override
-    public double getEntropy() {
-        double h = 0.0;
-        
-        if(N > 0) {
-
-            if(N1 == N) {
-               //avoid c == 0
-               N1 = N - 1;
-            }
-
-            //sample coverage estimation
-            double c = 1 - (double)N1 / N;
-            
-            try {
-                 //from in-memory frequency map
-                 for(Iterator<Map.Entry<Long, MutableLong>> iter = this.freqBaggedMap.getInternalMap().entrySet().iterator();
-                         iter.hasNext(); ) {
-                     Map.Entry<Long, MutableLong> entry = iter.next();
-                     long cx = entry.getKey();
-                     long cnt = entry.getValue().longValue();
-                     h += accumlateEntropy(cx, this.N, c, cnt);
-                 }
-                 //from backup databag
-                 for(Iterator<Tuple> iter = this.freqBaggedMap.getInternalBag().iterator();
-                          iter.hasNext(); ) {
-                    Tuple t = iter.next();
-                    long cx = (Long)t.get(0);
-                    long cnt = (Long)t.get(1);
-                    h += accumlateEntropy(cx, this.N, c, cnt);
-                 }
-            } catch(ExecException ex) {
-                 throw new RuntimeException(
-                         "Error while computing chao-shen entropy, exception: " + ex);
-            }
-        }
-
-        // h already carries the natural-log entropy contribution; rescale
-        // to the configured logarithm base.
-        return EntropyUtil.logTransform(h, super.base);
-    }
-    
-    // One Horvitz-Thompson-adjusted entropy term for a frequency cx that
-    // occurs cnt times. (NOTE(review): method name has a typo, "accumlate".)
-    private double accumlateEntropy(long cx, 
-                                    long N,
-                                    double c,
-                                    long cnt) {
-        //sample proportion
-        double p = (double)cx / N;
-        //sample coverage adjusted probability
-        double pa = c * p;
-        //probability to see an individual in the sample
-        double la = 1 - Math.pow((1 - pa), N);
-        return -(cnt * pa * Math.log(pa) / la);
-    }
-
-    // Clears all accumulated state, creating a fresh frequency map.
-    @Override
-    public void reset() {
-        this.N = 0;
-        this.N1 = 0;
-        if(this.freqBaggedMap != null) {
-            this.freqBaggedMap.clear();
-        }
-        this.freqBaggedMap = new AccumulativeSampleFrequencyMap();
-    }
-    
-    /*
-     * A map backed by a data bag to record the sample occurrence frequencies which might be repeated
-     * The purpose of this class is that we suspect in the real applications, the sample occurrence frequency
-     * might be repeated and putting all these repeated numbers into databag may be space inefficient
-     * So we have an in-memory map to record the repetitions and use a databag to backup when the size of the
-     * in-memory map exceeds a pre-defined threshold. At present we use 5M as the default threshold to control
-     * the number of entries in the in-memory map, which approximates to 80M bytes. 
-     */
-    private class AccumulativeSampleFrequencyMap {
-        
-        private long spillBytesThreshold;
-        
-        /* key is the sample occurrence frequency
-         * value is the number of repetitions of this frequency
-         */
-        private Map<Long, MutableLong> countMap;
-        
-        /* the backed databag
-         * each tuple has 2 elements
-         * the 1st element is sample occurrence frequency
-         * the 2nd element is the number of repetitions
-         */
-        private DataBag countBag;
-           
-        AccumulativeSampleFrequencyMap(){
-            // Default threshold: ~5M entries of two longs each (~80MB).
-            this(1024 * 1024 * 5 * (Long.SIZE + Long.SIZE) / 8);
-        }
-        
-        AccumulativeSampleFrequencyMap(long spillBytesThreshold) {
-            this.spillBytesThreshold = spillBytesThreshold;
-            clear();
-        }
-        
-        // Increments the repetition count for frequency 'key', spilling the
-        // whole in-memory map to the databag once the estimated size of the
-        // map exceeds the spill threshold.
-        void accumulate(long key) throws ExecException {
-            MutableLong val = this.countMap.get(key);
-            if(val == null) {
-                this.countMap.put(key, new MutableLong(1));
-            } else {
-                val.add(1);
-            }
-            
-            if(this.countMap.size() * (Long.SIZE + Long.SIZE) / 8 > spillBytesThreshold) {
-                spillFromMap2Bag();
-            }
-        }
-        
-        // Moves every (frequency, repetitions) entry into the databag and
-        // empties the in-memory map.
-        private void spillFromMap2Bag() throws ExecException {
-            for(Map.Entry<Long, MutableLong> entry : this.countMap.entrySet()) {
-                Tuple t = TupleFactory.getInstance().newTuple(2);
-                t.set(0, entry.getKey());
-                t.set(1, entry.getValue().longValue());
-                this.countBag.add(t);
-            }
-            this.countMap.clear();
-        }
-        
-        Map<Long, MutableLong> getInternalMap() {
-            return Collections.unmodifiableMap(this.countMap);
-        }
-        
-        // NOTE(review): unlike getInternalMap, this exposes the mutable bag.
-        DataBag getInternalBag() {
-            return this.countBag;
-        }
-        
-        void clear() {
-            countMap = new HashMap<Long, MutableLong>();
-            countBag = BagFactory.getInstance().newDefaultBag();
-        }
-    }
-    
-    // Minimal mutable long wrapper to avoid re-boxing on every increment.
-    private class MutableLong {
-        private long val;
-        
-        MutableLong(){
-            this(0L);
-        }
-        
-        MutableLong(long val) {
-            this.val = val;
-        }
-        
-        long longValue(){
-            return val;
-        }
-        
-        void add(long additive){
-            this.val += additive;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/CondEntropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/CondEntropy.java b/src/java/datafu/pig/stats/entropy/CondEntropy.java
deleted file mode 100644
index 26b743e..0000000
--- a/src/java/datafu/pig/stats/entropy/CondEntropy.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-import org.apache.pig.AccumulatorEvalFunc;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
-
-/**
- * Calculate conditional entropy H(Y|X) of random variables X and Y following conditional entropy's 
- * {@link <a href="http://en.wikipedia.org/wiki/Conditional_entropy" target="_blank">wiki definition</a>}, 
- * X is the conditional variable and Y is the variable that conditions on X.
- * <p>
- * Each tuple of the input bag has 2 fields, the 1st field is an object instance of variable X and
- * the 2nd field is an object instance of variable Y. An exception will be thrown if the number of fields is not 2.
- * </p> 
- * <p>
- * This UDF's constructor definition and parameters are the same as that of {@link datafu.pig.stats.entropy.Entropy}
- * </p>
- * <p>
- * Note:
- * <ul>
- *     <li>The input bag to this UDF must be <b>sorted</b> on X and Y, with X in the first sort order.
- *     An exception will be thrown if the input bag is not sorted.
- *     <li>The returned entropy value is of double type.
- * </ul>
- * </p>
- * <p>
- * How to use: 
- * </p>
- * <p>
- * This UDF calculates conditional entropy given raw data tuples of X and Y without the need to pre-compute per tuple occurrence frequency.
- * </p>
- * <p>
- * It could be used in a nested FOREACH after a GROUP BY, in which we sort the inner bag and use the sorted bag as this UDF's input.
- * </p>
- * <p>
- * Example:
- * <pre>
- * {@code
- * --define empirical conditional entropy with Euler's number as the logarithm base
- * define CondEntropy datafu.pig.stats.entropy.CondEntropy();
- *
- * input = LOAD 'input' AS (grp: chararray, valX: double, valY: double);
- *
- * -- calculate conditional entropy H(Y|X) in each group
- * input_group_g = GROUP input BY grp;
- * entropy_group = FOREACH input_group_g {
- *   input_val = input.(valX, valY)
- *   input_ordered = ORDER input_val BY $0, $1;
- *   GENERATE FLATTEN(group) AS group, CondEntropy(input_ordered) AS cond_entropy; 
- * }
- * }
- * </pre>
- * </p>
- * Use case to calculate mutual information:
- * <p>
- * <pre>
- * {@code
- * ------------
- * -- calculate mutual information I(X, Y) using conditional entropy UDF and entropy UDF
- * -- I(X, Y) = H(Y) - H(Y|X)
- * ------------
- * 
- * define CondEntropy datafu.pig.stats.entropy.CondEntropy();
- * define Entropy datafu.pig.stats.entropy.Entropy();
- * 
- * input = LOAD 'input' AS (grp: chararray, valX: double, valY: double);
- * 
- * -- calculate the I(X,Y) in each group
- * input_group_g = GROUP input BY grp;
- * mutual_information = FOREACH input_group_g {
- *      input_val_x_y = input.(valX, valY);
- *      input_val_x_y_ordered = ORDER input_val_x_y BY $0,$1;
- *      input_val_y = input.valY;
- *      input_val_y_ordered = ORDER input_val_y BY $0;
- *      cond_h_x_y = CondEntropy(input_val_x_y_ordered);
- *      h_y = Entropy(input_val_y_ordered);
- *      GENERATE FLATTEN(group), h_y - cond_h_x_y;
- * }
- * }
- * </pre>
- * </p>
- * @see Entropy
- */
-public class CondEntropy extends AccumulatorEvalFunc<Double> {
-    //last visited tuple of <x,y>
-    private Tuple xy;
-    
-    //number of occurrence of last visited <x,y>
-    private long cxy;
-    
-    //number of occurrence of last visited x
-    private long cx;
-    
-    //comparison result between the present tuple and the last visited tuple
-    private int lastCmp;
-    
-    //entropy estimator for H(x,y)
-    private EntropyEstimator combEstimator;
-    
-    //entropy estimator for H(x)
-    private EntropyEstimator condXEstimator;
-    
-    // Default: empirical estimator with Euler's number as logarithm base.
-    public CondEntropy() throws ExecException
-    {
-      this(EntropyEstimator.EMPIRICAL_ESTIMATOR);
-    }
-    
-    public CondEntropy(String type) throws ExecException 
-    {
-      this(type, EntropyUtil.LOG);
-    }
-
-    // type selects the entropy estimator; base is the logarithm base.
-    public CondEntropy(String type, String base) throws ExecException
-    {
-      try {
-          this.combEstimator = EntropyEstimator.createEstimator(type, base);
-          this.condXEstimator = EntropyEstimator.createEstimator(type, base);
-      } catch (IllegalArgumentException ex) {
-          throw new ExecException(String.format(
-                  "Fail to initialize StreamingCondEntropy with entropy estimator of type (%s), base: (%s). Exception: (%s)",
-                  type, base, ex)); 
-      }
-      cleanup();
-    }
-    
-    /*
-     * Accumulate occurrence frequency of <x,y> and x
-     * as we stream through the input bag
-     */
-    @Override
-    public void accumulate(Tuple input) throws IOException
-    {
-      for (Tuple t : (DataBag) input.get(0)) {
-
-        if (this.xy != null)
-        {
-            int cmp = t.compareTo(this.xy);
-            
-            //check if the comparison result is different from previous compare result
-            // (the bag must be consistently sorted either ascending or
-            // descending; a direction flip means it is not sorted)
-            if ((cmp < 0 && this.lastCmp > 0)
-                || (cmp > 0 && this.lastCmp < 0)) {
-                throw new ExecException("Out of order! previous tuple: " + this.xy + ", present tuple: " + t
-                                        + ", comparsion: " + cmp + ", previous comparsion: " + this.lastCmp);
-            }
-            if (cmp != 0) {
-               //different <x,y>: flush the finished run into H(x,y)
-               this.combEstimator.accumulate(this.cxy);
-               this.cxy = 0;
-               this.lastCmp = cmp;
-               if(DataType.compare(this.xy.get(0), t.get(0)) != 0) {
-                  //different x: flush the finished run into H(x)
-                   this.condXEstimator.accumulate(this.cx);
-                   this.cx = 0;
-               }
-            } 
-        }
-
-        //set tuple t as the next tuple for comparison
-        this.xy = t;
-
-        //accumulate cx
-        this.cx++;
-        
-        //accumulate cxy
-        this.cxy++;
-      }
-    }
-    
-    // NOTE(review): this flushes the trailing run into the estimators
-    // without resetting cxy/cx, so calling getValue twice before cleanup
-    // would double-count the last run. Pig's accumulator contract calls it
-    // once per group, which is why this works in practice.
-    @Override
-    public Double getValue()
-    {
-      //do not miss the last tuple
-      try {
-          this.combEstimator.accumulate(this.cxy);
-          this.condXEstimator.accumulate(this.cx);
-      } catch (ExecException ex) {
-          throw new RuntimeException("Error while accumulating sample frequency: " + ex);
-      }
-      
-      //Chain rule: H(Y|X) = H(X, Y) - H(X)
-      return this.combEstimator.getEntropy() - this.condXEstimator.getEntropy();
-    }
-    
-    // Resets all per-group streaming state and both estimators.
-    @Override
-    public void cleanup()
-    {
-      this.xy = null;
-      this.cxy = 0;
-      this.cx = 0;
-      this.lastCmp = 0;
-      this.combEstimator.reset();
-      this.condXEstimator.reset();
-    }
-    
-    // Validates that the input is a bag of 2-field tuples and declares a
-    // double output.
-    @Override
-    public Schema outputSchema(Schema input)
-    {
-        try {
-            Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-            if (inputFieldSchema.type != DataType.BAG)
-            {
-              throw new RuntimeException("Expected a BAG as input");
-            }
-            
-            Schema inputBagSchema = inputFieldSchema.schema;
-            
-            if (inputBagSchema.getField(0).type != DataType.TUPLE)
-            {
-              throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                       DataType.findTypeName(inputBagSchema.getField(0).type)));
-            }
-            
-            Schema tupleSchema = inputBagSchema.getField(0).schema;
-            
-            if(tupleSchema == null) {
-                throw new RuntimeException("The tuple of the input bag has no schema");
-            }
-            
-            List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
-            
-            if(fieldSchemaList == null || fieldSchemaList.size() != 2) {
-                throw new RuntimeException("The field schema of the input tuple is null or its size is not 2");
-            }
-            
-            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
-                                                                   .getName()
-                                                                   .toLowerCase(), input),
-                                                 DataType.DOUBLE));
-          } catch (FrontendException e) {
-            throw new RuntimeException(e);
-          }
-     }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/EmpiricalCountEntropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EmpiricalCountEntropy.java b/src/java/datafu/pig/stats/entropy/EmpiricalCountEntropy.java
deleted file mode 100644
index 388b80f..0000000
--- a/src/java/datafu/pig/stats/entropy/EmpiricalCountEntropy.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigException;
-import org.apache.pig.PigWarning;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.stats.entropy.EntropyUtil;
-
-
-/**
- * Calculate the empirical entropy of random variable X given its occurrence frequencies, following entropy's
- * {@link <a href="http://en.wikipedia.org/wiki/Entropy_%28information_theory%29" target="_blank">wiki definition</a>}
- * <p>
- * This UDF's constructor takes 1 argument: the logarithm base, whose definition is the same as that defined in {@link datafu.pig.stats.entropy.Entropy}
- * </p>
- * <p>
- * Note: 
- * <ul>
- *     <li>Unlike {@link datafu.pig.stats.entropy.Entropy}, which calculates entropy from sorted raw data bag in accumulative mode,
- *     this UDF calculates entropy from the data's occurrence frequencies which does not need to be sorted, either in accumulative or algebraic mode.
- *     <li>Each tuple of the UDF's input bag <b>must only</b> have 1 field, the occurrence frequency of a data instance,
- *     and the data type of this field <b>must</b> be int or long. Otherwise, an exception will be thrown.
- *     <li>Negative frequency number will be silently discarded and a warning message will be logged in the job's log file.
- *     <li>The returned entropy value is of double type.
- * </ul>
- * </p>
- * <p>
- * How to use: 
- * </p>
- * <p>
- * To use this UDF, customer needs to pre-compute the occurrence frequency of each data instance, often in an outer GROUP BY
- * , and then use this UDF to calculate entropy with those frequency numbers in another outer GROUP BY.
- * </p>
- * <p>
- * Compared with {@link datafu.pig.stats.entropy.Entropy}, this UDF is more scalable when we need to handle a very large data set, 
- * since it could distribute computation onto mappers and take advantage of combiners to reduce intermediate output from mappers to reducers.
- * </p>
- * <p>
- * Example:
- * <pre>
- * {@code
- * 
- * define Entropy datafu.pig.stats.entropy.EmpiricalCountEntropy();
- *
- * input = LOAD 'input' AS (val: double);
- *
- * -- calculate the occurrence of each instance
- * counts_g = GROUP input BY val;
- * counts = FOREACH counts_g GENERATE COUNT(input) AS cnt;
- * 
- * -- calculate entropy 
- * input_counts_g = GROUP counts ALL;
- * entropy = FOREACH input_counts_g GENERATE Entropy(counts) AS entropy;
- * }
- * </pre>
- * </p>
- * Use case to calculate mutual information using EmpiricalCountEntropy:
- * <p>
- * <pre>
- * {@code
- * 
- * define Entropy datafu.pig.stats.entropy.EmpiricalCountEntropy();
- * 
- * input = LOAD 'input' AS (valX: double, valY: double);
- * 
- * ------------
- * -- calculate mutual information I(X, Y) using entropy
- * -- I(X, Y) = H(X) + H(Y) -  H(X, Y)
- * ------------
- * 
- * input_x_y_g = GROUP input BY (valX, valY);
- * input_x_y_cnt = FOREACH input_x_y_g GENERATE flatten(group) as (valX, valY), COUNT(input) AS cnt;
- * 
- * input_x_g = GROUP input_x_y_cnt BY valX;
- * input_x_cnt = FOREACH input_x_g GENERATE flatten(group) as valX, SUM(input_x_y_cnt.cnt) AS cnt;
- * 
- * input_y_g = GROUP input_x_y_cnt BY valY;
- * input_y_cnt = FOREACH input_y_g GENERATE flatten(group) as valY, SUM(input_x_y_cnt.cnt) AS cnt;
- * 
- * input_x_y_entropy_g = GROUP input_x_y_cnt ALL;
- * input_x_y_entropy = FOREACH input_x_y_entropy_g {
- *                         input_x_y_entropy_cnt = input_x_y_cnt.cnt;
- *                         GENERATE Entropy(input_x_y_entropy_cnt) AS x_y_entropy;
- *                     }
- *                         
- * input_x_entropy_g = GROUP input_x_cnt ALL;
- * input_x_entropy = FOREACH input_x_entropy_g {
- *                         input_x_entropy_cnt = input_x_cnt.cnt;
- *                         GENERATE Entropy(input_x_entropy_cnt) AS x_entropy;
- *                   }
- *                       
- * input_y_entropy_g = GROUP input_y_cnt ALL;
- * input_y_entropy = FOREACH input_y_entropy_g {
- *                         input_y_entropy_cnt = input_y_cnt.cnt;
- *                         GENERATE Entropy(input_y_entropy_cnt) AS y_entropy;
- *                   }
- *
- * input_mi_cross = CROSS input_x_y_entropy, input_x_entropy, input_y_entropy;
- * input_mi = FOREACH input_mi_cross GENERATE (input_x_entropy::x_entropy +
- *                                             input_y_entropy::y_entropy - 
- *                                             input_x_y_entropy::x_y_entropy) AS mi;
- * }
- * </pre>
- * </p>
- * @see Entropy
- */
-
-public class EmpiricalCountEntropy extends AccumulatorEvalFunc<Double> implements Algebraic {
-    
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-    
-    //entropy estimator for accumulator
-    //re-use the same entropy estimator for StreamingEntropy
-    private EntropyEstimator streamEstimator;
-    
-    //logarithm base
-    private String base;
-    
-    public EmpiricalCountEntropy() throws ExecException {
-        //empirical estimator using Euler's number as logarithm base
-        this(EntropyUtil.LOG);
-    }
-    
-    // base is the logarithm base for the entropy (e.g. "log", "log2", "log10");
-    // an invalid base is reported as an ExecException.
-    public EmpiricalCountEntropy(String base) throws ExecException {
-        try {
-            this.streamEstimator = EntropyEstimator.createEstimator(EntropyEstimator.EMPIRICAL_ESTIMATOR, base);
-        } catch (IllegalArgumentException ex) {
-            throw new ExecException(
-                    String.format("Fail to initialize EmpiricalCountEntropy with logarithm base: (%s), exception: (%s)", base, ex));
-        }
-        this.base = base;
-    }
-    
-    /*
-     * Algebraic implementation part
-     */
-    
-    // Lazily-built constructor-argument string, e.g. "('log2')", appended to
-    // the Initial/Intermediate/Final class names so Pig instantiates them
-    // with the same logarithm base as this UDF.
-    private String param = null;
-    private String getParam()
-    {
-      if (param == null) {
-        if (this.base != null) {
-          param = String.format("('%s')", this.base);
-        } else {
-          param = "";
-        }
-      }
-      return param;
-    }
-    
-    // Algebraic: class used on the reducer to produce the final entropy.
-    @Override
-    public String getFinal() {
-        return Final.class.getName() + getParam();
-    }
-
-    // Algebraic: class used on the mapper for per-tuple initialization.
-    @Override
-    public String getInitial() {
-       return Initial.class.getName() + getParam();
-    }
-
-    // Algebraic: class used by the combiner to merge partial results.
-    @Override
-    public String getIntermed() {
-        return Intermediate.class.getName() + getParam();
-    }
-    
-    // Algebraic Initial: maps one frequency count cx to the partial pair
-    // (cx * log(cx), cx); invalid (null/negative) counts become (null, null).
-    static public class Initial extends EvalFunc<Tuple> {
-                
-        public Initial(){}
-        
-        // base is accepted for constructor symmetry but unused: the
-        // base conversion happens only in the Final stage.
-        public Initial(String base){}
-        
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            Tuple t = mTupleFactory.newTuple(2);
-
-            try{
-                //input is a bag with one tuple containing
-                //the sample's occurrence frequency
-                DataBag bg = (DataBag) input.get(0);
-                Long cxl = null;
-                // NOTE(review): two separate iterators are created here
-                // (hasNext on one, next on another); works but is wasteful.
-                if(bg.iterator().hasNext()) {
-                   Tuple tp = bg.iterator().next();
-                   cxl = ((Number)(tp.get(0))).longValue();
-                }
-                
-                if(cxl == null || cxl.longValue() < 0) {
-                    //emit null in case of invalid input frequency
-                    // NOTE(review): the message says "Non-positive" but the
-                    // check only rejects negatives; zero passes through and
-                    // contributes 0 via the cx > 0 guard below.
-                    t.set(0, null);
-                    t.set(1, null);
-                    warn("Non-positive input frequency number: " + cxl, PigWarning.UDF_WARNING_1);
-                } else {
-                    long cx = cxl.longValue();
-                    double logcx = (cx > 0 ? Math.log(cx) : 0);
-                    double cxlogcx = cx * logcx;
-                    
-                    //1st element of the returned tuple is freq * log(freq)
-                    t.set(0, cxlogcx);
-                    
-                    //2nd element of the returned tuple is freq
-                    t.set(1, cxl);
-                }
-                
-                return t;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch(Exception e) {
-                int errCode = 10080;
-                String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-            
-        }
-        
-    }
-    
-    static public class Intermediate extends EvalFunc<Tuple> {
-        
-        public Intermediate(){}
-        
-        public Intermediate(String base){}
-        
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                return combine(b);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 10081;
-                String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-    
-    static public class Final extends EvalFunc<Double> {
-        private String base;
-        
-        public Final()
-        {
-            this(EntropyUtil.LOG);
-        }
-        
-        public Final(String base)
-        {
-            this.base = base;
-        }
-        
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                Tuple combined = combine(b);
-                
-                Double sumOfCxLogCx = (Double)combined.get(0);
-                Long sumOfCx = (Long)combined.get(1);
-                
-                if(sumOfCxLogCx == null || sumOfCx == null) {
-                    //emit null if there is at least 1 invalid input
-                    warn("Invalid null field output from combine(), " +
-                    		"1st field: " + sumOfCxLogCx + ", 2nd field: " + sumOfCx, PigWarning.UDF_WARNING_1);
-                    return null;
-                }
-                
-                Double entropy = null;
-                
-                double scxlogcx = sumOfCxLogCx.doubleValue();
-                long scx = sumOfCx.longValue();
-                                
-                if (scx > 0) {
-                    //H(X) = log(N) - 1 / N * SUM(c(x) * log(c(x)) )
-                    entropy = EntropyUtil.logTransform(Math.log(scx) - scxlogcx / scx, this.base);
-                }
-
-                return entropy;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 10082;
-                String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-
-        }        
-    }
-    
-    static protected Tuple combine(DataBag values) throws ExecException { 
-        Tuple output = mTupleFactory.newTuple(2);
-        
-        boolean sawNonNull = false;
-        double sumOfCxLogCx = 0;
-        long sumOfCx = 0;
-        
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            Double scxlogcx = (Double)t.get(0);
-            Long scx = (Long)t.get(1);
-            
-            if(scxlogcx != null && scx != null) {
-                sumOfCxLogCx += scxlogcx.doubleValue();
-                sumOfCx += scx.longValue();
-                sawNonNull = true;
-            }
-        }
-        
-        if (sawNonNull) {
-            output.set(0, sumOfCxLogCx);
-            output.set(1, sumOfCx);
-        } else {
-            //emit null if there is no valid input
-            output.set(0, null);
-            output.set(1, null);
-        }
-        
-        return output;
-    }
-    
-    /*
-     * Accumulator implementation part
-     */
-
-    @Override
-    public void accumulate(Tuple input) throws IOException
-    {
-        for (Tuple t : (DataBag) input.get(0)) {
-            long cx = ((Number)(t.get(0))).longValue();
-            this.streamEstimator.accumulate(cx);
-        }
-    }
-
-    @Override
-    public Double getValue()
-    {
-      return this.streamEstimator.getEntropy();
-    }
-
-    @Override
-    public void cleanup()
-    {
-        this.streamEstimator.reset();
-    }
-    
-    @Override
-    public Schema outputSchema(Schema input)
-    {
-        try {
-            Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-            if (inputFieldSchema.type != DataType.BAG)
-            {
-                throw new RuntimeException("Expected a BAG as input");
-            }
-            
-            Schema inputBagSchema = inputFieldSchema.schema;
-            
-            if (inputBagSchema.getField(0).type != DataType.TUPLE)
-            {
-                throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                       DataType.findTypeName(inputBagSchema.getField(0).type)));
-            }
-            
-            Schema tupleSchema = inputBagSchema.getField(0).schema;
-            
-            if(tupleSchema == null) {
-                throw new RuntimeException("The tuple of input bag has no schema");
-            }
-            
-            List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
-            
-            if(fieldSchemaList == null || fieldSchemaList.size() != 1) {
-                throw new RuntimeException("The field schema of the input tuple is null or its size is not 1");
-            }
-            
-            if(fieldSchemaList.get(0).type != DataType.INTEGER &&
-               fieldSchemaList.get(0).type != DataType.LONG )
-            {
-                String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
-                                                       DataType.findTypeName(DataType.LONG)};
-                throw new RuntimeException("Expect the type of the input tuple to be of (" +
-                        java.util.Arrays.toString(expectedTypes) + "), but instead found " + 
-                        DataType.findTypeName(fieldSchemaList.get(0).type));
-            } 
-            
-            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
-                                                                   .getName()
-                                                                   .toLowerCase(), input),
-                                                 DataType.DOUBLE));
-          } catch (FrontendException e) {
-            throw new RuntimeException(e);
-          }
-     }    
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java b/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
deleted file mode 100644
index 3f0acab..0000000
--- a/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-/*
- * This entropy estimator calculates the empirical entropy of given species
- * using the occurrence frequency of an individual in the sample as an estimate of its probability. 
- * The empirical estimator is also called the maximum likelihood estimator (MLE). 
- * <p>
- * It applies the following formula to compute entropy:
- * <pre>
- * {@code
- * H(X) = log(N) - 1 / N * SUM(c(x) * log(c(x)) )
- * c(x) is the occurrence frequency of sample x, N is the sum of c(x)
- * }
- * </pre>
- * </p>
- * <p>
- * This entropy estimator is widely used when the number of species is known and small.
- * But it is biased since it does not cover the rare species with zero frequency in the sample.
- * </p>
- */
-class EmpiricalEntropyEstimator extends EntropyEstimator {
-    
-    //sum of frequency cx of all input samples
-    private long N;
-    
-    //sum of cx * log(cx) of all input samples
-    private double M;
-    
-    EmpiricalEntropyEstimator(String base) throws IllegalArgumentException {
-        super(base);
-        reset();
-    }
-
-    @Override
-    public void accumulate(long cx) {
-        if(cx > 0) {
-           N += cx;
-           M += cx * Math.log(cx);
-        }
-    }
-
-    @Override
-    public double getEntropy() {
-        return N > 0 ? EntropyUtil.logTransform(Math.log(N) - M / N, super.base) : 0;
-    }
-    
-    @Override
-    public void reset(){
-        N = 0;
-        M = 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/Entropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/Entropy.java b/src/java/datafu/pig/stats/entropy/Entropy.java
deleted file mode 100644
index 9dfff1a..0000000
--- a/src/java/datafu/pig/stats/entropy/Entropy.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-import java.io.IOException;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
-
-/**
- * Calculate entropy H(X) of random variable X following entropy's 
- * {@link <a href="http://en.wikipedia.org/wiki/Entropy_%28information_theory%29" target="_blank">wiki definition</a>}
- * <p>
- * This UDF's constructor takes 2 arguments. 
- * </p>
- * <p>
- * The 1st argument, the type of entropy estimator algorithm we currently support, includes:
- * <ul>
- *     <li>empirical (empirical entropy estimator)
- *     <li>chaosh (Chao-Shen entropy estimator) 
- * </ul>
- * </p>
- * <p>
- * The default estimation algorithm is empirical.
- * </p>
- * <p>
- * The 2nd argument, the logarithm base we currently support, includes:
- * </p>
- * <p>
- * <ul>
- *     <li>log (use Euler's number as the logarithm base)
- *     <li>log2 (use 2 as the logarithm base)
- *     <li>log10 (use 10 as the logarithm base) 
- * </ul>
- * </p>
- * <p>
- * The default logarithm base is log.
- * </p> 
- * <p>
- * Note:
- * <ul>
- *     <li>The input to this UDF must be a <b>sorted</b> bag of raw data tuples of X.
- *     An exception will be thrown if the input bag is not sorted 
- *     <li>The returned entropy value is of double type.
- * </ul>
- * </p>
- * <p>
- * How to use: 
- * </p>
- * <p>
- * This UDF calculates entropy from raw data tuples without the need to pre-compute per tuple occurrence frequency.
- * </p>
- * <p>
- * It could be used in a nested FOREACH after a GROUP BY, in which we sort the inner bag and use the sorted bag as this UDF's input.
- * </p>
- * Example:
- * <p>
- * <pre>
- * {@code
- * --calculate empirical entropy with Euler's number as the logarithm base
- * define Entropy datafu.pig.stats.entropy.Entropy();
- *
- * input = LOAD 'input' AS (grp: chararray, val: double);
- *
- * -- calculate the input's entropy in each group
- * input_group_g = GROUP input BY grp;
- * entropy_group = FOREACH input_group_g {
- *   input_val = input.val;
- *   input_ordered = ORDER input_val BY $0;
- *   GENERATE FLATTEN(group) AS group, Entropy(input_ordered) AS entropy; 
- * }
- * }
- * </pre>
- * </p>
- * @see CondEntropy
- * @see EmpiricalCountEntropy
- */
-public class Entropy extends AccumulatorEvalFunc<Double>
-{ 
-  //last visited tuple
-  private Tuple x;
-  
-  //number of occurrence of last visited tuple
-  private long cx;
-  
-  //comparison result between the present tuple and the last visited tuple
-  private int lastCmp;
-  
-  //entropy estimator that accumulates sample's occurrence frequency to
-  //calculates the actual entropy
-  private EntropyEstimator estimator;
-  
-  public Entropy() throws ExecException
-  {
-    this(EntropyEstimator.EMPIRICAL_ESTIMATOR);
-  }
-  
-  public Entropy(String type) throws ExecException 
-  {
-    this(type, EntropyUtil.LOG);
-  }
-
-  public Entropy(String type, String base) throws ExecException
-  {
-    try {
-        this.estimator = EntropyEstimator.createEstimator(type, base);
-    } catch (IllegalArgumentException ex) {
-        throw new ExecException(
-                String.format("Fail to initialize StreamingEntropy with entropy estimator of type (%s), base: (%s), exception: (%s)",
-                       type, base, ex) 
-              ); 
-    }
-    cleanup();
-  }
-
-  /*
-   * Accumulate occurrence frequency of each tuple as we stream through the input bag
-   */
-  @Override
-  public void accumulate(Tuple input) throws IOException
-  {
-    for (Tuple t : (DataBag) input.get(0)) {
-
-      if (this.x != null)
-      {
-          int cmp = t.compareTo(this.x);
-          
-          //check if the comparison result is different from previous compare result
-          if ((cmp < 0 && this.lastCmp > 0)
-              || (cmp > 0 && this.lastCmp < 0)) {
-              throw new ExecException("Out of order! previous tuple: " + this.x + ", present tuple: " + t
-                                      + ", comparison: " + cmp + ", previous comparison: " + this.lastCmp);
-          }
-
-          if (cmp != 0) {
-             //different tuple
-             this.estimator.accumulate(this.cx);
-             this.cx = 0;
-             this.lastCmp = cmp;
-          } 
-      }
-
-      //set tuple t as the next tuple for comparison
-      this.x = t;
-
-      //accumulate cx
-      this.cx++;
-    }
-  }
-
-  @Override
-  public Double getValue()
-  {
-    //do not miss the last tuple
-    try {
-        this.estimator.accumulate(this.cx);
-    } catch (ExecException ex) {
-        throw new RuntimeException("Error while accumulating sample frequency: " + ex);
-    }
-
-    return this.estimator.getEntropy();
-  }
-
-  @Override
-  public void cleanup()
-  {
-    this.x = null;
-    this.cx = 0;
-    this.lastCmp = 0;
-    this.estimator.reset();
-  }
-  
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-      try {
-          Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-          if (inputFieldSchema.type != DataType.BAG)
-          {
-            throw new RuntimeException("Expected a BAG as input");
-          }
-          
-          Schema inputBagSchema = inputFieldSchema.schema;
-          
-          if (inputBagSchema.getField(0).type != DataType.TUPLE)
-          {
-            throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                     DataType.findTypeName(inputBagSchema.getField(0).type)));
-          }
-          
-          return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
-                                                                 .getName()
-                                                                 .toLowerCase(), input),
-                                               DataType.DOUBLE));
-        } catch (FrontendException e) {
-          throw new RuntimeException(e);
-        }
-   }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EntropyEstimator.java b/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
deleted file mode 100644
index 336ef86..0000000
--- a/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-import org.apache.pig.backend.executionengine.ExecException;
-
-/*
-* Entropy estimator which exposes a unified interface to application
-* and hides the implementation details in its subclasses
-*/
-public abstract class EntropyEstimator {
-
-    public static final String EMPIRICAL_ESTIMATOR = "empirical";
-    
-    public static final String CHAOSHEN_ESTIMATOR = "chaosh";
-    
-    /*
-     * logarithm base
-     * by default, Euler's number is used as the logarithm base
-     */ 
-    protected String base;
-    
-    protected EntropyEstimator(String base) throws IllegalArgumentException {
-        if(!EntropyUtil.isValidLogBase(base)) {
-            throw new IllegalArgumentException("Invalid input logarithm base. " + 
-                    "Please refer to StreamingEntropy's javadoc for supported logarithm base");
-        }
-        this.base = base;
-    }
-    
-    /* 
-     * create estimator instance based on the input type
-     * @param type  type of entropy estimator
-     * @param base logarithm base
-     * if the type is not supported, an IllegalArgumentException is thrown
-     * and the caller needs to handle this case
-     */
-    public static EntropyEstimator createEstimator(String type, 
-                                                   String base) throws IllegalArgumentException 
-    {
-        //empirical estimator
-        if(EmpiricalEntropyEstimator.EMPIRICAL_ESTIMATOR.equalsIgnoreCase(type)) {
-            return new EmpiricalEntropyEstimator(base);
-        }
-        //chao-shen estimator
-        if(EmpiricalEntropyEstimator.CHAOSHEN_ESTIMATOR.equalsIgnoreCase(type)) {
-           return new ChaoShenEntropyEstimator(base);
-        }
-        //unsupported estimator type
-        throw new IllegalArgumentException("invalid input entropy estimator type. " +
-                    "Please refer to StreamingEntropy's javadoc for the supported estimator types");        
-    }
-    
-    /*
-     * accumulate occurrence frequency from input stream of samples
-     * @param cx the occurrence frequency of the last input sample
-     */
-    public abstract void accumulate(long cx) throws ExecException;
-
-    /*
-     * calculate the output entropy value
-     * return entropy value as a double type number
-     */
-    public abstract double getEntropy();
-
-    /*
-     * cleanup and reset internal states
-     */
-    public abstract void reset();
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/entropy/EntropyUtil.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EntropyUtil.java b/src/java/datafu/pig/stats/entropy/EntropyUtil.java
deleted file mode 100644
index 685f3e2..0000000
--- a/src/java/datafu/pig/stats/entropy/EntropyUtil.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats.entropy;
-
-public class EntropyUtil {
-    
-    public static final String LOG = "log";
-    
-    public static final String LOG2 = "log2";
-    
-    public static final String LOG10 = "log10";
-
-    /*
-     * Transform the input entropy to that in the input logarithm base
-     * The input entropy is assumed to be calculated in the Euler's number logarithm base
-     * */
-    public static double logTransform(double h, String base) {
-        if(LOG2.equalsIgnoreCase(base)) {
-            return h / Math.log(2);
-        }
-
-        if(LOG10.equalsIgnoreCase(base)) {
-            return h / Math.log(10);
-        }
-        
-        return h;
-    }
-    
-    public static boolean isValidLogBase(String base) {
-        return LOG.equalsIgnoreCase(base) ||
-               LOG2.equalsIgnoreCase(base) ||
-               LOG10.equalsIgnoreCase(base);
-    }
-}


Mime
View raw message