DATAFU-2 UDFs for entropy and weighted sampling algorithms
https://issues.apache.org/jira/browse/DATAFU-2
Signed-off-by: Matt Hayes <mhayes@linkedin.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/e8084146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/e8084146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/e8084146
Branch: refs/heads/master
Commit: e8084146843bd546f9af51bffa32f29fbf7633db
Parents: 9d7dffd
Author: Jian J. Wang <wjian@dogfavorshot-lm.peking.corp.yahoo.com>
Authored: Wed Jan 22 10:41:37 2014 -0800
Committer: Matt Hayes <mhayes@linkedin.com>
Committed: Wed Jan 22 10:41:37 2014 -0800
----------------------------------------------------------------------
.../datafu/pig/sampling/ReservoirSample.java | 36 +-
src/java/datafu/pig/sampling/ScoredTuple.java | 17 +-
.../pig/sampling/WeightedReservoirSample.java | 265 +++++++++
.../stats/entropy/ChaoShenEntropyEstimator.java | 230 ++++++++
.../entropy/EmpiricalEntropyEstimator.java | 71 +++
src/java/datafu/pig/stats/entropy/Entropy.java | 428 ++++++++++++++
.../pig/stats/entropy/EntropyEstimator.java | 87 +++
.../datafu/pig/stats/entropy/EntropyUtil.java | 51 ++
.../pig/stats/entropy/StreamingCondEntropy.java | 269 +++++++++
.../pig/stats/entropy/StreamingEntropy.java | 223 +++++++
.../WeightedReservoirSamplingTests.java | 293 ++++++++++
.../pig/stats/entropy/AbstractEntropyTests.java | 46 ++
.../test/pig/stats/entropy/EntropyTests.java | 585 +++++++++++++++++++
.../entropy/StreamingChaoShenEntropyTests.java | 373 ++++++++++++
.../StreamingEmpiricalCondEntropyTests.java | 412 +++++++++++++
.../entropy/StreamingEmpiricalEntropyTests.java | 429 ++++++++++++++
16 files changed, 3809 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/sampling/ReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/ReservoirSample.java b/src/java/datafu/pig/sampling/ReservoirSample.java
index 403bfaf..747b28c 100644
--- a/src/java/datafu/pig/sampling/ReservoirSample.java
+++ b/src/java/datafu/pig/sampling/ReservoirSample.java
@@ -56,9 +56,12 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
@Nondeterministic
public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Algebraic
{
- Integer numSamples;
+ protected Integer numSamples;
+
private Reservoir reservoir;
+ protected ScoredTuple.ScoreGenerator scoreGen;
+
private Reservoir getReservoir()
{
if (reservoir == null) {
@@ -71,13 +74,23 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
{
this.numSamples = Integer.parseInt(numSamples);
}
-
+
+ protected ScoredTuple.ScoreGenerator getScoreGenerator()
+ {
+ if(this.scoreGen == null)
+ {
+ this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+ }
+ return this.scoreGen;
+ }
+
@Override
public void accumulate(Tuple input) throws IOException
{
DataBag samples = (DataBag) input.get(0);
+ ScoredTuple.ScoreGenerator scoreGen = getScoreGenerator();
for (Tuple sample : samples) {
- getReservoir().consider(new ScoredTuple(Math.random(), sample));
+ getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
}
}
@@ -85,6 +98,7 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
public void cleanup()
{
this.reservoir = null;
+ this.scoreGen = null;
}
@Override
@@ -159,6 +173,7 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
{
int numSamples;
private Reservoir reservoir;
+ protected ScoredTuple.ScoreGenerator scoreGen;
TupleFactory tupleFactory = TupleFactory.getInstance();
public Initial(){}
@@ -175,11 +190,22 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
}
return reservoir;
}
+
+ protected ScoredTuple.ScoreGenerator getScoreGenerator()
+ {
+ if(this.scoreGen == null)
+ {
+ this.scoreGen = new ScoredTuple.PureRandomScoreGenerator();
+ }
+ return this.scoreGen;
+ }
@Override
public Tuple exec(Tuple input) throws IOException {
DataBag output = BagFactory.getInstance().newDefaultBag();
+ ScoredTuple.ScoreGenerator scoreGen = getScoreGenerator();
+
DataBag samples = (DataBag) input.get(0);
if (samples == null)
{
@@ -189,11 +215,11 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
// no need to construct a reservoir, so just emit intermediate tuples
for (Tuple sample : samples) {
// add the score on to the intermediate tuple
- output.add(new ScoredTuple(Math.random(), sample).getIntermediateTuple(tupleFactory));
+ output.add(new ScoredTuple(scoreGen.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
}
} else {
for (Tuple sample : samples) {
- getReservoir().consider(new ScoredTuple(Math.random(), sample));
+ getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
}
for (ScoredTuple scoredTuple : getReservoir()) {
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/sampling/ScoredTuple.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/ScoredTuple.java b/src/java/datafu/pig/sampling/ScoredTuple.java
index 293cf3d..c793584 100644
--- a/src/java/datafu/pig/sampling/ScoredTuple.java
+++ b/src/java/datafu/pig/sampling/ScoredTuple.java
@@ -88,5 +88,20 @@ class ScoredTuple implements Comparable<ScoredTuple>
else return -1;
}
return score.compareTo(o.score);
- }
+ }
+
+ static interface ScoreGenerator
+ {
+ double generateScore(Tuple sample) throws ExecException;
+ }
+
+ static class PureRandomScoreGenerator implements ScoreGenerator
+ {
+ public PureRandomScoreGenerator(){}
+
+ public double generateScore(Tuple sample)
+ {
+ return Math.random();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/sampling/WeightedReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/WeightedReservoirSample.java b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
new file mode 100644
index 0000000..0792411
--- /dev/null
+++ b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * <p>
+ * Performs a weighted random sample using an in-memory reservoir to produce
+ * a weighted random sample of a given size based on the A-Res algorithm described in
+ * {@link <a href="http://utopia.duth.gr/~pefraimi/research/data/2007EncOfAlg.pdf" target="_blank">paper</a>}.
+ * </p>
+ * <p>
+ * Species with larger weight have higher probability to be selected in the final sample set.
+ * </p>
+ * <p>
+ * This UDF inherits from {@link ReservoirSample} and it is guaranteed to produce
+ * a sample of the given size. Similarly it comes at the cost of scalability,
+ * since it uses internal storage with size equaling the desired sample to guarantee the exact sample size.
+ * </p>
+ * <p>
+ * Its constructor takes 2 arguments.
+ * <ul>
+ * <li>The 1st argument specifies the sample size which should be a string of positive integer.
+ * <li>The 2nd argument specifies the index of the weight field in the input tuple,
+ * which should be a string of non-negative integer that is no greater than the input tuple size.
+ * </ul>
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+ * input = LOAD 'input' AS (v1:chararray, v2:INT);
+ * input_g = GROUP input ALL;
+ * sampled = FOREACH input_g GENERATE WeightedSample(input);
+ * }
+ * </pre>
+ * </p>
+ * @author wjian
+ */
+
+@Nondeterministic
+public class WeightedReservoirSample extends ReservoirSample {
+
+ private Integer weightIdx;
+
+ public WeightedReservoirSample(String strNumSamples, String strWeightIdx)
+ {
+ super(strNumSamples);
+ this.weightIdx = Integer.parseInt(strWeightIdx);
+ if(this.weightIdx < 0) {
+ throw new IllegalArgumentException("Invalid negative index of weight field argument for WeightedReserviorSample constructor: "
+ + strWeightIdx);
+ }
+ }
+
+ @Override
+ protected ScoredTuple.ScoreGenerator getScoreGenerator()
+ {
+ if(super.scoreGen == null)
+ {
+ super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+ }
+ return this.scoreGen;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ try {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG) {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema tupleSchema = inputBagSchema.getField(0).schema;
+
+ if(tupleSchema == null) {
+ throw new RuntimeException("The tuple of input bag has no schema");
+ }
+
+ List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+
+ if(fieldSchemaList == null || fieldSchemaList.size() <= Math.max(0, this.weightIdx)) {
+ throw new RuntimeException("The field schema of the input tuple is null " +
+ "or the tuple size is no more than the weight field index: "
+ + this.weightIdx);
+ }
+
+ if(fieldSchemaList.get(this.weightIdx).type != DataType.INTEGER &&
+ fieldSchemaList.get(this.weightIdx).type != DataType.LONG &&
+ fieldSchemaList.get(this.weightIdx).type != DataType.FLOAT &&
+ fieldSchemaList.get(this.weightIdx).type != DataType.DOUBLE)
+ {
+ String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
+ DataType.findTypeName(DataType.LONG),
+ DataType.findTypeName(DataType.FLOAT),
+ DataType.findTypeName(DataType.DOUBLE)};
+ throw new RuntimeException("Expect the type of the weight field of the input tuple to be of (" +
+ java.util.Arrays.toString(expectedTypes) + "), but instead found (" +
+ DataType.findTypeName(fieldSchemaList.get(this.weightIdx).type) + "), weight field: " +
+ this.weightIdx);
+ }
+
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+ inputFieldSchema.schema, DataType.BAG));
+ } catch (FrontendException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ String param = null;
+
+ private String getParam()
+ {
+ if (this.param == null) {
+ if(super.numSamples != null && this.weightIdx != null) {
+ this.param = String.format("('%d','%d')",
+ super.numSamples,
+ this.weightIdx);
+ } else {
+ this.param = "";
+ }
+ }
+
+ return this.param;
+ }
+
+
+ @Override
+ public String getInitial()
+ {
+ return Initial.class.getName() + getParam();
+ }
+
+ @Override
+ public String getIntermed()
+ {
+ return Intermediate.class.getName() + getParam();
+ }
+
+ @Override
+ public String getFinal()
+ {
+ return Final.class.getName() + getParam();
+ }
+
+ static public class Initial extends ReservoirSample.Initial
+ {
+ private Integer weightIdx;
+
+ public Initial()
+ {
+ super();
+ this.weightIdx = null;
+ }
+
+ public Initial(String strNumSamples, String strWeightIdx)
+ {
+ super(strNumSamples);
+ this.weightIdx = Integer.parseInt(strWeightIdx);
+ if(this.weightIdx < 0) {
+ throw new IllegalArgumentException("Invalid negative index of weight field for WeightedReserviorSample.Initial constructor: "
+ + strWeightIdx);
+ }
+ }
+
+ @Override
+ protected ScoredTuple.ScoreGenerator getScoreGenerator()
+ {
+ if(super.scoreGen == null)
+ {
+ super.scoreGen = new InverseWeightScoreGenerator(this.weightIdx);
+ }
+ return super.scoreGen;
+ }
+ }
+
+ static public class Intermediate extends ReservoirSample.Intermediate
+ {
+ public Intermediate()
+ {
+ super();
+ }
+
+ public Intermediate(String strNumSamples, String strWeightIdx)
+ {
+ super(strNumSamples);
+ }
+ }
+
+ static public class Final extends ReservoirSample.Final
+ {
+ public Final()
+ {
+ super();
+ }
+
+ public Final(String strNumSamples, String strWeightIdx)
+ {
+ super(strNumSamples);
+ }
+ }
+
+ static class InverseWeightScoreGenerator implements ScoredTuple.ScoreGenerator
+ {
+ //index of the weight field of the input tuple
+ private int weightIdx;
+
+ InverseWeightScoreGenerator(Integer weightIdx)
+ {
+ if(weightIdx == null || weightIdx < 0) {
+ throw new IllegalArgumentException("Invalid null or negative weight index input: " + weightIdx);
+ }
+ this.weightIdx = weightIdx;
+ }
+
+ @Override
+ public double generateScore(Tuple sample) throws ExecException
+ {
+ double weight = ((Number)sample.get(this.weightIdx)).doubleValue();
+ if(Double.compare(weight, 0.0) <= 0)
+ {
+ //non-positive weight should be avoided
+ throw new ExecException(String.format("Invalid sample weight [%f]. It should be a positive real number", weight));
+ }
+ //a different approach to try: u^(1/w) could be exp(log(u)/w) ?
+ return Math.pow(Math.random(), 1/weight);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java b/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
new file mode 100644
index 0000000..bfb7398
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/ChaoShenEntropyEstimator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+
+import java.util.Collections;
+
+
+
+/*
+ * This class implements the Chao-Shen entropy estimator based on this
+ * {@link <a href="http://www.researchgate.net/publication/226041346_Nonparametric_estimation_of_Shannons_index_of_diversity_when_there_are_unseen_species_in_sample/file/d912f510a79b8aae74.pdf" target="_blank">paper</a>}
+ * <p>
+ * It combines the {@link <a href="http://en.wikipedia.org/wiki/Horvitz%E2%80%93Thompson_estimator" target="_blank">Horvitz-Thompson estimator</a>}
+ * to adjust for missing species and the concept of sample coverage to estimate the relative abundances of species in the sample.
+ * The entropy calculation formula is as follows:
+ * <pre>
+ * {@code
+ * H(X) = - SUM( ( C * c(x) / N * log(C * c(x) / N) ) / ( 1 - (1 - C * c(x) / N) ^ N ) )
+ * c(x) is the occurrence frequency of sample x, N is the sum of c(x)
+ * C = 1 - N1 / N, N1 is the number of samples whose c(x) equals 1.
+ * }
+ * </pre>
+ * </p>
+ */
+class ChaoShenEntropyEstimator extends EntropyEstimator {
+ //sample frequency
+ private AccumulativeSampleFrequencyMap freqBaggedMap;
+
+ //sum of frequency of all samples
+ private long N;
+
+ //number of samples whose occurrence frequency is 1
+ private long N1;
+
+ ChaoShenEntropyEstimator(String base){
+ super(base);
+ reset();
+ }
+
+ @Override
+ public void accumulate(long cx) throws ExecException {
+ if(cx > 0) {
+ freqBaggedMap.accumulate(cx);
+ N += cx;
+ if(cx == 1) {
+ N1++;
+ }
+ }
+ }
+
+ @Override
+ public double getEntropy() {
+ double h = 0.0;
+
+ if(N > 0) {
+
+ if(N1 == N) {
+ //avoid c == 0
+ N1 = N - 1;
+ }
+
+ //sample coverage estimation
+ double c = 1 - (double)N1 / N;
+
+ try {
+ //from in-memory frequency map
+ for(Iterator<Map.Entry<Long, MutableLong>> iter = this.freqBaggedMap.getInternalMap().entrySet().iterator();
+ iter.hasNext(); ) {
+ Map.Entry<Long, MutableLong> entry = iter.next();
+ long cx = entry.getKey();
+ long cnt = entry.getValue().longValue();
+ h += accumlateEntropy(cx, this.N, c, cnt);
+ }
+ //from backup databag
+ for(Iterator<Tuple> iter = this.freqBaggedMap.getInternalBag().iterator();
+ iter.hasNext(); ) {
+ Tuple t = iter.next();
+ long cx = (Long)t.get(0);
+ long cnt = (Long)t.get(1);
+ h += accumlateEntropy(cx, this.N, c, cnt);
+ }
+ } catch(ExecException ex) {
+ throw new RuntimeException(
+ "Error while computing chao-shen entropy, exception: " + ex);
+ }
+ }
+
+ return EntropyUtil.logTransform(h, super.base);
+ }
+
+ private double accumlateEntropy(long cx,
+ long N,
+ double c,
+ long cnt) {
+ //sample proportion
+ double p = (double)cx / N;
+ //sample coverage adjusted probability
+ double pa = c * p;
+ //probability to see an individual in the sample
+ double la = 1 - Math.pow((1 - pa), N);
+ return -(cnt * pa * Math.log(pa) / la);
+ }
+
+ @Override
+ public void reset() {
+ this.N = 0;
+ this.N1 = 0;
+ if(this.freqBaggedMap != null) {
+ this.freqBaggedMap.clear();
+ }
+ this.freqBaggedMap = new AccumulativeSampleFrequencyMap();
+ }
+
+ /*
+ * A map backed by a data bag to record the sample occurrence frequencies which might be repeated
+ * The purpose of this class is that we suspect in the real applications, the sample occurrence frequency
+ * might be repeated and putting all these repeated numbers into databag may be space inefficient
+ * So we have an in-memory map to record the repetitions and use a databag to backup when the size of the
+ * in-memory map exceeds a pre-defined threshold. At present we use 5M as the default threshold to control
+ * the number of entries in the in-memory map, which approximates to 80M bytes.
+ */
+ private class AccumulativeSampleFrequencyMap {
+
+ private long spillBytesThreshold;
+
+ /* key is the sample occurrence frequency
+ * value is the number of repetitions of this frequency
+ */
+ private Map<Long, MutableLong> countMap;
+
+ /* the backed databag
+ * each tuple has 2 elements
+ * the 1st element is sample occurrence frequency
+ * the 2nd element is the number of repetitions
+ */
+ private DataBag countBag;
+
+ AccumulativeSampleFrequencyMap(){
+ this(1024 * 1024 * 5 * (Long.SIZE + Long.SIZE) / 8);
+ }
+
+ AccumulativeSampleFrequencyMap(long spillBytesThreshold) {
+ this.spillBytesThreshold = spillBytesThreshold;
+ clear();
+ }
+
+ void accumulate(long key) throws ExecException {
+ MutableLong val = this.countMap.get(key);
+ if(val == null) {
+ this.countMap.put(key, new MutableLong(1));
+ } else {
+ val.add(1);
+ }
+
+ if(this.countMap.size() * (Long.SIZE + Long.SIZE) / 8 > spillBytesThreshold) {
+ spillFromMap2Bag();
+ }
+ }
+
+ private void spillFromMap2Bag() throws ExecException {
+ for(Map.Entry<Long, MutableLong> entry : this.countMap.entrySet()) {
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, entry.getKey());
+ t.set(1, entry.getValue().longValue());
+ this.countBag.add(t);
+ }
+ this.countMap.clear();
+ }
+
+ Map<Long, MutableLong> getInternalMap() {
+ return Collections.unmodifiableMap(this.countMap);
+ }
+
+ DataBag getInternalBag() {
+ return this.countBag;
+ }
+
+ void clear() {
+ countMap = new HashMap<Long, MutableLong>();
+ countBag = BagFactory.getInstance().newDefaultBag();
+ }
+ }
+
+ private class MutableLong {
+ private long val;
+
+ MutableLong(){
+ this(0L);
+ }
+
+ MutableLong(long val) {
+ this.val = val;
+ }
+
+ long longValue(){
+ return val;
+ }
+
+ void add(long additive){
+ this.val += additive;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java b/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
new file mode 100644
index 0000000..3f0acab
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/EmpiricalEntropyEstimator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+/*
+ * This entropy estimator calculates the empirical entropy of given species
+ * using the occurrence frequency of individual in the sample as an estimation of its probability.
+ * The empirical estimator is also called the maximum likelihood estimator (MLE).
+ * <p>
+ * It applies the following formula to compute entropy:
+ * <pre>
+ * {@code
+ * H(X) = log(N) - 1 / N * SUM(c(x) * log(c(x)) )
+ * c(x) is the occurrence frequency of sample x, N is the sum of c(x)
+ * }
+ * </pre>
+ * </p>
+ * <p>
+ * This entropy estimator is widely used when the number of species is known and small.
+ * But it is biased since it does not cover the rare species with zero frequency in the sample.
+ * </p>
+ */
+class EmpiricalEntropyEstimator extends EntropyEstimator {
+
+ //sum of frequency cx of all input samples
+ private long N;
+
+ //sum of cx * log(cx) of all input samples
+ private double M;
+
+ EmpiricalEntropyEstimator(String base) throws IllegalArgumentException {
+ super(base);
+ reset();
+ }
+
+ @Override
+ public void accumulate(long cx) {
+ if(cx > 0) {
+ N += cx;
+ M += cx * Math.log(cx);
+ }
+ }
+
+ @Override
+ public double getEntropy() {
+ return N > 0 ? EntropyUtil.logTransform(Math.log(N) - M / N, super.base) : 0;
+ }
+
+ @Override
+ public void reset(){
+ N = 0;
+ M = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/Entropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/Entropy.java b/src/java/datafu/pig/stats/entropy/Entropy.java
new file mode 100644
index 0000000..a074bc0
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/Entropy.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.stats.entropy.EntropyUtil;
+
+
+/**
+ * Calculate the empirical entropy given a bag of data sample counts following entropy's
+ * {@link <a href="http://en.wikipedia.org/wiki/Entropy_%28information_theory%29" target="_blank">wiki definition</a>}
+ * <p>
+ * It supports entropy calculation both in a streaming way and in a distributed way using combiner.
+ * </p>
+ * <p>
+ * This UDF's constructor accepts the logarithm base as its single argument.
+ * The definition of supported logarithm base is the same as {@link datafu.pig.stats.entropy.StreamingEntropy}
+ * </p>
+ * <p>
+ * Note:
+ * <ul>
+ * <li>the input to the UDF is a bag of data sample's occurrence frequency,
+ * which is different from {@link StreamingEntropy}, whose input is a sorted bag of raw data samples.
+ * <li>the UDF accepts int or long number. Other data types will be rejected and an exception will be thrown
+ * <li>the input int or long number should be non-negative,
+ * negative input will be silently discarded and a warning message will be logged.
+ * <li>the returned entropy value is of double type.
+ * </ul>
+ * </p>
+ * <p>
+ * How to use:
+ * </p>
+ * <p>
+ * This UDF is suitable to calculate entropy on the whole data set when we
+ * could easily get each sample's frequency using an outer GROUP BY.
+ * </p>
+ * <p>
+ * Then we could use another outer GROUP BY on the sample frequencies to get the entropy.
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ *
+ * define Entropy datafu.pig.stats.entropy.Entropy();
+ *
+ * input = LOAD 'input' AS (val: double);
+ *
+ * -- calculate the occurrence of each sample
+ * counts_g = GROUP input BY val;
+ * counts = FOREACH counts_g GENERATE COUNT(input) AS cnt;
+ *
+ * -- calculate entropy
+ * input_counts_g = GROUP counts ALL;
+ * entropy = FOREACH input_counts_g GENERATE Entropy(counts) AS entropy;
+ * }
+ * </pre>
+ * </p>
+ * Use case to calculate mutual information using Entropy:
+ * <p>
+ * <pre>
+ * {@code
+ *
+ * define Entropy datafu.pig.stats.entropy.Entropy();
+ *
+ * input = LOAD 'input' AS (valX: double, valY: double);
+ *
+ * ------------
+ * -- calculate mutual information I(X, Y) using entropy
+ * -- I(X, Y) = H(X) + H(Y) - H(X, Y)
+ * ------------
+ *
+ * input_x_y_g = GROUP input BY (valX, valY);
+ * input_x_y_cnt = FOREACH input_x_y_g GENERATE flatten(group) as (valX, valY), COUNT(input) AS cnt;
+ *
+ * input_x_g = GROUP input_x_y_cnt BY valX;
+ * input_x_cnt = FOREACH input_x_g GENERATE flatten(group) as valX, SUM(input_x_y_cnt.cnt) AS cnt;
+ *
+ * input_y_g = GROUP input_x_y_cnt BY valY;
+ * input_y_cnt = FOREACH input_y_g GENERATE flatten(group) as valY, SUM(input_x_y_cnt.cnt) AS cnt;
+ *
+ * input_x_y_entropy_g = GROUP input_x_y_cnt ALL;
+ * input_x_y_entropy = FOREACH input_x_y_entropy_g {
+ * input_x_y_entropy_cnt = input_x_y_cnt.cnt;
+ * GENERATE Entropy(input_x_y_entropy_cnt) AS x_y_entropy;
+ * }
+ *
+ * input_x_entropy_g = GROUP input_x_cnt ALL;
+ * input_x_entropy = FOREACH input_x_entropy_g {
+ * input_x_entropy_cnt = input_x_cnt.cnt;
+ * GENERATE Entropy(input_x_entropy_cnt) AS x_entropy;
+ * }
+ *
+ * input_y_entropy_g = GROUP input_y_cnt ALL;
+ * input_y_entropy = FOREACH input_y_entropy_g {
+ * input_y_entropy_cnt = input_y_cnt.cnt;
+ * GENERATE Entropy(input_y_entropy_cnt) AS y_entropy;
+ * }
+ *
+ * input_mi_cross = CROSS input_x_y_entropy, input_x_entropy, input_y_entropy;
+ * input_mi = FOREACH input_mi_cross GENERATE (input_x_entropy::x_entropy +
+ * input_y_entropy::y_entropy -
+ * input_x_y_entropy::x_y_entropy) AS mi;
+ * }
+ * </pre>
+ * </p>
+ * @see datafu.pig.stats.entropy.StreamingEntropy
+ */
+
+public class Entropy extends AccumulatorEvalFunc<Double> implements Algebraic {
+
+ private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ //entropy estimator for accumulator
+ //re-use the same entropy estimator for StreamingEntropy
+ private EntropyEstimator streamEstimator;
+
+ //logarithm base
+ private String base;
+
+ public Entropy() throws ExecException {
+ //empirical estimator using Euler's number as logarithm base
+ this(EntropyUtil.LOG);
+ }
+
+ public Entropy(String base) throws ExecException {
+ try {
+ this.streamEstimator = EntropyEstimator.createEstimator(EntropyEstimator.EMPIRICAL_ESTIMATOR, base);
+ } catch (IllegalArgumentException ex) {
+ throw new ExecException(
+ String.format("Fail to initialize Entropy with logarithm base: (%s), exception: (%s)", base, ex));
+ }
+ this.base = base;
+ }
+
+ /*
+ * Algebraic implementation part
+ */
+
+ private String param = null;
+ private String getParam()
+ {
+ if (param == null) {
+ if (this.base != null) {
+ param = String.format("('%s')", this.base);
+ } else {
+ param = "";
+ }
+ }
+ return param;
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName() + getParam();
+ }
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName() + getParam();
+ }
+
+ @Override
+ public String getIntermed() {
+ return Intermediate.class.getName() + getParam();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+
+ public Initial(){}
+
+ public Initial(String base){}
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ Tuple t = mTupleFactory.newTuple(2);
+
+ try{
+ //input is a bag with one tuple containing
+ //the sample's occurrence frequency
+ DataBag bg = (DataBag) input.get(0);
+ Long cxl = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ cxl = ((Number)(tp.get(0))).longValue();
+ }
+
+ if(cxl == null || cxl.longValue() < 0) {
+ //emit null in case of invalid input frequency
+ t.set(0, null);
+ t.set(1, null);
+ warn("Non-positive input frequency number: " + cxl, PigWarning.UDF_WARNING_1);
+ } else {
+ long cx = cxl.longValue();
+ double logcx = (cx > 0 ? Math.log(cx) : 0);
+ double cxlogcx = cx * logcx;
+
+ //1st element of the returned tuple is freq * log(freq)
+ t.set(0, cxlogcx);
+
+ //2nd element of the returned tuple is freq
+ t.set(1, cxl);
+ }
+
+ return t;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch(Exception e) {
+ int errCode = 10080;
+ String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+
+ }
+
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+
+ public Intermediate(){}
+
+ public Intermediate(String base){}
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ DataBag b = (DataBag)input.get(0);
+ return combine(b);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 10081;
+ String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+
+ static public class Final extends EvalFunc<Double> {
+ private String base;
+
+ public Final()
+ {
+ this(EntropyUtil.LOG);
+ }
+
+ public Final(String base)
+ {
+ this.base = base;
+ }
+
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ try {
+ DataBag b = (DataBag)input.get(0);
+ Tuple combined = combine(b);
+
+ Double sumOfCxLogCx = (Double)combined.get(0);
+ Long sumOfCx = (Long)combined.get(1);
+
+ if(sumOfCxLogCx == null || sumOfCx == null) {
+ //emit null if there is at least 1 invalid input
+ warn("Invalid null field output from combine(), " +
+ "1st field: " + sumOfCxLogCx + ", 2nd field: " + sumOfCx, PigWarning.UDF_WARNING_1);
+ return null;
+ }
+
+ Double entropy = null;
+
+ double scxlogcx = sumOfCxLogCx.doubleValue();
+ long scx = sumOfCx.longValue();
+
+ if (scx > 0) {
+ //H(X) = log(N) - 1 / N * SUM(c(x) * log(c(x)) )
+ entropy = EntropyUtil.logTransform(Math.log(scx) - scxlogcx / scx, this.base);
+ }
+
+ return entropy;
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 10082;
+ String msg = "Error while computing entropy in " + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+
+ }
+ }
+
+ static protected Tuple combine(DataBag values) throws ExecException {
+ Tuple output = mTupleFactory.newTuple(2);
+
+ boolean sawNonNull = false;
+ double sumOfCxLogCx = 0;
+ long sumOfCx = 0;
+
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ Double scxlogcx = (Double)t.get(0);
+ Long scx = (Long)t.get(1);
+
+ if(scxlogcx != null && scx != null) {
+ sumOfCxLogCx += scxlogcx.doubleValue();
+ sumOfCx += scx.longValue();
+ sawNonNull = true;
+ }
+ }
+
+ if (sawNonNull) {
+ output.set(0, sumOfCxLogCx);
+ output.set(1, sumOfCx);
+ } else {
+ //emit null if there is no valid input
+ output.set(0, null);
+ output.set(1, null);
+ }
+
+ return output;
+ }
+
+ /*
+ * Accumulator implementation part
+ */
+
+ @Override
+ public void accumulate(Tuple input) throws IOException
+ {
+ for (Tuple t : (DataBag) input.get(0)) {
+ long cx = ((Number)(t.get(0))).longValue();
+ this.streamEstimator.accumulate(cx);
+ }
+ }
+
+ @Override
+ public Double getValue()
+ {
+ return this.streamEstimator.getEntropy();
+ }
+
+ @Override
+ public void cleanup()
+ {
+ this.streamEstimator.reset();
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema tupleSchema = inputBagSchema.getField(0).schema;
+
+ if(tupleSchema == null) {
+ throw new RuntimeException("The tuple of input bag has no schema");
+ }
+
+ List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+
+ if(fieldSchemaList == null || fieldSchemaList.size() != 1) {
+ throw new RuntimeException("The field schema of the input tuple is null or its size is not 1");
+ }
+
+ if(fieldSchemaList.get(0).type != DataType.INTEGER &&
+ fieldSchemaList.get(0).type != DataType.LONG )
+ {
+ String[] expectedTypes = new String[] {DataType.findTypeName(DataType.INTEGER),
+ DataType.findTypeName(DataType.LONG)};
+ throw new RuntimeException("Expect the type of the input tuple to be of (" +
+ java.util.Arrays.toString(expectedTypes) + "), but instead found " +
+ DataType.findTypeName(fieldSchemaList.get(0).type));
+ }
+
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+ .getName()
+ .toLowerCase(), input),
+ DataType.DOUBLE));
+ } catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EntropyEstimator.java b/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
new file mode 100644
index 0000000..336ef86
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/EntropyEstimator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+/*
+* Entropy estimator which exposes a unified interface to application
+* and hides the implementation details in its subclasses
+*/
+public abstract class EntropyEstimator {
+
+ public static final String EMPIRICAL_ESTIMATOR = "empirical";
+
+ public static final String CHAOSHEN_ESTIMATOR = "chaosh";
+
+ /*
+ * logarithm base
+ * by default, we use Euler's number as the default logarithm base
+ */
+ protected String base;
+
+ protected EntropyEstimator(String base) throws IllegalArgumentException {
+ if(!EntropyUtil.isValidLogBase(base)) {
+ throw new IllegalArgumentException("Invalid input logarithm base. " +
+ "Please refer to StreamingEntropy's javadoc for supported logarithm base");
+ }
+ this.base = base;
+ }
+
+ /*
+ * create estimator instance based on the input type
+ * @param type type of entropy estimator
+ * @param base logarithm base
+ * @throws IllegalArgumentException if the input estimator type is not
+ * supported or the logarithm base is invalid
+ */
+ public static EntropyEstimator createEstimator(String type,
+ String base) throws IllegalArgumentException
+ {
+ //empirical estimator
+ if(EmpiricalEntropyEstimator.EMPIRICAL_ESTIMATOR.equalsIgnoreCase(type)) {
+ return new EmpiricalEntropyEstimator(base);
+ }
+ //chao-shen estimator
+ if(EmpiricalEntropyEstimator.CHAOSHEN_ESTIMATOR.equalsIgnoreCase(type)) {
+ return new ChaoShenEntropyEstimator(base);
+ }
+ //unsupported estimator type
+ throw new IllegalArgumentException("invalid input entropy estimator type. " +
+ "Please refer to StreamingEntropy's javadoc for the supported estimator types");
+ }
+
+ /*
+ * accumulate occurrence frequency from input stream of samples
+ * @param cx the occurrence frequency of the last input sample
+ */
+ public abstract void accumulate(long cx) throws ExecException;
+
+ /*
+ * calculate the output entropy value
+ * return entropy value as a double type number
+ */
+ public abstract double getEntropy();
+
+ /*
+ * cleanup and reset internal states
+ */
+ public abstract void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/EntropyUtil.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/EntropyUtil.java b/src/java/datafu/pig/stats/entropy/EntropyUtil.java
new file mode 100644
index 0000000..685f3e2
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/EntropyUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+public class EntropyUtil {
+
+ public static final String LOG = "log";
+
+ public static final String LOG2 = "log2";
+
+ public static final String LOG10 = "log10";
+
+ /*
+ * Transform the input entropy to that in the input logarithm base
+ * The input entropy is assumed to be calculated in the Euler's number logarithm base
+ * */
+ public static double logTransform(double h, String base) {
+ if(LOG2.equalsIgnoreCase(base)) {
+ return h / Math.log(2);
+ }
+
+ if(LOG10.equalsIgnoreCase(base)) {
+ return h / Math.log(10);
+ }
+
+ return h;
+ }
+
+ public static boolean isValidLogBase(String base) {
+ return LOG.equalsIgnoreCase(base) ||
+ LOG2.equalsIgnoreCase(base) ||
+ LOG10.equalsIgnoreCase(base);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java b/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java
new file mode 100644
index 0000000..67ad4d3
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/StreamingCondEntropy.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import org.apache.pig.AccumulatorEvalFunc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+
+/**
+ * Calculate the conditional entropy H(Y|X) of random variables X and Y according to its
+ * {@link <a href="http://en.wikipedia.org/wiki/Conditional_entropy" target="_blank">wiki definition</a>},
+ * X is the conditional variable and Y is the variable that conditions on X.
+ * <p>
+ * The input to this UDF is a bag of 2 field tuple.
+ * <ul>
+ * <li>the 1st field of the tuple is an instance of variable X.
+ * <li>the 2nd field of the tuple is an instance of variable Y.
+ * </ul>
+ * </p>
+ * <p>
+ * An exception will be thrown if the input tuple does not have 2 fields.
+ * </p>
+ * <p>
+ * This UDF's constructor definition and parameters are the same as that of {@link datafu.pig.stats.entropy.StreamingEntropy}
+ * </p>
+ * <p>
+ * Note:
+ * <ul>
+ * <li>input bag to the UDF must be sorted on X and Y, with X in the first order.
+ * <li>Entropy value is returned as double type.
+ * </ul>
+ * </p>
+ * <p>
+ * How to use:
+ * </p>
+ * <p>
+ * This UDF is suitable to calculate conditional entropy in a nested FOREACH after a GROUP BY,
+ * where we sort the inner bag and use the sorted bag as the input to this UDF.
+ * </p>
+ * <p>
+ * This is a scenario in which we would like to get 2 variables' conditional entropy in different constraint groups.
+ * </p>
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * --define empirical conditional entropy with Euler's number as the logarithm base
+ * define CondEntropy datafu.pig.stats.entropy.StreamingCondEntropy();
+ *
+ * input = LOAD 'input' AS (grp: chararray, valX: double, valY: double);
+ *
+ * -- calculate conditional entropy H(Y|X) in each group
+ * input_group_g = GROUP input BY grp;
+ * entropy_group = FOREACH input_group_g {
+ * input_val = input.(valX, valY);
+ * input_ordered = ORDER input_val BY $0, $1;
+ * GENERATE FLATTEN(group) AS group, CondEntropy(input_ordered) AS cond_entropy;
+ * }
+ * }
+ * </pre>
+ * </p>
+ * Use case to calculate mutual information:
+ * <p>
+ * <pre>
+ * {@code
+ * ------------
+ * -- calculate mutual information I(X, Y) using streaming conditional entropy and streaming entropy
+ * -- I(X, Y) = H(Y) - H(Y|X)
+ * ------------
+ *
+ * define CondEntropy datafu.pig.stats.entropy.StreamingCondEntropy();
+ * define Entropy datafu.pig.stats.entropy.StreamingEntropy();
+ *
+ * input = LOAD 'input' AS (grp: chararray, valX: double, valY: double);
+ *
+ * -- calculate the I(X,Y) in each group
+ * input_group_g = GROUP input BY grp;
+ * mutual_information = FOREACH input_group_g {
+ * input_val_x_y = input.(valX, valY);
+ * input_val_x_y_ordered = ORDER input_val_x_y BY $0,$1;
+ * input_val_y = input.valY;
+ * input_val_y_ordered = ORDER input_val_y BY $0;
+ * cond_h_x_y = CondEntropy(input_val_x_y_ordered);
+ * h_y = Entropy(input_val_y_ordered);
+ * GENERATE FLATTEN(group), h_y - cond_h_x_y;
+ * }
+ * }
+ * </pre>
+ * </p>
+ * @see StreamingEntropy
+ */
+public class StreamingCondEntropy extends AccumulatorEvalFunc<Double> {
+ //last visited tuple of <x,y>
+ private Tuple xy;
+
+ //number of occurrence of last visited <x,y>
+ private long cxy;
+
+ //number of occurrence of last visited x
+ private long cx;
+
+ //comparison result between the present tuple and the last visited tuple
+ private int lastCmp;
+
+ //entropy estimator for H(x,y)
+ private EntropyEstimator combEstimator;
+
+ //entropy estimator for H(x)
+ private EntropyEstimator condXEstimator;
+
+ public StreamingCondEntropy() throws ExecException
+ {
+ this(EntropyEstimator.EMPIRICAL_ESTIMATOR);
+ }
+
+ public StreamingCondEntropy(String type) throws ExecException
+ {
+ this(type, EntropyUtil.LOG);
+ }
+
+ public StreamingCondEntropy(String type, String base) throws ExecException
+ {
+ try {
+ this.combEstimator = EntropyEstimator.createEstimator(type, base);
+ this.condXEstimator = EntropyEstimator.createEstimator(type, base);
+ } catch (IllegalArgumentException ex) {
+ throw new ExecException(String.format(
+ "Fail to initialize StreamingCondEntropy with entropy estimator of type (%s), base: (%s). Exception: (%s)",
+ type, base, ex));
+ }
+ cleanup();
+ }
+
+ /*
+ * Accumulate occurrence frequency of <x,y> and x
+ * as we stream through the input bag
+ */
+ @Override
+ public void accumulate(Tuple input) throws IOException
+ {
+ for (Tuple t : (DataBag) input.get(0)) {
+
+ if (this.xy != null)
+ {
+ int cmp = t.compareTo(this.xy);
+
+ //check if the comparison result is different from previous compare result
+ if ((cmp < 0 && this.lastCmp > 0)
+ || (cmp > 0 && this.lastCmp < 0)) {
+ throw new ExecException("Out of order! previous tuple: " + this.xy + ", present tuple: " + t
+ + ", comparison: " + cmp + ", previous comparison: " + this.lastCmp);
+ }
+ if (cmp != 0) {
+ //different <x,y>
+ this.combEstimator.accumulate(this.cxy);
+ this.cxy = 0;
+ this.lastCmp = cmp;
+ if(DataType.compare(this.xy.get(0), t.get(0)) != 0) {
+ //different x
+ this.condXEstimator.accumulate(this.cx);
+ this.cx = 0;
+ }
+ }
+ }
+
+ //set tuple t as the next tuple for comparison
+ this.xy = t;
+
+ //accumulate cx
+ this.cx++;
+
+ //accumulate cxy
+ this.cxy++;
+ }
+ }
+
+ @Override
+ public Double getValue()
+ {
+ //do not miss the last tuple
+ try {
+ this.combEstimator.accumulate(this.cxy);
+ this.condXEstimator.accumulate(this.cx);
+ } catch (ExecException ex) {
+ throw new RuntimeException("Error while accumulating sample frequency: " + ex);
+ }
+
+ //Chain rule: H(Y|X) = H(X, Y) - H(X)
+ return this.combEstimator.getEntropy() - this.condXEstimator.getEntropy();
+ }
+
+ @Override
+ public void cleanup()
+ {
+ this.xy = null;
+ this.cxy = 0;
+ this.cx = 0;
+ this.lastCmp = 0;
+ this.combEstimator.reset();
+ this.condXEstimator.reset();
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema tupleSchema = inputBagSchema.getField(0).schema;
+
+ if(tupleSchema == null) {
+ throw new RuntimeException("The tuple of the input bag has no schema");
+ }
+
+ List<Schema.FieldSchema> fieldSchemaList = tupleSchema.getFields();
+
+ if(fieldSchemaList == null || fieldSchemaList.size() != 2) {
+ throw new RuntimeException("The field schema of the input tuple is null or its size is not 2");
+ }
+
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+ .getName()
+ .toLowerCase(), input),
+ DataType.DOUBLE));
+ } catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/src/java/datafu/pig/stats/entropy/StreamingEntropy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/entropy/StreamingEntropy.java b/src/java/datafu/pig/stats/entropy/StreamingEntropy.java
new file mode 100644
index 0000000..707e690
--- /dev/null
+++ b/src/java/datafu/pig/stats/entropy/StreamingEntropy.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.stats.entropy;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+
+/**
+ * Calculate entropy of a given stream of raw data samples according to entropy's
+ * {@link <a href="http://en.wikipedia.org/wiki/Entropy_%28information_theory%29" target="_blank">wiki definition</a>}
+ * <p>
+ * Its constructor takes 2 arguments.
+ * </p>
+ * <p>
+ * The 1st argument, the type of entropy estimator algorithm we currently support, includes:
+ * <ul>
+ * <li>empirical (empirical entropy estimator)
+ * <li>chaosh (Chao-Shen entropy estimator)
+ * </ul>
+ * </p>
+ * <p>
+ * The default estimation algorithm is empirical.
+ * </p>
+ * <p>
+ * The 2nd argument, the logarithm base we currently support, includes:
+ * </p>
+ * <p>
+ * <ul>
+ * <li>log (use Euler's number as the logarithm base)
+ * <li>log2 (use 2 as the logarithm base)
+ * <li>log10 (use 10 as the logarithm base)
+ * </ul>
+ * </p>
+ * <p>
+ * The default logarithm base is log.
+ * </p>
+ * <p>
+ * Note:
+ * <ul>
+ * <li>The input bag to the UDF must be sorted.
+ * <li>The entropy value is returned as double type.
+ * </ul>
+ * </p>
+ * <p>
+ * How to use:
+ * </p>
+ * <p>
+ * This UDF is suitable to calculate entropy in a nested FOREACH after a GROUP BY,
+ * where we sort the inner bag and use the sorted bag as the input to this UDF.
+ * </p>
+ * <p>
+ * This is a scenario in which we would like to get a variable's entropy in different constraint groups.
+ * </p>
+ * Example:
+ * <p>
+ * <pre>
+ * {@code
+ * --calculate empirical entropy with Euler's number as the logarithm base
+ * define Entropy datafu.pig.stats.entropy.StreamingEntropy();
+ *
+ * input = LOAD 'input' AS (grp: chararray, val: double);
+ *
+ * -- calculate the input samples' entropy in each group
+ * input_group_g = GROUP input BY grp;
+ * entropy_group = FOREACH input_group_g {
+ * input_val = input.val;
+ * input_ordered = ORDER input_val BY $0;
+ * GENERATE FLATTEN(group) AS group, Entropy(input_ordered) AS entropy;
+ * }
+ * }
+ * </pre>
+ * </p>
+ * @see StreamingCondEntropy
+ */
+public class StreamingEntropy extends AccumulatorEvalFunc<Double>
+{
+ //last visited tuple
+ private Tuple x;
+
+ //number of occurrence of last visited tuple
+ private long cx;
+
+ //comparison result between the present tuple and the last visited tuple
+ private int lastCmp;
+
+ //entropy estimator that accumulates sample's occurrence frequency to
+ //calculate the actual entropy
+ private EntropyEstimator estimator;
+
+ public StreamingEntropy() throws ExecException
+ {
+ this(EntropyEstimator.EMPIRICAL_ESTIMATOR);
+ }
+
+ public StreamingEntropy(String type) throws ExecException
+ {
+ this(type, EntropyUtil.LOG);
+ }
+
+ public StreamingEntropy(String type, String base) throws ExecException
+ {
+ try {
+ this.estimator = EntropyEstimator.createEstimator(type, base);
+ } catch (IllegalArgumentException ex) {
+ throw new ExecException(
+ String.format("Fail to initialize StreamingEntropy with entropy estimator of type (%s), base: (%s), exception: (%s)",
+ type, base, ex)
+ );
+ }
+ cleanup();
+ }
+
+ /*
+ * Accumulate occurrence frequency of each tuple as we stream through the input bag
+ */
+ @Override
+ public void accumulate(Tuple input) throws IOException
+ {
+ for (Tuple t : (DataBag) input.get(0)) {
+
+ if (this.x != null)
+ {
+ int cmp = t.compareTo(this.x);
+
+ //check if the comparison result is different from previous compare result
+ if ((cmp < 0 && this.lastCmp > 0)
+ || (cmp > 0 && this.lastCmp < 0)) {
+ throw new ExecException("Out of order! previous tuple: " + this.x + ", present tuple: " + t
+ + ", comparison: " + cmp + ", previous comparison: " + this.lastCmp);
+ }
+
+ if (cmp != 0) {
+ //different tuple
+ this.estimator.accumulate(this.cx);
+ this.cx = 0;
+ this.lastCmp = cmp;
+ }
+ }
+
+ //set tuple t as the next tuple for comparison
+ this.x = t;
+
+ //accumulate cx
+ this.cx++;
+ }
+ }
+
+ @Override
+ public Double getValue()
+ {
+ //do not miss the last tuple
+ try {
+ this.estimator.accumulate(this.cx);
+ } catch (ExecException ex) {
+ throw new RuntimeException("Error while accumulating sample frequency: " + ex);
+ }
+
+ return this.estimator.getEntropy();
+ }
+
+ @Override
+ public void cleanup()
+ {
+ this.x = null;
+ this.cx = 0;
+ this.lastCmp = 0;
+ this.estimator.reset();
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+ .getName()
+ .toLowerCase(), input),
+ DataType.DOUBLE));
+ } catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
new file mode 100644
index 0000000..f507cbd
--- /dev/null
+++ b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import java.io.IOException;
+import java.util.*;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.WeightedReservoirSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link WeightedReservoirSample}.
+ *
+ * @author wjian
+ *
+ */
+public class WeightedReservoirSamplingTests extends PigTests
+{
+ /**
+ register $JAR_PATH
+
+ define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+
+ data = LOAD 'input' AS (v1:chararray,v2:INT);
+ data_g = group data all;
+ sampled = FOREACH data_g GENERATE WeightedSample(data);
+ --describe sampled;
+
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String weightedSampleTest;
+
+ @Test
+ public void weightedSampleTest() throws Exception
+ {
+ Map<String, Integer> count = new HashMap<String, Integer>();
+
+ count.put("a", 0);
+ count.put("b", 0);
+ count.put("c", 0);
+ count.put("d", 0);
+
+ PigTest test = createPigTestFromString(weightedSampleTest);
+
+ writeLinesToFile("input",
+ "a\t100","b\t1","c\t5","d\t2");
+
+ for(int i = 0; i < 10; i++) {
+ test.runScript();
+
+ List<Tuple> output = this.getLinesForAlias(test, "sampled");
+
+ Tuple t = output.get(0);
+
+ DataBag sampleBag = (DataBag)t.get(0);
+
+ for(Iterator<Tuple> sampleIter = sampleBag.iterator(); sampleIter.hasNext();) {
+ Tuple st = sampleIter.next();
+ String key = (String)st.get(0);
+ count.put(key, count.get(key) + 1);
+ }
+ }
+
+ String maxKey = "";
+ int maxCount = 0;
+ for(String key : count.keySet()) {
+ if(maxCount < count.get(key)) {
+ maxKey = key;
+ maxCount = count.get(key);
+ }
+ }
+
+ Assert.assertEquals(maxKey, "a");
+ }
+
+ @Test
+ public void weightedReservoirSampleAccumulateTest() throws IOException
+ {
+ WeightedReservoirSample sampler = new WeightedReservoirSample("10", "1");
+
+ for (int i=0; i<100; i++)
+ {
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, i);
+ t.set(1, i + 1);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ bag.add(t);
+ Tuple input = TupleFactory.getInstance().newTuple(bag);
+ sampler.accumulate(input);
+ }
+
+ DataBag result = sampler.getValue();
+ verifyNoRepeatAllFound(result, 10, 0, 100);
+ }
+
+ @Test
+ public void weightedReservoirSampleAlgebraicTest() throws IOException
+ {
+ WeightedReservoirSample.Initial initialSampler = new WeightedReservoirSample.Initial("10", "1");
+ WeightedReservoirSample.Intermediate intermediateSampler = new WeightedReservoirSample.Intermediate("10", "1");
+ WeightedReservoirSample.Final finalSampler = new WeightedReservoirSample.Final("10", "1");
+
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ for (int i=0; i<100; i++)
+ {
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, i);
+ t.set(1, i + 1);
+ bag.add(t);
+ }
+
+ Tuple input = TupleFactory.getInstance().newTuple(bag);
+
+ Tuple intermediateTuple = initialSampler.exec(input);
+ DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
+ intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
+ intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
+ DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
+ verifyNoRepeatAllFound(result, 10, 0, 100);
+ }
+
+ private void verifyNoRepeatAllFound(DataBag result,
+ int expectedResultSize,
+ int left,
+ int right) throws ExecException
+ {
+ Assert.assertEquals(expectedResultSize, result.size());
+
+ // all must be found, no repeats
+ Set<Integer> found = new HashSet<Integer>();
+ for (Tuple t : result)
+ {
+ Integer i = (Integer)t.get(0);
+ Assert.assertTrue(i>=left && i<right);
+ Assert.assertFalse(String.format("Found duplicate of %d",i), found.contains(i));
+ found.add(i);
+ }
+ }
+
+ @Test
+ public void weightedReservoirSampleLimitExecTest() throws IOException
+ {
+ WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
+
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ for (int i=0; i<10; i++)
+ {
+ Tuple t = TupleFactory.getInstance().newTuple(2);
+ t.set(0, i);
+ t.set(1, 1); // score is equal for all
+ bag.add(t);
+ }
+
+ Tuple input = TupleFactory.getInstance().newTuple(1);
+ input.set(0, bag);
+
+ DataBag result = sampler.exec(input);
+
+ verifyNoRepeatAllFound(result, 10, 0, 10);
+
+ Set<Integer> found = new HashSet<Integer>();
+ for (Tuple t : result)
+ {
+ Integer i = (Integer)t.get(0);
+ found.add(i);
+ }
+
+ for(int i = 0; i < 10; i++)
+ {
+ Assert.assertTrue(found.contains(i));
+ }
+ }
+
+ @Test
+ public void invalidConstructorArgTest() throws Exception
+ {
+ try {
+ WeightedReservoirSample sampler = new WeightedReservoirSample("1", "-1");
+ Assert.fail( "Testcase should fail");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getMessage().indexOf("Invalid negative index of weight field argument for WeightedReserviorSample constructor: -1") >= 0);
+ }
+ }
+
+ @Test
+ public void invalidWeightTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(weightedSampleTest);
+
+ writeLinesToFile("input",
+ "a\t100","b\t1","c\t0","d\t2");
+ try {
+ test.runScript();
+ List<Tuple> output = this.getLinesForAlias(test, "sampled");
+ Assert.fail( "Testcase should fail");
+ } catch (Exception ex) {
+ }
+ }
+
+ /**
+ register $JAR_PATH
+
+ define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
+
+ data = LOAD 'input' AS (v1:chararray);
+ data_g = group data all;
+ sampled = FOREACH data_g GENERATE WeightedSample(data);
+ describe sampled;
+
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String invalidInputTupleSizeTest;
+
+ @Test
+ public void invalidInputTupleSizeTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
+
+ writeLinesToFile("input",
+ "a","b","c","d");
+ try {
+ test.runScript();
+ List<Tuple> output = this.getLinesForAlias(test, "sampled");
+ Assert.fail( "Testcase should fail");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getMessage().indexOf("The field schema of the input tuple is null or the tuple size is no more than the weight field index: 1") >= 0);
+ }
+ }
+
+ /**
+ register $JAR_PATH
+
+ define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0');
+
+ data = LOAD 'input' AS (v1:chararray, v2:INT);
+ data_g = group data all;
+ sampled = FOREACH data_g GENERATE WeightedSample(data);
+ describe sampled;
+
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String invalidWeightFieldSchemaTest;
+
+ @Test
+ public void invalidWeightFieldSchemaTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
+
+ writeLinesToFile("input",
+ "a\t100","b\t1","c\t5","d\t2");
+ try {
+ test.runScript();
+ List<Tuple> output = this.getLinesForAlias(test, "sampled");
+ Assert.fail( "Testcase should fail");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getMessage().indexOf("Expect the type of the weight field of the input tuple to be of ([int, long, float, double]), but instead found (chararray), weight field: 0") >= 0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/e8084146/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java b/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java
new file mode 100644
index 0000000..6e512d4
--- /dev/null
+++ b/test/pig/datafu/test/pig/stats/entropy/AbstractEntropyTests.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.stats.entropy;
+
+import static org.testng.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.Tuple;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+import datafu.test.pig.PigTests;
+
+public abstract class AbstractEntropyTests extends PigTests
+{
+ protected void verifyEqualEntropyOutput(List<Double> expectedOutput, List<Tuple> output, int digits) throws ExecException {
+ assertEquals(expectedOutput.size(), output.size());
+ Iterator<Double> expectationIterator = expectedOutput.iterator();
+ String formatDigits = "%." + digits + "f";
+ for (Tuple t : output)
+ {
+ Double entropy = (Double)t.get(0);
+ assertEquals(String.format(formatDigits,entropy),String.format(formatDigits, expectationIterator.next()));
+ }
+ }
+}
|