drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [10/12] git commit: DRILL-1106: Fix race condition in incoming buffers
Date Thu, 03 Jul 2014 17:45:23 GMT
DRILL-1106: Fix race condition in incoming buffers


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1e1e438d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1e1e438d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1e1e438d

Branch: refs/heads/master
Commit: 1e1e438db84648217b785c020c0e79968b3c50c9
Parents: 22709c2
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Jul 2 23:46:27 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Jul 3 08:59:53 2014 -0700

----------------------------------------------------------------------
 .../exec/work/batch/AbstractDataCollector.java  | 13 ++-----
 .../drill/exec/work/batch/MergingCollector.java |  9 -----
 .../exec/work/batch/PartitionedCollector.java   |  9 -----
 .../work/batch/UnlimitedRawBatchBuffer.java     | 40 +++++++++++++++++++-
 4 files changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 23794d9..6eafe57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -42,6 +42,7 @@ public abstract class AbstractDataCollector implements DataCollector{
   protected final RawBatchBuffer[] buffers;
   private final AtomicInteger parentAccounter;
   private final AtomicInteger finishedStreams = new AtomicInteger();
+  private final FragmentContext context;
 
   public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired, FragmentContext context) {
     Preconditions.checkArgument(minInputsRequired > 0);
@@ -53,11 +54,12 @@ public abstract class AbstractDataCollector implements DataCollector{
     this.remainders = new AtomicIntegerArray(incoming.size());
     this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
     this.buffers = new RawBatchBuffer[minInputsRequired];
+    this.context = context;
     try {
       String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
       Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class);
       for(int i = 0; i < buffers.length; i++) {
-          buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, incoming.size());
+          buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, receiver.supportsOutOfOrderExchange() ? incoming.size() : 1);
       }
     } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
             NoSuchMethodException | ClassNotFoundException e) {
@@ -79,8 +81,6 @@ public abstract class AbstractDataCollector implements DataCollector{
   }
 
 
-  public abstract void streamFinished(int minorFragmentId);
-
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch)  throws IOException {
 
     // if we received an out of memory, add an item to all the buffer queues.
@@ -100,13 +100,8 @@ public abstract class AbstractDataCollector implements DataCollector{
       }
     }
 
-    // mark stream finished if we got the last batch.
-    if(batch.getHeader().getIsLastBatch()){
-      streamFinished(minorFragmentId);
-    }
-
-
     getBuffer(minorFragmentId).enqueue(batch);
+
     return decremented;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index 806b115..5248bb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -35,13 +35,4 @@ public class MergingCollector extends AbstractDataCollector{
   protected RawBatchBuffer getBuffer(int minorFragmentId) {
     return buffers[0];
   }
-
-  
-  public void streamFinished(int minorFragmentId) {
-    if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
-  }
-
-  
-  
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index c1f4fa5..5190d84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -32,13 +32,4 @@ public class PartitionedCollector extends AbstractDataCollector{
   protected RawBatchBuffer getBuffer(int minorFragmentId) {
     return buffers[minorFragmentId];
   }
-
-  @Override
-  public void streamFinished(int minorFragmentId) {
-    buffers[minorFragmentId].finished();
-  }
-
-  
-  
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e1e438d/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 11e20c7..9d24c66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -38,6 +39,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private final AtomicBoolean overlimit = new AtomicBoolean(false);
   private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ResponseSenderQueue readController = new ResponseSenderQueue();
+  private int streamCounter;
+  private FragmentContext context;
 
   public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
     int bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
@@ -45,10 +48,15 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     this.softlimit = bufferSizePerSocket * fragmentCount;
     this.startlimit = Math.max(softlimit/2, 1);
     this.buffer = Queues.newLinkedBlockingDeque();
+    this.streamCounter = fragmentCount;
+    this.context = context;
   }
 
   @Override
   public void enqueue(RawFragmentBatch batch) {
+    if (finished) {
+      throw new RuntimeException("Attempted to enqueue batch after finished");
+    }
     if (batch.getHeader().getIsOutOfMemory()) {
       logger.debug("Setting autoread false");
       if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) {
@@ -68,7 +76,24 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void cleanup() {
+    if (!finished) {
+      context.fail(new IllegalStateException("Cleanup before finished"));
+    }
 
+    if (!buffer.isEmpty()) {
+      if (!context.isFailed()) {
+        context.fail(new IllegalStateException("Batches still in queue during cleanup"));
+        logger.error("{} Batches in queue.", buffer.size());
+        RawFragmentBatch batch;
+        while ((batch = buffer.poll()) != null) {
+          logger.error("Batch left in queue: {}", batch);
+        }
+      }
+      RawFragmentBatch batch;
+      while ((batch = buffer.poll()) != null) {
+        if (batch.getBody() != null) batch.getBody().release();
+      }
+    }
   }
 
   @Override
@@ -82,6 +107,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   @Override
   public void finished() {
     finished = true;
+    if (!buffer.isEmpty()) {
+      throw new IllegalStateException("buffer not empty when finished");
+    }
   }
 
   @Override
@@ -98,7 +126,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     b = buffer.poll();
 
     // if we didn't get a buffer, block on waiting for buffer.
-    if(b == null && !finished){
+    if(b == null && (!finished || !buffer.isEmpty())){
       try {
         b = buffer.take();
       } catch (InterruptedException e) {
@@ -120,6 +148,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       }
     }
 
+    if (b != null && b.getHeader().getIsLastBatch()) {
+      streamCounter--;
+      if (streamCounter == 0) {
+        finished();
+      }
+    }
+
+    if (b == null && buffer.size() > 0) {
+      throw new IllegalStateException("Returning null when there are batches left in queue");
+    }
     return b;
 
   }


Mime
View raw message