apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From david...@apache.org
Subject incubator-apex-malhar git commit: - MLHR-1864 #resolve #comment Made adding a query to a queue atomic - Fixed race condition where a query could be accessed from the queue and then have its countdown decremented.
Date Fri, 02 Oct 2015 08:32:01 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 fdc91fa3f -> fd670e6d4


 - MLHR-1864 #resolve #comment Made adding a query to a queue atomic
 - Fixed race condition where a query could be accessed from the queue and then have its countdown
decremented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/fd670e6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/fd670e6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/fd670e6d

Branch: refs/heads/devel-3
Commit: fd670e6d400d816d26d0de1ca08a3c12f8278ae5
Parents: fdc91fa
Author: Timothy Farkas <tim@datatorrent.com>
Authored: Thu Oct 1 17:55:42 2015 -0700
Committer: Timothy Farkas <tim@datatorrent.com>
Committed: Thu Oct 1 22:39:57 2015 -0700

----------------------------------------------------------------------
 .../query/AbstractWindowEndQueueManager.java    | 18 ++++++++++-----
 .../appdata/query/QueryManagerAsynchronous.java | 23 ++++++++++++++------
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fd670e6d/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
b/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
index 8b6301e..951f591 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/AbstractWindowEndQueueManager.java
@@ -93,12 +93,14 @@ public abstract class AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY,
QUEU
 
     QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>> node =
new QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT>>(queryQueueable);
 
-    if(addingFilter(queryQueueable)) {
-      queryQueue.enqueue(node);
-      numLeft.getAndIncrement();
-      semaphore.release();
+    synchronized (numLeft) {
+      if (addingFilter(queryQueueable)) {
+        queryQueue.enqueue(node);
+        numLeft.getAndIncrement();
+        semaphore.release();
 
-      addedNode(node);
+        addedNode(node);
+      }
     }
 
     return true;
@@ -241,6 +243,12 @@ public abstract class AbstractWindowEndQueueManager<QUERY_TYPE, META_QUERY,
QUEU
     return qq;
   }
 
+  //Dirty hack TODO fix QueueManager interface
+  public boolean isEmptyAndBlocked()
+  {
+    return numLeft.get() == 0 && semaphore.availablePermits() == 0;
+  }
+
   private void acquire()
   {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fd670e6d/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
b/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
index c6b9a5f..4c33183 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/query/QueryManagerAsynchronous.java
@@ -47,6 +47,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT,
RES
 {
   private DefaultOutputPort<String> resultPort = null;
 
+  //TODO I believe this semaphore is no longer necessary and can just be straight up deleted.
   private transient final Semaphore inWindowSemaphore = new Semaphore(0);
   private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
   private QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager;
@@ -123,11 +124,10 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT,
RES
   {
     queueManager.haltEnqueue();
 
-    while(queueManager.getNumLeft() > 0) {
-      if(queue.isEmpty()) {
+    while (!isProcessingDone()) {
+      if (queue.isEmpty()) {
         Thread.yield();
-      }
-      else {
+      } else {
         emptyQueue();
       }
     }
@@ -142,6 +142,16 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT,
RES
     }
   }
 
+  //Dirty hack TODO fix QueManager interface
+  private boolean isProcessingDone()
+  {
+    if (queueManager instanceof AbstractWindowEndQueueManager) {
+      return ((AbstractWindowEndQueueManager) queueManager).isEmptyAndBlocked();
+    }
+
+    return queueManager.getNumLeft() == 0;
+  }
+
   private void emptyQueue()
   {
     while(!queue.isEmpty()) {
@@ -201,8 +211,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT,
RES
 
         try {
           inWindowSemaphore.acquire();
-        }
-        catch(InterruptedException ex) {
+        } catch (InterruptedException ex) {
           throw new RuntimeException(ex);
         }
 
@@ -210,7 +219,7 @@ public class QueryManagerAsynchronous<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT,
RES
         Result result = queryExecutor.executeQuery(queryBundle.getQuery(),
                                                    queryBundle.getMetaQuery(),
                                                    queryBundle.getQueueContext());
-        if(result != null) {
+        if (result != null) {
           String serializedMessage = messageSerializerFactory.serialize(result);
           queue.add(serializedMessage);
         }


Mime
View raw message