drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [07/12] git commit: DRILL-1105: Fix bug in streaming aggregate when first batch is empty
Date Thu, 03 Jul 2014 17:45:20 GMT
DRILL-1105: Fix bug in streaming aggregate when first batch is empty


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dedff8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dedff8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dedff8ca

Branch: refs/heads/master
Commit: dedff8ca8559dba758326bf8cec95b6a4be05416
Parents: 405abb2
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Mon Jun 30 14:05:31 2014 -0700
Committer: Aditya Kishore <aditya@maprtech.com>
Committed: Thu Jul 3 02:11:24 2014 -0700

----------------------------------------------------------------------
 .../impl/aggregate/StreamingAggTemplate.java    | 29 ++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dedff8ca/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 8a9ba3b..6ed37e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -47,6 +47,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   private StreamingAggBatch outgoing;
   private FragmentContext context;
   private InternalBatch remainderBatch;
+  private boolean done = false;
 
 
   @Override
@@ -84,6 +85,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
   @Override
   public AggOutcome doWork() {
+    if (done) {
+      outcome = IterOutcome.NONE;
+      return AggOutcome.CLEANUP_AND_RETURN;
+    }
     try{ // outside loop to ensure that first is set to false after the first run.
       outputCount = 0;
 
@@ -92,6 +97,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         allocateOutgoing();
       }
 
+      if (incoming.getRecordCount() == 0) {
+        outer: while (true) {
+          IterOutcome out = outgoing.next(0, incoming);
+          switch (out) {
+            case OK_NEW_SCHEMA:
+            case OK:
+              if (incoming.getRecordCount() == 0) {
+                continue;
+              } else {
+                break outer;
+              }
+            case NONE:
+              out = IterOutcome.OK_NEW_SCHEMA;
+            case STOP:
+            default:
+              lastOutcome = out;
+              outcome = out;
+              done = true;
+              return AggOutcome.CLEANUP_AND_RETURN;
+          }
+        }
+      }
+
       // pick up a remainder batch if we have one.
       if(remainderBatch != null){
         if (!outputToBatch( previousIndex )) return tooBigFailure();
@@ -162,6 +190,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
             if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NONE:
+              done = true;
               lastOutcome = out;
               if (first && addedRecordCount == 0) {
                 return setOkAndReturn();


Mime
View raw message