drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [04/17] git commit: DRILL-1092: Ensure we have enough records while performing merge and spill in ExternalSortBatch
Date Mon, 21 Jul 2014 02:46:26 GMT
DRILL-1092: Ensure we have enough records while performing merge and spill in ExternalSortBatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5080585b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5080585b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5080585b

Branch: refs/heads/master
Commit: 5080585b6e2f69f49931e860de1cd4e27d08af2a
Parents: 19dbb02
Author: Mehant Baid <mehantr@gmail.com>
Authored: Mon Jul 14 23:28:04 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Jul 20 16:28:02 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/xsort/ExternalSortBatch.java      | 11 +++++++----
 .../exec/physical/impl/xsort/PriorityQueueCopier.java    |  2 +-
 .../physical/impl/xsort/PriorityQueueCopierTemplate.java |  7 ++++++-
 3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 237a631..e5da896 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -339,6 +339,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     VectorContainer hyperBatch = new VectorContainer();
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
+    int recordCount = 0;
     for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) {
       if (batchGroups.size() == 0) {
         break;
@@ -346,13 +347,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       if (batchGroups.peekLast().getSecondContainer() != null) {
         break;
       }
-      batchGroupList.add(batchGroups.pollLast());
+      BatchGroup batch = batchGroups.pollLast();
+      recordCount += batch.getSv2().getCount();
+      batchGroupList.add(batch);
     }
     if (batchGroupList.size() == 0) {
       return;
     }
     constructHyperBatch(batchGroupList, hyperBatch);
-    createCopier(hyperBatch, batchGroupList, outputContainer);
+    createCopier(hyperBatch, batchGroupList, outputContainer, recordCount);
 
     int count = copier.next();
     assert count > 0;
@@ -535,7 +538,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
   }
 
-  private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer) throws SchemaChangeException {
+  private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer, int recordCount) throws SchemaChangeException {
     try {
       if (copier == null) {
         CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -555,7 +558,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         outputContainer.add(v);
         allocators.add(VectorAllocator.getAllocator(v, 110));
       }
-      copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators);
+      copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators, recordCount);
     } catch (ClassTransformationException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index 7122963..f2da717 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import java.util.List;
 
 public interface PriorityQueueCopier {
-  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException;
+  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException;
   public int next();
   public List<VectorAllocator> getAllocators();
   public void cleanup();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index a74f14c..6e9c355 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -44,7 +44,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
   private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException {
+  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException {
     this.context = context;
     this.allocator = allocator;
     this.hyperBatch = hyperBatch;
@@ -64,6 +64,11 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
       siftUp();
       queueSize++;
     }
+
+    // Check if we have enough records to create BatchData with two containers.
+    if (recordCnt < (2 * targetRecordCount)) {
+      targetRecordCount = (recordCnt / 2);
+    }
   }
 
   @Override


Mime
View raw message