Repository: incubator-datafu
Updated Branches:
refs/heads/master e7835c11f -> 1563535f2
DATAFU-79 Added UDF ZipBags which can zip an arbitrary number of bags into one
https://issues.apache.org/jira/browse/DATAFU-79
Signed-off-by: Matthew Hayes <matthew.terence.hayes@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/1563535f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/1563535f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/1563535f
Branch: refs/heads/master
Commit: 1563535f20cba433497664262717d861775c5f9b
Parents: e7835c1
Author: Aaron Josephs <ajoseph4@binghamton.edu>
Authored: Mon Sep 15 13:09:59 2014 -0400
Committer: Matthew Hayes <matthew.terence.hayes@gmail.com>
Committed: Thu Nov 27 07:44:16 2014 -0800
----------------------------------------------------------------------
.../src/main/java/datafu/pig/bags/ZipBags.java | 104 +++++++++++++++++++
.../java/datafu/test/pig/bags/BagTests.java | 20 ++--
.../java/datafu/test/pig/bags/ZipBagsTests.java | 98 +++++++++++++++++
3 files changed, 211 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/1563535f/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java b/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java
new file mode 100644
index 0000000..c9f652f
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Zips any number of bags into a single bag. The i-th tuple of the output is the
+ * concatenation of the fields of the i-th tuple of every input bag, in argument order.
+ * Example:
+ * <pre>
+ * {@code
+ * DEFINE ZipBags datafu.pig.bags.ZipBags();
+ *
+ * -- input:
+ * -- ({(1,2),(3,4),(5,6)},{(7,8),(9,10),(11,12)})
+ * data = LOAD 'input' AS (B1: bag {T: tuple(a:INT,b:INT)}, B2: bag {U: tuple(c:INT,d:INT)});
+ *
+ * -- output:
+ * -- ({(1,2,7,8),(3,4,9,10),(5,6,11,12)})
+ * zipped = FOREACH data GENERATE ZipBags(B1,B2);
+ * }
+ * </pre>
+ * For this to work as expected each bag should be the same length. Iteration is driven by the
+ * first bag: extra tuples in later bags are ignored, while a later bag that runs out of tuples
+ * before the first one causes an {@link IllegalArgumentException}.
+ */
+public class ZipBags extends EvalFunc<DataBag> {
+
+  private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+  /**
+   * Zips the bags passed as arguments.
+   *
+   * @param input a tuple whose fields are all bags
+   * @return a bag whose i-th tuple concatenates the fields of the i-th tuple of every input
+   *         bag; an empty bag when no bags are given; {@code null} when the input tuple or any
+   *         of the bags is null
+   * @throws IllegalArgumentException if a field is not a bag, or if a later bag runs out of
+   *         tuples before the first bag does
+   */
+  @Override
+  public DataBag exec(Tuple input) throws IOException {
+    if (input == null) {
+      return null;
+    }
+    // All bags should have the same length for this to work as expected.
+    List<Iterator<Tuple>> bagIterators = new ArrayList<Iterator<Tuple>>(input.size());
+    for (int i = 0; i < input.size(); ++i) {
+      Object obj = input.get(i);
+      if (obj == null) {
+        // A missing bag makes the zipped result unknown, so propagate the null.
+        return null;
+      }
+      if (!(obj instanceof DataBag)) {
+        throw new IllegalArgumentException("Expected all fields to be bags, but field " + i
+            + " has type " + DataType.findTypeName(obj));
+      }
+      bagIterators.add(((DataBag) obj).iterator());
+    }
+
+    DataBag outputBag = BAG_FACTORY.newDefaultBag();
+    if (bagIterators.isEmpty()) {
+      // Zipping zero bags yields nothing.
+      return outputBag;
+    }
+    // The first bag drives the iteration; extra tuples in later bags are ignored.
+    Iterator<Tuple> driver = bagIterators.get(0);
+    while (driver.hasNext()) {
+      Tuple nextTuple = TUPLE_FACTORY.newTuple();
+      for (int j = 0; j < bagIterators.size(); ++j) {
+        Iterator<Tuple> iter = bagIterators.get(j);
+        if (!iter.hasNext()) {
+          throw new IllegalArgumentException("The first bag must be the shortest one, but bag "
+              + j + " ran out of tuples first");
+        }
+        Tuple tempTuple = iter.next();
+        // Flatten: copy each field of this bag's tuple into the combined output tuple.
+        for (int k = 0; k < tempTuple.size(); ++k) {
+          nextTuple.append(tempTuple.get(k));
+        }
+      }
+      outputBag.add(nextTuple);
+    }
+    return outputBag;
+  }
+
+  /**
+   * Builds the output schema: a bag named {@code zipped} whose fields are the tuple fields of
+   * every input bag, concatenated in argument order.
+   *
+   * @param input schema of the UDF arguments; every field must be a bag with a known schema
+   * @return the schema of the zipped bag
+   * @throws RuntimeException if a schema is missing, a field is not a bag, or two named fields
+   *         share the same alias
+   */
+  @Override
+  public Schema outputSchema(Schema input) {
+    if (input == null) {
+      throw new RuntimeException("Input schema is null");
+    }
+    Schema bagTupleSchema = new Schema();
+    Set<String> aliasSet = new HashSet<String>();
+    for (FieldSchema schema : input.getFields()) {
+      if (schema.type != DataType.BAG) {
+        throw new RuntimeException("Expected all fields to be bags, but field '" + schema.alias
+            + "' has type " + DataType.findTypeName(schema.type));
+      }
+      if (schema.schema == null) {
+        throw new RuntimeException("Inner bag schemas are null");
+      }
+      for (FieldSchema innerBagTuple : schema.schema.getFields()) {
+        if (innerBagTuple.schema == null) {
+          throw new RuntimeException("Inner tuple schemas are null");
+        }
+        for (FieldSchema tupleField : innerBagTuple.schema.getFields()) {
+          // Unnamed fields cannot clash with each other, so only named fields are checked.
+          if (tupleField.alias != null && !aliasSet.add(tupleField.alias)) {
+            throw new RuntimeException("Duplicate field alias specified: " + tupleField.alias);
+          }
+          bagTupleSchema.add(tupleField);
+        }
+      }
+    }
+    try {
+      return new Schema(new FieldSchema("zipped", bagTupleSchema, DataType.BAG));
+    } catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/1563535f/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index 0eb07c7..9bcc384 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -19,14 +19,11 @@
package datafu.test.pig.bags;
-import static org.testng.Assert.assertEquals;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
+import datafu.pig.bags.CountEach;
+import datafu.pig.bags.DistinctBy;
+import datafu.pig.bags.Enumerate;
+import datafu.test.pig.PigTests;
import junit.framework.Assert;
-
import org.adrianwalker.multilinestring.Multiline;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -35,10 +32,11 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.pigunit.PigTest;
import org.testng.annotations.Test;
-import datafu.pig.bags.CountEach;
-import datafu.pig.bags.DistinctBy;
-import datafu.pig.bags.Enumerate;
-import datafu.test.pig.PigTests;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
public class BagTests extends PigTests
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/1563535f/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java
new file mode 100644
index 0000000..43c6ace
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.bags;
+
+import datafu.test.pig.PigTests;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+public class ZipBagsTests extends PigTests {
+
+ /**
+ DEFINE ZipBags datafu.pig.bags.ZipBags();
+
+ data = LOAD 'input' AS (B1: bag {T:tuple(a:INT,b:INT)}, B2: bag {U:tuple(c:INT,d:INT)});
+
+ dump data;
+
+ describe data;
+
+ zipped = FOREACH data GENERATE ZipBags(B1,B2);
+
+ describe zipped;
+
+ dump zipped;
+
+ STORE zipped INTO 'output';
+
+ */
+  @Multiline
+  private String zipBagsTest;
+
+  // Bags of equal length are zipped element-wise into one flattened bag.
+  @Test
+  public void zipBagsTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(zipBagsTest);
+    writeLinesToFile("input", "{(1,2),(3,4),(5,6)}\t{(7,8),(9,10),(11,12)}");
+    test.runScript();
+    assertOutput(test, "zipped", "({(1,2,7,8),(3,4,9,10),(5,6,11,12)})");
+  }
+
+  // A later bag that is shorter than the first one must fail.
+  @Test(expectedExceptions = FrontendException.class)
+  public void zipUnevenBagsExceptionTest() throws Exception {
+    PigTest test = createPigTestFromString(zipBagsTest);
+    writeLinesToFile("input", "{(1,2),(3,4),(5,6),(20,40)}\t{(7,8),(9,10)}");
+    test.runScript();
+  }
+
+  // When the first bag is the shortest, extra tuples in later bags are ignored.
+  @Test
+  public void zipUnevenBagsTest() throws Exception {
+    PigTest test = createPigTestFromString(zipBagsTest);
+    writeLinesToFile("input", "{(1,2),(3,4),(5,6)}\t{(7,8),(9,10),(11,12),(1,2)}");
+    test.runScript();
+    assertOutput(test, "zipped", "({(1,2,7,8),(3,4,9,10),(5,6,11,12)})");
+  }
+
+ /**
+ DEFINE ZipBags datafu.pig.bags.ZipBags();
+
+ data = LOAD 'input' AS (B1: bag {T:tuple(a:INT,b:INT)}, B2: bag {U:tuple(a:INT,d:INT)});
+
+ describe data;
+
+ zipped = FOREACH data GENERATE ZipBags(B1,B2);
+
+ describe zipped;
+
+ STORE zipped INTO 'output';
+ */
+  @Multiline
+  private String duplicateAliasTest;
+
+  // Both bags declare a field named 'a', so schema resolution must reject the zip.
+  @Test(expectedExceptions = FrontendException.class)
+  public void duplicateAliasTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(duplicateAliasTest);
+    writeLinesToFile("input", "{(1,2),(3,4),(5,6)}\t{(7,8),(9,10),(11,12)}");
+    test.runScript();
+  }
+}
|