drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [8/8] git commit: DRILL-1107: Fix regression from first patch
Date Tue, 08 Jul 2014 01:01:20 GMT
DRILL-1107: Fix regression from first patch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/810a2040
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/810a2040
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/810a2040

Branch: refs/heads/master
Commit: 810a20409089a10ae984807ff4418bfe7ee18957
Parents: 0b905fe
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Mon Jul 7 14:26:00 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon Jul 7 15:53:33 2014 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 28 +++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/810a2040/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e83f461..914a187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -187,6 +187,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           }
           if (rawBatch != null) {
             rawBatches.add(rawBatch);
+          } else {
+            rawBatches.add(emptyBatch);
           }
         }
       }
@@ -288,8 +290,26 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       });
 
       // populate the priority queue with initial values
-      for (int b = 0; b < senderCount; ++b)
-        pqueue.add(new Node(b, 0));
+      for (int b = 0; b < senderCount; ++b) {
+        while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
+          try {
+            RawFragmentBatch batch = getNext(fragProviders[b]);
+            incomingBatches[b] = batch;
+            if (batch != null) {
+              batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
+            } else {
+              batchLoaders[b].clear();
+              batchLoaders[b] = null;
+            }
+          } catch (IOException | SchemaChangeException e) {
+            context.fail(e);
+            return IterOutcome.STOP;
+          }
+        }
+        if (batchLoaders[b] != null) {
+          pqueue.add(new Node(b, 0));
+        }
+      }
 
       hasRun = true;
       // finished lazy initialization
@@ -566,7 +586,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     outgoingContainer.clear();
     if (batchLoaders != null) {
       for(RecordBatchLoader rbl : batchLoaders){
-        rbl.clear();
+        if (rbl != null) {
+          rbl.clear();
+        }
       }
     }
     oContext.close();


Mime
View raw message