Repository: datafu
Updated Branches:
refs/heads/master 1e69bf12d -> 857cf164c
DATAFU-129 - New macro - Dedup
Signed-off-by: Matthew Hayes <mhayes@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/datafu/commit/857cf164
Tree: http://git-wip-us.apache.org/repos/asf/datafu/tree/857cf164
Diff: http://git-wip-us.apache.org/repos/asf/datafu/diff/857cf164
Branch: refs/heads/master
Commit: 857cf164c30883d739c4895c9a9c758880526435
Parents: 1e69bf1
Author: Eyal Allweil <eyal@apache.org>
Authored: Thu Oct 11 16:09:53 2018 +0300
Committer: Matthew Hayes <mhayes@apache.org>
Committed: Thu Oct 11 09:20:27 2018 -0700
----------------------------------------------------------------------
.../evaluation/ExtremalTupleByNthField.java | 306 +++++++++++++++++++
datafu-pig/src/main/resources/datafu/dedup.pig | 37 +++
.../java/datafu/test/pig/util/DedupTests.java | 91 ++++++
3 files changed, 434 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/datafu/blob/857cf164/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
b/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
new file mode 100644
index 0000000..e425417
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package datafu.org.apache.pig.piggybank.evaluation;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * This class is a copy of org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField
+ *
+ * https://github.com/apache/pig/blob/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
+ */
+public class ExtremalTupleByNthField extends EvalFunc<Tuple> implements Algebraic,
Accumulator<Tuple> {
+
+ /**
+ * Number of bag items processed between two progress heartbeats sent to the
+ * reporter.
+ * This number has been increased from 10 to reduce verbosity.
+ */
+ private static final int PROGRESS_FREQUENCY = 10000;
+
+ // Zero-based position of the comparison field, as returned by parseFieldIndex().
+ int fieldIndex;
+ // Comparison direction as returned by parseOrdering(): 1 keeps the largest value, -1 the smallest.
+ int sign;
+
+ // ---------------------------------------------------------------------
+ // Constructors
+ // ---------------------------------------------------------------------
+
+ /**
+ * Creates a function that selects the tuple with the maximum first field.
+ * Equivalent to {@code ExtremalTupleByNthField("1", "max")}.
+ *
+ * @throws ExecException if the default field index is rejected by parseFieldIndex()
+ */
+ public ExtremalTupleByNthField() throws ExecException {
+ this("1", "max");
+ }
+
+ /**
+ * Creates a function that selects the tuple with the maximum value in the
+ * given field.
+ *
+ * @param fieldIndexString 1-based index of the comparison field, as a decimal string
+ * @throws ExecException if the index is less than 1
+ */
+ public ExtremalTupleByNthField(String fieldIndexString)
+ throws ExecException {
+ this(fieldIndexString, "max");
+ }
+
+ /**
+ * Creates a function that selects the extremal tuple by the given field and
+ * ordering.
+ *
+ * @param fieldIndexString 1-based index of the comparison field, as a decimal string
+ * @param order ordering keyword interpreted by parseOrdering()
+ * @throws ExecException if the index is less than 1
+ */
+ public ExtremalTupleByNthField(String fieldIndexString, String order)
+ throws ExecException {
+ super();
+ this.fieldIndex = parseFieldIndex(fieldIndexString);
+ this.sign = parseOrdering(order);
+ }
+
+ /**
+ *
+ *
+ *
+ *
+ *
+ * The EvalFunc interface
+ */
+ /**
+ * Returns the extremal tuple of the bag held in the first field of
+ * {@code input}, delegating to extreme() with this instance's field index
+ * and sign.
+ *
+ * @param input tuple whose first field is the bag to scan
+ * @return the selected tuple, or null when extreme() has nothing to return
+ * @throws IOException if extreme() fails
+ */
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return extreme(fieldIndex, sign, input, reporter);
+ }
+
+ /**
+ * Reports the Java type produced by this function.
+ *
+ * @return {@code Tuple.class}
+ */
+ @Override
+ public Type getReturnType() {
+ return Tuple.class;
+ }
+
+ /**
+ * Declares the output schema to be the input schema, unchanged.
+ *
+ * @param input schema of the arguments passed to this function
+ * @return {@code input} itself
+ */
+ @Override
+ public Schema outputSchema(Schema input) {
+   return input;
+ }
+
+ /**
+ *
+ *
+ *
+ *
+ *
+ * Algebraic interface
+ */
+ /**
+ * Names the class used for the initial stage of the Algebraic contract.
+ *
+ * @return the class name of {@link HelperClass}
+ */
+ @Override
+ public String getInitial() {
+ return HelperClass.class.getName();
+ }
+
+ /**
+ * Names the class used for the intermediate stage of the Algebraic contract.
+ *
+ * @return the class name of {@link HelperClass}
+ */
+ @Override
+ public String getIntermed() {
+ return HelperClass.class.getName();
+ }
+
+ /**
+ * Names the class used for the final stage of the Algebraic contract.
+ *
+ * @return the class name of {@link HelperClass}
+ */
+ @Override
+ public String getFinal() {
+ return HelperClass.class.getName();
+ }
+
+ /**
+ *
+ *
+ *
+ *
+ *
+ * The Accumulator interface
+ */
+ // Result accumulated so far; null before the first accumulate() call and after cleanup().
+ Tuple intermediate = null;
+ // Scratch bag that accumulate() clears and refills before each comparison.
+ DataBag tempDb = BagFactory.getInstance().newDefaultBag();
+ // Tuple built from tempDb; accumulate() passes it to extreme(), which reads the bag from field 0.
+ Tuple parameterToExtreme = TupleFactory.getInstance().newTuple(tempDb);
+
+ /**
+ * Folds one batch of tuples into the running extremal tuple.
+ * <p>
+ * Pig's Accumulator contract passes a tuple whose single field is a bag
+ * holding the tuples of the current batch, which is the same shape exec()
+ * receives. The batch is therefore reduced to its own extremal tuple first,
+ * and only that tuple is compared with the winner of the earlier batches.
+ * Batches that are null, empty, or contain only null comparison fields
+ * leave the running result untouched.
+ *
+ * @param b tuple whose first field is the bag of tuples for this batch; may be null
+ * @throws IOException if the comparison fails
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+   try {
+     // Winner of this batch alone; null when there is nothing comparable in it.
+     Tuple candidate = extreme(fieldIndex, sign, b, reporter);
+     if (candidate == null) {
+       return;
+     }
+
+     if (intermediate != null) {
+       // Let extreme() choose between the new candidate and the running
+       // winner, reusing the scratch bag wrapped by parameterToExtreme.
+       tempDb.clear();
+       tempDb.add(candidate);
+       tempDb.add(intermediate);
+       candidate = extreme(fieldIndex, sign, parameterToExtreme, reporter);
+     }
+
+     if (candidate != intermediate) {
+       // Make a shallow copy in case the Tuple is reused by the caller.
+       Tuple copy = TupleFactory.getInstance().newTuple(candidate.size());
+       for (int i = 0; i < candidate.size(); ++i) {
+         copy.set(i, candidate.get(i));
+       }
+       intermediate = copy;
+     }
+   } catch (ExecException ee) {
+     throw ee;
+   } catch (Exception e) {
+     int errCode = -1;
+     String msg = "Error while computing ExtremalTupleByNthField in "
+         + this.getClass().getSimpleName();
+     throw new ExecException(msg, errCode, PigException.BUG, e);
+   }
+ }
+
+ /**
+ * Discards the accumulated result by resetting {@code intermediate} to null.
+ * The scratch bag is not touched here; accumulate() clears it before each use.
+ */
+ @Override
+ public void cleanup() {
+ intermediate = null;
+ }
+
+ /**
+ * Returns the result accumulated so far.
+ *
+ * @return the current {@code intermediate} tuple, or null if nothing has
+ *         been accumulated since construction or the last cleanup()
+ */
+ @Override
+ public Tuple getValue() {
+ return intermediate; // could be null correctly
+ }
+
+ /**
+ *
+ *
+ *
+ *
+ * Utility classes and methods
+ *
+ */
+ /**
+ * Function whose class name is returned by getInitial(), getIntermed() and
+ * getFinal(). It applies the same extreme() selection as the outer class,
+ * using a field index and sign parsed from its own constructor arguments.
+ * NOTE(review): presumably Pig instantiates it with the same constructor
+ * arguments as the outer UDF; confirm against the Algebraic documentation.
+ */
+ public static final class HelperClass extends EvalFunc<Tuple> {
+   // Zero-based position of the comparison field.
+   int fieldIndex;
+   // 1 keeps the largest value, -1 the smallest.
+   int sign;
+
+   /** Selects by maximum of the first field. */
+   public HelperClass() throws ExecException {
+     this("1", "max");
+   }
+
+   /**
+    * Selects by maximum of the given field.
+    *
+    * @param fieldIndexString 1-based index of the comparison field
+    */
+   public HelperClass(String fieldIndexString) throws ExecException {
+     this(fieldIndexString, "max");
+   }
+
+   /**
+    * Selects by the given field and ordering.
+    *
+    * @param fieldIndexString 1-based index of the comparison field
+    * @param order ordering keyword interpreted by parseOrdering()
+    * @throws ExecException if the index is less than 1
+    */
+   public HelperClass(String fieldIndexString, String order)
+       throws ExecException {
+     this.fieldIndex = parseFieldIndex(fieldIndexString);
+     this.sign = parseOrdering(order);
+   }
+
+   /**
+    * Returns the extremal tuple of the bag held in the first field of
+    * {@code input}.
+    */
+   @Override
+   public Tuple exec(Tuple input) throws IOException {
+     return extreme(fieldIndex, sign, input, reporter);
+   }
+ }
+
+ /**
+ * Scans the bag stored in the first field of {@code input} and returns the
+ * tuple whose field at position {@code pind} is extremal.
+ * <p>
+ * Null tuples and tuples whose comparison field is null are skipped. On
+ * ties the tuple encountered first is kept, because a later tuple only
+ * replaces the current one when it compares strictly better.
+ *
+ * @param pind zero-based index of the comparison field
+ * @param psign 1 to keep the largest value, -1 to keep the smallest
+ * @param input tuple whose first field is the bag to scan
+ * @param reporter progress sink notified every PROGRESS_FREQUENCY items; may be null
+ * @return the selected tuple, or null when the input is null or empty, the
+ *         bag is null or empty, or no tuple has a non-null comparison field
+ * @throws ExecException if reading or comparing a field fails
+ */
+ @SuppressWarnings("unchecked")
+ protected final static Tuple extreme(int pind, int psign, Tuple input,
+     PigProgressable reporter) throws ExecException {
+   if (input == null || input.size() == 0) {
+     return null;
+   }
+   Object firstField = input.get(0);
+   if (firstField == null) {
+     return null;
+   }
+
+   DataBag candidates = (DataBag) firstField;
+   if (candidates.size() == 0) {
+     // An empty bag yields NULL, in compliance with the SQL standard.
+     return null;
+   }
+
+   Comparable bestKey = null;
+   Tuple bestTuple = null;
+   int seen = 0;
+   for (Tuple candidate : candidates) {
+     if (reporter != null && ++seen % PROGRESS_FREQUENCY == 0) {
+       reporter.progress();
+     }
+     if (candidate == null) {
+       // just in case.
+       continue;
+     }
+     try {
+       Object rawKey = candidate.get(pind);
+       if (rawKey != null) {
+         Comparable key = (Comparable) rawKey;
+         // psign * compareTo > 0 iff the key is larger (psign == 1) or
+         // smaller (psign == -1) than the best key seen so far.
+         if (bestKey == null || psign * key.compareTo(bestKey) > 0) {
+           bestKey = key;
+           bestTuple = candidate;
+         }
+       }
+       // A null comparison field is never returned and never compared.
+     } catch (ExecException ee) {
+       throw ee;
+     } catch (Exception e) {
+       int errCode = -1;
+       String msg = "Error while computing ExtremalTupleByNthField in ExtremalTupleByNthField,";
+       throw new ExecException(msg, errCode, PigException.ERROR, e);
+     }
+   }
+
+   return bestTuple;
+ }
+
+ protected static int parseFieldIndex(String inputFieldIndex)
+ throws ExecException {
+ // using a decrement to make sure that the subtraction happens correctly
+ int fieldIndex = Integer.valueOf(inputFieldIndex);
+
+ // to make fieldIndex 1-based instead of 0-based
+ --fieldIndex;
+ if (fieldIndex < 0) {
+ throw new ExecException("field index cannot be less than 1:"
+ + inputFieldIndex, -1, PigException.ERROR, null);
+ }
+ return fieldIndex;
+ }
+
+ /**
+ * Translates the ordering keyword into a comparison sign.
+ * <p>
+ * The keyword is lower-cased (locale-independently) and trimmed. It selects
+ * the minimum when it starts with "min", "desc", "-", "small" or "least".
+ * Anything else, including null, selects the maximum.
+ *
+ * @param order ordering keyword; may be null
+ * @return -1 to select the minimum, 1 to select the maximum
+ */
+ protected static int parseOrdering(String order) {
+   if (order == null) {
+     // No ordering given: default to max instead of failing on null.
+     return 1;
+   }
+   // Locale.ROOT keeps "MIN" -> "min" even under locales such as Turkish.
+   String keyword = order.toLowerCase(java.util.Locale.ROOT).trim();
+   boolean wantsMinimum = keyword.startsWith("min")
+       || keyword.startsWith("desc")
+       || keyword.startsWith("-")
+       || keyword.startsWith("small")
+       || keyword.startsWith("least");
+   return wantsMinimum ? -1 : 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/datafu/blob/857cf164/datafu-pig/src/main/resources/datafu/dedup.pig
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/resources/datafu/dedup.pig b/datafu-pig/src/main/resources/datafu/dedup.pig
new file mode 100644
index 0000000..02c9e4f
--- /dev/null
+++ b/datafu-pig/src/main/resources/datafu/dedup.pig
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Keeps a single row per key: the row with the largest value of the ordering
+ * field (for example, the most recent row when ordering by a date).
+ *
+ * relation    - relation to dedup
+ * row_key     - field(s) for group by; several fields can be passed as a
+ *               quoted, parenthesized list such as '(key,val)'
+ * order_field - the field for ordering (to find the most recent record)
+ *
+ * How it works: the ordering field is copied in front of every row as
+ * field_for_max, the rows are grouped by row_key, and
+ * ExtremalTupleByNthField('1', 'max') picks the tuple with the largest first
+ * field in each group. The helper column is then dropped by projecting from
+ * the second field onwards.
+ */
+DEFINE dedup(relation, row_key, order_field) returns out {
+
+ DEFINE argmax datafu.org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField('1', 'max');
+
+ with_max_field = FOREACH $relation GENERATE $order_field AS field_for_max, *;
+ grouped = GROUP with_max_field BY $row_key;
+ max_only = FOREACH grouped GENERATE argmax(with_max_field);
+ flattened = FOREACH max_only GENERATE FLATTEN($0);
+ $out = FOREACH flattened GENERATE $1 .. ;
+};
http://git-wip-us.apache.org/repos/asf/datafu/blob/857cf164/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java b/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java
new file mode 100644
index 0000000..7f51bba
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/util/DedupTests.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.util;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+
+/**
+ * Tests for the dedup macro imported from datafu/dedup.pig.
+ */
+public class DedupTests extends PigTests
+{
+
+ /**
+ import 'datafu/dedup.pig';
+
+ data = LOAD 'input' AS (key: int, val: chararray, dt: chararray);
+
+ dedup_data = dedup(data, key, 'dt');
+
+ STORE dedup_data INTO 'output';
+ */
+ @Multiline
+ private String dedupTest;
+
+ // Dedup on a single key: key 2 appears twice and only its row with the
+ // larger dt is expected in the output.
+ @Test
+ public void dedupTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(dedupTest);
+
+ writeLinesToFile( "input",
+ "1\ta\t20140201",
+ "2\td\t20140201",
+ "2\td2\t20140301");
+
+ assertOutput(test, "dedup_data",
+ "(1,a,20140201)",
+ "(2,d2,20140301)");
+
+ }
+
+ /**
+ import 'datafu/dedup.pig';
+
+ data = LOAD 'input' AS (key: int, val: chararray, dt: chararray);
+
+ dedup_data = dedup(data, '(key,val)', 'dt');
+
+ STORE dedup_data INTO 'output';
+ */
+ @Multiline
+ private String dedupWithMultipleKeysTest;
+
+ // Dedup on the composite key (key,val): only the two (2,b) rows share a
+ // key, so they collapse to the one with the larger dt; all other rows are
+ // expected unchanged.
+ @Test
+ public void dedupWithMultipleKeysTest() throws Exception
+ {
+ PigTest test = createPigTestFromString(dedupWithMultipleKeysTest);
+
+ writeLinesToFile( "input",
+ "1\ta\t20140201",
+ "1\td\t20140202",
+ "2\tb\t20110201",
+ "2\tb\t20140201",
+ "3\tc\t20160201" );
+
+ assertOutput(test, "dedup_data",
+ "(1,a,20140201)",
+ "(1,d,20140202)",
+ "(2,b,20140201)",
+ "(3,c,20160201)");
+ }
+
+}
|