Repository: incubator-datafu
Updated Branches:
refs/heads/master 83e1f5411 -> fe9b86eaf
DATAFU-70: added generic BagJoin that supports inner, left, and full outer joins
https://issues.apache.org/jira/browse/DATAFU-70
Signed-off-by: Matthew Hayes <matthew.terence.hayes@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/598fd924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/598fd924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/598fd924
Branch: refs/heads/master
Commit: 598fd9247c042e273930c0c2d3c732d6b1173d74
Parents: 83e1f54
Author: jasonr <jasonr@netflix.com>
Authored: Mon Oct 20 11:54:38 2014 -0700
Committer: Matthew Hayes <matthew.terence.hayes@gmail.com>
Committed: Thu Nov 20 20:24:03 2014 -0800
----------------------------------------------------------------------
.../src/main/java/datafu/pig/bags/BagJoin.java | 310 +++++++++++++++++++
.../java/datafu/pig/bags/BagLeftOuterJoin.java | 194 +-----------
.../java/datafu/test/pig/bags/BagTests.java | 38 +++
3 files changed, 350 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/598fd924/datafu-pig/src/main/java/datafu/pig/bags/BagJoin.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/BagJoin.java b/datafu-pig/src/main/java/datafu/pig/bags/BagJoin.java
new file mode 100644
index 0000000..1bad955
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/BagJoin.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.bags;
+
+import datafu.pig.util.AliasableEvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.*;
+
+/**
+ * Performs an in-memory join across multiple bags.
+ *
+ * <p>
+ * The format for invocation is BagJoin(bag, 'key',....).
+ * This UDF expects that all bags are non-null and that there is a corresponding key for
each bag.
+ * The <em>key</em> that is expected is the alias of the key inside of the preceding
bag. By default, an inner
+ * join is performed. You can also perform 'left' and 'full' outer joins by specifying 'left'
or 'full' in the
+ * definition.
+ * </p>
+ *
+ * <p>
+ * Example:
+ * <code>
+ * define BagJoin datafu.pig.bags.BagJoin();
+ *
+ * -- describe data:
+ * -- data: {bag1: {(key1: chararray,value1: chararray)},bag2: {(key2: chararray,value2:
int)}}
+ *
+ * bag_joined = FOREACH data GENERATE BagJoin(bag1, 'key1', bag2, 'key2') as joined;
+ *
+ * -- describe bag_joined:
+ * -- bag_joined: {joined: {(bag1::key1: chararray, bag1::value1: chararray, bag2::key2:
chararray, bag2::value2: int)}}
+ * </code>
+ * </p>
+ *
+ * @author wvaughan
+ *
+ */
+public class BagJoin extends AliasableEvalFunc<DataBag>
+{
+
+ private static final String BAG_NAMES_PROPERTY = "BagFullOuterJoin_BAG_NAMES";
+ private static final String BAG_NAME_TO_JOIN_PREFIX_PROPERTY = "BagFullOuterJoin_BAG_NAME_TO_JOIN_PREFIX";
+ private static final String BAG_NAME_TO_SIZE_PROPERTY = "BagFullOuterJoin_BAG_NAME_TO_SIZE_PROPERTY";
+
+ private final JoinType joinType;
+
+ ArrayList<String> bagNames;
+ Map<String, String> bagNameToJoinKeyPrefix;
+ Map<String, Integer> bagNameToSize;
+
+
+ public enum JoinType { INNER,LEFT,FULL }
+
+ public BagJoin() {
+ this.joinType = JoinType.INNER;
+ }
+
+ public BagJoin(String joinType) {
+ if ("left".equals(joinType.toLowerCase())) {
+ this.joinType = JoinType.LEFT;
+ } else if ("full".equals(joinType.toLowerCase())) {
+ this.joinType = JoinType.FULL;
+ } else {
+ throw new IllegalArgumentException("Invalid constructor argument. Valid values
are 'left' or 'full', found: " + joinType);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void retrieveContextValues()
+ {
+ Properties properties = getInstanceProperties();
+ bagNames = (ArrayList<String>) properties.get(BAG_NAMES_PROPERTY);
+ bagNameToJoinKeyPrefix = (Map<String, String>) properties.get(BAG_NAME_TO_JOIN_PREFIX_PROPERTY);
+ bagNameToSize = (Map<String, Integer>) properties.get(BAG_NAME_TO_SIZE_PROPERTY);
+ }
+
+ class JoinCollector
+ {
+ private final JoinType joinType;
+
+ Set<Object> keys;
+ HashMap<Object, List<Tuple>> data;
+ int tupleSize = 0;
+
+ public JoinCollector(JoinType joinType) {
+ this.joinType = joinType;
+ }
+
+ public void printData() throws ExecException {
+ for (Entry<Object, List<Tuple>> entry : data.entrySet()) {
+ System.out.println(entry.getKey());
+ for (Tuple t : entry.getValue()) {
+ System.out.println("\t"+t.toDelimitedString(", "));
+ }
+ }
+ }
+
+ public HashMap<Object, List<Tuple>> join(Iterable<Tuple> tuples, String
keyName, int tupleSize) throws ExecException {
+
+ // if this is the first list of tuples, then just add them all
+ if (this.data == null) {
+ this.data = new HashMap<>();
+ this.keys = new HashSet<>();
+ for (Tuple tuple : tuples) {
+ Object key = getObject(tuple, keyName);
+ if (!this.data.containsKey(key)) {
+ this.data.put(key, new ArrayList<Tuple>());
+
+ // add this key to our set of keys for convenience
+ this.keys.add(key);
+ }
+ this.data.get(key).add(tuple);
+ }
+
+ this.tupleSize = tupleSize;
+ return data;
+ }
+
+ // otherwise, join
+ HashMap<Object, List<Tuple>> joinedData = new HashMap<>(this.data.size());
+ Set<Object> joinedKeys = new HashSet<>();
+ for (Tuple tuple : tuples) {
+ Object key = getObject(tuple, keyName);
+ if (data.containsKey(key)) {
+ if (!joinedData.containsKey(key)) {
+ joinedData.put(key, new ArrayList<Tuple>());
+ joinedKeys.add(key);
+ }
+ for (Tuple t : this.data.get(key)) {
+ Tuple joinedTuple = TupleFactory.getInstance().newTuple();
+ for (Object o : t.getAll()) {
+ joinedTuple.append(o);
+ }
+ for (Object o : tuple.getAll()) {
+ joinedTuple.append(o);
+ }
+ joinedData.get(key).add(joinedTuple);
+ }
+
+ // remove from the previous key set for dealing with left join
+ this.keys.remove(key);
+
+ } else if (this.joinType == JoinType.FULL) {
+ if (!joinedData.containsKey(key)) {
+ joinedData.put(key, new ArrayList<Tuple>());
+ joinedKeys.add(key);
+ }
+ Tuple nullTuple = TupleFactory.getInstance().newTuple(this.tupleSize);
+ for (Object o : tuple.getAll()) {
+ nullTuple.append(o);
+ }
+ joinedData.get(key).add(nullTuple);
+ }
+ }
+
+ // process the keys that did not join
+ if (this.joinType == JoinType.LEFT || this.joinType == JoinType.FULL) {
+ for (Object key : this.keys) {
+ // join with a null tuple
+ if (!joinedData.containsKey(key)) {
+ joinedData.put(key, new ArrayList<Tuple>());
+ joinedKeys.add(key);
+ }
+ for (Tuple t : this.data.get(key)) {
+ Tuple joinedTuple = TupleFactory.getInstance().newTuple();
+ for (Object o : t.getAll()) {
+ joinedTuple.append(o);
+ }
+ for (int i = 0; i < tupleSize; i++) {
+ joinedTuple.append(null);
+ }
+ joinedData.get(key).add(joinedTuple);
+ }
+ }
+ }
+
+ this.data = joinedData;
+ this.keys = joinedKeys;
+ this.tupleSize += tupleSize;
+
+ return this.data;
+ }
+
+ public HashMap<Object, List<Tuple>> getJoinData() {
+ return this.data;
+ }
+
+ }
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException
+ {
+ retrieveContextValues();
+
+ HashMap<String,String> joinKeyNames = new HashMap<>();
+ for (int i = 1; i < input.size(); i += 2) {
+ joinKeyNames.put(bagNames.get(i / 2), (String) input.get(i));
+ }
+
+ JoinCollector collector = new JoinCollector(this.joinType);
+
+ for (String bagName: bagNames) {
+ DataBag bag = getBag(input, bagName);
+ String joinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(bagName), joinKeyNames.get(bagName));
+ int tupleSize = bagNameToSize.get(bagName);
+ if (bag == null) throw new IOException("Error in instance: "+getInstanceName()
+ + " with properties: " + getInstanceProperties()
+ + " and tuple: " + input.toDelimitedString(", ")
+ + " -- Expected bag, got null");
+
+ collector.join(bag, joinKeyName, tupleSize);
+ }
+
+ // assemble output bag
+ DataBag outputBag = BagFactory.getInstance().newDefaultBag();
+ for (List<Tuple> tuples : collector.getJoinData().values()) {
+ for (Tuple tuple : tuples) {
+ outputBag.add(tuple);
+ }
+ }
+
+ return outputBag;
+ }
+
+ @Override
+ public Schema getOutputSchema(Schema input)
+ {
+ ArrayList<String> bagNames = new ArrayList<String>(input.size() / 2);
+ Map<String, String> bagNameToJoinPrefix = new HashMap<String, String>(input.size()
/ 2);
+ Map<String, Integer> bagNameToSize = new HashMap<String, Integer>(input.size()
/ 2);
+ Schema outputSchema = null;
+ Schema bagSchema = new Schema();
+ try {
+ int i = 0;
+ // all even fields should be bags, odd fields are key names
+ String bagName = null;
+ String tupleName = null;
+ for (FieldSchema outerField : input.getFields()) {
+ if (i++ % 2 == 1)
+ continue;
+ bagName = outerField.alias;
+ bagNames.add(bagName);
+ if (bagName == null)
+ bagName = "null";
+ if (outerField.schema == null)
+ throw new RuntimeException("Expected input format of (bag, 'field') pairs. "
+ +"Did not receive a bag at index: "+i+", alias: "+bagName+". "
+ +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString());
+ FieldSchema tupleField = outerField.schema.getField(0);
+ tupleName = tupleField.alias;
+ bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName));
+ if (tupleField.schema == null) {
+ log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName,
bagName));
+ } else {
+ bagNameToSize.put(bagName, tupleField.schema.size());
+ for (FieldSchema innerField : tupleField.schema.getFields()) {
+ String innerFieldName = innerField.alias;
+ if (innerFieldName == null)
+ innerFieldName = "null";
+ String outputFieldName = bagName + "::" + innerFieldName;
+ if (innerField.schema != null) {
+ bagSchema.add(new FieldSchema(outputFieldName, innerField.schema, innerField.type));
+ } else {
+ bagSchema.add(new FieldSchema(outputFieldName, innerField.type));
+ }
+ }
+ }
+ }
+ outputSchema = new Schema(new Schema.FieldSchema(
+ getSchemaName(this.getClass().getName().toLowerCase(), input),
+ bagSchema,
+ DataType.BAG));
+ log.debug("output schema: "+outputSchema.toString());
+ } catch (FrontendException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ Properties properties = getInstanceProperties();
+ properties.put(BAG_NAMES_PROPERTY, bagNames);
+ properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix);
+ properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize);
+ return outputSchema;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/598fd924/datafu-pig/src/main/java/datafu/pig/bags/BagLeftOuterJoin.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/BagLeftOuterJoin.java b/datafu-pig/src/main/java/datafu/pig/bags/BagLeftOuterJoin.java
index ba6bc11..501af56 100644
--- a/datafu-pig/src/main/java/datafu/pig/bags/BagLeftOuterJoin.java
+++ b/datafu-pig/src/main/java/datafu/pig/bags/BagLeftOuterJoin.java
@@ -67,201 +67,11 @@ import datafu.pig.util.AliasableEvalFunc;
* @author wvaughan
*
*/
-public class BagLeftOuterJoin extends AliasableEvalFunc<DataBag>
+public class BagLeftOuterJoin extends BagJoin
{
-
- private static final String BAG_NAMES_PROPERTY = "BagLeftOuterJoin_BAG_NAMES";
- private static final String BAG_NAME_TO_JOIN_PREFIX_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_JOIN_PREFIX";
- private static final String BAG_NAME_TO_SIZE_PROPERTY = "BagLeftOuterJoin_BAG_NAME_TO_SIZE_PROPERTY";
-
- ArrayList<String> bagNames;
- Map<String, String> bagNameToJoinKeyPrefix;
- Map<String, Integer> bagNameToSize;
public BagLeftOuterJoin() {
-
- }
-
- @SuppressWarnings("unchecked")
- private void retrieveContextValues()
- {
- Properties properties = getInstanceProperties();
- bagNames = (ArrayList<String>) properties.get(BAG_NAMES_PROPERTY);
- bagNameToJoinKeyPrefix = (Map<String, String>) properties.get(BAG_NAME_TO_JOIN_PREFIX_PROPERTY);
- bagNameToSize = (Map<String, Integer>) properties.get(BAG_NAME_TO_SIZE_PROPERTY);
- }
-
- class JoinCollector
- {
- HashMap<Object, List<Tuple>> joinData;
-
- public void printJoinData() throws ExecException {
- printData(joinData);
- }
-
- public void printData(HashMap<Object, List<Tuple>> data) throws ExecException
{
- for (Object o : data.keySet()) {
- System.out.println(o);
- for (Tuple t : data.get(o)) {
- System.out.println("\t"+t.toDelimitedString(", "));
- }
- }
- }
-
- public HashMap<Object, List<Tuple>> groupTuples(Iterable<Tuple> tuples,
String keyName) throws ExecException {
- HashMap<Object, List<Tuple>> group = new HashMap<Object, List<Tuple>>();
- for (Tuple tuple : tuples) {
- Object key = getObject(tuple, keyName);
- if (!group.containsKey(key)) {
- group.put(key, new LinkedList<Tuple>());
- }
- group.get(key).add(tuple);
- }
- return group;
- }
-
- public HashMap<Object, List<Tuple>> insertNullTuples(HashMap<Object, List<Tuple>>
groupedData, int tupleSize) throws ExecException {
- Tuple nullTuple = TupleFactory.getInstance().newTuple(tupleSize);
- for (int i=0; i<tupleSize; i++) {
- nullTuple.set(i, null);
- }
- for (Object key : joinData.keySet()) {
- if (!groupedData.containsKey(key)) {
- groupedData.put(key, Collections.singletonList(nullTuple));
- }
- }
- return groupedData;
- }
-
- public void joinTuples(Object key, List<Tuple> tuples) throws ExecException {
- List<Tuple> currentTuples = joinData.get(key);
- if (currentTuples != null) {
- List<Tuple> newTuples = new LinkedList<Tuple>();
- if (tuples != null) {
- for (Tuple t1 : currentTuples) {
- for (Tuple t2 : tuples) {
- Tuple t = TupleFactory.getInstance().newTuple();
- for (Object o : t1.getAll()) {
- t.append(o);
- }
- for (Object o : t2.getAll()) {
- t.append(o);
- }
- newTuples.add(t);
- }
- }
- }
- joinData.put(key, newTuples);
- }
- }
-
- public HashMap<Object, List<Tuple>> getJoinData() {
- return this.joinData;
- }
-
- public void setJoinData(HashMap<Object, List<Tuple>> joinData) {
- this.joinData = joinData;
- }
- }
-
- @Override
- public DataBag exec(Tuple input) throws IOException
- {
- retrieveContextValues();
-
- ArrayList<String> joinKeyNames = new ArrayList<String>();
- for (int i = 1; i < input.size(); i += 2) {
- joinKeyNames.add((String) input.get(i));
- }
-
- JoinCollector collector = new JoinCollector();
- // the first bag is the outer bag
- String leftBagName = bagNames.get(0);
- DataBag leftBag = getBag(input, leftBagName);
- String leftBagJoinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(leftBagName),
joinKeyNames.get(0));
- collector.setJoinData(collector.groupTuples(leftBag, leftBagJoinKeyName));
- // now, for each additional bag, group up the tuples by the join key, then join them
in
- if (bagNames.size() > 1) {
- for (int i = 1; i < bagNames.size(); i++) {
- String bagName = bagNames.get(i);
- DataBag bag = getBag(input, bagName);
- String joinKeyName = getPrefixedAliasName(bagNameToJoinKeyPrefix.get(bagName), joinKeyNames.get(i));
- int tupleSize = bagNameToSize.get(bagName);
- if (bag == null) throw new IOException("Error in instance: "+getInstanceName()
- + " with properties: " + getInstanceProperties()
- + " and tuple: " + input.toDelimitedString(", ")
- + " -- Expected bag, got null");
- HashMap<Object, List<Tuple>> groupedData = collector.groupTuples(bag,
joinKeyName);
- // outer join, so go back in and add nulls;
- groupedData = collector.insertNullTuples(groupedData, tupleSize);
- for (Map.Entry<Object, List<Tuple>> entry : groupedData.entrySet()) {
- collector.joinTuples(entry.getKey(), entry.getValue());
- }
- }
- }
-
- // assemble output bag
- DataBag outputBag = BagFactory.getInstance().newDefaultBag();
- for (List<Tuple> tuples : collector.getJoinData().values()) {
- for (Tuple tuple : tuples) {
- outputBag.add(tuple);
- }
- }
-
- return outputBag;
- }
-
- @Override
- public Schema getOutputSchema(Schema input)
- {
- ArrayList<String> bagNames = new ArrayList<String>(input.size() / 2);
- Map<String, String> bagNameToJoinPrefix = new HashMap<String, String>(input.size()
/ 2);
- Map<String, Integer> bagNameToSize = new HashMap<String, Integer>(input.size()
/ 2);
- Schema outputSchema = null;
- Schema bagSchema = new Schema();
- try {
- int i = 0;
- // all even fields should be bags, odd fields are key names
- String bagName = null;
- String tupleName = null;
- for (FieldSchema outerField : input.getFields()) {
- if (i++ % 2 == 1)
- continue;
- bagName = outerField.alias;
- bagNames.add(bagName);
- if (bagName == null)
- bagName = "null";
- if (outerField.schema == null)
- throw new RuntimeException("Expected input format of (bag, 'field') pairs. "
- +"Did not receive a bag at index: "+i+", alias: "+bagName+". "
- +"Instead received type: "+DataType.findTypeName(outerField.type)+" in schema:"+input.toString());
- FieldSchema tupleField = outerField.schema.getField(0);
- tupleName = tupleField.alias;
- bagNameToJoinPrefix.put(bagName, getPrefixedAliasName(outerField.alias, tupleName));
- if (tupleField.schema == null) {
- log.error(String.format("could not get schema for inner tuple %s in bag %s", tupleName,
bagName));
- } else {
- bagNameToSize.put(bagName, tupleField.schema.size());
- for (FieldSchema innerField : tupleField.schema.getFields()) {
- String innerFieldName = innerField.alias;
- if (innerFieldName == null)
- innerFieldName = "null";
- String outputFieldName = bagName + "::" + innerFieldName;
- bagSchema.add(new FieldSchema(outputFieldName, innerField.type));
- }
- }
- }
- outputSchema = new Schema(new Schema.FieldSchema("joined", bagSchema, DataType.BAG));
- log.debug("output schema: "+outputSchema.toString());
- } catch (FrontendException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- Properties properties = getInstanceProperties();
- properties.put(BAG_NAMES_PROPERTY, bagNames);
- properties.put(BAG_NAME_TO_JOIN_PREFIX_PROPERTY, bagNameToJoinPrefix);
- properties.put(BAG_NAME_TO_SIZE_PROPERTY, bagNameToSize);
- return outputSchema;
+ super("left");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/598fd924/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index 6d21dbe..427285b 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -1156,6 +1156,44 @@ public class BagTests extends PigTests
"(1,{(K1,A1,K1,A2,K1,A3),(K2,B1,K2,B2,,),(K2,B1,K2,B22,,),(K3,C1,,,K3,C3)},{(K1,A1,K1,A3,K1,A2),(K2,B1,,,K2,B2),(K2,B1,,,K2,B22),(K3,C1,K3,C3,,)})");
}
+ /**
+
+
+ define BagFullOuterJoin datafu.pig.bags.BagJoin('full');
+
+ data = LOAD 'input' AS (outer_key:chararray, bag1:bag{T:tuple(k:chararray,v:chararray)},
bag2:bag{T:tuple(k:chararray,v:chararray)}, bag3:bag{T:tuple(k3:chararray,v3:chararray)});
+ describe data;
+
+ data2 = FOREACH data GENERATE
+ outer_key,
+ BagFullOuterJoin(bag1, 'k', bag2, 'k', bag3, 'k3') as joined1,
+ BagFullOuterJoin(bag1, 'k', bag3, 'k3', bag2, 'k') as joined2; --this will break without
UDF signature and pig < 0.11
+ describe data2;
+
+ STORE data2 INTO 'output';
+
+ */
+ @Multiline
+ private String bagJoinFullOuterTest;
+
+ @Test
+ public void bagJoinFullOuterTest() throws Exception {
+ PigTest test = createPigTestFromString(bagJoinFullOuterTest);
+
+ writeLinesToFile("input",
+ "1\t{(K1,A1),(K2,B1),(K3,C1)}\t{(K1,A2),(K2,B2),(K2,B22)}\t{(K1,A3),(K3,C3),(K4,D3)}");
+
+ try {
+ test.runScript();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+
+ assertOutput(test, "data2",
+ "(1,{(K1,A1,K1,A2,K1,A3),(K2,B1,K2,B2,,),(K2,B1,K2,B22,,),(K3,C1,,,K3,C3),(,,,,K4,D3)},{(K1,A1,K1,A3,K1,A2),(K2,B1,,,K2,B2),(K2,B1,,,K2,B22),(K3,C1,K3,C3,,),(,,K4,D3,,)})");
+ }
+
/**
|