http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java b/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java
new file mode 100644
index 0000000..6933c28
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/EmptyBagToNullFields.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.ContextualEvalFunc;
+
+/**
+ * For an empty bag, inserts a tuple having null values for all fields;
+ * otherwise, the input bag is returned unchanged.
+ *
+ * <p>
+ * This can be useful when performing FLATTEN on a bag from a COGROUP,
+ * as FLATTEN on an empty bag produces no data.
+ * </p>
+ */
+public class EmptyBagToNullFields extends ContextualEvalFunc<DataBag>
+{
+ @Override
+ public DataBag exec(Tuple tuple) throws IOException
+ {
+ if (tuple.size() == 0 || tuple.get(0) == null)
+ return null;
+ Object o = tuple.get(0);
+ if (o instanceof DataBag)
+ {
+ DataBag bag = (DataBag)o;
+ if (bag.size() == 0)
+ {
+ // create a tuple with null values for all fields
+ int tupleSize = (Integer)getInstanceProperties().get("tuplesize");
+ return BagFactory.getInstance().newDefaultBag(Arrays.asList(TupleFactory.getInstance().newTuple(tupleSize)));
+ }
+ else
+ {
+ return bag;
+ }
+ }
+ else
+ throw new IllegalArgumentException("expected a null or a bag");
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try
+ {
+ if (input.size() != 1)
+ {
+ throw new RuntimeException("Expected only a single field as input");
+ }
+
+ if (input.getField(0).type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input, but found " + DataType.findTypeName(input.getField(0).type));
+ }
+
+ // get the size of the tuple within the bag
+ int innerTupleSize = input.getField(0).schema.getField(0).schema.getFields().size();
+ getInstanceProperties().put("tuplesize", innerTupleSize);
+ }
+ catch (FrontendException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return input;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java b/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java
new file mode 100644
index 0000000..8a0d072
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/Enumerate.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Enumerate a bag, appending to each tuple its index within the bag.
+ *
+ * <p>
+ * For example:
+ * <pre>
+ * {(A),(B),(C),(D)} => {(A,0),(B,1),(C,2),(D,3)}
+ * </pre>
+ * The first constructor parameter (optional) dictates the starting index of the counting.
+ * This UDF implements the accumulator interface, reducing DataBag materialization costs.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define Enumerate datafu.pig.bags.Enumerate('1');
+ *
+ * -- input:
+ * -- ({(100),(200),(300),(400)})
+ * input = LOAD 'input' as (B: bag{T: tuple(v2:INT)});
+ *
+ * -- output:
+ * -- ({(100,1),(200,2),(300,3),(400,4)})
+ * output = FOREACH input GENERATE Enumerate(B);
+ * }
+ * </pre>
+ */
+public class Enumerate extends AccumulatorEvalFunc<DataBag>
+{
+ private final int start;
+
+ private DataBag outputBag;
+ private long i;
+ private long count;
+
+ public Enumerate()
+ {
+ this("0");
+ }
+
+ public Enumerate(String start)
+ {
+ this.start = Integer.parseInt(start);
+ cleanup();
+ }
+
+ @Override
+ public void accumulate(Tuple arg0) throws IOException
+ {
+ DataBag inputBag = (DataBag)arg0.get(0);
+ for (Tuple t : inputBag) {
+ Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
+ t1.append(i);
+ outputBag.add(t1);
+
+ if (count % 1000000 == 0) {
+ outputBag.spill();
+ count = 0;
+ }
+ i++;
+ count++;
+ }
+ }
+
+ @Override
+ public void cleanup()
+ {
+ this.outputBag = BagFactory.getInstance().newDefaultBag();
+ this.i = this.start;
+ this.count = 0;
+ }
+
+ @Override
+ public DataBag getValue()
+ {
+ return outputBag;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ if (input.size() != 1)
+ {
+ throw new RuntimeException("Expected input to have only a single field");
+ }
+
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+ Schema outputTupleSchema = inputTupleSchema.clone();
+ outputTupleSchema.add(new Schema.FieldSchema("i", DataType.LONG));
+
+ return new Schema(new Schema.FieldSchema(
+ getSchemaName(this.getClass().getName().toLowerCase(), input),
+ outputTupleSchema,
+ DataType.BAG));
+ }
+ catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
new file mode 100644
index 0000000..1f24984
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import datafu.pig.util.SimpleEvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Returns the first tuple from a bag. Requires a second parameter that will be returned if the bag is empty.
+ *
+ * Example:
+ * <pre>
+ * {@code
+ * define FirstTupleFromBag datafu.pig.bags.FirstTupleFromBag();
+ *
+ * -- input:
+ * -- ({(a,1)})
+ * input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)});
+ *
+ * output = FOREACH input GENERATE FirstTupleFromBag(B, null);
+ *
+ * -- output:
+ * -- (a,1)
+ * }
+ * </pre>
+ */
+
+public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
+{
+ public Tuple call(DataBag bag, Tuple defaultValue) throws IOException
+ {
+ for (Tuple t : bag) {
+ return t;
+ }
+ return defaultValue;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ return new Schema(input.getField(0).schema);
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java b/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java
new file mode 100644
index 0000000..09fffb3
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/NullToEmptyBag.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Returns an empty bag if the input is null; otherwise,
+ * returns the input bag unchanged.
+ */
+public class NullToEmptyBag extends EvalFunc<DataBag>
+{
+ @Override
+ public DataBag exec(Tuple tuple) throws IOException
+ {
+ if (tuple.size() == 0 || tuple.get(0) == null)
+ return BagFactory.getInstance().newDefaultBag();
+ Object o = tuple.get(0);
+ if (o instanceof DataBag)
+ return (DataBag)o;
+ else
+ throw new IllegalArgumentException("expected a null or a bag");
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ return input;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java b/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java
new file mode 100644
index 0000000..9292871
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/PrependToBag.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Prepends a tuple to a bag.
+ *
+ * <p>N.B. this copies the entire input bag, so don't use it for large bags.</p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define PrependToBag datafu.pig.bags.PrependToBag();
+ *
+ * -- input:
+ * -- ({(1),(2),(3)},(4))
+ * -- ({(10),(20),(30),(40),(50)},(60))
+ * input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
+
+ * -- output:
+ * -- ({(4),(1),(2),(3)})
+ * -- ({(60),(10),(20),(30),(40),(50)})
+ * output = FOREACH input GENERATE PrependToBag(B,T) as B;
+ * }
+ * </pre>
+ * </p>
+ */
+public class PrependToBag extends SimpleEvalFunc<DataBag>
+{
+ public DataBag call(DataBag inputBag, Tuple t) throws IOException
+ {
+ DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+ outputBag.add(t);
+ for (Tuple x : inputBag)
+ outputBag.add(x);
+ return outputBag;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+ input.getField(0).schema, DataType.BAG));
+ }
+ catch (FrontendException e) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java b/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java
new file mode 100644
index 0000000..c86ffcf
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/ReverseEnumerate.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Enumerate a bag, appending to each tuple its index within the bag, with indices being produced in
+ * descending order.
+ *
+ * <p>
+ * For example:
+ * <pre>
+ * {(A),(B),(C),(D)} => {(A,3),(B,2),(C,1),(D,0)}
+ * </pre>
+ * The first constructor parameter (optional) dictates the starting index of the counting. As the
+ * UDF requires the size of the bag for reverse counting, this UDF does <b>not</b> implement the
+ * accumulator interface and suffers from the slight performance penalty of DataBag materialization.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define ReverseEnumerate datafu.pig.bags.ReverseEnumerate('1');
+ *
+ * -- input:
+ * -- ({(100),(200),(300),(400)})
+ * input = LOAD 'input' as (B: bag{T: tuple(v2:INT)});
+ *
+ * -- output:
+ * -- ({(100,4),(200,3),(300,2),(400,1)})
+ * output = FOREACH input GENERATE ReverseEnumerate(B);
+ * }
+ * </pre>
+ * </p>
+ */
+public class ReverseEnumerate extends SimpleEvalFunc<DataBag>
+{
+ private final int start;
+
+ public ReverseEnumerate()
+ {
+ this.start = 0;
+ }
+
+ public ReverseEnumerate(String start)
+ {
+ this.start = Integer.parseInt(start);
+ }
+
+ public DataBag call(DataBag inputBag) throws IOException
+ {
+ DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+ long i = start, count = 0;
+ i = inputBag.size() - 1 + start;
+
+ for (Tuple t : inputBag) {
+ Tuple t1 = TupleFactory.getInstance().newTuple(t.getAll());
+ t1.append(i);
+ outputBag.add(t1);
+
+ if (count % 1000000 == 0) {
+ outputBag.spill();
+ count = 0;
+ }
+ i--;
+ count++;
+ }
+
+ return outputBag;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ if (input.size() != 1)
+ {
+ throw new RuntimeException("Expected input to have only a single field");
+ }
+
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+ Schema outputTupleSchema = inputTupleSchema.clone();
+ outputTupleSchema.add(new Schema.FieldSchema("i", DataType.LONG));
+
+ return new Schema(new Schema.FieldSchema(
+ getSchemaName(this.getClass().getName().toLowerCase(), input),
+ outputTupleSchema,
+ DataType.BAG));
+ }
+ catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java b/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java
new file mode 100644
index 0000000..a1d149e
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/UnorderedPairs.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * Generates pairs of all items in a bag.
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define UnorderedPairs datafu.pig.bags.UnorderedPairs();
+ *
+ * -- input:
+ * -- ({(1),(2),(3),(4)})
+ * input = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
+ *
+ * -- output:
+ * -- ({((1),(2)),((1),(3)),((1),(4)),((2),(3)),((2),(4)),((3),(4))})
+ * output = FOREACH input GENERATE UnorderedPairs(B);
+ * }
+ * </pre>
+ * </p>
+ */
+public class UnorderedPairs extends EvalFunc<DataBag>
+{
+ private static final BagFactory bagFactory = BagFactory.getInstance();
+ private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException
+ {
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+
+ try {
+ DataBag inputBag = (DataBag) input.get(0);
+ DataBag outputBag = bagFactory.newDefaultBag();
+ long i=0, j, cnt=0;
+
+ if (inputBag != null)
+ {
+ for (Tuple elem1 : inputBag) {
+ j = 0;
+ for (Tuple elem2 : inputBag) {
+ if (j > i) {
+ outputBag.add(tupleFactory.newTuple(Arrays.asList(elem1, elem2)));
+ cnt++;
+ }
+ j++;
+
+ if (reporter != null)
+ reporter.progress();
+
+ if (cnt % 1000000 == 0) {
+ outputBag.spill();
+ cnt = 0;
+ }
+ }
+ i++;
+ }
+ }
+
+ return outputBag;
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Caught exception processing input of " + this.getClass().getName(), e);
+ }
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try {
+ if (input.size() != 1)
+ {
+ throw new RuntimeException("Expected input to have only a single field");
+ }
+
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema ouputTupleSchema = new Schema();
+ ouputTupleSchema.add(new Schema.FieldSchema("elem1", inputBagSchema.getField(0).schema.clone(), DataType.TUPLE));
+ ouputTupleSchema.add(new Schema.FieldSchema("elem2", inputBagSchema.getField(0).schema.clone(), DataType.TUPLE));
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
+ ouputTupleSchema,
+ DataType.BAG));
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/bags/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/package-info.java b/datafu-pig/src/main/java/datafu/pig/bags/package-info.java
new file mode 100644
index 0000000..214b837
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A collection of general purpose UDFs for operating on bags.
+ */
+package datafu.pig.bags;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java b/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java
new file mode 100644
index 0000000..d1e3988
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/geo/HaversineDistInMiles.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.geo;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Computes the distance (in miles) between two latitude-longitude pairs
+ * using the {@link <a href="http://en.wikipedia.org/wiki/Haversine_formula" target="_blank">Haversine formula</a>}.
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * -- input is a TSV of two latitude and longitude pairs
+ * input = LOAD 'input' AS (lat1 : double, long1 : double, lat2 : double, long2 : double);
+ * output = FOREACH input GENERATE datafu.pig.geo.HaversineDistInMiles(lat1, long1, lat2, long2) as distance;
+ * }</pre></p>
+ */
+public class HaversineDistInMiles extends SimpleEvalFunc<Double>
+{
+ public static final double EARTH_RADIUS = 3958.75;
+
+ public Double call(Double lat1, Double lng1, Double lat2, Double lng2)
+ {
+ if (lat1 == null || lng1 == null || lat2 == null || lng2 == null)
+ return null;
+
+ double d_lat = Math.toRadians(lat2-lat1);
+ double d_long = Math.toRadians(lng2-lng1);
+ double a = Math.sin(d_lat/2) * Math.sin(d_lat/2) +
+ Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) *
+ Math.sin(d_long/2) * Math.sin(d_long/2);
+ double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
+ return EARTH_RADIUS * c;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ return new Schema(new Schema.FieldSchema("dist", DataType.DOUBLE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/geo/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/geo/package-info.java b/datafu-pig/src/main/java/datafu/pig/geo/package-info.java
new file mode 100644
index 0000000..12b27b0
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/geo/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for geographic computations.
+ */
+package datafu.pig.geo;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/hash/MD5.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/hash/MD5.java b/datafu-pig/src/main/java/datafu/pig/hash/MD5.java
new file mode 100644
index 0000000..b7b51da
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/hash/MD5.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.hash;
+
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.codec.binary.Base64;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Computes the MD5 value of a string and outputs it in hex (by default).
+ * A method can be provided to the constructor, which may be either 'hex' or 'base64'.
+ */
+public class MD5 extends SimpleEvalFunc<String>
+{
+ private final MessageDigest md5er;
+ private final boolean isBase64;
+
+ public MD5()
+ {
+ this("hex");
+ }
+
+ public MD5(String method)
+ {
+ if ("hex".equals(method))
+ {
+ isBase64 = false;
+ }
+ else if ("base64".equals(method))
+ {
+ isBase64 = true;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Expected either hex or base64");
+ }
+
+ try {
+ md5er = MessageDigest.getInstance("md5");
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String call(String val)
+ {
+ if (isBase64)
+ {
+ return new String(Base64.encodeBase64(md5er.digest(val.getBytes())));
+ }
+ else
+ {
+ return new BigInteger(1, md5er.digest(val.getBytes())).toString(16);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/hash/SHA.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/hash/SHA.java b/datafu-pig/src/main/java/datafu/pig/hash/SHA.java
new file mode 100644
index 0000000..ff859e5
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/hash/SHA.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.hash;
+
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+public class SHA extends SimpleEvalFunc<String> {
+ private final MessageDigest sha;
+
+ public SHA(){
+ this("256");
+ }
+
+ public SHA(String algorithm){
+ try {
+ sha = MessageDigest.getInstance("SHA-"+algorithm);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String call(String value){
+ return new BigInteger(1, sha.digest(value.getBytes())).toString(16);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/hash/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/hash/package-info.java b/datafu-pig/src/main/java/datafu/pig/hash/package-info.java
new file mode 100644
index 0000000..320a029
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/hash/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for computing hashes from data.
+ */
+package datafu.pig.hash;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java
new file mode 100644
index 0000000..80ff567
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRank.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.linkanalysis;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * A UDF which implements {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}.
+ *
+ * <p>
+ * This is not a distributed implementation. Each graph is stored in memory while running the algorithm, with edges optionally
+ * spilled to disk to conserve memory. This can be used to distribute the execution of PageRank on multiple
+ * reasonably sized graphs. It does not distribute execution of PageRank for each individual graph. Each graph is identified
+ * by an integer valued topic ID.
+ * </p>
+ *
+ * <p>
+ * If the graph is too large to fit in memory then an alternative method must be used, such as an iterative approach which runs
+ * many MapReduce jobs in a sequence to complete the PageRank iterations.
+ * </p>
+ *
+ * <p>
+ * Each graph is represented through a bag of (source,edges) tuples. The 'source' is an integer ID representing the source node.
+ * The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples. The 'dest' is an
+ * integer ID representing the destination node. The weight is a double representing how much the edge should be weighted.
+ * For a standard PageRank implementation just use weight of 1.0.
+ * </p>
+ *
+ * <p>
+ * The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
+ * </p>
+ *
+ * <p>
+ * There are several configurable options for this UDF, among them:
+ * </p>
+ *
+ * <ul>
+ * <li>
+ * <b>alpha</b>: Controls the PageRank alpha value. The default is 0.85. A higher value reduces the "random jump"
+ * factor and causes the rank to be influenced more by edges.
+ * </li>
+ * <li>
+ * <b>max_iters</b>: The maximum number of iterations to run. The default is 150.
+ * </li>
+ * <li>
+ * <b>dangling_nodes</b>: How to handle "dangling nodes", i.e. nodes with no outgoing edges. When "true" this is equivalent
+ * to forcing a dangling node to have an outgoing edge to every other node in the graph. The default is "false".
+ * </li>
+ * <li>
+ * <b>tolerance</b>: A threshold which causes iterations to cease. It is measured from the total change in ranks from each of
+ * the nodes in the graph. As the ranks settle on their final values the total change decreases. This can be used
+ * to stop iterations early. The default is 1e-16.
+ * </li>
+ * <li>
+ * <b>max_nodes_and_edges</b>: This is a control to prevent running out of memory. As a graph is loaded, if the sum of edges
+ * and nodes exceeds this value then it will stop. It will not fail but PageRank will not be run on this graph. Instead a null
+ * value will be returned as a result. The default is 100M.
+ * </li>
+ * <li>
+ * <b>spill_to_edge_disk_storage</b>: Used to conserve memory. When "true" it causes the edge data to be written to disk in a temp file instead
+ * of being held in memory when the number of edges exceeds a threshold. The nodes are still held in memory however.
+ * Each iteration of PageRank will stream through the edges stored on disk. The default is "false".
+ * </li>
+ * <li>
+ * <b>max_edges_in_memory</b>: When spilling edges to disk is enabled, this is the threshold which triggers that behavior. The default is 30M.
+ * </li>
+ * </ul>
+ *
+ * <p>
+ * Parameters are configured by passing them in as a sequence of pairs into the UDF constructor. For example, below the alpha value is set to
+ * 0.87 and dangling nodes are enabled. All arguments must be strings.
+ * </p>
+ *
+ * <p>
+ * <pre>
+ * {@code
+ * define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
+ * }
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * Full example:
+ * <pre>
+ * {@code
+ *
+ * topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);
+ *
+ * topic_edges_grouped = GROUP topic_edges by (topic, source) ;
+ * topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
+ * group.topic as topic,
+ * group.source as source,
+ * topic_edges.(dest,weight) as edges;
+ *
+ * topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic;
+ *
+ * topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
+ * group as topic,
+ * FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);
+ *
+ * topic_ranks = FOREACH topic_ranks GENERATE
+ * topic, source, rank;
+ *
+ * }
+ * </pre>
+ * </p>
+ */
+public class PageRank extends AccumulatorEvalFunc<DataBag>
+{
+ private final datafu.pig.linkanalysis.PageRankImpl graph = new datafu.pig.linkanalysis.PageRankImpl();
+
+ private int maxNodesAndEdges = 100000000;
+ private int maxEdgesInMemory = 30000000;
+ private double tolerance = 1e-16;
+ private int maxIters = 150;
+ private boolean useEdgeDiskStorage = false;
+ private boolean enableDanglingNodeHandling = false;
+ private boolean enableNodeBiasing = false;
+ private boolean aborted = false;
+ private float alpha = 0.85f;
+
+ TupleFactory tupleFactory = TupleFactory.getInstance();
+ BagFactory bagFactory = BagFactory.getInstance();
+
+ public PageRank()
+ {
+ initialize();
+ }
+
+ public PageRank(String... parameters)
+ {
+ if (parameters.length % 2 != 0)
+ {
+ throw new RuntimeException("Invalid parameters list");
+ }
+
+ for (int i=0; i<parameters.length; i+=2)
+ {
+ String parameterName = parameters[i];
+ String value = parameters[i+1];
+ if (parameterName.equals("max_nodes_and_edges"))
+ {
+ maxNodesAndEdges = Integer.parseInt(value);
+ }
+ else if (parameterName.equals("max_edges_in_memory"))
+ {
+ maxEdgesInMemory = Integer.parseInt(value);
+ }
+ else if (parameterName.equals("tolerance"))
+ {
+ tolerance = Double.parseDouble(value);
+ }
+ else if (parameterName.equals("max_iters"))
+ {
+ maxIters = Integer.parseInt(value);
+ }
+ else if (parameterName.equals("spill_to_edge_disk_storage"))
+ {
+ useEdgeDiskStorage = Boolean.parseBoolean(value);
+ }
+ else if (parameterName.equals("dangling_nodes"))
+ {
+ enableDanglingNodeHandling = Boolean.parseBoolean(value);
+ }
+ else if (parameterName.equals("node_biasing"))
+ {
+ enableNodeBiasing = Boolean.parseBoolean(value);
+ }
+ else if (parameterName.equals("alpha"))
+ {
+ alpha = Float.parseFloat(value);
+ }
+ }
+
+ initialize();
+ }
+
+ private void initialize()
+ {
+ if (useEdgeDiskStorage)
+ {
+ this.graph.enableEdgeDiskCaching();
+ }
+ else
+ {
+ this.graph.disableEdgeDiskCaching();
+ }
+
+ if (enableDanglingNodeHandling)
+ {
+ this.graph.enableDanglingNodeHandling();
+ }
+ else
+ {
+ this.graph.disableDanglingNodeHandling();
+ }
+
+ if (enableNodeBiasing)
+ {
+ this.graph.enableNodeBiasing();
+ }
+ else
+ {
+ this.graph.disableNodeBiasing();
+ }
+
+ this.graph.setEdgeCachingThreshold(maxEdgesInMemory);
+ this.graph.setAlpha(alpha);
+ }
+
+ @Override
+ public void accumulate(Tuple t) throws IOException
+ {
+ if (aborted)
+ {
+ return;
+ }
+
+ DataBag bag = (DataBag) t.get(0);
+ if (bag == null || bag.size() == 0)
+ return;
+
+ for (Tuple sourceTuple : bag)
+ {
+ Integer sourceId = (Integer)sourceTuple.get(0);
+ DataBag edges = (DataBag)sourceTuple.get(1);
+ Double nodeBias = null;
+ if (enableNodeBiasing)
+ {
+ nodeBias = (Double)sourceTuple.get(2);
+ }
+
+ ArrayList<Map<String,Object>> edgesMapList = new ArrayList<Map<String, Object>>();
+
+ for (Tuple edgeTuple : edges)
+ {
+ Integer destId = (Integer)edgeTuple.get(0);
+ Double weight = (Double)edgeTuple.get(1);
+ HashMap<String,Object> edgeMap = new HashMap<String, Object>();
+ edgeMap.put("dest",destId);
+ edgeMap.put("weight",weight);
+ edgesMapList.add(edgeMap);
+ }
+
+ if (enableNodeBiasing)
+ {
+ graph.addNode(sourceId, edgesMapList, nodeBias.floatValue());
+ }
+ else
+ {
+ graph.addNode(sourceId, edgesMapList);
+ }
+
+ if (graph.nodeCount() + graph.edgeCount() > maxNodesAndEdges)
+ {
+ System.out.println(String.format("There are too many nodes and edges (%d + %d > %d). Aborting.", graph.nodeCount(), graph.edgeCount(), maxNodesAndEdges));
+ aborted = true;
+ break;
+ }
+
+ reporter.progress();
+ }
+ }
+
+ @Override
+ public DataBag getValue()
+ {
+ if (aborted)
+ {
+ return null;
+ }
+
+ System.out.println(String.format("Nodes: %d, Edges: %d", graph.nodeCount(), graph.edgeCount()));
+
+ ProgressIndicator progressIndicator = getProgressIndicator();
+ System.out.println("Finished loading graph.");
+ long startTime = System.nanoTime();
+ System.out.println("Initializing.");
+ try
+ {
+ graph.init(progressIndicator);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ System.out.println(String.format("Done, took %f ms", (System.nanoTime() - startTime)/10.0e6));
+
+ float totalDiff;
+ int iter = 0;
+
+ System.out.println("Beginning iterations");
+ startTime = System.nanoTime();
+ do
+ {
+ // TODO log percentage complete every 5 minutes
+ try
+ {
+ totalDiff = graph.nextIteration(progressIndicator);
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ iter++;
+ } while(iter < maxIters && totalDiff > tolerance);
+ System.out.println(String.format("Done, %d iterations took %f ms", iter, (System.nanoTime() - startTime)/10.0e6));
+
+ DataBag output = bagFactory.newDefaultBag();
+
+ for (Int2IntMap.Entry node : graph.getNodeIds())
+ {
+ int nodeId = node.getIntKey();
+ float rank = graph.getNodeRank(nodeId);
+ List nodeData = new ArrayList(2);
+ nodeData.add(nodeId);
+ nodeData.add(rank);
+ output.add(tupleFactory.newTuple(nodeData));
+ }
+
+ return output;
+ }
+
+ @Override
+ public void cleanup()
+ {
+ try
+ {
+ aborted = false;
+ this.graph.clear();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private ProgressIndicator getProgressIndicator()
+ {
+ return new ProgressIndicator()
+ {
+ @Override
+ public void progress()
+ {
+ reporter.progress();
+ }
+ };
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ try
+ {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG)
+ {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ Schema inputBagSchema = inputFieldSchema.schema;
+
+ if (inputBagSchema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(inputBagSchema.getField(0).type)));
+ }
+
+ Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+ if (!this.enableNodeBiasing)
+ {
+ if (inputTupleSchema.size() != 2)
+ {
+ throw new RuntimeException("Expected two fields for the node data");
+ }
+ }
+ else
+ {
+ if (inputTupleSchema.size() != 3)
+ {
+ throw new RuntimeException("Expected three fields for the node data");
+ }
+ }
+
+ if (inputTupleSchema.getField(0).type != DataType.INTEGER)
+ {
+ throw new RuntimeException(String.format("Expected source to be an INTEGER, but instead found %s",
+ DataType.findTypeName(inputTupleSchema.getField(0).type)));
+ }
+
+ if (inputTupleSchema.getField(1).type != DataType.BAG)
+ {
+ throw new RuntimeException(String.format("Expected edges to be represented with a BAG"));
+ }
+
+ if (this.enableNodeBiasing && inputTupleSchema.getField(2).type != DataType.DOUBLE)
+ {
+ throw new RuntimeException(String.format("Expected node bias to be a DOUBLE, but instead found %s",
+ DataType.findTypeName(inputTupleSchema.getField(2).type)));
+ }
+
+ Schema.FieldSchema edgesFieldSchema = inputTupleSchema.getField(1);
+
+ if (edgesFieldSchema.schema.getField(0).type != DataType.TUPLE)
+ {
+ throw new RuntimeException(String.format("Expected edges field to contain a TUPLE, but instead found %s",
+ DataType.findTypeName(edgesFieldSchema.schema.getField(0).type)));
+ }
+
+ Schema edgesTupleSchema = edgesFieldSchema.schema.getField(0).schema;
+
+ if (edgesTupleSchema.size() != 2)
+ {
+ throw new RuntimeException("Expected two fields for the edge data");
+ }
+
+ if (edgesTupleSchema.getField(0).type != DataType.INTEGER)
+ {
+ throw new RuntimeException(String.format("Expected destination edge ID to an INTEGER, but instead found %s",
+ DataType.findTypeName(edgesTupleSchema.getField(0).type)));
+ }
+
+ if (edgesTupleSchema.getField(1).type != DataType.DOUBLE)
+ {
+ throw new RuntimeException(String.format("Expected destination edge weight to a DOUBLE, but instead found %s",
+ DataType.findTypeName(edgesTupleSchema.getField(1).type)));
+ }
+
+ Schema tupleSchema = new Schema();
+ tupleSchema.add(new Schema.FieldSchema("node",DataType.INTEGER));
+ tupleSchema.add(new Schema.FieldSchema("rank",DataType.FLOAT));
+
+ return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+ .getName()
+ .toLowerCase(), input),
+ tupleSchema,
+ DataType.BAG));
+ }
+ catch (FrontendException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java
new file mode 100644
index 0000000..5d0b932
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/PageRankImpl.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.linkanalysis;
+
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * An implementation of {@link <a href="http://en.wikipedia.org/wiki/PageRank" target="_blank">PageRank</a>}, used by the {@link PageRank} UDF.
+ * It is not intended to be used directly.
+ */
+public class PageRankImpl
+{
+ private float totalRankChange;
+ private long edgeCount;
+ private long nodeCount;
+
+ // the damping factor
+ private float alpha = 0.85f;
+
+ // edge weights (which are doubles) are multiplied by this value so they can be stored as integers internally
+ private static float EDGE_WEIGHT_MULTIPLIER = 100000;
+
+ private final Int2IntOpenHashMap nodeIndices = new Int2IntOpenHashMap();
+ private final FloatArrayList nodeData = new FloatArrayList(); // rank, total weight, contribution, bias(optional), (repeat)
+ private int nodeFieldCount = 3; // unless biasing is enabled
+
+ private final IntArrayList danglingNodes = new IntArrayList();
+
+ private final IntArrayList edges = new IntArrayList(); // source, dest node count... dest id, weight pos, (repeat)
+
+ private boolean shouldHandleDanglingNodes = false;
+ private boolean shouldCacheEdgesOnDisk = false;
+ private long edgeCachingThreshold;
+ private boolean nodeBiasingEnabled = false;
+
+ private File edgesFile;
+ private DataOutputStream edgeDataOutputStream;
+ private boolean usingEdgeDiskCache;
+
+ public void clear() throws IOException
+ {
+ this.edgeCount = 0;
+ this.nodeCount = 0;
+ this.totalRankChange = 0.0f;
+
+ this.nodeIndices.clear();
+ this.nodeData.clear();
+ this.edges.clear();
+ this.danglingNodes.clear();
+
+ if (edgeDataOutputStream != null)
+ {
+ this.edgeDataOutputStream.close();
+ this.edgeDataOutputStream = null;
+ }
+
+ this.usingEdgeDiskCache = false;
+ this.edgesFile = null;
+ }
+
+ /**
+ * Gets the page rank alpha value.
+ * @return alpha
+ */
+ public float getAlpha()
+ {
+ return alpha;
+ }
+
+ /**
+ * Sets the page rank alpha value (default is 0.85);
+ * @param alpha
+ */
+ public void setAlpha(float alpha)
+ {
+ this.alpha = alpha;
+ }
+
+ public boolean isNodeBiasingEnabled()
+ {
+ return this.nodeBiasingEnabled;
+ }
+
+ public void enableNodeBiasing()
+ {
+ this.nodeBiasingEnabled = true;
+ this.nodeFieldCount = 4;
+ }
+
+ public void disableNodeBiasing()
+ {
+ this.nodeBiasingEnabled = false;
+ this.nodeFieldCount = 3;
+ }
+
+
+ /**
+ * Gets whether disk is being used to cache edges.
+ * @return True if the edges are cached on disk.
+ */
+ public boolean isUsingEdgeDiskCache()
+ {
+ return usingEdgeDiskCache;
+ }
+
+ /**
+ * Enable disk caching of edges once there are too many (disabled by default).
+ */
+ public void enableEdgeDiskCaching()
+ {
+ shouldCacheEdgesOnDisk = true;
+ }
+
+ /**
+ * Disable disk caching of edges once there are too many (disabled by default).
+ */
+ public void disableEdgeDiskCaching()
+ {
+ shouldCacheEdgesOnDisk = false;
+ }
+
+ /**
+ * Gets whether edge disk caching is enabled.
+ * @return True if edge disk caching is enabled.
+ */
+ public boolean isEdgeDiskCachingEnabled()
+ {
+ return shouldCacheEdgesOnDisk;
+ }
+
+ /**
+ * Gets the number of edges past which they will be cached on disk instead of in memory.
+ * Edge disk caching must be enabled for this to have any effect.
+ * @return Edge count past which caching occurs
+ */
+ public long getEdgeCachingThreshold()
+ {
+ return edgeCachingThreshold;
+ }
+
+ /**
+ * Set the number of edges past which they will be cached on disk instead of in memory.
+ * Edge disk caching must be enabled for this to have any effect.
+ * @param count Edge count past which caching occurs
+ */
+ public void setEdgeCachingThreshold(long count)
+ {
+ edgeCachingThreshold = count;
+ }
+
+ /**
+ * Enables dangling node handling (disabled by default).
+ */
+ public void enableDanglingNodeHandling()
+ {
+ shouldHandleDanglingNodes = true;
+ }
+
+ /**
+ * Disables dangling node handling (disabled by default).
+ */
+ public void disableDanglingNodeHandling()
+ {
+ shouldHandleDanglingNodes = false;
+ }
+
+ public long nodeCount()
+ {
+ return this.nodeCount;
+ }
+
+ public long edgeCount()
+ {
+ return this.edgeCount;
+ }
+
+ public Int2IntMap.FastEntrySet getNodeIds()
+ {
+ return this.nodeIndices.int2IntEntrySet();
+ }
+
+ public float getNodeRank(int nodeId)
+ {
+ int nodeIndex = this.nodeIndices.get(nodeId);
+ return nodeData.get(nodeIndex);
+ }
+
+ public float getTotalRankChange()
+ {
+ return this.totalRankChange;
+ }
+
+ private void maybeCreateNode(int nodeId)
+ {
+ // create from node if it doesn't already exist
+ if (!nodeIndices.containsKey(nodeId))
+ {
+ int index = this.nodeData.size();
+
+ this.nodeData.add(0.0f); // rank
+ this.nodeData.add(0.0f); // total weight
+ this.nodeData.add(0.0f); // contribution
+
+ if (this.nodeBiasingEnabled)
+ {
+ this.nodeData.add(0.0f); // bias
+ }
+
+ this.nodeIndices.put(nodeId, index);
+
+ this.nodeCount++;
+ }
+ }
+
+ public float getNodeBias(int nodeId)
+ {
+ if (!this.nodeBiasingEnabled)
+ {
+ throw new IllegalArgumentException("Node biasing not enable");
+ }
+ int nodeIndex = this.nodeIndices.get(nodeId);
+ return this.nodeData.get(nodeIndex+3);
+ }
+
+ public void setNodeBias(int nodeId, float bias)
+ {
+ if (!this.nodeBiasingEnabled)
+ {
+ throw new IllegalArgumentException("Node biasing not enable");
+ }
+
+ int nodeIndex = this.nodeIndices.get(nodeId);
+ this.nodeData.set(nodeIndex+3, bias);
+ }
+
+ public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges) throws IOException
+ {
+ // with bias of 1.0, all nodes have an equal bias (that is, no bias)
+ addNode(sourceId, sourceEdges, 1.0f);
+ }
+
+ public void addNode(Integer sourceId, ArrayList<Map<String,Object>> sourceEdges, float bias) throws IOException
+ {
+ int source = sourceId.intValue();
+
+ maybeCreateNode(source);
+
+ if (this.nodeBiasingEnabled)
+ {
+ setNodeBias(source, bias);
+ }
+ else if (bias != 1.0f)
+ {
+ // with node biasing disabled, all nodes implicitly have a bias of 1.0, which means no bias, so if anything else was specified
+ // it won't take effect.
+ throw new IllegalArgumentException("Bias was specified but node biasing not enabled");
+ }
+
+ if (this.shouldCacheEdgesOnDisk && !usingEdgeDiskCache && (sourceEdges.size() + this.edgeCount) >= this.edgeCachingThreshold)
+ {
+ writeEdgesToDisk();
+ }
+
+ // store the source node id itself
+ appendEdgeData(source);
+
+ // store how many outgoing edges this node has
+ appendEdgeData(sourceEdges.size());
+
+ // store the outgoing edges
+ for (Map<String,Object> edge : sourceEdges)
+ {
+ int dest = ((Integer)edge.get("dest")).intValue();
+ float weight = ((Double)edge.get("weight")).floatValue();
+
+ maybeCreateNode(dest);
+
+ appendEdgeData(dest);
+
+ // location of weight in weights array
+ appendEdgeData(Math.max(1, (int)(weight * EDGE_WEIGHT_MULTIPLIER)));
+
+ this.edgeCount++;
+ }
+ }
+
+ private void appendEdgeData(int data) throws IOException
+ {
+ if (this.edgeDataOutputStream != null)
+ {
+ this.edgeDataOutputStream.writeInt(data);
+ }
+ else
+ {
+ this.edges.add(data);
+ }
+ }
+
+ public void init() throws IOException
+ {
+ init(getDummyIndicator());
+ }
+
+ public void init(ProgressIndicator progressIndicator) throws IOException
+ {
+ if (this.edgeDataOutputStream != null)
+ {
+ this.edgeDataOutputStream.close();
+ this.edgeDataOutputStream = null;
+ }
+
+ // initialize all nodes to an equal share of the total rank (1.0)
+ float nodeRank = 1.0f / this.nodeCount;
+ float totalBias = 0.0f;
+ for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
+ {
+ nodeData.set(j, nodeRank);
+ progressIndicator.progress();
+ if (this.nodeBiasingEnabled)
+ {
+ totalBias += nodeData.getFloat(j+3);
+ }
+ }
+
+ // if node biasing enabled, need to normalize the bias by the total bias across all nodes so it represents
+ // the share of bias.
+ if (this.nodeBiasingEnabled)
+ {
+ for (int j=0; j<this.nodeData.size(); j+=this.nodeFieldCount)
+ {
+ float bias = nodeData.getFloat(j+3);
+ bias /= totalBias;
+ nodeData.set(j+3,bias);
+ }
+ }
+
+ Iterator<Integer> edgeData = getEdgeData();
+
+ while(edgeData.hasNext())
+ {
+ int sourceId = edgeData.next();
+ int nodeEdgeCount = edgeData.next();
+
+ while (nodeEdgeCount-- > 0)
+ {
+ // skip the destination node id
+ edgeData.next();
+
+ float weight = edgeData.next();
+
+ int nodeIndex = this.nodeIndices.get(sourceId);
+
+ float totalWeight = this.nodeData.getFloat(nodeIndex+1);
+ totalWeight += weight;
+ this.nodeData.set(nodeIndex+1, totalWeight);
+
+ progressIndicator.progress();
+ }
+ }
+
+ // if handling dangling nodes, get a list of them by finding those nodes with no outgoing
+ // edges (i.e. total outgoing edge weight is 0.0)
+ if (shouldHandleDanglingNodes)
+ {
+ for (Map.Entry<Integer,Integer> e : nodeIndices.entrySet())
+ {
+ int nodeId = e.getKey();
+ int nodeIndex = e.getValue();
+ float totalWeight = nodeData.getFloat(nodeIndex+1);
+ if (totalWeight == 0.0f)
+ {
+ danglingNodes.add(nodeId);
+ }
+ }
+ }
+ }
+
+ public float nextIteration(ProgressIndicator progressIndicator) throws IOException
+ {
+ distribute(progressIndicator);
+ commit(progressIndicator);
+
+ return getTotalRankChange();
+ }
+
+ public float nextIteration() throws IOException
+ {
+ ProgressIndicator dummyIndicator = getDummyIndicator();
+ distribute(dummyIndicator);
+ commit(dummyIndicator);
+
+ return getTotalRankChange();
+ }
+
+ private ProgressIndicator getDummyIndicator()
+ {
+ return new ProgressIndicator() {
+ @Override
+ public void progress()
+ {
+ }
+ };
+ }
+
+ public void distribute(ProgressIndicator progressIndicator) throws IOException
+ {
+ Iterator<Integer> edgeData = getEdgeData();
+
+ while(edgeData.hasNext())
+ {
+ int sourceId = edgeData.next();
+ int nodeEdgeCount = edgeData.next();
+
+ while (nodeEdgeCount-- > 0)
+ {
+ int toId = edgeData.next();
+ float weight = edgeData.next();
+
+ int fromNodeIndex = this.nodeIndices.get(sourceId);
+ int toNodeIndex = this.nodeIndices.get(toId);
+
+ float contributionChange = weight * this.nodeData.getFloat(fromNodeIndex) / this.nodeData.getFloat(fromNodeIndex+1);
+
+ float currentContribution = this.nodeData.getFloat(toNodeIndex+2);
+ this.nodeData.set(toNodeIndex+2, currentContribution + contributionChange);
+
+ progressIndicator.progress();
+ }
+ }
+
+ if (shouldHandleDanglingNodes)
+ {
+ // get the rank from each of the dangling nodes
+ float totalRank = 0.0f;
+ for (int nodeId : danglingNodes)
+ {
+ int nodeIndex = nodeIndices.get(nodeId);
+ float rank = nodeData.get(nodeIndex);
+ totalRank += rank;
+ }
+
+ // distribute the dangling node ranks to all the nodes in the graph
+ // note: the alpha factor is applied in the commit stage
+ float contributionIncrease = totalRank / this.nodeCount;
+ for (int i=2; i<nodeData.size(); i += this.nodeFieldCount)
+ {
+ float contribution = nodeData.getFloat(i);
+ contribution += contributionIncrease;
+ nodeData.set(i, contribution);
+ }
+ }
+ }
+
+ public void commit(ProgressIndicator progressIndicator)
+ {
+ this.totalRankChange = 0.0f;
+
+ float oneMinusAlpha = (1.0f - this.alpha);
+ float oneMinusAlphaOverNodeCount = oneMinusAlpha / nodeCount;
+
+ for (int nodeIndex=0; nodeIndex<this.nodeData.size(); nodeIndex += this.nodeFieldCount)
+ {
+ float oldRank = this.nodeData.get(nodeIndex+2);
+ float newRank;
+
+ if (this.nodeBiasingEnabled)
+ {
+ float bias = this.nodeData.get(nodeIndex+3);
+ newRank = bias * oneMinusAlpha + alpha * oldRank;
+ }
+ else
+ {
+ newRank = oneMinusAlphaOverNodeCount + alpha * oldRank;
+ }
+
+ this.nodeData.set(nodeIndex+2, 0.0f);
+
+ float lastRankDiff = newRank - this.nodeData.get(nodeIndex);
+
+ this.nodeData.set(nodeIndex, newRank);
+
+ this.totalRankChange += Math.abs(lastRankDiff);
+
+ progressIndicator.progress();
+ }
+ }
+
+ private void writeEdgesToDisk() throws IOException
+ {
+ this.edgesFile = File.createTempFile("fastgraph", null);
+
+ FileOutputStream outStream = new FileOutputStream(this.edgesFile);
+ BufferedOutputStream bufferedStream = new BufferedOutputStream(outStream);
+ this.edgeDataOutputStream = new DataOutputStream(bufferedStream);
+
+ for (int edgeData : edges)
+ {
+ this.edgeDataOutputStream.writeInt(edgeData);
+ }
+
+ this.edges.clear();
+ usingEdgeDiskCache = true;
+ }
+
+ private Iterator<Integer> getEdgeData() throws IOException
+ {
+ if (!usingEdgeDiskCache)
+ {
+ return this.edges.iterator();
+ }
+ else
+ {
+ FileInputStream fileInputStream = new FileInputStream(this.edgesFile);
+ BufferedInputStream inputStream = new BufferedInputStream(fileInputStream);
+ final DataInputStream dataInputStream = new DataInputStream(inputStream);
+
+ return new AbstractIterator<Integer>() {
+
+ @Override
+ protected Integer computeNext()
+ {
+ try
+ {
+ return dataInputStream.readInt();
+ }
+ catch (IOException e)
+ {
+ return endOfData();
+ }
+ }
+
+ };
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java
new file mode 100644
index 0000000..3dc54c5
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/ProgressIndicator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.linkanalysis;
+
/**
 * Callback for reporting liveness during long-running computations.
 *
 * <p>The PageRank implementation invokes {@link #progress()} once per node
 * while iterating over the graph, so the surrounding execution framework
 * can observe that the task is still making progress.
 * NOTE(review): presumably delegates to Pig/Hadoop progress reporting —
 * confirm against the implementing class.</p>
 */
interface ProgressIndicator
{
  void progress();
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java b/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java
new file mode 100644
index 0000000..2c6c078
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/linkanalysis/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for performing link analysis, such as PageRank.
+ */
+package datafu.pig.linkanalysis;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/random/RandInt.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/random/RandInt.java b/datafu-pig/src/main/java/datafu/pig/random/RandInt.java
new file mode 100644
index 0000000..de89c4a
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/random/RandInt.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.random;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Generates a uniformly distributed integer between two bounds.
+ */
+@Nondeterministic
+public class RandInt extends SimpleEvalFunc<Integer>
+{
+ private final Random rand = new Random();
+
+ /**
+ * @param min lower bound for random number
+ * @param max upper bound for random number
+ */
+ public Integer call(Integer min, Integer max) throws IOException
+ {
+ try
+ {
+ if (min > max)
+ {
+ throw new RuntimeException("The first argument must be less than the second");
+ }
+ return rand.nextInt(max - min + 1) + min;
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ return new Schema(new Schema.FieldSchema("rand", DataType.INTEGER));
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java b/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java
new file mode 100644
index 0000000..d63f4cf
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/random/RandomUUID.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.random;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.Nondeterministic;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Generates a random UUID using java.util.UUID
+ */
+@Nondeterministic
+public class RandomUUID extends EvalFunc<String>
+{
+ public String exec(Tuple input) throws IOException
+ {
+ return UUID.randomUUID().toString();
+ }
+
+ @Override
+ public Schema outputSchema(Schema input)
+ {
+ return new Schema(new Schema.FieldSchema("uuid", DataType.CHARARRAY));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/random/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/random/package-info.java b/datafu-pig/src/main/java/datafu/pig/random/package-info.java
new file mode 100644
index 0000000..8c7750d
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/random/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs dealing with randomness.
+ */
+package datafu.pig.random;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java b/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java
new file mode 100644
index 0000000..09b4c95
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/Reservoir.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.util.PriorityQueue;
+
+class Reservoir extends PriorityQueue<ScoredTuple>
+{
+ private static final long serialVersionUID = 1L;
+ private int numSamples;
+
+ public Reservoir(int numSamples)
+ {
+ super(numSamples);
+ this.numSamples = numSamples;
+ }
+
+ public boolean consider(ScoredTuple scoredTuple)
+ {
+ if (super.size() < numSamples) {
+ return super.add(scoredTuple);
+ } else {
+ ScoredTuple head = super.peek();
+ if (scoredTuple.score > head.score) {
+ super.poll();
+ return super.add(scoredTuple);
+ }
+ return false;
+ }
+ }
+}
|