DRILL-1092: Ensure we have enough records while performing merge and spill in ExternalSortBatch
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5080585b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5080585b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5080585b
Branch: refs/heads/master
Commit: 5080585b6e2f69f49931e860de1cd4e27d08af2a
Parents: 19dbb02
Author: Mehant Baid <mehantr@gmail.com>
Authored: Mon Jul 14 23:28:04 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Jul 20 16:28:02 2014 -0700
----------------------------------------------------------------------
.../exec/physical/impl/xsort/ExternalSortBatch.java | 11 +++++++----
.../exec/physical/impl/xsort/PriorityQueueCopier.java | 2 +-
.../physical/impl/xsort/PriorityQueueCopierTemplate.java | 7 ++++++-
3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 237a631..e5da896 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -339,6 +339,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
{
VectorContainer hyperBatch = new VectorContainer();
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
+ int recordCount = 0;
for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) {
if (batchGroups.size() == 0) {
break;
@@ -346,13 +347,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
{
if (batchGroups.peekLast().getSecondContainer() != null) {
break;
}
- batchGroupList.add(batchGroups.pollLast());
+ BatchGroup batch = batchGroups.pollLast();
+ recordCount += batch.getSv2().getCount();
+ batchGroupList.add(batch);
}
if (batchGroupList.size() == 0) {
return;
}
constructHyperBatch(batchGroupList, hyperBatch);
- createCopier(hyperBatch, batchGroupList, outputContainer);
+ createCopier(hyperBatch, batchGroupList, outputContainer, recordCount);
int count = copier.next();
assert count > 0;
@@ -535,7 +538,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
{
}
}
- private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList,
VectorContainer outputContainer) throws SchemaChangeException {
+ private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList,
VectorContainer outputContainer, int recordCount) throws SchemaChangeException {
try {
if (copier == null) {
CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
@@ -555,7 +558,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort>
{
outputContainer.add(v);
allocators.add(VectorAllocator.getAllocator(v, 110));
}
- copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators);
+ copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators,
recordCount);
} catch (ClassTransformationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index 7122963..f2da717 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
import java.util.List;
public interface PriorityQueueCopier {
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible
hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator>
allocators) throws SchemaChangeException;
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible
hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator>
allocators, int recordCnt) throws SchemaChangeException;
public int next();
public List<VectorAllocator> getAllocators();
public void cleanup();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5080585b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index a74f14c..6e9c355 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -44,7 +44,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
@Override
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible
hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator>
allocators) throws SchemaChangeException {
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible
hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator>
allocators, int recordCnt) throws SchemaChangeException {
this.context = context;
this.allocator = allocator;
this.hyperBatch = hyperBatch;
@@ -64,6 +64,11 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
siftUp();
queueSize++;
}
+
+ // Check if we have enough records to create BatchData with two containers.
+ if (recordCnt < (2 * targetRecordCount)) {
+ targetRecordCount = (recordCnt / 2);
+ }
}
@Override
|