datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [18/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java b/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java
new file mode 100644
index 0000000..6933c28
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.ContextualEvalFunc;
+
+/**
+ * For an empty bag, inserts a tuple having null values for all fields; 
+ * otherwise, the input bag is returned unchanged.
+ * 
+ * <p>
+ * This can be useful when performing FLATTEN on a bag from a COGROUP,
+ * as FLATTEN on an empty bag produces no data.
+ * </p>
+ */
+public class EmptyBagToNullFields extends ContextualEvalFunc<DataBag>
+{
+  @Override
+  public DataBag exec(Tuple tuple) throws IOException
+  {
+    if (tuple.size() == 0 || tuple.get(0) == null)
+      return null;
+    Object o = tuple.get(0);
+    if (o instanceof DataBag)
+    {
+      DataBag bag = (DataBag)o;
+      if (bag.size() == 0)
+      {
+        // create a tuple with null values for all fields
+        int tupleSize = (Integer)getInstanceProperties().get("tuplesize");
+        return BagFactory.getInstance().newDefaultBag(Arrays.asList(TupleFactory.getInstance().newTuple(tupleSize)));
+      }
+      else
+      {
+        return bag;
+      }
+    }
+    else
+      throw new IllegalArgumentException("expected a null or a bag");
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      if (input.size() != 1)
+      {
+        throw new RuntimeException("Expected only a single field as input");
+      }
+      
+      if (input.getField(0).type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input, but found " + DataType.findTypeName(input.getField(0).type));
+      }
+      
+      // get the size of the tuple within the bag
+      int innerTupleSize = input.getField(0).schema.getField(0).schema.getFields().size();
+      getInstanceProperties().put("tuplesize", innerTupleSize);
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException(e);
+    }
+    return input;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java b/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java
new file mode 100644
index 0000000..8a0d072
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Enumerate a bag, appending to each tuple its index within the bag.
+ * 
+ * <p>
+ * For example:
+ * <pre>
+ *   {(A),(B),(C),(D)} => {(A,0),(B,1),(C,2),(D,3)}
+ * </pre>
+ * The first constructor parameter (optional) dictates the starting index of the counting.
+ * This UDF implements the accumulator interface, reducing DataBag materialization costs.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define Enumerate datafu.pig.bags.Enumerate('1');
+ *
+ * -- input:
+ * -- ({(100),(200),(300),(400)})
+ * input = LOAD 'input' as (B: bag{T: tuple(v2:INT)});
+ *
+ * -- output:
+ * -- ({(100,1),(200,2),(300,3),(400,4)})
+ * output = FOREACH input GENERATE Enumerate(B);
+ * }
+ * </pre>
+ */
+public class Enumerate extends AccumulatorEvalFunc<DataBag>
+{
+  private final int start;
+  
+  private DataBag outputBag;
+  private long i;
+  private long count;
+
+  public Enumerate()
+  {
+    this("0");
+  }
+
+  public Enumerate(String start)
+  {
+    this.start = Integer.parseInt(start);
+    cleanup();
+  }
+  
+  @Override
+  public void accumulate(Tuple arg0) throws IOException
+  {
+    DataBag inputBag = (DataBag)arg0.get(0);
+    for (Tuple t : inputBag) {
+      Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
+      t1.append(i);
+      outputBag.add(t1);
+
+      if (count % 1000000 == 0) {
+        outputBag.spill();
+        count = 0;
+      }
+      i++;
+      count++;
+    }
+  }
+
+  @Override
+  public void cleanup()
+  {
+    this.outputBag = BagFactory.getInstance().newDefaultBag();
+    this.i = this.start;
+    this.count = 0;
+  }
+
+  @Override
+  public DataBag getValue()
+  {
+    return outputBag;
+  }
+  
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      if (input.size() != 1)
+      {
+        throw new RuntimeException("Expected input to have only a single field");
+      }
+      
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+      
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }
+      
+      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+      
+      Schema outputTupleSchema = inputTupleSchema.clone();
+      outputTupleSchema.add(new Schema.FieldSchema("i", DataType.LONG));
+      
+      return new Schema(new Schema.FieldSchema(
+            getSchemaName(this.getClass().getName().toLowerCase(), input),
+            outputTupleSchema, 
+            DataType.BAG));
+    }
+    catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+    catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
new file mode 100644
index 0000000..1f24984
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import datafu.pig.util.SimpleEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Returns the first tuple from a bag. Requires a second parameter that will be returned if the bag is empty.
+ *
+ * Example:
+ * <pre>
+ * {@code
+ * define FirstTupleFromBag datafu.pig.bags.FirstTupleFromBag();
+ *
+ * -- input:
+ * -- ({(a,1)})
+ * input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)});
+ *
+ * output = FOREACH input GENERATE FirstTupleFromBag(B, null);
+ *
+ * -- output:
+ * -- (a,1)
+ * }
+ * </pre>
+ */
+
+public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
+{
+  public Tuple call(DataBag bag, Tuple defaultValue) throws IOException
+  {
+    for (Tuple t : bag) {
+      return t;
+    }
+    return defaultValue;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      return new Schema(input.getField(0).schema);
+    }
+    catch (Exception e) {
+      return null;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java b/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java
new file mode 100644
index 0000000..09fffb3
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Returns an empty bag if the input is null; otherwise,
+ * returns the input bag unchanged.
+ */
+public class NullToEmptyBag extends EvalFunc<DataBag>
+{
+  @Override
+  public DataBag exec(Tuple tuple) throws IOException
+  {
+    if (tuple.size() == 0 || tuple.get(0) == null)
+      return BagFactory.getInstance().newDefaultBag();
+    Object o = tuple.get(0);
+    if (o instanceof DataBag)
+      return (DataBag)o;
+    else
+      throw new IllegalArgumentException("expected a null or a bag");
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    return input;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java b/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java
new file mode 100644
index 0000000..9292871
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Prepends a tuple to a bag. 
+ * 
+ * <p>N.B. this copies the entire input bag, so don't use it for large bags.</p>
+ * 
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define PrependToBag datafu.pig.bags.PrependToBag();
+ * 
+ * -- input:
+ * -- ({(1),(2),(3)},(4))
+ * -- ({(10),(20),(30),(40),(50)},(60))
+ * input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
+
+ * -- output:
+ * -- ({(4),(1),(2),(3)})
+ * -- ({(60),(10),(20),(30),(40),(50)})
+ * output = FOREACH input GENERATE PrependToBag(B,T) as B;
+ * }
+ * </pre>
+ * </p>
+ */
+public class PrependToBag extends SimpleEvalFunc<DataBag>
+{
+  public DataBag call(DataBag inputBag, Tuple t) throws IOException
+  {
+    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+    outputBag.add(t);
+    for (Tuple x : inputBag)
+      outputBag.add(x);
+    return outputBag;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+              input.getField(0).schema, DataType.BAG));
+    }
+    catch (FrontendException e) {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java b/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java
new file mode 100644
index 0000000..c86ffcf
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Enumerate a bag, appending to each tuple its index within the bag, with indices being produced in 
+ * descending order. 
+ * 
+ * <p>
+ * For example:
+ * <pre>
+ *   {(A),(B),(C),(D)} => {(A,3),(B,2),(C,1),(D,0)}
+ * </pre>
+ * The first constructor parameter (optional) dictates the starting index of the counting. As the
+ * UDF requires the size of the bag for reverse counting, this UDF does <b>not</b> implement the
+ * accumulator interface and suffers from the slight performance penalty of DataBag materialization.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define ReverseEnumerate datafu.pig.bags.ReverseEnumerate('1');
+ *
+ * -- input:
+ * -- ({(100),(200),(300),(400)})
+ * input = LOAD 'input' as (B: bag{T: tuple(v2:INT)});
+ *
+ * -- output:
+ * -- ({(100,4),(200,3),(300,2),(400,1)})
+ * output = FOREACH input GENERATE ReverseEnumerate(B);
+ * }
+ * </pre>
+ * </p>
+ */
+public class ReverseEnumerate extends SimpleEvalFunc<DataBag>
+{
+  private final int start;
+
+  public ReverseEnumerate()
+  {
+    this.start = 0;
+  }
+
+  public ReverseEnumerate(String start)
+  {
+    this.start = Integer.parseInt(start);
+  }
+  
+  public DataBag call(DataBag inputBag) throws IOException
+  {
+    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+    long i = start, count = 0;
+    i = inputBag.size() - 1 + start;
+
+    for (Tuple t : inputBag) {
+      Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
+      t1.append(i);
+      outputBag.add(t1);
+
+      if (count % 1000000 == 0) {
+        outputBag.spill();
+        count = 0;
+      }
+      i--;
+      count++;
+    }
+
+    return outputBag;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      if (input.size() != 1)
+      {
+        throw new RuntimeException("Expected input to have only a single field");
+      }
+      
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+      
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }
+      
+      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+      
+      Schema outputTupleSchema = inputTupleSchema.clone();
+      outputTupleSchema.add(new Schema.FieldSchema("i", DataType.LONG));
+      
+      return new Schema(new Schema.FieldSchema(
+            getSchemaName(this.getClass().getName().toLowerCase(), input),
+            outputTupleSchema, 
+            DataType.BAG));
+    }
+    catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+    catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java b/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java
new file mode 100644
index 0000000..a1d149e
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.bags;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * Generates pairs of all items in a bag.
+ * 
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define UnorderedPairs datafu.pig.bags.UnorderedPairs();
+ * 
+ * -- input:
+ * -- ({(1),(2),(3),(4)})
+ * input = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
+ * 
+ * -- output:
+ * -- ({((1),(2)),((1),(3)),((1),(4)),((2),(3)),((2),(4)),((3),(4))})
+ * output = FOREACH input GENERATE UnorderedPairs(B);
+ * } 
+ * </pre>
+ * </p>
+ */
+public class UnorderedPairs extends EvalFunc<DataBag>
+{
+  private static final BagFactory bagFactory = BagFactory.getInstance();
+  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    PigStatusReporter reporter = PigStatusReporter.getInstance();
+
+    try {
+      DataBag inputBag = (DataBag) input.get(0);
+      DataBag outputBag = bagFactory.newDefaultBag();
+      long i=0, j, cnt=0;
+
+      if (inputBag != null)
+      {
+        for (Tuple elem1 : inputBag) {
+          j = 0; 
+          for (Tuple elem2 : inputBag) {
+            if (j > i) {
+              outputBag.add(tupleFactory.newTuple(Arrays.asList(elem1, elem2)));
+              cnt++;
+            }
+            j++;
+  
+            if (reporter != null)
+              reporter.progress();
+  
+            if (cnt % 1000000 == 0) {
+              outputBag.spill();
+              cnt = 0;
+            }
+          }
+          i++;
+        }
+      }
+      
+      return outputBag;
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Caught exception processing input of " + this.getClass().getName(), e);
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try {
+      if (input.size() != 1)
+      {
+        throw new RuntimeException("Expected input to have only a single field");
+      }
+      
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+      
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }      
+      
+      Schema ouputTupleSchema = new Schema();
+      ouputTupleSchema.add(new Schema.FieldSchema("elem1", inputBagSchema.getField(0).schema.clone(), DataType.TUPLE));
+      ouputTupleSchema.add(new Schema.FieldSchema("elem2", inputBagSchema.getField(0).schema.clone(), DataType.TUPLE));
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               ouputTupleSchema, 
+                                               DataType.BAG));
+    }
+    catch (Exception e) {
+      return null;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/package-info.java b/datafu-pig/src/main/java/datafu/pig/bags/package-info.java
new file mode 100644
index 0000000..214b837
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A collection of general purpose UDFs for operating on bags.
+ */
+package datafu.pig.bags;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java b/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java
new file mode 100644
index 0000000..d1e3988
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.geo;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Computes the distance (in miles) between two latitude-longitude pairs 
+ * using the {@link <a href="http://en.wikipedia.org/wiki/Haversine_formula" target="_blank">Haversine formula</a>}.
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * -- input is a TSV of two latitude and longitude pairs
+ * input = LOAD 'input' AS (lat1 : double, long1 : double, lat2 : double, long2 : double);
+ * output = FOREACH input GENERATE datafu.pig.geo.HaversineDistInMiles(lat1, long1, lat2, long2) as distance;
+ * }</pre></p>
+ */
+public class HaversineDistInMiles extends SimpleEvalFunc<Double>
+{
+  public static final double EARTH_RADIUS = 3958.75;
+
+  public Double call(Double lat1, Double lng1, Double lat2, Double lng2)
+  {
+    if (lat1 == null || lng1 == null || lat2 == null || lng2 == null)
+      return null;
+
+    double d_lat = Math.toRadians(lat2-lat1);
+    double d_long = Math.toRadians(lng2-lng1);
+    double a = Math.sin(d_lat/2) * Math.sin(d_lat/2) +
+               Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
+               Math.sin(d_long/2) * Math.sin(d_long/2);
+    double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
+    return EARTH_RADIUS * c;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    return new Schema(new Schema.FieldSchema("dist", DataType.DOUBLE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/geo/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/geo/package-info.java b/datafu-pig/src/main/java/datafu/pig/geo/package-info.java
new file mode 100644
index 0000000..12b27b0
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/geo/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for geographic computations.
+ */
+package datafu.pig.geo;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/hash/MD5.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/hash/MD5.java b/datafu-pig/src/main/java/datafu/pig/hash/MD5.java
new file mode 100644
index 0000000..b7b51da
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/hash/MD5.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.hash;
+
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.codec.binary.Base64;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Computes the MD5 value of a string and outputs it in hex (by default).
+ * A method can be provided to the constructor, which may be either 'hex' or 'base64'.
+ */
+public class MD5 extends SimpleEvalFunc<String>
+{
+  private final MessageDigest md5er;
+  private final boolean isBase64;
+  
+  public MD5()
+  {
+    this("hex");
+  }
+  
+  public MD5(String method)
+  {
+    if ("hex".equals(method))
+    {
+      isBase64 = false;
+    }
+    else if ("base64".equals(method))
+    {
+      isBase64 = true;
+    }
+    else
+    {
+      throw new IllegalArgumentException("Expected either hex or base64");
+    }
+    
+    try {
+      md5er = MessageDigest.getInstance("md5");
+    }
+    catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public String call(String val)
+  {
+    if (isBase64)
+    {
+      return new String(Base64.encodeBase64(md5er.digest(val.getBytes())));
+    }
+    else
+    {
+      return new BigInteger(1, md5er.digest(val.getBytes())).toString(16);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/hash/SHA.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/hash/SHA.java b/datafu-pig/src/main/java/datafu/pig/hash/SHA.java
new file mode 100644
index 0000000..ff859e5
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/hash/SHA.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.hash;
+
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+public class SHA extends SimpleEvalFunc<String> {
+	private final MessageDigest sha;
+
+	public SHA(){
+		this("256");
+	}
+	
+	public SHA(String algorithm){
+		try {
+			sha = MessageDigest.getInstance("SHA-"+algorithm);
+		} catch (NoSuchAlgorithmException e) {
+			throw new RuntimeException(e);
+		}
+	}
+	
+	public String call(String value){
+		return new BigInteger(1, sha.digest(value.getBytes())).toString(16);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/hash/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/hash/package-info.java b/datafu-pig/src/main/java/datafu/pig/hash/package-info.java
new file mode 100644
index 0000000..320a029
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/hash/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for computing hashes from data.
+ */
+package datafu.pig.hash;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java
new file mode 100644
index 0000000..80ff567
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.linkanalysis;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * A UDF which implements {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}.
+ * 
+ * <p>  
+ * This is not a distributed implementation.  Each graph is stored in memory while running the algorithm, with edges optionally 
+ * spilled to disk to conserve memory.  This can be used to distribute the execution of PageRank on multiple
+ * reasonably sized graphs.  It does not distribute execution of PageRank for each individual graph.  Each graph is identified
+ * by an integer valued topic ID.
+ * </p>
+ * 
+ * <p>
+ * If the graph is too large to fit in memory then an alternative method must be used, such as an iterative approach which runs
+ * many MapReduce jobs in a sequence to complete the PageRank iterations.
+ * </p>
+ * 
+ * <p>
+ * Each graph is represented through a bag of (source,edges) tuples.  The 'source' is an integer ID representing the source node.
+ * The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples.  The 'dest' is an
+ * integer ID representing the destination node.  The weight is a double representing how much the edge should be weighted.
+ * For a standard PageRank implementation just use weight of 1.0.
+ * </p>
+ * 
+ * <p>
+ * The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
+ * </p>
+ * 
+ * <p>
+ * There are several configurable options for this UDF, among them:
+ * </p>
+ * 
+ * <ul>
+ * <li>
+ * <b>alpha</b>: Controls the PageRank alpha value.  The default is 0.85.  A higher value reduces the "random jump"
+ * factor and causes the rank to be influenced more by edges. 
+ * </li>
+ * <li>
+ * <b>max_iters</b>: The maximum number of iterations to run.  The default is 150.
+ * </li>
+ * <li>
+ * <b>dangling_nodes</b>: How to handle "dangling nodes", i.e. nodes with no outgoing edges.  When "true" this is equivalent
+ * to forcing a dangling node to have an outgoing edge to every other node in the graph.  The default is "false".
+ * </li>
+ * <li>
+ * <b>tolerance</b>: A threshold which causes iterations to cease.  It is measured from the total change in ranks from each of
+ * the nodes in the graph.  As the ranks settle on their final values the total change decreases.  This can be used
+ * to stop iterations early.  The default is 1e-16. 
+ * </li> 
+ * <li>
+ * <b>max_nodes_and_edges</b>: This is a control to prevent running out of memory.  As a graph is loaded, if the sum of edges
+ * and nodes exceeds this value then it will stop.  It will not fail but PageRank will not be run on this graph.  Instead a null
+ * value will be returned as a result.  The default is 100M.
+ * </li>
+ * <li>
+ * <b>spill_to_edge_disk_storage</b>: Used to conserve memory.  When "true" it causes the edge data to be written to disk in a temp file instead
+ * of being held in memory when the number of edges exceeds a threshold.  The nodes are still held in memory however.  
+ * Each iteration of PageRank will stream through the edges stored on disk.  The default is "false".
+ * </li>
+ * <li>
+ * <b>max_edges_in_memory</b>: When spilling edges to disk is enabled, this is the threshold which triggers that behavior.  The default is 30M.
+ * </li>
+ * </ul>
+ * 
+ * <p>
+ * Parameters are configured by passing them in as a sequence of pairs into the UDF constructor.  For example, below the alpha value is set to
+ * 0.87 and dangling nodes are enabled.  All arguments must be strings.
+ * </p>
+ * 
+ * <p>
+ * <pre>
+ * {@code
+ * define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
+ * }
+ * </pre>
+ * </p>
+ * 
+ * <p>
+ * Full example:
+ * <pre>
+ * {@code
+ * 
+ * topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);
+ * 
+ * topic_edges_grouped = GROUP topic_edges by (topic, source) ;
+ * topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
+ *    group.topic as topic,
+ *    group.source as source,
+ *    topic_edges.(dest,weight) as edges;
+ * 
+ * topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic; 
+ * 
+ * topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
+ *    group as topic,
+ *    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);
+ *
+ * topic_ranks = FOREACH topic_ranks GENERATE
+ *    topic, source, rank;
+ * 
+ * }
+ * </pre>
+ * </p> 
+ */
+public class PageRank extends AccumulatorEvalFunc<DataBag>
+{
+  private final datafu.pig.linkanalysis.PageRankImpl graph = new datafu.pig.linkanalysis.PageRankImpl();
+
+  private int maxNodesAndEdges = 100000000;
+  private int maxEdgesInMemory = 30000000;
+  private double tolerance = 1e-16;
+  private int maxIters = 150;
+  private boolean useEdgeDiskStorage = false;
+  private boolean enableDanglingNodeHandling = false;
+  private boolean enableNodeBiasing = false;
+  private boolean aborted = false;
+  private float alpha = 0.85f;
+
+  TupleFactory tupleFactory = TupleFactory.getInstance();
+  BagFactory bagFactory = BagFactory.getInstance();
+  
+  public PageRank()
+  {
+    initialize();
+  }
+
+  public PageRank(String... parameters)
+  {
+    if (parameters.length % 2 != 0)
+    {
+      throw new RuntimeException("Invalid parameters list");
+    }
+
+    for (int i=0; i<parameters.length; i+=2)
+    {
+      String parameterName = parameters[i];
+      String value = parameters[i+1];
+      if (parameterName.equals("max_nodes_and_edges"))
+      {
+        maxNodesAndEdges = Integer.parseInt(value);
+      }
+      else if (parameterName.equals("max_edges_in_memory"))
+      {
+        maxEdgesInMemory = Integer.parseInt(value);
+      }
+      else if (parameterName.equals("tolerance"))
+      {
+        tolerance = Double.parseDouble(value);
+      }
+      else if (parameterName.equals("max_iters"))
+      {
+        maxIters = Integer.parseInt(value);
+      }
+      else if (parameterName.equals("spill_to_edge_disk_storage"))
+      {
+        useEdgeDiskStorage = Boolean.parseBoolean(value);
+      }
+      else if (parameterName.equals("dangling_nodes"))
+      {
+        enableDanglingNodeHandling = Boolean.parseBoolean(value);
+      }
+      else if (parameterName.equals("node_biasing"))
+      {
+        enableNodeBiasing = Boolean.parseBoolean(value);
+      }
+      else if (parameterName.equals("alpha"))
+      {
+        alpha = Float.parseFloat(value);
+      }
+    }
+
+    initialize();
+  }
+
+  private void initialize()
+  {
+    if (useEdgeDiskStorage)
+    {
+      this.graph.enableEdgeDiskCaching();
+    }
+    else
+    {
+      this.graph.disableEdgeDiskCaching();
+    }
+
+    if (enableDanglingNodeHandling)
+    {
+      this.graph.enableDanglingNodeHandling();
+    }
+    else
+    {
+      this.graph.disableDanglingNodeHandling();
+    }
+    
+    if (enableNodeBiasing)
+    {
+      this.graph.enableNodeBiasing();
+    }
+    else
+    {
+      this.graph.disableNodeBiasing();
+    }
+
+    this.graph.setEdgeCachingThreshold(maxEdgesInMemory);
+    this.graph.setAlpha(alpha);
+  }
+
+  @Override
+  public void accumulate(Tuple t) throws IOException
+  {
+    if (aborted)
+    {
+      return;
+    }
+    
+    DataBag bag = (DataBag) t.get(0);
+    if (bag == null || bag.size() == 0)
+      return;
+    
+    for (Tuple sourceTuple : bag) 
+    {
+      Integer sourceId = (Integer)sourceTuple.get(0);
+      DataBag edges = (DataBag)sourceTuple.get(1);
+      Double nodeBias = null;
+      if (enableNodeBiasing)
+      {
+        nodeBias = (Double)sourceTuple.get(2);
+      }
+
+      ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String, Object>>();
+
+      for (Tuple edgeTuple : edges)
+      {
+        Integer destId = (Integer)edgeTuple.get(0);
+        Double weight = (Double)edgeTuple.get(1);
+        HashMap<String,Object> edgeMap = new HashMap<String, Object>();
+        edgeMap.put("dest",destId);
+        edgeMap.put("weight",weight);
+        edgesMapList.add(edgeMap);
+      }
+
+      if (enableNodeBiasing)
+      {
+        graph.addNode(sourceId, edgesMapList, nodeBias.floatValue());
+      }
+      else
+      {
+        graph.addNode(sourceId, edgesMapList);
+      }
+
+      if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
+      {
+        System.out.println(String.format("There are too many nodes and edges (%d + %d > %d). Aborting.", graph.nodeCount(), graph.edgeCount(), maxNodesAndEdges));
+        aborted = true;
+        break;
+      }
+
+      reporter.progress();
+    }
+  }
+
+  @Override
+  public DataBag getValue()
+  {
+    if (aborted)
+    {
+      return null;
+    }
+    
+    System.out.println(String.format("Nodes: %d, Edges: %d", graph.nodeCount(), graph.edgeCount()));
+    
+    ProgressIndicator progressIndicator = getProgressIndicator();
+    System.out.println("Finished loading graph.");
+    long startTime = System.nanoTime();
+    System.out.println("Initializing.");
+    try
+    {
+      graph.init(progressIndicator);
+    }
+    catch (IOException e)
+    {
+      e.printStackTrace();
+      return null;
+    }
+    System.out.println(String.format("Done, took %f ms", (System.nanoTime() - startTime)/10.0e6));
+
+    float totalDiff;
+    int iter = 0;
+
+    System.out.println("Beginning iterations");
+    startTime = System.nanoTime();
+    do
+    {
+      // TODO log percentage complete every 5 minutes
+      try
+      {
+        totalDiff = graph.nextIteration(progressIndicator);
+      }
+      catch (IOException e)
+      {
+        e.printStackTrace();
+        return null;
+      }
+      iter++;
+    } while(iter < maxIters && totalDiff > tolerance);
+    System.out.println(String.format("Done, %d iterations took %f ms", iter, (System.nanoTime() - startTime)/10.0e6));
+
+    DataBag output = bagFactory.newDefaultBag();
+
+    for (Int2IntMap.Entry node : graph.getNodeIds())
+    {
+      int nodeId = node.getIntKey();
+      float rank = graph.getNodeRank(nodeId);
+      List nodeData = new ArrayList(2);
+      nodeData.add(nodeId);
+      nodeData.add(rank);
+      output.add(tupleFactory.newTuple(nodeData));
+    }
+
+    return output;
+  }
+
+  @Override
+  public void cleanup()
+  {
+    try
+    {
+      aborted = false;
+      this.graph.clear();
+    }
+    catch (IOException e)
+    { 
+      e.printStackTrace();
+    }
+  }
+
+  private ProgressIndicator getProgressIndicator()
+  {
+    return new ProgressIndicator()
+        {
+          @Override
+          public void progress()
+          {
+            reporter.progress();
+          }
+        };
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    try
+    {
+      Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+      if (inputFieldSchema.type != DataType.BAG)
+      {
+        throw new RuntimeException("Expected a BAG as input");
+      }
+
+      Schema inputBagSchema = inputFieldSchema.schema;
+
+      if (inputBagSchema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(inputBagSchema.getField(0).type)));
+      }
+      
+      Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+      
+      if (!this.enableNodeBiasing)
+      {
+        if (inputTupleSchema.size() != 2)
+        {
+          throw new RuntimeException("Expected two fields for the node data");
+        }
+      }
+      else
+      {
+        if (inputTupleSchema.size() != 3)
+        {
+          throw new RuntimeException("Expected three fields for the node data");
+        }
+      }
+      
+      if (inputTupleSchema.getField(0).type != DataType.INTEGER)
+      {
+        throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
+                                                 DataType.findTypeName(inputTupleSchema.getField(0).type)));
+      }
+
+      if (inputTupleSchema.getField(1).type != DataType.BAG)
+      {
+        throw new RuntimeException(String.format("Expected edges to be represented with a BAG"));
+      }
+      
+      if (this.enableNodeBiasing && inputTupleSchema.getField(2).type != DataType.DOUBLE)
+      {
+        throw new RuntimeException(String.format("Expected node bias to be a DOUBLE, but instead found %s",
+                                                 DataType.findTypeName(inputTupleSchema.getField(2).type)));
+      }
+
+      Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);
+
+      if (edgesFieldSchema.schema.getField(0).type != DataType.TUPLE)
+      {
+        throw new RuntimeException(String.format("Expected edges field to contain a TUPLE, but instead found %s",
+                                                 DataType.findTypeName(edgesFieldSchema.schema.getField(0).type)));
+      }
+      
+      Schema edgesTupleSchema = edgesFieldSchema.schema.getField(0).schema;
+      
+      if (edgesTupleSchema.size() != 2)
+      {
+        throw new RuntimeException("Expected two fields for the edge data");
+      }
+      
+      if (edgesTupleSchema.getField(0).type != DataType.INTEGER)
+      {
+        throw new RuntimeException(String.format("Expected destination edge ID to an INTEGER, but instead found %s",
+                                                 DataType.findTypeName(edgesTupleSchema.getField(0).type)));
+      }
+
+      if (edgesTupleSchema.getField(1).type != DataType.DOUBLE)
+      {
+        throw new RuntimeException(String.format("Expected destination edge weight to a DOUBLE, but instead found %s",
+                                                 DataType.findTypeName(edgesTupleSchema.getField(1).type)));
+      }
+
+      Schema tupleSchema = new Schema();
+      tupleSchema.add(new Schema.FieldSchema("node",DataType.INTEGER));
+      tupleSchema.add(new Schema.FieldSchema("rank",DataType.FLOAT));
+
+      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                                 .getName()
+                                                                 .toLowerCase(), input),
+                                               tupleSchema,
+                                               DataType.BAG));
+    }
+    catch (FrontendException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java
new file mode 100644
index 0000000..5d0b932
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.linkanalysis;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * An implementation of {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}, used by the {@link PageRank} UDF.
+ * It is not intended to be used directly.
+ */
public class PageRankImpl
{
  // total |rank delta| across all nodes from the most recent iteration
  private float totalRankChange;
  private long edgeCount;
  private long nodeCount;
  
  // the damping factor
  private float alpha = 0.85f;
  
  // edge weights (which are doubles) are multiplied by this value so they can be stored as integers internally
  private static float EDGE_WEIGHT_MULTIPLIER = 100000;
    
  // maps external node id -> index of that node's record in nodeData
  private final Int2IntOpenHashMap nodeIndices = new Int2IntOpenHashMap();
  private final FloatArrayList nodeData = new FloatArrayList(); // rank, total weight, contribution, bias(optional), (repeat)
  private int nodeFieldCount = 3; // unless biasing is enabled
  
  // node ids with no outgoing edges (populated only when dangling node handling is enabled)
  private final IntArrayList danglingNodes = new IntArrayList();
  
  private final IntArrayList edges = new IntArrayList(); // source, dest node count... dest id, weight pos, (repeat)
  
  private boolean shouldHandleDanglingNodes = false;
  private boolean shouldCacheEdgesOnDisk = false;
  // edge count at which edges spill to disk (only when disk caching is enabled)
  private long edgeCachingThreshold;
  private boolean nodeBiasingEnabled = false;
  
  // temp file and stream backing the on-disk edge cache
  private File edgesFile;
  private DataOutputStream edgeDataOutputStream;
  private boolean usingEdgeDiskCache;
+  
+  public void clear() throws IOException
+  {
+    this.edgeCount = 0;
+    this.nodeCount = 0;
+    this.totalRankChange = 0.0f;
+    
+    this.nodeIndices.clear();
+    this.nodeData.clear();
+    this.edges.clear();
+    this.danglingNodes.clear();
+    
+    if (edgeDataOutputStream != null)
+    {
+      this.edgeDataOutputStream.close();
+      this.edgeDataOutputStream = null;
+    }
+    
+    this.usingEdgeDiskCache = false;
+    this.edgesFile = null;
+  }
+  
+  /**
+    * Gets the page rank alpha value.
+    * @return alpha
+    */
+   public float getAlpha()
+   {
+     return alpha;
+   }
+   
+   /**
+    * Sets the page rank alpha value (default is 0.85);
+    * @param alpha 
+    */
+   public void setAlpha(float alpha)
+   {
+     this.alpha = alpha;
+   }
+   
+   public boolean isNodeBiasingEnabled()
+   {
+     return this.nodeBiasingEnabled;
+   }
+   
+   public void enableNodeBiasing()
+   {
+     this.nodeBiasingEnabled = true;
+     this.nodeFieldCount = 4;
+   }
+   
+   public void disableNodeBiasing()
+   {
+     this.nodeBiasingEnabled = false;
+     this.nodeFieldCount = 3;
+   }
+   
+  
+  /**
+   * Gets whether disk is being used to cache edges.
+   * @return True if the edges are cached on disk.
+   */
+  public boolean isUsingEdgeDiskCache()
+  {
+    return usingEdgeDiskCache;
+  }
+  
+  /**
+   * Enable disk caching of edges once there are too many (disabled by default).
+   */
+  public void enableEdgeDiskCaching()
+  {
+    shouldCacheEdgesOnDisk = true;
+  }
+  
+  /**
+   * Disable disk caching of edges once there are too many (disabled by default).
+   */
+  public void disableEdgeDiskCaching()
+  {
+    shouldCacheEdgesOnDisk = false;
+  }
+  
+  /**
+   * Gets whether edge disk caching is enabled.
+   * @return True if edge disk caching is enabled.
+   */
+  public boolean isEdgeDiskCachingEnabled()
+  {
+    return shouldCacheEdgesOnDisk;
+  }
+  
+  /**
+   * Gets the number of edges past which they will be cached on disk instead of in memory.
+   * Edge disk caching must be enabled for this to have any effect.
+   * @return Edge count past which caching occurs
+   */
+  public long getEdgeCachingThreshold()
+  {
+    return edgeCachingThreshold;
+  }
+
+  /**
+   * Set the number of edges past which they will be cached on disk instead of in memory.
+   * Edge disk caching must be enabled for this to have any effect.
+   * @param count Edge count past which caching occurs
+   */
+  public void setEdgeCachingThreshold(long count)
+  {
+    edgeCachingThreshold = count;
+  }
+  
+  /**
+   * Enables dangling node handling (disabled by default).
+   */
+  public void enableDanglingNodeHandling()
+  {
+    shouldHandleDanglingNodes = true;
+  }
+  
+  /**
+   * Disables dangling node handling (disabled by default).
+   */
+  public void disableDanglingNodeHandling()
+  {
+    shouldHandleDanglingNodes = false;
+  }
+  
+  public long nodeCount()
+  {
+    return this.nodeCount;
+  }
+  
+  public long edgeCount()
+  {
+    return this.edgeCount;
+  }
+
+  public Int2IntMap.FastEntrySet getNodeIds()
+  {
+    return this.nodeIndices.int2IntEntrySet();
+  }
+  
+  public float getNodeRank(int nodeId)
+  {
+    int nodeIndex = this.nodeIndices.get(nodeId);
+    return nodeData.get(nodeIndex);
+  }
+  
+  public float getTotalRankChange()
+  {
+    return this.totalRankChange;
+  }
+  
  // Registers a node id if unseen, appending a zeroed record (rank, total
  // weight, contribution, and optionally bias — order matters, it matches
  // the offsets used elsewhere) to the node data array.
  private void maybeCreateNode(int nodeId)
  {
    // create from node if it doesn't already exist
    if (!nodeIndices.containsKey(nodeId))
    {      
      int index = this.nodeData.size();
      
      this.nodeData.add(0.0f); // rank
      this.nodeData.add(0.0f); // total weight
      this.nodeData.add(0.0f); // contribution
      
      if (this.nodeBiasingEnabled)
      {
        this.nodeData.add(0.0f); // bias
      }      
      
      this.nodeIndices.put(nodeId, index);
      
      this.nodeCount++;
    }
  }
+  
+  public float getNodeBias(int nodeId)
+  {
+    if (!this.nodeBiasingEnabled)
+    {
+      throw new IllegalArgumentException("Node biasing not enable");
+    }
+    int nodeIndex = this.nodeIndices.get(nodeId);
+    return this.nodeData.get(nodeIndex+3);
+  }
+  
+  public void setNodeBias(int nodeId, float bias)
+  {
+    if (!this.nodeBiasingEnabled)
+    {
+      throw new IllegalArgumentException("Node biasing not enable");
+    }
+    
+    int nodeIndex = this.nodeIndices.get(nodeId);
+    this.nodeData.set(nodeIndex+3, bias);
+  }
+  
  /**
   * Adds a node and its outgoing edges to the graph with a neutral bias.
   * Each edge is a map with keys "dest" (Integer node id) and "weight" (Double).
   * @param sourceId id of the source node
   * @param sourceEdges outgoing edges of the source node
   * @throws IOException if spilling edges to disk fails
   */
  public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges) throws IOException
  {
    // with bias of 1.0, all nodes have an equal bias (that is, no bias)
    addNode(sourceId, sourceEdges, 1.0f);
  }
+  
  /**
   * Adds a node and its outgoing edges to the graph.
   * Each edge is a map with keys "dest" (Integer destination node id) and
   * "weight" (Double edge weight).
   * @param sourceId id of the source node
   * @param sourceEdges outgoing edges of the source node
   * @param bias the node's bias (node biasing must be enabled unless bias is 1.0)
   * @throws IOException if spilling edges to disk fails
   */
  public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges, float bias) throws IOException
  {
    int source = sourceId.intValue();
   
    maybeCreateNode(source);
    
    if (this.nodeBiasingEnabled)
    {
      setNodeBias(source, bias);
    }
    else if (bias != 1.0f)
    {
      // with node biasing disabled, all nodes implicitly have a bias of 1.0, which means no bias, so if anything else was specified
      // it won't take effect.
      throw new IllegalArgumentException("Bias was specified but node biasing not enabled");
    }
    
    // switch to the disk cache before this node's edges would push the
    // in-memory edge count over the configured threshold
    if (this.shouldCacheEdgesOnDisk && !usingEdgeDiskCache && (sourceEdges.size() + this.edgeCount) >= this.edgeCachingThreshold)
    {
      writeEdgesToDisk();
    }
    
    // store the source node id itself
    appendEdgeData(source);
    
    // store how many outgoing edges this node has
    appendEdgeData(sourceEdges.size());
    
    // store the outgoing edges
    for (Map<String,Object> edge : sourceEdges)
    {
      int dest = ((Integer)edge.get("dest")).intValue();
      float weight = ((Double)edge.get("weight")).floatValue();
            
      maybeCreateNode(dest);
      
      appendEdgeData(dest);
      
      // the edge weight, scaled to an integer (clamped to at least 1 so no edge is dropped)
      appendEdgeData(Math.max(1, (int)(weight * EDGE_WEIGHT_MULTIPLIER)));
      
      this.edgeCount++;
    }
  }
+  
+  private void appendEdgeData(int data) throws IOException
+  {
+    if (this.edgeDataOutputStream != null)
+    {
+      this.edgeDataOutputStream.writeInt(data);
+    }
+    else
+    {
+      this.edges.add(data);
+    }
+  }
+  
  /**
   * Initializes the graph for iteration using a no-op progress indicator.
   * @throws IOException if the edge disk cache cannot be accessed
   */
  public void init() throws IOException
  {
    init(getDummyIndicator());
  }
+    
  /**
   * Prepares the graph for iteration: closes any open edge spill stream,
   * seeds every node with an equal share of the total rank, normalizes node
   * biases (when enabled), accumulates each node's total outgoing edge
   * weight, and (when enabled) collects the dangling nodes.
   * @param progressIndicator callback invoked periodically to report progress
   * @throws IOException if the edge disk cache cannot be read
   */
  public void init(ProgressIndicator progressIndicator) throws IOException
  {
    // flush and close the spill stream so edges can be read back during iteration
    if (this.edgeDataOutputStream != null)
    {
      this.edgeDataOutputStream.close();
      this.edgeDataOutputStream = null;
    }
    
    // initialize all nodes to an equal share of the total rank (1.0)
    // NOTE(review): an empty graph makes this division yield Infinity — confirm callers never init an empty graph
    float nodeRank = 1.0f / this.nodeCount;        
    float totalBias = 0.0f;
    for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
    {
      nodeData.set(j, nodeRank);      
      progressIndicator.progress();
      if (this.nodeBiasingEnabled) 
      {
        totalBias += nodeData.getFloat(j+3);
      }
    }      
    
    // if node biasing enabled, need to normalize the bias by the total bias across all nodes so it represents
    // the share of bias.
    if (this.nodeBiasingEnabled)
    {
      for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
      {
        float bias = nodeData.getFloat(j+3);
        bias /= totalBias;
        nodeData.set(j+3,bias);
      }
    }
    
    Iterator<Integer> edgeData = getEdgeData();
    
    while(edgeData.hasNext())
    {
      int sourceId = edgeData.next();
      int nodeEdgeCount = edgeData.next();
      
      while (nodeEdgeCount-- > 0)
      {
        // skip the destination node id
        edgeData.next();
        
        // weights were stored as scaled integers (see EDGE_WEIGHT_MULTIPLIER);
        // the totals accumulated below remain in those scaled units
        float weight = edgeData.next();
                
        int nodeIndex = this.nodeIndices.get(sourceId);
        
        float totalWeight = this.nodeData.getFloat(nodeIndex+1); 
        totalWeight += weight;
        this.nodeData.set(nodeIndex+1, totalWeight);
        
        progressIndicator.progress();
      }
    }
    
    // if handling dangling nodes, get a list of them by finding those nodes with no outgoing
    // edges (i.e. total outgoing edge weight is 0.0)
    if (shouldHandleDanglingNodes)
    {
      for (Map.Entry<Integer,Integer> e : nodeIndices.entrySet())
      {
        int nodeId = e.getKey();
        int nodeIndex = e.getValue();
        float totalWeight = nodeData.getFloat(nodeIndex+1);
        if (totalWeight == 0.0f)
        {
          danglingNodes.add(nodeId);
        }
      }
    }
  }
+  
+  /**
+   * Runs one full iteration of the algorithm.
+   *
+   * @param progressIndicator receives progress callbacks during the iteration
+   * @return the total absolute rank change across all nodes for this iteration
+   * @throws IOException if reading the edge data fails
+   */
+  public float nextIteration(ProgressIndicator progressIndicator) throws IOException
+  {
+    // phase 1: push rank contributions along the edges
+    distribute(progressIndicator);
+    // phase 2: fold the contributions into the new per-node ranks
+    commit(progressIndicator);
+    return getTotalRankChange();
+  }
+  
+  /**
+   * Runs one full iteration of the algorithm without reporting progress.
+   *
+   * @return the total absolute rank change across all nodes for this iteration
+   * @throws IOException if reading the edge data fails
+   */
+  public float nextIteration() throws IOException
+  {
+    // delegate to the progress-reporting overload instead of duplicating its
+    // distribute/commit sequence (mirrors how init() delegates to init(ProgressIndicator))
+    return nextIteration(getDummyIndicator());
+  }
+  
+  /**
+   * Returns a no-op progress indicator for callers that do not report progress.
+   */
+  private ProgressIndicator getDummyIndicator()
+  {
+    return new ProgressIndicator()
+    {
+      @Override
+      public void progress()
+      {
+        // intentionally does nothing
+      }
+    };
+  }
+  
+  /**
+   * Distribution phase of an iteration: for every edge, pushes a share of the
+   * source node's current rank — proportional to the edge's weight over the
+   * source's total outgoing weight — into the destination node's contribution
+   * slot.  When dangling-node handling is enabled, the rank held by nodes with
+   * no outgoing edges is then spread evenly over all nodes.
+   *
+   * @param progressIndicator receives a callback per edge processed
+   * @throws IOException if reading the edge data fails
+   */
+  public void distribute(ProgressIndicator progressIndicator) throws IOException
+  {    
+    // edge stream layout: source id, edge count, then (dest id, weight) per edge
+    Iterator<Integer> edgeData = getEdgeData();
+    
+    while(edgeData.hasNext())
+    {
+      int sourceId = edgeData.next();
+      int nodeEdgeCount = edgeData.next();
+      
+      while (nodeEdgeCount-- > 0)
+      {
+        int toId = edgeData.next();
+        float weight = edgeData.next();
+                
+        int fromNodeIndex = this.nodeIndices.get(sourceId);
+        int toNodeIndex = this.nodeIndices.get(toId);
+        
+        // source rank (offset 0) scaled by this edge's share of the source's total out-weight (offset 1)
+        float contributionChange = weight * this.nodeData.getFloat(fromNodeIndex) / this.nodeData.getFloat(fromNodeIndex+1);
+        
+        // accumulate into the destination's contribution slot (offset 2)
+        float currentContribution = this.nodeData.getFloat(toNodeIndex+2);
+        this.nodeData.set(toNodeIndex+2, currentContribution + contributionChange);
+        
+        progressIndicator.progress();
+      }      
+    }
+    
+    if (shouldHandleDanglingNodes)
+    {
+      // get the rank from each of the dangling nodes
+      float totalRank = 0.0f;
+      for (int nodeId : danglingNodes)
+      {
+        int nodeIndex = nodeIndices.get(nodeId);
+        float rank = nodeData.get(nodeIndex);
+        totalRank += rank;
+      }
+      
+      // distribute the dangling node ranks to all the nodes in the graph
+      // note: the alpha factor is applied in the commit stage
+      float contributionIncrease = totalRank / this.nodeCount;
+      // i starts at 2 so each stride lands on the contribution slot (offset 2)
+      for (int i=2; i<nodeData.size(); i += this.nodeFieldCount)
+      {
+        float contribution = nodeData.getFloat(i);
+        contribution += contributionIncrease;
+        nodeData.set(i, contribution);
+      }
+    }
+  }
+  
+  /**
+   * Commit phase of an iteration: folds each node's accumulated contribution
+   * into its new rank using the damping factor alpha, resets the contribution
+   * slots, and tracks the total absolute rank change for convergence testing.
+   *
+   * @param progressIndicator receives a callback per node processed
+   */
+  public void commit(ProgressIndicator progressIndicator)
+  {
+    this.totalRankChange = 0.0f;
+    
+    final float oneMinusAlpha = 1.0f - this.alpha;
+    final float uniformTeleport = oneMinusAlpha / nodeCount;
+    
+    for (int i=0; i<this.nodeData.size(); i += this.nodeFieldCount)
+    {
+      // contribution accumulated during the distribute phase (offset 2)
+      float contribution = this.nodeData.get(i+2);
+      
+      float updatedRank;
+      if (!this.nodeBiasingEnabled)
+      {
+        // damped update: uniform teleport term plus alpha-weighted contribution
+        updatedRank = uniformTeleport + alpha * contribution;
+      }
+      else
+      {
+        // with biasing, the teleport term is proportional to the node's normalized bias (offset 3)
+        updatedRank = this.nodeData.get(i+3) * oneMinusAlpha + alpha * contribution;
+      }
+      
+      // clear the contribution slot for the next iteration
+      this.nodeData.set(i+2, 0.0f);
+      
+      // track how far the rank moved before overwriting it (offset 0)
+      this.totalRankChange += Math.abs(updatedRank - this.nodeData.get(i));
+      this.nodeData.set(i, updatedRank);
+      
+      progressIndicator.progress();
+    }
+  }
+  
+  /**
+   * Spills the in-memory edge list to a temp file and switches subsequent edge
+   * appends to stream to that file instead.  The output stream is deliberately
+   * left open so appendEdgeData can keep writing; init() closes it.
+   *
+   * @throws IOException if the temp file cannot be created or written
+   */
+  private void writeEdgesToDisk() throws IOException
+  { 
+    this.edgesFile = File.createTempFile("fastgraph", null);
+    // best-effort cleanup: previously the spill file was never deleted,
+    // leaking temp-disk space across runs
+    this.edgesFile.deleteOnExit();
+    
+    FileOutputStream outStream = new FileOutputStream(this.edgesFile);
+    BufferedOutputStream bufferedStream = new BufferedOutputStream(outStream);
+    this.edgeDataOutputStream = new DataOutputStream(bufferedStream);
+    
+    // flush everything buffered so far, then drop the in-memory copy
+    for (int edgeData : edges)
+    {
+      this.edgeDataOutputStream.writeInt(edgeData);
+    }
+    
+    this.edges.clear();
+    usingEdgeDiskCache = true;
+  }
+  
+  /**
+   * Returns an iterator over the raw edge-data ints, reading from the
+   * in-memory list or, after a spill, from the on-disk cache file.
+   *
+   * @throws IOException if the spill file cannot be opened
+   */
+  private Iterator<Integer> getEdgeData() throws IOException
+  {
+    if (!usingEdgeDiskCache)
+    {
+      return this.edges.iterator();
+    }
+    else
+    {
+      FileInputStream fileInputStream = new FileInputStream(this.edgesFile);
+      BufferedInputStream inputStream = new BufferedInputStream(fileInputStream);
+      final DataInputStream dataInputStream = new DataInputStream(inputStream);
+      
+      return new AbstractIterator<Integer>() {
+        
+        @Override
+        protected Integer computeNext()
+        {
+          try
+          {
+            return dataInputStream.readInt();
+          }
+          catch (IOException e)
+          {
+            // readInt throws EOFException at end of data; close the stream here
+            // so the file descriptor is not leaked (previously it was never closed)
+            try
+            {
+              dataInputStream.close();
+            }
+            catch (IOException ignored)
+            {
+              // nothing useful to do; we are done reading either way
+            }
+            return endOfData();
+          }
+        }
+        
+      };
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java
new file mode 100644
index 0000000..3dc54c5
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.linkanalysis;
+
+/**
+ * Callback invoked periodically by long-running graph operations so callers
+ * can observe liveness/progress.  Implementations should be cheap, since
+ * progress() may be called once per node or per edge processed.
+ */
+interface ProgressIndicator
+{
+  // invoked once per unit of work processed
+  void progress();
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java
new file mode 100644
index 0000000..2c6c078
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for performing link analysis, such as PageRank.
+ */
+package datafu.pig.linkanalysis;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/random/RandInt.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/random/RandInt.java b/datafu-pig/src/main/java/datafu/pig/random/RandInt.java
new file mode 100644
index 0000000..de89c4a
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/random/RandInt.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.random;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Generates a uniformly distributed integer between two bounds.
+ * Both bounds are inclusive: min == max is valid and always returns min.
+ */
+@Nondeterministic
+public class RandInt extends SimpleEvalFunc<Integer> 
+{
+  private final Random rand = new Random();
+  
+  /**
+   * @param min lower bound for random number (inclusive)
+   * @param max upper bound for random number (inclusive)
+   * @return a uniformly distributed integer in [min, max]
+   * @throws IOException if the bounds are invalid or null
+   */
+  public Integer call(Integer min, Integer max) throws IOException
+  {
+    try
+    {
+      if (min > max)
+      {
+        // message fixed: the guard permits min == max, so "less than" was wrong
+        throw new RuntimeException("The first argument must be less than or equal to the second");
+      }
+      return rand.nextInt(max - min + 1) + min;
+    }
+    catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Schema outputSchema(Schema input)
+  {
+    return new Schema(new Schema.FieldSchema("rand", DataType.INTEGER));
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java b/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java
new file mode 100644
index 0000000..d63f4cf
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.random;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Generates a random (version 4) UUID string using java.util.UUID.
+ */
+@Nondeterministic
+public class RandomUUID extends EvalFunc<String>
+{
+    /**
+     * Ignores the input tuple and returns a freshly generated UUID string.
+     */
+    public String exec(Tuple input) throws IOException
+    {
+        UUID uuid = UUID.randomUUID();
+        return uuid.toString();
+    }
+
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        Schema.FieldSchema field = new Schema.FieldSchema("uuid", DataType.CHARARRAY);
+        return new Schema(field);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/random/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/random/package-info.java b/datafu-pig/src/main/java/datafu/pig/random/package-info.java
new file mode 100644
index 0000000..8c7750d
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/random/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs dealing with randomness.
+ */
+package datafu.pig.random;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java b/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java
new file mode 100644
index 0000000..09b4c95
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.util.PriorityQueue;
+
+/**
+ * Bounded priority queue used for reservoir sampling: retains at most
+ * numSamples tuples, evicting the lowest-scored one when a better candidate
+ * arrives.  NOTE(review): this relies on ScoredTuple's natural ordering
+ * placing the lowest score at the head of the queue — confirm against
+ * ScoredTuple's compareTo.
+ */
+class Reservoir extends PriorityQueue<ScoredTuple>
+{
+  private static final long serialVersionUID = 1L;
+  
+  // capacity is fixed at construction time, so the field is final
+  private final int numSamples;
+  
+  public Reservoir(int numSamples)
+  {
+    super(numSamples);
+    this.numSamples = numSamples;
+  }
+  
+  /**
+   * Offers a tuple to the reservoir.  The tuple is accepted if the reservoir
+   * is not yet full, or if its score beats the current head (which is evicted).
+   *
+   * @param scoredTuple candidate tuple
+   * @return true if the tuple was added to the reservoir
+   */
+  public boolean consider(ScoredTuple scoredTuple)
+  {
+    if (super.size() < numSamples) {
+      return super.add(scoredTuple);
+    } else {      
+      // full: replace the head only if the candidate scores strictly higher
+      ScoredTuple head = super.peek();
+      if (scoredTuple.score > head.score) {
+        super.poll();
+        return super.add(scoredTuple);
+      }
+      return false;
+    }
+  }
+}


Mime
View raw message