datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mha...@apache.org
Subject datafu git commit: DATAFU-129 - New macro - Dedup
Date Thu, 11 Oct 2018 16:24:22 GMT
Repository: datafu
Updated Branches:
  refs/heads/master 1e69bf12d -> 857cf164c


DATAFU-129 - New macro - Dedup

Signed-off-by: Matthew Hayes <mhayes@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/datafu/commit/857cf164
Tree: http://git-wip-us.apache.org/repos/asf/datafu/tree/857cf164
Diff: http://git-wip-us.apache.org/repos/asf/datafu/diff/857cf164

Branch: refs/heads/master
Commit: 857cf164c30883d739c4895c9a9c758880526435
Parents: 1e69bf1
Author: Eyal Allweil <eyal@apache.org>
Authored: Thu Oct 11 16:09:53 2018 +0300
Committer: Matthew Hayes <mhayes@apache.org>
Committed: Thu Oct 11 09:20:27 2018 -0700

----------------------------------------------------------------------
 .../evaluation/ExtremalTupleByNthField.java     | 306 +++++++++++++++++++
 datafu-pig/src/main/resources/datafu/dedup.pig  |  37 +++
 .../java/datafu/test/pig/util/DedupTests.java   |  91 ++++++
 3 files changed, 434 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/datafu/blob/857cf164/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
b/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
new file mode 100644
index 0000000..e425417
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.org.apache.pig.piggybank.evaluation;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * This class is a copy of org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField
+ * 
+ * https://github.com/apache/pig/blob/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
+ */
+public class ExtremalTupleByNthField extends EvalFunc<Tuple> implements Algebraic,
Accumulator<Tuple> {
+	
+	/**
+	 * Indicates once for how many items progress heart beat should be sent.
+	 * This number has been increased from 10 to reduce verbosity.
+	 */
+	private static final int PROGRESS_FREQUENCY = 10000;
+	
+	int fieldIndex;
+	int sign;
+	
+	/**
+	 * 
+	 * 
+	 * 
+	 * 
+	 * 
+	 * Constructors
+	 * 
+	 * @throws ExecException
+	 * 
+	 */
+	
+	// defaults to max by first field
+	public ExtremalTupleByNthField() throws ExecException {
+		this("1", "max");
+	}
+	
+	// defaults to max
+	public ExtremalTupleByNthField(String fieldIndexString)
+			throws ExecException {
+		this(fieldIndexString, "max");
+	}
+	
+	public ExtremalTupleByNthField(String fieldIndexString, String order)
+			throws ExecException {
+		super();
+		this.fieldIndex = parseFieldIndex(fieldIndexString);
+		this.sign = parseOrdering(order);
+	}
+	
+	/**
+	 * 
+	 * 
+	 * 
+	 * 
+	 * 
+	 * The EvalFunc interface
+	 */
+	@Override
+	public Tuple exec(Tuple input) throws IOException {
+		return extreme(fieldIndex, sign, input, reporter);
+	}
+	
+	@Override
+	public Type getReturnType() {
+		return Tuple.class;
+	}
+	
+	public Schema outputSchema(Schema input) {
+		return input;
+	}
+	
+	/**
+	 * 
+	 * 
+	 * 
+	 * 
+	 * 
+	 * Algebraic interface
+	 */
+	@Override
+	public String getInitial() {
+		return HelperClass.class.getName();
+	}
+	
+	@Override
+	public String getIntermed() {
+		return HelperClass.class.getName();
+	}
+	
+	@Override
+	public String getFinal() {
+		return HelperClass.class.getName();
+	}
+	
+	/**
+	 * 
+	 * 
+	 * 
+	 * 
+	 * 
+	 * The Accumulator interface
+	 */
+	Tuple intermediate = null;
+	DataBag tempDb = BagFactory.getInstance().newDefaultBag();
+	Tuple parameterToExtreme = TupleFactory.getInstance().newTuple(tempDb);
+
+	@Override
+	public void accumulate(Tuple b) throws IOException {
+		try {
+			if (b != null) {
+				if (intermediate == null) {
+					// intermediate = b;
+					// make a shallow copy in case the Tuple was reused.
+					intermediate = TupleFactory.getInstance()
+							.newTuple(b.size());
+					for (int i = 0; i < b.size(); ++i) {
+						intermediate.set(i, b.get(i));
+					}
+				} else {
+					tempDb.clear();
+					tempDb.add(b);
+					tempDb.add(intermediate);
+					intermediate = extreme(fieldIndex, sign,
+							parameterToExtreme, reporter);
+				}
+			}// new result is null, don't consider it
+
+		} catch (ExecException ee) {
+			throw ee;
+		} catch (Exception e) {
+			int errCode = -1;
+			String msg = "Error while computing ExtremalTupleByNthField in "
+					+ this.getClass().getSimpleName();
+			throw new ExecException(msg, errCode, PigException.BUG, e);
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		intermediate = null;
+	}
+
+	@Override
+	public Tuple getValue() {
+		return intermediate; // could be null correctly
+	}
+
+	/**
+	 * 
+	 * 
+	 * 
+	 * 
+	 * Utility classes and methods
+	 * 
+	 */
+	public static final class HelperClass extends EvalFunc<Tuple> {
+		int fieldIndex, sign;
+
+		public HelperClass() throws ExecException {
+			this("1", "max");
+		}
+
+		public HelperClass(String fieldIndexString) throws ExecException {
+			this(fieldIndexString, "max");
+		}
+
+		public HelperClass(String fieldIndexString, String order)
+				throws ExecException {
+
+			this.fieldIndex = parseFieldIndex(fieldIndexString);
+			this.sign = parseOrdering(order);
+		}
+
+		public Tuple exec(Tuple input) throws IOException {
+			return extreme(fieldIndex, sign, input, reporter);
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	protected final static Tuple extreme(int pind, int psign, Tuple input,
+			PigProgressable reporter) throws ExecException {
+	    if (input == null || input.size() == 0 || input.get(0) == null) {
+	        return null;
+	    }
+		DataBag values = (DataBag) input.get(0);
+
+		// if we were handed an empty bag, return NULL
+		// this is in compliance with SQL standard
+		if (values.size() == 0)
+			return null;
+
+		java.lang.Comparable curMax = null;
+		Tuple curMaxTuple = null;
+		int n = 0;
+		for (Tuple t : values) {
+			if (reporter != null && ++n % PROGRESS_FREQUENCY == 0)
+				reporter.progress();
+			if (t == null) {
+				// just in case.
+				continue;
+			}
+			try {
+				Object o = t.get(pind);
+				if (o == null) {
+					// if the comparison field is null it will never be
+					// returned, we won't even compare.
+					continue;
+				}
+
+				java.lang.Comparable d = (java.lang.Comparable) o;
+
+				if (curMax == null) {
+					curMax = d;
+					curMaxTuple = t;
+				} else {
+					/**
+					 * <pre>
+					 * c > 0 iff ((sign==1 && d>curMax) || (sign==-1 && d<curMax))
+					 * </pre>
+					 * 
+					 * In both case we want to replace curMax/curMaxTuple by the
+					 * new values
+					 * 
+					 **/
+					int c = psign * d.compareTo(curMax);
+					if (c > 0) {
+						curMax = d;
+						curMaxTuple = t;
+					}
+				}
+			} catch (ExecException ee) {
+				throw ee;
+			} catch (Exception e) {
+				int errCode = -1;
+				String msg = "Error while computing ExtremalTupleByNthField in ExtremalTupleByNthField,";
+				throw new ExecException(msg, errCode, PigException.ERROR, e);
+			}
+		}
+
+		return curMaxTuple;
+	}
+
+	protected static int parseFieldIndex(String inputFieldIndex)
+			throws ExecException {
+		// using a decrement to make sure that the subtraction happens correctly
+		int fieldIndex = Integer.valueOf(inputFieldIndex);
+
+		// to make fieldIndex 1-based instead of 0-based
+		--fieldIndex;
+		if (fieldIndex < 0) {
+			throw new ExecException("field index cannot be less than 1:"
+					+ inputFieldIndex, -1, PigException.ERROR, null);
+		}
+		return fieldIndex;
+	}
+
+	protected static int parseOrdering(String order) {
+		int sign = 1;
+		order = order.toLowerCase().trim();
+		if (order != null
+				&& (order.startsWith("min") || order.startsWith("desc")
+						|| order.startsWith("-") || order.startsWith("small") || order
+						.startsWith("least"))) {
+			sign = -1;
+		} else {
+			// either default to 1 by not specifying order(null) or it indicated
+			// "min" which is the string "min" the string "desc" or any string
+			// starting with a minus sign.
+			sign = 1;
+		}
+		return sign;
+	}
+}

http://git-wip-us.apache.org/repos/asf/datafu/blob/857cf164/datafu-pig/src/main/resources/datafu/dedup.pig
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/resources/datafu/dedup.pig b/datafu-pig/src/main/resources/datafu/dedup.pig
new file mode 100644
index 0000000..02c9e4f
--- /dev/null
+++ b/datafu-pig/src/main/resources/datafu/dedup.pig
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Used to take the most recent row of a table for a given key.
+ *
+ * relation    - the relation to dedup
+ * row_key     - the field (or fields) to group by
+ * order_field - the field used for ordering (to find the most recent record)
+ *
+ */
+DEFINE dedup(relation, row_key, order_field) returns out {
+
+	-- argmax returns, from each bag, the tuple whose first field is maximal
+	DEFINE argmax datafu.org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField('1', 'max');
+
+	-- prepend a copy of the ordering field so argmax can always compare on field 1
+	with_max_field = FOREACH $relation GENERATE $order_field AS field_for_max, *;
+	grouped  = GROUP with_max_field BY $row_key;
+	-- keep only the extremal (most recent) tuple of each group
+	max_only = FOREACH grouped  GENERATE argmax(with_max_field);
+	flattened = FOREACH max_only GENERATE FLATTEN($0);
+	-- drop the prepended field_for_max copy, restoring the original schema
+	$out = FOREACH flattened GENERATE $1 .. ;
+};

http://git-wip-us.apache.org/repos/asf/datafu/blob/857cf164/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java
new file mode 100644
index 0000000..7f51bba
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+
+public class DedupTests extends PigTests
+{
+    
+  // NOTE(review): the @Multiline fields below take their value from the
+  // javadoc comment immediately preceding them (presumably via the
+  // org.adrianwalker.multilinestring processor imported above) -- do not
+  // reformat those javadocs or separate them from their fields, since
+  // their text is the Pig script executed by the test.
+
+  /**
+  import 'datafu/dedup.pig';
+
+  data = LOAD 'input' AS (key: int, val: chararray, dt: chararray);
+
+  dedup_data = dedup(data, key, 'dt');
+
+  STORE dedup_data INTO 'output';
+   */
+  @Multiline
+  private String dedupTest;
+
+  // Verifies that dedup keeps exactly one row per key: the one with the
+  // largest (most recent) 'dt' value.
+  @Test
+  public void dedupTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(dedupTest);
+
+    // Tab-separated rows: key, val, dt. Key 2 appears twice with different
+    // dates; only the newer row should survive.
+    writeLinesToFile(	"input",
+    					"1\ta\t20140201",
+    					"2\td\t20140201",
+    					"2\td2\t20140301");
+
+    assertOutput(test, "dedup_data",
+			"(1,a,20140201)",
+			"(2,d2,20140301)");
+
+  }
+
+  /**
+  import 'datafu/dedup.pig';
+
+  data = LOAD 'input' AS (key: int, val: chararray, dt: chararray);
+
+  dedup_data = dedup(data, '(key,val)', 'dt');
+
+  STORE dedup_data INTO 'output';
+   */
+  @Multiline
+  private String dedupWithMultipleKeysTest;
+
+  // Verifies dedup with a composite (key, val) group key: rows count as
+  // duplicates only when both fields match, so both key-1 rows survive
+  // while the older of the two identical (2, b) rows is dropped.
+  @Test
+  public void dedupWithMultipleKeysTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(dedupWithMultipleKeysTest);
+
+    writeLinesToFile(	"input",
+    					"1\ta\t20140201",
+    					"1\td\t20140202",
+    					"2\tb\t20110201",
+    					"2\tb\t20140201",
+    					"3\tc\t20160201" );
+
+    assertOutput(test, "dedup_data",
+			"(1,a,20140201)",
+			"(1,d,20140202)",
+			"(2,b,20140201)",
+			"(3,c,20160201)");
+  }
+
+}


Mime
View raw message