datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: wvaug...@apache.org
Subject: [07/19] DATAFU-27 Migrate build system to Gradle
Date: Tue, 04 Mar 2014 07:09:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetDifference.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetDifference.java b/src/java/datafu/pig/sets/SetDifference.java
deleted file mode 100644
index b6469e9..0000000
--- a/src/java/datafu/pig/sets/SetDifference.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.sets;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-
-/**
- * Computes the set difference of two or more bags.  Duplicates are eliminated. <b>The input bags must be sorted.</b>
- * 
- * <p>
- * If bags A and B are provided, then this computes A-B, i.e. all elements in A that are not in B.
- * If bags A, B and C are provided, then this computes A-B-C, i.e. all elements in A that are not in B or C.
- * </p>
- * 
- * <p>
- * Example:
- * <pre>
- * {@code
- * define SetDifference datafu.pig.sets.SetDifference();
- *
- * -- input:
- * -- ({(1),(2),(3),(4),(5),(6)},{(3),(4)})
- * input = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
- *
- * input = FOREACH input {
- *   B1 = ORDER B1 BY val ASC;
- *   B2 = ORDER B2 BY val ASC;
- *
- *   -- output:
- *   -- ({(1),(2),(5),(6)})
- *   GENERATE SetDifference(B1,B2);
- * }
- * }</pre>
- */
-public class SetDifference extends SetOperationsBase
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  
-  /**
-   * Loads the data bags from the input tuple and puts them in a priority queue,
-   * where ordering is determined by the data from the iterator for each bag.
-   * 
-   * <p>
-   * The bags are wrapped in a {@link Pair} object that is comparable on the data
-   * currently available from the iterator.
-   * These objects are ordered first by the data, then by the index within the tuple
-   * the bag came from.
-   * </p>
-   *  
-   * @param input
-   * @return
-   * @throws IOException
-   */
-  private PriorityQueue<Pair> loadBags(Tuple input) throws IOException
-  {    
-    PriorityQueue<Pair> pq = new PriorityQueue<Pair>(input.size());
-
-    for (int i=0; i < input.size(); i++) 
-    {
-      if (input.get(i) != null)
-      {
-        Iterator<Tuple> inputIterator = ((DataBag)input.get(i)).iterator();      
-        if(inputIterator.hasNext())
-        {
-          pq.add(new Pair(inputIterator,i));
-        }
-      }
-    }
-    return pq;
-  }
-
-  /**
-   * Counts how many elements in the priority queue match the
-   * element at the front of the queue, which should be from the first bag. 
-   * 
-   * @param pq priority queue
-   * @return number of matches
-   */
-  public int countMatches(PriorityQueue<Pair> pq)
-  {
-    Pair nextPair = pq.peek();
-    Tuple data = nextPair.data;
-    
-    // sanity check
-    if (!nextPair.index.equals(0))
-    {
-      throw new RuntimeException("Expected next bag to have index 0");
-    }
-    
-    int matches = 0;
-    for (Pair p : pq) {
-      if (data.equals(p.data))
-        matches++;
-    }
-    // subtract 1 since element matches itself
-    return matches - 1;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    if (input.size() < 2)
-    {
-      throw new RuntimeException("Expected at least two inputs, but found " + input.size());
-    }
-    
-    for (Object o : input)
-    {
-      if (o != null && !(o instanceof DataBag))
-      {
-        throw new RuntimeException("Inputs must be bags");
-      }
-    }
-
-    DataBag outputBag = bagFactory.newDefaultBag();
-    
-    DataBag bag1 = (DataBag)input.get(0);
-    DataBag bag2 = (DataBag)input.get(1);
-    
-    if (bag1 == null || bag1.size() == 0)
-    {
-      return outputBag;
-    }
-    // optimization
-    else if (input.size() == 2 && (bag2 == null || bag2.size() == 0))
-    {
-      return bag1;
-    }
-    
-    PriorityQueue<Pair> pq = loadBags(input);
-    
-    Tuple lastData = null;
-
-    while (true) 
-    {
-      Pair nextPair = pq.peek();
-      
-      // ignore data we've already encountered
-      if (nextPair.data.compareTo(lastData) != 0)
-      {
-        // Only take data from the first bag, where there are no other
-        // bags that have the same data.
-        if (nextPair.index.equals(0) && countMatches(pq) == 0)
-        {
-          outputBag.add(nextPair.data);
-          lastData = nextPair.data;
-        }
-      }
-
-      Pair p = pq.poll();      
-      
-      // only put the bag back into the queue if it still has data
-      if (p.hasNext())
-      {
-        p.next();
-        pq.offer(p);
-      }
-      else if (p.index.equals(0))
-      {
-        // stop when we exhaust all elements from the first bag
-        break;
-      }
-    }
-
-    return outputBag;
-  }
-
-  /**
-   * A wrapper for the tuple iterator that implements comparable so it can be used in the priority queue.
-   * 
-   * <p>
-   * This is compared first on the data, then on the index the bag came from
-   * in the input tuple.
-   * </p>
-   * @author mhayes
-   *
-   */
-  private static class Pair implements Comparable<Pair>
-  {
-    private final Iterator<Tuple> it;
-    private final Integer index;
-    private Tuple data;
-
-    /**
-     * Constructs the {@link Pair}.
-     * 
-     * @param it tuple iterator
-     * @param index index within the tuple that the bag came from
-     */
-    public Pair(Iterator<Tuple> it, int index)
-    {
-      this.index = index;
-      this.it = it;
-      this.data = it.next();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public int compareTo(Pair o)
-    {
-      int r = this.data.compareTo(o.data);
-      if (r == 0)
-      {
-        return index.compareTo(o.index);
-      }
-      else
-      {
-        return r;
-      }
-    }
-    
-    public boolean hasNext()
-    {
-      return it.hasNext();
-    }
-    
-    @SuppressWarnings("unchecked")
-    public Tuple next()
-    {
-      Tuple nextData = it.next();
-      // algorithm assumes data is in order
-      if (data.compareTo(nextData) > 0)
-      {
-        throw new RuntimeException("Out of order!");
-      }
-      this.data = nextData;
-      return this.data;
-    }
-
-    @Override
-    public String toString()
-    {
-      return String.format("[%s within %d]",data,index);
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetIntersect.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetIntersect.java b/src/java/datafu/pig/sets/SetIntersect.java
deleted file mode 100644
index 0ba586e..0000000
--- a/src/java/datafu/pig/sets/SetIntersect.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.sets;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * Computes the set intersection of two or more bags.  Duplicates are eliminated. <b>The input bags must be sorted.</b>
- * <p>
- * Example:
- * <pre>
- * {@code
- * define SetIntersect datafu.pig.sets.SetIntersect();
- *
- * -- input:
- * -- ({(1,10),(2,20),(3,30),(4,40)},{(2,20),(4,40),(8,80)})
- * input = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
- *
- * input = FOREACH input {
- *   B1 = ORDER B1 BY val1 ASC, val2 ASC;
- *   B2 = ORDER B2 BY val1 ASC, val2 ASC;
- *
- *   -- output:
- *   -- ({(2,20),(4,40)})
- *   GENERATE SetIntersect(B1,B2);
- * }
- * }</pre>
- */
-public class SetIntersect extends SetOperationsBase
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-
-  static class pair implements Comparable<pair>
-  {
-    final Iterator<Tuple> it;
-    Tuple data;
-
-    pair(Iterator<Tuple> it)
-    {
-      this.it = it;
-      this.data = it.next();
-    }
-
-    @Override
-    public int compareTo(pair o)
-    {
-      return this.data.compareTo(o.data);
-    }
-  }
-
-  private PriorityQueue<pair> load_bags(Tuple input) throws IOException
-  {
-    PriorityQueue<pair> pq = new PriorityQueue<pair>(input.size());
-
-    for (int i=0; i < input.size(); i++) {
-      Object o = input.get(i);
-      if (!(o instanceof DataBag))
-        throw new RuntimeException("parameters must be databags");
-      Iterator<Tuple> inputIterator= ((DataBag) o).iterator();
-      if(inputIterator.hasNext())
-        pq.add(new pair(inputIterator));
-    }
-    return pq;
-  }
-
-  public boolean all_equal(PriorityQueue<pair> pq)
-  {
-    Object o = pq.peek().data;
-    for (pair p : pq) {
-      if (!o.equals(p.data))
-        return false;
-    }
-    return true;
-  }
-
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    DataBag outputBag = bagFactory.newDefaultBag();
-    PriorityQueue<pair> pq = load_bags(input);
-    if(pq.size() != input.size())
-      return outputBag; // one or more input bags were empty
-    Tuple last_data = null;
-
-    while (true) {
-      if (pq.peek().data.compareTo(last_data) != 0 && all_equal(pq)) {
-        last_data = pq.peek().data;
-        outputBag.add(last_data);
-      }
-
-      pair p = pq.poll();
-      if (!p.it.hasNext())
-        break;
-      Tuple nextData = p.it.next();
-      // algorithm assumes data is in order
-      if (p.data.compareTo(nextData) > 0)
-      {
-        throw new RuntimeException("Out of order!");
-      }
-      p.data = nextData;
-      pq.offer(p);
-    }
-
-    return outputBag;
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetOperationsBase.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetOperationsBase.java b/src/java/datafu/pig/sets/SetOperationsBase.java
deleted file mode 100644
index c9997f8..0000000
--- a/src/java/datafu/pig/sets/SetOperationsBase.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.sets;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Base class for set operations.
- * 
- * @author "Matthew Hayes <mhayes@linkedin.com>"
- *
- */
-public abstract class SetOperationsBase extends EvalFunc<DataBag>
-{
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {      
-      for (Schema.FieldSchema fieldSchema : input.getFields())
-      {
-        if (fieldSchema.type != DataType.BAG)
-        {
-          throw new RuntimeException("Expected a bag but got: " + DataType.findTypeName(fieldSchema.type));
-        }
-      }
-      
-      Schema bagSchema = input.getField(0).schema;
-                  
-      Schema outputSchema = new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
-                                                             .getName()
-                                                             .toLowerCase(), input),
-                                           bagSchema,
-                                           DataType.BAG));
-            
-      return outputSchema;
-    }
-    catch (Exception e) {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/SetUnion.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/SetUnion.java b/src/java/datafu/pig/sets/SetUnion.java
deleted file mode 100644
index 49fd99c..0000000
--- a/src/java/datafu/pig/sets/SetUnion.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.sets;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * Computes the set union of two or more bags.  Duplicates are eliminated.
- * <p>
- * Example:
- * <pre>
- * {@code
- * define SetUnion datafu.pig.sets.SetUnion();
- * 
- * -- input:
- * -- ({(2,20),(3,30),(4,40)},{(1,10),(2,20),(4,40),(8,80)})
- * input = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
- * 
- * -- output:
- * -- ({(2,20),(3,30),(4,40),(1,10),(8,80)})
- * output = FOREACH input GENERATE SetUnion(B1,B2);
- * }
- * </pre>
- */
-public class SetUnion extends SetOperationsBase
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    DataBag outputBag = bagFactory.newDistinctBag();
-
-    try {
-      for (int i=0; i < input.size(); i++) {
-        Object o = input.get(i);
-        if (!(o instanceof DataBag))
-          throw new RuntimeException("parameters must be databags");
-
-        DataBag inputBag = (DataBag) o;
-        for (Tuple elem : inputBag) {
-          outputBag.add(elem);
-        }
-      }
-
-      return outputBag;
-    }
-    catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/sets/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sets/package-info.java b/src/java/datafu/pig/sets/package-info.java
deleted file mode 100644
index 2c5b087..0000000
--- a/src/java/datafu/pig/sets/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * UDFs for set operations such as intersect and union.
- */
-package datafu.pig.sets;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/DoubleVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/DoubleVAR.java b/src/java/datafu/pig/stats/DoubleVAR.java
deleted file mode 100644
index ab8eed2..0000000
--- a/src/java/datafu/pig/stats/DoubleVAR.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR} 
-*/
-public class DoubleVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            Double sum = sum(input);
-            Double sumSquare = sumSquare(input);
-            
-            if(sum == null) {
-                // either we were handed an empty bag or a bag
-                // filled with nulls - return null in this case
-                return null;
-            }
-            long count = count(input);
-
-            Double var = null;
-            if (count > 0){
-                Double avg = new Double(sum / count);
-                Double avgSquare = new Double(sumSquare / count);
-                var = avgSquare - avg*avg;
-            }
-    
-            return var;
-        } catch (ExecException ee) {
-            throw ee;
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            Tuple t = mTupleFactory.newTuple(3);
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to get variance 
-                DataBag bg = (DataBag) input.get(0);
-                Double d = null;
-                Iterator<Tuple> iter = bg.iterator();
-                if(iter.hasNext()) {
-                    Tuple tp = iter.next();
-                    d = (Double)tp.get(0);
-                }
-                
-                if (iter.hasNext())
-                {
-                  throw new RuntimeException("Expected only one tuple in bag");
-                }
-                
-                if (d == null){
-                  t.set(0, null);
-                  t.set(1, null);
-                  t.set(2, 0L);
-                }
-                else {
-                  t.set(0, d);
-                  t.set(1, d*d);
-                  t.set(2, 1L);
-                }
-                return t;
-            } catch(NumberFormatException nfe) {
-                nfe.printStackTrace();
-                // invalid input,
-                // treat this input as null
-                try {
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                } catch (ExecException e) {
-                    throw e;
-                }
-                return t;
-            } catch (ExecException ee) {
-                ee.printStackTrace();
-                throw ee;
-            } catch (Exception e) {
-                e.printStackTrace();
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-                
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                return combine(b);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            
-            }
-        }
-    }
-
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                Tuple combined = combine(b);
-
-                Double sum = (Double)combined.get(0);
-                Double sumSquare = (Double)combined.get(1);
-                if(sum == null) {
-                    return null;
-                }
-                Long count = (Long)combined.get(2);
-
-                Double var = null;
-                
-                if (count > 0) {
-                    Double avg = new Double(sum / count);
-                    Double avgSquare = new Double(sumSquare / count);
-                    var = avgSquare - avg*avg;
-                }
-                return var;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Tuple combine(DataBag values) throws ExecException{
-        double sum = 0;
-        double sumSquare = 0;
-        long totalCount = 0;
-
-        // combine is called from Intermediate and Final
-        // In either case, Initial would have been called
-        // before and would have sent in valid tuples
-        // Hence we don't need to check if incoming bag
-        // is empty
-
-        Tuple output = mTupleFactory.newTuple(3);
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            Double d = (Double)t.get(0);
-            Double dSquare = (Double)t.get(1);
-            Long count = (Long)t.get(2);
-            
-            // we count nulls in var as contributing 0
-            // a departure from SQL for performance of
-            // COUNT() which implemented by just inspecting
-            // size of the bag
-            if(d == null) {
-                d = 0.0;
-                dSquare = 0.0;
-            } else {
-                sawNonNull = true;
-            }
-            sum += d;
-            sumSquare += dSquare;
-            totalCount += count;
-        }
-        if(sawNonNull) {
-            output.set(0, new Double(sum));
-            output.set(1, new Double(sumSquare));
-        } else {
-            output.set(0, null);
-            output.set(1, null);
-        }
-        output.set(2, Long.valueOf(totalCount));
-        return output;
-    }
-
-    static protected long count(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        long cnt = 0;
-        Iterator<Tuple> it = values.iterator();
-        while (it.hasNext()){
-            Tuple t = (Tuple)it.next();
-            if (t != null && t.size() > 0 && t.get(0) != null)
-                cnt ++;
-        }
-                    
-        return cnt;
-    }
-
-    static protected Double sum(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-       
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Double d= (Double)t.get(0);
-                if (d == null) continue;
-                sawNonNull = true;
-                sum += d;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-    }
-    
-    static protected Double sumSquare(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sumSquare = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Double d = (Double)t.get(0);
-                if (d == null) continue;
-                sawNonNull = true;
-                sumSquare += d*d;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of squared values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(sumSquare);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-    }
-
-    /* Accumulator interface implementation */
-    private Double intermediateSumSquare = null;
-    private Double intermediateSum = null;
-    private Long intermediateCount = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double sum = sum(b);
-            if(sum == null) {
-                return;
-            }
-            
-            Double sumSquare = sumSquare(b);
-            if(sumSquare == null) {
-                return;
-            }
-            
-            // set default values
-            if (intermediateSum == null || intermediateCount == null) {
-                intermediateSumSquare = 0.0;
-                intermediateSum = 0.0;
-                intermediateCount = (long) 0;
-            }
-            
-            long count = (Long)count(b);
-
-            if (count > 0) {
-                intermediateCount += count;
-                intermediateSum += sum;
-                intermediateSumSquare += sumSquare;
-            }
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSumSquare = null;
-        intermediateSum = null;
-        intermediateCount = null;
-    }
-
-    @Override
-    public Double getValue() {
-        Double var = null;
-        if (intermediateCount != null && intermediateCount > 0) {
-            Double avg = new Double(intermediateSum / intermediateCount);
-            Double avgSquare = new Double(intermediateSumSquare / intermediateCount);
-            var = avgSquare - avg*avg;
-        }
-        return var;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/FloatVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/FloatVAR.java b/src/java/datafu/pig/stats/FloatVAR.java
deleted file mode 100644
index 8a697e0..0000000
--- a/src/java/datafu/pig/stats/FloatVAR.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR} 
-*/
-public class FloatVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            Double sum = sum(input);
-            Double sumSquare = sumSquare(input);
-            
-            if(sum == null) {
-                // either we were handed an empty bag or a bag
-                // filled with nulls - return null in this case
-                return null;
-            }
-            long count = count(input);
-
-            Double var = null;
-            if (count > 0){
-                Double avg = new Double(sum / count);
-                Double avgSquare = new Double(sumSquare / count);
-                var = avgSquare - avg*avg;
-            }
-    
-            return var;
-        } catch (ExecException ee) {
-            throw ee;
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            Tuple t = mTupleFactory.newTuple(3);
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to get variance 
-                DataBag bg = (DataBag) input.get(0);
-                Float f = null;
-                Iterator<Tuple> iter = bg.iterator();
-                if(iter.hasNext()) {
-                    Tuple tp = iter.next();
-                    f = (Float)tp.get(0);
-                }
-                
-                if (iter.hasNext())
-                {
-                  throw new RuntimeException("Expected only one tuple in bag");
-                }
-                
-                Double d = f!= null ? new Double(f): null;
-                
-                if (f == null){
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                }
-                else {
-                    t.set(0, d);
-                    t.set(1, d*d);
-                    t.set(2, 1L);
-                }
-                return t;
-            } catch(NumberFormatException nfe) {
-                nfe.printStackTrace();
-                // invalid input,
-                // treat this input as null
-                try {
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                } catch (ExecException e) {
-                    throw e;
-                }
-                return t;
-            } catch (ExecException ee) {
-                ee.printStackTrace();
-                throw ee;
-            } catch (Exception e) {
-                e.printStackTrace();
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-                
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                return combine(b);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            
-            }
-        }
-    }
-
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                Tuple combined = combine(b);
-
-                Double sum = (Double)combined.get(0);
-                Double sumSquare = (Double)combined.get(1);
-                if(sum == null) {
-                    return null;
-                }
-                Long count = (Long)combined.get(2);
-
-                Double var = null;
-                
-                if (count > 0) {
-                    Double avg = new Double(sum / count);
-                    Double avgSquare = new Double(sumSquare / count);
-                    var = avgSquare - avg*avg;
-                }
-                return var;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Tuple combine(DataBag values) throws ExecException{
-        double sum = 0;
-        double sumSquare = 0;
-        long totalCount = 0;
-
-        // combine is called from Intermediate and Final
-        // In either case, Initial would have been called
-        // before and would have sent in valid tuples
-        // Hence we don't need to check if incoming bag
-        // is empty
-
-        Tuple output = mTupleFactory.newTuple(3);
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            Double d = (Double)t.get(0);
-            Double dSquare = (Double)t.get(1);
-            Long count = (Long)t.get(2);
-            
-            // we count nulls in var as contributing 0
-            // a departure from SQL for performance of
-            // COUNT() which implemented by just inspecting
-            // size of the bag
-            if(d == null) {
-                d = 0.0;
-                dSquare = 0.0;
-            } else {
-                sawNonNull = true;
-            }
-            sum += d;
-            sumSquare += dSquare;
-            totalCount += count;
-        }
-        if(sawNonNull) {
-            output.set(0, new Double(sum));
-            output.set(1, new Double(sumSquare));
-        } else {
-            output.set(0, null);
-            output.set(1, null);
-        }
-        output.set(2, Long.valueOf(totalCount));
-        return output;
-    }
-
-    static protected long count(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        long cnt = 0;
-        Iterator<Tuple> it = values.iterator();
-        while (it.hasNext()){
-            Tuple t = (Tuple)it.next();
-            if (t != null && t.size() > 0 && t.get(0) != null)
-                cnt ++;
-        }
-                    
-        return cnt;
-    }
-
-    static protected Double sum(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Float f= (Float)t.get(0);
-                if (f == null) continue;
-                sawNonNull = true;
-                sum += f;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(sum);
-        } else {
-            return null;
-        }
-    }
-    
-    static protected Double sumSquare(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        double sumSquare = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Float f = (Float)t.get(0);
-                if (f == null) continue;
-                sawNonNull = true;
-                sumSquare += f*f;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of squared values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Double(sumSquare);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-    }
-
-    /* Accumulator interface implementation */
-    private Double intermediateSumSquare = null;
-    private Double intermediateSum = null;
-    private Long intermediateCount = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Double sum = sum(b);
-            if(sum == null) {
-                return;
-            }
-            
-            Double sumSquare = sumSquare(b);
-            if(sumSquare == null) {
-                return;
-            }
-            
-            // set default values
-            if (intermediateSum == null || intermediateCount == null) {
-                intermediateSumSquare = 0.0;
-                intermediateSum = 0.0;
-                intermediateCount = (long) 0;
-            }
-            
-            long count = (Long)count(b);
-
-            if (count > 0) {
-                intermediateCount += count;
-                intermediateSum += sum;
-                intermediateSumSquare += sumSquare;
-            }
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSumSquare = null;
-        intermediateSum = null;
-        intermediateCount = null;
-    }
-
-    @Override
-    public Double getValue() {
-        Double var = null;
-        if (intermediateCount != null && intermediateCount > 0) {
-            Double avg = new Double(intermediateSum / intermediateCount);
-            Double avgSquare = new Double(intermediateSumSquare / intermediateCount);
-            var = avgSquare - avg*avg;
-        }
-        return var;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java
deleted file mode 100644
index 0ebd94f..0000000
--- a/src/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
- * 
- * <p>
- * This uses the implementation of HyperLogLog++ from <a href="https://github.com/addthis/stream-lib" target="_blank">stream-lib</a>.
- * The HyperLogLog++ algorithm is an enhanced version of HyperLogLog as described in 
- * <a href="http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en/us/pubs/archive/40671.pdf">here</a>.
- * </p>
- * 
- * <p>
- * This is a streaming implementation, and therefore the input data does not need to be sorted.
- * </p>
- * 
- * @author mhayes
- *
- */
-public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
-{
-  private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus estimator;
-  
-  private final int p;
-  
-  /**
-   * Constructs a HyperLogLog++ estimator.
-   */
-  public HyperLogLogPlusPlus()
-  {
-    this("20");
-  }
-  
-  /**
-   * Constructs a HyperLogLog++ estimator.
-   * 
-   * @param p precision value
-   */
-  public HyperLogLogPlusPlus(String p)
-  {
-    this.p = Integer.parseInt(p);
-    cleanup();
-  }
-  
-  @Override
-  public void accumulate(Tuple arg0) throws IOException
-  {
-    DataBag inputBag = (DataBag)arg0.get(0);
-    for (Tuple t : inputBag) 
-    {
-      estimator.offer(t);
-    }
-  }
-
-  @Override
-  public void cleanup()
-  {
-    this.estimator = new com.clearspring.analytics.stream.cardinality.HyperLogLogPlus(p);
-  }
-
-  @Override
-  public Long getValue()
-  {
-    return this.estimator.cardinality();
-  }
-  
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected input to have only a single field");
-      }
-      
-      Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-      if (inputFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-      
-      return new Schema(new Schema.FieldSchema(null, DataType.LONG));
-    }
-    catch (FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/IntVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/IntVAR.java b/src/java/datafu/pig/stats/IntVAR.java
deleted file mode 100644
index 0f34d8e..0000000
--- a/src/java/datafu/pig/stats/IntVAR.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR} 
-*/
-public class IntVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            Long sum = sum(input);
-            Long sumSquare = sumSquare(input);
-            
-            if(sum == null) {
-                // either we were handed an empty bag or a bag
-                // filled with nulls - return null in this case
-                return null;
-            }
-            long count = count(input);
-
-            Double var = null;
-            if (count > 0){
-                Double avg = new Double((double)sum / count);
-                Double avgSquare = new Double((double)sumSquare / count);
-                var = avgSquare - avg*avg;
-            }
-    
-            return var;
-        } catch (ExecException ee) {
-            throw ee;
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            Tuple t = mTupleFactory.newTuple(3);
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to get variance 
-                DataBag bg = (DataBag) input.get(0);
-                Integer i = null;
-                Iterator<Tuple> iter = bg.iterator();
-                if(iter.hasNext()) {
-                    Tuple tp = iter.next();
-                    i = (Integer)tp.get(0);
-                }
-                
-                if (iter.hasNext())
-                {
-                  throw new RuntimeException("Expected only one tuple in bag");
-                }
-                
-                if (i == null) {
-                    t.set(2, 0L);
-                    t.set(0, null);
-                    t.set(1, null);
-                }
-                else {
-                    t.set(2, 1L);
-                    t.set(0, (long)i);
-                    t.set(1, (long)i*i);
-                }
-                return t;
-            } catch(NumberFormatException nfe) {
-                nfe.printStackTrace();
-                // invalid input,
-                // treat this input as null
-                try {
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                } catch (ExecException e) {
-                    throw e;
-                }
-                return t;
-            } catch (ExecException ee) {
-                ee.printStackTrace();
-                throw ee;
-            } catch (Exception e) {
-                e.printStackTrace();
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-                
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                return combine(b);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                Tuple combined = combine(b);
-
-                Long sum = (Long)combined.get(0);
-                Long sumSquare = (Long)combined.get(1);
-                if(sum == null) {
-                    return null;
-                }
-                Long count = (Long)combined.get(2);
-
-                Double var = null;
-                
-                if (count > 0) {
-                    Double avg = new Double((double)sum / count);
-                    Double avgSquare = new Double((double)sumSquare / count);
-                    var = avgSquare - avg*avg;
-                }
-                return var;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Tuple combine(DataBag values) throws ExecException{
-        long sum = 0;
-        long sumSquare = 0;
-        long totalCount = 0;
-
-        // combine is called from Intermediate and Final
-        // In either case, Initial would have been called
-        // before and would have sent in valid tuples
-        // Hence we don't need to check if incoming bag
-        // is empty
-
-        Tuple output = mTupleFactory.newTuple(3);
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            Long i = (Long)t.get(0);
-            Long iSquare = (Long)t.get(1);
-            Long count = (Long)t.get(2);
-            
-            // we count nulls in var as contributing 0
-            // a departure from SQL for performance of
-            // COUNT() which implemented by just inspecting
-            // size of the bag
-            if(i == null) {
-                i = (long)0;
-                iSquare = (long)0;
-            } else {
-                sawNonNull = true;
-            }
-            sum += i;
-            sumSquare += iSquare;
-            totalCount += count;
-        }
-        if(sawNonNull) {
-            output.set(0, new Long(sum));
-            output.set(1, new Long(sumSquare));
-        } else {
-            output.set(0, null);
-            output.set(1, null);
-        }
-        output.set(2, Long.valueOf(totalCount));
-        return output;
-    }
-
-    static protected long count(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        long cnt = 0;
-        Iterator<Tuple> it = values.iterator();
-        while (it.hasNext()){
-            Tuple t = (Tuple)it.next();
-            if (t != null && t.size() > 0 && t.get(0) != null)
-                cnt ++;
-        }
-                    
-        return cnt;
-    }
-
-    static protected Long sum(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Integer i = (Integer)t.get(0);
-                if (i == null) continue;
-                sawNonNull = true;
-                sum += i;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Long(sum);
-        } else {
-            return null;
-        }
-    }
-    
-    static protected Long sumSquare(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sumSquare = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Integer i = (Integer)t.get(0);
-                if (i == null) continue;
-                sawNonNull = true;
-                sumSquare += i*i;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of squared values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Long(sumSquare);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-    }
-
-    /* Accumulator interface implementation */
-    private Long intermediateSumSquare = null;
-    private Long intermediateSum = null;
-    private Long intermediateCount = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Long sum = sum(b);
-            if(sum == null) {
-                return;
-            }
-            
-            Long sumSquare = sumSquare(b);
-            if(sumSquare == null) {
-                return;
-            }
-            
-            // set default values
-            if (intermediateSum == null || intermediateCount == null) {
-                intermediateSumSquare = (long) 0;
-                intermediateSum = (long) 0;
-                intermediateCount = (long) 0;
-            }
-            
-            long count = (Long)count(b);
-
-            if (count > 0) {
-                intermediateCount += count;
-                intermediateSum += sum;
-                intermediateSumSquare += sumSquare;
-            }
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSumSquare = null;
-        intermediateSum = null;
-        intermediateCount = null;
-    }
-
-    @Override
-    public Double getValue() {
-        Double var = null;
-        if (intermediateCount != null && intermediateCount > 0) {
-            Double avg = new Double((double)intermediateSum / intermediateCount);
-            Double avgSquare = new Double((double)intermediateSumSquare / intermediateCount);
-            var = avgSquare - avg*avg;
-        }
-        return var;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/LongVAR.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/LongVAR.java b/src/java/datafu/pig/stats/LongVAR.java
deleted file mode 100644
index 561f15a..0000000
--- a/src/java/datafu/pig/stats/LongVAR.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.Algebraic;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.PigException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.backend.executionengine.ExecException;
-
-
-/**
-* Use {@link VAR} 
-*/
-public class LongVAR extends EvalFunc<Double> implements Algebraic, Accumulator<Double> {
-    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        try {
-            Long sum = sum(input);
-            Long sumSquare = sumSquare(input);
-            
-            if(sum == null) {
-                // either we were handed an empty bag or a bag
-                // filled with nulls - return null in this case
-                return null;
-            }
-            long count = count(input);
-
-            Double var = null;
-            if (count > 0){
-                Double avg = new Double((double)sum / count);
-                Double avgSquare = new Double((double)sumSquare / count);
-                var = avgSquare - avg*avg;
-            }
-    
-            return var;
-        } catch (ExecException ee) {
-            throw ee;
-        }
-    }
-
-    public String getInitial() {
-        return Initial.class.getName();
-    }
-
-    public String getIntermed() {
-        return Intermediate.class.getName();
-    }
-
-    public String getFinal() {
-        return Final.class.getName();
-    }
-
-    static public class Initial extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            Tuple t = mTupleFactory.newTuple(3);
-            try {
-                // input is a bag with one tuple containing
-                // the column we are trying to get variance 
-                DataBag bg = (DataBag) input.get(0);
-                Long l = null;
-                Iterator<Tuple> iter = bg.iterator();
-                if(iter.hasNext()) {
-                    Tuple tp = iter.next();
-                    l = (Long)tp.get(0);
-                }
-                
-                if (iter.hasNext())
-                {
-                  throw new RuntimeException("Expected only one tuple in bag");
-                }
-                
-                if (l == null) {
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                }
-                else { 
-                    t.set(0, l);
-                    t.set(1, l*l);
-                    t.set(2, 1L);
-                }
-                return t;
-            } catch(NumberFormatException nfe) {
-                nfe.printStackTrace();
-                // invalid input,
-                // treat this input as null
-                try {
-                    t.set(0, null);
-                    t.set(1, null);
-                    t.set(2, 0L);
-                } catch (ExecException e) {
-                    throw e;
-                }
-                return t;
-            } catch (ExecException ee) {
-                ee.printStackTrace();
-                throw ee;
-            } catch (Exception e) {
-                e.printStackTrace();
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-                
-        }
-    }
-
-    static public class Intermediate extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                return combine(b);
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variacne in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            
-            }
-        }
-    }
-
-    static public class Final extends EvalFunc<Double> {
-        @Override
-        public Double exec(Tuple input) throws IOException {
-            try {
-                DataBag b = (DataBag)input.get(0);
-                Tuple combined = combine(b);
-
-                Long sum = (Long)combined.get(0);
-                Long sumSquare = (Long)combined.get(1);
-                if(sum == null) {
-                    return null;
-                }
-                Long count = (Long)combined.get(2);
-
-                Double var = null;
-                
-                if (count > 0) {
-                    Double avg = new Double((double)sum / count);
-                    Double avgSquare = new Double((double)sumSquare / count);
-                    var = avgSquare - avg*avg;
-                }
-                return var;
-            } catch (ExecException ee) {
-                throw ee;
-            } catch (Exception e) {
-                int errCode = 2106;
-                String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
-    }
-
-    static protected Tuple combine(DataBag values) throws ExecException{
-        long sum = 0;
-        long sumSquare = 0;
-        long totalCount = 0;
-
-        // combine is called from Intermediate and Final
-        // In either case, Initial would have been called
-        // before and would have sent in valid tuples
-        // Hence we don't need to check if incoming bag
-        // is empty
-
-        Tuple output = mTupleFactory.newTuple(3);
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            Long l = (Long)t.get(0);
-            Long lSquare = (Long)t.get(1);
-            Long count = (Long)t.get(2);
-            
-            // we count nulls in var as contributing 0
-            // a departure from SQL for performance of
-            // COUNT() which implemented by just inspecting
-            // size of the bag
-            if(l == null) {
-                l = (long)0;
-                lSquare = (long)0;
-            } else {
-                sawNonNull = true;
-            }
-            sum += l;
-            sumSquare += lSquare;
-            totalCount += count;
-        }
-        if(sawNonNull) {
-            output.set(0, new Long(sum));
-            output.set(1, new Long(sumSquare));
-        } else {
-            output.set(0, null);
-            output.set(1, null);
-        }
-        output.set(2, Long.valueOf(totalCount));
-        return output;
-    }
-
-    static protected long count(Tuple input) throws ExecException {
-        DataBag values = (DataBag)input.get(0);
-        long cnt = 0;
-        Iterator<Tuple> it = values.iterator();
-        while (it.hasNext()){
-            Tuple t = (Tuple)it.next();
-            if (t != null && t.size() > 0 && t.get(0) != null)
-                cnt ++;
-        }
-                    
-        return cnt;
-    }
-
-    static protected Long sum(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sum = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Long l = (Long)t.get(0);
-                if (l == null) continue;
-                sawNonNull = true;
-                sum += l;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Long(sum);
-        } else {
-            return null;
-        }
-    }
-    
-    static protected Long sumSquare(Tuple input) throws ExecException, IOException {
-        DataBag values = (DataBag)input.get(0);
-        
-        // if we were handed an empty bag, return NULL
-        if(values.size() == 0) {
-            return null;
-        }
-
-        long sumSquare = 0;
-        boolean sawNonNull = false;
-        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            try{
-                Long l = (Long)t.get(0);
-                if (l == null) continue;
-                sawNonNull = true;
-                sumSquare += l*l;
-            }catch(RuntimeException exp) {
-                int errCode = 2103;
-                String msg = "Problem while computing sum of squared values.";
-                throw new ExecException(msg, errCode, PigException.BUG, exp);
-            }
-        }
-
-        if(sawNonNull) {
-            return new Long(sumSquare);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
-    }
-
-    /* Accumulator interface implementation */
-    private Long intermediateSumSquare = null;
-    private Long intermediateSum = null;
-    private Long intermediateCount = null;
-    
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        try {
-            Long sum = sum(b);
-            if(sum == null) {
-                return;
-            }
-            
-            Long sumSquare = sumSquare(b);
-            if(sumSquare == null) {
-                return;
-            }
-            
-            // set default values
-            if (intermediateSum == null || intermediateCount == null) {
-                intermediateSumSquare = (long) 0;
-                intermediateSum = (long) 0;
-                intermediateCount = (long) 0;
-            }
-            
-            long count = (Long)count(b);
-
-            if (count > 0) {
-                intermediateCount += count;
-                intermediateSum += sum;
-                intermediateSumSquare += sumSquare;
-            }
-        } catch (ExecException ee) {
-            throw ee;
-        } catch (Exception e) {
-            int errCode = 2106;
-            String msg = "Error while computing variance in " + this.getClass().getSimpleName();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        intermediateSumSquare = null;
-        intermediateSum = null;
-        intermediateCount = null;
-    }
-
-    @Override
-    public Double getValue() {
-        Double var = null;
-        if (intermediateCount != null && intermediateCount > 0) {
-            Double avg = new Double((double)intermediateSum / intermediateCount);
-            Double avgSquare = new Double((double)intermediateSumSquare / intermediateCount);
-            var = avgSquare - avg*avg;
-        }
-        return var;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/MarkovPairs.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/MarkovPairs.java b/src/java/datafu/pig/stats/MarkovPairs.java
deleted file mode 100644
index d3d7632..0000000
--- a/src/java/datafu/pig/stats/MarkovPairs.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-
-/**
- * Accepts a bag of tuples, with user supplied ordering, and generates pairs that can be used for
- * a Markov chain analysis. For example, if you had {(1), (4), (7)}, using the default lookahead of 1, you
- * get the pairs {
- *                ((1),(4)),
- *                ((4),(7))}
- * A lookahead factor tells the UDF how many steps into the future to include. So, for a,b,c with a
- * lookahead of 2, a would be paired with both b and c.
- * The pairs are returned in the order in which the caller supplied the input.
-*/
-
-public class MarkovPairs extends EvalFunc<DataBag>
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-
-  // How many following elements each element is paired with. Kept per instance so that
-  // differently configured instances of this UDF cannot overwrite each other's lookahead.
-  private final long lookahead_steps;
-
-  // Number of pairs added to the output bag between explicit spills.
-  private static final int SPILL_THRESHOLD = 1000000;
-
-  /** Creates a UDF that pairs each element with the single element that follows it. */
-  public MarkovPairs()
-  {
-    this.lookahead_steps = 1;
-  }
-
-  /**
-   * @param lookahead_steps how many following elements each element is paired with,
-   *                        parsed as an int
-   */
-  public MarkovPairs(String lookahead_steps)
-  {
-    this.lookahead_steps = Integer.parseInt(lookahead_steps);
-  }
-
-  /*
-   * Adds (input[i], input[j]) to outputBag for every i < j <= min(end, i + lookahead_steps),
-   * with i starting at start. start and end are inclusive indexes into input.
-   */
-  private void generatePairs(ArrayList<Tuple> input, int start, int end, DataBag outputBag)
-      throws ExecException
-  {
-    int count = 0;
-    for (int i = start; i < end; i++) {
-      Tuple elem1 = input.get(i);
-      long last = Math.min(end, i + lookahead_steps);
-      for (int j = i + 1; j <= last; j++) {
-        if (count >= SPILL_THRESHOLD) {
-          outputBag.spill();
-          count = 0;
-        }
-        outputBag.add(tupleFactory.newTuple(Arrays.asList(elem1, input.get(j))));
-        count++;
-      }
-    }
-  }
-
-  /**
-   * @param input a tuple whose first field is the ordered bag to pair up
-   * @return a bag of (elem1, elem2) tuples, or null when no bag was supplied
-   */
-  @Override
-  public DataBag exec(Tuple input)
-      throws IOException
-  {
-    if (input == null || input.size() == 0 || input.get(0) == null) {
-      return null;
-    }
-
-    DataBag inputBag = (DataBag) input.get(0);
-    // materialize the bag so elements can be addressed by position
-    ArrayList<Tuple> inputData = new ArrayList<Tuple>();
-    for (Tuple tuple : inputBag) {
-      inputData.add(tuple);
-    }
-
-    try {
-      DataBag outputBag = bagFactory.newDefaultBag();
-      generatePairs(inputData, 0, inputData.size() - 1, outputBag);
-      return outputBag;
-    }
-    catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Declares a bag of (elem1, elem2) tuples, where both fields take the schema of the input
-   * bag's tuples. Returns null (unknown schema) if the input is not a bag or the schema
-   * cannot be built.
-   */
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      Schema tupleSchema = new Schema();
-
-      FieldSchema fieldSchema = input.getField(0);
-
-      if (fieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException(String.format("Expected input schema to be BAG, but instead found %s",
-                                                 DataType.findTypeName(fieldSchema.type)));
-      }
-
-      FieldSchema fieldSchema2 = fieldSchema.schema.getField(0);
-
-      tupleSchema.add(new Schema.FieldSchema("elem1", fieldSchema2.schema));
-      tupleSchema.add(new Schema.FieldSchema("elem2", fieldSchema2.schema));
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-                                               tupleSchema,
-                                               DataType.BAG));
-    }
-    catch (Exception e) {
-      return null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/Median.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/Median.java b/src/java/datafu/pig/stats/Median.java
deleted file mode 100644
index e33a84e..0000000
--- a/src/java/datafu/pig/stats/Median.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.stats;
-
-/**
- * Computes the <a href="http://en.wikipedia.org/wiki/Median" target="_blank">median</a>
- * of a <b>sorted</b> input bag using type R-2 estimation. This is equivalent to
- * {@code Quantile('0.5')}.
- *
- * <p>
- * N.B., all the data is pushed to a single reducer per key, so make sure some partitioning is
- * done (e.g., group by 'day') if the data is too large.  That is, this isn't distributed median.
- * </p>
- *
- * @see Quantile
- */
-public class Median extends Quantile
-{
-  /** Configures the underlying {@link Quantile} to compute only the 0.5 quantile. */
-  public Median()
-  {
-    super(new String[] { "0.5" });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/stats/Quantile.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/stats/Quantile.java b/src/java/datafu/pig/stats/Quantile.java
deleted file mode 100644
index 6fd42d3..0000000
--- a/src/java/datafu/pig/stats/Quantile.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.stats;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Computes {@link <a href="http://en.wikipedia.org/wiki/Quantile" target="_blank">quantiles</a>} 
- * for a <b>sorted</b> input bag, using type R-2 estimation.
- *
- * <p>
- * N.B., all the data is pushed to a single reducer per key, so make sure some partitioning is 
- * done (e.g., group by 'day') if the data is too large.  That is, this isn't distributed quantiles.
- * </p>
- * 
- * <p>
- * Note that unlike datafu's StreamingQuantile algorithm, this implementation gives
- * <b>exact</b> quantiles.  But, it requires that the input bag to be sorted.  Quantile must spill to 
- * disk when the input data is too large to fit in memory, which will contribute to longer runtimes. 
- * Because StreamingQuantile implements accumulate it can be much more efficient than Quantile for 
- * large input bags which do not fit well in memory.
- * </p>
- * 
- * <p>The constructor takes a single integer argument that specifies the number of evenly-spaced 
- * quantiles to compute, e.g.,</p>
- * 
- * <ul>
- *   <li>Quantile('3') yields the min, the median, and the max
- *   <li>Quantile('5') yields the min, the 25th, 50th, 75th percentiles, and the max
- *   <li>Quantile('101') yields the min, the max, and all 99 percentiles.
- * </ul>
- * 
- * <p>Alternatively the constructor can take the explicit list of quantiles to compute, e.g.</p>
- *
- * <ul>
- *   <li>Quantile('0.0','0.5','1.0') yields the min, the median, and the max
- *   <li>Quantile('0.0','0.25','0.5','0.75','1.0') yields the min, the 25th, 50th, 75th percentiles, and the max
- * </ul>
- *
- * <p>The list of quantiles need not span the entire range from 0.0 to 1.0, nor do they need to be evenly spaced, e.g.</p>
- * 
- * <ul>
- *   <li>Quantile('0.5','0.90','0.95','0.99') yields the median, the 90th, 95th, and the 99th percentiles
- *   <li>Quantile('0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987') yields the 0.13th, 2.28th, 15.87th, 50th, 84.13th, 97.72nd, and 99.87th percentiles
- * </ul>
- * 
- * <p>
- * Example:
- * <pre>
- * {@code
- *
- * define Quantile datafu.pig.stats.Quantile('0.0','0.5','1.0');
-
- * -- input: 9,10,2,3,5,8,1,4,6,7
- * input = LOAD 'input' AS (val:int);
- *
- * grouped = GROUP input ALL;
- *
- * -- produces: (1,5.5,10)
- * quantiles = FOREACH grouped {
- *   sorted = ORDER input BY val;
- *   GENERATE Quantile(sorted);
- * }
- * }</pre></p>
- *
- * @see Median
- * @see StreamingQuantile
- */
-public class Quantile extends SimpleEvalFunc<Tuple>
-{
-  List<Double> quantiles;
-
-  private static class Pair<T1,T2>
-  {
-    public T1 first;
-    public T2 second;
-
-    public Pair(T1 first, T2 second) {
-      this.first = first;
-      this.second = second;
-    }
-  }
-  
-  // For the output schema, label the quantiles 0, 1, 2, ... n
-  // Otherwise label the quantiles based on the quantile value.
-  // e.g. 50% quantile 0.5 will be labeled as 0_5
-  private boolean ordinalOutputSchema;
-
-  public Quantile(String... k)
-  {
-    this.quantiles = QuantileUtil.getQuantilesFromParams(k);
-    
-    if (k.length == 1 && Double.parseDouble(k[0]) > 1.0) 
-    {
-      this.ordinalOutputSchema = true;
-      this.quantiles = QuantileUtil.getQuantilesFromParams(k);
-    }
-    else
-    {
-      this.quantiles = QuantileUtil.getQuantilesFromParams(k);
-    }
-  }
-
-  private static Pair<Long, Long> getIndexes(double k, long N)
-  {
-    double h = N*k + 0.5;
-    long i1 = Math.min(Math.max(1, (long)Math.ceil(h - 0.5)), N);
-    long i2 = Math.min(Math.max(1, (long)Math.floor(h + 0.5)), N);
-
-    return new Pair<Long, Long>(i1, i2);
-  }
-  
-  public Tuple call(DataBag bag) throws IOException
-  {
-    if (bag == null || bag.size() == 0)
-      return null;
-
-    Map<Long, Double> d = new HashMap<Long, Double>();
-    long N = bag.size(), max_id = 1;
-    
-    for (double k : this.quantiles) {
-      Pair<Long, Long> idx = getIndexes(k, N);
-
-      d.put(idx.first, null);
-      d.put(idx.second, null);
-      max_id = Math.max(max_id, idx.second);
-    }
-
-    long i = 1;
-    for (Tuple t : bag) {
-      if (i > max_id)
-        break;
-
-      if (d.containsKey(i)) {
-        Object o = t.get(0);
-        if (!(o instanceof Number))
-          throw new IllegalStateException("bag must have numerical values (and be non-null)");
-        d.put(i, ((Number) o).doubleValue());
-      }
-      i++;
-    }
-
-    Tuple t = TupleFactory.getInstance().newTuple(this.quantiles.size());
-    int j = 0;
-    for (double k : this.quantiles) {
-      Pair<Long, Long> p = getIndexes(k, N);
-      double quantile = (d.get(p.first) + d.get(p.second)) / 2;
-      t.set(j, quantile);
-      j++;
-    }
-    return t;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    Schema tupleSchema = new Schema();
-    if (ordinalOutputSchema)
-    {
-      for (int i = 0; i < this.quantiles.size(); i++) 
-      {
-        tupleSchema.add(new Schema.FieldSchema("quantile_" + i, DataType.DOUBLE));
-      }
-    }
-    else
-    {
-      for (Double x : this.quantiles)
-        tupleSchema.add(new Schema.FieldSchema("quantile_" + x.toString().replace(".", "_"), DataType.DOUBLE));
-    }
-
-    try {
-      return new Schema(new FieldSchema(null, tupleSchema, DataType.TUPLE));
-    } catch(FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
-


Mime
View raw message