drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [11/12] git commit: DRILL-1107: Handle case in Hash join where first batch has zero records
Date Thu, 03 Jul 2014 17:45:24 GMT
DRILL-1107: Handle case in Hash join where first batch has zero records

Handle case in merging receiver when first batch has zero records


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0de4650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0de4650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0de4650

Branch: refs/heads/master
Commit: e0de4650a112eba6db31685152c132cf6015cc56
Parents: 1e1e438
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jul 2 23:47:53 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Jul 3 08:59:58 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/HashJoinBatch.java  | 10 ++++++-
 .../impl/join/HashJoinProbeTemplate.java        |  2 +-
 .../impl/mergereceiver/MergingRecordBatch.java  | 28 ++++++++++++++------
 .../work/batch/UnlimitedRawBatchBuffer.java     |  7 ++++-
 4 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2e33d50..e24f250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -108,6 +108,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     private boolean first = true;
 
+    private boolean done = false;
+
     // Generator mapping for the build side
     private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
                                                                                   "projectBuildRecord" /* eval method */,
@@ -160,6 +162,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     @Override
     public IterOutcome innerNext() {
+        if (done) {
+          return IterOutcome.NONE;
+        }
         try {
             /* If we are here for the first time, execute the build phase of the
              * hash join and setup the run time generated class for the probe side
@@ -238,8 +243,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             }
 
             // No more output records, clean up and return
+            done = true;
+            if (first) {
+              return IterOutcome.OK_NEW_SCHEMA;
+            }
             return IterOutcome.NONE;
-
         } catch (ClassTransformationException | SchemaChangeException | IOException e) {
             context.fail(e);
             killIncoming();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 9c84e54..0b90362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -107,7 +107,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   }
 
   public void executeProbePhase() throws SchemaChangeException {
-    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsToProcess > 0) {
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
 
       // Check if we have processed all records in this batch we need to invoke next
       if (recordsProcessed == recordsToProcess) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ace1539..e83f461 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -137,8 +137,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
   @Override
   public IterOutcome innerNext() {
-    if (fragProviders.length == 0) return IterOutcome.NONE;
-    if (done) return IterOutcome.NONE;
+    if (fragProviders.length == 0) {
+      return IterOutcome.NONE;
+    }
+    if (done) {
+      return IterOutcome.NONE;
+    }
     boolean schemaChanged = false;
 
     if (prevBatchWasFull) {
@@ -171,11 +175,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         }
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
-        } else if (emptyBatch == null) {
-          emptyBatch = rawBatch;
-        }
-        if (firstBatch) {
-          schema = BatchSchema.newBuilder().addSerializedFields(rawBatch.getHeader().getDef().getFieldList()).build();
+        } else {
+          if (emptyBatch == null) {
+            emptyBatch = rawBatch;
+          }
+          try {
+            while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0);
+          } catch (IOException e) {
+            context.fail(e);
+            return IterOutcome.STOP;
+          }
+          if (rawBatch != null) {
+            rawBatches.add(rawBatch);
+          }
         }
       }
 
@@ -375,7 +387,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
   @Override
   public BatchSchema getSchema() {
-    return schema;
+    return outgoingContainer.getSchema();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 9d24c66..41d70a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -77,7 +77,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   @Override
   public void cleanup() {
     if (!finished) {
-      context.fail(new IllegalStateException("Cleanup before finished"));
+      IllegalStateException e = new IllegalStateException("Cleanup before finished");
+      context.fail(e);
+      throw e;
     }
 
     if (!buffer.isEmpty()) {
@@ -158,6 +160,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     if (b == null && buffer.size() > 0) {
       throw new IllegalStateException("Returning null when there are batches left in queue");
     }
+    if (b == null && !finished) {
+      throw new IllegalStateException("Returning null when not finished");
+    }
     return b;
 
   }


Mime
View raw message