usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [10/15] usergrid git commit: move deletes to new delete queue -- read repair will fix attempts to access deleted entities and connections, so indexing and collection deletes can proceed more slowly than other types of changes
Date Mon, 28 Aug 2017 23:23:56 GMT
move deletes to new delete queue -- read repair will fix attempts to access deleted entities and connections, so indexing and collection deletes can proceed more slowly than other types of changes


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/570e1ab4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/570e1ab4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/570e1ab4

Branch: refs/heads/master
Commit: 570e1ab4e2d4d356756fedee6b7ed5a1bd2d8e93
Parents: 39ec4f2
Author: Mike Dunker <mdunker@google.com>
Authored: Fri Aug 18 15:12:56 2017 -0700
Committer: Mike Dunker <mdunker@google.com>
Committed: Fri Aug 18 15:12:56 2017 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventQueueType.java        |  35 ++
 .../asyncevents/AsyncEventService.java          |   4 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 463 +++++++++----------
 .../index/IndexProcessorFig.java                |  18 +
 .../corepersistence/index/ReIndexAction.java    |   5 +-
 .../index/ReIndexServiceImpl.java               |   3 +-
 .../read/traverse/AbstractReadGraphFilter.java  |  11 +-
 .../AbstractReadReverseGraphFilter.java         |  11 +-
 8 files changed, 292 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+public enum AsyncEventQueueType {
+    REGULAR ("regular"), UTILITY("utility"), DELETE("delete");
+
+    private String displayName;
+    AsyncEventQueueType(String displayName) {
+        this.displayName = displayName;
+    }
+
+    @Override
+    public String toString() {
+        return displayName;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index cab4e3e..9e346cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -84,9 +84,9 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      *
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
+    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType);
 
     /**
      * @param applicationScope

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 9501ad3..0e55e9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -104,13 +105,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+    public static final String QUEUE_NAME_DELETE = "delete";
     public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
     private final LegacyQueueManager indexQueue;
     private final LegacyQueueManager utilityQueue;
+    private final LegacyQueueManager deleteQueue;
     private final LegacyQueueManager indexQueueDead;
     private final LegacyQueueManager utilityQueueDead;
+    private final LegacyQueueManager deleteQueueDead;
     private final IndexProcessorFig indexProcessorFig;
     private final LegacyQueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -132,8 +136,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong counterUtility = new AtomicLong();
+    private final AtomicLong counterDelete = new AtomicLong();
     private final AtomicLong counterIndexDead = new AtomicLong();
     private final AtomicLong counterUtilityDead = new AtomicLong();
+    private final AtomicLong counterDeleteDead = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -174,16 +180,24 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         LegacyQueueScope utilityQueueScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
 
+        LegacyQueueScope deleteQueueScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
+
         LegacyQueueScope indexQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
 
         LegacyQueueScope utilityQueueDeadScope =
             new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
 
+        LegacyQueueScope deleteQueueDeadScope =
+            new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true);
+
         this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
         this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+        this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
         this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
         this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+        this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -206,34 +220,90 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         start();
     }
 
+    private String getQueueName(AsyncEventQueueType queueType) {
+        switch (queueType) {
+            case REGULAR:
+                return QUEUE_NAME;
+
+            case UTILITY:
+                return QUEUE_NAME_UTILITY;
+
+            case DELETE:
+                return QUEUE_NAME_DELETE;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+        return getQueue(queueType, false);
+    }
+
+    private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? indexQueueDead : indexQueue;
+
+            case UTILITY:
+                return isDeadQueue ? utilityQueueDead : utilityQueue;
+
+            case DELETE:
+                return isDeadQueue ? deleteQueueDead : deleteQueue;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+    private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
+        switch (queueType) {
+            case REGULAR:
+                return isDeadQueue ? counterIndexDead : counter;
+
+            case UTILITY:
+                return isDeadQueue ? counterUtilityDead : counterUtility;
+
+            case DELETE:
+                return isDeadQueue ? counterDeleteDead : counterDelete;
+
+            default:
+                throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+        }
+    }
+
+
+
+
 
     /**
      * Offer the EntityIdScope to SQS
      */
     private void offer(final Serializable operation) {
+        offer(operation, AsyncEventQueueType.REGULAR);
+    }
+
+    private void offer(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            this.indexQueue.sendMessageToLocalRegion( operation );
+            getQueue(queueType).sendMessageToLocalRegion(operation);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
             timer.stop();
         }
+
     }
 
 
-    private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
+    private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            if (forUtilityQueue) {
-                this.utilityQueue.sendMessageToAllRegions(operation);
-            } else {
-                this.indexQueue.sendMessageToAllRegions(operation);
-            }
+            getQueue(queueType).sendMessageToAllRegions(operation);
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -244,15 +314,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerBatch(final List operations, boolean forUtilityQueue){
+    private void offerBatch(final List operations, AsyncEventQueueType queueType){
         final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            if( forUtilityQueue ){
-                this.utilityQueue.sendMessages(operations);
-            }else{
-                this.indexQueue.sendMessages(operations);
-            }
+            getQueue(queueType).sendMessages(operations);
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -260,25 +326,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-    private void offerBatchToUtilityQueue(final List operations){
-        try {
-            //signal to SQS
-            this.utilityQueue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        }
-    }
-
 
     /**
      * Take message
      */
-    private List<LegacyQueueMessage> take() {
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) {
 
         final Timer.Context timer = this.readTimer.time();
 
         try {
-            return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+            return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
         }
         finally {
             //stop our timer
@@ -286,52 +343,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-    /**
-     * Take message from SQS utility queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from index dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
-    }
-
-    /**
-     * Take message from SQS utility dead letter queue
-     */
-    private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
-        }
-        finally {
-            //stop our timer
-            timer.stop();
-        }
+    private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+        return take(queueType, false);
     }
 
 
@@ -362,38 +375,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-
-    /**
-     * calls the event handlers and returns a result with information on whether
-     * it needs to be ack'd and whether it needs to be indexed
-     * Ack message in SQS
-     */
-    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueue.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+    public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) {
+        if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+            // different functionality
+            ack(messages);
         }
-    }
-
-    /**
-     * ack messages in index dead letter queue
-     */
-    public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            indexQueueDead.commitMessages( messages );
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
+        try {
+            getQueue(queueType, isDeadQueue).commitMessages( messages );
         }
-    }
-
-    /**
-     * ack messages in utility dead letter queue
-     */
-    public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
-        try{
-            utilityQueueDead.commitMessages( messages );
-        }catch(Exception e){
+        catch (Exception e) {
             throw new RuntimeException("Unable to ack messages", e);
         }
     }
@@ -536,7 +526,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
 
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
     }
 
 
@@ -626,7 +616,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage message) {
@@ -660,9 +650,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
-     * @param forUtilityQueue
+     * @param queueType
      */
-    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -688,7 +678,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
 
-        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+        offerTopic( elasticsearchIndexEvent, queueType );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
@@ -764,7 +754,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
 
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion), false);
+            new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE );
 
     }
 
@@ -824,7 +814,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ),
+            AsyncEventQueueType.DELETE );
     }
 
     private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
@@ -879,25 +870,39 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void start() {
         final int indexCount = indexProcessorFig.getWorkerCount();
         final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+        final int deleteCount = indexProcessorFig.getWorkerCountDelete();
         final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
         final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
-        logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={}", indexCount, indexDeadCount,
-            utilityCount, utilityDeadCount);
+        final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
+        logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={} delete={} deleteDLQ={}",
+            indexCount, indexDeadCount, utilityCount, utilityDeadCount, deleteCount, deleteDeadCount);
 
         for (int i = 0; i < indexCount; i++) {
-            startWorker(QUEUE_NAME);
+            startWorker(AsyncEventQueueType.REGULAR);
         }
 
         for (int i = 0; i < utilityCount; i++) {
-            startWorker(QUEUE_NAME_UTILITY);
+            startWorker(AsyncEventQueueType.UTILITY);
         }
 
-        for (int i = 0; i < indexDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME);
+        for (int i = 0; i < deleteCount; i++) {
+            startWorker(AsyncEventQueueType.DELETE);
         }
+        if( indexQueue instanceof SNSQueueManagerImpl) {
+            logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
+            for (int i = 0; i < indexDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+            }
 
-        for (int i = 0; i < utilityDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME_UTILITY);
+            for (int i = 0; i < utilityDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+            }
+
+            for (int i = 0; i < deleteDeadCount; i++) {
+                startDeadQueueWorker(AsyncEventQueueType.DELETE);
+            }
+        }else{
+            logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue worker.");
         }
     }
 
@@ -916,11 +921,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startWorker(final AsyncEventQueueType queueType) {
         synchronized (mutex) {
 
-            boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+            String type = getQueueName(queueType);
 
             Observable<List<LegacyQueueMessage>> consumer =
                 Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@@ -928,20 +932,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                     public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                         //name our thread so it's easy to see
-                        long threadNum = isUtilityQueue ?
-                            counterUtility.incrementAndGet() : counter.incrementAndGet();
-                        Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+                        long threadNum = getCounter(queueType, false).incrementAndGet();
+                        Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
 
                         List<LegacyQueueMessage> drainList = null;
 
                         do {
                             try {
-                                if ( isUtilityQueue ){
-                                    drainList = takeFromUtilityQueue();
-                                }else{
-                                    drainList = take();
+                                drainList = take(queueType);
 
-                                }
                                 //emit our list in it's entity to hand off to a worker pool
                                 subscriber.onNext(drainList);
 
@@ -997,7 +996,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                     // submit the processed messages to index producer
                                     List<LegacyQueueMessage> messagesToAck =
-                                        submitToIndex( indexEventResults, isUtilityQueue );
+                                        submitToIndex( indexEventResults, queueType );
 
                                     if ( messagesToAck.size() < messages.size() ) {
                                         logger.warn(
@@ -1007,12 +1006,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                     // ack each message if making it to this point
                                     if( messagesToAck.size() > 0 ){
-
-                                        if ( isUtilityQueue ){
-                                            ackUtilityQueue( messagesToAck );
-                                        }else{
-                                            ack( messagesToAck );
-                                        }
+                                        ack(messagesToAck, queueType, false);
                                     }
 
                                     return messagesToAck;
@@ -1036,129 +1030,112 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startDeadQueueWorker(final String type) {
-        Preconditions.checkNotNull(type, "Worker type required");
+    private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+        String type = getQueueName(queueType);
         synchronized (mutex) {
 
-            boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
             Observable<List<LegacyQueueMessage>> consumer =
-                    Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
-                        @Override
-                        public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
+                Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+                    @Override
+                    public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
-                            //name our thread so it's easy to see
-                            long threadNum = isUtilityDeadQueue ?
-                                counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
-                            Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
+                        //name our thread so it's easy to see
+                        long threadNum = getCounter(queueType, true).incrementAndGet();
+                        Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
 
-                            List<LegacyQueueMessage> drainList = null;
+                        List<LegacyQueueMessage> drainList = null;
 
-                            do {
-                                try {
-                                    if ( isUtilityDeadQueue ){
-                                        drainList = takeFromUtilityDeadQueue();
-                                    }else{
-                                        drainList = takeFromIndexDeadQueue();
-                                    }
-                                    //emit our list in it's entity to hand off to a worker pool
-                                    subscriber.onNext(drainList);
+                        do {
+                            try {
+                                drainList = take(queueType, true);
 
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
+                                //emit our list in its entirety to hand off to a worker pool
+                                subscriber.onNext(drainList);
 
-                                } catch ( Throwable t ) {
+                                //count the drained messages as in flight
+                                inFlight.addAndGet( drainList.size() );
 
-                                    final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
+                            } catch ( Throwable t ) {
 
-                                    // there might be an error here during tests, just clean the cache
-                                    indexQueueDead.clearQueueNameCache();
+                                final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
 
-                                    if ( t instanceof InvalidQueryException ) {
+                                // there might be an error here during tests, just clean the cache
+                                indexQueueDead.clearQueueNameCache();
 
-                                        // don't fill up log with exceptions when keyspace and column
-                                        // families are not ready during bootstrap/setup
-                                        logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
-                                            t.getMessage(), sleepTime );
+                                if ( t instanceof InvalidQueryException ) {
 
-                                    } else {
-                                        logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
-                                    }
+                                    // don't fill up log with exceptions when keyspace and column
+                                    // families are not ready during bootstrap/setup
+                                    logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
+                                        t.getMessage(), sleepTime );
 
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * drainList.size() );
-                                    }
+                                } else {
+                                    logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
+                                }
 
-                                    try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() );
                                 }
+
+                                try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
                             }
-                            while ( true );
                         }
-                    } )        //this won't block our read loop, just reads and proceeds
-                        .flatMap( sqsMessages -> {
-
-                            //do this on a different schedule, and introduce concurrency
-                            // with flatmap for faster processing
-                            return Observable.just( sqsMessages )
-
-                                             .map( messages -> {
-                                                 if ( messages == null || messages.size() == 0 ) {
-                                                     // no messages came from the queue, move on
-                                                     return null;
-                                                 }
-
-                                                 try {
-                                                     // put the dead letter messages back in the appropriate queue
-                                                     LegacyQueueManager returnQueue = null;
-                                                     String queueType;
-                                                     if (isUtilityDeadQueue) {
-                                                         returnQueue = utilityQueue;
-                                                         queueType = "utility";
-                                                     } else {
-                                                         returnQueue = indexQueue;
-                                                         queueType = "index";
-                                                     }
-                                                     List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
-                                                     for (LegacyQueueMessage msg : successMessages) {
-                                                         logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
-                                                     }
-                                                     int unsuccessfulMessagesSize = messages.size() - successMessages.size();
-                                                     if (unsuccessfulMessagesSize > 0) {
-                                                         // some messages couldn't be sent to originating queue, log
-                                                         Set<String> successMessageIds = new HashSet<>();
-                                                         for (LegacyQueueMessage msg : successMessages) {
-                                                             String messageId = msg.getMessageId();
-                                                             if (successMessageIds.contains(messageId)) {
-                                                                 logger.warn("Found duplicate messageId in returned messages: {}", messageId);
-                                                             } else {
-                                                                 successMessageIds.add(messageId);
-                                                             }
-                                                         }
-                                                         for (LegacyQueueMessage msg : messages) {
-                                                             String messageId = msg.getMessageId();
-                                                             if (!successMessageIds.contains(messageId)) {
-                                                                 logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
-                                                             }
-                                                         }
-                                                     }
-
-                                                     if (isUtilityDeadQueue) {
-                                                         ackUtilityDeadQueue(successMessages);
-                                                     } else {
-                                                         ackIndexDeadQueue(successMessages);
-                                                     }
-
-                                                     return messages;
-                                                 }
-                                                 catch ( Exception e ) {
-                                                     logger.error( "Failed to ack messages", e );
-                                                     return null;
-                                                     //do not rethrow so we can process all of them
-                                                 }
-                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
-
-                            //end flatMap
-                        }, indexProcessorFig.getEventConcurrencyFactor() );
+                        while ( true );
+                    }
+                } )        //this won't block our read loop, just reads and proceeds
+                    .flatMap( sqsMessages -> {
+
+                        //do this on a different schedule, and introduce concurrency
+                        // with flatmap for faster processing
+                        return Observable.just( sqsMessages )
+
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 ) {
+                                    // no messages came from the queue, move on
+                                    return null;
+                                }
+
+                                try {
+                                    // put the dead letter messages back in the appropriate queue
+                                    LegacyQueueManager returnQueue = getQueue(queueType, false);
+
+                                    List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+                                    for (LegacyQueueMessage msg : successMessages) {
+                                        logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                    }
+                                    int unsuccessfulMessagesSize = messages.size() - successMessages.size();
+                                    if (unsuccessfulMessagesSize > 0) {
+                                        // some messages couldn't be sent to originating queue, log
+                                        Set<String> successMessageIds = new HashSet<>();
+                                        for (LegacyQueueMessage msg : successMessages) {
+                                            String messageId = msg.getMessageId();
+                                            if (successMessageIds.contains(messageId)) {
+                                                logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+                                            } else {
+                                                successMessageIds.add(messageId);
+                                            }
+                                        }
+                                        for (LegacyQueueMessage msg : messages) {
+                                            String messageId = msg.getMessageId();
+                                            if (!successMessageIds.contains(messageId)) {
+                                                logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody());
+                                            }
+                                        }
+                                    }
+
+                                    ack(successMessages, queueType, true);
+
+                                    return messages;
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "Failed to ack messages", e );
+                                    return null;
+                                    //do not rethrow so we can process all of them
+                                }
+                            } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+
+                        //end flatMap
+                    }, indexProcessorFig.getEventConcurrencyFactor() );
 
             //start in the background
 
@@ -1172,7 +1149,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * Submit results to index and return the queue messages to be ack'd
      *
      */
-    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) {
 
         // if nothing came back then return empty list
         if(indexEventResults==null){
@@ -1199,7 +1176,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined, forUtilityQueue);
+        queueIndexOperationMessage(combined, queueType);
 
         return queueMessages;
     }
@@ -1210,10 +1187,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
         queueIndexOperationMessage(
-            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
+            eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR );
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -1226,7 +1203,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
 
-        offerBatch( batch, forUtilityQueue );
+        offerBatch( batch, queueType );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..eb63056 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete";
+
     String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
 
     String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
 
+    String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -105,6 +109,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtility();
 
     /**
+     * The number of worker threads used to read delete requests from the queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+    int getWorkerCountDelete();
+
+    /**
      * The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
      */
     @Default("1")
@@ -119,6 +130,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCountUtilityDeadLetter();
 
     /**
+     * The number of worker threads used to read dead messages from the delete dead letter queue and reload them into the delete queue.
+     */
+    @Default("1")
+    @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+    int getWorkerCountDeleteDeadLetter();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@ public interface ReIndexAction {
      * Index a batch list of entities.  Goes to the utility queue.
      * @param edges
      * @param updatedSince
-     * @param forUtilityQueue
+     * @param queueType
      */
-    void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..c7371b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince, true);
+                indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 83f4c8b..4886b08 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -124,7 +125,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
 
                     indexMessageObservable
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -139,7 +140,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                         entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -159,7 +160,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                         entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -252,13 +253,13 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/570e1ab4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1afb524..e8a20dd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -124,7 +125,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
 
                     indexMessageObservable
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -139,7 +140,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                         entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -159,7 +160,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                         entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
 
                     entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
+                        .compose(applyCollector(AsyncEventQueueType.DELETE))
                         .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
                         .subscribe();
 
@@ -245,13 +246,13 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
         }
     }
 
-    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
 
         return observable -> observable
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation, false);
+                asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
             });
 
     }


Mime
View raw message