DRILL-1105: Fix bug in streaming aggregate when first batch is empty
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dedff8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dedff8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dedff8ca
Branch: refs/heads/master
Commit: dedff8ca8559dba758326bf8cec95b6a4be05416
Parents: 405abb2
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Mon Jun 30 14:05:31 2014 -0700
Committer: Aditya Kishore <aditya@maprtech.com>
Committed: Thu Jul 3 02:11:24 2014 -0700
----------------------------------------------------------------------
.../impl/aggregate/StreamingAggTemplate.java | 29 ++++++++++++++++++++
1 file changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dedff8ca/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 8a9ba3b..6ed37e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -47,6 +47,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator
{
private StreamingAggBatch outgoing;
private FragmentContext context;
private InternalBatch remainderBatch;
+ private boolean done = false;
@Override
@@ -84,6 +85,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator
{
@Override
public AggOutcome doWork() {
+ if (done) {
+ outcome = IterOutcome.NONE;
+ return AggOutcome.CLEANUP_AND_RETURN;
+ }
try{ // outside loop to ensure that first is set to false after the first run.
outputCount = 0;
@@ -92,6 +97,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator
{
allocateOutgoing();
}
+ if (incoming.getRecordCount() == 0) {
+ outer: while (true) {
+ IterOutcome out = outgoing.next(0, incoming);
+ switch (out) {
+ case OK_NEW_SCHEMA:
+ case OK:
+ if (incoming.getRecordCount() == 0) {
+ continue;
+ } else {
+ break outer;
+ }
+ case NONE:
+ out = IterOutcome.OK_NEW_SCHEMA;
+ case STOP:
+ default:
+ lastOutcome = out;
+ outcome = out;
+ done = true;
+ return AggOutcome.CLEANUP_AND_RETURN;
+ }
+ }
+ }
+
// pick up a remainder batch if we have one.
if(remainderBatch != null){
if (!outputToBatch( previousIndex )) return tooBigFailure();
@@ -162,6 +190,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator
{
if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
switch(out){
case NONE:
+ done = true;
lastOutcome = out;
if (first && addedRecordCount == 0) {
return setOkAndReturn();
|