datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [14/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/text/opennlp/POSTag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/text/opennlp/POSTag.java b/datafu-pig/src/main/java/datafu/pig/text/opennlp/POSTag.java
new file mode 100644
index 0000000..fb17c63
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/text/opennlp/POSTag.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.text.opennlp;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import opennlp.tools.postag.POSModel;
+import opennlp.tools.postag.POSTaggerME;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The OpenNLP POSTag UDF tags bags of sequential words with parts of speech and confidence levels using the OpenNLP
+ * toolset, and specifically the POSTaggerME class.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define TokenizeME datafu.pig.text.opennlp.TokenizeME('data/en-token.bin');
+ * define POSTag datafu.pig.text.opennlp.POSTag('data/en-pos-maxent.bin');
+ *
+ * -- input:
+ * -- (Appetizers during happy hour range from low to high.)
+ * input = LOAD 'input' AS (text:chararray);
+ * --
+ * -- ({(Appetizers),(during),(happy),(hour),(range),(from),(low),(to),(high),(.)})
+ * tokenized = FOREACH input GENERATE TokenizeME(text) AS tokens;
+ * --
+ * -- output:
+ * -- Tuple schema is: (word, tag, confidence)
+ * -- ({(Appetizers,NNP,0.3619277937390988),(during,IN,0.7945543860326094),(happy,JJ,0.9888504792754391),
+ * -- (hour,NN,0.9427455123502427),(range,NN,0.7335527963654751),(from,IN,0.9911576465589752),(low,JJ,0.9652034031895174),
+ * -- (to,IN,0.7005347487371849),(high,JJ,0.8227771746247106),(.,.,0.9900983495480891)})
+ * output = FOREACH tokenized GENERATE POSTag(tokens) AS tagged;
+ * }
+ * </pre>
+ */
+public class POSTag extends EvalFunc<DataBag>
+{
+    /** Name appended after '#' in the cache-file entry; also passed to CachedFile when locating the model. */
+    private static final String MODEL_FILE = "pos";
+
+    private final TupleFactory tf = TupleFactory.getInstance();
+    private final BagFactory bf = BagFactory.getInstance();
+    private final String modelPath;
+
+    /** Created lazily on the first call to {@link #exec(Tuple)} and reused afterwards. */
+    private POSTaggerME tagger = null;
+
+    /**
+     * Creates the UDF for a given part-of-speech model.
+     * Other languages can be supported by pointing at a different model file;
+     * see http://opennlp.sourceforge.net/models-1.5/
+     *
+     * @param modelPath path of the OpenNLP POS model file
+     */
+    public POSTag(String modelPath) {
+        this.modelPath = modelPath;
+    }
+
+    /**
+     * Requests that the model file be shipped through the distributed cache.
+     *
+     * @return a single entry of the form {@code <modelPath>#pos}
+     */
+    @Override
+    public List<String> getCacheFiles() {
+        List<String> list = new ArrayList<String>(1);
+        list.add(this.modelPath + "#" + MODEL_FILE);
+        return list;
+    }
+
+    /**
+     * Tags every token of the input bag with a part of speech and a confidence value.
+     *
+     * @param input a tuple holding exactly one field: a bag of single-field token tuples
+     * @return a bag of (token, tag, probability) tuples, or null when the bag field is null
+     * @throws IOException if the tuple does not have exactly one field or the model cannot be read
+     */
+    @Override
+    public DataBag exec(Tuple input) throws IOException
+    {
+        if (input.size() != 1) {
+            throw new IOException("Expected exactly one argument (a bag of tokens), but got " + input.size());
+        }
+
+        DataBag inputBag = (DataBag) input.get(0);
+        if (inputBag == null) {
+            // Null in, null out instead of failing with a NullPointerException.
+            return null;
+        }
+
+        if (this.tagger == null) {
+            this.tagger = loadTagger();
+        }
+
+        // The tagger works on a plain array, so copy the tokens out of the bag in order.
+        int bagLength = (int) inputBag.size();
+        String[] words = new String[bagLength];
+        int i = 0;
+        for (Tuple wordTuple : inputBag) {
+            words[i] = (String) wordTuple.get(0);
+            i++;
+        }
+
+        // Compute tags and their probabilities
+        String[] tags = this.tagger.tag(words);
+        double[] probs = this.tagger.probs();
+
+        // Build output bag of 3-tuples
+        DataBag outBag = bf.newDefaultBag();
+        for (int j = 0; j < tags.length; j++) {
+            Tuple newTuple = tf.newTuple(3);
+            newTuple.set(0, words[j]);
+            newTuple.set(1, tags[j]);
+            newTuple.set(2, probs[j]);
+            outBag.add(newTuple);
+        }
+
+        return outBag;
+    }
+
+    /**
+     * Reads the POS model and builds a tagger from it. The model stream is always closed,
+     * whether or not loading succeeds.
+     */
+    private POSTaggerME loadTagger() throws IOException
+    {
+        // NOTE(review): CachedFile presumably resolves the cached copy and falls back to modelPath - confirm.
+        String loadFile = CachedFile.getFileName(MODEL_FILE, this.modelPath);
+        InputStream modelIn = new BufferedInputStream(new FileInputStream(loadFile));
+        try {
+            return new POSTaggerME(new POSModel(modelIn));
+        } finally {
+            modelIn.close();
+        }
+    }
+
+    /**
+     * Validates that the input is a bag of single-chararray tuples and describes the output
+     * as a bag of (token, tag, probability) tuples.
+     *
+     * @param input schema of the UDF arguments
+     * @return the output schema, or null when the input bag carries no inner schema
+     */
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try
+        {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.BAG)
+            {
+                throw new RuntimeException("Expected a BAG as input");
+            }
+
+            Schema inputBagSchema = inputFieldSchema.schema;
+
+            // Without an inner schema there is nothing to validate against.
+            if (inputBagSchema == null) {
+                return null;
+            }
+
+            if (inputBagSchema.getField(0).type != DataType.TUPLE)
+            {
+                throw new RuntimeException(String.format("Expected input bag to contain a TUPLE, but instead found %s",
+                        DataType.findTypeName(inputBagSchema.getField(0).type)));
+            }
+
+            Schema inputTupleSchema = inputBagSchema.getField(0).schema;
+
+            if (inputTupleSchema.size() != 1)
+            {
+                throw new RuntimeException("Expected one field for the token data");
+            }
+
+            if (inputTupleSchema.getField(0).type != DataType.CHARARRAY)
+            {
+                throw new RuntimeException(String.format("Expected source to be a CHARARRAY, but instead found %s",
+                        DataType.findTypeName(inputTupleSchema.getField(0).type)));
+            }
+
+            Schema tupleSchema = new Schema();
+            tupleSchema.add(new Schema.FieldSchema("token",DataType.CHARARRAY));
+            tupleSchema.add(new Schema.FieldSchema("tag",DataType.CHARARRAY));
+            tupleSchema.add(new Schema.FieldSchema("probability",DataType.DOUBLE));
+
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                    .getName()
+                    .toLowerCase(), input),
+                    tupleSchema,
+                    DataType.BAG));
+        }
+        catch (FrontendException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/text/opennlp/SentenceDetect.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/text/opennlp/SentenceDetect.java b/datafu-pig/src/main/java/datafu/pig/text/opennlp/SentenceDetect.java
new file mode 100644
index 0000000..50537fd
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/text/opennlp/SentenceDetect.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.text.opennlp;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import opennlp.tools.sentdetect.SentenceDetectorME;
+import opennlp.tools.sentdetect.SentenceModel;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The OpenNLP SentenceDetectors segment an input paragraph into sentences.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define SentenceDetect datafu.pig.text.opennlp.SentenceDetect('data/en-sent.bin');
+ *
+ * -- input:
+ * -- ("I believe the Masons have infiltrated the Apache PMC. I believe laser beams control cat brains.")
+ * infoo = LOAD 'input' AS (text:chararray);
+
+ * -- output:
+ * -- ({(I believe the Masons have infiltrated the Apache PMC.),(I believe laser beams control cat brains.)})
+ * outfoo = FOREACH infoo GENERATE SentenceDetect(text) as sentences;
+ * }
+ * </pre>
+ */
+public class SentenceDetect extends EvalFunc<DataBag>
+{
+    private SentenceDetectorME sdetector = null;
+    private static final String MODEL_FILE = "sentences";
+    private TupleFactory tf = TupleFactory.getInstance();
+    private BagFactory bf = BagFactory.getInstance();
+    private String modelPath = null;
+
+    public SentenceDetect(String modelPath) {
+        this.modelPath = modelPath;
+    }
+
+    @Override
+    public List<String> getCacheFiles() {
+        List<String> list = new ArrayList<String>(1);
+        list.add(this.modelPath + "#" + MODEL_FILE);
+        return list;
+    }
+
+    // Enable multiple languages by specifying the model path. See http://text.sourceforge.net/models-1.5/
+    public DataBag exec(Tuple input) throws IOException
+    {
+        if(input.size() != 1) {
+            throw new IOException();
+        }
+
+        String inputString = input.get(0).toString();
+        if(inputString == null || inputString == "") {
+            return null;
+        }
+        DataBag outBag = bf.newDefaultBag();
+        if(sdetector == null) {
+            String loadFile = CachedFile.getFileName(MODEL_FILE, this.modelPath);
+            InputStream is = new FileInputStream(modelPath);
+            InputStream buffer = new BufferedInputStream(is);
+            SentenceModel model = new SentenceModel(buffer);
+            this.sdetector = new SentenceDetectorME(model);
+        }
+        String sentences[] = this.sdetector.sentDetect(inputString);
+        for(String sentence : sentences) {
+            Tuple outTuple = tf.newTuple(sentence);
+            outBag.add(outTuple);
+        }
+        return outBag;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try
+        {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.CHARARRAY)
+            {
+                throw new RuntimeException("Expected a CHARARRAY as input, but got a " + inputFieldSchema.toString());
+            }
+
+            Schema tupleSchema = new Schema();
+            tupleSchema.add(new Schema.FieldSchema("sentence",DataType.CHARARRAY));
+
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                    .getName()
+                    .toLowerCase(), input),
+                    tupleSchema,
+                    DataType.BAG));
+        }
+        catch (FrontendException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeME.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeME.java b/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeME.java
new file mode 100644
index 0000000..f1f4257
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeME.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.text.opennlp;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import opennlp.tools.tokenize.TokenizerME;
+import opennlp.tools.tokenize.TokenizerModel;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The OpenNLP Tokenizers segment an input character sequence into tokens using the OpenNLP TokenizerME class, which is
+ * a probabilistic, 'maximum entropy' classifier.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define TokenizeME datafu.pig.text.opennlp.TokenizeME('data/en-token.bin');
+ *
+ * -- input:
+ * -- ("I believe the Masons have infiltrated the Apache PMC.")
+ * infoo = LOAD 'input' AS (text:chararray);
+
+ * -- output:
+ * -- ({(I),(believe),(the),(Masons),(have),(infiltrated),(the),(Apache),(PMC),(.)})
+ * outfoo = FOREACH infoo GENERATE TokenizeME(text) as tokens;
+ * }
+ * </pre>
+ */
+
+
+
+public class TokenizeME extends EvalFunc<DataBag>
+{
+    private TokenizerME tokenizer = null;
+    private static final String MODEL_FILE = "tokens";
+    private TupleFactory tf = TupleFactory.getInstance();
+    private BagFactory bf = BagFactory.getInstance();
+    private String modelPath;
+
+    public TokenizeME(String modelPath) {
+        this.modelPath = modelPath;
+    }
+
+    @Override
+    public List<String> getCacheFiles() {
+        List<String> list = new ArrayList<String>(1);
+        list.add(this.modelPath + "#" + MODEL_FILE);
+        return list;
+    }
+
+    // Enable multiple languages by specifying the model path. See http://text.sourceforge.net/models-1.5/
+    public DataBag exec(Tuple input) throws IOException
+    {
+        if(input.size() != 1) {
+            throw new IOException();
+        }
+
+        String inputString = input.get(0).toString();
+        if(inputString == null || inputString == "") {
+            return null;
+        }
+        DataBag outBag = bf.newDefaultBag();
+        if(this.tokenizer == null) {
+            String loadFile = CachedFile.getFileName(MODEL_FILE, this.modelPath);;
+            InputStream file = new FileInputStream(loadFile);
+            InputStream buffer = new BufferedInputStream(file);
+            TokenizerModel model = new TokenizerModel(buffer);
+            this.tokenizer = new TokenizerME(model);
+        }
+        String tokens[] = this.tokenizer.tokenize(inputString);
+        for(String token : tokens) {
+            Tuple outTuple = tf.newTuple(token);
+            outBag.add(outTuple);
+        }
+        return outBag;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try
+        {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.CHARARRAY)
+            {
+                throw new RuntimeException("Expected a CHARARRAY as input, but got a " + inputFieldSchema.toString());
+            }
+
+            Schema tupleSchema = new Schema();
+            tupleSchema.add(new Schema.FieldSchema("token",DataType.CHARARRAY));
+
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                    .getName()
+                    .toLowerCase(), input),
+                    tupleSchema,
+                    DataType.BAG));
+        }
+        catch (FrontendException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeSimple.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeSimple.java b/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeSimple.java
new file mode 100644
index 0000000..cea48b4
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeSimple.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.text.opennlp;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import opennlp.tools.tokenize.SimpleTokenizer;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The OpenNLP Tokenizers segment an input character sequence into tokens. This one uses the OpenNLP class SimpleTokenizer
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define TokenizeSimple datafu.pig.text.opennlp.TokenizeSimple();
+ *
+ * -- input:
+ * -- ("I believe the Masons have infiltrated the Apache PMC.")
+ * infoo = LOAD 'input' AS (text:chararray);
+
+ * -- output:
+ * -- ({(I),(believe),(the),(Masons),(have),(infiltrated),(the),(Apache),(PMC),(.)})
+ * outfoo = FOREACH infoo GENERATE TokenizeSimple(text) as tokens;
+ * }
+ * </pre>
+ */
+public class TokenizeSimple extends EvalFunc<DataBag>
+{
+    private SimpleTokenizer tokenizer = SimpleTokenizer.INSTANCE;
+    private TupleFactory tf = TupleFactory.getInstance();
+    private BagFactory bf = BagFactory.getInstance();
+
+    public DataBag exec(Tuple input) throws IOException
+    {
+        if(input.size() != 1) {
+            throw new IOException();
+        }
+
+        String inputString = input.get(0).toString();
+        if(inputString == null || inputString == "") {
+            return null;
+        }
+
+        DataBag outBag = bf.newDefaultBag();
+        String tokens[] = tokenizer.tokenize(inputString);
+        for(String token : tokens) {
+            Tuple outTuple = tf.newTuple(token);
+            outBag.add(outTuple);
+        }
+        return outBag;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try
+        {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.CHARARRAY)
+            {
+                throw new RuntimeException("Expected a CHARARRAY as input, but got a " + inputFieldSchema.toString());
+            }
+
+            Schema tupleSchema = new Schema();
+            tupleSchema.add(new Schema.FieldSchema("token",DataType.CHARARRAY));
+
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                    .getName()
+                    .toLowerCase(), input),
+                    tupleSchema,
+                    DataType.BAG));
+        }
+        catch (FrontendException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeWhitespace.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeWhitespace.java b/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeWhitespace.java
new file mode 100644
index 0000000..8efafb0
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/text/opennlp/TokenizeWhitespace.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.text.opennlp;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import opennlp.tools.tokenize.WhitespaceTokenizer;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The OpenNLP Tokenizers segment an input character sequence into tokens. This one uses the OpenNLP class
+ * WhitespaceTokenizer.
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define TokenizeWhitespace datafu.pig.text.opennlp.TokenizeWhitespace();
+ *
+ * -- input:
+ * -- ("I believe the Masons have infiltrated the Apache PMC.")
+ * infoo = LOAD 'input' AS (text:chararray);
+
+ * -- output:
+ * -- ({(I),(believe),(the),(Masons),(have),(infiltrated),(the),(Apache),(PMC.)})
+ * outfoo = FOREACH infoo GENERATE TokenizeWhitespace(text) as tokens;
+ * }
+ * </pre>
+ */
+public class TokenizeWhitespace extends EvalFunc<DataBag>
+{
+    private WhitespaceTokenizer tokenizer = WhitespaceTokenizer.INSTANCE;
+    private TupleFactory tf = TupleFactory.getInstance();
+    private BagFactory bf = BagFactory.getInstance();
+
+    public DataBag exec(Tuple input) throws IOException
+    {
+        if(input.size() != 1) {
+            throw new IOException();
+        }
+
+        String inputString = input.get(0).toString();
+        if(inputString == null || inputString == "") {
+            return null;
+        }
+
+        DataBag outBag = bf.newDefaultBag();
+        String tokens[] = tokenizer.tokenize(inputString);
+        for(String token : tokens) {
+            Tuple outTuple = tf.newTuple(token);
+            outBag.add(outTuple);
+        }
+        return outBag;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        try
+        {
+            Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+            if (inputFieldSchema.type != DataType.CHARARRAY)
+            {
+                throw new RuntimeException("Expected a CHARARRAY as input, but got a " + inputFieldSchema.toString());
+            }
+
+            Schema tupleSchema = new Schema();
+            tupleSchema.add(new Schema.FieldSchema("token",DataType.CHARARRAY));
+
+            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                    .getName()
+                    .toLowerCase(), input),
+                    tupleSchema,
+                    DataType.BAG));
+        }
+        catch (FrontendException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/urls/UserAgentClassify.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/urls/UserAgentClassify.java b/datafu-pig/src/main/java/datafu/pig/urls/UserAgentClassify.java
new file mode 100644
index 0000000..4040fea
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/urls/UserAgentClassify.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.urls;
+
+import datafu.pig.util.SimpleEvalFunc;
+
+/**
+ * Given a user agent string, this UDF classifies clients to 'mobile' and 'desktop'.
+ * Current as of June 2011.
+ */
+public class UserAgentClassify extends SimpleEvalFunc<String>
+{
+  // Both patterns are compiled once; String.matches would recompile them on every call.
+
+  /** Mobile markers that may appear anywhere in the lower-cased user agent. */
+  private static final java.util.regex.Pattern MOBILE_ANYWHERE = java.util.regex.Pattern.compile(
+      ".*(android|avantgo|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od)|iris|kindle|lge |maemo|midp|mmp|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\\/|plucker|pocket|psp|symbian|treo|up\\.(browser|link)|vodafone|wap|windows (ce|phone)|xda|xiino).*");
+
+  /** Mobile markers that must match the first four characters of the lower-cased user agent exactly. */
+  private static final java.util.regex.Pattern MOBILE_PREFIX = java.util.regex.Pattern.compile(
+      "1207|6310|6590|3gso|4thp|50[1-6]i|770s|802s|a wa|abac|ac(er|oo|s\\-)|ai(ko|rn)|al(av|ca|co)|amoi|an(ex|ny|yw)|aptu|ar(ch|go)|as(te|us)|attw|au(di|\\-m|r |s )|avan|be(ck|ll|nq)|bi(lb|rd)|bl(ac|az)|br(e|v)w|bumb|bw\\-(n|u)|c55\\/|capi|ccwa|cdm\\-|cell|chtm|cldc|cmd\\-|co(mp|nd)|craw|da(it|ll|ng)|dbte|dc\\-s|devi|dica|dmob|do(c|p)o|ds(12|\\-d)|el(49|ai)|em(l2|ul)|er(ic|k0)|esl8|ez([4-7]0|os|wa|ze)|fetc|fly(\\-|_)|g1 u|g560|gene|gf\\-5|g\\-mo|go(\\.w|od)|gr(ad|un)|haie|hcit|hd\\-(m|p|t)|hei\\-|hi(pt|ta)|hp( i|ip)|hs\\-c|ht(c(\\-| |_|a|g|p|s|t)|tp)|hu(aw|tc)|i\\-(20|go|ma)|i230|iac( |\\-|\\/)|ibro|idea|ig01|ikom|im1k|inno|ipaq|iris|ja(t|v)a|jbro|jemu|jigs|kddi|keji|kgt( |\\/)|klon|kpt |kwc\\-|kyo(c|k)|le(no|xi)|lg( g|\\/(k|l|u)|50|54|e\\-|e\\/|\\-[a-w])|libw|lynx|m1\\-w|m3ga|m50\\/|ma(te|ui|xo)|mc(01|21|ca)|m\\-cr|me(di|rc|ri)|mi(o8|oa|ts)|mmef|mo(01|02|bi|de|do|t(\\-| |o|v)|zz)|mt(50|p1|v )|mwbp|mywa|n10[0-2]|n20[2-3]|n30(0|2)|n50(0|2|5)|n7(0(0|1)|10)|ne((c|m)\\-|on|tf|wf|wg|wt)|nok(6|i)|nzph|o2im|op(ti|wv)|oran|owg1|p800|pan(a|d|t)|pdxg|pg(13|\\-([1-8]|c))|phil|pire|pl(ay|uc)|pn\\-2|po(ck|rt|se)|prox|psio|pt\\-g|qa\\-a|qc(07|12|21|32|60|\\-[2-7]|i\\-)|qtek|r380|r600|raks|rim9|ro(ve|zo)|s55\\/|sa(ge|ma|mm|ms|ny|va)|sc(01|h\\-|oo|p\\-)|sdk\\/|se(c(\\-|0|1)|47|mc|nd|ri)|sgh\\-|shar|sie(\\-|m)|sk\\-0|sl(45|id)|sm(al|ar|b3|it|t5)|so(ft|ny)|sp(01|h\\-|v\\-|v )|sy(01|mb)|t2(18|50)|t6(00|10|18)|ta(gt|lk)|tcl\\-|tdg\\-|tel(i|m)|tim\\-|t\\-mo|to(pl|sh)|ts(70|m\\-|m3|m5)|tx\\-9|up(\\.b|g1|si)|utst|v400|v750|veri|vi(rg|te)|vk(40|5[0-3]|\\-v)|vm40|voda|vulc|vx(52|53|60|61|70|80|81|83|85|98)|w3c(\\-| )|webc|whit|wi(g |nc|nw)|wmlb|wonu|x700|xda(\\-|2|g)|yas\\-|your|zeto|zte\\-");
+
+  /**
+   * Classifies a user agent string as "mobile" or "desktop".
+   *
+   * @param useragent the raw user agent string
+   * @return "mobile" when either pattern matches, "desktop" otherwise (including strings
+   *         shorter than four characters), or null when the input is null
+   */
+  public String call(String useragent)
+  {
+    // Null in, null out instead of failing with a NullPointerException.
+    if (useragent == null)
+      return null;
+    // Too short for the four-character prefix check below.
+    if (useragent.length() < 4)
+      return "desktop";
+    // Fixed locale so the result does not depend on the JVM default (e.g. Turkish dotless i).
+    String ua = useragent.toLowerCase(java.util.Locale.ENGLISH);
+    if (MOBILE_ANYWHERE.matcher(ua).matches() || MOBILE_PREFIX.matcher(ua.substring(0, 4)).matches())
+      return "mobile";
+    else
+      return "desktop";
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/urls/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/urls/package-info.java b/datafu-pig/src/main/java/datafu/pig/urls/package-info.java
new file mode 100644
index 0000000..332beda
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/urls/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UDFs for processing URLs.
+ */
+package datafu.pig.urls;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
new file mode 100644
index 0000000..ee2c3f3
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/AliasableEvalFunc.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Makes implementing and using UDFs easier by enabling named parameters.
+ *
+ * <p>
+ * This works by capturing the schema of the input tuple on the front-end and storing it into the UDFContext.
+ * It provides an easy means of referencing the parameters on the back-end to aid in writing schema-based UDFs.
+ * </p>
+ *
+ * <p>
+ * A related class is {@link SimpleEvalFunc}.  However they are actually fairly different.  The primary purpose of {@link SimpleEvalFunc} is
+ * to skip the boilerplate under the assumption that the arguments in and out are well... simple.
+ * It also assumes that these arguments are in a well-defined positional ordering.
+ * </p>
+ *
+ * <p>
+ * AliasableEvalFunc allows the UDF writer to avoid dealing with all positional assumptions and instead reference fields
+ * by their aliases.  This practice allows for more readable code since the alias names should have more meaning
+ * to the reader than the position.  This approach is also less error prone since it creates a more explicit contract
+ * for what input the UDF expects and prevents simple mistakes that positional-based UDFs could not easily catch,
+ * such as transposing two fields of the same type.  If this contract is violated, say, by attempting to reference
+ * a field that is not present, a meaningful error message may be thrown.
+ * </p>
+ *
+ * <p>
+ * Example:  This example computes the monthly payments for mortgages depending on interest rate.
+ * </p>
+ * <pre>
+ * {@code
+ *  public class MortgagePayment extends AliasableEvalFunc<DataBag> {
+ *    ...
+ *    public DataBag exec(Tuple input) throws IOException {
+ *      DataBag output = BagFactory.getInstance().newDefaultBag();
+ *
+ *      Double principal = getDouble(input, "principal"); // get a value from the input tuple by alias
+ *      Integer numPayments = getInteger(input, "num_payments");
+ *      DataBag interestRates = getBag(input, "interest_rates");
+ *
+ *      for (Tuple interestTuple : interestRates) {
+ *        Double interest = getDouble(interestTuple, getPrefixedAliasName("interest_rates", "interest_rate"));  // get a value from the inner bag tuple by alias
+ *        double monthlyPayment = computeMonthlyPayment(principal, numPayments, interest);
+ *        output.add(TupleFactory.getInstance().newTuple(monthlyPayment));
+ *      }
+ *      return output;
+ *    }
+ *  }
+ * }
+ * </pre>
+ *
+ * @author wvaughan
+ *
+ * @param <T> the type returned by the UDF
+ */
+public abstract class AliasableEvalFunc<T> extends ContextualEvalFunc<T>
+{
+  private static final String ALIAS_MAP_PROPERTY = "aliasMap";
+
+  // Lazily populated from the instance properties on the first call to getPosition(String).
+  private Map<String, Integer> aliasToPosition = null;
+
+  public AliasableEvalFunc() {
+
+  }
+
+  /**
+   * A wrapper method which captures the schema and then calls getOutputSchema
+   */
+  @Override
+  public Schema outputSchema(Schema input) {
+    storeFieldAliases(input);
+    return getOutputSchema(input);
+  }
+
+  /**
+   * Specify the output schema as in {@link org.apache.pig.EvalFunc#outputSchema(Schema)}.
+   *
+   * @param input the schema of the input tuple
+   * @return outputSchema
+   */
+  public abstract Schema getOutputSchema(Schema input);
+
+  @SuppressWarnings("unchecked")
+  private Map<String, Integer> getAliasMap() {
+    return (Map<String, Integer>)getInstanceProperties().get(ALIAS_MAP_PROPERTY);
+  }
+
+  private void setAliasMap(Map<String, Integer> aliases) {
+    getInstanceProperties().put(ALIAS_MAP_PROPERTY, aliases);
+  }
+
+  /**
+   * Builds the alias-to-position map for the given schema and stores it in the instance properties.
+   */
+  private void storeFieldAliases(Schema tupleSchema)
+  {
+    Map<String, Integer> aliases = new HashMap<String, Integer>();
+    constructFieldAliases(aliases, tupleSchema, null);
+    log.debug("In instance: "+getInstanceName()+", stored alias map: " + aliases);
+
+    // pass the input schema into the exec function
+    setAliasMap(aliases);
+  }
+
+  /**
+   * Recursively records the position of every aliased field.  Positions restart at zero for each
+   * nested schema, and nested aliases are prefixed with the alias of the enclosing field.
+   */
+  private void constructFieldAliases(Map<String, Integer> aliases, Schema tupleSchema, String prefix)
+  {
+    int position = 0;
+    for (Schema.FieldSchema field : tupleSchema.getFields()) {
+      String alias = getPrefixedAliasName(prefix, field.alias);
+      if (field.alias != null && !field.alias.equals("null")) {
+        aliases.put(alias, position);
+        log.debug("In instance: "+getInstanceName()+", stored alias " + alias + " as position " + position);
+      }
+      if (field.schema != null) {
+        constructFieldAliases(aliases, field.schema, alias);
+      }
+      position++;
+    }
+  }
+
+  /**
+   * Joins a prefix and an alias into the key used by the alias map.
+   *
+   * @param prefix alias of the enclosing field; ignored when null, "null" or blank
+   * @param alias alias of the field; when null or "null" only the prefix (or "") is returned
+   * @return {@code prefix.alias}, {@code alias}, {@code prefix} or the empty string
+   */
+  public String getPrefixedAliasName(String prefix, String alias)
+  {
+    if (alias == null || alias.equals("null")) {
+      if (prefix == null) return "";
+      else return prefix; // ignore the null inner bags/tuples
+    }
+    else return ((prefix == null || prefix.equals("null") || prefix.trim().equals("")) ? "" : prefix+".") + alias; // handle top bag
+  }
+
+  /**
+   * Field aliases are generated from the input schema<br/>
+   * Each alias maps to the position of the field within its enclosing schema<br/>
+   * Inner bags/tuples will have alias of outer.inner.foo
+   *
+   * @return A map of field alias to field position
+   * @throws RuntimeException if no alias map has been stored for this instance
+   */
+  public Map<String, Integer> getFieldAliases()
+  {
+    Map<String, Integer> aliases = getAliasMap();
+    if (aliases == null) {
+      log.error("Class: " + this.getClass());
+      log.error("Instance name: " + this.getInstanceName());
+      log.error("Properties: " + getContextProperties());
+      throw new RuntimeException("Could not retrieve aliases from properties using " + ALIAS_MAP_PROPERTY);
+    }
+    return aliases;
+  }
+
+  /**
+   * @param alias the (possibly prefixed) alias to look up
+   * @return the position stored for the alias, or null when the alias is unknown
+   */
+  public Integer getPosition(String alias) {
+    if (aliasToPosition == null) {
+      aliasToPosition = getFieldAliases();
+    }
+    return aliasToPosition.get(alias);
+  }
+
+  public Integer getPosition(String prefix, String alias) {
+    return getPosition(getPrefixedAliasName(prefix, alias));
+  }
+
+  /**
+   * Resolves an alias to a position that is valid for the given tuple.
+   * Shared by every typed getter so the lookup and bounds check live in one place.
+   *
+   * @throws FieldNotFound if the alias is unknown or its position lies outside the tuple
+   */
+  private int requirePosition(Tuple tuple, String alias) throws FieldNotFound {
+    Integer i = getPosition(alias);
+    if (i == null) {
+      throw new FieldNotFound("Attempt to reference unknown alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
+    }
+    if (i >= tuple.size()) {
+      throw new FieldNotFound("Attempt to reference outside of tuple for alias: "+alias+"\n Instance Properties: "+getInstanceProperties());
+    }
+    return i;
+  }
+
+  /**
+   * Fetches the field for the alias as a Number; the value may be null.
+   */
+  private Number numberAt(Tuple tuple, String alias) throws ExecException {
+    return (Number)tuple.get(requirePosition(tuple, alias));
+  }
+
+  public Integer getInteger(Tuple tuple, String alias) throws ExecException {
+    return getInteger(tuple, alias, null);
+  }
+
+  public Integer getInteger(Tuple tuple, String alias, Integer defaultValue) throws ExecException {
+    Number number = numberAt(tuple, alias);
+    // Explicit branch rather than a ternary: a ternary would unbox a null defaultValue.
+    if (number == null) {
+      return defaultValue;
+    }
+    return number.intValue();
+  }
+
+  public Long getLong(Tuple tuple, String alias) throws ExecException {
+    return getLong(tuple, alias, null);
+  }
+
+  public Long getLong(Tuple tuple, String alias, Long defaultValue) throws ExecException {
+    Number number = numberAt(tuple, alias);
+    if (number == null) {
+      return defaultValue;
+    }
+    return number.longValue();
+  }
+
+  public Float getFloat(Tuple tuple, String alias) throws ExecException {
+    return getFloat(tuple, alias, null);
+  }
+
+  public Float getFloat(Tuple tuple, String alias, Float defaultValue) throws ExecException {
+    Number number = numberAt(tuple, alias);
+    if (number == null) {
+      return defaultValue;
+    }
+    return number.floatValue();
+  }
+
+  public Double getDouble(Tuple tuple, String alias) throws ExecException {
+    return getDouble(tuple, alias, null);
+  }
+
+  public Double getDouble(Tuple tuple, String alias, Double defaultValue) throws ExecException {
+    Number number = numberAt(tuple, alias);
+    if (number == null) {
+      return defaultValue;
+    }
+    return number.doubleValue();
+  }
+
+  public String getString(Tuple tuple, String alias) throws ExecException {
+    return getString(tuple, alias, null);
+  }
+
+  public String getString(Tuple tuple, String alias, String defaultValue) throws ExecException {
+    String s = (String)tuple.get(requirePosition(tuple, alias));
+    if (s == null) {
+      return defaultValue;
+    }
+    return s;
+  }
+
+  public Boolean getBoolean(Tuple tuple, String alias) throws ExecException {
+    return (Boolean)tuple.get(requirePosition(tuple, alias));
+  }
+
+  public DataBag getBag(Tuple tuple, String alias) throws ExecException {
+    return (DataBag)tuple.get(requirePosition(tuple, alias));
+  }
+
+  public Object getObject(Tuple tuple, String alias) throws ExecException {
+    return tuple.get(requirePosition(tuple, alias));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/Assert.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/Assert.java b/datafu-pig/src/main/java/datafu/pig/util/Assert.java
new file mode 100644
index 0000000..a258915
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/Assert.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.util;
+
+/**
+ * Assert has been renamed to AssertUDF.
+ *
+ * <p>This class is provided for backward compatibility.</p>
+ *
+ * @deprecated Use {@link AssertUDF} instead.
+ */
+ @Deprecated
+public class Assert extends AssertUDF
+{
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/AssertUDF.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/AssertUDF.java b/datafu-pig/src/main/java/datafu/pig/util/AssertUDF.java
new file mode 100644
index 0000000..16f9247
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/AssertUDF.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.util;
+
+import java.io.IOException;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Filter function which asserts that a value is true.
+ *
+ * <p>
+ * Unfortunately, the Pig interpreter doesn't recognize boolean expressions nested as function
+ * arguments, so this uses C-style booleans.  That is, the first argument should be
+ * an integer.  0 is interpreted as "false", and anything else is considered "true".
+ * A null first argument is also treated as a violated assertion.
+ * The function will cause the Pig script to fail if a "false" value is encountered.
+ * </p>
+ *
+ * <p>
+ * There is a unary and a binary version. The unary version just takes a boolean, and throws out a generic exception message when the
+ * assertion is violated.  The binary version takes a String as a second argument and throws that out when the assertion
+ * is violated.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * </p>
+ * <pre>
+ * {@code
+ * FILTER members BY AssertUDF( (member_id >= 0 ? 1 : 0), 'Doh! Some member ID is negative.' );
+ * }
+ * </pre>
+ */
+public class AssertUDF extends FilterFunc
+{
+  /**
+   * Checks the assertion carried in the first field of the tuple.
+   *
+   * @param tuple first field is the integer condition; an optional second field is the failure message
+   * @return true when the condition is a non-zero integer
+   * @throws IOException when the condition is 0 or null
+   */
+  @Override
+  public Boolean exec(Tuple tuple)
+      throws IOException
+  {
+    Object condition = tuple.get(0);
+    if (condition != null && (Integer) condition != 0) {
+      return true;
+    }
+
+    // A null condition used to surface as a bare NullPointerException; report it as a violation instead.
+    String detail = (condition == null) ? " (the asserted value was null)" : "";
+    if (tuple.size() > 1) {
+      // String concatenation renders a null message as "null" instead of throwing.
+      throw new IOException("Assertion violated: " + tuple.get(1) + detail);
+    }
+    else {
+      throw new IOException("Assertion violated.  What assertion, I do not know, but it was officially violated." + detail);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/BoolToInt.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/BoolToInt.java b/datafu-pig/src/main/java/datafu/pig/util/BoolToInt.java
new file mode 100644
index 0000000..5ec8d3a
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/BoolToInt.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.util;
+
+/**
+ * UDF which maps a Boolean onto an Integer: 1 for true, 0 for false or null.
+ */
+public class BoolToInt extends SimpleEvalFunc<Integer>
+{
+  /**
+   * @param val the value to convert; may be null
+   * @return 1 when val is true, otherwise 0
+   */
+  public Integer call(Boolean val)
+  {
+    if (val != null && val)
+    {
+      return 1;
+    }
+    return 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/Coalesce.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/Coalesce.java b/datafu-pig/src/main/java/datafu/pig/util/Coalesce.java
new file mode 100644
index 0000000..f8e25f4
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/Coalesce.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+/**
+ * Returns the first non-null value from a tuple, just like
+ * <a href="http://msdn.microsoft.com/en-us/library/ms190349.aspx" target="_blank">COALESCE</a> in SQL.
+ *
+ * <p>
+ * Two modes are available through the constructor option.  In 'strict' mode (the default) all
+ * parameters must have the same type and the first non-null value is returned as is.  In 'lazy'
+ * mode the parameter types are merged and the first non-null value is converted to the merged
+ * type when that type is int, long, float or double.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * </p>
+ * <pre>
+ * {@code
+ *
+ * define COALESCE datafu.pig.util.Coalesce();
+ *
+ * -- input: 1,2,3,NULL,4,NULL,5
+ * input = LOAD 'input' AS (val:int);
+ *
+ * -- produces: 1,2,3,99,4,99,5
+ * coalesced = FOREACH input GENERATE COALESCE(val,99);
+ *
+ * }
+ * </pre>
+ *
+ * @author "Matthew Hayes <mhayes@linkedin.com>"
+ *
+ */
+public class Coalesce extends AliasableEvalFunc<Object>
+{
+  private boolean strict;
+
+  private static final String STRICT_OPTION = "strict";
+  private static final String LAZY_OPTION = "lazy";
+
+  /**
+   * Creates a strict Coalesce.
+   */
+  public Coalesce()
+  {
+    strict = true;
+  }
+
+  /**
+   * @param option either "strict" or "lazy"
+   * @throws IllegalArgumentException for any other option, including null
+   */
+  public Coalesce(String option)
+  {
+    // Constant-first comparison so a null option is rejected with the message below, not an NPE.
+    if (STRICT_OPTION.equals(option))
+    {
+      strict = true;
+    }
+    else if (LAZY_OPTION.equals(option))
+    {
+      strict = false;
+    }
+    else
+    {
+      throw new IllegalArgumentException("Unexpected option: " + option);
+    }
+  }
+
+  /**
+   * @param input the candidate values
+   * @return the first non-null field (converted to the stored output type in lazy mode),
+   *         or null when the tuple is null, empty or holds only nulls
+   * @throws DataFuException in lazy mode when the value cannot be converted
+   */
+  @Override
+  public Object exec(Tuple input) throws IOException
+  {
+    if (input == null || input.size() == 0)
+    {
+      return null;
+    }
+
+    // Stored by getOutputSchema under the "type" key.
+    Byte type = (Byte)getInstanceProperties().get("type");
+
+    for (Object o : input)
+    {
+      if (o == null)
+      {
+        continue;
+      }
+
+      if (strict)
+      {
+        return o;
+      }
+
+      if (type == null)
+      {
+        // Unboxing a missing type in the switch below would only yield an NPE with no message.
+        DataFuException missing = new DataFuException(
+            "No output type recorded for lazy Coalesce; getOutputSchema must be called before exec.");
+        missing.setData(o);
+        throw missing;
+      }
+
+      try
+      {
+        switch (type)
+        {
+        case DataType.INTEGER:
+          return DataType.toInteger(o);
+        case DataType.LONG:
+          return DataType.toLong(o);
+        case DataType.DOUBLE:
+          return DataType.toDouble(o);
+        case DataType.FLOAT:
+          return DataType.toFloat(o);
+        default:
+          return o;
+        }
+      }
+      catch (Exception e)
+      {
+        DataFuException dfe = new DataFuException(e.getMessage(),e);
+        dfe.setData(o);
+        dfe.setFieldAliases(getFieldAliases());
+        throw dfe;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Validates the parameter types, determines the output type and stores it in the
+   * instance properties under the "type" key for use by exec.
+   *
+   * @param input schema of the parameters
+   * @return a single-field schema named "item" with the determined type
+   * @throws RuntimeException when there are no parameters, a parameter has an unsupported type,
+   *         or the types differ (strict) or cannot be merged (lazy)
+   */
+  @Override
+  public Schema getOutputSchema(Schema input)
+  {
+    if (input.getFields().size() == 0)
+    {
+      throw new RuntimeException("Expected at least one parameter");
+    }
+
+    Byte outputType = null;
+    int pos = 0;
+    for (FieldSchema field : input.getFields())
+    {
+      if (DataType.isSchemaType(field.type))
+      {
+        throw new RuntimeException(String.format("Not supported on schema types.  Found %s in position %d.",DataType.findTypeName(field.type),pos));
+      }
+
+      if (DataType.isComplex(field.type))
+      {
+        throw new RuntimeException(String.format("Not supported on complex types.  Found %s in position %d.",DataType.findTypeName(field.type),pos));
+      }
+
+      if (!DataType.isUsableType(field.type))
+      {
+        throw new RuntimeException(String.format("Not a usable type.  Found %s in position %d.",DataType.findTypeName(field.type),pos));
+      }
+
+      if (outputType == null)
+      {
+        outputType = field.type;
+      }
+      else if (!outputType.equals(field.type))
+      {
+        if (strict)
+        {
+          throw new RuntimeException(String.format("Expected all types to be equal, but found '%s' in position %d.  First element has type '%s'.  "
+                                                   + "If you'd like to attempt merging types, use the '%s' option, as '%s' is the default.",
+                                                   DataType.findTypeName(field.type),pos,DataType.findTypeName((byte)outputType),LAZY_OPTION,STRICT_OPTION));
+        }
+        else
+        {
+          byte merged = DataType.mergeType(outputType, field.type);
+          if (merged == DataType.ERROR)
+          {
+            throw new RuntimeException(String.format("Expected all types to be equal, but found '%s' in position %d, where output type is '%s', and types could not be merged.",
+                                                     DataType.findTypeName(field.type),pos,DataType.findTypeName((byte)outputType)));
+          }
+          outputType = merged;
+        }
+      }
+
+      pos++;
+    }
+
+    getInstanceProperties().put("type", outputType);
+
+    return new Schema(new Schema.FieldSchema("item",outputType));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
new file mode 100644
index 0000000..c534b77
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/ContextualEvalFunc.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Base class for UDFs that keep per-instance properties in the UDFContext.
+ * Properties stored on the front end, for example during outputSchema(),
+ * are meant to be readable on the back end when exec() is called.
+ *
+ * @param <T> the type returned by the UDF
+ */
+public abstract class ContextualEvalFunc<T> extends EvalFunc<T>
+{
+  private String instanceName;
+
+  /**
+   * Records the UDF context signature as the name of this instance.
+   */
+  @Override
+  public void setUDFContextSignature(String signature) {
+    this.instanceName = signature;
+  }
+
+  /**
+   * Looks up the UDFContext properties registered for this class.
+   *
+   * @return context properties
+   */
+  protected Properties getContextProperties() {
+    return UDFContext.getUDFContext().getUDFProperties(this.getClass());
+  }
+
+  /**
+   * Looks up the properties of this instance inside the class-level context
+   * properties, creating an empty entry on first access.
+   *
+   * @return instances properties
+   */
+  protected Properties getInstanceProperties() {
+    Properties classLevel = getContextProperties();
+    String key = getInstanceName();
+    if (!classLevel.containsKey(key)) {
+      classLevel.put(key, new Properties());
+    }
+    return (Properties)classLevel.get(key);
+  }
+
+  /**
+   *
+   * @return the name of this instance corresponding to the UDF Context Signature
+   * @throws RuntimeException if no signature has been set yet
+   * @see #setUDFContextSignature(String)
+   */
+  protected String getInstanceName() {
+    if (instanceName != null) {
+      return instanceName;
+    }
+    throw new RuntimeException("Instance name is null.  This should not happen unless UDFContextSignature was not set.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/DataFuException.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/DataFuException.java b/datafu-pig/src/main/java/datafu/pig/util/DataFuException.java
new file mode 100644
index 0000000..0066aa8
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/DataFuException.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import java.util.Map;
+
+/**
+ * Unchecked exception which can additionally carry the field aliases of a UDF
+ * and the data that was being processed; both are included in {@link #toString()}.
+ */
+public class DataFuException extends RuntimeException
+{
+  private static final long serialVersionUID = 1L;
+  private Map<String, Integer> fieldAliases;
+  private Object data;
+
+  public DataFuException()
+  {
+    super();
+  }
+
+  public DataFuException(String message)
+  {
+    super(message);
+  }
+
+  public DataFuException(String message, Throwable cause)
+  {
+    super(message, cause);
+  }
+
+  public DataFuException(Throwable cause)
+  {
+    super(cause);
+  }
+
+  /**
+   * Gets field aliases for a UDF which may be relevant to this exception.
+   *
+   * @return field aliases, or null if none were set
+   */
+  public Map<String, Integer> getFieldAliases()
+  {
+    return fieldAliases;
+  }
+
+  /**
+   * Sets field aliases for a UDF which may be relevant to this exception.
+   *
+   * @param fieldAliases map of alias to position
+   */
+  public void setFieldAliases(Map<String, Integer> fieldAliases)
+  {
+    this.fieldAliases = fieldAliases;
+  }
+
+  /**
+   * Gets data relevant to this exception.
+   *
+   * @return data, or null if none was set
+   */
+  public Object getData()
+  {
+    return data;
+  }
+
+  /**
+   * Sets data relevant to this exception.
+   *
+   * @param data the data to attach
+   */
+  public void setData(Object data)
+  {
+    this.data = data;
+  }
+
+  /**
+   * Renders the class name and message, followed by an "Aliases:" section with one alias
+   * per line ("???" for a null or empty alias) and a "Data:" section, when those are set.
+   */
+  @Override
+  public String toString()
+  {
+    StringBuilder text = new StringBuilder(getClass().getName());
+
+    String message = getLocalizedMessage();
+    if (message != null)
+    {
+      text.append(": ").append(message);
+    }
+
+    Map<String, Integer> aliases = getFieldAliases();
+    if (aliases != null)
+    {
+      text.append("\nAliases:");
+      for (String alias : aliases.keySet())
+      {
+        boolean named = alias != null && alias.length() > 0;
+        text.append("\n").append(named ? alias : "???");
+      }
+    }
+
+    if (getData() != null)
+    {
+      text.append("\nData:").append("\n").append(data.toString());
+    }
+
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/FieldNotFound.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/FieldNotFound.java b/datafu-pig/src/main/java/datafu/pig/util/FieldNotFound.java
new file mode 100644
index 0000000..d624007
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/FieldNotFound.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+/**
+ * Thrown by {@link AliasableEvalFunc} when attempting to access an unknown field by name.
+ * 
+ * @author wvaughan
+ *
+ */
+public class FieldNotFound extends ExecException
+{
+  private static final long serialVersionUID = 1L;
+  
+  public FieldNotFound() {
+    super();
+  }
+  
+  public FieldNotFound(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/In.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/In.java b/datafu-pig/src/main/java/datafu/pig/util/In.java
new file mode 100644
index 0000000..0667914
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/In.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+/**
+ * In has been renamed to InUDF.
+ * 
+ * This class is provided for backward compatibility.
+ *
+ * @deprecated Use {@link InUDF} instead.
+ */
+ @Deprecated
+public class In extends InUDF
+{
+  // Intentionally empty: this class declares no members and inherits everything from InUDF.
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/InUDF.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/InUDF.java b/datafu-pig/src/main/java/datafu/pig/util/InUDF.java
new file mode 100644
index 0000000..5057285
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/InUDF.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import java.io.IOException;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Similar to the SQL IN function, this function provides a convenient way to filter 
+ * using a logical disjunction over many values. 
+ * Returns true when the first value of the tuple is contained within the remainder of the tuple.
+ * 
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ * define In datafu.pig.util.InUDF();
+ * -- cars: (alice, red), (bob, blue), (charlie, green), (dave, red);
+ * cars = LOAD cars AS (owner:chararray, color:chararray);
+ * 
+ * -- cars: (alice, red), (bob, blue), (dave, red);
+ * red_blue_cars = FILTER cars BY In(color, 'red', 'blue');
+ * 
+ * }</pre>
+ * </p>
+ * 
+ * @author wvaughan
+ *
+ */
+public class InUDF extends FilterFunc
+{
+
+  /**
+   * Tests whether the first field of the tuple equals any of the fields that follow it.
+   *
+   * @param input tuple holding the candidate value at position 0, followed by the values to compare against
+   * @return true if some later field equals the first one; false if none does or if the first field is null
+   * @throws IOException propagated from reading the tuple's fields
+   */
+  @Override
+  public Boolean exec(Tuple input) throws IOException
+  {
+    Object candidate = input.get(0);
+    if (candidate == null) {
+      // a null candidate is never reported as a match
+      return false;
+    }
+
+    // stop at the first field that equals the candidate
+    for (int pos = 1; pos < input.size(); pos++) {
+      if (candidate.equals(input.get(pos))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/IntToBool.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/IntToBool.java b/datafu-pig/src/main/java/datafu/pig/util/IntToBool.java
new file mode 100644
index 0000000..d00e297
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/IntToBool.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.util;
+
+/**
+ * UDF which converts an Integer to a Boolean.
+ */
+public class IntToBool extends SimpleEvalFunc<Boolean>
+{
+  public Boolean call(Integer val)
+  {
+    return (val == null || val == 0) ? false : true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java b/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
new file mode 100644
index 0000000..2d262b4
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/SimpleEvalFunc.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+package datafu.pig.util;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+  Uses reflection to make writing simple wrapper Pig UDFs easier.
+
+  For example, writing a simple string trimming UDF might look like
+  this:
+  <pre>
+  {@code
+  public class TRIM extends EvalFunc<String> 
+  {
+    public String exec(Tuple input) throws IOException 
+    {
+      if (input.size() != 1)
+        throw new IllegalArgumentException("requires a parameter");
+  
+      try {
+        Object o = input.get(0);
+        if (!(o instanceof String))
+          throw new IllegalArgumentException("expected a string");
+  
+        String str = (String)o;
+        return (str == null) ? null : str.trim();
+      } 
+      catch (Exception e) {
+        throw WrappedIOException.wrap("error...", e);
+      }
+    }
+  }
+  }
+  </pre>
+  There is a lot of boilerplate to check the number of arguments and
+  the parameter types in the tuple.
+
+  Instead, with this class, you can derive from SimpleEvalFunc and
+  create a <code>call()</code> method (not exec!), just specifying the
+  arguments as a regular function. The class handles all the argument
+  checking and exception wrapping for you. So your code would be:
+  <pre>
+  {@code
+  public class TRIM2 extends SimpleEvalFunc<String> 
+  {
+    public String call(String s)
+    {
+      return (s != null) ? s.trim() : null;
+    }
+  }
+  }
+  </pre>
+
+  An example of this UDF in action with Pig:
+  <pre>
+  {@code
+  grunt> a = load 'test' as (x:chararray, y:chararray); dump a;
+    (1 , 2)
+    
+  grunt> b = foreach a generate TRIM2(x); dump b;
+    (1)
+    
+  grunt> c = foreach a generate TRIM2((int)x); dump c;
+    datafu.pig.util.TRIM2(java.lang.String): argument type 
+    mismatch [#1]; expected java.lang.String, got java.lang.Integer
+    
+  grunt> d = foreach a generate TRIM2(x, y); dump d;
+    datafu.pig.util.TRIM2(java.lang.String): got 2 arguments, expected 1.
+  }
+  </pre>
+
+*/
+
+public abstract class SimpleEvalFunc<T> extends EvalFunc<T>
+{
+  // TODO Add support for other UDF types (e.g., FilterFunc)
+  // TODO Algebraic EvalFuncs 
+  
+  Method m = null;
+
+  public SimpleEvalFunc()
+  {
+    for (Method method : this.getClass().getMethods()) {
+      if (method.getName() == "call")
+        m = method;
+    }
+    if (m == null)
+      throw new IllegalArgumentException(String.format("%s: couldn't find call() method in UDF.", getClass().getName()));
+  }
+
+  // Pig can't get the return type via reflection (as getReturnType normally tries to do), so give it a hand 
+  @Override
+  public Type getReturnType() 
+  {
+    return m.getReturnType();
+  }
+
+  private String _method_signature() 
+  {
+    StringBuilder sb = new StringBuilder(getClass().getName());
+    Class<?> pvec[] = m.getParameterTypes();
+
+    sb.append("(");
+    for (int i=0; i < pvec.length; i++) {
+      if (i > 0)
+        sb.append(", ");
+      sb.append(String.format("%s", pvec[i].getName()));
+    }
+    sb.append(")");
+
+    return sb.toString();
+  }
+ 
+  @Override
+  @SuppressWarnings("unchecked")
+  public T exec(Tuple input) throws IOException
+  {
+    @SuppressWarnings("rawtypes")
+    Class pvec[] = m.getParameterTypes();
+
+    if (input == null || input.size() == 0)
+      return null;
+    
+    // check right number of arguments
+    if (input.size() != pvec.length) 
+      throw new IOException(String.format("%s: got %d arguments, expected %d.", _method_signature(), input.size(), pvec.length));
+
+    // pull and check argument types
+    Object[] args = new Object[input.size()];
+    for (int i=0; i < pvec.length; i++) {
+      Object o = input.get(i);
+      try {
+        o = pvec[i].cast(o);
+      }
+      catch (ClassCastException e) {
+        throw new IOException(String.format("%s: argument type mismatch [#%d]; expected %s, got %s", _method_signature(), i+1,
+              pvec[i].getName(), o.getClass().getName()));
+      }
+      args[i] = o;
+    }
+
+    try {
+      return (T) m.invoke(this, args);
+    }
+    catch (Exception e) {
+        throw new IOException(String.format("%s: caught exception processing input.", _method_signature()), e);
+    }
+  }
+
+  /**
+   * Override outputSchema so we can verify the input schema at pig compile time, instead of runtime
+   * @param inputSchema input schema
+   * @return call to super.outputSchema in case schema was defined elsewhere
+   */
+  @Override
+  public Schema outputSchema(Schema inputSchema)
+  {
+    if (inputSchema == null) {
+      throw new IllegalArgumentException(String.format("%s: null schema passed to %s", _method_signature(), getClass().getName()));
+    }
+
+    // check correct number of arguments
+    @SuppressWarnings("rawtypes")
+    Class parameterTypes[] = m.getParameterTypes();
+    if (inputSchema.size() != parameterTypes.length) {
+      throw new IllegalArgumentException(String.format("%s: got %d arguments, expected %d.",
+                                                       _method_signature(),
+                                                       inputSchema.size(),
+                                                       parameterTypes.length));
+    }
+
+    // check type for each argument
+    for (int i=0; i < parameterTypes.length; i++) {
+      try {
+        byte inputType = inputSchema.getField(i).type;
+        byte parameterType = DataType.findType(parameterTypes[i]);
+        if (inputType != parameterType) {
+          throw new IllegalArgumentException(String.format("%s: argument type mismatch [#%d]; expected %s, got %s",
+                                                           _method_signature(),
+                                                           i+1,
+                                                           DataType.findTypeName(parameterType),
+                                                           DataType.findTypeName(inputType)));
+        }
+      }
+      catch (FrontendException fe) {
+        throw new IllegalArgumentException(String.format("%s: Problem with input schema: ", _method_signature(), inputSchema), fe);
+      }
+    }
+
+    // delegate to super to determine the actual outputSchema (if specified)
+    return super.outputSchema(inputSchema);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/TransposeTupleToBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/TransposeTupleToBag.java b/datafu-pig/src/main/java/datafu/pig/util/TransposeTupleToBag.java
new file mode 100644
index 0000000..f8a39df
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/TransposeTupleToBag.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+/**
+ * Performs a transpose on a tuple, resulting in a bag of key, value fields where
+ * the key is the column name and the value is the value of that column in the tuple.
+ * 
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ *
+ * define TransposeTupleToBag datafu.pig.util.TransposeTupleToBag();
+
+ * -- input: 1,10,11,12
+ * input = LOAD 'input' AS (id:int,val1:int,val2:int,val3:int);
+ *
+ * -- produces: 1,{("val1",10),("val2",11),("val3",12)}
+ * output = FOREACH input GENERATE id, TransposeTupleToBag(val1 .. val3);
+ *
+ * }
+ * </pre>
+ * </p>
+ * 
+ * @author "William Vaughan <wvaughan@linkedin.com>"
+ *
+ */
+public class TransposeTupleToBag extends AliasableEvalFunc<DataBag>
+{
+  private final String TRANSPOSE_TYPE = "TRANSPOSE_TYPE";
+
+  @Override
+  public Schema getOutputSchema(Schema input)
+  {
+    try
+    {
+      // require that every field in the input has the same type
+      Byte type = null;
+      for (FieldSchema fieldSchema : input.getFields()) {
+        if (type == null) {
+          type = fieldSchema.type;
+        } else {
+          if (type != fieldSchema.type) {
+            throw new RuntimeException(
+                                       String.format("Expected all input types to match.  Got both %s and %s.", 
+                                                     DataType.findTypeName(type.byteValue()), DataType.findTypeName(fieldSchema.type)));
+          }
+        }      
+      }
+      getInstanceProperties().put(TRANSPOSE_TYPE, type);
+      
+      Schema outputTupleSchema = new Schema();
+      outputTupleSchema.add(new Schema.FieldSchema("key", DataType.CHARARRAY));
+      outputTupleSchema.add(new Schema.FieldSchema("value", type));
+      return new Schema(new Schema.FieldSchema(
+                                               getSchemaName(this.getClass().getName().toLowerCase(), input),
+                                               outputTupleSchema, 
+                                               DataType.BAG));
+    }
+    catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    // initialize a reverse mapping
+    HashMap<Integer, String> positionToAlias = new HashMap<Integer, String>();
+    for (String alias : getFieldAliases().keySet()) {
+      positionToAlias.put(getFieldAliases().get(alias), alias);
+    }
+    DataBag output = BagFactory.getInstance().newDefaultBag();
+    for (int i=0; i<input.size(); i++) {
+      Tuple tuple = TupleFactory.getInstance().newTuple();
+      tuple.append(positionToAlias.get(i));
+      tuple.append(input.get(i));
+      output.add(tuple);
+    }
+    return output;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/main/java/datafu/pig/util/package-info.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/util/package-info.java b/datafu-pig/src/main/java/datafu/pig/util/package-info.java
new file mode 100644
index 0000000..cce46a0
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/util/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Other useful utilities.
+ */
+package datafu.pig.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/PigTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/PigTests.java b/datafu-pig/src/test/java/datafu/test/pig/PigTests.java
new file mode 100644
index 0000000..d1d6fcc
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/PigTests.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig;
+
+import static org.testng.Assert.*;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.apache.pig.tools.parameters.ParseException;
+
+public abstract class PigTests
+{    
+  /**
+   * Silences logging before the tests of a class run.
+   */
+  @org.testng.annotations.BeforeClass
+  public void beforeClass()
+  {
+    // TODO make it configurable whether this happens, for travis-ci we can't spam the logs so much,
+    // however otherwise it is useful to see the errors
+    Logger.getRootLogger().removeAllAppenders();
+    Logger.getLogger(JvmMetrics.class).setLevel(Level.OFF);
+  }
+  
+  /**
+   * Prints the name of the test method that is about to run.
+   *
+   * @param method the test method about to be invoked
+   */
+  @org.testng.annotations.BeforeMethod
+  public void beforeMethod(Method method)
+  {
+    System.out.println("\n*** Running " + method.getName() + " ***");
+  }
+  
+  /**
+   * @return the parameters applied to every script; currently only the data directory parameter
+   */
+  protected String[] getDefaultArgs()
+  {
+    String[] args = {
+        getDataDirParam()
+      };
+    return args;
+  }
+  
+  /**
+   * @return a new mutable list holding the values of {@link #getDefaultArgs()}
+   */
+  protected List<String> getDefaultArgsAsList()
+  {
+    String[] args = getDefaultArgs();
+    List<String> argsList = new ArrayList<String>(args.length);
+    for (String arg : args)
+    {
+      argsList.add(arg);
+    }
+    return argsList;
+  }
+  
+  /**
+   * Creates a test from a script given as a single newline-separated string.
+   *
+   * @param str the script text
+   * @param args parameters of the form NAME=value to substitute into the script
+   * @return the test for the substituted script
+   * @throws IOException declared by the signature
+   */
+  protected PigTest createPigTestFromString(String str, String... args) throws IOException
+  {
+    return createPigTest(str.split("\n"),args);
+  }
+  
+  /**
+   * Creates a test from script lines after substituting parameters.
+   *
+   * Each argument of the form NAME=value causes every occurrence of $NAME in the lines to be
+   * replaced by the value. The default arguments are applied first. Note that the given array
+   * is modified in place.
+   *
+   * @param lines the script lines; their contents are overwritten with the substituted text
+   * @param args parameters of the form NAME=value; entries without '=' are ignored
+   * @return the test for the substituted script
+   * @throws IOException declared by the signature
+   */
+  protected PigTest createPigTest(String[] lines, String... args) throws IOException
+  {
+    // append args to list of default args
+    List<String> theArgs = getDefaultArgsAsList();
+    for (String arg : args)
+    {
+      theArgs.add(arg);
+    }
+    
+    for (String arg : theArgs)
+    {
+      String[] parts = arg.split("=",2);
+      if (parts.length == 2)
+      {
+        for (int i=0; i<lines.length; i++)
+        {
+          // literal replacement: a value containing '$' or '\' (e.g. a Windows path) must not be
+          // interpreted as a regex replacement pattern
+          lines[i] = lines[i].replace("$" + parts[0], parts[1]);
+        }
+      }
+    }
+    
+    return new PigTest(lines);
+  }
+  
+  /**
+   * Creates a test from a script file after substituting parameters.
+   *
+   * @param scriptPath path of the script, resolved by {@link #getLinesFromFile(String)}
+   * @param args parameters of the form NAME=value to substitute into the script
+   * @return the test for the substituted script
+   * @throws IOException if the script cannot be read
+   */
+  protected PigTest createPigTest(String scriptPath, String... args) throws IOException
+  {
+    return createPigTest(getLinesFromFile(scriptPath), args);
+  }
+  
+  /**
+   * @return the DATA_DIR parameter in NAME=value form
+   */
+  protected String getDataDirParam()
+  {
+    return "DATA_DIR=" + getDataPath();
+  }
+  
+  /**
+   * @return the value of the system property datafu.data.dir if set, otherwise the absolute
+   *         path of the "data" directory under the working directory
+   */
+  protected String getDataPath()
+  {
+    if (System.getProperty("datafu.data.dir") != null)
+    {
+      return System.getProperty("datafu.data.dir");
+    }
+    else
+    {
+      return new File(System.getProperty("user.dir"), "data").getAbsolutePath();
+    }  
+  }
+  
+  /**
+   * Finds the single JAR (excluding sources and javadoc JARs) in the JAR directory.
+   *
+   * The directory is the system property datafu.jar.dir if set, otherwise build/libs under the
+   * working directory.
+   *
+   * @return the path of the JAR file
+   * @throws RuntimeException if no matching JAR, or more than one, is found
+   */
+  protected String getJarPath()
+  {    
+    String jarDir = null;
+    
+    if (System.getProperty("datafu.jar.dir") != null)
+    {
+      jarDir = System.getProperty("datafu.jar.dir");
+    }
+    else
+    {
+      jarDir = new File(System.getProperty("user.dir"), "build/libs").getAbsolutePath();
+    }  
+    
+    File userDir = new File(jarDir);
+    
+    String[] files = userDir.list(new FilenameFilter() {
+
+      @Override
+      public boolean accept(File dir, String name)
+      {
+        return name.endsWith(".jar") && !name.contains("sources") && !name.contains("javadoc");
+      }
+      
+    });
+    
+    // list() returns null when the directory does not exist or cannot be read
+    if (files == null || files.length == 0)
+    {
+      throw new RuntimeException("Could not find JAR file");
+    }
+    else if (files.length > 1)
+    {
+      StringBuilder sb = new StringBuilder();
+      for (String file : files)
+      {
+        sb.append(file);
+        sb.append(",");
+      }
+      throw new RuntimeException("Found more JAR files than expected: " + sb.substring(0, sb.length()-1));
+    }
+    
+    return  userDir.getAbsolutePath() + "/" + files[0];
+  }
+  
+  /**
+   * Collects the tuples of an alias, printing each one.
+   *
+   * @param test the test to query
+   * @param alias the alias whose tuples are wanted
+   * @return the tuples produced for the alias
+   * @throws IOException declared by the signature
+   * @throws ParseException declared by the signature
+   */
+  protected List<Tuple> getLinesForAlias(PigTest test, String alias) throws IOException, ParseException
+  {
+    return getLinesForAlias(test,alias,true);
+  }
+  
+  /**
+   * Collects the tuples of an alias, optionally printing each one.
+   *
+   * @param test the test to query
+   * @param alias the alias whose tuples are wanted
+   * @param logValues whether to print the tuples to standard output
+   * @return the tuples produced for the alias
+   * @throws IOException declared by the signature
+   * @throws ParseException declared by the signature
+   */
+  protected List<Tuple> getLinesForAlias(PigTest test, String alias, boolean logValues) throws IOException, ParseException
+  {
+    Iterator<Tuple> tuplesIterator = test.getAlias(alias);
+    List<Tuple> tuples = new ArrayList<Tuple>();
+    if (logValues)
+    {
+      System.out.println(String.format("Values for %s: ", alias));
+    }
+    while (tuplesIterator.hasNext())
+    {
+      Tuple tuple = tuplesIterator.next();
+      if (logValues)
+      {
+        System.out.println(tuple.toString());
+      }
+      tuples.add(tuple);
+    }
+    return tuples;
+  }
+    
+  /**
+   * Writes lines to a file under the working directory, replacing any existing file.
+   *
+   * @param fileName file name relative to the working directory
+   * @param lines the lines to write; a newline is appended to each
+   * @throws IOException if the file cannot be written
+   */
+  protected void writeLinesToFile(String fileName, String... lines) throws IOException
+  {
+    File inputFile = deleteIfExists(getFile(fileName));
+    writeLinesToFile(inputFile, lines);
+  }
+  
+  /**
+   * Writes lines to the given file.
+   *
+   * @param file the file to write
+   * @param lines the lines to write; a newline is appended to each
+   * @throws IOException if the file cannot be written
+   */
+  protected void writeLinesToFile(File file, String[] lines) throws IOException
+  {
+    FileWriter writer = new FileWriter(file);
+    try
+    {
+      for (String line : lines)
+      {
+        writer.write(line + "\n");
+      }
+    }
+    finally
+    {
+      // release the file handle even when a write fails
+      writer.close();
+    }
+  }
+
+  /**
+   * Asserts that the tuples of an alias, rendered as strings, equal the expected values in order.
+   *
+   * @param test the test to query
+   * @param alias the alias whose output is checked
+   * @param expected the expected string form of each tuple
+   * @throws IOException declared by the signature
+   * @throws ParseException declared by the signature
+   */
+  protected void assertOutput(PigTest test, String alias, String... expected) throws IOException, ParseException
+  {
+    List<Tuple> tuples = getLinesForAlias(test, alias);
+    // TestNG argument order is (actual, expected)
+    assertEquals(tuples.size(), expected.length);
+    int i=0;
+    for (String e : expected)
+    {
+      assertEquals(tuples.get(i++).toString(), e);
+    }
+  }
+  
+  /**
+   * Deletes the file if it exists; the result of the deletion is not checked.
+   *
+   * @param file the file to delete
+   * @return the same file object
+   */
+  protected File deleteIfExists(File file)
+  {
+    if (file.exists())
+    {
+      file.delete();
+    }
+    return file;
+  }
+  
+  /**
+   * @param fileName file name relative to the working directory
+   * @return the absolute file for that name
+   */
+  protected File getFile(String fileName)
+  {
+    return new File(System.getProperty("user.dir"), fileName).getAbsoluteFile();
+  }
+  
+  /**
+   * Gets the lines from a given file.
+   * 
+   * @param relativeFilePath The path relative to the working directory (system property user.dir).
+   * @return The lines from the file
+   * @throws IOException if the file cannot be opened or read
+   */
+  protected String[] getLinesFromFile(String relativeFilePath) throws IOException
+  {
+    // the path is resolved against the working directory
+    File file = new File(System.getProperty("user.dir"), relativeFilePath).getAbsoluteFile();
+    BufferedInputStream content = new BufferedInputStream(new FileInputStream(file));
+    try
+    {
+      Object[] lines = IOUtils.readLines(content).toArray();
+      String[] result = new String[lines.length];
+      for (int i=0; i<lines.length; i++)
+      {
+        result[i] = (String)lines[i];
+      }
+      return result;
+    }
+    finally
+    {
+      // the stream was previously never closed
+      content.close();
+    }
+  }
+}


Mime
View raw message