DRILL-1116: Flush batch when copy fails due to insufficient space
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/de00d6f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/de00d6f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/de00d6f5
Branch: refs/heads/master
Commit: de00d6f52e98432280bf8d509fc1ecee79a365a8
Parents: 84dc2de
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Tue Jul 8 00:16:56 2014 -0700
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Tue Jul 8 03:20:37 2014 -0700
----------------------------------------------------------------------
.../exec/physical/impl/partitionsender/PartitionerTemplate.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de00d6f5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 8b63c5c..0fe3f15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -152,6 +152,7 @@ public abstract class PartitionerTemplate implements Partitioner {
outgoingBatch.flush();
if (!outgoingBatch.copy(recordId)) {
logger.debug(RECORD_TOO_BIG_MSG, recordId);
+ throw new IOException(RECORD_TOO_BIG_MSG);
}
}
}
@@ -163,8 +164,10 @@ public abstract class PartitionerTemplate implements Partitioner {
OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
if (!outgoingBatch.copy(svIndex)) {
logger.trace(REWRITE_MSG, svIndex);
+ outgoingBatch.flush();
if (!outgoingBatch.copy(svIndex)) {
logger.debug(RECORD_TOO_BIG_MSG, recordId);
+ throw new IOException(RECORD_TOO_BIG_MSG);
}
}
}
@@ -176,8 +179,10 @@ public abstract class PartitionerTemplate implements Partitioner {
OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
if (!outgoingBatch.copy(svIndex)) {
logger.trace(REWRITE_MSG, svIndex);
+ outgoingBatch.flush();
if (!outgoingBatch.copy(svIndex)) {
logger.debug(RECORD_TOO_BIG_MSG, recordId);
+ throw new IOException(RECORD_TOO_BIG_MSG);
}
}
}
|