usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [1/2] usergrid git commit: move deletes to new delete queue -- read repair will fix attempts to access deleted entities and connections, so indexing and collection deletes can proceed more slowly than other types of changes
Date Mon, 17 Jul 2017 22:11:24 GMT
Repository: usergrid
Updated Branches:
  refs/heads/collectionDelete 99ba349c8 -> b6d14069a


move deletes to new delete queue -- read repair will fix attempts to access deleted entities
and connections, so indexing and collection deletes can proceed more slowly than other types
of changes


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/221b99dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/221b99dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/221b99dd

Branch: refs/heads/collectionDelete
Commit: 221b99dd9d81968a5629647771a6eaf83c4177be
Parents: 99ba349
Author: Mike Dunker <mdunker@google.com>
Authored: Mon Jul 10 08:03:52 2017 -0700
Committer: Mike Dunker <mdunker@google.com>
Committed: Mon Jul 10 08:03:52 2017 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventQueueType.java        |  35 +++
 .../asyncevents/AsyncEventService.java          |   4 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 315 ++++++++-----------
 .../index/IndexProcessorFig.java                |  18 ++
 .../corepersistence/index/ReIndexAction.java    |   5 +-
 .../index/ReIndexServiceImpl.java               |   3 +-
 .../read/traverse/AbstractReadGraphFilter.java  |  11 +-
 .../AbstractReadReverseGraphFilter.java         |  11 +-
 .../index/CollectionVersionTest.java            |   9 +
 9 files changed, 220 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+public enum AsyncEventQueueType {
+    REGULAR ("regular"), UTILITY("utility"), DELETE("delete");
+
+    private String displayName;
+    AsyncEventQueueType(String displayName) {
+        this.displayName = displayName;
+    }
+
+    @Override
+    public String toString() {
+        return displayName;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 5fe4295..7ce208f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -85,9 +85,9 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      *
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean
forUtilityQueue);
+    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType
queueType);
 
     /**
      * @param applicationScope

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 257e172..e33865e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -103,13 +103,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue
name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits
queue name size to 80 chars
+    public static final String QUEUE_NAME_DELETE = "delete";
     public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
     private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager deleteQueue;
     private final LegacyQueueManager indexQueueDead;
     private final LegacyQueueManager utilityQueueDead;
+    private final LegacyQueueManager deleteQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final CollectionVersionFig collectionVersionFig;
@@ -133,8 +136,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterDelete = new AtomicLong();
     private final AtomicLong counterIndexDead = new AtomicLong();
     private final AtomicLong counterUtilityDead = new AtomicLong();
+    private final AtomicLong counterDeleteDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -177,16 +182,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
 
+        LegacyQueueScope deleteQueueScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
+
         LegacyQueueScope indexQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL,
true);
 
         LegacyQueueScope utilityQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL,
true);
 
+        LegacyQueueScope deleteQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL,
true);
+
         this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
         this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
         this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+        this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -211,24 +224,73 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         start();
     }
 
+    private String getQueueName(AsyncEventQueueType queueType) {
+        switch (queueType) {
+            case REGULAR:
+                return QUEUE_NAME;
+
+            case UTILITY:
+                return QUEUE_NAME_UTILITY;
+
+            case DELETE:
+                return QUEUE_NAME_DELETE;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+        return getQueue(queueType, false);
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue)
{
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? indexQueueDead : indexQueue;
+
+            case UTILITY:
+                return isDeadQueue ? utilityQueueDead : utilityQueue;
+
+            case DELETE:
+                return isDeadQueue ? deleteQueueDead : deleteQueue;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? counterIndexDead : counter;
+
+            case UTILITY:
+                return isDeadQueue ? counterUtilityDead : counterUtility;
+
+            case DELETE:
+                return isDeadQueue ? counterDeleteDead : counterDelete;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+
 
     /**
      * Offer the EntityIdScope to SQS
      */
     private void offer(final Serializable operation) {
-        offer(operation, false);
+        offer(operation, AsyncEventQueueType.REGULAR);
     }
 
-    private void offer(final Serializable operation, boolean forUtilityQueue) {
+    private void offer(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.indexQueue.sendMessageToLocalRegion(operation);
-            } else {
-                this.indexQueue.sendMessageToLocalRegion(operation);
-            }
+            getQueue(queueType).sendMessageToLocalRegion(operation);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -238,16 +300,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
+    private void offerTopic(final Serializable operation, AsyncEventQueueType queueType)
{
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.utilityQueue.sendMessageToAllRegions(operation);
-            } else {
-                this.indexQueue.sendMessageToAllRegions(operation);
-            }
+            getQueue(queueType).sendMessageToAllRegions(operation);
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -258,15 +316,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerBatch(final List operations, boolean forUtilityQueue){
+    private void offerBatch(final List operations, AsyncEventQueueType queueType){
         final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            if( forUtilityQueue ){
-                this.utilityQueue.sendMessages(operations);
-            }else{
-                this.indexQueue.sendMessages(operations);
-            }
+            getQueue(queueType).sendMessages(operations);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -274,78 +328,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-    private void offerBatchToUtilityQueue(final List operations){
-        try {
-            //signal to SQS
-            this.utilityQueue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        }
-    }
-
 
     /**
      * Take message
      */
-    private List<LegacyQueueMessage> take() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from SQS utility queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from index dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue)
{
 
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
+            return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
         }
         finally {
             //stop our timer
             timer.stop();
         }
     }
-
-    /**
-     * Take message from SQS utility dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+        return take(queueType, false);
     }
 
 
@@ -376,42 +376,20 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-
-    /**
-     * calls the event handlers and returns a result with information on whether
-     * it needs to be ack'd and whether it needs to be indexed
-     * Ack message in SQS
-     */
-    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueue.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+    public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType,
boolean isDeadQueue) {
+        if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+            // different functionality
+            ack(messages);
         }
-    }
-
-    /**
-     * ack messages in index dead letter queue
-     */
-    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            indexQueueDead.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+        try {
+            getQueue(queueType, isDeadQueue).commitMessages( messages );
         }
-    }
-
-    /**
-     * ack messages in utility dead letter queue
-     */
-    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueueDead.commitMessages( messages );
-        }catch(Exception e){
+        catch (Exception e) {
             throw new RuntimeException("Unable to ack messages", e);
         }
     }
 
+
     /**
      * calls the event handlers and returns a result with information on whether
      * it needs to be ack'd and whether it needs to be indexed
@@ -555,7 +533,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
 
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
     }
 
 
@@ -645,7 +623,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues
a multi-region de-index op
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge )
);
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ),
AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage message) {
@@ -678,9 +656,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage,
boolean forUtilityQueue) {
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage,
AsyncEventQueueType queueType) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -706,7 +684,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
 
-        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+        offerTopic( elasticsearchIndexEvent, queueType );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
@@ -782,7 +760,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
 
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion), false);
+            new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE);
 
     }
 
@@ -844,7 +822,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues
a multi-region de-index op
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope,
entityId ) ) );
+        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope,
entityId ) ) ,
+            AsyncEventQueueType.DELETE);
     }
 
     private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
@@ -883,7 +862,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues
a multi-region de-index op
-        offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
true);
+        offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+            AsyncEventQueueType.DELETE);
     }
 
     private void handleCollectionDelete(final LegacyQueueMessage message) {
@@ -932,7 +912,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                 offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
                     new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false),
-                    true);
+                    AsyncEventQueueType.DELETE);
                 count.incrementAndGet();
             }).toBlocking().lastOrDefault(null);
 
@@ -940,7 +920,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         if (count.intValue() >= maxDeletes) {
             // requeue collection delete for next chunk of deletes
-            offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope,
collectionVersion), true);
+            offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope,
collectionVersion),
+                AsyncEventQueueType.DELETE);
         }
     }
 
@@ -965,29 +946,39 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void start() {
         final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int deleteCount = indexProcessorFig.getWorkerCountDelete();
         final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
         final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
+        final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
 
         for (int i = 0; i < indexCount; i++) {
-            startWorker(QUEUE_NAME);
+            startWorker(AsyncEventQueueType.REGULAR);
         }
 
         for (int i = 0; i < utilityCount; i++) {
-            startWorker(QUEUE_NAME_UTILITY);
+            startWorker(AsyncEventQueueType.UTILITY);
+        }
+
+        for (int i = 0; i < deleteCount; i++) {
+            startWorker(AsyncEventQueueType.DELETE);
         }
 
         if( indexQueue instanceof SNSQueueManagerImpl ) {
-            logger.info("Queue manager implementation supports dead letters, start dead letter
queue worker.");
+            logger.info("Queue manager implementation supports dead letters, start dead letter
queue workers.");
             for (int i = 0; i < indexDeadCount; i++) {
-                startDeadQueueWorker(QUEUE_NAME);
+                startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+            }
+
+            for (int i = 0; i < utilityDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+            }
+
+            for (int i = 0; i < deleteDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.DELETE);
             }
         }else{
             logger.info("Queue manager implementation does NOT support dead letters, NOT
starting dead letter queue worker.");
         }
-
-        for (int i = 0; i < utilityDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME_UTILITY);
-        }
     }
 
 
@@ -1005,11 +996,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startWorker(final AsyncEventQueueType queueType) {
         synchronized (mutex) {
 
-            boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+            String type = getQueueName(queueType);
 
             Observable<List<LegacyQueueMessage>> consumer =
                 Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>()
{
@@ -1017,20 +1007,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                     public void call( final Subscriber<? super List<LegacyQueueMessage>>
subscriber ) {
 
                         //name our thread so it's easy to see
-                        long threadNum = isUtilityQueue ?
-                            counterUtility.incrementAndGet() : counter.incrementAndGet();
-                        Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum
);
+                        long threadNum = getCounter(queueType, false).incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum
);
 
                         List<LegacyQueueMessage> drainList = null;
 
                         do {
                             try {
-                                if ( isUtilityQueue ){
-                                    drainList = takeFromUtilityQueue();
-                                }else{
-                                    drainList = take();
+                                drainList = take(queueType);
 
-                                }
                                 //emit our list in it's entity to hand off to a worker pool
                                 subscriber.onNext(drainList);
 
@@ -1086,7 +1071,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                     // submit the processed messages to index producer
                                     List<LegacyQueueMessage> messagesToAck =
-                                        submitToIndex( indexEventResults, isUtilityQueue
);
+                                        submitToIndex( indexEventResults, queueType );
 
                                     if ( messagesToAck.size() < messages.size() ) {
                                         logger.warn(
@@ -1096,12 +1081,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                     // ack each message if making it to this point
                                     if( messagesToAck.size() > 0 ){
-
-                                        if ( isUtilityQueue ){
-                                            ackUtilityQueue( messagesToAck );
-                                        }else{
-                                            ack( messagesToAck );
-                                        }
+                                        ack(messagesToAck, queueType, false);
                                     }
 
                                     return messagesToAck;
@@ -1125,31 +1105,25 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startDeadQueueWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+        String type = getQueueName(queueType);
         synchronized (mutex) {
 
-            boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
             Observable<List<LegacyQueueMessage>> consumer =
                     Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>()
{
                         @Override
                         public void call( final Subscriber<? super List<LegacyQueueMessage>>
subscriber ) {
 
                             //name our thread so it's easy to see
-                            long threadNum = isUtilityDeadQueue ?
-                                counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
-                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" +
type+ "_" + threadNum );
+                            long threadNum = getCounter(queueType, true).incrementAndGet();
+                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" +
type + "_" + threadNum );
 
                             List<LegacyQueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    if ( isUtilityDeadQueue ){
-                                        drainList = takeFromUtilityDeadQueue();
-                                    }else{
-                                        drainList = takeFromIndexDeadQueue();
-                                    }
+                                    drainList = take(queueType, true);
+
                                     //emit our list in it's entity to hand off to a worker
pool
                                     subscriber.onNext(drainList);
 
@@ -1198,18 +1172,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                                  try {
                                                      // put the dead letter messages back
in the appropriate queue
-                                                     LegacyQueueManager returnQueue = null;
-                                                     String queueType;
-                                                     if (isUtilityDeadQueue) {
-                                                         returnQueue = utilityQueue;
-                                                         queueType = "utility";
-                                                     } else {
-                                                         returnQueue = indexQueue;
-                                                         queueType = "index";
-                                                     }
+                                                     LegacyQueueManager returnQueue = getQueue(queueType,
false);
+
                                                      List<LegacyQueueMessage> successMessages
= returnQueue.sendQueueMessages(messages);
                                                      for (LegacyQueueMessage msg : successMessages)
{
-                                                         logger.warn("Returning message to
{} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                                         logger.warn("Returning message to
{} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(),
msg.getStringBody());
                                                      }
                                                      int unsuccessfulMessagesSize = messages.size()
- successMessages.size();
                                                      if (unsuccessfulMessagesSize > 0)
{
@@ -1226,16 +1193,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                                          for (LegacyQueueMessage msg : messages)
{
                                                              String messageId = msg.getMessageId();
                                                              if (!successMessageIds.contains(messageId))
{
-                                                                 logger.warn("Failed to return
message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId,
msg.getStringBody());
+                                                                 logger.warn("Failed to return
message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(),
messageId, msg.getStringBody());
                                                              }
                                                          }
                                                      }
 
-                                                     if (isUtilityDeadQueue) {
-                                                         ackUtilityDeadQueue(successMessages);
-                                                     } else {
-                                                         ackIndexDeadQueue(successMessages);
-                                                     }
+                                                     ack(successMessages, queueType, true);
 
                                                      return messages;
                                                  }
@@ -1261,7 +1224,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * Submit results to index and return the queue messages to be ack'd
      *
      */
-    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults,
boolean forUtilityQueue) {
+    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults,
AsyncEventQueueType queueType) {
 
         // if nothing came back then return empty list
         if(indexEventResults==null){
@@ -1288,7 +1251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined, forUtilityQueue);
+        queueIndexOperationMessage(combined, queueType);
 
         return queueMessages;
     }
@@ -1299,10 +1262,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
         queueIndexOperationMessage(
-            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null),
false);
+            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null),
AsyncEventQueueType.REGULAR);
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean
forUtilityQueue) {
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType
queueType) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -1315,7 +1278,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
 
-        offerBatch( batch, forUtilityQueue );
+        offerBatch( batch, queueType );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..eb63056 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete";
+
     String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -105,6 +109,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read delete requests from the queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+    int getWorkerCountDelete();
+
+    /**
      * The number of worker threads used to read dead messages from the index dead letter
queue and reload them into the index queue.
      */
     @Default("1")
@@ -119,6 +130,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtilityDeadLetter();
 
     /**
+     * The number of worker threads used to read dead messages from the delete dead letter
queue and reload them into the delete queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+    int getWorkerCountDeleteDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ public interface ReIndexAction {
      * Index a batch list of entities.  Goes to the utility queue.
      * @param edges
      * @param updatedSince
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType
queueType);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..c7371b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince, true);
+                indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index e9aa6c8..b1b7f75 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -122,7 +123,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope,
markedEdge);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -132,7 +133,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
                     logger.info("Edge {} has a deleted source node, deleting the entity for
id {}", markedEdge, sourceNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope,
sourceNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -142,7 +143,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
                     logger.info("Edge {} has a deleted target node, deleting the entity for
id {}", markedEdge, targetNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope,
targetNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
                 }
 
 
@@ -227,13 +228,13 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector()
{
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType
queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1b662cc..c75545e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -122,7 +123,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope,
markedEdge);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -132,7 +133,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     logger.info("Edge {} has a deleted source node, deleting the entity for
id {}", markedEdge, sourceNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope,
sourceNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -142,7 +143,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     logger.info("Edge {} has a deleted target node, deleting the entity for
id {}", markedEdge, targetNodeId);
 
                     final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope,
targetNodeId);
-                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -221,13 +222,13 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector()
{
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType
queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/221b99dd/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
index 0278708..a3c7284 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/CollectionVersionTest.java
@@ -19,5 +19,14 @@
 
 package org.apache.usergrid.corepersistence.index;
 
+import org.junit.runner.RunWith;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+@NotThreadSafe
 public class CollectionVersionTest {
 }


Mime
View raw message