datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mha...@apache.org
Subject incubator-datafu git commit: DATAFU-117 CountDistinctUpTo
Date Tue, 28 Jun 2016 15:56:52 GMT
Repository: incubator-datafu
Updated Branches:
  refs/heads/master 27c1af403 -> 4519690c6


DATAFU-117 CountDistinctUpTo

Signed-off-by: Matthew Hayes <matthew.terence.hayes@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/4519690c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/4519690c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/4519690c

Branch: refs/heads/master
Commit: 4519690c65ab5243f63e8ccfd557b861b7dc65ef
Parents: 27c1af4
Author: eallweil <eallweil@paypal.com>
Authored: Wed Jun 8 16:45:37 2016 +0300
Committer: Matthew Hayes <matthew.terence.hayes@gmail.com>
Committed: Tue Jun 28 08:55:08 2016 -0700

----------------------------------------------------------------------
 .../java/datafu/pig/bags/CountDistinctUpTo.java | 289 +++++++++++++++++++
 .../java/datafu/test/pig/bags/BagTests.java     | 273 +++++++++++++-----
 2 files changed, 484 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4519690c/datafu-pig/src/main/java/datafu/pig/bags/CountDistinctUpTo.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/CountDistinctUpTo.java b/datafu-pig/src/main/java/datafu/pig/bags/CountDistinctUpTo.java
new file mode 100644
index 0000000..d707cd6
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/bags/CountDistinctUpTo.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.pig.bags;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+/**
+ * Generates a count of the number of distinct tuples in a bag, up to a preset limit.
+ * 
+ * If the number of distinct tuples is small, performance is comparable to using Pig's DISTINCT
+ * and COUNT in a nested foreach.
+ * 
+ * Use this UDF when your threshold is low, and some records have a distinct count that is
+ * much higher. In such cases this UDF will prevent memory problems
+ * and perform an order of magnitude faster than using pure Pig.
+ *
+ * Example:
+ * <pre>
+ * {@code
+ * DEFINE CountDistinctUpTo10 datafu.pig.bags.CountDistinctUpTo('10');
+ * DEFINE CountDistinctUpTo3 datafu.pig.bags.CountDistinctUpTo('3');
+ * 
+ * -- input: 
+ * -- {(A),(B),(D),(A),(C),(E),(A),(B),(A),(B)}
+ * input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY)});
+ * 
+ * -- output: 
+ * -- (5)
+ * output = FOREACH input GENERATE CountDistinctUpTo10(B); 
+ *
+ * -- output2: 
+ * -- (3)
+ * output2 = FOREACH input GENERATE CountDistinctUpTo3(B); 
+ * } 
+ * </pre>
+ */
+public class CountDistinctUpTo extends AccumulatorEvalFunc<Integer> implements Algebraic{
+
+	private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+	private static final BagFactory bagFactory = BagFactory.getInstance();
+
+	// for accumulator implementation
+	private Set<Tuple> set;
+	private final int max;
+	
+	/**
+	 * Creates a counter that stops counting once {@code maxAmount} distinct tuples have been seen.
+	 *
+	 * @param maxAmount the limit on the distinct count, as a base-10 integer string
+	 * @throws NumberFormatException if {@code maxAmount} is not a parsable integer
+	 * @throws IllegalArgumentException if {@code maxAmount} is negative
+	 */
+	public CountDistinctUpTo(String maxAmount) {
+		// parseInt yields the primitive directly instead of boxing and then unboxing an Integer
+		max = Integer.parseInt(maxAmount);
+		if (max < 0) {
+			// same exception type HashSet would raise, but with a message that names the argument
+			throw new IllegalArgumentException("maxAmount must not be negative, but was " + max);
+		}
+		set = new HashSet<Tuple>(max);
+	}
+	
+	/**
+	 * Accumulator entry point: hands the incoming tuple to {@code count} together with this
+	 * instance's running set and limit.
+	 */
+	@Override
+	public void accumulate(Tuple tuple) throws IOException {
+		count(this.set, tuple, this.max, log);
+	}
+
+	/**
+	 * Counts tuples with a single level of embedding (input contains a bag containing tuples
+	 * with the original schema).
+	 *
+	 * Used by both accumulator and algebraic implementations.
+	 *
+	 * @param set   distinct tuples collected so far; tuples from the bag are added to it
+	 * @param input tuple whose first field is the bag to scan; a null input or a null bag is ignored
+	 * @param max   number of distinct tuples at which scanning stops
+	 * @param log   not used by this method
+	 * @throws ExecException declared because reading the first field of {@code input} may fail
+	 */
+	private static void count(Set<Tuple> set, Tuple input, int max, Log log) throws ExecException {
+		// >= rather than == so that a set already past the limit also short-circuits
+		if (set.size() >= max) {
+			return; // don't bother looking at the rest of the input
+		}
+
+		if (input == null) {
+			return;
+		}
+
+		DataBag bag = (DataBag) input.get(0);
+
+		if (bag == null) {
+			return;
+		}
+
+		for (Tuple t : bag) {
+			// only a newly added tuple can move the size, so test the limit right after a successful add
+			if (set.add(t) && (set.size() >= max)) {
+				return;
+			}
+		}
+	}
+
+	/**
+	 * Counts tuples with two levels of embedding (input contains a bag containing bags of tuples
+	 * with the original schema).
+	 *
+	 * Used by the algebraic implementation. Returns null if the maximum was reached.
+	 *
+	 * @param input tuple whose first field is a bag of tuples, each wrapping an inner bag (or null)
+	 * @param max   number of distinct tuples at which scanning stops
+	 * @param log   passed through to {@code count}
+	 * @return the distinct tuples found, or null once {@code max} distinct tuples were reached
+	 * @throws ExecException declared because reading a tuple field may fail
+	 */
+	private static Set<Tuple> makeDistinctSet(Tuple input, int max, Log log) throws ExecException {
+		Set<Tuple> set = new HashSet<Tuple>(max);
+
+		DataBag bag = (DataBag) input.get(0);
+
+		// same tolerance as count(): a missing outer bag contributes nothing instead of causing an NPE
+		if (bag == null) {
+			return set;
+		}
+
+		for (Tuple t : bag) {
+
+			// we've encountered a null value from the combiner, therefore the max has already been
+			// reached (obviously, not used in the combiner itself)
+			if (t.get(0) == null) {
+				return null;
+			}
+
+			count(set, t, max, log);
+
+			// we've just now reached the maximum
+			if (set.size() == max) {
+				return null;
+			}
+		}
+
+		return set;
+	}
+	
+	/**
+	 * Empties the accumulated set so this instance can start a fresh count.
+	 */
+	@Override
+	public void cleanup() {
+		this.set.clear();
+	}
+
+	/**
+	 * Returns the number of distinct tuples currently held in the accumulated set.
+	 */
+	@Override
+	public Integer getValue() {
+		final int distinctSoFar = set.size();
+		return Integer.valueOf(distinctSoFar);
+	}
+
+	/**
+	 * Returns the class name of the nested {@link Initial} function.
+	 */
+	@Override
+	public String getInitial() {
+		final Class<?> stage = Initial.class;
+		return stage.getName();
+	}
+
+	/**
+	 * Returns the class name of the nested {@link Intermediate} function.
+	 */
+	@Override
+	public String getIntermed() {
+		final Class<?> stage = Intermediate.class;
+		return stage.getName();
+	}
+
+	/**
+	 * Returns the class name of the nested {@link Final} function.
+	 */
+	@Override
+	public String getFinal() {
+		final Class<?> stage = Final.class;
+		return stage.getName();
+	}
+
+	/**
+	 * Outputs a tuple containing a DataBag containing a single tuple T (the original schema)
+	 * or an empty bag.
+	 *
+	 *  T -> ({T})
+	 *
+	 * NOTE(review): a tuple whose first field is null is dropped here, while the accumulator
+	 * path adds every tuple of the bag to its set - confirm which behaviour is intended.
+	 */
+	public static class Initial extends EvalFunc<Tuple> {
+
+		public Initial() {}
+		public Initial(String maxAmount) {}
+
+		/**
+		 * Wraps the first tuple of the input bag in a new bag.
+		 *
+		 * @param input tuple whose first field is the bag to read
+		 * @return a one-field tuple holding a bag with at most one tuple
+		 */
+		@Override
+		public Tuple exec(Tuple input) throws IOException {
+			DataBag inputBag = (DataBag) input.get(0);
+			DataBag outputBag = bagFactory.newDefaultBag();
+
+			// a missing bag yields the same empty result as an empty bag instead of an NPE
+			if (inputBag != null) {
+				Iterator<Tuple> it = inputBag.iterator();
+
+				if (it.hasNext()) {
+					Tuple t = it.next();
+					// null tuples, empty tuples and tuples with a null first field are not forwarded
+					if ((t != null) && (t.size() > 0) && (t.get(0) != null)) {
+						outputBag.add(t);
+					}
+				}
+			}
+
+			return tupleFactory.newTuple(outputBag);
+		}
+	}
+
+	/**
+	 * Receives a bag of bags, each containing a single tuple with the original input schema T
+	 *
+	 * Outputs a bag of distinct tuples each with the original schema T:
+	 * {({T}),({T}),({T})} -> ({T, T, T})
+	 *
+	 * or if the maximum is reached, null: {({T}),({T}),({T}) ..} -> (null)
+	 */
+	public static class Intermediate extends EvalFunc<Tuple> {
+
+		private final int max;
+
+		// NOTE(review): with a limit of 0 every non-empty input is reported as "maximum reached";
+		// presumably the String constructor is the one used in practice - confirm
+		public Intermediate() {
+			this("0");
+		}
+
+		/**
+		 * @param maxAmount the limit on the distinct count, as a base-10 integer string
+		 */
+		public Intermediate(String maxAmount) {
+			// parseInt yields the primitive directly instead of boxing and then unboxing an Integer
+			max = Integer.parseInt(maxAmount);
+		}
+
+		@Override
+		public Tuple exec(Tuple input) throws IOException {
+			Set<Tuple> set = makeDistinctSet(input, max, log);
+
+			// this is the optimistic case, in which we already have enough distinct tuples in the
+			// combiner to stop counting
+			if (set == null){
+				Tuple result = tupleFactory.newTuple(1);
+				result.set(0, null);
+				return result;
+			}
+
+			DataBag outputBag = bagFactory.newDefaultBag();
+
+			for (Tuple t : set) {
+				outputBag.add(t);
+			}
+
+			return tupleFactory.newTuple(outputBag);
+		}
+	}
+
+	/**
+	 * Receives output either from initial results or intermediate
+	 *
+	 * Outputs an integer with the number of distinct tuples, up to the maximum desired.
+	 *
+	 * {({T}),({T,T,T})} -> (3)
+	 *
+	 * or
+	 *
+	 * {({T}),({T,T,T}),(null)} -> (MAX)
+	 */
+	public static class Final extends EvalFunc<Integer> {
+
+		private final int max;
+
+		// NOTE(review): with a limit of 0 every non-empty input yields 0; presumably the String
+		// constructor is the one used in practice - confirm
+		public Final() {
+			this("0");
+		}
+
+		/**
+		 * @param maxAmount the limit on the distinct count, as a base-10 integer string
+		 */
+		public Final(String maxAmount) {
+			// parseInt yields the primitive directly instead of boxing and then unboxing an Integer
+			max = Integer.parseInt(maxAmount);
+		}
+
+		@Override
+		public Integer exec(Tuple input) throws IOException {
+			Set<Tuple> set = makeDistinctSet(input, max, log);
+
+			// makeDistinctSet signals "limit reached" with null, so the answer is the limit itself
+			if (set == null) {
+				return max;
+			}
+
+			return set.size();
+		}
+	}
+
+	/**
+	 * Checks that the input schema is a single bag field and declares the integer result.
+	 *
+	 * @param input schema of the arguments passed to the UDF
+	 * @return a one-field schema named "CountDistinctUpTo" of type INTEGER
+	 * @throws RuntimeException if there is not exactly one field, if that field is not a bag,
+	 *         or if reading the field raises a FrontendException
+	 */
+	@Override
+	public Schema outputSchema(Schema input) {
+
+		if (input.size() != 1) {
+			throw new RuntimeException(
+					"Expected a single field of type bag, but found " + input.size() + " fields");
+		}
+
+		FieldSchema field;
+		try {
+			field = input.getField(0);
+
+			if (field.type != DataType.BAG) {
+				throw new RuntimeException("Expected a bag but got: " + DataType.findTypeName(field.type));
+			}
+		} catch (FrontendException e) {
+			// surfaced unchecked, keeping the original exception as the cause
+			throw new RuntimeException(e);
+		}
+
+		return new Schema(new FieldSchema("CountDistinctUpTo", DataType.INTEGER));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4519690c/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index 28292db..bd0f918 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -19,6 +19,7 @@
 
 package datafu.test.pig.bags;
 
+import datafu.pig.bags.CountDistinctUpTo;
 import datafu.pig.bags.CountEach;
 import datafu.pig.bags.DistinctBy;
 import datafu.pig.bags.Enumerate;
@@ -28,6 +29,7 @@ import datafu.test.pig.PigTests;
 import junit.framework.Assert;
 
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.Accumulator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.SortedDataBag;
@@ -36,6 +38,7 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.pigunit.PigTest;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -971,63 +974,100 @@ result4 = FOREACH grouped GENERATE group AS a,TupleFromBag(data,0,$emptyTuple).b
     Assert.assertEquals("(11,51)", iter.next().toString());
   }
 
+  /**
+   * Makes one accumulate() call whose input is a bag holding the single tuple (first, second).
+   */
+  private void accumulateSinglePairBag(Accumulator<?> distinct, int first, int second) throws IOException {
+    Tuple data = TupleFactory.getInstance().newTuple(2);
+    data.set(0, first);
+    data.set(1, second);
+
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    bag.add(data);
+
+    distinct.accumulate(TupleFactory.getInstance().newTuple(bag));
+  }
+
+  /**
+   * Accumulates six single-tuple bags: (10,20), (11,50), (10,22), (12,40), (11,50), (11,51).
+   */
+  private void firstAccumulateForTests(Accumulator<?> distinct) throws IOException {
+    accumulateSinglePairBag(distinct, 10, 20);
+    accumulateSinglePairBag(distinct, 11, 50);
+    accumulateSinglePairBag(distinct, 10, 22);
+    accumulateSinglePairBag(distinct, 12, 40);
+    accumulateSinglePairBag(distinct, 11, 50);
+    accumulateSinglePairBag(distinct, 11, 51);
+  }
+
+  /**
+   * Calls cleanup() and then accumulates three single-tuple bags: (12,42), (11,51), (11,50).
+   */
+  private void secondAccumulateForTests(Accumulator<?> distinct) throws IOException {
+    // do it again to test cleanup
+    distinct.cleanup();
+
+    accumulateSinglePairBag(distinct, 12, 42);
+    accumulateSinglePairBag(distinct, 11, 51);
+    accumulateSinglePairBag(distinct, 11, 50);
+  }
+  
   @Test
   public void distinctByAccumulateTest() throws Exception
   {
     DistinctBy distinct = new DistinctBy("0");
 
-    DataBag bag;
-    Tuple input;
-    Tuple data;
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 10);
-    data.set(1, 20);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 11);
-    data.set(1, 50);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 10);
-    data.set(1, 22);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 12);
-    data.set(1, 40);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 11);
-    data.set(1, 50);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 11);
-    data.set(1, 51);
-    distinct.accumulate(input);
-
+    firstAccumulateForTests(distinct);
+    
     DataBag result = distinct.getValue();
 
     Assert.assertEquals(3, result.size());
@@ -1037,32 +1077,7 @@ result4 = FOREACH grouped GENERATE group AS a,TupleFromBag(data,0,$emptyTuple).b
     Assert.assertEquals("(11,50)", iter.next().toString());
     Assert.assertEquals("(12,40)", iter.next().toString());
 
-    // do it again to test cleanup
-    distinct.cleanup();
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 12);
-    data.set(1, 42);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 11);
-    data.set(1, 51);
-    distinct.accumulate(input);
-
-    data = TupleFactory.getInstance().newTuple(2);
-    bag = BagFactory.getInstance().newDefaultBag();
-    bag.add(data);
-    input = TupleFactory.getInstance().newTuple(bag);
-    data.set(0, 11);
-    data.set(1, 50);
-    distinct.accumulate(input);
+    secondAccumulateForTests(distinct);
 
     result = distinct.getValue();
 
@@ -1247,6 +1262,108 @@ result4 = FOREACH grouped GENERATE group AS a,TupleFromBag(data,0,$emptyTuple).b
 
   /**
 
+  define CountDistinctUpTo3 datafu.pig.bags.CountDistinctUpTo('3');
+  define CountDistinctUpTo10 datafu.pig.bags.CountDistinctUpTo('10');
+
+  data = LOAD 'input' AS (bag1: bag {T: tuple(t1:chararray, t2:int)});
+
+  data2 = FOREACH data GENERATE CountDistinctUpTo3(bag1) as counted;
+
+  data3 = FOREACH data GENERATE CountDistinctUpTo10(bag1) as counted;
+
+  STORE data2 INTO 'output';
+
+   */
+  @Multiline
+  private String countDistinctUpToTest;
+
+  /**
+   * Runs the Pig script twice with different inputs and checks both the capped alias (data2)
+   * and the uncapped alias (data3).
+   */
+  @Test
+  public void countDistinctUpToTest() throws Exception {
+    final String singleValuedInput = "({(A,0),(B,0),(D,0),(A,0),(C,0),(E,0),(A,0),(B,0),(A,0),(B,0)})";
+    final String multiValuedInput = "({(A,0),(B,2),(D,0),(A,1),(C,3),(E,2),(A,1),(B,2),(A,0),(B,2),(E,0)})";
+
+    PigTest pigTest = createPigTestFromString(countDistinctUpToTest);
+
+    // first run: five distinct tuples, so the limit of 3 caps data2 while data3 sees all of them
+    writeLinesToFile("input", singleValuedInput);
+    pigTest.runScript();
+    assertOutput(pigTest, "data2", "(3)");
+    assertOutput(pigTest, "data3", "(5)");
+
+    // second run: seven distinct tuples, still below the limit of 10
+    writeLinesToFile("input", multiValuedInput);
+    pigTest.runScript();
+    assertOutput(pigTest, "data2", "(3)");
+    assertOutput(pigTest, "data3", "(7)");
+  }
+
+  /**
+   * Exercises the accumulator interface with a limit of 2, before and after cleanup().
+   */
+  @Test
+  public void countDistinctUpToAccumulatorTest() throws IOException
+  {
+    CountDistinctUpTo counter = new CountDistinctUpTo("2");
+
+    firstAccumulateForTests(counter);
+    // unboxed into an int so that assertEquals resolves to the (int, int) overload
+    int countAfterFirstBatch = counter.getValue();
+    Assert.assertEquals(2, countAfterFirstBatch);
+
+    secondAccumulateForTests(counter);
+    int countAfterSecondBatch = counter.getValue();
+    Assert.assertEquals(2, countAfterSecondBatch);
+  }
+  
+  /**
+   * Pushes {@code rows} single-tuple bags holding the value (i % modulus) through
+   * Initial, then Intermediate, then Final, and returns the resulting count.
+   */
+  private int runCountDistinctUpToPipeline(CountDistinctUpTo.Initial initial,
+                                           CountDistinctUpTo.Intermediate intermediate,
+                                           CountDistinctUpTo.Final finalFunc,
+                                           int rows,
+                                           int modulus) throws IOException
+  {
+    DataBag initialOutputs = BagFactory.getInstance().newDefaultBag();
+
+    for (int i = 0; i < rows; i++) {
+      DataBag innerBag = BagFactory.getInstance().newDefaultBag();
+      innerBag.add(TupleFactory.getInstance().newTuple((Object) (i % modulus)));
+      Tuple initialInput = TupleFactory.getInstance().newTuple(innerBag);
+      initialOutputs.add(initial.exec(initialInput));
+    }
+
+    Tuple intermediateOutput = intermediate.exec(TupleFactory.getInstance().newTuple(initialOutputs));
+    DataBag finalInput = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateOutput));
+    Integer result = finalFunc.exec(TupleFactory.getInstance().newTuple(finalInput));
+
+    return result.intValue();
+  }
+
+  /**
+   * Checks the algebraic implementation for the given limit: first with 100 rows over 20
+   * distinct values (expecting {@code expected}), then with 10 rows over 3 distinct values
+   * (expecting 3).
+   */
+  private void countDistinctUpToAlgebraic(String amount, int expected) throws IOException
+  {
+    CountDistinctUpTo.Initial initial = new CountDistinctUpTo.Initial(amount);
+    CountDistinctUpTo.Intermediate intermediate = new CountDistinctUpTo.Intermediate(amount);
+    CountDistinctUpTo.Final finalFunc = new CountDistinctUpTo.Final(amount);
+
+    Assert.assertEquals(expected, runCountDistinctUpToPipeline(initial, intermediate, finalFunc, 100, 20));
+
+    // the same function instances are reused for the second, smaller input
+    Assert.assertEquals(3, runCountDistinctUpToPipeline(initial, intermediate, finalFunc, 10, 3));
+  }
+
+  /**
+   * Covers both algebraic flows: a limit above and a limit below the 20 distinct input values.
+   */
+  @Test
+  public void countDistinctUpToAlgebraicTest() throws IOException
+  {
+    // check flow in which Intermediate passes all the distinct tuples it finds
+    final String limitAboveDistinctCount = "50";
+    countDistinctUpToAlgebraic(limitAboveDistinctCount, 20);
+
+    // check flow in which Intermediate passes the max count as a null in its result
+    final String limitBelowDistinctCount = "5";
+    countDistinctUpToAlgebraic(limitBelowDistinctCount, 5);
+  }
+
+  /**
+
+
 
   define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();
 


Mime
View raw message