datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mha...@apache.org
Subject incubator-datafu git commit: DATAFU-79 Added UDF ZipBags which can zip an arbitrary number of bags into one
Date Thu, 27 Nov 2014 15:45:26 GMT
Repository: incubator-datafu
Updated Branches:
  refs/heads/master e7835c11f -> 1563535f2


DATAFU-79 Added UDF ZipBags which can zip an arbitrary number of bags into one

https://issues.apache.org/jira/browse/DATAFU-79

Signed-off-by: Matthew Hayes <matthew.terence.hayes@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/1563535f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/1563535f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/1563535f

Branch: refs/heads/master
Commit: 1563535f20cba433497664262717d861775c5f9b
Parents: e7835c1
Author: Aaron Josephs <ajoseph4@binghamton.edu>
Authored: Mon Sep 15 13:09:59 2014 -0400
Committer: Matthew Hayes <matthew.terence.hayes@gmail.com>
Committed: Thu Nov 27 07:44:16 2014 -0800

----------------------------------------------------------------------
 .../src/main/java/datafu/pig/bags/ZipBags.java  | 104 +++++++++++++++++++
 .../java/datafu/test/pig/bags/BagTests.java     |  20 ++--
 .../java/datafu/test/pig/bags/ZipBagsTests.java |  98 +++++++++++++++++
 3 files changed, 211 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/1563535f/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java b/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java
new file mode 100644
index 0000000..c9f652f
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/ZipBags.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This udf takes any number of bags and allows you to zip them into one bag
+ * with the tuples inside each bag concatenated to each other.
+ * Example:
+ * <pre>
+ * {@code
+ * -- input:
+ * -- ({(1,2),(3,4),(5,6)},{(7,8),(9,10),(11,12)})
+ * input = LOAD 'input' AS (OUTER: tuple(B1: bag {a:INT,b:INT}, B2: bag{c:INT,d:INT}));
+ *
+ * -- output:
+ * -- ({(1,2,7,8),(3,4,9,10),(5,6,11,12)})
+ *
+ * output = FOREACH input GENERATE ZipBags(B1,B2);
+ * }
+ * </pre>
+ * For this to work as expected each bag should be the same length. It will run as long as
+ * the first bag is the shortest; however, this may not be the desired behavior.
+ */
+public class ZipBags extends EvalFunc<DataBag> {
+
+
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+        DataBag outputBag = new DefaultDataBag();
+        //All bags should have the same length for this to work as expected
+        List<Iterator<Tuple>> bagIterators = new ArrayList<Iterator<Tuple>>();
+        for (int i = 0; i < input.size(); ++i) {
+            Object obj = input.get(i);
+            if (obj instanceof DataBag) {
+                DataBag bag = (DataBag)obj;
+                bagIterators.add(bag.iterator());
+            }
+            else {
+                throw new IllegalArgumentException("Expected all fields to be bags");
+            }
+        }
+        while (bagIterators.get(0).hasNext()) {
+            Tuple nextTuple = new DefaultTuple();
+            for (Iterator<Tuple> iter : bagIterators) {
+                if (!iter.hasNext()) {
+                    throw new IllegalArgumentException("The first bag must be the shortest one");
+                }
+                Tuple tempTuple = iter.next();
+                //This loop is to flatten the tuples stored in the DataBag
+                for (int i = 0; i < tempTuple.size(); ++i) nextTuple.append(tempTuple.get(i));
+            }
+            outputBag.add(nextTuple);
+        }
+        return outputBag;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        Schema bagTupleSchema = new Schema();
+        Set<String> aliasSet = new HashSet<String>();
+        for (FieldSchema schema : input.getFields()) { //Each field should be a bag
+            if (schema.schema == null) throw new RuntimeException("Inner bag schemas are null");
+            for (FieldSchema innerBagTuple : schema.schema.getFields()) {
+                for (FieldSchema tupleField : innerBagTuple.schema.getFields()) {
+                    if (!aliasSet.add(tupleField.alias)) {
+                        throw new RuntimeException("Duplicate field alias specified");
+                    }
+                    bagTupleSchema.add(tupleField);
+                }
+            }
+        }
+        try {
+            return new Schema(new FieldSchema("zipped",bagTupleSchema, DataType.BAG));
+        } catch (FrontendException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/1563535f/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index 0eb07c7..9bcc384 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -19,14 +19,11 @@
 
 package datafu.test.pig.bags;
 
-import static org.testng.Assert.assertEquals;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
+import datafu.pig.bags.CountEach;
+import datafu.pig.bags.DistinctBy;
+import datafu.pig.bags.Enumerate;
+import datafu.test.pig.PigTests;
 import junit.framework.Assert;
-
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -35,10 +32,11 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.pigunit.PigTest;
 import org.testng.annotations.Test;
 
-import datafu.pig.bags.CountEach;
-import datafu.pig.bags.DistinctBy;
-import datafu.pig.bags.Enumerate;
-import datafu.test.pig.PigTests;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
 
 
 public class BagTests extends PigTests

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/1563535f/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java
new file mode 100644
index 0000000..43c6ace
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/ZipBagsTests.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.bags;
+
+import datafu.test.pig.PigTests;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+public class ZipBagsTests extends PigTests {
+
+    /**
+     DEFINE ZipBags datafu.pig.bags.ZipBags();
+
+     data = LOAD 'input' AS (B1: bag {T:tuple(a:INT,b:INT)}, B2: bag {U:tuple(c:INT,d:INT)});
+
+     dump data;
+
+     describe data;
+
+     zipped = FOREACH data GENERATE ZipBags(B1,B2);
+
+     describe zipped;
+
+     dump zipped;
+
+     STORE zipped INTO 'output';
+
+     */
+    @Multiline
+    private String zipBagsTest;
+
+    @Test
+    public void zipBagsTest() throws Exception
+    {
+        PigTest test = createPigTestFromString(zipBagsTest);
+        writeLinesToFile("input", "{(1,2),(3,4),(5,6)}\t{(7,8),(9,10),(11,12)}");
+        test.runScript();
+        assertOutput(test, "zipped", "({(1,2,7,8),(3,4,9,10),(5,6,11,12)})");
+    }
+
+    @Test(expectedExceptions = FrontendException.class)
+    public void zipUnevenBagsExceptionTest() throws Exception {
+        PigTest test = createPigTestFromString(zipBagsTest);
+        writeLinesToFile("input", "{(1,2),(3,4),(5,6),(20,40)}\t{(7,8),(9,10)}");
+        test.runScript();
+    }
+
+    @Test
+    public void zipUnevenBagsTest() throws Exception {
+        PigTest test = createPigTestFromString(zipBagsTest);
+        writeLinesToFile("input", "{(1,2),(3,4),(5,6))}\t{(7,8),(9,10),(11,12),(1,2)}");
+        test.runScript();
+        assertOutput(test, "zipped", "({(1,2,7,8),(3,4,9,10),(5,6,11,12)})");
+    }
+
+    /**
+     DEFINE ZipBags datafu.pig.bags.ZipBags();
+
+     data = LOAD 'input' AS (B1: bag {T:tuple(a:INT,b:INT)}, B2: bag {U:tuple(a:INT,d:INT)});
+
+     describe data;
+
+     zipped = FOREACH data GENERATE ZipBags(B1,B2);
+
+     describe zipped;
+
+     STORE zipped INTO 'output';
+     */
+    @Multiline
+    private String duplicateAliasTest;
+
+    @Test(expectedExceptions = FrontendException.class)
+    public void duplicateAliasTest() throws Exception
+    {
+        PigTest test = createPigTestFromString(duplicateAliasTest);
+        writeLinesToFile("input", "{(1,2),(3,4),(5,6)}\t{(7,8),(9,10),(11,12)}");
+        test.runScript();
+    }
+}


Mime
View raw message