DRILL-1107: Handle case in Hash join where first batch has zero records
Handle case in merging receiver when first batch has zero records
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0de4650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0de4650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0de4650
Branch: refs/heads/master
Commit: e0de4650a112eba6db31685152c132cf6015cc56
Parents: 1e1e438
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jul 2 23:47:53 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Jul 3 08:59:58 2014 -0700
----------------------------------------------------------------------
.../exec/physical/impl/join/HashJoinBatch.java | 10 ++++++-
.../impl/join/HashJoinProbeTemplate.java | 2 +-
.../impl/mergereceiver/MergingRecordBatch.java | 28 ++++++++++++++------
.../work/batch/UnlimitedRawBatchBuffer.java | 7 ++++-
4 files changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2e33d50..e24f250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -108,6 +108,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
private boolean first = true;
+ private boolean done = false;
+
// Generator mapping for the build side
private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
"projectBuildRecord" /* eval method */,
@@ -160,6 +162,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
@Override
public IterOutcome innerNext() {
+ if (done) {
+ return IterOutcome.NONE;
+ }
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
@@ -238,8 +243,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
// No more output records, clean up and return
+ done = true;
+ if (first) {
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
return IterOutcome.NONE;
-
} catch (ClassTransformationException | SchemaChangeException | IOException e) {
context.fail(e);
killIncoming();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 9c84e54..0b90362 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -107,7 +107,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
}
public void executeProbePhase() throws SchemaChangeException {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsToProcess > 0) {
+ while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
// Check if we have processed all records in this batch we need to invoke next
if (recordsProcessed == recordsToProcess) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ace1539..e83f461 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -137,8 +137,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
@Override
public IterOutcome innerNext() {
- if (fragProviders.length == 0) return IterOutcome.NONE;
- if (done) return IterOutcome.NONE;
+ if (fragProviders.length == 0) {
+ return IterOutcome.NONE;
+ }
+ if (done) {
+ return IterOutcome.NONE;
+ }
boolean schemaChanged = false;
if (prevBatchWasFull) {
@@ -171,11 +175,19 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
rawBatches.add(rawBatch);
- } else if (emptyBatch == null) {
- emptyBatch = rawBatch;
- }
- if (firstBatch) {
- schema = BatchSchema.newBuilder().addSerializedFields(rawBatch.getHeader().getDef().getFieldList()).build();
+ } else {
+ if (emptyBatch == null) {
+ emptyBatch = rawBatch;
+ }
+ try {
+ while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0);
+ } catch (IOException e) {
+ context.fail(e);
+ return IterOutcome.STOP;
+ }
+ if (rawBatch != null) {
+ rawBatches.add(rawBatch);
+ }
}
}
@@ -375,7 +387,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
@Override
public BatchSchema getSchema() {
- return schema;
+ return outgoingContainer.getSchema();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0de4650/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 9d24c66..41d70a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -77,7 +77,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@Override
public void cleanup() {
if (!finished) {
- context.fail(new IllegalStateException("Cleanup before finished"));
+ IllegalStateException e = new IllegalStateException("Cleanup before finished");
+ context.fail(e);
+ throw e;
}
if (!buffer.isEmpty()) {
@@ -158,6 +160,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
if (b == null && buffer.size() > 0) {
throw new IllegalStateException("Returning null when there are batches left in queue");
}
+ if (b == null && !finished) {
+ throw new IllegalStateException("Returning null when not finished");
+ }
return b;
}
|