Repository: incubator-datafu
Updated Branches:
refs/heads/master 92a56e49f -> a15a15c58
DATAFU-114: Make FirstTupleFromBag implement Accumulator
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/f7c9b232
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/f7c9b232
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/f7c9b232
Branch: refs/heads/master
Commit: f7c9b232fe1f7a9a2c445f7c25bb73a3c6a099c1
Parents: 92a56e4
Author: Eyal Allweil <eyal_allweil@yahoo.com>
Authored: Thu Feb 4 16:03:51 2016 -0800
Committer: Matthew Hayes <matthew.terence.hayes@gmail.com>
Committed: Thu Feb 4 16:04:05 2016 -0800
----------------------------------------------------------------------
.../java/datafu/pig/bags/FirstTupleFromBag.java | 37 +++++++++++++++++---
1 file changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/f7c9b232/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
index 1f24984..01b0d61 100644
--- a/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
+++ b/datafu-pig/src/main/java/datafu/pig/bags/FirstTupleFromBag.java
@@ -21,11 +21,13 @@ package datafu.pig.bags;
import java.io.IOException;
-import datafu.pig.util.SimpleEvalFunc;
+import org.apache.pig.Accumulator;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import datafu.pig.util.SimpleEvalFunc;
+
/**
* Returns the first tuple from a bag. Requires a second parameter that will be returned if the bag is empty.
*
@@ -46,8 +48,36 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
* </pre>
*/
-public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
+public class FirstTupleFromBag extends SimpleEvalFunc<Tuple> implements Accumulator<Tuple>
{
+ private Tuple result = null;
+ private boolean found = false;
+
+ @Override
+ public void accumulate(Tuple tuple) throws IOException
+ {
+ if (found == false) {
+ DataBag bag = (DataBag) tuple.get(0);
+ Tuple defaultValue = (Tuple) tuple.get(1);
+
+ result = call(bag, defaultValue);
+ found = true;
+ }
+ }
+
+ @Override
+ public void cleanup()
+ {
+ found = false;
+ result = null;
+ }
+
+ @Override
+ public Tuple getValue()
+ {
+ return result;
+ }
+
public Tuple call(DataBag bag, Tuple defaultValue) throws IOException
{
for (Tuple t : bag) {
@@ -66,5 +96,4 @@ public class FirstTupleFromBag extends SimpleEvalFunc<Tuple>
return null;
}
}
-}
-
+}
\ No newline at end of file
|