http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java b/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java new file mode 100644 index 0000000..5ebe778 --- /dev/null +++ b/datafu-pig/src/test/java/datafu/test/pig/urls/UserAgentTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package datafu.test.pig.urls; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.pig.pigunit.PigTest; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; + +public class UserAgentTest extends PigTests +{ + + /** + + + define UserAgentClassify datafu.pig.urls.UserAgentClassify(); + + data = load 'input' as (usr_agent:chararray); + data_out = foreach data generate UserAgentClassify(usr_agent) as class; + --describe data_out; + store data_out into 'output'; + */ + @Multiline private String userAgentTest; + + @Test + public void userAgentTest() throws Exception + { + PigTest test = createPigTestFromString(userAgentTest); + + String[] input = { + "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5", + "Mozilla/5.0 (compatible; Konqueror/3.5; Linux; X11; de) KHTML/3.5.2 (like Gecko) Kubuntu 6.06 Dapper", + "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:2.2a1pre) Gecko/20110331 Firefox/4.2a1pre Fennec/4.1a1pre", + "Opera/9.00 (X11; Linux i686; U; en)", + "Wget/1.10.2", + "Opera/9.80 (Android; Linux; Opera Mobi/ADR-1012221546; U; pl) Presto/2.7.60 Version/10.5", + "Mozilla/5.0 (Linux; U; Android 2.2; en-us; DROID2 Build/VZW) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1" + }; + + String[] output = { + "(mobile)", + "(desktop)", + "(mobile)", + "(desktop)", + "(desktop)", + "(mobile)", + "(mobile)", + }; + + test.assertOutput("data",input,"data_out",output); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java b/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java new file mode 100644 index 0000000..3b23d35 --- /dev/null +++ 
b/datafu-pig/src/test/java/datafu/test/pig/util/AliasEvalFuncTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package datafu.test.pig.util; + +import static org.testng.Assert.*; + +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.io.IOException; + +import org.apache.pig.data.Tuple; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.data.BagFactory; +import org.apache.pig.pigunit.PigTest; +import org.apache.pig.data.DataType; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; +import datafu.pig.util.AliasableEvalFunc; + +public class AliasEvalFuncTest extends PigTests +{ + static class ReportBuilder extends AliasableEvalFunc { + static final String ORDERED_ROUTES = "orderedRoutes"; + + public DataBag exec(Tuple input) throws IOException { + DataBag inputBag = getBag(input, ORDERED_ROUTES); + DataBag outputBag = BagFactory.getInstance().newDefaultBag(); + for(Iterator tupleIter = 
inputBag.iterator(); tupleIter.hasNext(); ) { + outputBag.add(tupleIter.next()); + } + return outputBag; + } + + public Schema getOutputSchema(Schema input) { + try { + Schema bagSchema = input.getField(0).schema; + Schema outputSchema = new Schema(new Schema.FieldSchema(getSchemaName(this.getClass() + .getName() + .toLowerCase(), input), + bagSchema, + DataType.BAG)); + return outputSchema; + } catch (Exception ex) { + return null; + } + } + } + + @Test + public void getBagTest() throws Exception + { + ReportBuilder udf = new ReportBuilder(); + udf.setUDFContextSignature("test"); + List fieldSchemaList = new ArrayList(); + fieldSchemaList.add(new Schema.FieldSchema("msisdn", DataType.LONG)); + fieldSchemaList.add(new Schema.FieldSchema("ts", DataType.INTEGER)); + fieldSchemaList.add(new Schema.FieldSchema("center_lon", DataType.DOUBLE)); + fieldSchemaList.add(new Schema.FieldSchema("center_lat", DataType.DOUBLE)); + Schema schemaTuple = new Schema(fieldSchemaList); + Schema schemaBag = new Schema(new Schema.FieldSchema(ReportBuilder.ORDERED_ROUTES, schemaTuple, DataType.BAG)); + udf.outputSchema(schemaBag); + + Tuple inputTuple = TupleFactory.getInstance().newTuple(); + DataBag inputBag = BagFactory.getInstance().newDefaultBag(); + inputBag.add(TupleFactory.getInstance().newTuple(Arrays.asList(71230000000L, 1382351612, 10.697, 20.713))); + inputTuple.append(inputBag); + DataBag outputBag = udf.exec(inputTuple); + Assert.assertEquals(inputBag, outputBag); + } +} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java new file mode 100644 index 0000000..1a6a741 --- /dev/null +++ b/datafu-pig/src/test/java/datafu/test/pig/util/AssertTests.java @@ -0,0 +1,131 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package datafu.test.pig.util; + +import static org.testng.Assert.*; + +import java.util.List; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.pig.data.Tuple; +import org.apache.pig.pigunit.PigTest; +import org.testng.Assert; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; + +public class AssertTests extends PigTests +{ + /** + + + define ASRT datafu.pig.util.AssertUDF(); + + data = LOAD 'input' AS (val:INT); + + data2 = FILTER data BY ASRT(val,'assertion appears to have failed, doh!'); + + STORE data2 INTO 'output'; + */ + @Multiline private static String assertWithMessage; + + @Test + public void shouldAssertWithMessageOnZero() throws Exception + { + try + { + PigTest test = createPigTestFromString(assertWithMessage); + + this.writeLinesToFile("input", "0"); + + test.runScript(); + + this.getLinesForAlias(test, "data2"); + + fail("test should have failed, but it didn't"); + } + catch (Exception e) + { + } + } + + @Test + public void shouldNotAssertWithMessageOnOne() throws Exception + { + PigTest test = createPigTestFromString(assertWithMessage); + + this.writeLinesToFile("input", "1"); + + test.runScript(); + + List result = 
this.getLinesForAlias(test, "data2"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).size(), 1); + Assert.assertEquals(result.get(0).get(0), 1); + } + + /** + + + define ASRT datafu.pig.util.AssertUDF(); + + data = LOAD 'input' AS (val:INT); + + data2 = FILTER data BY ASRT(val); + + STORE data2 INTO 'output'; + */ + @Multiline private static String assertWithoutMessage; + + @Test + public void shouldAssertWithoutMessageOnZero() throws Exception + { + try + { + PigTest test = createPigTestFromString(assertWithoutMessage); + + this.writeLinesToFile("input", "0"); + + test.runScript(); + + this.getLinesForAlias(test, "data2"); + + fail("test should have failed, but it didn't"); + } + catch (Exception e) + { + } + } + + @Test + public void shouldNotAssertWithoutMessageOnOne() throws Exception + { + PigTest test = createPigTestFromString(assertWithoutMessage); + + this.writeLinesToFile("input", "1"); + + test.runScript(); + + List result = this.getLinesForAlias(test, "data2"); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).size(), 1); + Assert.assertEquals(result.get(0).get(0), 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java new file mode 100644 index 0000000..3baf251 --- /dev/null +++ b/datafu-pig/src/test/java/datafu/test/pig/util/CoalesceTests.java @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package datafu.test.pig.util; + +import java.util.List; + +import junit.framework.Assert; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.pigunit.PigTest; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; + +public class CoalesceTests extends PigTests +{ + /** + + + define COALESCE datafu.pig.util.Coalesce(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2,val3) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + STORE data3 INTO 'output'; + */ + @Multiline private static String coalesceIntTest; + + @Test + public void coalesceIntTest() throws Exception + { + PigTest test = createPigTestFromString(coalesceIntTest); + + this.writeLinesToFile("input", "1,1,2,3", + "2,,2,3", + "3,,,3", + "4,,,", + "5,1,,3", + "6,1,,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data3"); + + Assert.assertEquals(6, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(1, t.get(1)); break; + case 2: + Assert.assertEquals(2, t.get(1)); break; + case 3: + 
Assert.assertEquals(3, t.get(1)); break; + case 4: + Assert.assertEquals(null, t.get(1)); break; + case 5: + Assert.assertEquals(1, t.get(1)); break; + case 6: + Assert.assertEquals(1, t.get(1)); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + data4 = FOREACH data3 GENERATE testcase, result*100 as result; + + STORE data4 INTO 'output'; + */ + @Multiline private static String coalesceLongTest; + + @Test + public void coalesceLongTest() throws Exception + { + PigTest test = createPigTestFromString(coalesceLongTest); + + this.writeLinesToFile("input", "1,5", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data4"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(500L, t.get(1)); break; + case 2: + Assert.assertEquals(10000L, t.get(1)); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,100) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + data4 = FOREACH data3 GENERATE testcase, result*100 as result; + + STORE data4 INTO 'output'; + */ + @Multiline private static String coalesceCastIntToLongTestFails; + + // The first parameter is a long and the fixed value is an int. + // They cannot be merged without the lazy option. 
+ @Test(expectedExceptions=FrontendException.class) + public void coalesceCastIntToLongTestFails() throws Exception + { + PigTest test = createPigTestFromString(coalesceCastIntToLongTestFails); + + this.writeLinesToFile("input", "1,5", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data4"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(500L, t.get(1)); break; + case 2: + Assert.assertEquals(10000L, t.get(1)); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce('lazy'); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,100) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + data4 = FOREACH data3 GENERATE testcase, result*100 as result; + + STORE data4 INTO 'output'; + */ + @Multiline private static String coalesceIntAndLongTest; + + // The first parameter is a long and the fixed value is an int. + // They are merged to a long. 
+ @Test + public void coalesceCastIntToLongTest1() throws Exception + { + PigTest test = createPigTestFromString(coalesceIntAndLongTest); + + this.writeLinesToFile("input", "1,5", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data4"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(500L, t.get(1)); break; + case 2: + Assert.assertEquals(10000L, t.get(1)); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce('lazy'); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,100L) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + data4 = FOREACH data3 GENERATE testcase, result*100 as result; + + STORE data4 INTO 'output'; + */ + @Multiline private static String coalesceIntAndLongTest2; + + // The first parameter is an int, but the fixed parameter is a long. + // They are merged to a long. 
+ @Test + public void coalesceCastIntToLongTest2() throws Exception + { + PigTest test = createPigTestFromString(coalesceIntAndLongTest2); + + this.writeLinesToFile("input", "1,5", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data4"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(500L, t.get(1)); break; + case 2: + Assert.assertEquals(10000L, t.get(1)); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce('lazy'); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,100.0) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + data4 = FOREACH data3 GENERATE testcase, result*100 as result; + + STORE data4 INTO 'output'; + */ + @Multiline private static String coalesceIntAndDoubleTest; + + // The first parameter is an int, but the fixed parameter is a double. + // They are merged to a double. + @Test + public void coalesceCastIntToDoubleTest() throws Exception + { + PigTest test = createPigTestFromString(coalesceIntAndDoubleTest); + + this.writeLinesToFile("input", "1,5", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data4"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(500.0, t.get(1)); break; + case 2: + Assert.assertEquals(10000.0, t.get(1)); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG); + + data = FOREACH data GENERATE testcase, (val1 IS NOT NULL ? 
ToDate(val1) : (datetime)null) as val1; + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,ToDate('1970-01-01T00:00:00.000Z')) as result; + + --describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + STORE data3 INTO 'output'; + */ + @Multiline private static String coalesceCastIntToDatetimeTest; + + @Test + public void coalesceCastIntToDatetimeTest() throws Exception + { + PigTest test = createPigTestFromString(coalesceCastIntToDatetimeTest); + + this.writeLinesToFile("input", "1,1375826183000", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data3"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + Integer testcase = (Integer)t.get(0); + Assert.assertNotNull(testcase); + switch(testcase) + { + case 1: + Assert.assertEquals("2013-08-06T21:56:23.000Z", ((DateTime)t.get(1)).toDateTime(DateTimeZone.UTC).toString()); break; + case 2: + Assert.assertEquals("1970-01-01T00:00:00.000Z", t.get(1).toString()); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce('lazy'); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:LONG); + + data = FOREACH data GENERATE testcase, (val1 IS NOT NULL ? 
ToDate(val1) : (datetime)null) as val1; + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,ToDate('1970-01-01T00:00:00.000Z')) as result; + + --describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + STORE data3 INTO 'output'; + */ + @Multiline private static String coalesceCastIntToDatetimeLazyTest; + + @Test + public void coalesceCastIntToDatetimeLazyTest() throws Exception + { + PigTest test = createPigTestFromString(coalesceCastIntToDatetimeLazyTest); + + this.writeLinesToFile("input", "1,1375826183000", + "2,"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data3"); + + Assert.assertEquals(2, lines.size()); + for (Tuple t : lines) + { + Integer testcase = (Integer)t.get(0); + Assert.assertNotNull(testcase); + switch(testcase) + { + case 1: + Assert.assertEquals("2013-08-06T21:56:23.000Z", ((DateTime)t.get(1)).toDateTime(DateTimeZone.UTC).toString()); break; + case 2: + Assert.assertEquals("1970-01-01T00:00:00.000Z", t.get(1).toString()); break; + default: + Assert.fail("Did not expect: " + t.get(1)); + } + } + } + + /** + + + define COALESCE datafu.pig.util.Coalesce(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:LONG); + + data2 = FOREACH data GENERATE testcase, COALESCE(val1,val2) as result; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, result; + + STORE data3 INTO 'output'; + */ + @Multiline private static String coalesceBagIncompatibleTypeTest; + + @Test(expectedExceptions=FrontendException.class) + public void coalesceBagIncompatibleTypeTest() throws Exception + { + PigTest test = createPigTestFromString(coalesceBagIncompatibleTypeTest); + + this.writeLinesToFile("input", "1,1,2L}"); + + test.runScript(); + + this.getLinesForAlias(test, "data3"); + } + + /** + + + define COALESCE datafu.pig.util.Coalesce('lazy'); + define EmptyBagToNullFields datafu.pig.bags.EmptyBagToNullFields(); + + input1 = LOAD 'input1' using PigStorage(',') AS (val1:INT,val2:INT); + 
input2 = LOAD 'input2' using PigStorage(',') AS (val1:INT,val2:INT); + input3 = LOAD 'input3' using PigStorage(',') AS (val1:INT,val2:INT); + + data4 = COGROUP input1 BY val1, + input2 BY val1, + input3 BY val1; + + dump data4; + + data4 = FOREACH data4 GENERATE + FLATTEN(input1), + FLATTEN(EmptyBagToNullFields(input2)), + FLATTEN(EmptyBagToNullFields(input3)); + + dump data4; + + describe data4; + + data5 = FOREACH data4 GENERATE input1::val1 as val1, COALESCE(input2::val2,0L) as val2, COALESCE(input3::val2,0L) as val3; + + --describe data5; + + STORE data5 INTO 'output'; + */ + @Multiline private static String leftJoinTest; + + @Test + public void leftJoinTest() throws Exception + { + PigTest test = createPigTestFromString(leftJoinTest); + + this.writeLinesToFile("input1", "1,1", + "2,2", + "5,5"); + + this.writeLinesToFile("input2", "1,10", + "3,30", + "5,50"); + + this.writeLinesToFile("input3", "2,100", + "5,500"); + + test.runScript(); + + List lines = this.getLinesForAlias(test, "data5"); + + Assert.assertEquals(3, lines.size()); + for (Tuple t : lines) + { + switch((Integer)t.get(0)) + { + case 1: + Assert.assertEquals(10L, t.get(1)); + Assert.assertEquals(0L, t.get(2)); + break; + case 2: + Assert.assertEquals(0L, t.get(1)); + Assert.assertEquals(100L, t.get(2)); + break; + case 5: + Assert.assertEquals(50L, t.get(1)); + Assert.assertEquals(500L, t.get(2)); + break; + default: + Assert.fail("Did not expect: " + t.get(0)); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java new file mode 100644 index 0000000..591ee82 --- /dev/null +++ b/datafu-pig/src/test/java/datafu/test/pig/util/InTests.java @@ -0,0 +1,133 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package datafu.test.pig.util; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.pig.pigunit.PigTest; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; + +public class InTests extends PigTests +{ + /** + + + define I datafu.pig.util.InUDF(); + + data = LOAD 'input' AS (B: bag {T: tuple(v:INT)}); + + data2 = FOREACH data { + C = FILTER B By I(v, 1,2,3); + GENERATE C; + } + + describe data2; + + STORE data2 INTO 'output'; + */ + @Multiline private static String inIntTest; + + @Test + public void inIntTest() throws Exception + { + PigTest test = createPigTestFromString(inIntTest); + + writeLinesToFile("input", + "({(1),(2),(3),(4),(5)})", + "({(1),(2)})", + "({(4),(5)})"); + + test.runScript(); + + assertOutput(test, "data2", + "({(1),(2),(3)})", + "({(1),(2)})", + "({})"); + } + + /** + + + define I datafu.pig.util.InUDF(); + + data = LOAD 'input' AS (B: bag {T: tuple(v:chararray)}); + + data2 = FOREACH data { + C = FILTER B By I(v, 'will','matt','sam'); + GENERATE C; + } + + describe data2; + + STORE data2 INTO 'output'; + */ + @Multiline private static String inStringTest; + + @Test + public void inStringTest() throws Exception + { + 
PigTest test = createPigTestFromString(inStringTest); + + writeLinesToFile("input", + "({(alice),(bob),(will),(matt),(sam)})", + "({(will),(matt)})", + "({(alice),(bob)})"); + + test.runScript(); + + assertOutput(test, "data2", + "({(will),(matt),(sam)})", + "({(will),(matt)})", + "({})"); + } + + /** + + + define I datafu.pig.util.InUDF(); + + data = LOAD 'input' AS (owner:chararray, color:chararray); + describe data; + data2 = FILTER data BY I(color, 'red','blue'); + describe data2; + STORE data2 INTO 'output'; + */ + @Multiline private static String inTopLevelTest; + + @Test + public void inTopLevelTest() throws Exception + { + PigTest test = createPigTestFromString(inTopLevelTest); + + writeLinesToFile("input", + "alice\tred", + "bob\tblue", + "charlie\tgreen", + "dave\tred"); + test.runScript(); + + assertOutput(test, "data2", + "(alice,red)", + "(bob,blue)", + "(dave,red)"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java new file mode 100644 index 0000000..465e8b2 --- /dev/null +++ b/datafu-pig/src/test/java/datafu/test/pig/util/IntBoolConversionPigTests.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package datafu.test.pig.util; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.pig.pigunit.PigTest; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; + +public class IntBoolConversionPigTests extends PigTests +{ + /** + + + define IntToBool datafu.pig.util.IntToBool(); + + data = LOAD 'input' AS (val:INT); + + data2 = FOREACH data GENERATE IntToBool(val); + + STORE data2 INTO 'output'; + */ + @Multiline private static String intToBoolTest; + + @Test + public void intToBoolTest() throws Exception + { + PigTest test = createPigTestFromString(intToBoolTest); + + String[] input = { + "", // null + "0", + "1" + }; + + String[] output = { + "(false)", + "(false)", + "(true)" + }; + + test.assertOutput("data",input,"data2",output); + } + + /** + + + define IntToBool datafu.pig.util.IntToBool(); + define BoolToInt datafu.pig.util.BoolToInt(); + + data = LOAD 'input' AS (val:INT); + + data2 = FOREACH data GENERATE IntToBool(val) as val; + data3 = FOREACH data2 GENERATE BoolToInt(val) as val; + + STORE data3 INTO 'output'; + */ + @Multiline private static String intToBoolToIntTest; + + @Test + public void intToBoolToIntTest() throws Exception + { + PigTest test = createPigTestFromString(intToBoolToIntTest); + + String[] input = { + "", // null + "0", + "1", + "2", + "-1", + "-2", + "0", + "" + }; + + String[] output = { + "(0)", + "(0)", + "(1)", + "(1)", + "(1)", + "(1)", + "(0)", + "(0)" + }; + + test.assertOutput("data",input,"data3",output); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java ---------------------------------------------------------------------- diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java b/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java new file mode 100644 index 0000000..3d7654a --- /dev/null +++ b/datafu-pig/src/test/java/datafu/test/pig/util/TransposeTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package datafu.test.pig.util; + +import java.util.List; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.pigunit.PigTest; +import org.testng.Assert; +import org.testng.annotations.Test; + +import datafu.test.pig.PigTests; + +public class TransposeTest extends PigTests +{ + /** + + + define Transpose datafu.pig.util.TransposeTupleToBag(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT); + + data2 = FOREACH data GENERATE testcase, Transpose(val1 .. 
val3) as transposed; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, transposed; + + STORE data3 INTO 'output'; + */ + @Multiline private static String transposeTest; + + @Test + public void transposeTest() throws Exception + { + PigTest test = createPigTestFromString(transposeTest); + writeLinesToFile("input", "1,10,11,12", + "2,20,21,22"); + test.runScript(); + + List output = getLinesForAlias(test, "data3"); + for (Tuple tuple : output) { + int testCase = (Integer)tuple.get(0); + DataBag bag = (DataBag)tuple.get(1); + Assert.assertEquals(bag.size(), 3); + int i=0; + for (Tuple t : bag) { + String expectedKey = String.format("val%d",i+1); + Assert.assertEquals((String)t.get(0), expectedKey); + int actualValue = (Integer)t.get(1); + Assert.assertEquals(actualValue, testCase*10+i); + i++; + } + } + } + + /** + + + define Transpose datafu.pig.util.TransposeTupleToBag(); + + data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:DOUBLE); + + data2 = FOREACH data GENERATE testcase, Transpose(val1 .. 
val3) as transposed; + + describe data2; + + data3 = FOREACH data2 GENERATE testcase, transposed; + + STORE data3 INTO 'output'; + */ + @Multiline private static String transposeBadTypeTest; + + @Test(expectedExceptions={org.apache.pig.impl.logicalLayer.FrontendException.class}) + public void transposeBadTypeTest() throws Exception + { + PigTest test = createPigTestFromString(transposeBadTypeTest); + writeLinesToFile("input", "1,10,11,12.0", + "2,20,21,22.0"); + test.runScript(); + + List output = getLinesForAlias(test, "data3"); + for (Tuple tuple : output) { + int testCase = (Integer)tuple.get(0); + DataBag bag = (DataBag)tuple.get(1); + Assert.assertEquals(bag.size(), 3); + int i=0; + for (Tuple t : bag) { + String expectedKey = String.format("val%d",i+1); + Assert.assertEquals((String)t.get(0), expectedKey); + int actualValue = (Integer)t.get(1); + Assert.assertEquals(actualValue, testCase*10+i); + i++; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle.properties ---------------------------------------------------------------------- diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..98e86a2 --- /dev/null +++ b/gradle.properties @@ -0,0 +1,2 @@ +group=org.apache.datafu +version=1.2.1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/buildscript.gradle ---------------------------------------------------------------------- diff --git a/gradle/buildscript.gradle b/gradle/buildscript.gradle new file mode 100644 index 0000000..225e0a8 --- /dev/null +++ b/gradle/buildscript.gradle @@ -0,0 +1,12 @@ +repositories { + repositories { + // For license plugin. 
+ maven { + url 'http://dl.bintray.com/content/netflixoss/external-gradle-plugins/' + } + } +} + +dependencies { + classpath 'nl.javadude.gradle.plugins:license-gradle-plugin:0.6.1' +} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle new file mode 100644 index 0000000..29d3e36 --- /dev/null +++ b/gradle/dependency-versions.gradle @@ -0,0 +1,20 @@ +ext { + antlrVersion="3.2" + avroVersion="1.5.3" + streamVersion="2.5.0" + commonsMathVersion="2.2" + commonsIoVersion="1.4" + fastutilVersion="6.5.7" + guavaVersion="11.0" + hadoopVersion="0.20.2" + jodaTimeVersion="1.6" + log4jVersion="1.2.14" + mavenVersion="2.1.3" + jlineVersion="0.9.94" + pigVersion="0.11.1" + testngVersion="6.2" + toolsVersion="1.4.2" + wagonHttpVersion="1.0-beta-2" + openNlpVersion="1.5.3" + openNlpMaxEntVersion="3.0.3" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/wrapper/gradle-wrapper.jar ---------------------------------------------------------------------- diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..a7634b0 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradle/wrapper/gradle-wrapper.properties ---------------------------------------------------------------------- diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..610282a --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Thu Jul 11 22:18:11 PDT 2013 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists 
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.6-bin.zip http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/gradlew ---------------------------------------------------------------------- diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..16bbbbf --- /dev/null +++ b/gradlew @@ -0,0 +1,164 @@ +#!/usr/bin/env bash + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="-XX:MaxPermSize=512m" + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn ( ) { + echo "$*" +} + +die ( ) { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; +esac + +# For Cygwin, ensure paths are in UNIX format before anything is touched. +if $cygwin ; then + [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` +fi + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >&- +APP_HOME="`pwd -P`" +cd "$SAVED" >&- + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. 
+if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to 
/bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules +function splitJvmOpts() { + JVM_OPTS=("$@") +} +eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS +JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" + +exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java ---------------------------------------------------------------------- diff --git a/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java deleted file mode 100644 index 7dbfe9a..0000000 --- a/plugin/java/org/adrianwalker/multilinestring/EcjMultilineProcessor.java +++ /dev/null @@ -1,54 +0,0 @@ -// Code from https://github.com/benelog/multiline.git -// Based on Adrian Walker's blog post: 
http://www.adrianwalker.org/2011/12/java-multiline-string.html - -package org.adrianwalker.multilinestring; - -import java.util.Set; - -import javax.annotation.processing.AbstractProcessor; -import javax.annotation.processing.ProcessingEnvironment; -import javax.annotation.processing.RoundEnvironment; -import javax.annotation.processing.SupportedAnnotationTypes; -import javax.annotation.processing.SupportedSourceVersion; -import javax.lang.model.SourceVersion; -import javax.lang.model.element.Element; -import javax.lang.model.element.TypeElement; -import javax.lang.model.util.Elements; - -import org.eclipse.jdt.internal.compiler.apt.model.VariableElementImpl; -import org.eclipse.jdt.internal.compiler.ast.FieldDeclaration; -import org.eclipse.jdt.internal.compiler.ast.StringLiteral; -import org.eclipse.jdt.internal.compiler.lookup.FieldBinding; - -import java.lang.reflect.Constructor; - -@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"}) -@SupportedSourceVersion(SourceVersion.RELEASE_6) -public final class EcjMultilineProcessor extends AbstractProcessor { - - private Elements elementUtils; - - @Override - public void init(final ProcessingEnvironment procEnv) { - super.init(procEnv); - this.elementUtils = procEnv.getElementUtils(); - } - - @Override - public boolean process(final Set annotations, final RoundEnvironment roundEnv) { - Set fields = roundEnv.getElementsAnnotatedWith(Multiline.class); - - for (Element field : fields) { - String docComment = elementUtils.getDocComment(field); - - if (null != docComment) { - VariableElementImpl fieldElem = (VariableElementImpl) field; - FieldBinding biding = (FieldBinding) fieldElem._binding; - FieldDeclaration decl = biding.sourceField(); - StringLiteral string = new StringLiteral(docComment.toCharArray(), decl.sourceStart, decl.sourceEnd, decl.sourceStart); - decl.initialization = string; - } - } - return true; - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java ---------------------------------------------------------------------- diff --git a/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java deleted file mode 100644 index 39aa24e..0000000 --- a/plugin/java/org/adrianwalker/multilinestring/JavacMultilineProcessor.java +++ /dev/null @@ -1,49 +0,0 @@ -// Code from https://github.com/benelog/multiline.git -// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html - -package org.adrianwalker.multilinestring; - -import java.util.Set; - -import javax.annotation.processing.AbstractProcessor; -import javax.annotation.processing.ProcessingEnvironment; -import javax.annotation.processing.RoundEnvironment; -import javax.annotation.processing.SupportedAnnotationTypes; -import javax.annotation.processing.SupportedSourceVersion; -import javax.lang.model.SourceVersion; -import javax.lang.model.element.Element; -import javax.lang.model.element.TypeElement; - -import com.sun.tools.javac.model.JavacElements; -import com.sun.tools.javac.processing.JavacProcessingEnvironment; -import com.sun.tools.javac.tree.JCTree.JCVariableDecl; -import com.sun.tools.javac.tree.TreeMaker; - -@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"}) -@SupportedSourceVersion(SourceVersion.RELEASE_6) -public final class JavacMultilineProcessor extends AbstractProcessor { - - private JavacElements elementUtils; - private TreeMaker maker; - - @Override - public void init(final ProcessingEnvironment procEnv) { - super.init(procEnv); - JavacProcessingEnvironment javacProcessingEnv = (JavacProcessingEnvironment) procEnv; - this.elementUtils = javacProcessingEnv.getElementUtils(); - this.maker = TreeMaker.instance(javacProcessingEnv.getContext()); - } - - @Override - 
public boolean process(final Set annotations, final RoundEnvironment roundEnv) { - Set fields = roundEnv.getElementsAnnotatedWith(Multiline.class); - for (Element field : fields) { - String docComment = elementUtils.getDocComment(field); - if (null != docComment) { - JCVariableDecl fieldNode = (JCVariableDecl) elementUtils.getTree(field); - fieldNode.init = maker.Literal(docComment); - } - } - return true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/Multiline.java ---------------------------------------------------------------------- diff --git a/plugin/java/org/adrianwalker/multilinestring/Multiline.java b/plugin/java/org/adrianwalker/multilinestring/Multiline.java deleted file mode 100644 index c29d3ef..0000000 --- a/plugin/java/org/adrianwalker/multilinestring/Multiline.java +++ /dev/null @@ -1,14 +0,0 @@ -// Code from https://github.com/benelog/multiline.git -// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html - -package org.adrianwalker.multilinestring; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Target({ElementType.FIELD,ElementType.LOCAL_VARIABLE}) -@Retention(RetentionPolicy.SOURCE) -public @interface Multiline { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java ---------------------------------------------------------------------- diff --git a/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java b/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java deleted file mode 100644 index 9abdba5..0000000 --- a/plugin/java/org/adrianwalker/multilinestring/MultilineProcessor.java +++ /dev/null @@ -1,41 +0,0 @@ -// Code from 
https://github.com/benelog/multiline.git -// Based on Adrian Walker's blog post: http://www.adrianwalker.org/2011/12/java-multiline-string.html - -package org.adrianwalker.multilinestring; - -import java.util.Set; - -import javax.annotation.processing.AbstractProcessor; -import javax.annotation.processing.ProcessingEnvironment; -import javax.annotation.processing.Processor; -import javax.annotation.processing.RoundEnvironment; -import javax.annotation.processing.SupportedAnnotationTypes; -import javax.annotation.processing.SupportedSourceVersion; -import javax.lang.model.SourceVersion; -import javax.lang.model.element.TypeElement; - -@SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"}) -@SupportedSourceVersion(SourceVersion.RELEASE_6) -public final class MultilineProcessor extends AbstractProcessor { - private Processor delegator = null; - - @Override - public void init(final ProcessingEnvironment procEnv) { - super.init(procEnv); - String envClassName = procEnv.getClass().getName(); - if (envClassName.contains("com.sun.tools")) { - delegator = new JavacMultilineProcessor(); - } else { - delegator = new EcjMultilineProcessor(); - } - delegator.init(procEnv); - } - - @Override - public boolean process(final Set annotations, final RoundEnvironment roundEnv) { - if (delegator == null ) { - return true; - } - return delegator.process(annotations, roundEnv); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..62b1899 --- /dev/null +++ b/settings.gradle @@ -0,0 +1 @@ +include "build-plugin","datafu-pig" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/AppendToBag.java ---------------------------------------------------------------------- diff --git 
a/src/java/datafu/pig/bags/AppendToBag.java b/src/java/datafu/pig/bags/AppendToBag.java deleted file mode 100644 index 55c9e76..0000000 --- a/src/java/datafu/pig/bags/AppendToBag.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package datafu.pig.bags; - -import java.io.IOException; - -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.schema.Schema; - -import datafu.pig.util.SimpleEvalFunc; - -/** - * Appends a tuple to a bag. - *

- * Example: - *

- * {@code
- * define AppendToBag datafu.pig.bags.AppendToBag();
- * 
- * -- input:
- * -- ({(1),(2),(3)},(4))
- * -- ({(10),(20),(30),(40),(50)},(60))
- * input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-
- * -- output:
- * -- ({(1),(2),(3),(4)})
- * -- ({(10),(20),(30),(40),(50),(60)})
- * output = FOREACH input GENERATE AppendToBag(B,T) as B;
- * }
- * 
- */ -public class AppendToBag extends SimpleEvalFunc -{ - public DataBag call(DataBag inputBag, Tuple t) throws IOException - { - inputBag.add(t); - return inputBag; - } - - @Override - public Schema outputSchema(Schema input) - { - try { - return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), - input.getField(0).schema, DataType.BAG)); - } - catch (FrontendException e) { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagConcat.java ---------------------------------------------------------------------- diff --git a/src/java/datafu/pig/bags/BagConcat.java b/src/java/datafu/pig/bags/BagConcat.java deleted file mode 100644 index 290f44b..0000000 --- a/src/java/datafu/pig/bags/BagConcat.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package datafu.pig.bags; - -import java.io.IOException; - -import org.apache.pig.EvalFunc; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; - -/** - * Unions all input bags to produce a single bag containing all tuples. - *

- * This UDF accepts two forms of input: - *

    - *
  1. a tuple of 2 or more elements where each element is a bag with the same schema
  2. - *
  3. a single bag where each element of that bag is a bag and all of these bags have the same schema
  4. - *
- *

- * Example 1: - *

- * {@code
- * define BagConcat datafu.pig.bags.BagConcat();
- * -- This example illustrates the use on a tuple of bags
- * 
- * -- input:
- * -- ({(1),(2),(3)},{(3),(4),(5)})
- * -- ({(20),(25)},{(40),(50)})
- * input = LOAD 'input' AS (A: bag{T: tuple(v:INT)}, B: bag{T: tuple(v:INT)});
- * 
- * -- output:
- * -- ({(1),(2),(3),(3),(4),(5)})
- * -- ({(20),(25),(40),(50)})
- * output = FOREACH input GENERATE BagConcat(A,B); 
- * }
- * 
- *

- * Example 2: - *

- * {@code
- * define BagConcat datafu.pig.bags.BagConcat();
- * -- This example illustrates the use on a bag of bags
- * 
- * -- input:
- * -- ({({(1),(2),(3)}),({(3),(4),(5)})})
- * -- ({({(20),(25)}),({(40),(50)})})
- * input = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})});
- * 
- * -- output:
- * -- ({(1),(2),(3),(3),(4),(5)})
- * -- ({(20),(25),(40),(50)})
- * output = FOREACH input GENERATE BagConcat(A);
- * }
- * 
- * - * @author wvaughan - * - */ -public class BagConcat extends EvalFunc -{ - - @Override - public DataBag exec(Tuple input) throws IOException - { - DataBag output = BagFactory.getInstance().newDefaultBag(); - if (input.size() > 1) { - // tuple of bags - for (int i=0; i < input.size(); i++) { - Object o = input.get(i); - if (!(o instanceof DataBag)) { - throw new RuntimeException("Expected a TUPLE of BAGs as input"); - } - - DataBag inputBag = (DataBag) o; - for (Tuple elem : inputBag) { - output.add(elem); - } - } - } else { - // bag of bags - DataBag outerBag = (DataBag)input.get(0); - for (Tuple outerTuple : outerBag) { - DataBag innerBag = (DataBag)outerTuple.get(0); - for (Tuple innerTuple : innerBag) { - output.add(innerTuple); - } - } - } - return output; - } - - @Override - public Schema outputSchema(Schema input) - { - try { - Schema outputBagTupleSchema = null; - - if (input.size() == 0) { - throw new RuntimeException("Expected input tuple to contain fields. Got none."); - } - // determine whether the input is a tuple of bags or a bag of bags - if (input.size() != 1) { - // tuple of bags - - // verify that each element in the input is a bag - for (FieldSchema fieldSchema : input.getFields()) { - if (fieldSchema.type != DataType.BAG) { - throwBadTypeError("Expected a TUPLE of BAGs as input. Got instead: %s in schema: %s", fieldSchema.type, input); - } - if (fieldSchema.schema == null) { - throwBadSchemaError(fieldSchema.alias, input); - } - } - - outputBagTupleSchema = input.getField(0).schema; - } else { - // bag of bags - - // should only be a single element in the input and it should be a bag - FieldSchema fieldSchema = input.getField(0); - if (fieldSchema.type != DataType.BAG) { - throwBadTypeError("Expected a BAG of BAGs as input. 
Got instead: %s in schema: %s", fieldSchema.type, input); - } - - // take the tuple schema from this outer bag - Schema outerBagTuple = input.getField(0).schema; - // verify that this tuple contains only a bag for each element - Schema outerBagSchema = outerBagTuple.getField(0).schema; - if (outerBagSchema.size() != 1) { - throw new RuntimeException(String.format("Expected outer bag to have only a single field. Got instead: %s", - outerBagSchema.prettyPrint())); - } - FieldSchema outerBagFieldSchema = outerBagSchema.getField(0); - if (outerBagFieldSchema.type != DataType.BAG) { - throwBadTypeError("Expected a BAG of BAGs as input. Got instead: %s", outerBagFieldSchema.type, outerBagTuple); - } - - // take the schema of the inner tuple as the schema for the tuple in the output bag - FieldSchema innerTupleSchema = outerBagSchema.getField(0); - if (innerTupleSchema.schema == null) { - throwBadSchemaError(innerTupleSchema.alias, outerBagSchema); - } - outputBagTupleSchema = innerTupleSchema.schema; - } - - // return the correct schema - return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), - outputBagTupleSchema, DataType.BAG)); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void throwBadSchemaError(String alias, Schema schema) throws FrontendException - { - throw new FrontendException(String.format("Expected non-null schema for all bags. 
Got null on field %s, in: %s", - alias, schema.prettyPrint())); - } - - private void throwBadTypeError(String expectedMessage, byte actualType, Schema schema) throws FrontendException - { - throw new FrontendException(String.format(expectedMessage, DataType.findTypeName(actualType), schema.prettyPrint())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagGroup.java ---------------------------------------------------------------------- diff --git a/src/java/datafu/pig/bags/BagGroup.java b/src/java/datafu/pig/bags/BagGroup.java deleted file mode 100644 index 7b6c2e2..0000000 --- a/src/java/datafu/pig/bags/BagGroup.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package datafu.pig.bags; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; - -import datafu.pig.util.AliasableEvalFunc; - -/** - * Performs an in-memory group operation on a bag. The first argument is the bag. - * The second argument is a projection of that bag to the group keys. - * - *

- * Example: - * - * define BagGroup datafu.pig.bags.BagGroup(); - * - * data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)}); - * -- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)}) - * - * data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped; - * -- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}} - * -- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})}) - * - *

- * - * @author wvaughan - * - */ -public class BagGroup extends AliasableEvalFunc -{ - private final String FIELD_NAMES_PROPERTY = "FIELD_NAMES"; - private List fieldNames; - - @Override - public Schema getOutputSchema(Schema input) - { - try { - if (input.size() != 2) { - throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %d field.", input.size())); - } - // Expect the first field to be a bag - FieldSchema bagFieldSchema = input.getField(0); - if (bagFieldSchema.type != DataType.BAG) { - throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as first field.", DataType.findTypeName(bagFieldSchema.type))); - } - // Expect the second fields to be a projection of the bag - FieldSchema projectedBagFieldSchema = input.getField(1); - if (projectedBagFieldSchema.type != DataType.BAG) { - throw new RuntimeException(String.format("Expected input of format (BAG, PROJECTED_BAG...). Got %s as second field.", DataType.findTypeName(projectedBagFieldSchema.type))); - } - - String bagName = bagFieldSchema.alias; - // handle named tuples - if (bagFieldSchema.schema.size() == 1) { - FieldSchema bagTupleFieldSchema = bagFieldSchema.schema.getField(0); - if (bagTupleFieldSchema.type == DataType.TUPLE && bagTupleFieldSchema.alias != null) { - bagName = getPrefixedAliasName(bagName, bagTupleFieldSchema.alias); - } - } - if (projectedBagFieldSchema.schema.size() == 1) { - FieldSchema projectedBagTupleFieldSchema = projectedBagFieldSchema.schema.getField(0); - if (projectedBagTupleFieldSchema.type == DataType.TUPLE && projectedBagTupleFieldSchema.schema != null) { - projectedBagFieldSchema = projectedBagTupleFieldSchema; - } - } - - // create the output schema for the 'group' - // store the field names for the group keys - Schema groupTupleSchema = new Schema(); - fieldNames = new ArrayList(projectedBagFieldSchema.schema.size()); - for (int i=0; i 1) { - // multiple group keys - 
outputTupleSchema.add(new FieldSchema("group", groupTupleSchema, DataType.TUPLE)); - } else { - // single group key - outputTupleSchema.add(new FieldSchema("group", groupTupleSchema.getField(0).type)); - } - outputTupleSchema.add(bagFieldSchema); - - return new Schema(new Schema.FieldSchema( - getSchemaName(this.getClass().getName().toLowerCase(), input), - outputTupleSchema, - DataType.BAG)); - } catch (FrontendException e) { - throw new RuntimeException(e); - } - } - - Map> groups = new HashMap>(); - TupleFactory tupleFactory = TupleFactory.getInstance(); - BagFactory bagFactory = BagFactory.getInstance(); - - @SuppressWarnings("unchecked") - @Override - public DataBag exec(Tuple input) throws IOException - { - fieldNames = (List)getInstanceProperties().get(FIELD_NAMES_PROPERTY); - - DataBag inputBag = (DataBag)input.get(0); - - for (Tuple tuple : inputBag) { - Tuple key = extractKey(tuple); - addGroup(key, tuple); - } - - DataBag outputBag = bagFactory.newDefaultBag(); - for (Tuple key : groups.keySet()) { - Tuple outputTuple = tupleFactory.newTuple(); - if (fieldNames.size() > 1) { - outputTuple.append(key); - } else { - outputTuple.append(key.get(0)); - } - DataBag groupBag = bagFactory.newDefaultBag(); - for (Tuple groupedTuple : groups.get(key)) { - groupBag.add(groupedTuple); - } - outputTuple.append(groupBag); - outputBag.add(outputTuple); - } - - return outputBag; - } - - private Tuple extractKey(Tuple tuple) throws ExecException { - Tuple key = tupleFactory.newTuple(); - for (String field : fieldNames) { - key.append(getObject(tuple, field)); - } - return key; - } - - private void addGroup(Tuple key, Tuple value) { - if (!groups.containsKey(key)) { - groups.put(key, new LinkedList()); - } - groups.get(key).add(value); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagLeftOuterJoin.java ---------------------------------------------------------------------- diff --git 
a/src/java/datafu/pig/bags/BagLeftOuterJoin.java b/src/java/datafu/pig/bags/BagLeftOuterJoin.java deleted file mode 100644 index ba6bc11..0000000 --- a/src/java/datafu/pig/bags/BagLeftOuterJoin.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package datafu.pig.bags; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; - -import datafu.pig.util.AliasableEvalFunc; - -/** - * Performs an in-memory left outer join across multiple bags. - * - *

- * The format for invocation is BagLeftOuterJoin(bag, 'key',....). - * This UDF expects that all bags are non-null and that there is a corresponding key for each bag. - * The key that is expected is the alias of the key inside of the preceding bag. - *

- * - *

- * Example: - * - * define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin(); - * - * -- describe data: - * -- data: {bag1: {(key1: chararray,value1: chararray)},bag2: {(key2: chararray,value2: int)}} - * - * bag_joined = FOREACH data GENERATE BagLeftOuterJoin(bag1, 'key1', bag2, 'key2') as joined; - * - * -- describe bag_joined: - * -- bag_joined: {joined: {(bag1::key1: chararray, bag1::value1: chararray, bag2::key2: chararray, bag2::value2: int)}} - * - *

- * - * @author wvaughan - * - */ -public class BagLeftOuterJoin extends AliasableEvalFunc -{ - - private static final String BAG_NAMES_PROPERTY = "BagLeftOuterJoin_BAG_NAMES"; - private static final String BAG_NAME_TO_JOIN_PREFIX_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_JOIN_PREFIX"; - private static final String BAG_NAME_TO_SIZE_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_SIZE_PROPERTY"; - - ArrayList bagNames; - Map bagNameToJoinKeyPrefix; - Map bagNameToSize; - - public BagLeftOuterJoin() { - - } - - @SuppressWarnings("unchecked") - private void retrieveContextValues() - { - Properties properties = getInstanceProperties(); - bagNames = (ArrayList) properties.get(BAG_NAMES_PROPERTY); - bagNameToJoinKeyPrefix = (Map) properties.get(BAG_NAME_TO_JOIN_PREFIX_PROPERTY); - bagNameToSize = (Map) properties.get(BAG_NAME_TO_SIZE_PROPERTY); - } - - class JoinCollector - { - HashMap> joinData; - - public void printJoinData() throws ExecException { - printData(joinData); - } - - public void printData(HashMap> data) throws ExecException { - for (Object o : data.keySet()) { - System.out.println(o); - for (Tuple t : data.get(o)) { - System.out.println("\t"+t.toDelimitedString(", ")); - } - } - } - - public HashMap> groupTuples(Iterable tuples, String keyName) throws ExecException { - HashMap> group = new HashMap>(); - for (Tuple tuple : tuples) { - Object key = getObject(tuple, keyName); - if (!group.containsKey(key)) { - group.put(key, new LinkedList()); - } - group.get(key).add(tuple); - } - return group; - } - - public HashMap> insertNullTuples(HashMap> groupedData, int tupleSize) throws ExecException { - Tuple nullTuple = TupleFactory.getInstance().newTuple(tupleSize); - for (int i=0; i tuples) throws ExecException { - List currentTuples = joinData.get(key); - if (currentTuples != null) { - List newTuples = new LinkedList(); - if (tuples != null) { - for (Tuple t1 : currentTuples) { - for (Tuple t2 : tuples) { - Tuple t = TupleFactory.getInstance().newTuple(); - for (Object o 
: t1.getAll()) { - t.append(o); - } - for (Object o : t2.getAll()) { - t.append(o); - } - newTuples.add(t); - } - } - } - joinData.put(key, newTuples); - } - } - - public HashMap> getJoinData() { - return this.joinData; - } - - public void setJoinData(HashMap> joinData) { - this.joinData = joinData; - } - } - - @Override - public DataBag exec(Tuple input) throws IOException - { - retrieveContextValues(); - - ArrayList joinKeyNames = new ArrayList(); - for (int i = 1; i < input.size(); i += 2) { - joinKeyNames.add((String) input.get(i)); - } - - JoinCollector collector = new JoinCollector(); - // the first bag is the outer bag - String leftBagName = bagNames.get(0); - DataBag leftBag = getBag(input, leftBagName); - String leftBagJoinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(leftBagName), joinKeyNames.get(0)); - collector.setJoinData(collector.groupTuples(leftBag, leftBagJoinKeyName)); - // now, for each additional bag, group up the tuples by the join key, then join them in - if (bagNames.size() > 1) { - for (int i = 1; i < bagNames.size(); i++) { - String bagName = bagNames.get(i); - DataBag bag = getBag(input, bagName); - String joinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(bagName), joinKeyNames.get(i)); - int tupleSize = bagNameToSize.get(bagName); - if (bag == null) throw new IOException("Error in instance: "+getInstanceName() - + " with properties: " + getInstanceProperties() - + " and tuple: " + input.toDelimitedString(", ") - + " -- Expected bag, got null"); - HashMap> groupedData = collector.groupTuples(bag, joinKeyName); - // outer join, so go back in and add nulls; - groupedData = collector.insertNullTuples(groupedData, tupleSize); - for (Map.Entry> entry : groupedData.entrySet()) { - collector.joinTuples(entry.getKey(), entry.getValue()); - } - } - } - - // assemble output bag - DataBag outputBag = BagFactory.getInstance().newDefaultBag(); - for (List tuples : collector.getJoinData().values()) { - for (Tuple tuple : tuples) 
{ - outputBag.add(tuple); - } - } - - return outputBag; - } - - @Override - public Schema getOutputSchema(Schema input) - { - ArrayList bagNames = new ArrayList(input.size() / 2); - Map bagNameToJoinPrefix = new HashMap(input.size() / 2); - Map bagNameToSize = new HashMap(input.size() / 2); - Schema outputSchema = null; - Schema bagSchema = new Schema(); - try { - int i = 0; - // all even fields should be bags, odd fields are key names - String bagName = null; - String tupleName = null; - for (FieldSchema outerField : input.getFields()) { - if (i++ % 2 == 1) - continue; - bagName = outerField.alias; - bagNames.add(bagName); - if (bagName == null) - bagName = "null"; - if (outerField.schema == null) - throw new RuntimeException("Expected input format of (bag, 'field') pairs. " - +"Did not receive a bag at index: "+i+", alias: "+bagName+". " - +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString()); - FieldSchema tupleField = outerField.schema.getField(0); - tupleName = tupleField.alias; - bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName)); - if (tupleField.schema == null) { - log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName, bagName)); - } else { - bagNameToSize.put(bagName, tupleField.schema.size()); - for (FieldSchema innerField : tupleField.schema.getFields()) { - String innerFieldName = innerField.alias; - if (innerFieldName == null) - innerFieldName = "null"; - String outputFieldName = bagName + "::" + innerFieldName; - bagSchema.add(new FieldSchema(outputFieldName, innerField.type)); - } - } - } - outputSchema = new Schema(new Schema.FieldSchema("joined", bagSchema, DataType.BAG)); - log.debug("output schema: "+outputSchema.toString()); - } catch (FrontendException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - Properties properties = getInstanceProperties(); - properties.put(BAG_NAMES_PROPERTY, bagNames); - 
properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix); - properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize); - return outputSchema; - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/src/java/datafu/pig/bags/BagSplit.java ---------------------------------------------------------------------- diff --git a/src/java/datafu/pig/bags/BagSplit.java b/src/java/datafu/pig/bags/BagSplit.java deleted file mode 100644 index 74d39d0..0000000 --- a/src/java/datafu/pig/bags/BagSplit.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package datafu.pig.bags; - -import java.io.IOException; - -import org.apache.pig.EvalFunc; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.logicalLayer.schema.Schema; - -/** - * Splits a bag of tuples into a bag of bags, where the inner bags collectively contain - * the tuples from the original bag. This can be used to split a bag into a set of smaller bags. - *

- * Example: - *

- * {@code
- * define BagSplit datafu.pig.bags.BagSplit();
- * 
- * -- input:
- * -- ({(1),(2),(3),(4),(5),(6),(7)})
- * -- ({(1),(2),(3),(4),(5)})
- * -- ({(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11)})
- * input = LOAD 'input' AS (B:bag{T:tuple(val1:INT,val2:INT)});
- * 
- * -- output:
- * -- ({{(1),(2),(3),(4),(5)},{(6),(7)}})
- * -- ({{(1),(2),(3),(4),(5)}})
- * -- ({{(1),(2),(3),(4),(5)},{(6),(7),(8),(9),(10)},{(11)}})
- * output = FOREACH input GENERATE BagSplit(5,B);
- * }
- * 
- */ -public class BagSplit extends EvalFunc -{ - private static final BagFactory bagFactory = BagFactory.getInstance(); - private static final TupleFactory tupleFactory = TupleFactory.getInstance(); - - private final boolean appendBagNum; - - public BagSplit() - { - this.appendBagNum = false; - } - - public BagSplit(String appendBagNum) - { - this.appendBagNum = Boolean.parseBoolean(appendBagNum); - } - - @Override - public DataBag exec(Tuple arg0) throws IOException - { - DataBag outputBag = bagFactory.newDefaultBag(); - - Integer maxSize = (Integer)arg0.get(0); - - Object o = arg0.get(1); - if (!(o instanceof DataBag)) - throw new RuntimeException("parameter must be a databag"); - - DataBag inputBag = (DataBag)o; - - DataBag currentBag = null; - - int count = 0; - int numBags = 0; - for (Tuple tuple : inputBag) - { - if (currentBag == null) - { - currentBag = bagFactory.newDefaultBag(); - } - - currentBag.add(tuple); - count++; - - if (count >= maxSize) - { - Tuple newTuple = tupleFactory.newTuple(); - newTuple.append(currentBag); - - if (this.appendBagNum) - { - newTuple.append(numBags); - } - - numBags++; - - outputBag.add(newTuple); - - count = 0; - currentBag = null; - } - } - - if (currentBag != null) - { - Tuple newTuple = tupleFactory.newTuple(); - newTuple.append(currentBag); - if (this.appendBagNum) - { - newTuple.append(numBags); - } - outputBag.add(newTuple); - } - - return outputBag; - } - - @Override - public Schema outputSchema(Schema input) - { - try { - if (input.getField(0).type != DataType.INTEGER) - { - throw new RuntimeException("Expected first argument to be an INTEGER"); - } - - if (input.getField(1).type != DataType.BAG) - { - throw new RuntimeException("Expected second argument to be a BAG"); - } - - Schema tupleSchema = new Schema(); - tupleSchema.add(new Schema.FieldSchema("data", input.getField(1).schema.clone(), DataType.BAG)); - - if (this.appendBagNum) - { - tupleSchema.add(new Schema.FieldSchema("index", DataType.INTEGER)); - } - - 
return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), - tupleSchema, DataType.BAG)); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } -}