DRILL-1106: Fix race condition in incoming buffers
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1e1e438d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1e1e438d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1e1e438d
Branch: refs/heads/master
Commit: 1e1e438db84648217b785c020c0e79968b3c50c9
Parents: 22709c2
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jul 2 23:46:27 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Jul 3 08:59:53 2014 -0700
----------------------------------------------------------------------
.../exec/work/batch/AbstractDataCollector.java | 13 ++-----
.../drill/exec/work/batch/MergingCollector.java | 9 -----
.../exec/work/batch/PartitionedCollector.java | 9 -----
.../work/batch/UnlimitedRawBatchBuffer.java | 40 +++++++++++++++++++-
4 files changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 23794d9..6eafe57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -42,6 +42,7 @@ public abstract class AbstractDataCollector implements DataCollector{
protected final RawBatchBuffer[] buffers;
private final AtomicInteger parentAccounter;
private final AtomicInteger finishedStreams = new AtomicInteger();
+ private final FragmentContext context;
public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
Preconditions.checkArgument(minInputsRequired > 0);
@@ -53,11 +54,12 @@ public abstract class AbstractDataCollector implements DataCollector{
this.remainders = new AtomicIntegerArray(incoming.size());
this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
this.buffers = new RawBatchBuffer[minInputsRequired];
+ this.context = context;
try {
String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class);
for(int i = 0; i < buffers.length; i++) {
- buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, incoming.size());
+ buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, receiver.supportsOutOfOrderExchange() ? incoming.size() : 1);
}
} catch (InstantiationException | IllegalAccessException | InvocationTargetException |
NoSuchMethodException | ClassNotFoundException e) {
@@ -79,8 +81,6 @@ public abstract class AbstractDataCollector implements DataCollector{
}
- public abstract void streamFinished(int minorFragmentId);
-
public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException {
// if we received an out of memory, add an item to all the buffer queues.
@@ -100,13 +100,8 @@ public abstract class AbstractDataCollector implements DataCollector{
}
}
- // mark stream finished if we got the last batch.
- if(batch.getHeader().getIsLastBatch()){
- streamFinished(minorFragmentId);
- }
-
-
getBuffer(minorFragmentId).enqueue(batch);
+
return decremented;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 806b115..5248bb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -35,13 +35,4 @@ public class MergingCollector extends AbstractDataCollector{
protected RawBatchBuffer getBuffer(int minorFragmentId) {
return buffers[0];
}
-
-
- public void streamFinished(int minorFragmentId) {
- if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
- }
-
-
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index c1f4fa5..5190d84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -32,13 +32,4 @@ public class PartitionedCollector extends AbstractDataCollector{
protected RawBatchBuffer getBuffer(int minorFragmentId) {
return buffers[minorFragmentId];
}
-
- @Override
- public void streamFinished(int minorFragmentId) {
- buffers[minorFragmentId].finished();
- }
-
-
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 11e20c7..9d24c66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
@@ -38,6 +39,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
private final AtomicBoolean overlimit = new AtomicBoolean(false);
private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
private final ResponseSenderQueue readController = new ResponseSenderQueue();
+ private int streamCounter;
+ private FragmentContext context;
public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
int bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
@@ -45,10 +48,15 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
this.softlimit = bufferSizePerSocket * fragmentCount;
this.startlimit = Math.max(softlimit/2, 1);
this.buffer = Queues.newLinkedBlockingDeque();
+ this.streamCounter = fragmentCount;
+ this.context = context;
}
@Override
public void enqueue(RawFragmentBatch batch) {
+ if (finished) {
+ throw new RuntimeException("Attempted to enqueue batch after finished");
+ }
if (batch.getHeader().getIsOutOfMemory()) {
logger.debug("Setting autoread false");
if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) {
@@ -68,7 +76,24 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@Override
public void cleanup() {
+ if (!finished) {
+ context.fail(new IllegalStateException("Cleanup before finished"));
+ }
+ if (!buffer.isEmpty()) {
+ if (!context.isFailed()) {
+ context.fail(new IllegalStateException("Batches still in queue during cleanup"));
+ logger.error("{} Batches in queue.", buffer.size());
+ RawFragmentBatch batch;
+ while ((batch = buffer.poll()) != null) {
+ logger.error("Batch left in queue: {}", batch);
+ }
+ }
+ RawFragmentBatch batch;
+ while ((batch = buffer.poll()) != null) {
+ if (batch.getBody() != null) batch.getBody().release();
+ }
+ }
}
@Override
@@ -82,6 +107,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@Override
public void finished() {
finished = true;
+ if (!buffer.isEmpty()) {
+ throw new IllegalStateException("buffer not empty when finished");
+ }
}
@Override
@@ -98,7 +126,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
b = buffer.poll();
// if we didn't get a buffer, block on waiting for buffer.
- if(b == null && !finished){
+ if(b == null && (!finished || !buffer.isEmpty())){
try {
b = buffer.take();
} catch (InterruptedException e) {
@@ -120,6 +148,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
}
+ if (b != null && b.getHeader().getIsLastBatch()) {
+ streamCounter--;
+ if (streamCounter == 0) {
+ finished();
+ }
+ }
+
+ if (b == null && buffer.size() > 0) {
+ throw new IllegalStateException("Returning null when there are batches left in queue");
+ }
return b;
}
|