datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [09/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/CountEach.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/CountEach.java b/src/java/datafu/pig/bags/CountEach.java
deleted file mode 100644
index cfb0152..0000000
--- a/src/java/datafu/pig/bags/CountEach.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Generates a count of the number of times each distinct tuple appears in a bag.
- * 
- * <p>
- * Example:
- * <pre>
- * {@code
- * DEFINE CountEach datafu.pig.bags.CountEach();
- * DEFINE CountEachFlatten datafu.pig.bags.CountEach('flatten');
- * 
- * -- input: 
- * -- ({(A),(A),(C),(B)})
- * input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)});
- * 
- * -- output: 
- * -- {((A),2),((C),1),((B),1)}
- * output = FOREACH input GENERATE CountEach(B); 
- * 
- * -- output_flatten: 
- * -- ({(A,2),(C,1),(B,1)})
- * output_flatten = FOREACH input GENERATE CountEachFlatten(B);
- * } 
- * </pre>
- * </p>
- */
-public class CountEach extends AccumulatorEvalFunc<DataBag>
-{
-  private boolean flatten = false;
-  private Map<Tuple, Integer> counts = new HashMap<Tuple, Integer>();
-  
-  public CountEach() {
-    
-  }
-  
-  public CountEach(String arg) {
-    if (arg != null && arg.toLowerCase().equals("flatten")) {
-      flatten = true;
-    }
-  }
-
-  @Override
-  public void accumulate(Tuple input) throws IOException
-  {
-    DataBag inputBag = (DataBag)input.get(0);
-    if (inputBag == null) throw new IllegalArgumentException("Expected a bag, got null");
-    
-    for (Tuple tuple : inputBag) {
-      if (!counts.containsKey(tuple)) {
-        counts.put(tuple, 0);
-      }
-      counts.put(tuple, counts.get(tuple)+1);
-    }
-  }
-
-  @Override
-  public DataBag getValue()
-  {
-    DataBag output = BagFactory.getInstance().newDefaultBag();
-    for (Tuple tuple : counts.keySet()) {
-      Tuple outputTuple = null;
-      Tuple innerTuple = TupleFactory.getInstance().newTuple(tuple.getAll());
-      if (flatten) {        
-        innerTuple.append(counts.get(tuple));
-        outputTuple = innerTuple;
-      } else {
-        outputTuple = TupleFactory.getInstance().newTuple();
-        outputTuple.append(innerTuple);
-        outputTuple.append(counts.get(tuple));
-      }
-      output.add(outputTuple);
-    }
-
-    return output;
-  }
-
-  @Override
-  public void cleanup()
-  {
-    counts.clear();
-  }
-  
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected input to have one field");
-      }
-      
-      Schema.FieldSchema bagFieldSchema = input.getField(0);
-
-      if (bagFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-      
-      Schema inputBagSchema = bagFieldSchema.schema;
-
-      if (inputBagSchema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
-      }      
-      
-      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
-      if (inputTupleSchema == null) inputTupleSchema = new Schema();
-      
-      Schema outputTupleSchema = null;
-      
-      if (this.flatten) {
-        outputTupleSchema = inputTupleSchema.clone();
-        outputTupleSchema.add(new Schema.FieldSchema("count", DataType.INTEGER));
-      } else {        
-        outputTupleSchema = new Schema();
-        outputTupleSchema.add(new Schema.FieldSchema("tuple_schema", inputTupleSchema.clone(), DataType.TUPLE));
-        outputTupleSchema.add(new Schema.FieldSchema("count", DataType.INTEGER));
-      }
-      
-      return new Schema(new Schema.FieldSchema(
-            getSchemaName(this.getClass().getName().toLowerCase(), input),
-            outputTupleSchema, 
-            DataType.BAG));
-    }
-    catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-    catch (FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/DistinctBy.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/DistinctBy.java b/src/java/datafu/pig/bags/DistinctBy.java
deleted file mode 100644
index a79e4de..0000000
--- a/src/java/datafu/pig/bags/DistinctBy.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Get distinct elements in a bag by a given set of field positions.
- * The input and output schemas will be identical.  
- * 
- * The first tuple containing each distinct combination of these fields will be taken.
- * 
- * This operation is order preserving.  If both A and B appear in the output,
- * and A appears before B in the input, then A will appear before B in the output.
- * 
- * Example:
- * <pre>
- * {@code
- * define DistinctBy datafu.pig.bags.DistinctBy('0');
- * 
- * -- input:
- * -- ({(a, 1),(a,1),(b, 2),(b,22),(c, 3),(d, 4)})
- * input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)});
- * 
- * output = FOREACH input GENERATE DistinctBy(B);
- * 
- * -- output:
- * -- ({(a,1),(b,2),(c,3),(d,4)})
- * } 
- * </pre>
- * 
- * @param fields Any number of strings specifying field positions
- */
-public class DistinctBy extends AccumulatorEvalFunc<DataBag>
-{
-  private HashSet<Integer> fields = new HashSet<Integer>();
-  private HashSet<Tuple> seen = new HashSet<Tuple>();
-  private DataBag outputBag;
-  
-  public DistinctBy(String... fields)
-  {
-    for(String field : fields) {
-      this.fields.add(Integer.parseInt(field));
-    }
-    cleanup();
-  }
-
-  @Override
-  public void accumulate(Tuple input) throws IOException
-  {
-    if (input.size() != 1) {
-      throw new RuntimeException("Expected input to have only a single field");
-    }    
-    if (input.getType(0) != DataType.BAG) {
-      throw new RuntimeException("Expected a BAG as input");
-    }
-    
-    DataBag inputBag = (DataBag)input.get(0);
-    for (Tuple t : inputBag) {
-      Tuple distinctFieldTuple = getDistinctFieldTuple(t, this.fields);
-      if (!seen.contains(distinctFieldTuple)) {
-        outputBag.add(t);
-        seen.add(distinctFieldTuple);
-      }
-    }
-  }
-
-  @Override
-  public void cleanup()
-  {
-    seen.clear();
-    outputBag = BagFactory.getInstance().newDefaultBag();
-  }
-
-  @Override
-  public DataBag getValue()
-  {
-    return outputBag;
-  }
-  
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected input to have only a single field");
-      }
-      
-      Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-      if (inputFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-      
-      Schema inputBagSchema = inputFieldSchema.schema;
-
-      if (inputBagSchema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
-      }
-      
-      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
-      
-      Schema outputTupleSchema = inputTupleSchema.clone();     
-      
-      return new Schema(new Schema.FieldSchema(
-            getSchemaName(this.getClass().getName().toLowerCase(), input),
-            outputTupleSchema, 
-            DataType.BAG));
-    }
-    catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-    catch (FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  private Tuple getDistinctFieldTuple(Tuple t, HashSet<Integer> distinctFieldPositions) throws ExecException {
-    Tuple fieldTuple = TupleFactory.getInstance().newTuple(distinctFieldPositions.size());
-    int idx = 0;
-    for(int i=0; i<t.size(); i++) {
-      if (distinctFieldPositions.contains(i)) {
-        fieldTuple.set(idx, t.get(i));
-        idx++;
-      }
-    }
-    return fieldTuple;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/EmptyBagToNull.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/EmptyBagToNull.java b/src/java/datafu/pig/bags/EmptyBagToNull.java
deleted file mode 100644
index 5524ad5..0000000
--- a/src/java/datafu/pig/bags/EmptyBagToNull.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Returns null if the input is an empty bag; otherwise,
- * returns the input bag unchanged.
- */
-public class EmptyBagToNull extends EvalFunc<DataBag>
-{
-  @Override
-  public DataBag exec(Tuple tuple) throws IOException
-  {
-    if (tuple.size() == 0 || tuple.get(0) == null)
-      return null;
-    Object o = tuple.get(0);
-    if (o instanceof DataBag)
-    {
-      DataBag bag = (DataBag)o;
-      if (bag.size() == 0)
-      {
-        return null;
-      }
-      else
-      {
-        return bag;
-      }
-    }
-    else
-      throw new IllegalArgumentException("expected a null or a bag");
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    return input;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/EmptyBagToNullFields.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/EmptyBagToNullFields.java b/src/java/datafu/pig/bags/EmptyBagToNullFields.java
deleted file mode 100644
index 6933c28..0000000
--- a/src/java/datafu/pig/bags/EmptyBagToNullFields.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.ContextualEvalFunc;
-
-/**
- * For an empty bag, inserts a tuple having null values for all fields; 
- * otherwise, the input bag is returned unchanged.
- * 
- * <p>
- * This can be useful when performing FLATTEN on a bag from a COGROUP,
- * as FLATTEN on an empty bag produces no data.
- * </p>
- */
-public class EmptyBagToNullFields extends ContextualEvalFunc<DataBag>
-{
-  @Override
-  public DataBag exec(Tuple tuple) throws IOException
-  {
-    if (tuple.size() == 0 || tuple.get(0) == null)
-      return null;
-    Object o = tuple.get(0);
-    if (o instanceof DataBag)
-    {
-      DataBag bag = (DataBag)o;
-      if (bag.size() == 0)
-      {
-        // create a tuple with null values for all fields
-        int tupleSize = (Integer)getInstanceProperties().get("tuplesize");
-        return BagFactory.getInstance().newDefaultBag(Arrays.asList(TupleFactory.getInstance().newTuple(tupleSize)));
-      }
-      else
-      {
-        return bag;
-      }
-    }
-    else
-      throw new IllegalArgumentException("expected a null or a bag");
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try
-    {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected only a single field as input");
-      }
-      
-      if (input.getField(0).type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input, but found " + DataType.findTypeName(input.getField(0).type));
-      }
-      
-      // get the size of the tuple within the bag
-      int innerTupleSize = input.getField(0).schema.getField(0).schema.getFields().size();
-      getInstanceProperties().put("tuplesize", innerTupleSize);
-    }
-    catch (FrontendException e)
-    {
-      throw new RuntimeException(e);
-    }
-    return input;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/Enumerate.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/Enumerate.java b/src/java/datafu/pig/bags/Enumerate.java
deleted file mode 100644
index 8a0d072..0000000
--- a/src/java/datafu/pig/bags/Enumerate.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Enumerate a bag, appending to each tuple its index within the bag.
- * 
- * <p>
- * For example:
- * <pre>
- *   {(A),(B),(C),(D)} => {(A,0),(B,1),(C,2),(D,3)}
- * </pre>
- * The first constructor parameter (optional) dictates the starting index of the counting.
- * This UDF implements the accumulator interface, reducing DataBag materialization costs.
- * </p>
- *
- * <p>
- * Example:
- * <pre>
- * {@code
- * define Enumerate datafu.pig.bags.Enumerate('1');
- *
- * -- input:
- * -- ({(100),(200),(300),(400)})
- * input = LOAD 'input' as (B: bag{T: tuple(v2:INT)});
- *
- * -- output:
- * -- ({(100,1),(200,2),(300,3),(400,4)})
- * output = FOREACH input GENERATE Enumerate(B);
- * }
- * </pre>
- */
-public class Enumerate extends AccumulatorEvalFunc<DataBag>
-{
-  private final int start;
-  
-  private DataBag outputBag;
-  private long i;
-  private long count;
-
-  public Enumerate()
-  {
-    this("0");
-  }
-
-  public Enumerate(String start)
-  {
-    this.start = Integer.parseInt(start);
-    cleanup();
-  }
-  
-  @Override
-  public void accumulate(Tuple arg0) throws IOException
-  {
-    DataBag inputBag = (DataBag)arg0.get(0);
-    for (Tuple t : inputBag) {
-      Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
-      t1.append(i);
-      outputBag.add(t1);
-
-      if (count % 1000000 == 0) {
-        outputBag.spill();
-        count = 0;
-      }
-      i++;
-      count++;
-    }
-  }
-
-  @Override
-  public void cleanup()
-  {
-    this.outputBag = BagFactory.getInstance().newDefaultBag();
-    this.i = this.start;
-    this.count = 0;
-  }
-
-  @Override
-  public DataBag getValue()
-  {
-    return outputBag;
-  }
-  
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected input to have only a single field");
-      }
-      
-      Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-      if (inputFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-      
-      Schema inputBagSchema = inputFieldSchema.schema;
-
-      if (inputBagSchema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
-      }
-      
-      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
-      
-      Schema outputTupleSchema = inputTupleSchema.clone();
-      outputTupleSchema.add(new Schema.FieldSchema("i", DataType.LONG));
-      
-      return new Schema(new Schema.FieldSchema(
-            getSchemaName(this.getClass().getName().toLowerCase(), input),
-            outputTupleSchema, 
-            DataType.BAG));
-    }
-    catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-    catch (FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/FirstTupleFromBag.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/FirstTupleFromBag.java b/src/java/datafu/pig/bags/FirstTupleFromBag.java
deleted file mode 100644
index 1f24984..0000000
--- a/src/java/datafu/pig/bags/FirstTupleFromBag.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import datafu.pig.util.SimpleEvalFunc;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Returns the first tuple from a bag. Requires a second parameter that will be returned if the bag is empty.
- *
- * Example:
- * <pre>
- * {@code
- * define FirstTupleFromBag datafu.pig.bags.FirstTupleFromBag();
- *
- * -- input:
- * -- ({(a,1)})
- * input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)});
- *
- * output = FOREACH input GENERATE FirstTupleFromBag(B, null);
- *
- * -- output:
- * -- (a,1)
- * }
- * </pre>
- */
-
-public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
-{
-  public Tuple call(DataBag bag, Tuple defaultValue) throws IOException
-  {
-    for (Tuple t : bag) {
-      return t;
-    }
-    return defaultValue;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      return new Schema(input.getField(0).schema);
-    }
-    catch (Exception e) {
-      return null;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/NullToEmptyBag.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/NullToEmptyBag.java b/src/java/datafu/pig/bags/NullToEmptyBag.java
deleted file mode 100644
index 09fffb3..0000000
--- a/src/java/datafu/pig/bags/NullToEmptyBag.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Returns an empty bag if the input is null; otherwise,
- * returns the input bag unchanged.
- */
-public class NullToEmptyBag extends EvalFunc<DataBag>
-{
-  @Override
-  public DataBag exec(Tuple tuple) throws IOException
-  {
-    if (tuple.size() == 0 || tuple.get(0) == null)
-      return BagFactory.getInstance().newDefaultBag();
-    Object o = tuple.get(0);
-    if (o instanceof DataBag)
-      return (DataBag)o;
-    else
-      throw new IllegalArgumentException("expected a null or a bag");
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    return input;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/PrependToBag.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/PrependToBag.java b/src/java/datafu/pig/bags/PrependToBag.java
deleted file mode 100644
index 9292871..0000000
--- a/src/java/datafu/pig/bags/PrependToBag.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Prepends a tuple to a bag. 
- * 
- * <p>N.B. this copies the entire input bag, so don't use it for large bags.</p>
- * 
- * <p>
- * Example:
- * <pre>
- * {@code
- * define PrependToBag datafu.pig.bags.PrependToBag();
- * 
- * -- input:
- * -- ({(1),(2),(3)},(4))
- * -- ({(10),(20),(30),(40),(50)},(60))
- * input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-
- * -- output:
- * -- ({(4),(1),(2),(3)})
- * -- ({(60),(10),(20),(30),(40),(50)})
- * output = FOREACH input GENERATE PrependToBag(B,T) as B;
- * }
- * </pre>
- * </p>
- */
-public class PrependToBag extends SimpleEvalFunc<DataBag>
-{
-  public DataBag call(DataBag inputBag, Tuple t) throws IOException
-  {
-    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
-    outputBag.add(t);
-    for (Tuple x : inputBag)
-      outputBag.add(x);
-    return outputBag;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-              input.getField(0).schema, DataType.BAG));
-    }
-    catch (FrontendException e) {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/ReverseEnumerate.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/ReverseEnumerate.java b/src/java/datafu/pig/bags/ReverseEnumerate.java
deleted file mode 100644
index c86ffcf..0000000
--- a/src/java/datafu/pig/bags/ReverseEnumerate.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Enumerate a bag, appending to each tuple its index within the bag, with indices being produced in 
- * descending order. 
- * 
- * <p>
- * For example:
- * <pre>
- *   {(A),(B),(C),(D)} => {(A,3),(B,2),(C,1),(D,0)}
- * </pre>
- * The first constructor parameter (optional) dictates the starting index of the counting. As the
- * UDF requires the size of the bag for reverse counting, this UDF does <b>not</b> implement the
- * accumulator interface and suffers from the slight performance penalty of DataBag materialization.
- * </p>
- *
- * <p>
- * Example:
- * <pre>
- * {@code
- * define ReverseEnumerate datafu.pig.bags.ReverseEnumerate('1');
- *
- * -- input:
- * -- ({(100),(200),(300),(400)})
- * input = LOAD 'input' as (B: bag{T: tuple(v2:INT)});
- *
- * -- output:
- * -- ({(100,4),(200,3),(300,2),(400,1)})
- * output = FOREACH input GENERATE ReverseEnumerate(B);
- * }
- * </pre>
- * </p>
- */
-public class ReverseEnumerate extends SimpleEvalFunc<DataBag>
-{
-  private final int start;
-
-  public ReverseEnumerate()
-  {
-    this.start = 0;
-  }
-
-  public ReverseEnumerate(String start)
-  {
-    this.start = Integer.parseInt(start);
-  }
-  
-  public DataBag call(DataBag inputBag) throws IOException
-  {
-    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
-    long i = start, count = 0;
-    i = inputBag.size() - 1 + start;
-
-    for (Tuple t : inputBag) {
-      Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
-      t1.append(i);
-      outputBag.add(t1);
-
-      if (count % 1000000 == 0) {
-        outputBag.spill();
-        count = 0;
-      }
-      i--;
-      count++;
-    }
-
-    return outputBag;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected input to have only a single field");
-      }
-      
-      Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-      if (inputFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-      
-      Schema inputBagSchema = inputFieldSchema.schema;
-
-      if (inputBagSchema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
-      }
-      
-      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
-      
-      Schema outputTupleSchema = inputTupleSchema.clone();
-      outputTupleSchema.add(new Schema.FieldSchema("i", DataType.LONG));
-      
-      return new Schema(new Schema.FieldSchema(
-            getSchemaName(this.getClass().getName().toLowerCase(), input),
-            outputTupleSchema, 
-            DataType.BAG));
-    }
-    catch (CloneNotSupportedException e) {
-      throw new RuntimeException(e);
-    }
-    catch (FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/UnorderedPairs.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/UnorderedPairs.java b/src/java/datafu/pig/bags/UnorderedPairs.java
deleted file mode 100644
index a1d149e..0000000
--- a/src/java/datafu/pig/bags/UnorderedPairs.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
-
-/**
- * Generates pairs of all items in a bag.
- * 
- * <p>
- * Example:
- * <pre>
- * {@code
- * define UnorderedPairs datafu.pig.bags.UnorderedPairs();
- * 
- * -- input:
- * -- ({(1),(2),(3),(4)})
- * input = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
- * 
- * -- output:
- * -- ({((1),(2)),((1),(3)),((1),(4)),((2),(3)),((2),(4)),((3),(4))})
- * output = FOREACH input GENERATE UnorderedPairs(B);
- * } 
- * </pre>
- * </p>
- */
-public class UnorderedPairs extends EvalFunc<DataBag>
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    PigStatusReporter reporter = PigStatusReporter.getInstance();
-
-    try {
-      DataBag inputBag = (DataBag) input.get(0);
-      DataBag outputBag = bagFactory.newDefaultBag();
-      long i=0, j, cnt=0;
-
-      if (inputBag != null)
-      {
-        for (Tuple elem1 : inputBag) {
-          j = 0; 
-          for (Tuple elem2 : inputBag) {
-            if (j > i) {
-              outputBag.add(tupleFactory.newTuple(Arrays.asList(elem1, elem2)));
-              cnt++;
-            }
-            j++;
-  
-            if (reporter != null)
-              reporter.progress();
-  
-            if (cnt % 1000000 == 0) {
-              outputBag.spill();
-              cnt = 0;
-            }
-          }
-          i++;
-        }
-      }
-      
-      return outputBag;
-    }
-    catch (Exception e) {
-      throw new RuntimeException("Caught exception processing input of " + this.getClass().getName(), e);
-    }
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 1)
-      {
-        throw new RuntimeException("Expected input to have only a single field");
-      }
-      
-      Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-      if (inputFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-      
-      Schema inputBagSchema = inputFieldSchema.schema;
-
-      if (inputBagSchema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
-      }      
-      
-      Schema ouputTupleSchema = new Schema();
-      ouputTupleSchema.add(new Schema.FieldSchema("elem1", inputBagSchema.getField(0).schema.clone(), DataType.TUPLE));
-      ouputTupleSchema.add(new Schema.FieldSchema("elem2", inputBagSchema.getField(0).schema.clone(), DataType.TUPLE));
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-                                               ouputTupleSchema, 
-                                               DataType.BAG));
-    }
-    catch (Exception e) {
-      return null;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/package-info.java b/src/java/datafu/pig/bags/package-info.java
deleted file mode 100644
index 214b837..0000000
--- a/src/java/datafu/pig/bags/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * A collection of general purpose UDFs for operating on bags.
- */
-package datafu.pig.bags;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/geo/HaversineDistInMiles.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/geo/HaversineDistInMiles.java b/src/java/datafu/pig/geo/HaversineDistInMiles.java
deleted file mode 100644
index d1e3988..0000000
--- a/src/java/datafu/pig/geo/HaversineDistInMiles.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.geo;
-
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Computes the distance (in miles) between two latitude-longitude pairs 
- * using the {@link <a href="http://en.wikipedia.org/wiki/Haversine_formula" target="_blank">Haversine formula</a>}.
- *
- * <p>
- * Example:
- * <pre>
- * {@code
- * -- input is a TSV of two latitude and longitude pairs
- * input = LOAD 'input' AS (lat1 : double, long1 : double, lat2 : double, long2 : double);
- * output = FOREACH input GENERATE datafu.pig.geo.HaversineDistInMiles(lat1, long1, lat2, long2) as distance;
- * }</pre></p>
- */
-public class HaversineDistInMiles extends SimpleEvalFunc<Double>
-{
-  public static final double EARTH_RADIUS = 3958.75;
-
-  public Double call(Double lat1, Double lng1, Double lat2, Double lng2)
-  {
-    if (lat1 == null || lng1 == null || lat2 == null || lng2 == null)
-      return null;
-
-    double d_lat = Math.toRadians(lat2-lat1);
-    double d_long = Math.toRadians(lng2-lng1);
-    double a = Math.sin(d_lat/2) * Math.sin(d_lat/2) +
-               Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
-               Math.sin(d_long/2) * Math.sin(d_long/2);
-    double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
-    return EARTH_RADIUS * c;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    return new Schema(new Schema.FieldSchema("dist", DataType.DOUBLE));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/geo/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/geo/package-info.java b/src/java/datafu/pig/geo/package-info.java
deleted file mode 100644
index 12b27b0..0000000
--- a/src/java/datafu/pig/geo/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * UDFs for geographic computations.
- */
-package datafu.pig.geo;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/hash/MD5.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/hash/MD5.java b/src/java/datafu/pig/hash/MD5.java
deleted file mode 100644
index b7b51da..0000000
--- a/src/java/datafu/pig/hash/MD5.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.hash;
-
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-import org.apache.commons.codec.binary.Base64;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Computes the MD5 value of a string and outputs it in hex (by default).
- * A method can be provided to the constructor, which may be either 'hex' or 'base64'.
- */
-public class MD5 extends SimpleEvalFunc<String>
-{
-  private final MessageDigest md5er;
-  private final boolean isBase64;
-  
-  public MD5()
-  {
-    this("hex");
-  }
-  
-  public MD5(String method)
-  {
-    if ("hex".equals(method))
-    {
-      isBase64 = false;
-    }
-    else if ("base64".equals(method))
-    {
-      isBase64 = true;
-    }
-    else
-    {
-      throw new IllegalArgumentException("Expected either hex or base64");
-    }
-    
-    try {
-      md5er = MessageDigest.getInstance("md5");
-    }
-    catch (NoSuchAlgorithmException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  public String call(String val)
-  {
-    if (isBase64)
-    {
-      return new String(Base64.encodeBase64(md5er.digest(val.getBytes())));
-    }
-    else
-    {
-      return new BigInteger(1, md5er.digest(val.getBytes())).toString(16);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/hash/SHA.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/hash/SHA.java b/src/java/datafu/pig/hash/SHA.java
deleted file mode 100644
index ff859e5..0000000
--- a/src/java/datafu/pig/hash/SHA.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.hash;
-
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-public class SHA extends SimpleEvalFunc<String> {
-	private final MessageDigest sha;
-
-	public SHA(){
-		this("256");
-	}
-	
-	public SHA(String algorithm){
-		try {
-			sha = MessageDigest.getInstance("SHA-"+algorithm);
-		} catch (NoSuchAlgorithmException e) {
-			throw new RuntimeException(e);
-		}
-	}
-	
-	public String call(String value){
-		return new BigInteger(1, sha.digest(value.getBytes())).toString(16);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/hash/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/hash/package-info.java b/src/java/datafu/pig/hash/package-info.java
deleted file mode 100644
index 320a029..0000000
--- a/src/java/datafu/pig/hash/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * UDFs for computing hashes from data.
- */
-package datafu.pig.hash;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/linkanalysis/PageRank.java b/src/java/datafu/pig/linkanalysis/PageRank.java
deleted file mode 100644
index 80ff567..0000000
--- a/src/java/datafu/pig/linkanalysis/PageRank.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.linkanalysis;
-
-import it.unimi.dsi.fastutil.ints.Int2IntMap;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.Accumulator;
-import org.apache.pig.AccumulatorEvalFunc;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * A UDF which implements {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}.
- * 
- * <p>  
- * This is not a distributed implementation.  Each graph is stored in memory while running the algorithm, with edges optionally 
- * spilled to disk to conserve memory.  This can be used to distribute the execution of PageRank on multiple
- * reasonably sized graphs.  It does not distribute execution of PageRank for each individual graph.  Each graph is identified
- * by an integer valued topic ID.
- * </p>
- * 
- * <p>
- * If the graph is too large to fit in memory then an alternative method must be used, such as an iterative approach which runs
- * many MapReduce jobs in a sequence to complete the PageRank iterations.
- * </p>
- * 
- * <p>
- * Each graph is represented through a bag of (source,edges) tuples.  The 'source' is an integer ID representing the source node.
- * The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples.  The 'dest' is an
- * integer ID representing the destination node.  The weight is a double representing how much the edge should be weighted.
- * For a standard PageRank implementation just use weight of 1.0.
- * </p>
- * 
- * <p>
- * The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
- * </p>
- * 
- * <p>
- * There are several configurable options for this UDF, among them:
- * </p>
- * 
- * <ul>
- * <li>
- * <b>alpha</b>: Controls the PageRank alpha value.  The default is 0.85.  A higher value reduces the "random jump"
- * factor and causes the rank to be influenced more by edges. 
- * </li>
- * <li>
- * <b>max_iters</b>: The maximum number of iterations to run.  The default is 150.
- * </li>
- * <li>
- * <b>dangling_nodes</b>: How to handle "dangling nodes", i.e. nodes with no outgoing edges.  When "true" this is equivalent
- * to forcing a dangling node to have an outgoing edge to every other node in the graph.  The default is "false".
- * </li>
- * <li>
- * <b>tolerance</b>: A threshold which causes iterations to cease.  It is measured from the total change in ranks from each of
- * the nodes in the graph.  As the ranks settle on their final values the total change decreases.  This can be used
- * to stop iterations early.  The default is 1e-16. 
- * </li> 
- * <li>
- * <b>max_nodes_and_edges</b>: This is a control to prevent running out of memory.  As a graph is loaded, if the sum of edges
- * and nodes exceeds this value then it will stop.  It will not fail but PageRank will not be run on this graph.  Instead a null
- * value will be returned as a result.  The default is 100M.
- * </li>
- * <li>
- * <b>spill_to_edge_disk_storage</b>: Used to conserve memory.  When "true" it causes the edge data to be written to disk in a temp file instead
- * of being held in memory when the number of edges exceeds a threshold.  The nodes are still held in memory however.  
- * Each iteration of PageRank will stream through the edges stored on disk.  The default is "false".
- * </li>
- * <li>
- * <b>max_edges_in_memory</b>: When spilling edges to disk is enabled, this is the threshold which triggers that behavior.  The default is 30M.
- * </li>
- * </ul>
- * 
- * <p>
- * Parameters are configured by passing them in as a sequence of pairs into the UDF constructor.  For example, below the alpha value is set to
- * 0.87 and dangling nodes are enabled.  All arguments must be strings.
- * </p>
- * 
- * <p>
- * <pre>
- * {@code
- * define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
- * }
- * </pre>
- * </p>
- * 
- * <p>
- * Full example:
- * <pre>
- * {@code
- * 
- * topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);
- * 
- * topic_edges_grouped = GROUP topic_edges by (topic, source) ;
- * topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
- *    group.topic as topic,
- *    group.source as source,
- *    topic_edges.(dest,weight) as edges;
- * 
- * topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic; 
- * 
- * topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
- *    group as topic,
- *    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);
- *
- * topic_ranks = FOREACH topic_ranks GENERATE
- *    topic, source, rank;
- * 
- * }
- * </pre>
- * </p> 
- */
-public class PageRank extends AccumulatorEvalFunc<DataBag>
-{
-  private final datafu.pig.linkanalysis.PageRankImpl graph = new datafu.pig.linkanalysis.PageRankImpl();
-
-  private int maxNodesAndEdges = 100000000;
-  private int maxEdgesInMemory = 30000000;
-  private double tolerance = 1e-16;
-  private int maxIters = 150;
-  private boolean useEdgeDiskStorage = false;
-  private boolean enableDanglingNodeHandling = false;
-  private boolean enableNodeBiasing = false;
-  private boolean aborted = false;
-  private float alpha = 0.85f;
-
-  TupleFactory tupleFactory = TupleFactory.getInstance();
-  BagFactory bagFactory = BagFactory.getInstance();
-  
-  public PageRank()
-  {
-    initialize();
-  }
-
-  public PageRank(String... parameters)
-  {
-    if (parameters.length % 2 != 0)
-    {
-      throw new RuntimeException("Invalid parameters list");
-    }
-
-    for (int i=0; i<parameters.length; i+=2)
-    {
-      String parameterName = parameters[i];
-      String value = parameters[i+1];
-      if (parameterName.equals("max_nodes_and_edges"))
-      {
-        maxNodesAndEdges = Integer.parseInt(value);
-      }
-      else if (parameterName.equals("max_edges_in_memory"))
-      {
-        maxEdgesInMemory = Integer.parseInt(value);
-      }
-      else if (parameterName.equals("tolerance"))
-      {
-        tolerance = Double.parseDouble(value);
-      }
-      else if (parameterName.equals("max_iters"))
-      {
-        maxIters = Integer.parseInt(value);
-      }
-      else if (parameterName.equals("spill_to_edge_disk_storage"))
-      {
-        useEdgeDiskStorage = Boolean.parseBoolean(value);
-      }
-      else if (parameterName.equals("dangling_nodes"))
-      {
-        enableDanglingNodeHandling = Boolean.parseBoolean(value);
-      }
-      else if (parameterName.equals("node_biasing"))
-      {
-        enableNodeBiasing = Boolean.parseBoolean(value);
-      }
-      else if (parameterName.equals("alpha"))
-      {
-        alpha = Float.parseFloat(value);
-      }
-    }
-
-    initialize();
-  }
-
-  private void initialize()
-  {
-    if (useEdgeDiskStorage)
-    {
-      this.graph.enableEdgeDiskCaching();
-    }
-    else
-    {
-      this.graph.disableEdgeDiskCaching();
-    }
-
-    if (enableDanglingNodeHandling)
-    {
-      this.graph.enableDanglingNodeHandling();
-    }
-    else
-    {
-      this.graph.disableDanglingNodeHandling();
-    }
-    
-    if (enableNodeBiasing)
-    {
-      this.graph.enableNodeBiasing();
-    }
-    else
-    {
-      this.graph.disableNodeBiasing();
-    }
-
-    this.graph.setEdgeCachingThreshold(maxEdgesInMemory);
-    this.graph.setAlpha(alpha);
-  }
-
-  @Override
-  public void accumulate(Tuple t) throws IOException
-  {
-    if (aborted)
-    {
-      return;
-    }
-    
-    DataBag bag = (DataBag) t.get(0);
-    if (bag == null || bag.size() == 0)
-      return;
-    
-    for (Tuple sourceTuple : bag) 
-    {
-      Integer sourceId = (Integer)sourceTuple.get(0);
-      DataBag edges = (DataBag)sourceTuple.get(1);
-      Double nodeBias = null;
-      if (enableNodeBiasing)
-      {
-        nodeBias = (Double)sourceTuple.get(2);
-      }
-
-      ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String, Object>>();
-
-      for (Tuple edgeTuple : edges)
-      {
-        Integer destId = (Integer)edgeTuple.get(0);
-        Double weight = (Double)edgeTuple.get(1);
-        HashMap<String,Object> edgeMap = new HashMap<String, Object>();
-        edgeMap.put("dest",destId);
-        edgeMap.put("weight",weight);
-        edgesMapList.add(edgeMap);
-      }
-
-      if (enableNodeBiasing)
-      {
-        graph.addNode(sourceId, edgesMapList, nodeBias.floatValue());
-      }
-      else
-      {
-        graph.addNode(sourceId, edgesMapList);
-      }
-
-      if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
-      {
-        System.out.println(String.format("There are too many nodes and edges (%d + %d > %d). Aborting.", graph.nodeCount(), graph.edgeCount(), maxNodesAndEdges));
-        aborted = true;
-        break;
-      }
-
-      reporter.progress();
-    }
-  }
-
-  @Override
-  public DataBag getValue()
-  {
-    if (aborted)
-    {
-      return null;
-    }
-    
-    System.out.println(String.format("Nodes: %d, Edges: %d", graph.nodeCount(), graph.edgeCount()));
-    
-    ProgressIndicator progressIndicator = getProgressIndicator();
-    System.out.println("Finished loading graph.");
-    long startTime = System.nanoTime();
-    System.out.println("Initializing.");
-    try
-    {
-      graph.init(progressIndicator);
-    }
-    catch (IOException e)
-    {
-      e.printStackTrace();
-      return null;
-    }
-    System.out.println(String.format("Done, took %f ms", (System.nanoTime() - startTime)/10.0e6));
-
-    float totalDiff;
-    int iter = 0;
-
-    System.out.println("Beginning iterations");
-    startTime = System.nanoTime();
-    do
-    {
-      // TODO log percentage complete every 5 minutes
-      try
-      {
-        totalDiff = graph.nextIteration(progressIndicator);
-      }
-      catch (IOException e)
-      {
-        e.printStackTrace();
-        return null;
-      }
-      iter++;
-    } while(iter < maxIters && totalDiff > tolerance);
-    System.out.println(String.format("Done, %d iterations took %f ms", iter, (System.nanoTime() - startTime)/10.0e6));
-
-    DataBag output = bagFactory.newDefaultBag();
-
-    for (Int2IntMap.Entry node : graph.getNodeIds())
-    {
-      int nodeId = node.getIntKey();
-      float rank = graph.getNodeRank(nodeId);
-      List nodeData = new ArrayList(2);
-      nodeData.add(nodeId);
-      nodeData.add(rank);
-      output.add(tupleFactory.newTuple(nodeData));
-    }
-
-    return output;
-  }
-
-  @Override
-  public void cleanup()
-  {
-    try
-    {
-      aborted = false;
-      this.graph.clear();
-    }
-    catch (IOException e)
-    { 
-      e.printStackTrace();
-    }
-  }
-
-  private ProgressIndicator getProgressIndicator()
-  {
-    return new ProgressIndicator()
-        {
-          @Override
-          public void progress()
-          {
-            reporter.progress();
-          }
-        };
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try
-    {
-      Schema.FieldSchema inputFieldSchema = input.getField(0);
-
-      if (inputFieldSchema.type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected a BAG as input");
-      }
-
-      Schema inputBagSchema = inputFieldSchema.schema;
-
-      if (inputBagSchema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
-      }
-      
-      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
-      
-      if (!this.enableNodeBiasing)
-      {
-        if (inputTupleSchema.size() != 2)
-        {
-          throw new RuntimeException("Expected two fields for the node data");
-        }
-      }
-      else
-      {
-        if (inputTupleSchema.size() != 3)
-        {
-          throw new RuntimeException("Expected three fields for the node data");
-        }
-      }
-      
-      if (inputTupleSchema.getField(0).type != DataType.INTEGER)
-      {
-        throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
-                                                 DataType.findTypeName(inputTupleSchema.getField(0).type)));
-      }
-
-      if (inputTupleSchema.getField(1).type != DataType.BAG)
-      {
-        throw new RuntimeException(String.format("Expected edges to be represented with a BAG"));
-      }
-      
-      if (this.enableNodeBiasing && inputTupleSchema.getField(2).type != DataType.DOUBLE)
-      {
-        throw new RuntimeException(String.format("Expected node bias to be a DOUBLE, but instead found %s",
-                                                 DataType.findTypeName(inputTupleSchema.getField(2).type)));
-      }
-
-      Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);
-
-      if (edgesFieldSchema.schema.getField(0).type != DataType.TUPLE)
-      {
-        throw new RuntimeException(String.format("Expected edges field to contain a TUPLE, but instead found %s",
-                                                 DataType.findTypeName(edgesFieldSchema.schema.getField(0).type)));
-      }
-      
-      Schema edgesTupleSchema = edgesFieldSchema.schema.getField(0).schema;
-      
-      if (edgesTupleSchema.size() != 2)
-      {
-        throw new RuntimeException("Expected two fields for the edge data");
-      }
-      
-      if (edgesTupleSchema.getField(0).type != DataType.INTEGER)
-      {
-        throw new RuntimeException(String.format("Expected destination edge ID to an INTEGER, but instead found %s",
-                                                 DataType.findTypeName(edgesTupleSchema.getField(0).type)));
-      }
-
-      if (edgesTupleSchema.getField(1).type != DataType.DOUBLE)
-      {
-        throw new RuntimeException(String.format("Expected destination edge weight to a DOUBLE, but instead found %s",
-                                                 DataType.findTypeName(edgesTupleSchema.getField(1).type)));
-      }
-
-      Schema tupleSchema = new Schema();
-      tupleSchema.add(new Schema.FieldSchema("node",DataType.INTEGER));
-      tupleSchema.add(new Schema.FieldSchema("rank",DataType.FLOAT));
-
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
-                                                                 .getName()
-                                                                 .toLowerCase(), input),
-                                               tupleSchema,
-                                               DataType.BAG));
-    }
-    catch (FrontendException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/linkanalysis/PageRankImpl.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/linkanalysis/PageRankImpl.java b/src/java/datafu/pig/linkanalysis/PageRankImpl.java
deleted file mode 100644
index 5d0b932..0000000
--- a/src/java/datafu/pig/linkanalysis/PageRankImpl.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.linkanalysis;
-
-import it.unimi.dsi.fastutil.floats.FloatArrayList;
-import it.unimi.dsi.fastutil.ints.Int2IntMap;
-import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.common.collect.AbstractIterator;
-
-/**
- * An implementation of {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}, used by the {@link PageRank} UDF.
- * It is not intended to be used directly.   
- * </p>
- */
-public class PageRankImpl
-{    
-  private float totalRankChange;
-  private long edgeCount;
-  private long nodeCount;
-  
-  // the damping factor
-  private float alpha = 0.85f;
-  
-  // edge weights (which are doubles) are multiplied by this value so they can be stored as integers internally
-  private static float EDGE_WEIGHT_MULTIPLIER = 100000;
-    
-  private final Int2IntOpenHashMap nodeIndices = new Int2IntOpenHashMap();
-  private final FloatArrayList nodeData = new FloatArrayList(); // rank, total weight, contribution, bias(optional), (repeat)
-  private int nodeFieldCount = 3; // unless biasing is enabled
-  
-  private final IntArrayList danglingNodes = new IntArrayList();
-  
-  private final IntArrayList edges = new IntArrayList(); // source, dest node count... dest id, weight pos, (repeat)
-  
-  private boolean shouldHandleDanglingNodes = false;
-  private boolean shouldCacheEdgesOnDisk = false;
-  private long edgeCachingThreshold;
-  private boolean nodeBiasingEnabled = false;
-  
-  private File edgesFile;
-  private DataOutputStream edgeDataOutputStream;
-  private boolean usingEdgeDiskCache;
-  
-  public void clear() throws IOException
-  {
-    this.edgeCount = 0;
-    this.nodeCount = 0;
-    this.totalRankChange = 0.0f;
-    
-    this.nodeIndices.clear();
-    this.nodeData.clear();
-    this.edges.clear();
-    this.danglingNodes.clear();
-    
-    if (edgeDataOutputStream != null)
-    {
-      this.edgeDataOutputStream.close();
-      this.edgeDataOutputStream = null;
-    }
-    
-    this.usingEdgeDiskCache = false;
-    this.edgesFile = null;
-  }
-  
-  /**
-    * Gets the page rank alpha value.
-    * @return alpha
-    */
-   public float getAlpha()
-   {
-     return alpha;
-   }
-   
-   /**
-    * Sets the page rank alpha value (default is 0.85);
-    * @param alpha 
-    */
-   public void setAlpha(float alpha)
-   {
-     this.alpha = alpha;
-   }
-   
-   public boolean isNodeBiasingEnabled()
-   {
-     return this.nodeBiasingEnabled;
-   }
-   
-   public void enableNodeBiasing()
-   {
-     this.nodeBiasingEnabled = true;
-     this.nodeFieldCount = 4;
-   }
-   
-   public void disableNodeBiasing()
-   {
-     this.nodeBiasingEnabled = false;
-     this.nodeFieldCount = 3;
-   }
-   
-  
-  /**
-   * Gets whether disk is being used to cache edges.
-   * @return True if the edges are cached on disk.
-   */
-  public boolean isUsingEdgeDiskCache()
-  {
-    return usingEdgeDiskCache;
-  }
-  
-  /**
-   * Enable disk caching of edges once there are too many (disabled by default).
-   */
-  public void enableEdgeDiskCaching()
-  {
-    shouldCacheEdgesOnDisk = true;
-  }
-  
-  /**
-   * Disable disk caching of edges once there are too many (disabled by default).
-   */
-  public void disableEdgeDiskCaching()
-  {
-    shouldCacheEdgesOnDisk = false;
-  }
-  
-  /**
-   * Gets whether edge disk caching is enabled.
-   * @return True if edge disk caching is enabled.
-   */
-  public boolean isEdgeDiskCachingEnabled()
-  {
-    return shouldCacheEdgesOnDisk;
-  }
-  
-  /**
-   * Gets the number of edges past which they will be cached on disk instead of in memory.
-   * Edge disk caching must be enabled for this to have any effect.
-   * @return Edge count past which caching occurs
-   */
-  public long getEdgeCachingThreshold()
-  {
-    return edgeCachingThreshold;
-  }
-
-  /**
-   * Set the number of edges past which they will be cached on disk instead of in memory.
-   * Edge disk caching must be enabled for this to have any effect.
-   * @param count Edge count past which caching occurs
-   */
-  public void setEdgeCachingThreshold(long count)
-  {
-    edgeCachingThreshold = count;
-  }
-  
-  /**
-   * Enables dangling node handling (disabled by default).
-   */
-  public void enableDanglingNodeHandling()
-  {
-    shouldHandleDanglingNodes = true;
-  }
-  
-  /**
-   * Disables dangling node handling (disabled by default).
-   */
-  public void disableDanglingNodeHandling()
-  {
-    shouldHandleDanglingNodes = false;
-  }
-  
-  public long nodeCount()
-  {
-    return this.nodeCount;
-  }
-  
-  public long edgeCount()
-  {
-    return this.edgeCount;
-  }
-
-  public Int2IntMap.FastEntrySet getNodeIds()
-  {
-    return this.nodeIndices.int2IntEntrySet();
-  }
-  
-  public float getNodeRank(int nodeId)
-  {
-    int nodeIndex = this.nodeIndices.get(nodeId);
-    return nodeData.get(nodeIndex);
-  }
-  
-  public float getTotalRankChange()
-  {
-    return this.totalRankChange;
-  }
-  
-  private void maybeCreateNode(int nodeId)
-  {
-    // create from node if it doesn't already exist
-    if (!nodeIndices.containsKey(nodeId))
-    {      
-      int index = this.nodeData.size();
-      
-      this.nodeData.add(0.0f); // rank
-      this.nodeData.add(0.0f); // total weight
-      this.nodeData.add(0.0f); // contribution
-      
-      if (this.nodeBiasingEnabled)
-      {
-        this.nodeData.add(0.0f); // bias
-      }      
-      
-      this.nodeIndices.put(nodeId, index);
-      
-      this.nodeCount++;
-    }
-  }
-  
-  public float getNodeBias(int nodeId)
-  {
-    if (!this.nodeBiasingEnabled)
-    {
-      throw new IllegalArgumentException("Node biasing not enable");
-    }
-    int nodeIndex = this.nodeIndices.get(nodeId);
-    return this.nodeData.get(nodeIndex+3);
-  }
-  
-  public void setNodeBias(int nodeId, float bias)
-  {
-    if (!this.nodeBiasingEnabled)
-    {
-      throw new IllegalArgumentException("Node biasing not enable");
-    }
-    
-    int nodeIndex = this.nodeIndices.get(nodeId);
-    this.nodeData.set(nodeIndex+3, bias);
-  }
-  
-  public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges) throws IOException
-  {
-    // with bias of 1.0, all nodes have an equal bias (that is, no bias)
-    addNode(sourceId, sourceEdges, 1.0f);
-  }
-  
-  public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges, float bias) throws IOException
-  {
-    int source = sourceId.intValue();
-   
-    maybeCreateNode(source);
-    
-    if (this.nodeBiasingEnabled)
-    {
-      setNodeBias(source, bias);
-    }
-    else if (bias != 1.0f)
-    {
-      // with node biasing disabled, all nodes implicitly have a bias of 1.0, which means no bias, so if anything else was specified
-      // it won't take effect.
-      throw new IllegalArgumentException("Bias was specified but node biasing not enabled");
-    }
-    
-    if (this.shouldCacheEdgesOnDisk && !usingEdgeDiskCache && (sourceEdges.size() + this.edgeCount) >= this.edgeCachingThreshold)
-    {
-      writeEdgesToDisk();
-    }
-    
-    // store the source node id itself
-    appendEdgeData(source);
-    
-    // store how many outgoing edges this node has
-    appendEdgeData(sourceEdges.size());
-    
-    // store the outgoing edges
-    for (Map<String,Object> edge : sourceEdges)
-    {
-      int dest = ((Integer)edge.get("dest")).intValue();
-      float weight = ((Double)edge.get("weight")).floatValue();
-            
-      maybeCreateNode(dest);
-      
-      appendEdgeData(dest);
-      
-      // location of weight in weights array
-      appendEdgeData(Math.max(1, (int)(weight * EDGE_WEIGHT_MULTIPLIER)));
-      
-      this.edgeCount++;
-    }
-  }
-  
-  private void appendEdgeData(int data) throws IOException
-  {
-    if (this.edgeDataOutputStream != null)
-    {
-      this.edgeDataOutputStream.writeInt(data);
-    }
-    else
-    {
-      this.edges.add(data);
-    }
-  }
-  
-  public void init() throws IOException
-  {
-    init(getDummyIndicator());
-  }
-    
-  public void init(ProgressIndicator progressIndicator) throws IOException
-  {
-    if (this.edgeDataOutputStream != null)
-    {
-      this.edgeDataOutputStream.close();
-      this.edgeDataOutputStream = null;
-    }
-    
-    // initialize all nodes to an equal share of the total rank (1.0)
-    float nodeRank = 1.0f / this.nodeCount;        
-    float totalBias = 0.0f;
-    for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
-    {
-      nodeData.set(j, nodeRank);      
-      progressIndicator.progress();
-      if (this.nodeBiasingEnabled) 
-      {
-        totalBias += nodeData.getFloat(j+3);
-      }
-    }      
-    
-    // if node biasing enabled, need to normalize the bias by the total bias across all nodes so it represents
-    // the share of bias.
-    if (this.nodeBiasingEnabled)
-    {
-      for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
-      {
-        float bias = nodeData.getFloat(j+3);
-        bias /= totalBias;
-        nodeData.set(j+3,bias);
-      }
-    }
-    
-    Iterator<Integer> edgeData = getEdgeData();
-    
-    while(edgeData.hasNext())
-    {
-      int sourceId = edgeData.next();
-      int nodeEdgeCount = edgeData.next();
-      
-      while (nodeEdgeCount-- > 0)
-      {
-        // skip the destination node id
-        edgeData.next();
-        
-        float weight = edgeData.next();
-                
-        int nodeIndex = this.nodeIndices.get(sourceId);
-        
-        float totalWeight = this.nodeData.getFloat(nodeIndex+1); 
-        totalWeight += weight;
-        this.nodeData.set(nodeIndex+1, totalWeight);
-        
-        progressIndicator.progress();
-      }
-    }
-    
-    // if handling dangling nodes, get a list of them by finding those nodes with no outgoing
-    // edges (i.e. total outgoing edge weight is 0.0)
-    if (shouldHandleDanglingNodes)
-    {
-      for (Map.Entry<Integer,Integer> e : nodeIndices.entrySet())
-      {
-        int nodeId = e.getKey();
-        int nodeIndex = e.getValue();
-        float totalWeight = nodeData.getFloat(nodeIndex+1);
-        if (totalWeight == 0.0f)
-        {
-          danglingNodes.add(nodeId);
-        }
-      }
-    }
-  }
-  
-  public float nextIteration(ProgressIndicator progressIndicator) throws IOException
-  {
-    distribute(progressIndicator);
-    commit(progressIndicator);
-    
-    return getTotalRankChange();
-  }
-  
-  public float nextIteration() throws IOException
-  {
-    ProgressIndicator dummyIndicator = getDummyIndicator();
-    distribute(dummyIndicator);
-    commit(dummyIndicator);
-    
-    return getTotalRankChange();
-  }
-  
-  private ProgressIndicator getDummyIndicator()
-  {
-    return new ProgressIndicator() {
-      @Override
-      public void progress()
-      {        
-      }
-    };
-  }
-  
-  public void distribute(ProgressIndicator progressIndicator) throws IOException
-  {    
-    Iterator<Integer> edgeData = getEdgeData();
-    
-    while(edgeData.hasNext())
-    {
-      int sourceId = edgeData.next();
-      int nodeEdgeCount = edgeData.next();
-      
-      while (nodeEdgeCount-- > 0)
-      {
-        int toId = edgeData.next();
-        float weight = edgeData.next();
-                
-        int fromNodeIndex = this.nodeIndices.get(sourceId);
-        int toNodeIndex = this.nodeIndices.get(toId);
-        
-        float contributionChange = weight * this.nodeData.getFloat(fromNodeIndex) / this.nodeData.getFloat(fromNodeIndex+1);
-        
-        float currentContribution = this.nodeData.getFloat(toNodeIndex+2);
-        this.nodeData.set(toNodeIndex+2, currentContribution + contributionChange);
-        
-        progressIndicator.progress();
-      }      
-    }
-    
-    if (shouldHandleDanglingNodes)
-    {
-      // get the rank from each of the dangling nodes
-      float totalRank = 0.0f;
-      for (int nodeId : danglingNodes)
-      {
-        int nodeIndex = nodeIndices.get(nodeId);
-        float rank = nodeData.get(nodeIndex);
-        totalRank += rank;
-      }
-      
-      // distribute the dangling node ranks to all the nodes in the graph
-      // note: the alpha factor is applied in the commit stage
-      float contributionIncrease = totalRank / this.nodeCount;
-      for (int i=2; i<nodeData.size(); i += this.nodeFieldCount)
-      {
-        float contribution = nodeData.getFloat(i);
-        contribution += contributionIncrease;
-        nodeData.set(i, contribution);
-      }
-    }
-  }
-  
-  public void commit(ProgressIndicator progressIndicator)
-  {
-    this.totalRankChange = 0.0f;
-    
-    float oneMinusAlpha = (1.0f - this.alpha);
-    float oneMinusAlphaOverNodeCount = oneMinusAlpha / nodeCount;
-    
-    for (int nodeIndex=0; nodeIndex<this.nodeData.size(); nodeIndex += this.nodeFieldCount)
-    {      
-      float oldRank = this.nodeData.get(nodeIndex+2);
-      float newRank;
-      
-      if (this.nodeBiasingEnabled)
-      {
-        float bias = this.nodeData.get(nodeIndex+3);
-        newRank = bias * oneMinusAlpha + alpha * oldRank;
-      }
-      else
-      {
-        newRank = oneMinusAlphaOverNodeCount + alpha * oldRank;
-      }
-      
-      this.nodeData.set(nodeIndex+2, 0.0f);
-      
-      float lastRankDiff = newRank - this.nodeData.get(nodeIndex);
-      
-      this.nodeData.set(nodeIndex, newRank);
-      
-      this.totalRankChange += Math.abs(lastRankDiff);
-      
-      progressIndicator.progress();
-    }
-  }
-  
-  private void writeEdgesToDisk() throws IOException
-  { 
-    this.edgesFile = File.createTempFile("fastgraph", null);
-    
-    FileOutputStream outStream = new FileOutputStream(this.edgesFile);
-    BufferedOutputStream bufferedStream = new BufferedOutputStream(outStream);
-    this.edgeDataOutputStream = new DataOutputStream(bufferedStream);
-    
-    for (int edgeData : edges)
-    {
-      this.edgeDataOutputStream.writeInt(edgeData);
-    }
-    
-    this.edges.clear();
-    usingEdgeDiskCache = true;
-  }
-  
-  private Iterator<Integer> getEdgeData() throws IOException
-  {
-    if (!usingEdgeDiskCache)
-    {
-      return this.edges.iterator();
-    }
-    else
-    {
-      FileInputStream fileInputStream = new FileInputStream(this.edgesFile);
-      BufferedInputStream inputStream = new BufferedInputStream(fileInputStream);
-      final DataInputStream dataInputStream = new DataInputStream(inputStream);
-      
-      return new AbstractIterator<Integer>() {
-        
-        @Override
-        protected Integer computeNext()
-        {
-          try
-          {
-            return dataInputStream.readInt();
-          }
-          catch (IOException e)
-          {
-            return endOfData();
-          }
-        }
-        
-      };
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/linkanalysis/ProgressIndicator.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/linkanalysis/ProgressIndicator.java b/src/java/datafu/pig/linkanalysis/ProgressIndicator.java
deleted file mode 100644
index 3dc54c5..0000000
--- a/src/java/datafu/pig/linkanalysis/ProgressIndicator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.linkanalysis;
-
-interface ProgressIndicator
-{
-  void progress();
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/linkanalysis/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/linkanalysis/package-info.java b/src/java/datafu/pig/linkanalysis/package-info.java
deleted file mode 100644
index 2c6c078..0000000
--- a/src/java/datafu/pig/linkanalysis/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * UDFs for performing link analysis, such as PageRank.
- */
-package datafu.pig.linkanalysis;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/random/RandInt.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/random/RandInt.java b/src/java/datafu/pig/random/RandInt.java
deleted file mode 100644
index de89c4a..0000000
--- a/src/java/datafu/pig/random/RandInt.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.random;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.pig.builtin.Nondeterministic;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Generates a uniformly distributed integer between two bounds.
- */
-@Nondeterministic
-public class RandInt extends SimpleEvalFunc<Integer> 
-{
-  private final Random rand = new Random();
-  
-  /**
-   * @param min lower bound for random number
-   * @param max upper bound for random number
-   */
-  public Integer call(Integer min, Integer max) throws IOException
-  {
-    try
-    {
-      if (min > max)
-      {
-        throw new RuntimeException("The first argument must be less than the second");
-      }
-      return rand.nextInt(max - min + 1) + min;
-    }
-    catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    return new Schema(new Schema.FieldSchema("rand", DataType.INTEGER));
-  }
-  
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/random/RandomUUID.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/random/RandomUUID.java b/src/java/datafu/pig/random/RandomUUID.java
deleted file mode 100644
index d63f4cf..0000000
--- a/src/java/datafu/pig/random/RandomUUID.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package datafu.pig.random;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.builtin.Nondeterministic;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Generates a random UUID using java.util.UUID
- */
-@Nondeterministic
-public class RandomUUID extends EvalFunc<String>
-{
-    public String exec(Tuple input) throws IOException
-    {
-        return UUID.randomUUID().toString();
-    }
-
-    @Override
-    public Schema outputSchema(Schema input)
-    {
-        return new Schema(new Schema.FieldSchema("uuid", DataType.CHARARRAY));
-    }
-}
-


Mime
View raw message