http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetDifference.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetDifference.java b/src/java/datafu/pig/sets/SetDifference.java
deleted file mode 100644
index b6469e9..0000000
--- a/src/java/datafu/pig/sets/SetDifference.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.sets;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-
-/**
- * Computes the set difference of two or more bags. Duplicates are eliminated. <b>The input bags must be sorted.</b>
- *
- * <p>
- * If bags A and B are provided, then this computes A-B, i.e. all elements in A that are not in B.
- * If bags A, B and C are provided, then this computes A-B-C, i.e. all elements in A that are not in B or C.
- * </p>
- *
- * <p>
- * Example:
- * <pre>
- * {@code
- * define SetDifference datafu.pig.sets.SetDifference();
- *
- * -- input:
- * -- ({(1),(2),(3),(4),(5),(6)},{(3),(4)})
- * input = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
- *
- * input = FOREACH input {
- * B1 = ORDER B1 BY val ASC;
- * B2 = ORDER B2 BY val ASC;
- *
- * -- output:
- * -- ({(1),(2),(5),(6)})
- * GENERATE SetDifference(B1,B2);
- * }
- * }</pre>
- */
-public class SetDifference extends SetOperationsBase
-{
- private static final BagFactory bagFactory = BagFactory.getInstance();
-
- /**
- * Loads the data bags from the input tuple and puts them in a priority queue,
- * where ordering is determined by the data from the iterator for each bag.
- *
- * <p>
- * The bags are wrapped in a {@link Pair} object that is comparable on the data
- * currently available from the iterator.
- * These objects are ordered first by the data, then by the index within the tuple
- * the bag came from.
- * </p>
- *
- * @param input
- * @return
- * @throws IOException
- */
- private PriorityQueue<Pair> loadBags(Tuple input) throws IOException
- {
- PriorityQueue<Pair> pq = new PriorityQueue<Pair>(input.size());
-
- for (int i=0; i < input.size(); i++)
- {
- if (input.get(i) != null)
- {
- Iterator<Tuple> inputIterator = ((DataBag)input.get(i)).iterator();
- if(inputIterator.hasNext())
- {
- pq.add(new Pair(inputIterator,i));
- }
- }
- }
- return pq;
- }
-
- /**
-  * Counts how many other entries in the queue currently hold the same tuple as
-  * the entry at the head of the queue.
-  *
-  * <p>
-  * The head entry is required to belong to the first bag (index 0); anything else
-  * is treated as a programming error.
-  * </p>
-  *
-  * @param pq priority queue of bag cursors; must not be empty
-  * @return number of entries, excluding the head itself, whose current tuple equals the head's
-  * @throws RuntimeException if the head of the queue does not come from the bag at index 0
-  */
- public int countMatches(PriorityQueue<Pair> pq)
- {
-   Pair head = pq.peek();
-   Tuple headData = head.data;
-
-   // sanity check
-   if (!head.index.equals(0))
-   {
-     throw new RuntimeException("Expected next bag to have index 0");
-   }
-
-   // start one below zero: the scan below also visits the head, which equals itself
-   int others = -1;
-   for (Pair candidate : pq)
-   {
-     if (headData.equals(candidate.data))
-     {
-       others++;
-     }
-   }
-   return others;
- }
-
- /**
-  * Computes the set difference of the first bag against all the other bags.
-  *
-  * <p>
-  * The bags are merged through a priority queue of per-bag cursors. A tuple is
-  * written to the output only when it is at the head of the queue, comes from the
-  * first bag, differs from the last tuple written, and no other cursor currently
-  * holds an equal tuple. The merge stops as soon as the first bag is exhausted.
-  * </p>
-  *
-  * <p>
-  * An empty or null second bag is deliberately not special-cased: running the merge
-  * in that case too keeps the result de-duplicated and order-checked like any other
-  * input, and never hands the caller's own bag back as the result.
-  * </p>
-  *
-  * @param input tuple of two or more bags (null fields are allowed); each bag must be sorted
-  * @return a new bag with the distinct tuples of the first bag that appear in no other bag;
-  *         empty when the first bag is null or empty
-  * @throws IOException propagated from reading the input tuple
-  * @throws RuntimeException if fewer than two inputs are given, if a non-null input is not
-  *         a bag, or if a bag is found to be out of order while it is being read
-  */
- @SuppressWarnings("unchecked")
- @Override
- public DataBag exec(Tuple input) throws IOException
- {
-   if (input.size() < 2)
-   {
-     throw new RuntimeException("Expected at least two inputs, but found " + input.size());
-   }
-
-   for (Object o : input)
-   {
-     if (o != null && !(o instanceof DataBag))
-     {
-       throw new RuntimeException("Inputs must be bags");
-     }
-   }
-
-   DataBag outputBag = bagFactory.newDefaultBag();
-
-   DataBag bag1 = (DataBag)input.get(0);
-   if (bag1 == null || bag1.size() == 0)
-   {
-     // nothing to subtract from
-     return outputBag;
-   }
-
-   // bag1 is non-empty, so the queue holds its cursor until the loop below breaks
-   PriorityQueue<Pair> pq = loadBags(input);
-
-   Tuple lastData = null;
-
-   while (true)
-   {
-     Pair nextPair = pq.peek();
-
-     // ignore data we've already written to the output
-     boolean alreadyWritten = lastData != null && nextPair.data.compareTo(lastData) == 0;
-
-     // Only take data from the first bag, where there are no other
-     // bags that have the same data.
-     if (!alreadyWritten && nextPair.index.equals(0) && countMatches(pq) == 0)
-     {
-       outputBag.add(nextPair.data);
-       lastData = nextPair.data;
-     }
-
-     Pair p = pq.poll();
-
-     // only put the bag back into the queue if it still has data
-     if (p.hasNext())
-     {
-       p.next();
-       pq.offer(p);
-     }
-     else if (p.index.equals(0))
-     {
-       // stop when we exhaust all elements from the first bag
-       break;
-     }
-   }
-
-   return outputBag;
- }
-
- /**
-  * Cursor over one input bag: the bag's iterator, the tuple it is currently positioned
-  * on, and the position of the bag within the input tuple.
-  *
-  * <p>
-  * Cursors order by their current tuple first and by bag position second, which is what
-  * lets them be merged through a priority queue.
-  * </p>
-  * @author mhayes
-  *
-  */
- private static class Pair implements Comparable<Pair>
- {
-   private final Iterator<Tuple> it;
-   private final Integer index;
-   private Tuple data;
-
-   /**
-    * Creates the cursor and positions it on the first tuple of the iterator.
-    *
-    * @param it tuple iterator; must have at least one element
-    * @param index index within the tuple that the bag came from
-    */
-   public Pair(Iterator<Tuple> it, int index)
-   {
-     this.it = it;
-     this.index = index;
-     this.data = it.next();
-   }
-
-   /**
-    * Orders by current tuple, breaking ties with the bag index.
-    */
-   @SuppressWarnings("unchecked")
-   @Override
-   public int compareTo(Pair o)
-   {
-     int byData = this.data.compareTo(o.data);
-     return (byData != 0) ? byData : index.compareTo(o.index);
-   }
-
-   /**
-    * @return true if the underlying bag has tuples beyond the current one
-    */
-   public boolean hasNext()
-   {
-     return it.hasNext();
-   }
-
-   /**
-    * Moves to the next tuple of the bag.
-    *
-    * @return the tuple the cursor is now positioned on
-    * @throws RuntimeException if the next tuple sorts before the current one
-    */
-   @SuppressWarnings("unchecked")
-   public Tuple next()
-   {
-     Tuple upcoming = it.next();
-     // algorithm assumes data is in order
-     if (data.compareTo(upcoming) > 0)
-     {
-       throw new RuntimeException("Out of order!");
-     }
-     data = upcoming;
-     return upcoming;
-   }
-
-   @Override
-   public String toString()
-   {
-     return String.format("[%s within %d]",data,index);
-   }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetIntersect.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetIntersect.java b/src/java/datafu/pig/sets/SetIntersect.java
deleted file mode 100644
index 0ba586e..0000000
--- a/src/java/datafu/pig/sets/SetIntersect.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.sets;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * Computes the set intersection of two or more bags. Duplicates are eliminated. <b>The input bags must be sorted.</b>
- * <p>
- * Example:
- * <pre>
- * {@code
- * define SetIntersect datafu.pig.sets.SetIntersect();
- *
- * -- input:
- * -- ({(1,10),(2,20),(3,30),(4,40)},{(2,20),(4,40),(8,80)})
- * input = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
- *
- * input = FOREACH input {
- * B1 = ORDER B1 BY val1 ASC, val2 ASC;
- * B2 = ORDER B2 BY val1 ASC, val2 ASC;
- *
- * -- output:
- * -- ({(2,20),(4,40)})
- * GENERATE SetIntersect(B1,B2);
- * }
- * }</pre>
- */
-public class SetIntersect extends SetOperationsBase
-{
- private static final BagFactory bagFactory = BagFactory.getInstance();
-
- static class pair implements Comparable<pair>
- {
- final Iterator<Tuple> it;
- Tuple data;
-
- pair(Iterator<Tuple> it)
- {
- this.it = it;
- this.data = it.next();
- }
-
- @Override
- public int compareTo(pair o)
- {
- return this.data.compareTo(o.data);
- }
- }
-
- private PriorityQueue<pair> load_bags(Tuple input) throws IOException
- {
- PriorityQueue<pair> pq = new PriorityQueue<pair>(input.size());
-
- for (int i=0; i < input.size(); i++) {
- Object o = input.get(i);
- if (!(o instanceof DataBag))
- throw new RuntimeException("parameters must be databags");
- Iterator<Tuple> inputIterator= ((DataBag) o).iterator();
- if(inputIterator.hasNext())
- pq.add(new pair(inputIterator));
- }
- return pq;
- }
-
- public boolean all_equal(PriorityQueue<pair> pq)
- {
- Object o = pq.peek().data;
- for (pair p : pq) {
- if (!o.equals(p.data))
- return false;
- }
- return true;
- }
-
- @Override
- public DataBag exec(Tuple input) throws IOException
- {
- DataBag outputBag = bagFactory.newDefaultBag();
- PriorityQueue<pair> pq = load_bags(input);
- if(pq.size() != input.size())
- return outputBag; // one or more input bags were empty
- Tuple last_data = null;
-
- while (true) {
- if (pq.peek().data.compareTo(last_data) != 0 && all_equal(pq)) {
- last_data = pq.peek().data;
- outputBag.add(last_data);
- }
-
- pair p = pq.poll();
- if (!p.it.hasNext())
- break;
- Tuple nextData = p.it.next();
- // algorithm assumes data is in order
- if (p.data.compareTo(nextData) > 0)
- {
- throw new RuntimeException("Out of order!");
- }
- p.data = nextData;
- pq.offer(p);
- }
-
- return outputBag;
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetOperationsBase.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetOperationsBase.java b/src/java/datafu/pig/sets/SetOperationsBase.java
deleted file mode 100644
index c9997f8..0000000
--- a/src/java/datafu/pig/sets/SetOperationsBase.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.sets;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Base class for set operations.
- *
- * @author "Matthew Hayes <mhayes@linkedin.com>"
- *
- */
-public abstract class SetOperationsBase extends EvalFunc<DataBag>
-{
-  /**
-   * Derives the output schema of a set operation: a bag whose tuple schema is copied
-   * from the first input bag.
-   *
-   * <p>
-   * The argument types are validated before the best-effort schema construction, so
-   * that a non-bag argument is reported to the caller instead of being swallowed by
-   * the catch-all below and turned into a null schema.
-   * </p>
-   *
-   * @param input schema of the arguments passed to the UDF
-   * @return schema holding a single bag field, or null when the input schema is missing
-   *         or the output schema cannot be built
-   * @throws RuntimeException if any argument is declared with a type other than bag
-   */
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    if (input == null)
-    {
-      return null;
-    }
-
-    for (Schema.FieldSchema fieldSchema : input.getFields())
-    {
-      if (fieldSchema == null)
-      {
-        // no type information to validate against; keep the best-effort result
-        return null;
-      }
-      if (fieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a bag but got: " + DataType.findTypeName(fieldSchema.type));
-      }
-    }
-
-    try {
-      Schema bagSchema = input.getField(0).schema;
-      String fieldName = getSchemaName(this.getClass().getName().toLowerCase(), input);
-
-      return new Schema(new Schema.FieldSchema(fieldName, bagSchema, DataType.BAG));
-    }
-    catch (Exception e) {
-      // schema derivation is best effort; null tells the caller no schema is available
-      return null;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetUnion.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetUnion.java b/src/java/datafu/pig/sets/SetUnion.java
deleted file mode 100644
index 49fd99c..0000000
--- a/src/java/datafu/pig/sets/SetUnion.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.sets;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * Computes the set union of two or more bags. Duplicates are eliminated.
- * <p>
- * Example:
- * <pre>
- * {@code
- * define SetUnion datafu.pig.sets.SetUnion();
- *
- * -- input:
- * -- ({(2,20),(3,30),(4,40)},{(1,10),(2,20),(4,40),(8,80)})
- * input = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
- *
- * -- output:
- * -- ({(2,20),(3,30),(4,40),(1,10),(8,80)})
- * output = FOREACH input GENERATE SetUnion(B1,B2);
- * }
- * </pre>
- */
-public class SetUnion extends SetOperationsBase
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-
-  /**
-   * Copies the tuples of every input bag into a single distinct bag.
-   *
-   * @param input tuple whose fields must all be bags
-   * @return distinct bag containing the tuples of all input bags
-   * @throws IOException wrapping any failure, including the error raised for a non-bag input
-   */
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    DataBag union = bagFactory.newDistinctBag();
-
-    try {
-      for (int position = 0, total = input.size(); position < total; position++)
-      {
-        Object argument = input.get(position);
-        if (argument instanceof DataBag)
-        {
-          for (Tuple member : (DataBag) argument)
-          {
-            union.add(member);
-          }
-        }
-        else
-        {
-          throw new RuntimeException("parameters must be databags");
-        }
-      }
-    }
-    catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    return union;
-  }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/package-info.java b/src/java/datafu/pig/sets/package-info.java
deleted file mode 100644
index 2c5b087..0000000
--- a/src/java/datafu/pig/sets/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * UDFs for set operations such as intersect and union.
- */
-package datafu.pig.sets;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/DoubleVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/DoubleVAR.java b/src/java/datafu/pig/stats/DoubleVAR.java
deleted file mode 100644
index ab8eed2..0000000
--- a/src/java/datafu/pig/stats/DoubleVAR.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR}
-*/
-public class DoubleVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
- private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
- /**
-  * Computes the variance of the non-null values in the input bag as the mean of the
-  * squares minus the square of the mean.
-  *
-  * @param input tuple whose first field is the bag of single-value tuples
-  * @return the variance, or null when the bag is empty or contains only nulls
-  * @throws IOException propagated from reading the input
-  */
- @Override
- public Double exec(Tuple input) throws IOException {
-   final Double total = sum(input);
-   final Double totalOfSquares = sumSquare(input);
-
-   // either we were handed an empty bag or a bag
-   // filled with nulls - return null in this case
-   if (total == null) {
-     return null;
-   }
-
-   final long n = count(input);
-   if (n <= 0) {
-     return null;
-   }
-
-   final double mean = total / n;
-   final double meanOfSquares = totalOfSquares / n;
-   return meanOfSquares - mean * mean;
- }
-
- /**
-  * @return class name of the {@link Initial} stage
-  */
- public String getInitial() {
-   final String initialStage = Initial.class.getName();
-   return initialStage;
- }
-
- /**
-  * @return class name of the {@link Intermediate} stage
-  */
- public String getIntermed() {
-   final String intermediateStage = Intermediate.class.getName();
-   return intermediateStage;
- }
-
- /**
-  * @return class name of the {@link Final} stage
-  */
- public String getFinal() {
-   final String finalStage = Final.class.getName();
-   return finalStage;
- }
-
- /**
-  * First algebraic stage: turns a bag holding a single value into a partial result
-  * tuple of (sum, sum of squares, count).
-  */
- static public class Initial extends EvalFunc<Tuple> {
-   /**
-    * @param input tuple whose first field is a bag with at most one single-value tuple
-    * @return (value, value squared, 1), or (null, null, 0) when there is no value or it is null
-    * @throws IOException if the input cannot be read or holds more than one tuple
-    */
-   @Override
-   public Tuple exec(Tuple input) throws IOException {
-     Tuple partial = mTupleFactory.newTuple(3);
-     try {
-       // input is a bag with one tuple containing
-       // the column we are trying to get variance
-       DataBag bag = (DataBag) input.get(0);
-       Iterator<Tuple> rows = bag.iterator();
-       Double value = rows.hasNext() ? (Double) rows.next().get(0) : null;
-
-       if (rows.hasNext()) {
-         throw new RuntimeException("Expected only one tuple in bag");
-       }
-
-       if (value != null) {
-         partial.set(0, value);
-         partial.set(1, value * value);
-         partial.set(2, 1L);
-       } else {
-         fillWithNoValue(partial);
-       }
-       return partial;
-     } catch (NumberFormatException nfe) {
-       nfe.printStackTrace();
-       // invalid input,
-       // treat this input as null
-       fillWithNoValue(partial);
-       return partial;
-     } catch (ExecException ee) {
-       ee.printStackTrace();
-       throw ee;
-     } catch (Exception e) {
-       e.printStackTrace();
-       String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-       throw new ExecException(msg, 2106, PigException.BUG, e);
-     }
-   }
-
-   /** Writes the partial result that stands for "no value": (null, null, 0). */
-   private static void fillWithNoValue(Tuple partial) throws ExecException {
-     partial.set(0, null);
-     partial.set(1, null);
-     partial.set(2, 0L);
-   }
- }
-
- static public class Intermediate extends EvalFunc<Tuple> {
- @Override
- public Tuple exec(Tuple input) throws IOException {
- try {
- DataBag b = (DataBag)input.get(0);
- return combine(b);
- } catch (ExecException ee) {
- throw ee;
- } catch (Exception e) {
- int errCode = 2106;
- String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
- throw new ExecException(msg, errCode, PigException.BUG, e);
-
- }
- }
- }
-
- /**
-  * Final algebraic stage: merges the partial result tuples and turns them into the variance.
-  */
- static public class Final extends EvalFunc<Double> {
-   /**
-    * @param input tuple whose first field is a bag of (sum, sum of squares, count) tuples
-    * @return mean of the squares minus the square of the mean, or null when no
-    *         non-null value was seen or the combined count is not positive
-    * @throws IOException if the partial results cannot be read or combined
-    */
-   @Override
-   public Double exec(Tuple input) throws IOException {
-     try {
-       Tuple totals = combine((DataBag) input.get(0));
-
-       Double total = (Double) totals.get(0);
-       Double totalOfSquares = (Double) totals.get(1);
-       if (total == null) {
-         return null;
-       }
-
-       Long n = (Long) totals.get(2);
-       if (n <= 0) {
-         return null;
-       }
-
-       double mean = total / n;
-       double meanOfSquares = totalOfSquares / n;
-       return meanOfSquares - mean * mean;
-     } catch (ExecException ee) {
-       throw ee;
-     } catch (Exception e) {
-       String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-       throw new ExecException(msg, 2106, PigException.BUG, e);
-     }
-   }
- }
-
- /**
-  * Adds up a bag of partial results.
-  *
-  * <p>
-  * Called from the Intermediate and Final stages, i.e. after Initial has produced
-  * valid tuples, so the bag is not checked for being empty.
-  * </p>
-  *
-  * @param values bag of (sum, sum of squares, count) tuples
-  * @return tuple of (total sum, total sum of squares, total count); the two sums are
-  *         null when every incoming sum was null
-  * @throws ExecException if a partial result cannot be read or the output cannot be written
-  */
- static protected Tuple combine(DataBag values) throws ExecException{
-   double sumTotal = 0;
-   double sumSquareTotal = 0;
-   long countTotal = 0;
-   boolean sawNonNull = false;
-
-   Tuple output = mTupleFactory.newTuple(3);
-   for (Tuple partial : values) {
-     Double partSum = (Double) partial.get(0);
-     Double partSumSquare = (Double) partial.get(1);
-     Long partCount = (Long) partial.get(2);
-
-     // a null partial sum adds zero to both sums;
-     // its count field is added to the total regardless
-     boolean present = partSum != null;
-     sawNonNull = sawNonNull || present;
-     sumTotal += present ? partSum : 0.0;
-     sumSquareTotal += present ? partSumSquare : 0.0;
-     countTotal += partCount;
-   }
-
-   output.set(0, sawNonNull ? Double.valueOf(sumTotal) : null);
-   output.set(1, sawNonNull ? Double.valueOf(sumSquareTotal) : null);
-   output.set(2, Long.valueOf(countTotal));
-   return output;
- }
-
- /**
-  * Counts the tuples of the input bag that carry a non-null first field.
-  *
-  * @param input tuple whose first field is the bag to inspect
-  * @return number of non-null, non-empty tuples whose first field is not null
-  * @throws ExecException if a tuple cannot be read
-  */
- static protected long count(Tuple input) throws ExecException {
-   DataBag values = (DataBag) input.get(0);
-   long nonNull = 0;
-   for (Tuple row : values) {
-     if (row == null || row.size() <= 0 || row.get(0) == null) {
-       continue;
-     }
-     nonNull++;
-   }
-   return nonNull;
- }
-
- /**
-  * Sums the non-null values of the input bag.
-  *
-  * @param input tuple whose first field is the bag of single-value tuples
-  * @return the sum, or null when the bag is empty or holds only nulls
-  * @throws ExecException if a value cannot be read, or wrapping any runtime failure
-  *         raised while a value is being added
-  * @throws IOException declared for callers; not raised directly by this method
-  */
- static protected Double sum(Tuple input) throws ExecException, IOException {
-   DataBag values = (DataBag) input.get(0);
-
-   // if we were handed an empty bag, return NULL
-   if (values.size() == 0) {
-     return null;
-   }
-
-   double running = 0;
-   boolean sawNonNull = false;
-   for (Tuple row : values) {
-     try {
-       Double value = (Double) row.get(0);
-       if (value != null) {
-         sawNonNull = true;
-         running += value;
-       }
-     } catch (RuntimeException exp) {
-       String msg = "Problem while computing sum of values.";
-       throw new ExecException(msg, 2103, PigException.BUG, exp);
-     }
-   }
-
-   return sawNonNull ? Double.valueOf(running) : null;
- }
-
- /**
-  * Sums the squares of the non-null values of the input bag.
-  *
-  * @param input tuple whose first field is the bag of single-value tuples
-  * @return the sum of squares, or null when the bag is empty or holds only nulls
-  * @throws ExecException if a value cannot be read, or wrapping any runtime failure
-  *         raised while a value is being added
-  * @throws IOException declared for callers; not raised directly by this method
-  */
- static protected Double sumSquare(Tuple input) throws ExecException, IOException {
-   DataBag values = (DataBag) input.get(0);
-
-   // if we were handed an empty bag, return NULL
-   if (values.size() == 0) {
-     return null;
-   }
-
-   double running = 0;
-   boolean sawNonNull = false;
-   for (Tuple row : values) {
-     try {
-       Double value = (Double) row.get(0);
-       if (value != null) {
-         sawNonNull = true;
-         running += value * value;
-       }
-     } catch (RuntimeException exp) {
-       String msg = "Problem while computing sum of squared values.";
-       throw new ExecException(msg, 2103, PigException.BUG, exp);
-     }
-   }
-
-   return sawNonNull ? Double.valueOf(running) : null;
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
- }
-
- /* Accumulator interface implementation */
- // running totals across accumulate() calls; all null until the first usable batch
- private Double intermediateSumSquare = null;
- private Double intermediateSum = null;
- private Long intermediateCount = null;
-
- /**
-  * Folds one batch of values into the running totals.
-  *
-  * <p>
-  * A batch whose sum or sum of squares is null (empty bag, or nulls only) leaves the
-  * totals untouched.
-  * </p>
-  *
-  * @param b tuple whose first field is the bag of single-value tuples for this batch
-  * @throws IOException if the batch cannot be read or summed
-  */
- @Override
- public void accumulate(Tuple b) throws IOException {
-   try {
-     final Double batchSum = sum(b);
-     if (batchSum == null) {
-       return;
-     }
-
-     final Double batchSumSquare = sumSquare(b);
-     if (batchSumSquare == null) {
-       return;
-     }
-
-     // set default values
-     final boolean firstBatch = intermediateSum == null || intermediateCount == null;
-     if (firstBatch) {
-       intermediateSumSquare = 0.0;
-       intermediateSum = 0.0;
-       intermediateCount = 0L;
-     }
-
-     final long batchCount = count(b);
-     if (batchCount <= 0) {
-       return;
-     }
-
-     intermediateCount += batchCount;
-     intermediateSum += batchSum;
-     intermediateSumSquare += batchSumSquare;
-   } catch (ExecException ee) {
-     throw ee;
-   } catch (Exception e) {
-     String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-     throw new ExecException(msg, 2106, PigException.BUG, e);
-   }
- }
-
- /**
-  * Discards the running totals so the next accumulation starts from scratch.
-  */
- @Override
- public void cleanup() {
-   intermediateCount = null;
-   intermediateSum = null;
-   intermediateSumSquare = null;
- }
-
- /**
-  * @return variance of the values accumulated so far (mean of the squares minus the
-  *         square of the mean), or null when nothing has been accumulated
-  */
- @Override
- public Double getValue() {
-   if (intermediateCount == null || intermediateCount <= 0) {
-     return null;
-   }
-
-   final double mean = intermediateSum / intermediateCount;
-   final double meanOfSquares = intermediateSumSquare / intermediateCount;
-   return meanOfSquares - mean * mean;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/FloatVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/FloatVAR.java b/src/java/datafu/pig/stats/FloatVAR.java
deleted file mode 100644
index 8a697e0..0000000
--- a/src/java/datafu/pig/stats/FloatVAR.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR}
-*/
-public class FloatVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
- private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
- /**
-  * Computes the variance of the non-null values in the input bag as the mean of the
-  * squares minus the square of the mean.
-  *
-  * @param input tuple whose first field is the bag of single-value tuples
-  * @return the variance, or null when the bag is empty or contains only nulls
-  * @throws IOException propagated from reading the input
-  */
- @Override
- public Double exec(Tuple input) throws IOException {
-   final Double total = sum(input);
-   final Double totalOfSquares = sumSquare(input);
-
-   // either we were handed an empty bag or a bag
-   // filled with nulls - return null in this case
-   if (total == null) {
-     return null;
-   }
-
-   final long n = count(input);
-   if (n <= 0) {
-     return null;
-   }
-
-   final double mean = total / n;
-   final double meanOfSquares = totalOfSquares / n;
-   return meanOfSquares - mean * mean;
- }
-
- /**
-  * @return class name of the {@link Initial} stage
-  */
- public String getInitial() {
-   final String initialStage = Initial.class.getName();
-   return initialStage;
- }
-
- /**
-  * @return class name of the {@link Intermediate} stage
-  */
- public String getIntermed() {
-   final String intermediateStage = Intermediate.class.getName();
-   return intermediateStage;
- }
-
- /**
-  * @return class name of the {@link Final} stage
-  */
- public String getFinal() {
-   final String finalStage = Final.class.getName();
-   return finalStage;
- }
-
- /**
-  * First algebraic stage: turns a bag holding a single float value into a partial
-  * result tuple of (sum, sum of squares, count), with the sums held as doubles.
-  */
- static public class Initial extends EvalFunc<Tuple> {
-   /**
-    * @param input tuple whose first field is a bag with at most one single-value tuple
-    * @return (value, value squared, 1), or (null, null, 0) when there is no value or it is null
-    * @throws IOException if the input cannot be read or holds more than one tuple
-    */
-   @Override
-   public Tuple exec(Tuple input) throws IOException {
-     Tuple partial = mTupleFactory.newTuple(3);
-     try {
-       // input is a bag with one tuple containing
-       // the column we are trying to get variance
-       DataBag bag = (DataBag) input.get(0);
-       Iterator<Tuple> rows = bag.iterator();
-       Float raw = rows.hasNext() ? (Float) rows.next().get(0) : null;
-
-       if (rows.hasNext()) {
-         throw new RuntimeException("Expected only one tuple in bag");
-       }
-
-       if (raw != null) {
-         // widen to double before squaring
-         final double value = raw.doubleValue();
-         partial.set(0, Double.valueOf(value));
-         partial.set(1, Double.valueOf(value * value));
-         partial.set(2, 1L);
-       } else {
-         fillWithNoValue(partial);
-       }
-       return partial;
-     } catch (NumberFormatException nfe) {
-       nfe.printStackTrace();
-       // invalid input,
-       // treat this input as null
-       fillWithNoValue(partial);
-       return partial;
-     } catch (ExecException ee) {
-       ee.printStackTrace();
-       throw ee;
-     } catch (Exception e) {
-       e.printStackTrace();
-       String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-       throw new ExecException(msg, 2106, PigException.BUG, e);
-     }
-   }
-
-   /** Writes the partial result that stands for "no value": (null, null, 0). */
-   private static void fillWithNoValue(Tuple partial) throws ExecException {
-     partial.set(0, null);
-     partial.set(1, null);
-     partial.set(2, 0L);
-   }
- }
-
- static public class Intermediate extends EvalFunc<Tuple> {
- @Override
- public Tuple exec(Tuple input) throws IOException {
- try {
- DataBag b = (DataBag)input.get(0);
- return combine(b);
- } catch (ExecException ee) {
- throw ee;
- } catch (Exception e) {
- int errCode = 2106;
- String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
- throw new ExecException(msg, errCode, PigException.BUG, e);
-
- }
- }
- }
-
- /**
-  * Final algebraic stage: merges the partial result tuples and turns them into the variance.
-  */
- static public class Final extends EvalFunc<Double> {
-   /**
-    * @param input tuple whose first field is a bag of (sum, sum of squares, count) tuples
-    * @return mean of the squares minus the square of the mean, or null when no
-    *         non-null value was seen or the combined count is not positive
-    * @throws IOException if the partial results cannot be read or combined
-    */
-   @Override
-   public Double exec(Tuple input) throws IOException {
-     try {
-       Tuple totals = combine((DataBag) input.get(0));
-
-       Double total = (Double) totals.get(0);
-       Double totalOfSquares = (Double) totals.get(1);
-       if (total == null) {
-         return null;
-       }
-
-       Long n = (Long) totals.get(2);
-       if (n <= 0) {
-         return null;
-       }
-
-       double mean = total / n;
-       double meanOfSquares = totalOfSquares / n;
-       return meanOfSquares - mean * mean;
-     } catch (ExecException ee) {
-       throw ee;
-     } catch (Exception e) {
-       String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-       throw new ExecException(msg, 2106, PigException.BUG, e);
-     }
-   }
- }
-
- /**
-  * Adds up a bag of partial results.
-  *
-  * <p>
-  * Called from the Intermediate and Final stages, i.e. after Initial has produced
-  * valid tuples, so the bag is not checked for being empty.
-  * </p>
-  *
-  * @param values bag of (sum, sum of squares, count) tuples
-  * @return tuple of (total sum, total sum of squares, total count); the two sums are
-  *         null when every incoming sum was null
-  * @throws ExecException if a partial result cannot be read or the output cannot be written
-  */
- static protected Tuple combine(DataBag values) throws ExecException{
-   double sumTotal = 0;
-   double sumSquareTotal = 0;
-   long countTotal = 0;
-   boolean sawNonNull = false;
-
-   Tuple output = mTupleFactory.newTuple(3);
-   for (Tuple partial : values) {
-     Double partSum = (Double) partial.get(0);
-     Double partSumSquare = (Double) partial.get(1);
-     Long partCount = (Long) partial.get(2);
-
-     // a null partial sum adds zero to both sums;
-     // its count field is added to the total regardless
-     boolean present = partSum != null;
-     sawNonNull = sawNonNull || present;
-     sumTotal += present ? partSum : 0.0;
-     sumSquareTotal += present ? partSumSquare : 0.0;
-     countTotal += partCount;
-   }
-
-   output.set(0, sawNonNull ? Double.valueOf(sumTotal) : null);
-   output.set(1, sawNonNull ? Double.valueOf(sumSquareTotal) : null);
-   output.set(2, Long.valueOf(countTotal));
-   return output;
- }
-
- /**
-  * Counts the tuples in the bag whose first field is present and non-null.
-  *
-  * @param input tuple whose first field is the bag to inspect
-  * @return number of non-null first-field values in the bag
-  * @throws ExecException if a field cannot be read
-  */
- static protected long count(Tuple input) throws ExecException {
-   DataBag bag = (DataBag)input.get(0);
-   long nonNull = 0;
-   for (Tuple row : bag) {
-     // skip missing or empty tuples before looking at the first field
-     if (row == null || row.size() <= 0) {
-       continue;
-     }
-     if (row.get(0) != null) {
-       nonNull++;
-     }
-   }
-   return nonNull;
- }
-
- /**
-  * Adds up the non-null Float values held in the first field of each tuple of the bag.
-  *
-  * @param input tuple whose first field is the bag of values
-  * @return the total, or null when the bag is empty or contains no non-null value
-  * @throws ExecException if a field cannot be read, or wrapping any runtime failure (code 2103)
-  */
- static protected Double sum(Tuple input) throws ExecException, IOException {
-   DataBag bag = (DataBag)input.get(0);
-
-   // an empty bag has no sum: answer NULL
-   if (bag.size() == 0) {
-     return null;
-   }
-
-   double total = 0;
-   boolean anyValue = false;
-   for (Tuple row : bag) {
-     try {
-       Float value = (Float)row.get(0);
-       if (value != null) {
-         anyValue = true;
-         total += value;
-       }
-     } catch (RuntimeException exp) {
-       int errCode = 2103;
-       String msg = "Problem while computing sum of values.";
-       throw new ExecException(msg, errCode, PigException.BUG, exp);
-     }
-   }
-
-   return anyValue ? new Double(total) : null;
- }
-
- /**
-  * Adds up the squares of the non-null Float values held in the first field
-  * of each tuple of the bag.
-  *
-  * @param input tuple whose first field is the bag of values
-  * @return the sum of squares, or null when the bag is empty or contains no non-null value
-  * @throws ExecException if a field cannot be read, or wrapping any runtime failure (code 2103)
-  */
- static protected Double sumSquare(Tuple input) throws ExecException, IOException {
-   DataBag values = (DataBag)input.get(0);
-
-   // if we were handed an empty bag, return NULL
-   if(values.size() == 0) {
-     return null;
-   }
-
-   double sumSquare = 0;
-   boolean sawNonNull = false;
-   for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-     Tuple t = it.next();
-     try{
-       Float f = (Float)t.get(0);
-       if (f == null) continue;
-       sawNonNull = true;
-       // Widen to double BEFORE multiplying: f*f would be evaluated in float
-       // arithmetic, losing precision and overflowing to Infinity for
-       // magnitudes whose square exceeds Float.MAX_VALUE.
-       double d = f;
-       sumSquare += d*d;
-     }catch(RuntimeException exp) {
-       int errCode = 2103;
-       String msg = "Problem while computing sum of squared values.";
-       throw new ExecException(msg, errCode, PigException.BUG, exp);
-     }
-   }
-
-   if(sawNonNull) {
-     return new Double(sumSquare);
-   } else {
-     return null;
-   }
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
- }
-
- /* Accumulator interface implementation */
- // Running totals; all three stay null until a batch with a non-null value arrives.
- private Double intermediateSumSquare;
- private Double intermediateSum;
- private Long intermediateCount;
-
- /**
-  * Folds one batch into the running totals. A batch whose sum or sum of
-  * squares is null (empty bag, or only null values) is ignored.
-  *
-  * @param b tuple whose first field is the bag holding the batch
-  */
- @Override
- public void accumulate(Tuple b) throws IOException {
-   try {
-     Double batchSum = sum(b);
-     if (batchSum == null) {
-       return;
-     }
-
-     Double batchSumSquare = sumSquare(b);
-     if (batchSumSquare == null) {
-       return;
-     }
-
-     // first useful batch: start every total from zero
-     boolean uninitialised = (intermediateSum == null || intermediateCount == null);
-     if (uninitialised) {
-       intermediateSumSquare = 0.0;
-       intermediateSum = 0.0;
-       intermediateCount = (long) 0;
-     }
-
-     long batchCount = count(b);
-     if (batchCount <= 0) {
-       return;
-     }
-     intermediateCount += batchCount;
-     intermediateSum += batchSum;
-     intermediateSumSquare += batchSumSquare;
-   } catch (ExecException ee) {
-     throw ee;
-   } catch (Exception e) {
-     int errCode = 2106;
-     String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-     throw new ExecException(msg, errCode, PigException.BUG, e);
-   }
- }
-
- /** Discards the running totals so that the next accumulation starts from scratch. */
- @Override
- public void cleanup() {
-   intermediateSum = intermediateSumSquare = null;
-   intermediateCount = null;
- }
-
- @Override
- public Double getValue() {
- Double var = null;
- if (intermediateCount != null && intermediateCount > 0) {
- Double avg = new Double(intermediateSum / intermediateCount);
- Double avgSquare = new Double(intermediateSumSquare / intermediateCount);
- var = avgSquare - avg*avg;
- }
- return var;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java
deleted file mode 100644
index 0ebd94f..0000000
--- a/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
- *
- * <p>
- * This uses the implementation of HyperLogLog++ from <a href="https://github.com/addthis/stream-lib" target="_blank">stream-lib</a>.
- * The HyperLogLog++ algorithm is an enhanced version of HyperLogLog as described in
- * <a href="http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/pubs/archive/40671.pdf">here</a>.
- * </p>
- *
- * <p>
- * This is a streaming implementation, and therefore the input data does not need to be sorted.
- * </p>
- *
- * @author mhayes
- *
- */
-public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
-{
- private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus estimator;
-
- private final int p;
-
- /**
- * Constructs a HyperLogLog++ estimator.
- */
- public HyperLogLogPlusPlus()
- {
- this("20");
- }
-
- /**
- * Constructs a HyperLogLog++ estimator.
- *
- * @param p precision value
- */
- public HyperLogLogPlusPlus(String p)
- {
- this.p = Integer.parseInt(p);
- cleanup();
- }
-
- @Override
- public void accumulate(Tuple arg0) throws IOException
- {
- DataBag inputBag = (DataBag)arg0.get(0);
- for (Tuple t : inputBag)
- {
- estimator.offer(t);
- }
- }
-
- @Override
- public void cleanup()
- {
- this.estimator = new com.clearspring.analytics.stream.cardinality.HyperLogLogPlus(p);
- }
-
- @Override
- public Long getValue()
- {
- return this.estimator.cardinality();
- }
-
- @Override
- public Schema outputSchema(Schema input)
- {
- try {
- if (input.size() != 1)
- {
- throw new RuntimeException("Expected input to have only a single field");
- }
-
- Schema.FieldSchema inputFieldSchema = input.getField(0);
-
- if (inputFieldSchema.type != DataType.BAG)
- {
- throw new RuntimeException("Expected a BAG as input");
- }
-
- return new Schema(new Schema.FieldSchema(null, DataType.LONG));
- }
- catch (FrontendException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/IntVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/IntVAR.java b/src/java/datafu/pig/stats/IntVAR.java
deleted file mode 100644
index 0f34d8e..0000000
--- a/src/java/datafu/pig/stats/IntVAR.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR}
-*/
-public class IntVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-  private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-  /**
-   * Computes the population variance (mean of squares minus squared mean)
-   * of the non-null Integer values in the input bag.
-   *
-   * @param input tuple whose first field is the bag of single-value tuples
-   * @return the variance, or null if the bag is empty or holds only nulls
-   */
-  @Override
-  public Double exec(Tuple input) throws IOException {
-    Long sum = sum(input);
-    Long sumSquare = sumSquare(input);
-
-    if(sum == null) {
-      // either we were handed an empty bag or a bag
-      // filled with nulls - return null in this case
-      return null;
-    }
-    long count = count(input);
-
-    Double variance = null;
-    if (count > 0){
-      double avg = (double)sum / count;
-      double avgSquare = (double)sumSquare / count;
-      variance = avgSquare - avg*avg;
-    }
-
-    return variance;
-  }
-
-  public String getInitial() {
-    return Initial.class.getName();
-  }
-
-  public String getIntermed() {
-    return Intermediate.class.getName();
-  }
-
-  public String getFinal() {
-    return Final.class.getName();
-  }
-
-  /**
-   * First algebraic stage: turns a bag holding a single value into a
-   * (sum, sum of squares, count) tuple of Longs; a null value yields
-   * (null, null, 0).
-   */
-  static public class Initial extends EvalFunc<Tuple> {
-    @Override
-    public Tuple exec(Tuple input) throws IOException {
-      Tuple t = mTupleFactory.newTuple(3);
-      try {
-        // input is a bag with one tuple containing
-        // the column we are trying to get variance
-        DataBag bg = (DataBag) input.get(0);
-        Integer i = null;
-        Iterator<Tuple> iter = bg.iterator();
-        if(iter.hasNext()) {
-          Tuple tp = iter.next();
-          i = (Integer)tp.get(0);
-        }
-
-        if (iter.hasNext())
-        {
-          throw new RuntimeException("Expected only one tuple in bag");
-        }
-
-        if (i == null) {
-          t.set(0, null);
-          t.set(1, null);
-          t.set(2, 0L);
-        }
-        else {
-          // widen to long first so the square cannot overflow int
-          long value = i;
-          t.set(0, value);
-          t.set(1, value*value);
-          t.set(2, 1L);
-        }
-        return t;
-      } catch(NumberFormatException nfe) {
-        nfe.printStackTrace();
-        // invalid input,
-        // treat this input as null
-        t.set(0, null);
-        t.set(1, null);
-        t.set(2, 0L);
-        return t;
-      } catch (ExecException ee) {
-        ee.printStackTrace();
-        throw ee;
-      } catch (Exception e) {
-        e.printStackTrace();
-        int errCode = 2106;
-        String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-        throw new ExecException(msg, errCode, PigException.BUG, e);
-      }
-    }
-  }
-
-  /**
-   * Combiner stage: merges a bag of partial tuples into one partial tuple.
-   */
-  static public class Intermediate extends EvalFunc<Tuple> {
-    @Override
-    public Tuple exec(Tuple input) throws IOException {
-      try {
-        DataBag b = (DataBag)input.get(0);
-        return combine(b);
-      } catch (ExecException ee) {
-        throw ee;
-      } catch (Exception e) {
-        int errCode = 2106;
-        // message previously misspelled "variance" as "variacne"
-        String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-        throw new ExecException(msg, errCode, PigException.BUG, e);
-      }
-    }
-  }
-
-  /**
-   * Final stage: merges the partial tuples and converts the totals into the
-   * variance; null when no non-null value was seen or the count is not positive.
-   */
-  static public class Final extends EvalFunc<Double> {
-    @Override
-    public Double exec(Tuple input) throws IOException {
-      try {
-        DataBag b = (DataBag)input.get(0);
-        Tuple combined = combine(b);
-
-        Long sum = (Long)combined.get(0);
-        Long sumSquare = (Long)combined.get(1);
-        if(sum == null) {
-          return null;
-        }
-        Long count = (Long)combined.get(2);
-
-        Double variance = null;
-
-        if (count > 0) {
-          double avg = (double)sum / count;
-          double avgSquare = (double)sumSquare / count;
-          variance = avgSquare - avg*avg;
-        }
-        return variance;
-      } catch (ExecException ee) {
-        throw ee;
-      } catch (Exception e) {
-        int errCode = 2106;
-        String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-        throw new ExecException(msg, errCode, PigException.BUG, e);
-      }
-    }
-  }
-
-  /**
-   * Folds a bag of partial (sum, sum of squares, count) tuples into one tuple
-   * of the same layout. The sums are null when every partial sum was null.
-   *
-   * @param values bag of three-field partial tuples
-   * @return the merged three-field tuple
-   */
-  static protected Tuple combine(DataBag values) throws ExecException{
-    long sum = 0;
-    long sumSquare = 0;
-    long totalCount = 0;
-
-    // combine is called from Intermediate and Final
-    // In either case, Initial would have been called
-    // before and would have sent in valid tuples
-    // Hence we don't need to check if incoming bag
-    // is empty
-
-    Tuple output = mTupleFactory.newTuple(3);
-    boolean sawNonNull = false;
-    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-      Tuple t = it.next();
-      Long i = (Long)t.get(0);
-      Long iSquare = (Long)t.get(1);
-      Long count = (Long)t.get(2);
-
-      // we count nulls in var as contributing 0
-      // a departure from SQL for performance of
-      // COUNT() which implemented by just inspecting
-      // size of the bag
-      if(i == null) {
-        i = 0L;
-        iSquare = 0L;
-      } else {
-        sawNonNull = true;
-      }
-      sum += i;
-      sumSquare += iSquare;
-      totalCount += count;
-    }
-    if(sawNonNull) {
-      output.set(0, Long.valueOf(sum));
-      output.set(1, Long.valueOf(sumSquare));
-    } else {
-      output.set(0, null);
-      output.set(1, null);
-    }
-    output.set(2, Long.valueOf(totalCount));
-    return output;
-  }
-
-  /**
-   * Counts the tuples in the bag whose first field is present and non-null.
-   *
-   * @param input tuple whose first field is the bag to inspect
-   */
-  static protected long count(Tuple input) throws ExecException {
-    DataBag values = (DataBag)input.get(0);
-    long cnt = 0;
-    Iterator<Tuple> it = values.iterator();
-    while (it.hasNext()){
-      Tuple t = (Tuple)it.next();
-      if (t != null && t.size() > 0 && t.get(0) != null)
-        cnt ++;
-    }
-
-    return cnt;
-  }
-
-  /**
-   * Adds up the non-null Integer values in the first field of each tuple of the bag.
-   *
-   * @param input tuple whose first field is the bag of values
-   * @return the total, or null when the bag is empty or has no non-null value
-   */
-  static protected Long sum(Tuple input) throws ExecException, IOException {
-    DataBag values = (DataBag)input.get(0);
-
-    // if we were handed an empty bag, return NULL
-    if(values.size() == 0) {
-      return null;
-    }
-
-    long sum = 0;
-    boolean sawNonNull = false;
-    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-      Tuple t = it.next();
-      try{
-        Integer i = (Integer)t.get(0);
-        if (i == null) continue;
-        sawNonNull = true;
-        sum += i;
-      }catch(RuntimeException exp) {
-        int errCode = 2103;
-        String msg = "Problem while computing sum of values.";
-        throw new ExecException(msg, errCode, PigException.BUG, exp);
-      }
-    }
-
-    if(sawNonNull) {
-      return Long.valueOf(sum);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Adds up the squares of the non-null Integer values in the first field of
-   * each tuple of the bag.
-   *
-   * @param input tuple whose first field is the bag of values
-   * @return the sum of squares, or null when the bag is empty or has no non-null value
-   */
-  static protected Long sumSquare(Tuple input) throws ExecException, IOException {
-    DataBag values = (DataBag)input.get(0);
-
-    // if we were handed an empty bag, return NULL
-    if(values.size() == 0) {
-      return null;
-    }
-
-    long sumSquare = 0;
-    boolean sawNonNull = false;
-    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-      Tuple t = it.next();
-      try{
-        Integer i = (Integer)t.get(0);
-        if (i == null) continue;
-        sawNonNull = true;
-        // Widen to long BEFORE multiplying: i*i is a 32-bit multiplication
-        // that silently wraps once |i| exceeds 46340.
-        long value = i;
-        sumSquare += value*value;
-      }catch(RuntimeException exp) {
-        int errCode = 2103;
-        String msg = "Problem while computing sum of squared values.";
-        throw new ExecException(msg, errCode, PigException.BUG, exp);
-      }
-    }
-
-    if(sawNonNull) {
-      return Long.valueOf(sumSquare);
-    } else {
-      return null;
-    }
-  }
-
-  /** Declares the result as a single unnamed DOUBLE field. */
-  @Override
-  public Schema outputSchema(Schema input) {
-    return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-  }
-
-  /* Accumulator interface implementation */
-  private Long intermediateSumSquare = null;
-  private Long intermediateSum = null;
-  private Long intermediateCount = null;
-
-  /**
-   * Folds one batch into the running totals; a batch with no non-null
-   * value (null sum or sum of squares) is ignored.
-   *
-   * @param b tuple whose first field is the bag holding the batch
-   */
-  @Override
-  public void accumulate(Tuple b) throws IOException {
-    try {
-      Long sum = sum(b);
-      if(sum == null) {
-        return;
-      }
-
-      Long sumSquare = sumSquare(b);
-      if(sumSquare == null) {
-        return;
-      }
-
-      // set default values
-      if (intermediateSum == null || intermediateCount == null) {
-        intermediateSumSquare = 0L;
-        intermediateSum = 0L;
-        intermediateCount = 0L;
-      }
-
-      long count = count(b);
-
-      if (count > 0) {
-        intermediateCount += count;
-        intermediateSum += sum;
-        intermediateSumSquare += sumSquare;
-      }
-    } catch (ExecException ee) {
-      throw ee;
-    } catch (Exception e) {
-      int errCode = 2106;
-      String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-      throw new ExecException(msg, errCode, PigException.BUG, e);
-    }
-  }
-
-  /** Discards the running totals. */
-  @Override
-  public void cleanup() {
-    intermediateSumSquare = null;
-    intermediateSum = null;
-    intermediateCount = null;
-  }
-
-  /**
-   * Computes the variance from the running totals.
-   *
-   * @return the variance, or null when nothing with a positive count was accumulated
-   */
-  @Override
-  public Double getValue() {
-    Double variance = null;
-    if (intermediateCount != null && intermediateCount > 0) {
-      double avg = (double)intermediateSum / intermediateCount;
-      double avgSquare = (double)intermediateSumSquare / intermediateCount;
-      variance = avgSquare - avg*avg;
-    }
-    return variance;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/LongVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/LongVAR.java b/src/java/datafu/pig/stats/LongVAR.java
deleted file mode 100644
index 561f15a..0000000
--- a/src/java/datafu/pig/stats/LongVAR.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR}
-*/
-public class LongVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-  private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-  /**
-   * Computes the population variance (mean of squares minus squared mean)
-   * of the non-null Long values in the input bag.
-   *
-   * <p>
-   * NOTE: squares are accumulated in a long, so values whose squares do not
-   * fit in 64 bits wrap silently.
-   * </p>
-   *
-   * @param input tuple whose first field is the bag of single-value tuples
-   * @return the variance, or null if the bag is empty or holds only nulls
-   */
-  @Override
-  public Double exec(Tuple input) throws IOException {
-    Long sum = sum(input);
-    Long sumSquare = sumSquare(input);
-
-    if(sum == null) {
-      // either we were handed an empty bag or a bag
-      // filled with nulls - return null in this case
-      return null;
-    }
-    long count = count(input);
-
-    Double variance = null;
-    if (count > 0){
-      double avg = (double)sum / count;
-      double avgSquare = (double)sumSquare / count;
-      variance = avgSquare - avg*avg;
-    }
-
-    return variance;
-  }
-
-  public String getInitial() {
-    return Initial.class.getName();
-  }
-
-  public String getIntermed() {
-    return Intermediate.class.getName();
-  }
-
-  public String getFinal() {
-    return Final.class.getName();
-  }
-
-  /**
-   * First algebraic stage: turns a bag holding a single value into a
-   * (sum, sum of squares, count) tuple of Longs; a null value yields
-   * (null, null, 0).
-   */
-  static public class Initial extends EvalFunc<Tuple> {
-    @Override
-    public Tuple exec(Tuple input) throws IOException {
-      Tuple t = mTupleFactory.newTuple(3);
-      try {
-        // input is a bag with one tuple containing
-        // the column we are trying to get variance
-        DataBag bg = (DataBag) input.get(0);
-        Long l = null;
-        Iterator<Tuple> iter = bg.iterator();
-        if(iter.hasNext()) {
-          Tuple tp = iter.next();
-          l = (Long)tp.get(0);
-        }
-
-        if (iter.hasNext())
-        {
-          throw new RuntimeException("Expected only one tuple in bag");
-        }
-
-        if (l == null) {
-          t.set(0, null);
-          t.set(1, null);
-          t.set(2, 0L);
-        }
-        else {
-          t.set(0, l);
-          t.set(1, l*l);
-          t.set(2, 1L);
-        }
-        return t;
-      } catch(NumberFormatException nfe) {
-        nfe.printStackTrace();
-        // invalid input,
-        // treat this input as null
-        t.set(0, null);
-        t.set(1, null);
-        t.set(2, 0L);
-        return t;
-      } catch (ExecException ee) {
-        ee.printStackTrace();
-        throw ee;
-      } catch (Exception e) {
-        e.printStackTrace();
-        int errCode = 2106;
-        String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-        throw new ExecException(msg, errCode, PigException.BUG, e);
-      }
-    }
-  }
-
-  /**
-   * Combiner stage: merges a bag of partial tuples into one partial tuple.
-   */
-  static public class Intermediate extends EvalFunc<Tuple> {
-    @Override
-    public Tuple exec(Tuple input) throws IOException {
-      try {
-        DataBag b = (DataBag)input.get(0);
-        return combine(b);
-      } catch (ExecException ee) {
-        throw ee;
-      } catch (Exception e) {
-        int errCode = 2106;
-        // message previously misspelled "variance" as "variacne"
-        String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-        throw new ExecException(msg, errCode, PigException.BUG, e);
-      }
-    }
-  }
-
-  /**
-   * Final stage: merges the partial tuples and converts the totals into the
-   * variance; null when no non-null value was seen or the count is not positive.
-   */
-  static public class Final extends EvalFunc<Double> {
-    @Override
-    public Double exec(Tuple input) throws IOException {
-      try {
-        DataBag b = (DataBag)input.get(0);
-        Tuple combined = combine(b);
-
-        Long sum = (Long)combined.get(0);
-        Long sumSquare = (Long)combined.get(1);
-        if(sum == null) {
-          return null;
-        }
-        Long count = (Long)combined.get(2);
-
-        Double variance = null;
-
-        if (count > 0) {
-          double avg = (double)sum / count;
-          double avgSquare = (double)sumSquare / count;
-          variance = avgSquare - avg*avg;
-        }
-        return variance;
-      } catch (ExecException ee) {
-        throw ee;
-      } catch (Exception e) {
-        int errCode = 2106;
-        String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-        throw new ExecException(msg, errCode, PigException.BUG, e);
-      }
-    }
-  }
-
-  /**
-   * Folds a bag of partial (sum, sum of squares, count) tuples into one tuple
-   * of the same layout. The sums are null when every partial sum was null.
-   *
-   * @param values bag of three-field partial tuples
-   * @return the merged three-field tuple
-   */
-  static protected Tuple combine(DataBag values) throws ExecException{
-    long sum = 0;
-    long sumSquare = 0;
-    long totalCount = 0;
-
-    // combine is called from Intermediate and Final
-    // In either case, Initial would have been called
-    // before and would have sent in valid tuples
-    // Hence we don't need to check if incoming bag
-    // is empty
-
-    Tuple output = mTupleFactory.newTuple(3);
-    boolean sawNonNull = false;
-    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-      Tuple t = it.next();
-      Long l = (Long)t.get(0);
-      Long lSquare = (Long)t.get(1);
-      Long count = (Long)t.get(2);
-
-      // we count nulls in var as contributing 0
-      // a departure from SQL for performance of
-      // COUNT() which implemented by just inspecting
-      // size of the bag
-      if(l == null) {
-        l = 0L;
-        lSquare = 0L;
-      } else {
-        sawNonNull = true;
-      }
-      sum += l;
-      sumSquare += lSquare;
-      totalCount += count;
-    }
-    if(sawNonNull) {
-      output.set(0, Long.valueOf(sum));
-      output.set(1, Long.valueOf(sumSquare));
-    } else {
-      output.set(0, null);
-      output.set(1, null);
-    }
-    output.set(2, Long.valueOf(totalCount));
-    return output;
-  }
-
-  /**
-   * Counts the tuples in the bag whose first field is present and non-null.
-   *
-   * @param input tuple whose first field is the bag to inspect
-   */
-  static protected long count(Tuple input) throws ExecException {
-    DataBag values = (DataBag)input.get(0);
-    long cnt = 0;
-    Iterator<Tuple> it = values.iterator();
-    while (it.hasNext()){
-      Tuple t = (Tuple)it.next();
-      if (t != null && t.size() > 0 && t.get(0) != null)
-        cnt ++;
-    }
-
-    return cnt;
-  }
-
-  /**
-   * Adds up the non-null Long values in the first field of each tuple of the bag.
-   *
-   * @param input tuple whose first field is the bag of values
-   * @return the total, or null when the bag is empty or has no non-null value
-   */
-  static protected Long sum(Tuple input) throws ExecException, IOException {
-    DataBag values = (DataBag)input.get(0);
-
-    // if we were handed an empty bag, return NULL
-    if(values.size() == 0) {
-      return null;
-    }
-
-    long sum = 0;
-    boolean sawNonNull = false;
-    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-      Tuple t = it.next();
-      try{
-        Long l = (Long)t.get(0);
-        if (l == null) continue;
-        sawNonNull = true;
-        sum += l;
-      }catch(RuntimeException exp) {
-        int errCode = 2103;
-        String msg = "Problem while computing sum of values.";
-        throw new ExecException(msg, errCode, PigException.BUG, exp);
-      }
-    }
-
-    if(sawNonNull) {
-      return Long.valueOf(sum);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Adds up the squares of the non-null Long values in the first field of
-   * each tuple of the bag. The squares are computed in long arithmetic.
-   *
-   * @param input tuple whose first field is the bag of values
-   * @return the sum of squares, or null when the bag is empty or has no non-null value
-   */
-  static protected Long sumSquare(Tuple input) throws ExecException, IOException {
-    DataBag values = (DataBag)input.get(0);
-
-    // if we were handed an empty bag, return NULL
-    if(values.size() == 0) {
-      return null;
-    }
-
-    long sumSquare = 0;
-    boolean sawNonNull = false;
-    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-      Tuple t = it.next();
-      try{
-        Long l = (Long)t.get(0);
-        if (l == null) continue;
-        sawNonNull = true;
-        sumSquare += l*l;
-      }catch(RuntimeException exp) {
-        int errCode = 2103;
-        String msg = "Problem while computing sum of squared values.";
-        throw new ExecException(msg, errCode, PigException.BUG, exp);
-      }
-    }
-
-    if(sawNonNull) {
-      return Long.valueOf(sumSquare);
-    } else {
-      return null;
-    }
-  }
-
-  /** Declares the result as a single unnamed DOUBLE field. */
-  @Override
-  public Schema outputSchema(Schema input) {
-    return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-  }
-
-  /* Accumulator interface implementation */
-  private Long intermediateSumSquare = null;
-  private Long intermediateSum = null;
-  private Long intermediateCount = null;
-
-  /**
-   * Folds one batch into the running totals; a batch with no non-null
-   * value (null sum or sum of squares) is ignored.
-   *
-   * @param b tuple whose first field is the bag holding the batch
-   */
-  @Override
-  public void accumulate(Tuple b) throws IOException {
-    try {
-      Long sum = sum(b);
-      if(sum == null) {
-        return;
-      }
-
-      Long sumSquare = sumSquare(b);
-      if(sumSquare == null) {
-        return;
-      }
-
-      // set default values
-      if (intermediateSum == null || intermediateCount == null) {
-        intermediateSumSquare = 0L;
-        intermediateSum = 0L;
-        intermediateCount = 0L;
-      }
-
-      long count = count(b);
-
-      if (count > 0) {
-        intermediateCount += count;
-        intermediateSum += sum;
-        intermediateSumSquare += sumSquare;
-      }
-    } catch (ExecException ee) {
-      throw ee;
-    } catch (Exception e) {
-      int errCode = 2106;
-      String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-      throw new ExecException(msg, errCode, PigException.BUG, e);
-    }
-  }
-
-  /** Discards the running totals. */
-  @Override
-  public void cleanup() {
-    intermediateSumSquare = null;
-    intermediateSum = null;
-    intermediateCount = null;
-  }
-
-  /**
-   * Computes the variance from the running totals.
-   *
-   * @return the variance, or null when nothing with a positive count was accumulated
-   */
-  @Override
-  public Double getValue() {
-    Double variance = null;
-    if (intermediateCount != null && intermediateCount > 0) {
-      double avg = (double)intermediateSum / intermediateCount;
-      double avgSquare = (double)intermediateSumSquare / intermediateCount;
-      variance = avgSquare - avg*avg;
-    }
-    return variance;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/MarkovPairs.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/MarkovPairs.java b/src/java/datafu/pig/stats/MarkovPairs.java
deleted file mode 100644
index d3d7632..0000000
--- a/src/java/datafu/pig/stats/MarkovPairs.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-
-/**
- * Accepts a bag of tuples, with user supplied ordering, and generates pairs that can be used for
- * a Markov chain analysis. For example, if you had {(1), (4), (7)}, using the default lookahead of 1, you
- * get the pairs {
- * ((1),(4)),
- * ((4),(7))}
- * A lookahead factor tells the UDF how many steps into the future to include. So, for a,b,c with a lookahead
- * of 2, a would be paired with both b and c.
- * The results are returned in the order supplied by the caller.
-*/
-
-public class MarkovPairs extends EvalFunc<DataBag>
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-
-  // Held per instance. It used to be a static field written by the
-  // constructors, so constructing a second MarkovPairs with a different
-  // lookahead silently changed the lookahead of every other instance.
-  private final long lookahead_steps;
-
-  private final int SPILL_THRESHOLD = 1000000;
-
-  /**
-   * Creates the UDF with a lookahead of 1 (each element is paired with its successor only).
-   */
-  public MarkovPairs()
-  {
-    this.lookahead_steps = 1;
-  }
-
-  /**
-   * Creates the UDF with the given lookahead.
-   *
-   * @param lookahead_steps number of following elements each element is paired with,
-   *        given as a decimal integer string
-   * @throws NumberFormatException if the string is not a parsable integer
-   */
-  public MarkovPairs(String lookahead_steps)
-  {
-    this.lookahead_steps = Integer.valueOf(lookahead_steps);
-  }
-
-  /* start and end are inclusive. This forms transition pairs */
-  private void generatePairs(ArrayList<Tuple> input, int start, int end, DataBag outputBag)
-      throws ExecException
-  {
-    int count = 0;
-    for (int i = start; (i + 1) <= end; i++) {
-      Tuple elem1 = input.get(i);
-
-      // pair elem1 with each of the next lookahead_steps elements, without running past end
-      long last = Math.min((long) end, i + lookahead_steps);
-      for (int j = i + 1; j <= last; j++)
-      {
-        Tuple elem2 = input.get(j);
-        // ask the bag to spill after every SPILL_THRESHOLD additions
-        if (count >= SPILL_THRESHOLD) {
-          outputBag.spill();
-          count = 0;
-        }
-        outputBag.add(tupleFactory.newTuple(Arrays.asList(elem1, elem2)));
-        count ++;
-      }
-    }
-  }
-
-  /**
-   * Builds the bag of (element, later element) pairs for the input bag.
-   *
-   * @param input length-one tuple whose first field is the ordered bag
-   * @return bag of two-field tuples, each holding an element and one of its lookahead successors
-   * @throws IOException wrapping any failure raised while the pairs are generated
-   */
-  @Override
-  public DataBag exec(Tuple input)
-      throws IOException
-  {
-    //things come in a tuple, in our case we have a bag (ordered views) passed. This is embedded in a length one tuple
-
-    DataBag inputBag = (DataBag) input.get(0);
-    ArrayList<Tuple> inputData = new ArrayList<Tuple>();
-
-    // materialise the bag so that elements can be addressed by index
-    for (Tuple tuple : inputBag) {
-      inputData.add(tuple);
-    }
-
-    try {
-      DataBag outputBag = bagFactory.newDefaultBag();
-      generatePairs(inputData, 0, inputData.size() - 1, outputBag);
-      return outputBag;
-    }
-    catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-
-  /**
-   * Describes the output as a bag of (elem1, elem2) tuples, both fields using
-   * the schema of the input bag's first field.
-   *
-   * @return the output schema, or null if it cannot be derived
-   */
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      Schema tupleSchema = new Schema();
-
-      FieldSchema fieldSchema = input.getField(0);
-
-      if (fieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException(String.format("Expected input schema to be BAG, but instead found %s",
-                                                 DataType.findTypeName(fieldSchema.type)));
-      }
-
-      FieldSchema fieldSchema2 = fieldSchema.schema.getField(0);
-
-      tupleSchema.add(new Schema.FieldSchema("elem1", fieldSchema2.schema));
-      tupleSchema.add(new Schema.FieldSchema("elem2", fieldSchema2.schema));
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-                                               tupleSchema,
-                                               DataType.BAG));
-    }
-    catch (Exception e) {
-      // NOTE(review): this also swallows the "Expected input schema to be BAG"
-      // error thrown above, so a non-bag input yields a null schema rather
-      // than that message - confirm whether this is intended.
-      return null;
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/Median.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/Median.java b/src/java/datafu/pig/stats/Median.java
deleted file mode 100644
index e33a84e..0000000
--- a/src/java/datafu/pig/stats/Median.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-/**
- * Computes the <a href="http://en.wikipedia.org/wiki/Median" target="_blank">median</a>
- * for a <b>sorted</b> input bag, using type R-2 estimation. This is a convenience wrapper around Quantile.
- *
- * <p>
- * N.B., all the data is pushed to a single reducer per key, so make sure some partitioning is
- * done (e.g., group by 'day') if the data is too large. That is, this isn't distributed median.
- * </p>
- *
- * @see Quantile
- */
-public class Median extends Quantile
-{
-  /** Configures the parent Quantile with the single quantile "0.5". */
-  public Median() {
-    super("0.5");
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/Quantile.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/Quantile.java b/src/java/datafu/pig/stats/Quantile.java
deleted file mode 100644
index 6fd42d3..0000000
--- a/src/java/datafu/pig/stats/Quantile.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Computes <a href="http://en.wikipedia.org/wiki/Quantile" target="_blank">quantiles</a>
- * for a <b>sorted</b> input bag, using type R-2 estimation.
- *
- * <p>
- * N.B., all the data is pushed to a single reducer per key, so make sure some partitioning is
- * done (e.g., group by 'day') if the data is too large. That is, this isn't distributed quantiles.
- * </p>
- *
- * <p>
- * Note that unlike datafu's StreamingQuantile algorithm, this implementation gives
- * <b>exact</b> quantiles. However, it requires the input bag to be sorted. Quantile must spill to
- * disk when the input data is too large to fit in memory, which will contribute to longer runtimes.
- * Because StreamingQuantile implements accumulate it can be much more efficient than Quantile for
- * large input bags which do not fit well in memory.
- * </p>
- *
- * <p>The constructor takes a single integer argument that specifies the number of evenly-spaced
- * quantiles to compute, e.g.,</p>
- *
- * <ul>
- * <li>Quantile('3') yields the min, the median, and the max
- * <li>Quantile('5') yields the min, the 25th, 50th, 75th percentiles, and the max
- * <li>Quantile('101') yields the min, the max, and all 99 percentiles.
- * </ul>
- *
- * <p>Alternatively the constructor can take the explicit list of quantiles to compute, e.g.</p>
- *
- * <ul>
- * <li>Quantile('0.0','0.5','1.0') yields the min, the median, and the max
- * <li>Quantile('0.0','0.25','0.5','0.75','1.0') yields the min, the 25th, 50th, 75th percentiles, and the max
- * </ul>
- *
- * <p>The list of quantiles need not span the entire range from 0.0 to 1.0, nor do they need to be evenly spaced, e.g.</p>
- *
- * <ul>
- * <li>Quantile('0.5','0.90','0.95','0.99') yields the median, the 90th, 95th, and the 99th percentiles
- * <li>Quantile('0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987') yields the 0.13th, 2.28th, 15.87th, 50th, 84.13th, 97.72nd, and 99.87th percentiles
- * </ul>
- *
- * <p>
- * Example:
- * <pre>
- * {@code
- *
- * define Quantile datafu.pig.stats.Quantile('0.0','0.5','1.0');
-
- * -- input: 9,10,2,3,5,8,1,4,6,7
- * input = LOAD 'input' AS (val:int);
- *
- * grouped = GROUP input ALL;
- *
- * -- produces: (1,5.5,10)
- * quantiles = FOREACH grouped {
- * sorted = ORDER input BY val;
- * GENERATE Quantile(sorted);
- * }
- * }</pre></p>
- *
- * @see Median
- * @see StreamingQuantile
- */
-public class Quantile extends SimpleEvalFunc<Tuple>
-{
-  List<Double> quantiles;
-
-  /** Immutable holder for two values; used for the pair of bag positions behind one quantile. */
-  private static class Pair<T1,T2>
-  {
-    public final T1 first;
-    public final T2 second;
-
-    public Pair(T1 first, T2 second) {
-      this.first = first;
-      this.second = second;
-    }
-  }
-
-  // When true, output fields are labeled by position: quantile_0, quantile_1, ... quantile_n.
-  // Otherwise they are labeled by quantile value, e.g. the 0.5 quantile becomes quantile_0_5.
-  private boolean ordinalOutputSchema;
-
-  /**
-   * @param k a single count of evenly-spaced quantiles, or an explicit list of quantile values
-   */
-  public Quantile(String... k)
-  {
-    // Parsed once: the labeling mode chosen below does not change the quantile list.
-    this.quantiles = QuantileUtil.getQuantilesFromParams(k);
-    // A lone argument above 1.0 is a count of quantiles, so outputs are labeled by ordinal.
-    this.ordinalOutputSchema = k.length == 1 && Double.parseDouble(k[0]) > 1.0;
-  }
-
-  /** Returns the two 1-based positions averaged for quantile k, each clamped to [1, N]. */
-  private static Pair<Long, Long> getIndexes(double k, long N)
-  {
-    double h = N*k + 0.5;
-    long i1 = Math.min(Math.max(1, (long)Math.ceil(h - 0.5)), N);
-    long i2 = Math.min(Math.max(1, (long)Math.floor(h + 0.5)), N);
-    return new Pair<Long, Long>(i1, i2);
-  }
-
-  /**
-   * Computes the configured quantiles from the first field of each tuple in the bag.
-   * @param bag bag of tuples carrying a number in their first field; documented to be sorted
-   * @return tuple with one double per configured quantile, or null for a null or empty bag
-   * @throws IllegalStateException if a needed tuple's first field is not a Number (including null)
-   */
-  public Tuple call(DataBag bag) throws IOException
-  {
-    if (bag == null || bag.size() == 0)
-      return null;
-
-    // First pass over the quantiles: record which bag positions must be captured.
-    Map<Long, Double> d = new HashMap<Long, Double>();
-    long N = bag.size(), max_id = 1;
-
-    for (double k : this.quantiles) {
-      Pair<Long, Long> idx = getIndexes(k, N);
-      d.put(idx.first, null);
-      d.put(idx.second, null);
-      max_id = Math.max(max_id, idx.second);
-    }
-
-    // Single scan of the bag, stopping once the last needed position has been read.
-    long i = 1;
-    for (Tuple t : bag) {
-      if (i > max_id)
-        break;
-      if (d.containsKey(i)) {
-        Object o = t.get(0);
-        if (!(o instanceof Number))
-          throw new IllegalStateException("bag must have numerical values (and be non-null)");
-        d.put(i, ((Number) o).doubleValue());
-      }
-      i++;
-    }
-
-    // Each quantile is the mean of the values captured at its two positions.
-    Tuple t = TupleFactory.getInstance().newTuple(this.quantiles.size());
-    int j = 0;
-    for (double k : this.quantiles) {
-      Pair<Long, Long> p = getIndexes(k, N);
-      double quantile = (d.get(p.first) + d.get(p.second)) / 2;
-      t.set(j, quantile);
-      j++;
-    }
-    return t;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    Schema tupleSchema = new Schema();
-    if (ordinalOutputSchema) {
-      for (int i = 0; i < this.quantiles.size(); i++) {
-        tupleSchema.add(new Schema.FieldSchema("quantile_" + i, DataType.DOUBLE));
-      }
-    } else {
-      for (Double x : this.quantiles) {
-        tupleSchema.add(new Schema.FieldSchema("quantile_" + x.toString().replace(".", "_"), DataType.DOUBLE));
-      }
-    }
-
-    try {
-      return new Schema(new FieldSchema(null, tupleSchema, DataType.TUPLE));
-    } catch(FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
-
|