http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java b/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java
new file mode 100644
index 0000000..5ebe778
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.urls;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class UserAgentTest extends PigTests
+{
+
+ /**
+
+
+ define UserAgentClassify datafu.pig.urls.UserAgentClassify();
+
+ data = load 'input' as (usr_agent:chararray);
+ data_out = foreach data generate UserAgentClassify(usr_agent) as class;
+ --describe data_out;
+ store data_out into 'output';
+ */
+ @Multiline private String userAgentTest;
+
+ @Test
+ public void userAgentTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(userAgentTest);
+
+ String[] input = {
+ "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
+ "Mozilla/5.0 (compatible; Konqueror/3.5; Linux; X11; de) KHTML/3.5.2 (like Gecko) Kubuntu 6.06 Dapper",
+ "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:2.2a1pre) Gecko/20110331 Firefox/4.2a1pre Fennec/4.1a1pre",
+ "Opera/9.00 (X11; Linux i686; U; en)",
+ "Wget/1.10.2",
+ "Opera/9.80 (Android; Linux; Opera Mobi/ADR-1012221546; U; pl) Presto/2.7.60 Version/10.5",
+ "Mozilla/5.0 (Linux; U; Android 2.2; en-us; DROID2 Build/VZW) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1"
+ };
+
+ String[] output = {
+ "(mobile)",
+ "(desktop)",
+ "(mobile)",
+ "(desktop)",
+ "(desktop)",
+ "(mobile)",
+ "(mobile)",
+ };
+
+ test.assertOutput("data",input,"data_out",output);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java b/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java
new file mode 100644
index 0000000..3b23d35
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import static org.testng.Assert.*;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.io.IOException;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+import datafu.pig.util.AliasableEvalFunc;
+
+public class AliasEvalFuncTest extends PigTests
+{
+ static class ReportBuilder extends AliasableEvalFunc<DataBag> {
+ static final String ORDERED_ROUTES = "orderedRoutes";
+
+ public DataBag exec(Tuple input) throws IOException {
+ DataBag inputBag = getBag(input, ORDERED_ROUTES);
+ DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+ for(Iterator<Tuple> tupleIter = inputBag.iterator(); tupleIter.hasNext(); ) {
+ outputBag.add(tupleIter.next());
+ }
+ return outputBag;
+ }
+
+ public Schema getOutputSchema(Schema input) {
+ try {
+ Schema bagSchema = input.getField(0).schema;
+ Schema outputSchema = new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
+ .getName()
+ .toLowerCase(), input),
+ bagSchema,
+ DataType.BAG));
+ return outputSchema;
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+ }
+
+ @Test
+ public void getBagTest() throws Exception
+ {
+ ReportBuilder udf = new ReportBuilder();
+ udf.setUDFContextSignature("test");
+ List<Schema.FieldSchema> fieldSchemaList = new ArrayList<Schema.FieldSchema>();
+ fieldSchemaList.add(new Schema.FieldSchema("msisdn", DataType.LONG));
+ fieldSchemaList.add(new Schema.FieldSchema("ts", DataType.INTEGER));
+ fieldSchemaList.add(new Schema.FieldSchema("center_lon", DataType.DOUBLE));
+ fieldSchemaList.add(new Schema.FieldSchema("center_lat", DataType.DOUBLE));
+ Schema schemaTuple = new Schema(fieldSchemaList);
+ Schema schemaBag = new Schema(new Schema.FieldSchema(ReportBuilder.ORDERED_ROUTES, schemaTuple, DataType.BAG));
+ udf.outputSchema(schemaBag);
+
+ Tuple inputTuple = TupleFactory.getInstance().newTuple();
+ DataBag inputBag = BagFactory.getInstance().newDefaultBag();
+ inputBag.add(TupleFactory.getInstance().newTuple(Arrays.asList(71230000000L, 1382351612, 10.697, 20.713)));
+ inputTuple.append(inputBag);
+ DataBag outputBag = udf.exec(inputTuple);
+ Assert.assertEquals(inputBag, outputBag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java
new file mode 100644
index 0000000..1a6a741
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import static org.testng.Assert.*;
+
+import java.util.List;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class AssertTests extends PigTests
+{
+ /**
+
+
+ define ASRT datafu.pig.util.AssertUDF();
+
+ data = LOAD 'input' AS (val:INT);
+
+ data2 = FILTER data BY ASRT(val,'assertion appears to have failed, doh!');
+
+ STORE data2 INTO 'output';
+ */
+ @Multiline private static String assertWithMessage;
+
+ @Test
+ public void shouldAssertWithMessageOnZero() throws Exception
+ {
+ try
+ {
+ PigTest test = createPigTestFromString(assertWithMessage);
+
+ this.writeLinesToFile("input", "0");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+
+ fail("test should have failed, but it didn't");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+
+ @Test
+ public void shouldNotAssertWithMessageOnOne() throws Exception
+ {
+ PigTest test = createPigTestFromString(assertWithMessage);
+
+ this.writeLinesToFile("input", "1");
+
+ test.runScript();
+
+ List<Tuple> result = this.getLinesForAlias(test, "data2");
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).size(), 1);
+ Assert.assertEquals(result.get(0).get(0), 1);
+ }
+
+ /**
+
+
+ define ASRT datafu.pig.util.AssertUDF();
+
+ data = LOAD 'input' AS (val:INT);
+
+ data2 = FILTER data BY ASRT(val);
+
+ STORE data2 INTO 'output';
+ */
+ @Multiline private static String assertWithoutMessage;
+
+ @Test
+ public void shouldAssertWithoutMessageOnZero() throws Exception
+ {
+ try
+ {
+ PigTest test = createPigTestFromString(assertWithoutMessage);
+
+ this.writeLinesToFile("input", "0");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data2");
+
+ fail("test should have failed, but it didn't");
+ }
+ catch (Exception e)
+ {
+ }
+ }
+
+ @Test
+ public void shouldNotAssertWithoutMessageOnOne() throws Exception
+ {
+ PigTest test = createPigTestFromString(assertWithoutMessage);
+
+ this.writeLinesToFile("input", "1");
+
+ test.runScript();
+
+ List<Tuple> result = this.getLinesForAlias(test, "data2");
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0).size(), 1);
+ Assert.assertEquals(result.get(0).get(0), 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java
new file mode 100644
index 0000000..3baf251
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.pigunit.PigTest;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class CoalesceTests extends PigTests
+{
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2,val3) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String coalesceIntTest;
+
+ @Test
+ public void coalesceIntTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceIntTest);
+
+ this.writeLinesToFile("input", "1,1,2,3",
+ "2,,2,3",
+ "3,,,3",
+ "4,,,",
+ "5,1,,3",
+ "6,1,,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data3");
+
+ Assert.assertEquals(6, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(1, t.get(1)); break;
+ case 2:
+ Assert.assertEquals(2, t.get(1)); break;
+ case 3:
+ Assert.assertEquals(3, t.get(1)); break;
+ case 4:
+ Assert.assertEquals(null, t.get(1)); break;
+ case 5:
+ Assert.assertEquals(1, t.get(1)); break;
+ case 6:
+ Assert.assertEquals(1, t.get(1)); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+
+ STORE data4 INTO 'output';
+ */
+ @Multiline private static String coalesceLongTest;
+
+ @Test
+ public void coalesceLongTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceLongTest);
+
+ this.writeLinesToFile("input", "1,5",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data4");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(500L, t.get(1)); break;
+ case 2:
+ Assert.assertEquals(10000L, t.get(1)); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,100) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+
+ STORE data4 INTO 'output';
+ */
+ @Multiline private static String coalesceCastIntToLongTestFails;
+
+ // The first parameter is a long and the fixed value is an int.
+ // They cannot be merged without the lazy option.
+ @Test(expectedExceptions=FrontendException.class)
+ public void coalesceCastIntToLongTestFails() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceCastIntToLongTestFails);
+
+ this.writeLinesToFile("input", "1,5",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data4");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(500L, t.get(1)); break;
+ case 2:
+ Assert.assertEquals(10000L, t.get(1)); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce('lazy');
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,100) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+
+ STORE data4 INTO 'output';
+ */
+ @Multiline private static String coalesceIntAndLongTest;
+
+ // The first parameter is a long and the fixed value is an int.
+ // They are merged to a long.
+ @Test
+ public void coalesceCastIntToLongTest1() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceIntAndLongTest);
+
+ this.writeLinesToFile("input", "1,5",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data4");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(500L, t.get(1)); break;
+ case 2:
+ Assert.assertEquals(10000L, t.get(1)); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce('lazy');
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+
+ STORE data4 INTO 'output';
+ */
+ @Multiline private static String coalesceIntAndLongTest2;
+
+ // The first parameter is an int, but the fixed parameter is a long.
+ // They are merged to a long.
+ @Test
+ public void coalesceCastIntToLongTest2() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceIntAndLongTest2);
+
+ this.writeLinesToFile("input", "1,5",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data4");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(500L, t.get(1)); break;
+ case 2:
+ Assert.assertEquals(10000L, t.get(1)); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce('lazy');
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,100.0) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ data4 = FOREACH data3 GENERATE testcase, result*100 as result;
+
+ STORE data4 INTO 'output';
+ */
+ @Multiline private static String coalesceIntAndDoubleTest;
+
+ // The first parameter is an int, but the fixed parameter is a double.
+ // They are merged to a double.
+ @Test
+ public void coalesceCastIntToDoubleTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceIntAndDoubleTest);
+
+ this.writeLinesToFile("input", "1,5",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data4");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(500.0, t.get(1)); break;
+ case 2:
+ Assert.assertEquals(10000.0, t.get(1)); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+
+ data = FOREACH data GENERATE testcase, (val1 IS NOT NULL ? ToDate(val1) : (datetime)null) as val1;
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,ToDate('1970-01-01T00:00:00.000Z')) as result;
+
+ --describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String coalesceCastIntToDatetimeTest;
+
+ @Test
+ public void coalesceCastIntToDatetimeTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceCastIntToDatetimeTest);
+
+ this.writeLinesToFile("input", "1,1375826183000",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data3");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ Integer testcase = (Integer)t.get(0);
+ Assert.assertNotNull(testcase);
+ switch(testcase)
+ {
+ case 1:
+ Assert.assertEquals("2013-08-06T21:56:23.000Z", ((DateTime)t.get(1)).toDateTime(DateTimeZone.UTC).toString()); break;
+ case 2:
+ Assert.assertEquals("1970-01-01T00:00:00.000Z", t.get(1).toString()); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce('lazy');
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG);
+
+ data = FOREACH data GENERATE testcase, (val1 IS NOT NULL ? ToDate(val1) : (datetime)null) as val1;
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,ToDate('1970-01-01T00:00:00.000Z')) as result;
+
+ --describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String coalesceCastIntToDatetimeLazyTest;
+
+ @Test
+ public void coalesceCastIntToDatetimeLazyTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceCastIntToDatetimeLazyTest);
+
+ this.writeLinesToFile("input", "1,1375826183000",
+ "2,");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data3");
+
+ Assert.assertEquals(2, lines.size());
+ for (Tuple t : lines)
+ {
+ Integer testcase = (Integer)t.get(0);
+ Assert.assertNotNull(testcase);
+ switch(testcase)
+ {
+ case 1:
+ Assert.assertEquals("2013-08-06T21:56:23.000Z", ((DateTime)t.get(1)).toDateTime(DateTimeZone.UTC).toString()); break;
+ case 2:
+ Assert.assertEquals("1970-01-01T00:00:00.000Z", t.get(1).toString()); break;
+ default:
+ Assert.fail("Did not expect: " + t.get(1));
+ }
+ }
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:LONG);
+
+ data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2) as result;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, result;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String coalesceBagIncompatibleTypeTest;
+
+ @Test(expectedExceptions=FrontendException.class)
+ public void coalesceBagIncompatibleTypeTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(coalesceBagIncompatibleTypeTest);
+
+ this.writeLinesToFile("input", "1,1,2L}");
+
+ test.runScript();
+
+ this.getLinesForAlias(test, "data3");
+ }
+
+ /**
+
+
+ define COALESCE datafu.pig.util.Coalesce('lazy');
+ define EmptyBagToNullFields datafu.pig.bags.EmptyBagToNullFields();
+
+ input1 = LOAD 'input1' using PigStorage(',') AS (val1:INT,val2:INT);
+ input2 = LOAD 'input2' using PigStorage(',') AS (val1:INT,val2:INT);
+ input3 = LOAD 'input3' using PigStorage(',') AS (val1:INT,val2:INT);
+
+ data4 = COGROUP input1 BY val1,
+ input2 BY val1,
+ input3 BY val1;
+
+ dump data4;
+
+ data4 = FOREACH data4 GENERATE
+ FLATTEN(input1),
+ FLATTEN(EmptyBagToNullFields(input2)),
+ FLATTEN(EmptyBagToNullFields(input3));
+
+ dump data4;
+
+ describe data4;
+
+ data5 = FOREACH data4 GENERATE input1::val1 as val1, COALESCE(input2::val2,0L) as val2, COALESCE(input3::val2,0L) as val3;
+
+ --describe data5;
+
+ STORE data5 INTO 'output';
+ */
+ @Multiline private static String leftJoinTest;
+
+ @Test
+ public void leftJoinTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(leftJoinTest);
+
+ this.writeLinesToFile("input1", "1,1",
+ "2,2",
+ "5,5");
+
+ this.writeLinesToFile("input2", "1,10",
+ "3,30",
+ "5,50");
+
+ this.writeLinesToFile("input3", "2,100",
+ "5,500");
+
+ test.runScript();
+
+ List<Tuple> lines = this.getLinesForAlias(test, "data5");
+
+ Assert.assertEquals(3, lines.size());
+ for (Tuple t : lines)
+ {
+ switch((Integer)t.get(0))
+ {
+ case 1:
+ Assert.assertEquals(10L, t.get(1));
+ Assert.assertEquals(0L, t.get(2));
+ break;
+ case 2:
+ Assert.assertEquals(0L, t.get(1));
+ Assert.assertEquals(100L, t.get(2));
+ break;
+ case 5:
+ Assert.assertEquals(50L, t.get(1));
+ Assert.assertEquals(500L, t.get(2));
+ break;
+ default:
+ Assert.fail("Did not expect: " + t.get(0));
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java
new file mode 100644
index 0000000..591ee82
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class InTests extends PigTests
+{
+ /**
+
+
+ define I datafu.pig.util.InUDF();
+
+ data = LOAD 'input' AS (B: bag {T: tuple(v:INT)});
+
+ data2 = FOREACH data {
+ C = FILTER B By I(v, 1,2,3);
+ GENERATE C;
+ }
+
+ describe data2;
+
+ STORE data2 INTO 'output';
+ */
+ @Multiline private static String inIntTest;
+
+ @Test
+ public void inIntTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(inIntTest);
+
+ writeLinesToFile("input",
+ "({(1),(2),(3),(4),(5)})",
+ "({(1),(2)})",
+ "({(4),(5)})");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "({(1),(2),(3)})",
+ "({(1),(2)})",
+ "({})");
+ }
+
+ /**
+
+
+ define I datafu.pig.util.InUDF();
+
+ data = LOAD 'input' AS (B: bag {T: tuple(v:chararray)});
+
+ data2 = FOREACH data {
+ C = FILTER B By I(v, 'will','matt','sam');
+ GENERATE C;
+ }
+
+ describe data2;
+
+ STORE data2 INTO 'output';
+ */
+ @Multiline private static String inStringTest;
+
+ @Test
+ public void inStringTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(inStringTest);
+
+ writeLinesToFile("input",
+ "({(alice),(bob),(will),(matt),(sam)})",
+ "({(will),(matt)})",
+ "({(alice),(bob)})");
+
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "({(will),(matt),(sam)})",
+ "({(will),(matt)})",
+ "({})");
+ }
+
+ /**
+
+
+ define I datafu.pig.util.InUDF();
+
+ data = LOAD 'input' AS (owner:chararray, color:chararray);
+ describe data;
+ data2 = FILTER data BY I(color, 'red','blue');
+ describe data2;
+ STORE data2 INTO 'output';
+ */
+ @Multiline private static String inTopLevelTest;
+
+ @Test
+ public void inTopLevelTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(inTopLevelTest);
+
+ writeLinesToFile("input",
+ "alice\tred",
+ "bob\tblue",
+ "charlie\tgreen",
+ "dave\tred");
+ test.runScript();
+
+ assertOutput(test, "data2",
+ "(alice,red)",
+ "(bob,blue)",
+ "(dave,red)");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java
new file mode 100644
index 0000000..465e8b2
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class IntBoolConversionPigTests extends PigTests
+{
+ /**
+
+
+ define IntToBool datafu.pig.util.IntToBool();
+
+ data = LOAD 'input' AS (val:INT);
+
+ data2 = FOREACH data GENERATE IntToBool(val);
+
+ STORE data2 INTO 'output';
+ */
+ @Multiline private static String intToBoolTest;
+
+ @Test
+ public void intToBoolTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(intToBoolTest);
+
+ String[] input = {
+ "", // null
+ "0",
+ "1"
+ };
+
+ String[] output = {
+ "(false)",
+ "(false)",
+ "(true)"
+ };
+
+ test.assertOutput("data",input,"data2",output);
+ }
+
+ /**
+
+
+ define IntToBool datafu.pig.util.IntToBool();
+ define BoolToInt datafu.pig.util.BoolToInt();
+
+ data = LOAD 'input' AS (val:INT);
+
+ data2 = FOREACH data GENERATE IntToBool(val) as val;
+ data3 = FOREACH data2 GENERATE BoolToInt(val) as val;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String intToBoolToIntTest;
+
+ @Test
+ public void intToBoolToIntTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(intToBoolToIntTest);
+
+ String[] input = {
+ "", // null
+ "0",
+ "1",
+ "2",
+ "-1",
+ "-2",
+ "0",
+ ""
+ };
+
+ String[] output = {
+ "(0)",
+ "(0)",
+ "(1)",
+ "(1)",
+ "(1)",
+ "(1)",
+ "(0)",
+ "(0)"
+ };
+
+ test.assertOutput("data",input,"data3",output);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java b/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java
new file mode 100644
index 0000000..3d7654a
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import java.util.List;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class TransposeTest extends PigTests
+{
+ /**
+
+
+ define Transpose datafu.pig.util.TransposeTupleToBag();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT);
+
+ data2 = FOREACH data GENERATE testcase, Transpose(val1 .. val3) as transposed;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, transposed;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String transposeTest;
+
+ @Test
+ public void transposeTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(transposeTest);
+ writeLinesToFile("input", "1,10,11,12",
+ "2,20,21,22");
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data3");
+ for (Tuple tuple : output) {
+ int testCase = (Integer)tuple.get(0);
+ DataBag bag = (DataBag)tuple.get(1);
+ Assert.assertEquals(bag.size(), 3);
+ int i=0;
+ for (Tuple t : bag) {
+ String expectedKey = String.format("val%d",i+1);
+ Assert.assertEquals((String)t.get(0), expectedKey);
+ int actualValue = (Integer)t.get(1);
+ Assert.assertEquals(actualValue, testCase*10+i);
+ i++;
+ }
+ }
+ }
+
+ /**
+
+
+ define Transpose datafu.pig.util.TransposeTupleToBag();
+
+ data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:DOUBLE);
+
+ data2 = FOREACH data GENERATE testcase, Transpose(val1 .. val3) as transposed;
+
+ describe data2;
+
+ data3 = FOREACH data2 GENERATE testcase, transposed;
+
+ STORE data3 INTO 'output';
+ */
+ @Multiline private static String transposeBadTypeTest;
+
+ @Test(expectedExceptions={org.apache.pig.impl.logicalLayer.FrontendException.class})
+ public void transposeBadTypeTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(transposeBadTypeTest);
+ writeLinesToFile("input", "1,10,11,12.0",
+ "2,20,21,22.0");
+ test.runScript();
+
+ List<Tuple> output = getLinesForAlias(test, "data3");
+ for (Tuple tuple : output) {
+ int testCase = (Integer)tuple.get(0);
+ DataBag bag = (DataBag)tuple.get(1);
+ Assert.assertEquals(bag.size(), 3);
+ int i=0;
+ for (Tuple t : bag) {
+ String expectedKey = String.format("val%d",i+1);
+ Assert.assertEquals((String)t.get(0), expectedKey);
+ int actualValue = (Integer)t.get(1);
+ Assert.assertEquals(actualValue, testCase*10+i);
+ i++;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..98e86a2
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.datafu
+version=1.2.1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/buildscript.gradle
----------------------------------------------------------------------
diff --git a/gradle/buildscript.gradle b/gradle/buildscript.gradle
new file mode 100644
index 0000000..225e0a8
--- /dev/null
+++ b/gradle/buildscript.gradle
@@ -0,0 +1,12 @@
+repositories {
+ repositories {
+ // For license plugin.
+ maven {
+ url 'http://dl.bintray.com/content/netflixoss/external-gradle-plugins/'
+ }
+ }
+}
+
+dependencies {
+ classpath 'nl.javadude.gradle.plugins:license-gradle-plugin:0.6.1'
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
new file mode 100644
index 0000000..29d3e36
--- /dev/null
+++ b/gradle/dependency-versions.gradle
@@ -0,0 +1,20 @@
+ext {
+ antlrVersion="3.2"
+ avroVersion="1.5.3"
+ streamVersion="2.5.0"
+ commonsMathVersion="2.2"
+ commonsIoVersion="1.4"
+ fastutilVersion="6.5.7"
+ guavaVersion="11.0"
+ hadoopVersion="0.20.2"
+ jodaTimeVersion="1.6"
+ log4jVersion="1.2.14"
+ mavenVersion="2.1.3"
+ jlineVersion="0.9.94"
+ pigVersion="0.11.1"
+ testngVersion="6.2"
+ toolsVersion="1.4.2"
+ wagonHttpVersion="1.0-beta-2"
+ openNlpVersion="1.5.3"
+ openNlpMaxEntVersion="3.0.3"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/wrapper/gradle-wrapper.jar
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..a7634b0
Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..610282a
--- /dev/null
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Thu Jul 11 22:18:11 PDT 2013
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.6-bin.zip
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradlew
----------------------------------------------------------------------
diff --git a/gradlew b/gradlew
new file mode 100755
index 0000000..16bbbbf
--- /dev/null
+++ b/gradlew
@@ -0,0 +1,164 @@
+#!/usr/bin/env bash
+
+##############################################################################
+##
+## Gradle start up script for UN*X
+##
+##############################################################################
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS="-XX:MaxPermSize=512m"
+
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD="maximum"
+
+warn ( ) {
+ echo "$*"
+}
+
+die ( ) {
+ echo
+ echo "$*"
+ echo
+ exit 1
+}
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+case "`uname`" in
+ CYGWIN* )
+ cygwin=true
+ ;;
+ Darwin* )
+ darwin=true
+ ;;
+ MINGW* )
+ msys=true
+ ;;
+esac
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched.
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+fi
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >&-
+APP_HOME="`pwd -P`"
+cd "$SAVED" >&-
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD="java"
+ which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+ MAX_FD_LIMIT=`ulimit -H -n`
+ if [ $? -eq 0 ] ; then
+ if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
+ MAX_FD="$MAX_FD_LIMIT"
+ fi
+ ulimit -n $MAX_FD
+ if [ $? -ne 0 ] ; then
+ warn "Could not set maximum file descriptor limit: $MAX_FD"
+ fi
+ else
+ warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
+ fi
+fi
+
+# For Darwin, add options to specify how the application appears in the dock
+if $darwin; then
+ GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
+fi
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin ; then
+ APP_HOME=`cygpath --path --mixed "$APP_HOME"`
+ CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
+
+ # We build the pattern for arguments to be converted via cygpath
+ ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
+ SEP=""
+ for dir in $ROOTDIRSRAW ; do
+ ROOTDIRS="$ROOTDIRS$SEP$dir"
+ SEP="|"
+ done
+ OURCYGPATTERN="(^($ROOTDIRS))"
+ # Add a user-defined pattern to the cygpath arguments
+ if [ "$GRADLE_CYGPATTERN" != "" ] ; then
+ OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
+ fi
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ i=0
+ for arg in "$@" ; do
+ CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
+ CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
+
+ if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
+ eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
+ else
+ eval `echo args$i`="\"$arg\""
+ fi
+ i=$((i+1))
+ done
+ case $i in
+ (0) set -- ;;
+ (1) set -- "$args0" ;;
+ (2) set -- "$args0" "$args1" ;;
+ (3) set -- "$args0" "$args1" "$args2" ;;
+ (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
+ (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
+ (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
+ (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
+ (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
+ (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
+ esac
+fi
+
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+ JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java
deleted file mode 100644
index 7dbfe9a..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.util.Set;
-
-import javax.annotation.processing.AbstractProcessor;
-import javax.annotation.processing.ProcessingEnvironment;
-import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedAnnotationTypes;
-import javax.annotation.processing.SupportedSourceVersion;
-import javax.lang.model.SourceVersion;
-import javax.lang.model.element.Element;
-import javax.lang.model.element.TypeElement;
-import javax.lang.model.util.Elements;
-
-import org.eclipse.jdt.internal.compiler.apt.model.VariableElementImpl;
-import org.eclipse.jdt.internal.compiler.ast.FieldDeclaration;
-import org.eclipse.jdt.internal.compiler.ast.StringLiteral;
-import org.eclipse.jdt.internal.compiler.lookup.FieldBinding;
-
-import java.lang.reflect.Constructor;
-
-@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
-public final class EcjMultilineProcessor extends AbstractProcessor {
-
- private Elements elementUtils;
-
- @Override
- public void init(final ProcessingEnvironment procEnv) {
- super.init(procEnv);
- this.elementUtils = procEnv.getElementUtils();
- }
-
- @Override
- public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
- Set<? extends Element> fields = roundEnv.getElementsAnnotatedWith(Multiline.class);
-
- for (Element field : fields) {
- String docComment = elementUtils.getDocComment(field);
-
- if (null != docComment) {
- VariableElementImpl fieldElem = (VariableElementImpl) field;
- FieldBinding biding = (FieldBinding) fieldElem._binding;
- FieldDeclaration decl = biding.sourceField();
- StringLiteral string = new StringLiteral(docComment.toCharArray(), decl.sourceStart, decl.sourceEnd, decl.sourceStart);
- decl.initialization = string;
- }
- }
- return true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java
deleted file mode 100644
index 39aa24e..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.util.Set;
-
-import javax.annotation.processing.AbstractProcessor;
-import javax.annotation.processing.ProcessingEnvironment;
-import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedAnnotationTypes;
-import javax.annotation.processing.SupportedSourceVersion;
-import javax.lang.model.SourceVersion;
-import javax.lang.model.element.Element;
-import javax.lang.model.element.TypeElement;
-
-import com.sun.tools.javac.model.JavacElements;
-import com.sun.tools.javac.processing.JavacProcessingEnvironment;
-import com.sun.tools.javac.tree.JCTree.JCVariableDecl;
-import com.sun.tools.javac.tree.TreeMaker;
-
-@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
-public final class JavacMultilineProcessor extends AbstractProcessor {
-
- private JavacElements elementUtils;
- private TreeMaker maker;
-
- @Override
- public void init(final ProcessingEnvironment procEnv) {
- super.init(procEnv);
- JavacProcessingEnvironment javacProcessingEnv = (JavacProcessingEnvironment) procEnv;
- this.elementUtils = javacProcessingEnv.getElementUtils();
- this.maker = TreeMaker.instance(javacProcessingEnv.getContext());
- }
-
- @Override
- public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
- Set<? extends Element> fields = roundEnv.getElementsAnnotatedWith(Multiline.class);
- for (Element field : fields) {
- String docComment = elementUtils.getDocComment(field);
- if (null != docComment) {
- JCVariableDecl fieldNode = (JCVariableDecl) elementUtils.getTree(field);
- fieldNode.init = maker.Literal(docComment);
- }
- }
- return true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/Multiline.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/Multiline.java b/plugin/java/org/adrianwalker/multilinestring/Multiline.java
deleted file mode 100644
index c29d3ef..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/Multiline.java
+++ /dev/null
@@ -1,14 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Target({ElementType.FIELD,ElementType.LOCAL_VARIABLE})
-@Retention(RetentionPolicy.SOURCE)
-public @interface Multiline {
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java
----------------------------------------------------------------------
diff --git a/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java
deleted file mode 100644
index 9abdba5..0000000
--- a/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Code from https://github.com/benelog/multiline.git
-// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html
-
-package org.adrianwalker.multilinestring;
-
-import java.util.Set;
-
-import javax.annotation.processing.AbstractProcessor;
-import javax.annotation.processing.ProcessingEnvironment;
-import javax.annotation.processing.Processor;
-import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedAnnotationTypes;
-import javax.annotation.processing.SupportedSourceVersion;
-import javax.lang.model.SourceVersion;
-import javax.lang.model.element.TypeElement;
-
-@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
-public final class MultilineProcessor extends AbstractProcessor {
- private Processor delegator = null;
-
- @Override
- public void init(final ProcessingEnvironment procEnv) {
- super.init(procEnv);
- String envClassName = procEnv.getClass().getName();
- if (envClassName.contains("com.sun.tools")) {
- delegator = new JavacMultilineProcessor();
- } else {
- delegator = new EcjMultilineProcessor();
- }
- delegator.init(procEnv);
- }
-
- @Override
- public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
- if (delegator == null ) {
- return true;
- }
- return delegator.process(annotations, roundEnv);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
new file mode 100644
index 0000000..62b1899
--- /dev/null
+++ b/settings.gradle
@@ -0,0 +1 @@
+include "build-plugin","datafu-pig"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/AppendToBag.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/AppendToBag.java b/src/java/datafu/pig/bags/AppendToBag.java
deleted file mode 100644
index 55c9e76..0000000
--- a/src/java/datafu/pig/bags/AppendToBag.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import datafu.pig.util.SimpleEvalFunc;
-
-/**
- * Appends a tuple to a bag.
- * <p>
- * Example:
- * <pre>
- * {@code
- * define AppendToBag datafu.pig.bags.AppendToBag();
- *
- * -- input:
- * -- ({(1),(2),(3)},(4))
- * -- ({(10),(20),(30),(40),(50)},(60))
- * input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-
- * -- output:
- * -- ({(1),(2),(3),(4)})
- * -- ({(10),(20),(30),(40),(50),(60)})
- * output = FOREACH input GENERATE AppendToBag(B,T) as B;
- * }
- * </pre>
- */
-public class AppendToBag extends SimpleEvalFunc<DataBag>
-{
- public DataBag call(DataBag inputBag, Tuple t) throws IOException
- {
- inputBag.add(t);
- return inputBag;
- }
-
- @Override
- public Schema outputSchema(Schema input)
- {
- try {
- return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
- input.getField(0).schema, DataType.BAG));
- }
- catch (FrontendException e) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagConcat.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagConcat.java b/src/java/datafu/pig/bags/BagConcat.java
deleted file mode 100644
index 290f44b..0000000
--- a/src/java/datafu/pig/bags/BagConcat.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-/**
- * Unions all input bags to produce a single bag containing all tuples.
- * <p>
- * This UDF accepts two forms of input:
- * <ol>
- * <li>a tuple of 2 or more elements where each element is a bag with the same schema</li>
- * <li>a single bag where each element of that bag is a bag and all of these bags have the same schema</li>
- * </ol>
- * <p>
- * Example 1:
- * <pre>
- * {@code
- * define BagConcat datafu.pig.bags.BagConcat();
- * -- This example illustrates the use on a tuple of bags
- *
- * -- input:
- * -- ({(1),(2),(3)},{(3),(4),(5)})
- * -- ({(20),(25)},{(40),(50)})
- * input = LOAD 'input' AS (A: bag{T: tuple(v:INT)}, B: bag{T: tuple(v:INT)});
- *
- * -- output:
- * -- ({(1),(2),(3),(3),(4),(5)})
- * -- ({(20),(25),(40),(50)})
- * output = FOREACH input GENERATE BagConcat(A,B);
- * }
- * </pre>
- * <p>
- * Example 2:
- * <pre>
- * {@code
- * define BagConcat datafu.pig.bags.BagConcat();
- * -- This example illustrates the use on a bag of bags
- *
- * -- input:
- * -- ({({(1),(2),(3)}),({(3),(4),(5)})})
- * -- ({({(20),(25)}),({(40),(50)})})
- * input = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})});
- *
- * -- output:
- * -- ({(1),(2),(3),(3),(4),(5)})
- * -- ({(20),(25),(40),(50)})
- * output = FOREACH input GENERATE BagConcat(A);
- * }
- * </pre>
- *
- * @author wvaughan
- *
- */
-public class BagConcat extends EvalFunc<DataBag>
-{
-
- @Override
- public DataBag exec(Tuple input) throws IOException
- {
- DataBag output = BagFactory.getInstance().newDefaultBag();
- if (input.size() > 1) {
- // tuple of bags
- for (int i=0; i < input.size(); i++) {
- Object o = input.get(i);
- if (!(o instanceof DataBag)) {
- throw new RuntimeException("Expected a TUPLE of BAGs as input");
- }
-
- DataBag inputBag = (DataBag) o;
- for (Tuple elem : inputBag) {
- output.add(elem);
- }
- }
- } else {
- // bag of bags
- DataBag outerBag = (DataBag)input.get(0);
- for (Tuple outerTuple : outerBag) {
- DataBag innerBag = (DataBag)outerTuple.get(0);
- for (Tuple innerTuple : innerBag) {
- output.add(innerTuple);
- }
- }
- }
- return output;
- }
-
- @Override
- public Schema outputSchema(Schema input)
- {
- try {
- Schema outputBagTupleSchema = null;
-
- if (input.size() == 0) {
- throw new RuntimeException("Expected input tuple to contain fields. Got none.");
- }
- // determine whether the input is a tuple of bags or a bag of bags
- if (input.size() != 1) {
- // tuple of bags
-
- // verify that each element in the input is a bag
- for (FieldSchema fieldSchema : input.getFields()) {
- if (fieldSchema.type != DataType.BAG) {
- throwBadTypeError("Expected a TUPLE of BAGs as input. Got instead: %s in schema: %s", fieldSchema.type, input);
- }
- if (fieldSchema.schema == null) {
- throwBadSchemaError(fieldSchema.alias, input);
- }
- }
-
- outputBagTupleSchema = input.getField(0).schema;
- } else {
- // bag of bags
-
- // should only be a single element in the input and it should be a bag
- FieldSchema fieldSchema = input.getField(0);
- if (fieldSchema.type != DataType.BAG) {
- throwBadTypeError("Expected a BAG of BAGs as input. Got instead: %s in schema: %s", fieldSchema.type, input);
- }
-
- // take the tuple schema from this outer bag
- Schema outerBagTuple = input.getField(0).schema;
- // verify that this tuple contains only a bag for each element
- Schema outerBagSchema = outerBagTuple.getField(0).schema;
- if (outerBagSchema.size() != 1) {
- throw new RuntimeException(String.format("Expected outer bag to have only a single field. Got instead: %s",
- outerBagSchema.prettyPrint()));
- }
- FieldSchema outerBagFieldSchema = outerBagSchema.getField(0);
- if (outerBagFieldSchema.type != DataType.BAG) {
- throwBadTypeError("Expected a BAG of BAGs as input. Got instead: %s", outerBagFieldSchema.type, outerBagTuple);
- }
-
- // take the schema of the inner tuple as the schema for the tuple in the output bag
- FieldSchema innerTupleSchema = outerBagSchema.getField(0);
- if (innerTupleSchema.schema == null) {
- throwBadSchemaError(innerTupleSchema.alias, outerBagSchema);
- }
- outputBagTupleSchema = innerTupleSchema.schema;
- }
-
- // return the correct schema
- return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
- outputBagTupleSchema, DataType.BAG));
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void throwBadSchemaError(String alias, Schema schema) throws FrontendException
- {
- throw new FrontendException(String.format("Expected non-null schema for all bags. Got null on field %s, in: %s",
- alias, schema.prettyPrint()));
- }
-
- private void throwBadTypeError(String expectedMessage, byte actualType, Schema schema) throws FrontendException
- {
- throw new FrontendException(String.format(expectedMessage, DataType.findTypeName(actualType), schema.prettyPrint()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagGroup.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagGroup.java b/src/java/datafu/pig/bags/BagGroup.java
deleted file mode 100644
index 7b6c2e2..0000000
--- a/src/java/datafu/pig/bags/BagGroup.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.AliasableEvalFunc;
-
-/**
- * Performs an in-memory group operation on a bag. The first argument is the bag.
- * The second argument is a projection of that bag to the group keys.
- *
- * <p>
- * Example:
- * <code>
- * define BagGroup datafu.pig.bags.BagGroup();
- *
- * data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)});
- * -- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)})
- *
- * data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped;
- * -- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}}
- * -- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})})
- * </code>
- * </p>
- *
- * @author wvaughan
- *
- */
-public class BagGroup extends AliasableEvalFunc<DataBag>
-{
- private final String FIELD_NAMES_PROPERTY = "FIELD_NAMES";
- private List<String> fieldNames;
-
- @Override
- public Schema getOutputSchema(Schema input)
- {
- try {
- if (input.size() != 2) {
- throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %d field.", input.size()));
- }
- // Expect the first field to be a bag
- FieldSchema bagFieldSchema = input.getField(0);
- if (bagFieldSchema.type != DataType.BAG) {
- throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as first field.", DataType.findTypeName(bagFieldSchema.type)));
- }
- // Expect the second fields to be a projection of the bag
- FieldSchema projectedBagFieldSchema = input.getField(1);
- if (projectedBagFieldSchema.type != DataType.BAG) {
- throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as second field.", DataType.findTypeName(projectedBagFieldSchema.type)));
- }
-
- String bagName = bagFieldSchema.alias;
- // handle named tuples
- if (bagFieldSchema.schema.size() == 1) {
- FieldSchema bagTupleFieldSchema = bagFieldSchema.schema.getField(0);
- if (bagTupleFieldSchema.type == DataType.TUPLE && bagTupleFieldSchema.alias != null) {
- bagName = getPrefixedAliasName(bagName, bagTupleFieldSchema.alias);
- }
- }
- if (projectedBagFieldSchema.schema.size() == 1) {
- FieldSchema projectedBagTupleFieldSchema = projectedBagFieldSchema.schema.getField(0);
- if (projectedBagTupleFieldSchema.type == DataType.TUPLE && projectedBagTupleFieldSchema.schema != null) {
- projectedBagFieldSchema = projectedBagTupleFieldSchema;
- }
- }
-
- // create the output schema for the 'group'
- // store the field names for the group keys
- Schema groupTupleSchema = new Schema();
- fieldNames = new ArrayList<String>(projectedBagFieldSchema.schema.size());
- for (int i=0; i<projectedBagFieldSchema.schema.size(); i++) {
- FieldSchema fieldSchema = projectedBagFieldSchema.schema.getField(i);
- String fieldName = fieldSchema.alias;
- fieldNames.add(getPrefixedAliasName(bagName, fieldName));
- groupTupleSchema.add(new FieldSchema(fieldSchema.alias, fieldSchema.type));
- }
- getInstanceProperties().put(FIELD_NAMES_PROPERTY, fieldNames);
-
- Schema outputTupleSchema = new Schema();
- if (projectedBagFieldSchema.schema.size() > 1) {
- // multiple group keys
- outputTupleSchema.add(new FieldSchema("group", groupTupleSchema, DataType.TUPLE));
- } else {
- // single group key
- outputTupleSchema.add(new FieldSchema("group", groupTupleSchema.getField(0).type));
- }
- outputTupleSchema.add(bagFieldSchema);
-
- return new Schema(new Schema.FieldSchema(
- getSchemaName(this.getClass().getName().toLowerCase(), input),
- outputTupleSchema,
- DataType.BAG));
- } catch (FrontendException e) {
- throw new RuntimeException(e);
- }
- }
-
- Map<Tuple, List<Tuple>> groups = new HashMap<Tuple, List<Tuple>>();
- TupleFactory tupleFactory = TupleFactory.getInstance();
- BagFactory bagFactory = BagFactory.getInstance();
-
- @SuppressWarnings("unchecked")
- @Override
- public DataBag exec(Tuple input) throws IOException
- {
- fieldNames = (List<String>)getInstanceProperties().get(FIELD_NAMES_PROPERTY);
-
- DataBag inputBag = (DataBag)input.get(0);
-
- for (Tuple tuple : inputBag) {
- Tuple key = extractKey(tuple);
- addGroup(key, tuple);
- }
-
- DataBag outputBag = bagFactory.newDefaultBag();
- for (Tuple key : groups.keySet()) {
- Tuple outputTuple = tupleFactory.newTuple();
- if (fieldNames.size() > 1) {
- outputTuple.append(key);
- } else {
- outputTuple.append(key.get(0));
- }
- DataBag groupBag = bagFactory.newDefaultBag();
- for (Tuple groupedTuple : groups.get(key)) {
- groupBag.add(groupedTuple);
- }
- outputTuple.append(groupBag);
- outputBag.add(outputTuple);
- }
-
- return outputBag;
- }
-
- private Tuple extractKey(Tuple tuple) throws ExecException {
- Tuple key = tupleFactory.newTuple();
- for (String field : fieldNames) {
- key.append(getObject(tuple, field));
- }
- return key;
- }
-
- private void addGroup(Tuple key, Tuple value) {
- if (!groups.containsKey(key)) {
- groups.put(key, new LinkedList<Tuple>());
- }
- groups.get(key).add(value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagLeftOuterJoin.java b/src/java/datafu/pig/bags/BagLeftOuterJoin.java
deleted file mode 100644
index ba6bc11..0000000
--- a/src/java/datafu/pig/bags/BagLeftOuterJoin.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
-import datafu.pig.util.AliasableEvalFunc;
-
-/**
- * Performs an in-memory left outer join across multiple bags.
- *
- * <p>
- * The format for invocation is BagLeftOuterJoin(bag, 'key',....).
- * This UDF expects that all bags are non-null and that there is a corresponding key for each bag.
- * The <em>key</em> that is expected is the alias of the key inside of the preceding bag.
- * </p>
- *
- * <p>
- * Example:
- * <code>
- * define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();
- *
- * -- describe data:
- * -- data: {bag1: {(key1: chararray,value1: chararray)},bag2: {(key2: chararray,value2: int)}}
- *
- * bag_joined = FOREACH data GENERATE BagLeftOuterJoin(bag1, 'key1', bag2, 'key2') as joined;
- *
- * -- describe bag_joined:
- * -- bag_joined: {joined: {(bag1::key1: chararray, bag1::value1: chararray, bag2::key2: chararray, bag2::value2: int)}}
- * </code>
- * </p>
- *
- * @author wvaughan
- *
- */
public class BagLeftOuterJoin extends AliasableEvalFunc<DataBag>
{

  // Keys under which getOutputSchema() stashes schema-derived metadata in the
  // instance properties so that exec() can retrieve it on the back-end.
  private static final String BAG_NAMES_PROPERTY = "BagLeftOuterJoin_BAG_NAMES";
  private static final String BAG_NAME_TO_JOIN_PREFIX_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_JOIN_PREFIX";
  private static final String BAG_NAME_TO_SIZE_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_SIZE_PROPERTY";

  // Aliases of the input bags, in the order they appear in the invocation.
  ArrayList<String> bagNames;
  // Maps each bag alias to the prefix used to resolve its join-key field.
  Map<String, String> bagNameToJoinKeyPrefix;
  // Maps each bag alias to the field count of its inner tuple; used to build
  // null-padded tuples for keys that have no match (the "outer" part of the join).
  Map<String, Integer> bagNameToSize;

  public BagLeftOuterJoin() {

  }

  // Loads the metadata stored by getOutputSchema() from the instance properties.
  @SuppressWarnings("unchecked")
  private void retrieveContextValues()
  {
    Properties properties = getInstanceProperties();
    bagNames = (ArrayList<String>) properties.get(BAG_NAMES_PROPERTY);
    bagNameToJoinKeyPrefix = (Map<String, String>) properties.get(BAG_NAME_TO_JOIN_PREFIX_PROPERTY);
    bagNameToSize = (Map<String, Integer>) properties.get(BAG_NAME_TO_SIZE_PROPERTY);
  }

  // Accumulates the join incrementally: joinData maps each join-key value to
  // the list of (partially) joined tuples produced so far.
  class JoinCollector
  {
    HashMap<Object, List<Tuple>> joinData;

    // Debugging aid: dumps the current join state to stdout.
    public void printJoinData() throws ExecException {
      printData(joinData);
    }

    // Debugging aid: prints each key followed by its tuples, one per line.
    public void printData(HashMap<Object, List<Tuple>> data) throws ExecException {
      for (Object o : data.keySet()) {
        System.out.println(o);
        for (Tuple t : data.get(o)) {
          System.out.println("\t"+t.toDelimitedString(", "));
        }
      }
    }

    // Groups the given tuples by the value of the named join-key field.
    public HashMap<Object, List<Tuple>> groupTuples(Iterable<Tuple> tuples, String keyName) throws ExecException {
      HashMap<Object, List<Tuple>> group = new HashMap<Object, List<Tuple>>();
      for (Tuple tuple : tuples) {
        Object key = getObject(tuple, keyName);
        if (!group.containsKey(key)) {
          group.put(key, new LinkedList<Tuple>());
        }
        group.get(key).add(tuple);
      }
      return group;
    }

    // For every key already present in the join but absent from groupedData,
    // inserts a single all-null tuple of the given width so the left side is
    // preserved (left outer join semantics).
    public HashMap<Object, List<Tuple>> insertNullTuples(HashMap<Object, List<Tuple>> groupedData, int tupleSize) throws ExecException {
      Tuple nullTuple = TupleFactory.getInstance().newTuple(tupleSize);
      for (int i=0; i<tupleSize; i++) {
        nullTuple.set(i, null);
      }
      for (Object key : joinData.keySet()) {
        if (!groupedData.containsKey(key)) {
          groupedData.put(key, Collections.singletonList(nullTuple));
        }
      }
      return groupedData;
    }

    // Joins the incoming tuples for a key against the tuples already collected
    // for that key, producing the cross product of the two lists. Keys not yet
    // present in joinData are ignored (they have no left-side match).
    public void joinTuples(Object key, List<Tuple> tuples) throws ExecException {
      List<Tuple> currentTuples = joinData.get(key);
      if (currentTuples != null) {
        List<Tuple> newTuples = new LinkedList<Tuple>();
        if (tuples != null) {
          for (Tuple t1 : currentTuples) {
            for (Tuple t2 : tuples) {
              // Concatenate the fields of the existing tuple and the new tuple.
              Tuple t = TupleFactory.getInstance().newTuple();
              for (Object o : t1.getAll()) {
                t.append(o);
              }
              for (Object o : t2.getAll()) {
                t.append(o);
              }
              newTuples.add(t);
            }
          }
        }
        joinData.put(key, newTuples);
      }
    }

    public HashMap<Object, List<Tuple>> getJoinData() {
      return this.joinData;
    }

    public void setJoinData(HashMap<Object, List<Tuple>> joinData) {
      this.joinData = joinData;
    }
  }

  /**
   * Performs the in-memory left outer join.
   *
   * @param input tuple of alternating (bag, 'join_key_alias') pairs
   * @return a bag of joined tuples; left-side tuples with no match are padded
   *         with nulls for the missing right-side fields
   * @throws IOException if an expected bag is null or a field cannot be read
   */
  @Override
  public DataBag exec(Tuple input) throws IOException
  {
    retrieveContextValues();

    // Odd-indexed fields of the input are the join-key aliases.
    ArrayList<String> joinKeyNames = new ArrayList<String>();
    for (int i = 1; i < input.size(); i += 2) {
      joinKeyNames.add((String) input.get(i));
    }

    JoinCollector collector = new JoinCollector();
    // the first bag is the outer bag
    String leftBagName = bagNames.get(0);
    DataBag leftBag = getBag(input, leftBagName);
    String leftBagJoinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(leftBagName), joinKeyNames.get(0));
    collector.setJoinData(collector.groupTuples(leftBag, leftBagJoinKeyName));
    // now, for each additional bag, group up the tuples by the join key, then join them in
    if (bagNames.size() > 1) {
      for (int i = 1; i < bagNames.size(); i++) {
        String bagName = bagNames.get(i);
        DataBag bag = getBag(input, bagName);
        String joinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(bagName), joinKeyNames.get(i));
        int tupleSize = bagNameToSize.get(bagName);
        if (bag == null) throw new IOException("Error in instance: "+getInstanceName()
            + " with properties: " + getInstanceProperties()
            + " and tuple: " + input.toDelimitedString(", ")
            + " -- Expected bag, got null");
        HashMap<Object, List<Tuple>> groupedData = collector.groupTuples(bag, joinKeyName);
        // outer join, so go back in and add nulls;
        groupedData = collector.insertNullTuples(groupedData, tupleSize);
        for (Map.Entry<Object, List<Tuple>> entry : groupedData.entrySet()) {
          collector.joinTuples(entry.getKey(), entry.getValue());
        }
      }
    }

    // assemble output bag
    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
    for (List<Tuple> tuples : collector.getJoinData().values()) {
      for (Tuple tuple : tuples) {
        outputBag.add(tuple);
      }
    }

    return outputBag;
  }

  /**
   * Computes the flattened output schema ("bagName::fieldName" for every inner
   * field of every input bag) and stores the bag names, join-key prefixes, and
   * tuple widths in the instance properties for use by exec().
   *
   * @param input schema of alternating (bag, 'join_key_alias') pairs
   * @return schema of a single bag aliased "joined"
   */
  @Override
  public Schema getOutputSchema(Schema input)
  {
    ArrayList<String> bagNames = new ArrayList<String>(input.size() / 2);
    Map<String, String> bagNameToJoinPrefix = new HashMap<String, String>(input.size() / 2);
    Map<String, Integer> bagNameToSize = new HashMap<String, Integer>(input.size() / 2);
    Schema outputSchema = null;
    Schema bagSchema = new Schema();
    try {
      int i = 0;
      // all even fields should be bags, odd fields are key names
      String bagName = null;
      String tupleName = null;
      for (FieldSchema outerField : input.getFields()) {
        if (i++ % 2 == 1)
          continue;
        bagName = outerField.alias;
        bagNames.add(bagName);
        if (bagName == null)
          bagName = "null";
        if (outerField.schema == null)
          throw new RuntimeException("Expected input format of (bag, 'field') pairs. "
              +"Did not receive a bag at index: "+i+", alias: "+bagName+". "
              +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString());
        FieldSchema tupleField = outerField.schema.getField(0);
        tupleName = tupleField.alias;
        bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName));
        if (tupleField.schema == null) {
          // No inner-tuple schema available: we cannot enumerate fields, so the
          // bag contributes nothing to the output schema. Log and continue.
          log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName, bagName));
        } else {
          bagNameToSize.put(bagName, tupleField.schema.size());
          for (FieldSchema innerField : tupleField.schema.getFields()) {
            String innerFieldName = innerField.alias;
            if (innerFieldName == null)
              innerFieldName = "null";
            // Prefix each field with its bag name, mirroring Pig's JOIN output.
            String outputFieldName = bagName + "::" + innerFieldName;
            bagSchema.add(new FieldSchema(outputFieldName, innerField.type));
          }
        }
      }
      outputSchema = new Schema(new Schema.FieldSchema("joined", bagSchema, DataType.BAG));
      log.debug("output schema: "+outputSchema.toString());
    } catch (FrontendException e) {
      e.printStackTrace();
      throw new RuntimeException(e);
    }
    // Persist the metadata for the back-end exec() call.
    Properties properties = getInstanceProperties();
    properties.put(BAG_NAMES_PROPERTY, bagNames);
    properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix);
    properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize);
    return outputSchema;
  }

}
-
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagSplit.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/bags/BagSplit.java b/src/java/datafu/pig/bags/BagSplit.java
deleted file mode 100644
index 74d39d0..0000000
--- a/src/java/datafu/pig/bags/BagSplit.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.pig.bags;
-
-import java.io.IOException;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-/**
- * Splits a bag of tuples into a bag of bags, where the inner bags collectively contain
- * the tuples from the original bag. This can be used to split a bag into a set of smaller bags.
- * <p>
- * Example:
- * <pre>
- * {@code
- * define BagSplit datafu.pig.bags.BagSplit();
- *
- * -- input:
- * -- ({(1),(2),(3),(4),(5),(6),(7)})
- * -- ({(1),(2),(3),(4),(5)})
- * -- ({(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11)})
- * input = LOAD 'input' AS (B:bag{T:tuple(val1:INT,val2:INT)});
- *
- * -- output:
- * -- ({{(1),(2),(3),(4),(5)},{(6),(7)}})
- * -- ({{(1),(2),(3),(4),(5)},{(6),(7),(8),(9),(10)},{(11)}})
- * output = FOREACH input GENERATE BagSplit(5,B);
- * }
- * </pre>
- */
-public class BagSplit extends EvalFunc<DataBag>
-{
- private static final BagFactory bagFactory = BagFactory.getInstance();
- private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-
- private final boolean appendBagNum;
-
- public BagSplit()
- {
- this.appendBagNum = false;
- }
-
- public BagSplit(String appendBagNum)
- {
- this.appendBagNum = Boolean.parseBoolean(appendBagNum);
- }
-
- @Override
- public DataBag exec(Tuple arg0) throws IOException
- {
- DataBag outputBag = bagFactory.newDefaultBag();
-
- Integer maxSize = (Integer)arg0.get(0);
-
- Object o = arg0.get(1);
- if (!(o instanceof DataBag))
- throw new RuntimeException("parameter must be a databag");
-
- DataBag inputBag = (DataBag)o;
-
- DataBag currentBag = null;
-
- int count = 0;
- int numBags = 0;
- for (Tuple tuple : inputBag)
- {
- if (currentBag == null)
- {
- currentBag = bagFactory.newDefaultBag();
- }
-
- currentBag.add(tuple);
- count++;
-
- if (count >= maxSize)
- {
- Tuple newTuple = tupleFactory.newTuple();
- newTuple.append(currentBag);
-
- if (this.appendBagNum)
- {
- newTuple.append(numBags);
- }
-
- numBags++;
-
- outputBag.add(newTuple);
-
- count = 0;
- currentBag = null;
- }
- }
-
- if (currentBag != null)
- {
- Tuple newTuple = tupleFactory.newTuple();
- newTuple.append(currentBag);
- if (this.appendBagNum)
- {
- newTuple.append(numBags);
- }
- outputBag.add(newTuple);
- }
-
- return outputBag;
- }
-
- @Override
- public Schema outputSchema(Schema input)
- {
- try {
- if (input.getField(0).type != DataType.INTEGER)
- {
- throw new RuntimeException("Expected first argument to be an INTEGER");
- }
-
- if (input.getField(1).type != DataType.BAG)
- {
- throw new RuntimeException("Expected second argument to be a BAG");
- }
-
- Schema tupleSchema = new Schema();
- tupleSchema.add(new Schema.FieldSchema("data", input.getField(1).schema.clone(), DataType.BAG));
-
- if (this.appendBagNum)
- {
- tupleSchema.add(new Schema.FieldSchema("index", DataType.INTEGER));
- }
-
- return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input),
- tupleSchema, DataType.BAG));
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
|