datafu-commits mailing list archives

From: wvaug...@apache.org
Subject: [10/19] DATAFU-27 Migrate build system to Gradle
Date: Tue, 04 Mar 2014 07:09:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java b/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java
new file mode 100644
index 0000000..5ebe778
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.urls;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class UserAgentTest extends PigTests
+{
+  
+  /**
+  
+
+  define UserAgentClassify datafu.pig.urls.UserAgentClassify();
+  
+  data = load 'input' as (usr_agent:chararray);
+  data_out = foreach data generate UserAgentClassify(usr_agent) as class;
+  --describe data_out;
+  store data_out into 'output';
+   */
+  @Multiline private String userAgentTest;
+  
+  @Test
+  public void userAgentTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(userAgentTest);
+  
+    String[] input = {
+        "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
+        "Mozilla/5.0 (compatible; Konqueror/3.5; Linux; X11; de) KHTML/3.5.2 (like Gecko) Kubuntu 6.06 Dapper",
+        "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:2.2a1pre) Gecko/20110331 Firefox/4.2a1pre Fennec/4.1a1pre",
+        "Opera/9.00 (X11; Linux i686; U; en)",
+        "Wget/1.10.2",
+        "Opera/9.80 (Android; Linux; Opera Mobi/ADR-1012221546; U; pl) Presto/2.7.60 Version/10.5",
+        "Mozilla/5.0 (Linux; U; Android 2.2; en-us; DROID2 Build/VZW) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"
+    };
+    
+    String[] output = {
+        "(mobile)",
+        "(desktop)",
+        "(mobile)",
+        "(desktop)",
+        "(desktop)",
+        "(mobile)",
+        "(mobile)",
+      };
+    
+    test.assertOutput("data",input,"data_out",output);
+  }
+
+}
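
This test illustrates the pattern used throughout the migrated test sources: the Pig script lives in the Javadoc comment directly above a String field annotated with @Multiline, and the annotation processor (the MultilineProcessor sources appear later in this patch) injects the comment text as the field's initializer at compile time, so createPigTestFromString can load it. A minimal sketch of the idiom, with a hypothetical script and field name:

    import org.adrianwalker.multilinestring.Multiline;
    import org.apache.pig.pigunit.PigTest;
    import org.testng.annotations.Test;

    import datafu.test.pig.PigTests;

    public class ExampleTest extends PigTests
    {
      /**
      data = LOAD 'input' AS (val:chararray);
      data_out = FOREACH data GENERATE val;
      STORE data_out INTO 'output';
      */
      @Multiline private static String exampleScript;  // initializer injected from the Javadoc above

      @Test
      public void exampleScriptTest() throws Exception
      {
        PigTest test = createPigTestFromString(exampleScript);
        // runs the script with the given lines as 'input' and checks the tuples produced for 'data_out'
        test.assertOutput("data", new String[] {"a", "b"}, "data_out", new String[] {"(a)", "(b)"});
      }
    }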

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java b/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java
new file mode 100644
index 0000000..3b23d35
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import static org.testng.Assert.*;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+import datafu.pig.util.AliasableEvalFunc;
+
+public class AliasEvalFuncTest extends PigTests
+{
+  static class ReportBuilder extends AliasableEvalFunc<DataBag> {
+    static final String ORDERED_ROUTES = "orderedRoutes";
+
+    public DataBag exec(Tuple input) throws IOException {
+       DataBag inputBag = getBag(input, ORDERED_ROUTES);
+       DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+       for(Iterator<Tuple> tupleIter = inputBag.iterator(); tupleIter.hasNext(); ) {
+           outputBag.add(tupleIter.next());
+       }
+       return outputBag;
+    }
+
+    public Schema getOutputSchema(Schema input) {
+       try {
+            Schema bagSchema = input.getField(0).schema;
+            Schema outputSchema = new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+                                                                    .getName()
+                                                                    .toLowerCase(), input),
+                                                                    bagSchema,
+                                                                    DataType.BAG));
+            return outputSchema;
+       } catch (Exception ex) {
+            return null;
+       }
+    }
+  }
+
+  @Test
+  public void getBagTest() throws Exception
+  {
+     ReportBuilder udf = new ReportBuilder();
+     udf.setUDFContextSignature("test");
+     List<Schema.FieldSchema> fieldSchemaList = new ArrayList<Schema.FieldSchema>();
+     fieldSchemaList.add(new Schema.FieldSchema("msisdn", DataType.LONG));
+     fieldSchemaList.add(new Schema.FieldSchema("ts", DataType.INTEGER));
+     fieldSchemaList.add(new Schema.FieldSchema("center_lon", DataType.DOUBLE));
+     fieldSchemaList.add(new Schema.FieldSchema("center_lat", DataType.DOUBLE));
+     Schema schemaTuple = new Schema(fieldSchemaList);
+     Schema schemaBag = new Schema(new Schema.FieldSchema(ReportBuilder.ORDERED_ROUTES, schemaTuple, DataType.BAG));
+     udf.outputSchema(schemaBag);
+
+     Tuple inputTuple = TupleFactory.getInstance().newTuple();
+     DataBag inputBag = BagFactory.getInstance().newDefaultBag();
+     inputBag.add(TupleFactory.getInstance().newTuple(Arrays.asList(71230000000L, 1382351612, 10.697, 20.713)));
+     inputTuple.append(inputBag);
+     DataBag outputBag = udf.exec(inputTuple);
+     Assert.assertEquals(inputBag, outputBag);
+  }
+}
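
A note on getBagTest above: the test calls setUDFContextSignature and outputSchema before exec, presumably because AliasableEvalFunc resolves field aliases (such as "orderedRoutes") from the schema it is given, so getBag(input, ORDERED_ROUTES) can only look the bag up by name after the schema has been supplied. A condensed sketch of that call order, reusing the names from the test:

    ReportBuilder udf = new ReportBuilder();
    udf.setUDFContextSignature("test");        // signature under which the UDF keeps its state
    udf.outputSchema(schemaBag);               // supplies the input schema so aliases can be resolved
    DataBag outputBag = udf.exec(inputTuple);  // getBag(input, ORDERED_ROUTES) now resolves the bag by name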

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java
new file mode 100644
index 0000000..1a6a741
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import static org.testng.Assert.*;
+
+import java.util.List;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class AssertTests extends PigTests
+{
+  /**
+  
+  
+  define ASRT datafu.pig.util.AssertUDF();
+  
+  data = LOAD 'input' AS (val:INT);
+  
+  data2 = FILTER data BY ASRT(val,'assertion appears to have failed, doh!');
+  
+  STORE data2 INTO 'output';
+  */
+  @Multiline private static String assertWithMessage;
+  
+  @Test
+  public void shouldAssertWithMessageOnZero() throws Exception
+  {
+    try
+    {
+      PigTest test = createPigTestFromString(assertWithMessage);
+      
+      this.writeLinesToFile("input", "0");
+      
+      test.runScript();
+      
+      this.getLinesForAlias(test, "data2");
+      
+      fail("test should have failed, but it didn't");
+    }
+    catch (Exception e)
+    {
+    }
+  }
+  
+  @Test
+  public void shouldNotAssertWithMessageOnOne() throws Exception
+  {
+    PigTest test = createPigTestFromString(assertWithMessage);
+    
+    this.writeLinesToFile("input", "1");
+    
+    test.runScript();
+    
+    List<Tuple> result = this.getLinesForAlias(test, "data2");
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).size(), 1);
+    Assert.assertEquals(result.get(0).get(0), 1);
+  }
+  
+  /**
+  
+  
+  define ASRT datafu.pig.util.AssertUDF();
+  
+  data = LOAD 'input' AS (val:INT);
+  
+  data2 = FILTER data BY ASRT(val);
+  
+  STORE data2 INTO 'output';
+  */
+  @Multiline private static String assertWithoutMessage;
+  
+  @Test
+  public void shouldAssertWithoutMessageOnZero() throws Exception
+  {
+    try
+    {
+      PigTest test = createPigTestFromString(assertWithoutMessage);
+      
+      this.writeLinesToFile("input", "0");
+      
+      test.runScript();
+      
+      this.getLinesForAlias(test, "data2");
+      
+      fail("test should have failed, but it didn't");
+    }
+    catch (Exception e)
+    {
+    }
+  }
+  
+  @Test
+  public void shouldNotAssertWithoutMessageOnOne() throws Exception
+  {
+    PigTest test = createPigTestFromString(assertWithoutMessage);
+    
+    this.writeLinesToFile("input", "1");
+    
+    test.runScript();
+    
+    List<Tuple> result = this.getLinesForAlias(test, "data2");
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0).size(), 1);
+    Assert.assertEquals(result.get(0).get(0), 1);
+  }
+}
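
The two failure-path tests above swallow the expected exception in an empty catch block. A possible alternative, matching the style used elsewhere in this patch (e.g. CoalesceTests and TransposeTest), would be to declare the exception on the annotation; a sketch for the first case:

    @Test(expectedExceptions = Exception.class)
    public void shouldAssertWithMessageOnZero() throws Exception
    {
      PigTest test = createPigTestFromString(assertWithMessage);
      this.writeLinesToFile("input", "0");
      test.runScript();
      // forcing evaluation of data2 is what triggers the assertion failure
      this.getLinesForAlias(test, "data2");
    }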

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java
new file mode 100644
index 0000000..3baf251
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.pigunit.PigTest;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class CoalesceTests extends PigTests
+{
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2,val3) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String coalesceIntTest;
+  
+  @Test
+  public void coalesceIntTest() throws Exception
+  { 
+    PigTest test = createPigTestFromString(coalesceIntTest);
+    
+    this.writeLinesToFile("input", "1,1,2,3",
+                                   "2,,2,3",
+                                   "3,,,3",
+                                   "4,,,",
+                                   "5,1,,3",
+                                   "6,1,,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data3");
+    
+    Assert.assertEquals(6, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(1, t.get(1)); break;
+      case 2:
+        Assert.assertEquals(2, t.get(1)); break;
+      case 3:
+        Assert.assertEquals(3, t.get(1)); break;
+      case 4:
+        Assert.assertEquals(null, t.get(1)); break;
+      case 5:
+        Assert.assertEquals(1, t.get(1)); break;
+      case 6:
+        Assert.assertEquals(1, t.get(1)); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+  
+  STORE data4 INTO 'output';
+  */
+  @Multiline private static String coalesceLongTest;
+  
+  @Test
+  public void coalesceLongTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceLongTest);
+    
+    this.writeLinesToFile("input", "1,5",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data4");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(500L, t.get(1)); break;
+      case 2:
+        Assert.assertEquals(10000L, t.get(1)); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,100) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+  
+  STORE data4 INTO 'output';
+  */
+  @Multiline private static String coalesceCastIntToLongTestFails;
+  
+  // The first parameter is a long and the fixed value is an int.
+  // They cannot be merged without the lazy option.
+  @Test(expectedExceptions=FrontendException.class)
+  public void coalesceCastIntToLongTestFails() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceCastIntToLongTestFails);
+    
+    this.writeLinesToFile("input", "1,5",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data4");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(500L, t.get(1)); break;
+      case 2:
+        Assert.assertEquals(10000L, t.get(1)); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce('lazy');
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,100) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+  
+  STORE data4 INTO 'output';
+  */
+  @Multiline private static String coalesceIntAndLongTest;
+  
+  // The first parameter is a long and the fixed value is an int.
+  // They are merged to a long.
+  @Test
+  public void coalesceCastIntToLongTest1() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceIntAndLongTest);
+    
+    this.writeLinesToFile("input", "1,5",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data4");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(500L, t.get(1)); break;
+      case 2:
+        Assert.assertEquals(10000L, t.get(1)); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce('lazy');
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+  
+  STORE data4 INTO 'output';
+  */
+  @Multiline private static String coalesceIntAndLongTest2;
+  
+  // The first parameter is an int, but the fixed parameter is a long.
+  // They are merged to a long.
+  @Test
+  public void coalesceCastIntToLongTest2() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceIntAndLongTest2);
+    
+    this.writeLinesToFile("input", "1,5",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data4");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(500L, t.get(1)); break;
+      case 2:
+        Assert.assertEquals(10000L, t.get(1)); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce('lazy');
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,100.0) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+  
+  STORE data4 INTO 'output';
+  */
+  @Multiline private static String coalesceIntAndDoubleTest;
+  
+  // The first parameter is an int, but the fixed parameter is a double.
+  // They are merged to a double.
+  @Test
+  public void coalesceCastIntToDoubleTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceIntAndDoubleTest);
+    
+    this.writeLinesToFile("input", "1,5",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data4");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(500.0, t.get(1)); break;
+      case 2:
+        Assert.assertEquals(10000.0, t.get(1)); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+  
+  data = FOREACH data GENERATE testcase, (val1 IS NOT NULL ? ToDate(val1) : (datetime)null) as val1;
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,ToDate('1970-01-01T00:00:00.000Z')) as result;
+  
+  --describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+    
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String coalesceCastIntToDatetimeTest;
+  
+  @Test
+  public void coalesceCastIntToDatetimeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceCastIntToDatetimeTest);
+    
+    this.writeLinesToFile("input", "1,1375826183000",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data3");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      Integer testcase = (Integer)t.get(0);
+      Assert.assertNotNull(testcase);
+      switch(testcase)
+      {
+      case 1:
+        Assert.assertEquals("2013-08-06T21:56:23.000Z", ((DateTime)t.get(1)).toDateTime(DateTimeZone.UTC).toString()); break;
+      case 2:
+        Assert.assertEquals("1970-01-01T00:00:00.000Z", t.get(1).toString()); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce('lazy');
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+  
+  data = FOREACH data GENERATE testcase, (val1 IS NOT NULL ? ToDate(val1) : (datetime)null) as val1;
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,ToDate('1970-01-01T00:00:00.000Z')) as result;
+  
+  --describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+    
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String coalesceCastIntToDatetimeLazyTest;
+  
+  @Test
+  public void coalesceCastIntToDatetimeLazyTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceCastIntToDatetimeLazyTest);
+    
+    this.writeLinesToFile("input", "1,1375826183000",
+                                   "2,");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data3");
+    
+    Assert.assertEquals(2, lines.size());
+    for (Tuple t : lines)
+    {
+      Integer testcase = (Integer)t.get(0);
+      Assert.assertNotNull(testcase);
+      switch(testcase)
+      {
+      case 1:
+        Assert.assertEquals("2013-08-06T21:56:23.000Z", ((DateTime)t.get(1)).toDateTime(DateTimeZone.UTC).toString()); break;
+      case 2:
+        Assert.assertEquals("1970-01-01T00:00:00.000Z", t.get(1).toString()); break;
+      default:
+        Assert.fail("Did not expect: " + t.get(1));                    
+      }
+    }
+  }
+  
+  /**
+  
+  
+  define COALESCE datafu.pig.util.Coalesce();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:LONG);
+  
+  data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2) as result;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, result;
+  
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String coalesceBagIncompatibleTypeTest;
+  
+  @Test(expectedExceptions=FrontendException.class)
+  public void coalesceBagIncompatibleTypeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(coalesceBagIncompatibleTypeTest);
+    
+    this.writeLinesToFile("input", "1,1,2L}");
+    
+    test.runScript();
+    
+    this.getLinesForAlias(test, "data3");
+  }
+  
+  /**
+  
+
+  define COALESCE datafu.pig.util.Coalesce('lazy');
+  define EmptyBagToNullFields datafu.pig.bags.EmptyBagToNullFields();
+  
+  input1 = LOAD 'input1' using PigStorage(',') AS (val1:INT,val2:INT);
+  input2 = LOAD 'input2' using PigStorage(',') AS (val1:INT,val2:INT);
+  input3 = LOAD 'input3' using PigStorage(',') AS (val1:INT,val2:INT);
+  
+  data4 = COGROUP input1 BY val1,
+                  input2 BY val1,
+                  input3 BY val1;
+  
+  dump data4;
+  
+  data4 = FOREACH data4 GENERATE
+    FLATTEN(input1),
+    FLATTEN(EmptyBagToNullFields(input2)),
+    FLATTEN(EmptyBagToNullFields(input3));
+  
+  dump data4;
+    
+  describe data4;
+  
+  data5 = FOREACH data4 GENERATE input1::val1 as val1, COALESCE(input2::val2,0L) as val2, COALESCE(input3::val2,0L) as val3;
+  
+  --describe data5;
+  
+  STORE data5 INTO 'output';
+  */
+  @Multiline private static String leftJoinTest;
+  
+  @Test
+  public void leftJoinTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(leftJoinTest);
+    
+    this.writeLinesToFile("input1", "1,1",
+                                    "2,2",
+                                    "5,5");
+    
+    this.writeLinesToFile("input2", "1,10",
+                                    "3,30",
+                                    "5,50");
+    
+    this.writeLinesToFile("input3", "2,100",
+                                    "5,500");
+    
+    test.runScript();
+    
+    List<Tuple> lines = this.getLinesForAlias(test, "data5");
+    
+    Assert.assertEquals(3, lines.size());
+    for (Tuple t : lines)
+    {
+      switch((Integer)t.get(0))
+      {
+      case 1:
+        Assert.assertEquals(10L, t.get(1));  
+        Assert.assertEquals(0L, t.get(2));  
+        break;
+      case 2:
+        Assert.assertEquals(0L, t.get(1)); 
+        Assert.assertEquals(100L, t.get(2)); 
+        break;
+      case 5:
+        Assert.assertEquals(50L, t.get(1)); 
+        Assert.assertEquals(500L, t.get(2)); 
+        break;
+      default:
+        Assert.fail("Did not expect: " + t.get(0));                    
+      }
+    }
+  }
+}
\ No newline at end of file
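
Taken together, the tests above contrast the default Coalesce constructor with the 'lazy' option: without 'lazy', all arguments must already share a type (mixing an INT constant with a LONG field raises a FrontendException), while with 'lazy' compatible numeric arguments are merged to the wider type. A condensed sketch of the two definitions, written in the same @Multiline style as the tests (field name and schema are illustrative only):

    /**
    -- strict: argument types must already match (val1 is a LONG, so the constant must be a LONG too)
    define COALESCE datafu.pig.util.Coalesce();
    strict_result = FOREACH data GENERATE COALESCE(val1, 100L) as result;

    -- lazy: an INT constant and a LONG field are merged to a LONG
    define COALESCE_LAZY datafu.pig.util.Coalesce('lazy');
    lazy_result = FOREACH data GENERATE COALESCE_LAZY(val1, 100) as result;
    */
    @Multiline private static String coalesceStrictVsLazy;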

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java
new file mode 100644
index 0000000..591ee82
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class InTests extends PigTests
+{
+  /**
+  
+
+  define I datafu.pig.util.InUDF();
+  
+  data = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
+  
+  data2 = FOREACH data {
+    C = FILTER B By I(v, 1,2,3);
+    GENERATE C;
+  }
+  
+  describe data2;
+  
+  STORE data2 INTO 'output';
+  */
+  @Multiline private static String inIntTest;
+  
+  @Test
+  public void inIntTest() throws Exception
+  { 
+    PigTest test = createPigTestFromString(inIntTest);
+    
+    writeLinesToFile("input", 
+                     "({(1),(2),(3),(4),(5)})",
+                     "({(1),(2)})",
+                     "({(4),(5)})");
+    
+    test.runScript();
+    
+    assertOutput(test, "data2",
+                 "({(1),(2),(3)})",
+                 "({(1),(2)})",
+                 "({})");    
+  }
+  
+  /**
+  
+
+  define I datafu.pig.util.InUDF();
+  
+  data = LOAD 'input' AS (B: bag {T: tuple(v:chararray)});
+  
+  data2 = FOREACH data {
+    C = FILTER B By I(v, 'will','matt','sam');
+    GENERATE C;
+  }
+  
+  describe data2;
+  
+  STORE data2 INTO 'output';
+  */
+  @Multiline private static String inStringTest;
+  
+  @Test
+  public void inStringTest() throws Exception
+  { 
+    PigTest test = createPigTestFromString(inStringTest);
+    
+    writeLinesToFile("input", 
+                     "({(alice),(bob),(will),(matt),(sam)})",
+                     "({(will),(matt)})",
+                     "({(alice),(bob)})");
+    
+    test.runScript();
+    
+    assertOutput(test, "data2",
+                 "({(will),(matt),(sam)})",
+                 "({(will),(matt)})",
+                 "({})");    
+  }
+  
+  /**
+  
+  
+  define I datafu.pig.util.InUDF();
+  
+  data = LOAD 'input' AS (owner:chararray, color:chararray);
+  describe data;
+  data2 = FILTER data BY I(color, 'red','blue');
+  describe data2;
+  STORE data2 INTO 'output';
+  */
+  @Multiline private static String inTopLevelTest;
+  
+  @Test
+  public void inTopLevelTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(inTopLevelTest);
+    
+    writeLinesToFile("input", 
+                     "alice\tred",
+                     "bob\tblue",
+                     "charlie\tgreen",
+                     "dave\tred");
+    test.runScript();
+    
+    assertOutput(test, "data2", 
+                 "(alice,red)",
+                 "(bob,blue)",
+                 "(dave,red)");
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java
new file mode 100644
index 0000000..465e8b2
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class IntBoolConversionPigTests extends PigTests
+{
+  /**
+  
+  
+  define IntToBool datafu.pig.util.IntToBool();
+  
+  data = LOAD 'input' AS (val:INT);
+  
+  data2 = FOREACH data GENERATE IntToBool(val);
+  
+  STORE data2 INTO 'output';
+  */
+  @Multiline private static String intToBoolTest;
+  
+  @Test
+  public void intToBoolTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(intToBoolTest);
+        
+    String[] input = {
+      "", // null
+      "0",
+      "1"
+    };
+    
+    String[] output = {
+        "(false)",
+        "(false)",
+        "(true)"
+      };
+    
+    test.assertOutput("data",input,"data2",output);
+  }
+  
+  /**
+  
+  
+  define IntToBool datafu.pig.util.IntToBool();
+  define BoolToInt datafu.pig.util.BoolToInt();
+  
+  data = LOAD 'input' AS (val:INT);
+  
+  data2 = FOREACH data GENERATE IntToBool(val) as val;
+  data3 = FOREACH data2 GENERATE BoolToInt(val) as val;
+  
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String intToBoolToIntTest;
+  
+  @Test
+  public void intToBoolToIntTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(intToBoolToIntTest);
+        
+    String[] input = {
+      "", // null
+      "0",
+      "1",
+      "2",
+      "-1",
+      "-2",
+      "0",
+      ""
+    };
+    
+    String[] output = {
+        "(0)",
+        "(0)",
+        "(1)",
+        "(1)",
+        "(1)",
+        "(1)",
+        "(0)",
+        "(0)"
+      };
+    
+    test.assertOutput("data",input,"data3",output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java b/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java
new file mode 100644
index 0000000..3d7654a
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import java.util.List;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class TransposeTest extends PigTests
+{
+  /**
+  
+
+  define Transpose datafu.pig.util.TransposeTupleToBag();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT);
+  
+  data2 = FOREACH data GENERATE testcase, Transpose(val1 .. val3) as transposed;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, transposed;
+  
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String transposeTest;
+  
+  @Test
+  public void transposeTest() throws Exception
+  { 
+    PigTest test = createPigTestFromString(transposeTest);
+    writeLinesToFile("input", "1,10,11,12",
+                              "2,20,21,22");
+    test.runScript();
+    
+    List<Tuple> output = getLinesForAlias(test, "data3");
+    for (Tuple tuple : output) {
+      int testCase = (Integer)tuple.get(0);
+      DataBag bag = (DataBag)tuple.get(1);
+      Assert.assertEquals(bag.size(), 3);
+      int i=0;
+      for (Tuple t : bag) {
+        String expectedKey = String.format("val%d",i+1);
+        Assert.assertEquals((String)t.get(0), expectedKey);
+        int actualValue = (Integer)t.get(1); 
+        Assert.assertEquals(actualValue, testCase*10+i);
+        i++;
+      }
+    }
+  }
+  
+  /**
+  
+
+  define Transpose datafu.pig.util.TransposeTupleToBag();
+  
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:DOUBLE);
+  
+  data2 = FOREACH data GENERATE testcase, Transpose(val1 .. val3) as transposed;
+  
+  describe data2;
+  
+  data3 = FOREACH data2 GENERATE testcase, transposed;
+  
+  STORE data3 INTO 'output';
+  */
+  @Multiline private static String transposeBadTypeTest;
+  
+  @Test(expectedExceptions={org.apache.pig.impl.logicalLayer.FrontendException.class})
+  public void transposeBadTypeTest() throws Exception
+  { 
+    PigTest test = createPigTestFromString(transposeBadTypeTest);
+    writeLinesToFile("input", "1,10,11,12.0",
+                              "2,20,21,22.0");
+    test.runScript();
+    
+    List<Tuple> output = getLinesForAlias(test, "data3");
+    for (Tuple tuple : output) {
+      int testCase = (Integer)tuple.get(0);
+      DataBag bag = (DataBag)tuple.get(1);
+      Assert.assertEquals(bag.size(), 3);
+      int i=0;
+      for (Tuple t : bag) {
+        String expectedKey = String.format("val%d",i+1);
+        Assert.assertEquals((String)t.get(0), expectedKey);
+        int actualValue = (Integer)t.get(1); 
+        Assert.assertEquals(actualValue, testCase*10+i);
+        i++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..98e86a2
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.datafu
+version=1.2.1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/buildscript.gradle
----------------------------------------------------------------------
diff --git a/gradle/buildscript.gradle b/gradle/buildscript.gradle
new file mode 100644
index 0000000..225e0a8
--- /dev/null
+++ b/gradle/buildscript.gradle
@@ -0,0 +1,12 @@
+repositories {
+  repositories {
+    // For license plugin.
+    maven {
+      url 'http://dl.bintray.com/content/netflixoss/external-gradle-plugins/'
+    }
+  }
+}
+
+dependencies {
+  classpath 'nl.javadude.gradle.plugins:license-gradle-plugin:0.6.1'
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
new file mode 100644
index 0000000..29d3e36
--- /dev/null
+++ b/gradle/dependency-versions.gradle
@@ -0,0 +1,20 @@
+ext {
+  antlrVersion="3.2"
+  avroVersion="1.5.3"
+  streamVersion="2.5.0"
+  commonsMathVersion="2.2"
+  commonsIoVersion="1.4"
+  fastutilVersion="6.5.7"
+  guavaVersion="11.0"
+  hadoopVersion="0.20.2"
+  jodaTimeVersion="1.6"
+  log4jVersion="1.2.14"
+  mavenVersion="2.1.3"
+  jlineVersion="0.9.94"
+  pigVersion="0.11.1"
+  testngVersion="6.2"
+  toolsVersion="1.4.2"
+  wagonHttpVersion="1.0-beta-2"
+  openNlpVersion="1.5.3"
+  openNlpMaxEntVersion="3.0.3"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..a7634b0
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..610282a
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Thu Jul 11 22:18:11 PDT 2013
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.6-bin.zip

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradlew
----------------------------------------------------------------------
diff --git a/gradlew b/gradlew
new file mode 100755
index 0000000..16bbbbf
--- /dev/null
+++ b/gradlew
@@ -0,0 +1,164 @@
+#!/usr/bin/env bash
+
+##############################################################################
+##
+##  Gradle start up script for UN*X
+##
+##############################################################################
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS="-XX:MaxPermSize=512m"
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+    echo "$*"
+}
+
+die ( ) {
+    echo
+    echo "$*"
+    echo
+    exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+case "`uname`" in
+  CYGWIN* )
+    cygwin=true
+    ;;
+  Darwin* )
+    darwin=true
+    ;;
+  MINGW* )
+    msys=true
+    ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched.
+if $cygwin ; then
+    [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+fi
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >&-
+APP_HOME="`pwd -P`"
+cd "$SAVED" >&-
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+        # IBM's JDK on AIX uses strange locations for the executables
+        JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+        JAVACMD="$JAVA_HOME/bin/java"
+    fi
+    if [ ! -x "$JAVACMD" ] ; then
+        die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+    fi
+else
+    JAVACMD="java"
+    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+    MAX_FD_LIMIT=`ulimit -H -n`
+    if [ $? -eq 0 ] ; then
+        if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+            MAX_FD="$MAX_FD_LIMIT"
+        fi
+        ulimit -n $MAX_FD
+        if [ $? -ne 0 ] ; then
+            warn "Could not set maximum file descriptor limit: $MAX_FD"
+        fi
+    else
+        warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+    fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+    GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+    APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+    CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
+    # We build the pattern for arguments to be converted via cygpath
+    ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+    SEP=""
+    for dir in $ROOTDIRSRAW ; do
+        ROOTDIRS="$ROOTDIRS$SEP$dir"
+        SEP="|"
+    done
+    OURCYGPATTERN="(^($ROOTDIRS))"
+    # Add a user-defined pattern to the cygpath arguments
+    if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+        OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+    fi
+    # Now convert the arguments - kludge to limit ourselves to /bin/sh
+    i=0
+    for arg in "$@" ; do
+        CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+        CHECK2=`echo "$arg"|egrep -c "^-"`                                 ### Determine if an option
+
+        if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then                    ### Added a condition
+            eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+        else
+            eval `echo args$i`="\"$arg\""
+        fi
+        i=$((i+1))
+    done
+    case $i in
+        (0) set -- ;;
+        (1) set -- "$args0" ;;
+        (2) set -- "$args0" "$args1" ;;
+        (3) set -- "$args0" "$args1" "$args2" ;;
+        (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+        (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+        (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+        (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+        (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+        (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+    esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+    JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java
deleted file mode 100644
index 7dbfe9a..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.util.Set;
-
-import javax.annotation.processing.AbstractProcessor;
-import javax.annotation.processing.ProcessingEnvironment;
-import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedAnnotationTypes;
-import javax.annotation.processing.SupportedSourceVersion;
-import javax.lang.model.SourceVersion;
-import javax.lang.model.element.Element;
-import javax.lang.model.element.TypeElement;
-import javax.lang.model.util.Elements;
-
-import org.eclipse.jdt.internal.compiler.apt.model.VariableElementImpl;
-import org.eclipse.jdt.internal.compiler.ast.FieldDeclaration;
-import org.eclipse.jdt.internal.compiler.ast.StringLiteral;
-import org.eclipse.jdt.internal.compiler.lookup.FieldBinding;
-
-import java.lang.reflect.Constructor;
-
-@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
-public final class EcjMultilineProcessor extends AbstractProcessor {
-
-  private Elements elementUtils;
-  
-  @Override
-  public void init(final ProcessingEnvironment procEnv) {
-    super.init(procEnv);
-    this.elementUtils = procEnv.getElementUtils();
-  }
-
-  @Override
-  public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
-    Set<? extends Element> fields = roundEnv.getElementsAnnotatedWith(Multiline.class);
-
-    for (Element field : fields) {
-      String docComment = elementUtils.getDocComment(field);
-
-      if (null != docComment) {
-        VariableElementImpl fieldElem = (VariableElementImpl) field;
-        FieldBinding biding = (FieldBinding) fieldElem._binding;
-        FieldDeclaration decl = biding.sourceField();
-        StringLiteral string = new StringLiteral(docComment.toCharArray(), decl.sourceStart, decl.sourceEnd, decl.sourceStart);
-        decl.initialization = string;
-      }
-    }
-    return true;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java
deleted file mode 100644
index 39aa24e..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.util.Set;
-
-import javax.annotation.processing.AbstractProcessor;
-import javax.annotation.processing.ProcessingEnvironment;
-import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedAnnotationTypes;
-import javax.annotation.processing.SupportedSourceVersion;
-import javax.lang.model.SourceVersion;
-import javax.lang.model.element.Element;
-import javax.lang.model.element.TypeElement;
-
-import com.sun.tools.javac.model.JavacElements;
-import com.sun.tools.javac.processing.JavacProcessingEnvironment;
-import com.sun.tools.javac.tree.JCTree.JCVariableDecl;
-import com.sun.tools.javac.tree.TreeMaker;
-
-@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
-public final class JavacMultilineProcessor extends AbstractProcessor {
-
-  private JavacElements elementUtils;
-  private TreeMaker maker;
-  
-  @Override
-  public void init(final ProcessingEnvironment procEnv) {
-    super.init(procEnv);
-    JavacProcessingEnvironment javacProcessingEnv = (JavacProcessingEnvironment) procEnv;
-    this.elementUtils = javacProcessingEnv.getElementUtils();
-    this.maker = TreeMaker.instance(javacProcessingEnv.getContext());
-  }
-
-  @Override
-  public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
-    Set<? extends Element> fields = roundEnv.getElementsAnnotatedWith(Multiline.class);
-    for (Element field : fields) {
-      String docComment = elementUtils.getDocComment(field);
-      if (null != docComment) {
-        JCVariableDecl fieldNode = (JCVariableDecl) elementUtils.getTree(field);
-        fieldNode.init = maker.Literal(docComment);
-      }
-    }
-    return true;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/Multiline.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/Multiline.java b/plugin/java/org/adrianwalker/multilinestring/Multiline.java
deleted file mode 100644
index c29d3ef..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/Multiline.java
+++ /dev/null
@@ -1,14 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Target({ElementType.FIELD,ElementType.LOCAL_VARIABLE})
-@Retention(RetentionPolicy.SOURCE)
-public @interface Multiline {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java
deleted file mode 100644
index 9abdba5..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.util.Set;
-
-import javax.annotation.processing.AbstractProcessor;
-import javax.annotation.processing.ProcessingEnvironment;
-import javax.annotation.processing.Processor;
-import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedAnnotationTypes;
-import javax.annotation.processing.SupportedSourceVersion;
-import javax.lang.model.SourceVersion;
-import javax.lang.model.element.TypeElement;
-
-@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
-public final class MultilineProcessor extends AbstractProcessor {
-  private Processor delegator = null;
-  
-  @Override
-  public void init(final ProcessingEnvironment procEnv) {
-    super.init(procEnv);
-    String envClassName = procEnv.getClass().getName();
-    if (envClassName.contains("com.sun.tools")) {
-      delegator = new JavacMultilineProcessor();
-    } else {
-      delegator = new EcjMultilineProcessor();
-    }
-    delegator.init(procEnv);
-  }
-
-  @Override
-  public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
-    if (delegator == null ) {
-      return true;
-    }
-    return delegator.process(annotations, roundEnv);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
new file mode 100644
index 0000000..62b1899
--- /dev/null
+++ b/settings.gradle
@@ -0,0 +1 @@
+include "build-plugin","datafu-pig"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/AppendToBag.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/AppendToBag.java b/src/java/datafu/pig/bags/AppendToBag.java
deleted file mode 100644
index 55c9e76..0000000
--- a/src/java/datafu/pig/bags/AppendToBag.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Appends a tuple to a bag.
- * <p>
- * Example:
- * <pre>
- * {@code
- * define AppendToBag datafu.pig.bags.AppendToBag();
- * 
- * -- input:
- * -- ({(1),(2),(3)},(4))
- * -- ({(10),(20),(30),(40),(50)},(60))
- * input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
- *
- * -- output:
- * -- ({(1),(2),(3),(4)})
- * -- ({(10),(20),(30),(40),(50),(60)})
- * output = FOREACH input GENERATE AppendToBag(B,T) as B;
- * }
- * </pre>
- */
-public class AppendToBag extends SimpleEvalFunc<DataBag>
-{
-  public DataBag call(DataBag inputBag, Tuple t) throws IOException
-  {
-    inputBag.add(t);
-    return inputBag;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-              input.getField(0).schema, DataType.BAG));
-    }
-    catch (FrontendException e) {
-      return null;
-    }
-  }
-}

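As a minimal sketch (not part of this patch), AppendToBag's call() method can be exercised straight from Java with the same Pig data factories the class imports; the class name AppendToBagSketch and the sample values are illustrative, and this assumes SimpleEvalFunc needs no Pig runtime context at construction time.

// Sketch: builds the bag {(1),(2),(3)} and appends the tuple (4), mirroring the Javadoc example.
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import datafu.pig.bags.AppendToBag;

public class AppendToBagSketch
{
  public static void main(String[] args) throws Exception
  {
    TupleFactory tupleFactory = TupleFactory.getInstance();
    BagFactory bagFactory = BagFactory.getInstance();

    DataBag bag = bagFactory.newDefaultBag();
    for (int v = 1; v <= 3; v++) {
      bag.add(tupleFactory.newTuple(Integer.valueOf(v)));
    }
    Tuple extra = tupleFactory.newTuple(Integer.valueOf(4));

    // call() adds the tuple to the bag and returns the same bag instance
    DataBag result = new AppendToBag().call(bag, extra);
    System.out.println(result); // expected: {(1),(2),(3),(4)}
  }
}
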
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagConcat.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagConcat.java b/src/java/datafu/pig/bags/BagConcat.java
deleted file mode 100644
index 290f44b..0000000
--- a/src/java/datafu/pig/bags/BagConcat.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * Unions all input bags to produce a single bag containing all tuples.
- * <p>
- * This UDF accepts two forms of input:
- * <ol>
- *  <li>a tuple of 2 or more elements where each element is a bag with the same schema</li>
- *  <li>a single bag where each tuple of that bag contains a single inner bag, and all of these inner bags have the same schema</li>
- * </ol>
- * <p>
- * Example 1:
- * <pre>
- * {@code
- * define BagConcat datafu.pig.bags.BagConcat();
- * -- This example illustrates the use on a tuple of bags
- * 
- * -- input:
- * -- ({(1),(2),(3)},{(3),(4),(5)})
- * -- ({(20),(25)},{(40),(50)})
- * input = LOAD 'input' AS (A: bag{T: tuple(v:INT)}, B: bag{T: tuple(v:INT)});
- * 
- * -- output:
- * -- ({(1),(2),(3),(3),(4),(5)})
- * -- ({(20),(25),(40),(50)})
- * output = FOREACH input GENERATE BagConcat(A,B); 
- * }
- * </pre>
- * <p>
- * Example 2:
- * <pre>
- * {@code
- * define BagConcat datafu.pig.bags.BagConcat();
- * -- This example illustrates the use on a bag of bags
- * 
- * -- input:
- * -- ({({(1),(2),(3)}),({(3),(4),(5)})})
- * -- ({({(20),(25)}),({(40),(50)})})
- * input = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})});
- * 
- * -- output:
- * -- ({(1),(2),(3),(3),(4),(5)})
- * -- ({(20),(25),(40),(50)})
- * output = FOREACH input GENERATE BagConcat(A);
- * }
- * </pre>
- *  
- * @author wvaughan
- *
- */
-public class BagConcat extends EvalFunc<DataBag>
-{
-
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    DataBag output = BagFactory.getInstance().newDefaultBag();
-    if (input.size() > 1) {
-      // tuple of bags
-      for (int i=0; i < input.size(); i++) {
-        Object o = input.get(i);
-        if (!(o instanceof DataBag)) {
-          throw new RuntimeException("Expected a TUPLE of BAGs as input");
-        }
-
-        DataBag inputBag = (DataBag) o;
-        for (Tuple elem : inputBag) {
-          output.add(elem);
-        }
-      }
-    } else {
-      // bag of bags
-      DataBag outerBag = (DataBag)input.get(0);
-      for (Tuple outerTuple : outerBag) {
-        DataBag innerBag = (DataBag)outerTuple.get(0);
-        for (Tuple innerTuple : innerBag) {
-          output.add(innerTuple);
-        }
-      }
-    }
-    return output;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      Schema outputBagTupleSchema = null;
-      
-      if (input.size() == 0) {
-        throw new RuntimeException("Expected input tuple to contain fields. Got none.");
-      }
-      // determine whether the input is a tuple of bags or a bag of bags      
-      if (input.size() != 1) {
-        // tuple of bags
-        
-        // verify that each element in the input is a bag
-        for (FieldSchema fieldSchema : input.getFields()) {
-          if (fieldSchema.type != DataType.BAG) {
-            throwBadTypeError("Expected a TUPLE of BAGs as input.  Got instead: %s in schema: %s", fieldSchema.type, input);
-          }
-          if (fieldSchema.schema == null) {
-            throwBadSchemaError(fieldSchema.alias, input);
-          }
-        }
-        
-        outputBagTupleSchema = input.getField(0).schema;                
-      } else {
-        // bag of bags
-        
-        // should only be a single element in the input and it should be a bag
-        FieldSchema fieldSchema = input.getField(0); 
-        if (fieldSchema.type != DataType.BAG) {
-          throwBadTypeError("Expected a BAG of BAGs as input.  Got instead: %s in schema: %s", fieldSchema.type, input);
-        }
-        
-        // take the tuple schema from this outer bag
-        Schema outerBagTuple = input.getField(0).schema;      
-        // verify that this tuple contains only a bag for each element
-        Schema outerBagSchema = outerBagTuple.getField(0).schema;      
-        if (outerBagSchema.size() != 1) {
-          throw new RuntimeException(String.format("Expected outer bag to have only a single field.  Got instead: %s", 
-                                                   outerBagSchema.prettyPrint()));
-        }
-        FieldSchema outerBagFieldSchema = outerBagSchema.getField(0); 
-        if (outerBagFieldSchema.type != DataType.BAG) {
-          throwBadTypeError("Expected a BAG of BAGs as input.  Got instead: %s", outerBagFieldSchema.type, outerBagTuple); 
-        }
-        
-        // take the schema of the inner tuple as the schema for the tuple in the output bag
-        FieldSchema innerTupleSchema = outerBagSchema.getField(0);        
-        if (innerTupleSchema.schema == null) {
-          throwBadSchemaError(innerTupleSchema.alias, outerBagSchema);
-        }
-        outputBagTupleSchema = innerTupleSchema.schema;
-      }    
-      
-      // return the correct schema
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-                                                      outputBagTupleSchema, DataType.BAG));      
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  private void throwBadSchemaError(String alias, Schema schema) throws FrontendException
-  {    
-    throw new FrontendException(String.format("Expected non-null schema for all bags. Got null on field %s, in: %s", 
-                                              alias, schema.prettyPrint()));    
-  }
-
-  private void throwBadTypeError(String expectedMessage, byte actualType, Schema schema) throws FrontendException
-  {
-    throw new FrontendException(String.format(expectedMessage, DataType.findTypeName(actualType), schema.prettyPrint()));
-  }
-}

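A similar sketch (again not part of this patch) drives BagConcat's tuple-of-bags branch, using only the Pig classes the UDF already imports; BagConcatSketch, a and b are illustrative names.

// Sketch: concatenates the bags {(1),(2)} and {(3)} into {(1),(2),(3)}.
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import datafu.pig.bags.BagConcat;

public class BagConcatSketch
{
  public static void main(String[] args) throws Exception
  {
    TupleFactory tupleFactory = TupleFactory.getInstance();
    BagFactory bagFactory = BagFactory.getInstance();

    DataBag a = bagFactory.newDefaultBag();
    a.add(tupleFactory.newTuple(Integer.valueOf(1)));
    a.add(tupleFactory.newTuple(Integer.valueOf(2)));

    DataBag b = bagFactory.newDefaultBag();
    b.add(tupleFactory.newTuple(Integer.valueOf(3)));

    // a two-field input tuple selects the tuple-of-bags branch in exec()
    Tuple input = tupleFactory.newTuple(2);
    input.set(0, a);
    input.set(1, b);

    DataBag output = new BagConcat().exec(input);
    System.out.println(output); // expected: {(1),(2),(3)}
  }
}
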
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagGroup.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagGroup.java b/src/java/datafu/pig/bags/BagGroup.java
deleted file mode 100644
index 7b6c2e2..0000000
--- a/src/java/datafu/pig/bags/BagGroup.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.AliasableEvalFunc;
-
-/**
- * Performs an in-memory group operation on a bag.  The first argument is the bag.
- * The second argument is a projection of that bag to the group keys.
- *
- * <p>
- * Example:
- * <code>
- * define BagGroup datafu.pig.bags.BagGroup();
- * 
- * data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)});
- * -- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)})
- * 
- * data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped;
- * -- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}}
- * -- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})})
- * </code>
- * </p>
- * 
- * @author wvaughan
- *
- */
-public class BagGroup extends AliasableEvalFunc<DataBag>
-{
-  private final String FIELD_NAMES_PROPERTY = "FIELD_NAMES";
-  private List<String> fieldNames;
-  
-  @Override
-  public Schema getOutputSchema(Schema input)
-  {
-    try {
-      if (input.size() != 2) {
-        throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %d field.", input.size()));
-      }      
-      // Expect the first field to be a bag
-      FieldSchema bagFieldSchema = input.getField(0);
-      if (bagFieldSchema.type != DataType.BAG) {
-        throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as first field.", DataType.findTypeName(bagFieldSchema.type)));
-      }
-      // Expect the second field to be a projection of the bag
-      FieldSchema projectedBagFieldSchema = input.getField(1);
-      if (projectedBagFieldSchema.type != DataType.BAG) {
-        throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as second field.", DataType.findTypeName(projectedBagFieldSchema.type)));
-      }
-      
-      String bagName = bagFieldSchema.alias;
-      // handle named tuples
-      if (bagFieldSchema.schema.size() == 1) {
-        FieldSchema bagTupleFieldSchema = bagFieldSchema.schema.getField(0);
-        if (bagTupleFieldSchema.type == DataType.TUPLE && bagTupleFieldSchema.alias != null) {
-          bagName = getPrefixedAliasName(bagName, bagTupleFieldSchema.alias);
-        }
-      }      
-      if (projectedBagFieldSchema.schema.size() == 1) {
-        FieldSchema projectedBagTupleFieldSchema = projectedBagFieldSchema.schema.getField(0);
-        if (projectedBagTupleFieldSchema.type == DataType.TUPLE && projectedBagTupleFieldSchema.schema != null) {
-          projectedBagFieldSchema = projectedBagTupleFieldSchema;
-        }
-      }
-      
-      // create the output schema for the 'group'
-      // store the field names for the group keys
-      Schema groupTupleSchema = new Schema();
-      fieldNames = new ArrayList<String>(projectedBagFieldSchema.schema.size());
-      for (int i=0; i<projectedBagFieldSchema.schema.size(); i++) {
-        FieldSchema fieldSchema = projectedBagFieldSchema.schema.getField(i);
-        String fieldName = fieldSchema.alias;
-        fieldNames.add(getPrefixedAliasName(bagName, fieldName));
-        groupTupleSchema.add(new FieldSchema(fieldSchema.alias, fieldSchema.type));
-      }
-      getInstanceProperties().put(FIELD_NAMES_PROPERTY, fieldNames);
-      
-      Schema outputTupleSchema = new Schema();
-      if (projectedBagFieldSchema.schema.size() > 1) {
-        // multiple group keys
-        outputTupleSchema.add(new FieldSchema("group", groupTupleSchema, DataType.TUPLE));
-      } else {
-        // single group key
-        outputTupleSchema.add(new FieldSchema("group", groupTupleSchema.getField(0).type));
-      }
-      outputTupleSchema.add(bagFieldSchema);      
-      
-      return new Schema(new Schema.FieldSchema(
-            getSchemaName(this.getClass().getName().toLowerCase(), input),
-            outputTupleSchema, 
-            DataType.BAG));
-    } catch (FrontendException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  
-  Map<Tuple, List<Tuple>> groups = new HashMap<Tuple, List<Tuple>>();
-  TupleFactory tupleFactory = TupleFactory.getInstance();
-  BagFactory bagFactory = BagFactory.getInstance();
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    fieldNames = (List<String>)getInstanceProperties().get(FIELD_NAMES_PROPERTY);
-    
-    DataBag inputBag = (DataBag)input.get(0);    
-    
-    for (Tuple tuple : inputBag) {
-      Tuple key = extractKey(tuple);
-      addGroup(key, tuple);
-    }
-    
-    DataBag outputBag = bagFactory.newDefaultBag();
-    for (Tuple key : groups.keySet()) {
-      Tuple outputTuple = tupleFactory.newTuple();
-      if (fieldNames.size() > 1) {
-        outputTuple.append(key);
-      } else {        
-        outputTuple.append(key.get(0));
-      }
-      DataBag groupBag = bagFactory.newDefaultBag();
-      for (Tuple groupedTuple : groups.get(key)) {
-        groupBag.add(groupedTuple);
-      }
-      outputTuple.append(groupBag);
-      outputBag.add(outputTuple);
-    }
-    
-    return outputBag;
-  }
-  
-  private Tuple extractKey(Tuple tuple) throws ExecException {
-    Tuple key = tupleFactory.newTuple();
-    for (String field : fieldNames) {
-      key.append(getObject(tuple, field));
-    }
-    return key;
-  }
-  
-  private void addGroup(Tuple key, Tuple value) {
-    if (!groups.containsKey(key)) {
-      groups.put(key, new LinkedList<Tuple>());
-    }
-    groups.get(key).add(value);
-  }
-
-}

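BagGroup is harder to call directly, since exec() reads the group-key field names that getOutputSchema() stores in the instance properties; note also that the instance-level groups map is never cleared between exec() calls, so a reused instance would appear to accumulate groups across input rows. As a plain-Java restatement (not part of this patch), the grouping step is just a key-to-list map over the (k, v) rows from the Javadoc example:

// Sketch: mirrors extractKey()/addGroup() on (k, v) pairs, without any Pig types.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BagGroupSketch
{
  public static void main(String[] args)
  {
    String[][] inputBag = { {"1","A"}, {"1","B"}, {"2","A"}, {"2","B"}, {"2","C"}, {"3","A"} };

    // key = projection of each tuple onto the group fields (here just k)
    Map<String, List<String[]>> groups = new LinkedHashMap<String, List<String[]>>();
    for (String[] tuple : inputBag) {
      String key = tuple[0];
      if (!groups.containsKey(key)) {
        groups.put(key, new ArrayList<String[]>());
      }
      groups.get(key).add(tuple);
    }

    // expected: 1 -> 2 tuples, 2 -> 3 tuples, 3 -> 1 tuple
    for (Map.Entry<String, List<String[]>> entry : groups.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " tuples");
    }
  }
}
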
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagLeftOuterJoin.java b/src/java/datafu/pig/bags/BagLeftOuterJoin.java
deleted file mode 100644
index ba6bc11..0000000
--- a/src/java/datafu/pig/bags/BagLeftOuterJoin.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.AliasableEvalFunc;
-
-/**
- * Performs an in-memory left outer join across multiple bags.
- * 
- * <p>
- * The format for invocation is BagLeftOuterJoin(bag, 'key', ...).
- * This UDF expects that all bags are non-null and that there is a corresponding key for each bag.
- * The expected <em>key</em> is the alias of the key field inside the preceding bag.
- * </p> 
- * 
- * <p>
- * Example:
- * <code>
- * define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();
- * 
- * -- describe data: 
- * -- data: {bag1: {(key1: chararray,value1: chararray)},bag2: {(key2: chararray,value2: int)}} 
- * 
- * bag_joined = FOREACH data GENERATE BagLeftOuterJoin(bag1, 'key1', bag2, 'key2') as joined;
- * 
- * -- describe bag_joined:
- * -- bag_joined: {joined: {(bag1::key1: chararray, bag1::value1: chararray, bag2::key2: chararray, bag2::value2: int)}} 
- * </code>
- * </p>
- * 
- * @author wvaughan
- * 
- */
-public class BagLeftOuterJoin extends AliasableEvalFunc<DataBag>
-{
-
-  private static final String BAG_NAMES_PROPERTY = "BagLeftOuterJoin_BAG_NAMES";
-  private static final String BAG_NAME_TO_JOIN_PREFIX_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_JOIN_PREFIX";
-  private static final String BAG_NAME_TO_SIZE_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_SIZE_PROPERTY";
-  
-  ArrayList<String> bagNames;
-  Map<String, String> bagNameToJoinKeyPrefix;  
-  Map<String, Integer> bagNameToSize;
-  
-  public BagLeftOuterJoin() {
-    
-  }
-  
-  @SuppressWarnings("unchecked")
-  private void retrieveContextValues()
-  {
-    Properties properties = getInstanceProperties();   
-    bagNames = (ArrayList<String>) properties.get(BAG_NAMES_PROPERTY);    
-    bagNameToJoinKeyPrefix = (Map<String, String>) properties.get(BAG_NAME_TO_JOIN_PREFIX_PROPERTY);
-    bagNameToSize = (Map<String, Integer>) properties.get(BAG_NAME_TO_SIZE_PROPERTY);
-  }
-  
-  class JoinCollector
-  {
-    HashMap<Object, List<Tuple>> joinData;
-    
-    public void printJoinData() throws ExecException {
-      printData(joinData);
-    }
-    
-    public void printData(HashMap<Object, List<Tuple>> data) throws ExecException {
-      for (Object o : data.keySet()) {
-        System.out.println(o);
-        for (Tuple t : data.get(o)) {
-          System.out.println("\t"+t.toDelimitedString(", "));
-        }
-      }
-    }
-    
-    public HashMap<Object, List<Tuple>> groupTuples(Iterable<Tuple> tuples, String keyName) throws ExecException {
-      HashMap<Object, List<Tuple>> group = new HashMap<Object, List<Tuple>>();
-      for (Tuple tuple : tuples) {
-        Object key = getObject(tuple, keyName);
-        if (!group.containsKey(key)) {
-          group.put(key, new LinkedList<Tuple>());
-        }
-        group.get(key).add(tuple);
-      }
-      return group;
-    }
-    
-    public HashMap<Object, List<Tuple>> insertNullTuples(HashMap<Object, List<Tuple>> groupedData, int tupleSize) throws ExecException {
-      Tuple nullTuple = TupleFactory.getInstance().newTuple(tupleSize);
-      for (int i=0; i<tupleSize; i++) {
-        nullTuple.set(i, null);
-      }     
-      for (Object key : joinData.keySet()) {
-        if (!groupedData.containsKey(key)) {
-          groupedData.put(key, Collections.singletonList(nullTuple));
-        }
-      }
-      return groupedData;      
-    }
-    
-    public void joinTuples(Object key, List<Tuple> tuples) throws ExecException {
-      List<Tuple> currentTuples = joinData.get(key);
-      if (currentTuples != null) {
-        List<Tuple> newTuples = new LinkedList<Tuple>();
-        if (tuples != null) {
-          for (Tuple t1 : currentTuples) {
-            for (Tuple t2 : tuples) {
-              Tuple t = TupleFactory.getInstance().newTuple();
-              for (Object o : t1.getAll()) {
-                t.append(o);
-              }
-              for (Object o : t2.getAll()) {
-                t.append(o);
-              }
-              newTuples.add(t);
-            }
-          }
-        }
-        joinData.put(key, newTuples);
-      }      
-    }    
-    
-    public HashMap<Object, List<Tuple>> getJoinData() {
-      return this.joinData;
-    }
-    
-    public void setJoinData(HashMap<Object, List<Tuple>> joinData) {
-      this.joinData = joinData;
-    }
-  }
-
-  @Override
-  public DataBag exec(Tuple input) throws IOException
-  {
-    retrieveContextValues();
-
-    ArrayList<String> joinKeyNames = new ArrayList<String>();
-    for (int i = 1; i < input.size(); i += 2) {
-      joinKeyNames.add((String) input.get(i));
-    }
-    
-    JoinCollector collector = new JoinCollector();
-    // the first bag is the outer bag
-    String leftBagName = bagNames.get(0);
-    DataBag leftBag = getBag(input, leftBagName);
-    String leftBagJoinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(leftBagName), joinKeyNames.get(0));    
-    collector.setJoinData(collector.groupTuples(leftBag, leftBagJoinKeyName));
-    // now, for each additional bag, group up the tuples by the join key, then join them in
-    if (bagNames.size() > 1) {
-      for (int i = 1; i < bagNames.size(); i++) {
-        String bagName = bagNames.get(i);
-        DataBag bag = getBag(input, bagName);
-        String joinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(bagName), joinKeyNames.get(i));
-        int tupleSize = bagNameToSize.get(bagName);
-        if (bag == null) throw new IOException("Error in instance: "+getInstanceName()
-                + " with properties: " + getInstanceProperties()
-                + " and tuple: " + input.toDelimitedString(", ")
-                + " -- Expected bag, got null");
-        HashMap<Object, List<Tuple>> groupedData = collector.groupTuples(bag, joinKeyName);
-        // outer join, so go back in and add nulls;
-        groupedData = collector.insertNullTuples(groupedData, tupleSize);
-        for (Map.Entry<Object, List<Tuple>> entry : groupedData.entrySet()) {
-          collector.joinTuples(entry.getKey(), entry.getValue());          
-        }
-      }      
-    }
-    
-    // assemble output bag
-    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
-    for (List<Tuple> tuples : collector.getJoinData().values()) {
-      for (Tuple tuple : tuples) {
-        outputBag.add(tuple);
-      }
-    }
-
-    return outputBag;
-  }
-
-  @Override
-  public Schema getOutputSchema(Schema input)
-  {
-    ArrayList<String> bagNames = new ArrayList<String>(input.size() / 2);
-    Map<String, String> bagNameToJoinPrefix = new HashMap<String, String>(input.size() / 2);
-    Map<String, Integer> bagNameToSize = new HashMap<String, Integer>(input.size() / 2);
-    Schema outputSchema = null;
-    Schema bagSchema = new Schema();
-    try {
-      int i = 0;
-      // all even fields should be bags, odd fields are key names
-      String bagName = null;
-      String tupleName = null;
-      for (FieldSchema outerField : input.getFields()) {
-        if (i++ % 2 == 1)
-          continue;
-        bagName = outerField.alias;
-        bagNames.add(bagName);
-        if (bagName == null)
-          bagName = "null";
-        if (outerField.schema == null)
-          throw new RuntimeException("Expected input format of (bag, 'field') pairs. "
-              +"Did not receive a bag at index: "+i+", alias: "+bagName+". "
-              +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString());
-        FieldSchema tupleField = outerField.schema.getField(0);
-        tupleName = tupleField.alias;
-        bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName));
-        if (tupleField.schema == null) {
-          log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName, bagName));
-        } else {          
-          bagNameToSize.put(bagName, tupleField.schema.size());
-          for (FieldSchema innerField : tupleField.schema.getFields()) {
-            String innerFieldName = innerField.alias;
-            if (innerFieldName == null)
-              innerFieldName = "null";
-            String outputFieldName = bagName + "::" + innerFieldName;
-            bagSchema.add(new FieldSchema(outputFieldName, innerField.type));
-          }
-        }
-      }
-      outputSchema = new Schema(new Schema.FieldSchema("joined", bagSchema, DataType.BAG));
-      log.debug("output schema: "+outputSchema.toString());
-    } catch (FrontendException e) {
-      e.printStackTrace();
-      throw new RuntimeException(e);
-    }
-    Properties properties = getInstanceProperties();
-    properties.put(BAG_NAMES_PROPERTY, bagNames);
-    properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix);
-    properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize);
-    return outputSchema;
-  }
-
-}
-

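The left-outer behaviour comes from insertNullTuples(): any join key present in the first (left) bag but missing from a later bag is joined against a single all-null tuple, so no left-side row is dropped. A toy restatement of that padding rule in plain Java (not part of this patch), with made-up keys and rows:

// Sketch: left outer join of two key-to-rows maps, padding missing right keys with null.
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BagLeftOuterJoinSketch
{
  public static void main(String[] args)
  {
    // left bag grouped by key1, right bag grouped by key2
    Map<String, List<String>> left = new LinkedHashMap<String, List<String>>();
    left.put("a", Arrays.asList("(a,v1)"));
    left.put("b", Arrays.asList("(b,v2)"));

    Map<String, List<String>> right = new LinkedHashMap<String, List<String>>();
    right.put("a", Arrays.asList("(a,10)"));
    // no rows for key "b"

    for (Map.Entry<String, List<String>> entry : left.entrySet()) {
      // missing right keys are padded with a single null row (the "null tuple")
      List<String> rightRows = right.containsKey(entry.getKey())
          ? right.get(entry.getKey())
          : Arrays.asList((String) null);
      for (String leftRow : entry.getValue()) {
        for (String rightRow : rightRows) {
          System.out.println(leftRow + " | " + rightRow); // (a,v1) | (a,10)  and  (b,v2) | null
        }
      }
    }
  }
}
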
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagSplit.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagSplit.java b/src/java/datafu/pig/bags/BagSplit.java
deleted file mode 100644
index 74d39d0..0000000
--- a/src/java/datafu/pig/bags/BagSplit.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- 
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Splits a bag of tuples into a bag of bags, where the inner bags collectively contain
- * the tuples from the original bag.  This can be used to split a bag into a set of smaller bags.
- * <p>
- * Example:
- * <pre>
- * {@code
- * define BagSplit datafu.pig.bags.BagSplit();
- * 
- * -- input:
- * -- ({(1),(2),(3),(4),(5),(6),(7)})
- * -- ({(1),(2),(3),(4),(5)})
- * -- ({(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11)})
- * input = LOAD 'input' AS (B:bag{T:tuple(val1:INT)});
- * 
- * -- output:
- * -- ({{(1),(2),(3),(4),(5)},{(6),(7)}})
- * -- ({{(1),(2),(3),(4),(5)},{(6),(7),(8),(9),(10)},{(11)}})
- * output = FOREACH input GENERATE BagSplit(5,B);
- * }
- * </pre>
- */
-public class BagSplit extends EvalFunc<DataBag>
-{
-  private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-    
-  private final boolean appendBagNum;
-  
-  public BagSplit()
-  {    
-    this.appendBagNum = false;
-  }
-  
-  public BagSplit(String appendBagNum)
-  {
-    this.appendBagNum = Boolean.parseBoolean(appendBagNum);
-  }
-  
-  @Override
-  public DataBag exec(Tuple arg0) throws IOException
-  { 
-    DataBag outputBag = bagFactory.newDefaultBag();
-    
-    Integer maxSize = (Integer)arg0.get(0);
-    
-    Object o = arg0.get(1);
-    if (!(o instanceof DataBag))
-      throw new RuntimeException("parameter must be a databag");
-    
-    DataBag inputBag = (DataBag)o;
-    
-    DataBag currentBag = null;
-    
-    int count = 0;
-    int numBags = 0;
-    for (Tuple tuple : inputBag)
-    {
-      if (currentBag == null)
-      {
-        currentBag = bagFactory.newDefaultBag();
-      }
-      
-      currentBag.add(tuple);
-      count++;
-      
-      if (count >= maxSize)
-      {
-        Tuple newTuple = tupleFactory.newTuple();
-        newTuple.append(currentBag);
-        
-        if (this.appendBagNum)
-        {
-          newTuple.append(numBags);
-        }
-        
-        numBags++;
-        
-        outputBag.add(newTuple);
-        
-        count = 0;
-        currentBag = null;
-      }
-    }
-    
-    if (currentBag != null)
-    {
-      Tuple newTuple = tupleFactory.newTuple();
-      newTuple.append(currentBag);
-      if (this.appendBagNum)
-      {
-        newTuple.append(numBags);
-      }
-      outputBag.add(newTuple);
-    }
-        
-    return outputBag;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input)
-  {
-    try {
-      if (input.getField(0).type != DataType.INTEGER)
-      {
-        throw new RuntimeException("Expected first argument to be an INTEGER");
-      }
-      
-      if (input.getField(1).type != DataType.BAG)
-      {
-        throw new RuntimeException("Expected second argument to be a BAG");
-      }
-      
-      Schema tupleSchema = new Schema();
-      tupleSchema.add(new Schema.FieldSchema("data", input.getField(1).schema.clone(), DataType.BAG));
-      
-      if (this.appendBagNum)
-      {
-        tupleSchema.add(new Schema.FieldSchema("index", DataType.INTEGER));
-      }
-      
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
-                                               tupleSchema, DataType.BAG));
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}


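Unlike BagGroup, BagSplit keeps no schema-derived state, so exec() can be called directly. A minimal sketch (not part of this patch) of the first Javadoc example, seven tuples with a maximum sub-bag size of five; the class and variable names are illustrative:

// Sketch: splits a 7-tuple bag into sub-bags of at most 5 tuples each.
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import datafu.pig.bags.BagSplit;

public class BagSplitSketch
{
  public static void main(String[] args) throws Exception
  {
    TupleFactory tupleFactory = TupleFactory.getInstance();
    BagFactory bagFactory = BagFactory.getInstance();

    DataBag bag = bagFactory.newDefaultBag();
    for (int v = 1; v <= 7; v++) {
      bag.add(tupleFactory.newTuple(Integer.valueOf(v)));
    }

    // exec() expects (max size, bag)
    Tuple input = tupleFactory.newTuple(2);
    input.set(0, Integer.valueOf(5));
    input.set(1, bag);

    DataBag output = new BagSplit().exec(input);
    System.out.println(output.size()); // expected: 2 (one bag of 5 tuples, one of 2)
  }
}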