usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [2/3] usergrid git commit: Add option to include old version in result Add debug options Add gzip support
Date Tue, 14 Nov 2017 16:37:41 GMT
Add option to include old version in result
Add debug options
Add gzip support


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/000eaaad
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/000eaaad
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/000eaaad

Branch: refs/heads/master
Commit: 000eaaadfd83ace702d99fb740b6b0129688d9f7
Parents: 7139781
Author: Peter Johnson <pjohnson@apigee.com>
Authored: Tue Nov 7 12:29:50 2017 -0800
Committer: Peter Johnson <pjohnson@apigee.com>
Committed: Mon Nov 13 21:37:43 2017 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  15 +-
 .../corepersistence/CpRelationManager.java      |  26 +--
 .../asyncevents/AsyncEventService.java          |   6 +-
 .../asyncevents/AsyncEventServiceImpl.java      |  27 ++-
 .../asyncevents/direct/BufferedQueueImpl.java   | 191 -------------------
 .../asyncevents/direct/BufferedQueueNOP.java    |   3 +
 .../direct/DirectFirstEventServiceImpl.java     |  54 +++---
 .../corepersistence/index/IndexingStrategy.java |  69 -------
 .../corepersistence/pipeline/Pipeline.java      |   9 +-
 .../pipeline/PipelineContext.java               |  21 +-
 .../pipeline/builder/CandidateBuilder.java      |  13 +-
 .../pipeline/builder/IdBuilder.java             |  10 +-
 .../pipeline/builder/PipelineBuilder.java       |  19 +-
 .../read/search/CandidateEntityFilter.java      |  66 +++++--
 .../service/CollectionSearch.java               |   9 +
 .../service/CollectionServiceImpl.java          |  12 +-
 .../corepersistence/util/CpCollectionUtils.java |  49 +++--
 .../persistence/CollectionDeleteTest.java       |   2 +-
 .../usergrid/persistence/index/EntityIndex.java |   1 -
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../persistence/index/impl/EntityIndexTest.java |   1 +
 .../persistence/queue/LegacyQueueFig.java       |   9 +-
 .../queue/impl/SNSQueueManagerImpl.java         |  10 +-
 .../queue/settings/IndexConsistency.java        |  64 +++++++
 .../queue/settings/QueueIndexingStrategy.java   |  80 ++++++++
 .../rest/interceptors/GZIPInterceptor.java      |  79 ++++++++
 .../interceptors/GZIPWriterInterceptor.java     |  78 --------
 .../collection/CollectionsResourceIT.java       | 151 +++++++++++++++
 28 files changed, 629 insertions(+), 447 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1c979d6..7a4c781 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -32,7 +32,6 @@ import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.CollectionSettings;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
@@ -70,6 +69,7 @@ import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 import org.apache.usergrid.utils.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -524,7 +524,7 @@ public class CpEntityManager implements EntityManager {
 
         String entityType = cpEntity.getId().getType();
         boolean skipIndexingForType = skipIndexingForType(entityType);
-        IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+        QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
 
         try {
 
@@ -552,14 +552,14 @@ public class CpEntityManager implements EntityManager {
         }
 
         if (!skipIndexingForType) {
-            indexEntity(cpEntity, indexingStrategy);
+            indexEntity(cpEntity, queueIndexingStrategy);
             deIndexOldVersionsOfEntity(cpEntity);
         }
     }
 
-    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, IndexingStrategy indexingStrategy) {
+    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, QueueIndexingStrategy queueIndexingStrategy) {
         // queue an event to update the new entity
-        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , indexingStrategy);
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , queueIndexingStrategy);
     }
 
     private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
@@ -569,7 +569,7 @@ public class CpEntityManager implements EntityManager {
         }
     }
 
-    private IndexingStrategy getIndexingStrategyForType(String type ) {
+    private QueueIndexingStrategy getIndexingStrategyForType(String type ) {
         return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
     }
 
@@ -1813,7 +1813,8 @@ public class CpEntityManager implements EntityManager {
 
         for (String validName : CpCollectionUtils.getValidSettings()) {
             if (newSettings.containsKey(validName)) {
-                updatedSettings.put(validName, newSettings.get(validName));
+                Object value = CpCollectionUtils.validateValue(validName, newSettings.get(validName));
+                updatedSettings.put(validName, value);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index a23d6ac..e329c29 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -20,10 +20,7 @@ package org.apache.usergrid.corepersistence;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.index.CollectionSettings;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
-import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
 import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
@@ -50,6 +47,8 @@ import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.settings.IndexConsistency;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.InflectionUtils;
 import org.apache.usergrid.utils.MapUtils;
@@ -62,7 +61,6 @@ import java.util.*;
 
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
 import static org.apache.usergrid.persistence.Schema.*;
-import static org.apache.usergrid.utils.ClassUtils.cast;
 import static org.apache.usergrid.utils.InflectionUtils.singularize;
 import static org.apache.usergrid.utils.MapUtils.addMapSet;
 
@@ -397,8 +395,8 @@ public class CpRelationManager implements RelationManager {
 
                 String entityType = cpHeadEntity.getId().getType();
                 if ( !skipIndexingForType( entityType) ) {
-                    IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
-                    indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, indexingStrategy);
+                    QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
+                    indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, queueIndexingStrategy);
                 }
 
             } );
@@ -406,8 +404,8 @@ public class CpRelationManager implements RelationManager {
 
             String entityType = memberEntity.getId().getType();
             if ( !skipIndexingForType( entityType ) ) {
-                IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
-                indexService.queueNewEdge(applicationScope, memberEntityId, edge, indexingStrategy);
+                QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
+                indexService.queueNewEdge(applicationScope, memberEntityId, edge, queueIndexingStrategy);
             }
 
 
@@ -668,6 +666,8 @@ public class CpRelationManager implements RelationManager {
                         queryString, cursor );
 
                 search.setAnalyzeOnly(analyzeOnly);
+                IndexConsistency indexConsistency = getIndexConsistencyForType(collectionName);
+                search.setKeepStaleEntries(indexConsistency == IndexConsistency.LATEST);
 
                 return collectionService.searchCollection( search );
             }
@@ -738,8 +738,8 @@ public class CpRelationManager implements RelationManager {
 
         String entityType = targetEntity.getId().getType();
         if ( !skipIndexingForType( entityType ) ) {
-            IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
-            indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, indexingStrategy);
+            QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
+            indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, queueIndexingStrategy);
         }
 
         // remove any duplicate edges (keeps the duplicate edge with same timestamp)
@@ -1100,7 +1100,11 @@ public class CpRelationManager implements RelationManager {
 
     }
 
-    private IndexingStrategy getIndexingStrategyForType(String type ) {
+    private IndexConsistency getIndexConsistencyForType(String type ) {
+        return CpCollectionUtils.getIndexConsistencyForType(collectionSettingsFactory, applicationId, type);
+    }
+
+    private QueueIndexingStrategy getIndexingStrategyForType(String type ) {
         return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 4305aea..b8e8117 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -21,13 +21,13 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 
 import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 
 import java.util.UUID;
 
@@ -55,7 +55,7 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction
      * @param updatedAfter
      * @param
      */
-    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, IndexingStrategy strategy);
+    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, QueueIndexingStrategy strategy);
 
 
     /**
@@ -68,7 +68,7 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction
      * @param entityId
      * @param newEdge
      */
-    void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, IndexingStrategy indexingStrategy);
+    void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, QueueIndexingStrategy queueIndexingStrategy);
 
     /**
      * Queue the deletion of an edge

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 8257640..ec08dfe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -29,7 +29,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
 import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
@@ -58,6 +57,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
 import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -214,7 +214,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         start();
     }
 
-    protected Histogram getMessageCycye() {
+    protected Histogram getMessageCycle() {
         return messageCycle;
     }
 
@@ -278,28 +278,25 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * Offer the EntityIdScope to SQS
      */
     protected void offer(final Serializable operation) {
-        offer(operation, AsyncEventQueueType.REGULAR, IndexingStrategy.DIRECT);
+        offer(operation, AsyncEventQueueType.REGULAR, QueueIndexingStrategy.DIRECT);
     }
 
     /**
      * Offer the EntityIdScope to SQS
      */
-    protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
-        offer(operation, AsyncEventQueueType.REGULAR, indexingStrategy);
+    protected void offer(final Serializable operation, QueueIndexingStrategy queueIndexingStrategy) {
+        offer(operation, AsyncEventQueueType.REGULAR, queueIndexingStrategy);
     }
 
      /**
       * Offer the EntityIdScope to SQS
       */
-    private void offer(final Serializable operation, AsyncEventQueueType queueType, IndexingStrategy indexingStrategy) {
+    private void offer(final Serializable operation, AsyncEventQueueType queueType, QueueIndexingStrategy queueIndexingStrategy) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            Boolean async = null;
-            if (indexingStrategy != IndexingStrategy.DEFAULT) {
-                async = (indexingStrategy == IndexingStrategy.ASYNC);
-            }
+            Boolean async = (queueIndexingStrategy == QueueIndexingStrategy.ASYNC);
             getQueue(queueType).sendMessageToLocalRegion(operation, async);
 
         } catch (IOException e) {
@@ -548,7 +545,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     @Override
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
-                                       final Entity entity, long updatedAfter, IndexingStrategy indexingStrategy) {
+                                       final Entity entity, long updatedAfter, QueueIndexingStrategy queueIndexingStrategy) {
 
 
         if (logger.isTraceEnabled()) {
@@ -561,7 +558,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             new EntityIdScope(applicationScope, entity.getId()),
             updatedAfter);
 
-        offer(event, indexingStrategy);
+        offer(event, queueIndexingStrategy);
 
     }
 
@@ -599,14 +596,14 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void queueNewEdge(final ApplicationScope applicationScope,
                              final Id entityId,
                              final Edge newEdge,
-                             IndexingStrategy indexingStrategy) {
+                             QueueIndexingStrategy queueIndexingStrategy) {
 
         if (logger.isTraceEnabled()) {
             logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
                 newEdge.getType(), entityId.getUuid(), entityId.getType());
         }
 
-        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), indexingStrategy);
+        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), queueIndexingStrategy);
 
     }
 
@@ -710,7 +707,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offerTopic( elasticsearchIndexEvent, queueType );
     }
 
-    protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
+    protected ElasticsearchIndexEvent getESIndexEvent(final IndexOperationMessage indexOperationMessage) {
 
         final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
deleted file mode 100644
index 9123138..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.asyncevents.direct;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.function.Consumer;
-
-/**
- * Bufferes events and dispatched then in batches.
- * Ensures that the callback will be called at a min interval.
- */
-public class BufferedQueueImpl<T> implements BufferedQueue<T> {
-
-    private String fileName = "my_file_name.txt";
-    private Consumer<List<T>> consumer;
-
-    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
-
-    private final LinkedBlockingQueue<PendingDispatch> queue;
-    private final long intervalNanos;
-    private long timeOfLastDispatch = 0L;
-
-    public BufferedQueueImpl(int size, long interval , TimeUnit intervalTimeUnit) {
-
-        Runtime.getRuntime().addShutdownHook(new Thread(new DispatchTask()));
-
-        this.intervalNanos = intervalTimeUnit.toNanos(interval);
-        threadPool.scheduleAtFixedRate(new DispatchTask(), intervalNanos,intervalNanos, TimeUnit.NANOSECONDS);
-        readBatchFile();
-        queue = new LinkedBlockingQueue<>(size);
-    }
-
-    public boolean offer(T t) {
-        PendingDispatch pd = new PendingDispatch(t);
-        if (timeOfLastDispatch + intervalNanos < System.nanoTime()) {
-            dispatchOne(pd);
-            return true;
-        }
-        try {
-            return queue.offer(pd, intervalNanos, TimeUnit.NANOSECONDS);
-        } catch (InterruptedException e) {
-            return false;
-        }
-    }
-
-    public void setConsumer(Consumer<List<T>> consumer) {
-        this.consumer = consumer;
-    }
-
-
-    private void dispatchOne(PendingDispatch pd) {
-        List<PendingDispatch> messages = new ArrayList<>();
-        messages.add(pd);
-        dispatchMessages(messages);
-    }
-
-    protected void dispatchAll() {
-        if (!queue.isEmpty()) {
-            List<PendingDispatch> messages = new ArrayList<>();
-            queue.drainTo(messages);
-            dispatchMessages(messages);
-        }
-    }
-
-    private void dispatchMessages(List<PendingDispatch> messages) {
-        List<T> m = new ArrayList<>();
-        for (PendingDispatch pd : messages) {
-            if (!pd.isCancelled()) {
-                m.add(pd.getWrapped());
-            }
-        }
-        timeOfLastDispatch = System.nanoTime();
-        Boolean sent = Boolean.TRUE;
-        try {
-            consumer.accept(m);
-        } catch (Exception e) {
-            sent = Boolean.FALSE;
-        }
-        for (PendingDispatch pd : messages) {
-            pd.setResult(sent);
-        }
-    }
-
-
-    public int size() {
-        return queue.size();
-    }
-
-    private void readBatchFile() {
-
-    }
-
-
-    //
-    // Internal Helper classes
-    //
-
-
-
-    private class PendingDispatch implements Future<Boolean> {
-        T wrapped;
-        boolean canceled;
-        boolean done;
-        Boolean result = null;
-
-        PendingDispatch(T wrapped) {
-            this.wrapped = wrapped;
-            canceled = false;
-            done = false;
-        }
-
-        T getWrapped() {
-            return wrapped;
-        }
-
-        void setResult(Boolean b) {
-            result = b;
-            done = true;
-            synchronized (this) {
-                notify();
-            }
-        }
-
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            canceled = true;
-            return canceled;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return canceled;
-        }
-
-        @Override
-        public boolean isDone() {
-            return done;
-        }
-
-        @Override
-        public Boolean get() throws InterruptedException, ExecutionException {
-            while (!done) {
-                synchronized (this) {
-                    wait(100);
-                }
-            }
-            return result;
-        }
-
-        @Override
-        public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-            if (!done) {
-                synchronized (this) {
-                    wait(unit.toMillis(timeout));
-                }
-            }
-            return result;
-        }
-    }
-
-
-    private class DispatchTask implements Runnable  {
-        @Override
-        public void run() {
-            try {
-                dispatchAll();
-            } catch (Throwable t) {
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
index f842cea..c4d28b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
@@ -21,6 +21,9 @@ package org.apache.usergrid.corepersistence.asyncevents.direct;
 import java.util.function.Consumer;
 
 /**
+ * This is NOP buffer. An alternate implementation of this interface might buffer the
+ * events to smooth out 'bursts'
+ *
  * Created by peterajohnson on 10/27/17.
  */
 public class BufferedQueueNOP<T> implements BufferedQueue<T> {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
index 4dfce37..ec2b5ec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
@@ -23,7 +23,7 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
+import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
 import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,17 +55,19 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
 
     private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
 
-    private IndexingStrategy configIndexingStrategy = IndexingStrategy.ASYNC;
+    private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
 
     private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
 
     public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
         super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
 
-        //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
         bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
 
-        configIndexingStrategy = IndexingStrategy.get(queueFig.getQueueStrategy());
+        configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
+
+        boolean indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
+        CpCollectionUtils.setDebugMode(indexDebugMode);
 
     }
 
@@ -82,8 +85,10 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
         // failed to dispatch send to SQS
         try {
             List<LegacyQueueMessage> indexedMessages = submitToIndex(result, false);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Sent {} messages to ES ", indexedMessages.size());
+            }
         } catch (Exception e) {
-            e.printStackTrace();
             for (Serializable body : bodies) {
                 super.offer(body);
             }
@@ -123,16 +128,16 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
     }
 
 
-    protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
-        if  (shouldSendToDirectToES(indexingStrategy)) {
+    protected void offer(final Serializable operation, QueueIndexingStrategy queueIndexingStrategy) {
+        queueIndexingStrategy = resolveIndexingStrategy(queueIndexingStrategy);
+        if  (queueIndexingStrategy.shouldSendDirectToES()) {
             List<LegacyQueueMessage> messages = getMessageArray(operation);
             List<IndexEventResult> result = callEventHandlers(messages);
             submitToIndex( result, false );
         }
 
-        // only if single region.
-        if (shouldSendToAWS(indexingStrategy)) {
-            super.offer(operation, indexingStrategy);
+        if (queueIndexingStrategy.shouldSendToAWS()) {
+            super.offer(operation, queueIndexingStrategy);
         }
     }
 
@@ -152,7 +157,7 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
             .map(indexEventResult -> {
 
                 //record the cycle time
-                getMessageCycye().update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                getMessageCycle().update(System.currentTimeMillis() - indexEventResult.getCreationTime());
 
                 // ingest each index op into our combined, single index op for the index producer
                 if(indexEventResult.getIndexOperationMessage().isPresent()){
@@ -166,23 +171,24 @@ public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
 
 
         // dispatch to ES
-        ElasticsearchIndexEvent elasticsearchIndexEvent = getIndexOperationMessage(combined);
+        ElasticsearchIndexEvent elasticsearchIndexEvent = getESIndexEvent(combined);
         handleIndexOperation(elasticsearchIndexEvent);
         return queueMessages;
     }
 
-    private boolean shouldSendToDirectToES(IndexingStrategy indexingStrategy) {
-        if (indexingStrategy == IndexingStrategy.DEFAULT) {
-            indexingStrategy = configIndexingStrategy;
-        }
-        return  (indexingStrategy == IndexingStrategy.DIRECT || indexingStrategy == IndexingStrategy.DIRECTONLY);
-    }
-
-    private boolean shouldSendToAWS(IndexingStrategy indexingStrategy) {
-        if (indexingStrategy == IndexingStrategy.DEFAULT) {
-            indexingStrategy = configIndexingStrategy;
+    // If the collection has not defined an indexing strategy then use the default from the fig.
+    // only allow NOINDEX or DIRECTONLY when in debug mode
+    private QueueIndexingStrategy resolveIndexingStrategy(QueueIndexingStrategy queueIndexingStrategy) {
+        switch (queueIndexingStrategy) {
+            case CONFIG:
+                return configQueueIndexingStrategy;
+            case NOINDEX:
+            case DIRECTONLY:
+                if (!CpCollectionUtils.getDebugMode()) {
+                    return configQueueIndexingStrategy;
+                }
+            default:
+                return queueIndexingStrategy;
         }
-        // and is in same region.
-        return  (indexingStrategy != IndexingStrategy.DIRECTONLY);
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
deleted file mode 100644
index 69c5445..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.index;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class describes the paths an index request can take
- * between tomcat and ES.
- *
- * Created by peterajohnson on 10/30/17.
- */
-public enum IndexingStrategy {
-
-    DIRECTONLY("directonly"),   // Index request is sent directly to ES and not to AWS
-    DIRECT("direct"),   // Index request is sent directly to ES before sync ASW
-    SYNC("sync"),     // Index request is sent via a sync AWS to ES
-    ASYNC("async"),     // Index request is sent via an async AWS to ES
-    DEFAULT("default");    // Follow the default setting
-
-    private String name;
-
-    private static final Map<String,IndexingStrategy> NAME_MAP;
-
-    static {
-        Map<String,IndexingStrategy> map = new HashMap<String,IndexingStrategy>();
-        for (IndexingStrategy instance : IndexingStrategy.values()) {
-            map.put(instance.getName(),instance);
-        }
-        NAME_MAP = Collections.unmodifiableMap(map);
-    }
-
-    IndexingStrategy(String name) {
-        this.name = name;
-    }
-
-    public static IndexingStrategy get(String name) {
-        IndexingStrategy indexingStrategy =  NAME_MAP.get(name);
-        if (indexingStrategy == null) {
-            return DEFAULT;
-        }
-        return indexingStrategy;
-    }
-
-
-    public String getName() {
-        return this.name;
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index 13edb2c..34799bb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -50,6 +50,8 @@ public class Pipeline<InputType> {
 
     private final RequestCursor requestCursor;
     private int limit;
+    private boolean keepStaleEntries;
+    private String query;
 
     //Generics hell, intentionally without a generic, we check at the filter level
     private Observable currentObservable;
@@ -58,7 +60,7 @@ public class Pipeline<InputType> {
     /**
      * Create our filter pipeline
      */
-    public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+    public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit, boolean keepStaleEntries, String query) {
 
 
         ValidationUtils.validateApplicationScope( applicationScope );
@@ -78,6 +80,9 @@ public class Pipeline<InputType> {
         final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
 
         this.currentObservable = Observable.just( filter );
+
+        this.keepStaleEntries = keepStaleEntries;
+        this.query = query;
     }
 
 
@@ -86,7 +91,7 @@ public class Pipeline<InputType> {
 
 
 
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount, keepStaleEntries, query );
 
         filter.setContext( context );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 018abb7..88b5001 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -39,14 +39,20 @@ public class PipelineContext {
     private final ApplicationScope applicationScope;
     private final RequestCursor requestCursor;
     private final int limit;
+    // An entry is stale if the ES version number is less than the Cassandra version number
+    // it can happen if ES was not updated or has yet to be updated.
+    private final boolean keepStaleEntries;
+    private String query;
 
 
-    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
+    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id, boolean keepStaleEntries, String query ) {
 
         this.applicationScope = applicationScope;
         this.requestCursor = requestCursor;
         this.limit = limit;
         this.id = id;
+        this.keepStaleEntries = keepStaleEntries;
+        this.query = query;
     }
 
 
@@ -78,5 +84,18 @@ public class PipelineContext {
         return limit;
     }
 
+    /**
+     * return true if stales entries are not to be filtered out.
+     */
+    public boolean getKeepStaleEntries() {
+        return keepStaleEntries;
+    }
+
+    /**
+     * return the query string if any
+     */
+    public String getQuery() {
+        return query;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
index 9354127..a3b6fd9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.service.CollectionSearch;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -33,12 +34,20 @@ public class CandidateBuilder {
 
     private final Pipeline<FilterResult<Candidate>> pipeline;
     private final FilterFactory filterFactory;
+    private CollectionSearch search;
+
+    public CandidateBuilder(final Pipeline<FilterResult<Candidate>> pipeline,
+                            final FilterFactory filterFactory) {
+        this(pipeline,filterFactory,null);
+    }
 
 
-    public CandidateBuilder( final Pipeline<FilterResult<Candidate>> pipeline,
-                             final FilterFactory filterFactory ) {
+    public CandidateBuilder(final Pipeline<FilterResult<Candidate>> pipeline,
+                            final FilterFactory filterFactory,
+                            CollectionSearch search) {
         this.pipeline = pipeline;
         this.filterFactory = filterFactory;
+        this.search = search;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index a7f9ad9..4f44ac4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe
 import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.service.CollectionSearch;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -125,9 +126,16 @@ public class IdBuilder {
         final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter(
             ql, collectionName, entityType, analyzeOnly ) );
 
-        return new CandidateBuilder( newFilter, filterFactory );
+        return new CandidateBuilder( newFilter, filterFactory , null);
     }
 
+    public CandidateBuilder searchCollection(final String collectionName, final String ql, final CollectionSearch search ) {
+
+        final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter(
+            ql, collectionName, search.getEntityType(), search.getAnalyzeOnly() ) );
+
+        return new CandidateBuilder( newFilter, filterFactory, search );
+    }
 
     /**
      * Search all connections from our input Id and search their connections

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
index f1a44ea..624f9dc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -44,6 +44,8 @@ public class PipelineBuilder {
     private Optional<String> cursor = Optional.absent();
     private int limit = 10;
     private final FilterFactory filterFactory;
+    private boolean keepStaleEntries = false;
+    private String query = "";
 
 
     /**
@@ -81,6 +83,21 @@ public class PipelineBuilder {
         return this;
     }
 
+    /**
+     */
+    public PipelineBuilder keepStaleEntries(final boolean keepStaleEntries){
+        this.keepStaleEntries = keepStaleEntries;
+        return this;
+    }
+
+    /**
+     */
+    public PipelineBuilder query(final  Optional<String> query){
+        if (query.isPresent()) {
+            this.query = query.get();
+        }
+        return this;
+    }
 
     /**
      * Set our start point in our graph traversal to the specified entity id. A 1.0 compatibility API.  eventually this should be replaced with
@@ -91,7 +108,7 @@ public class PipelineBuilder {
      */
     @Deprecated
     public IdBuilder fromId(final Id entityId){
-        Pipeline<FilterResult<Id>> pipeline =  new Pipeline( applicationScope, this.cursor,limit ).withFilter(  filterFactory.getEntityIdFilter( entityId ) );
+        Pipeline<FilterResult<Id>> pipeline =  new Pipeline( applicationScope, this.cursor,limit,keepStaleEntries,query ).withFilter(  filterFactory.getEntityIdFilter( entityId ) );
 
         return new IdBuilder( pipeline, filterFactory );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 7770436..20bcfe9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -26,10 +26,10 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.model.field.DistanceField;
-import org.apache.usergrid.persistence.model.field.DoubleField;
 import org.apache.usergrid.persistence.model.field.EntityObjectField;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
+import org.apache.usergrid.utils.DateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +74,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
         this.entityIndexFactory = entityIndexFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.indexProducer = indexProducer;
+
     }
 
 
@@ -96,6 +97,9 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
         final EntityIndex applicationIndex = entityIndexFactory
             .createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
 
+        boolean keepStaleEntries = pipelineContext.getKeepStaleEntries();
+        String query = pipelineContext.getQuery();
+
         //buffer them to get a page size we can make 1 network hop
         final Observable<FilterResult<Entity>> searchIdSetObservable =
             candidateResultsObservable.buffer( pipelineContext.getLimit() )
@@ -119,7 +123,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                             entitySet -> new EntityVerifier(
                                 applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
                         )
-                            .doOnNext(entityCollector -> entityCollector.merge())
+                            .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query))
                             .flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
                             .map(entityFilterResult -> {
                                 final Entity entity = entityFilterResult.getValue();
@@ -246,10 +250,10 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
         /**
          * Merge our candidates and our entity set into results
          */
-        public void merge() {
+        public void merge(boolean keepStaleEntries, String query) {
 
             for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
-                validate( candidateResult );
+                validate( candidateResult , keepStaleEntries, query);
             }
 
             indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
@@ -267,7 +271,23 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
         }
 
 
-        private void validate( final FilterResult<Candidate> filterResult ) {
+        // Helper function to convert a UUID time stamp into a unix date
+        private Date UUIDTimeStampToDate(UUID uuid) {
+            long timeStamp = 0L;
+            // The UUID is supposed to be time based so this should always be '1'
+            // but this is just used for logging so we don't want to throw an error i it is misused.
+            if (uuid.version() == 1) {
+                // this is the difference between midnight October 15, 1582 UTC and midnight January 1, 1970 UTC as 100 nanosecond units
+                long epochDiff = 122192928000000000L;
+                // the UUID timestamp is in 100 nanosecond units.
+                // convert that to milliseconds
+                timeStamp =  ((uuid.timestamp()-epochDiff)/10000);
+            }
+            return new Date(timeStamp);
+        }
+
+
+        private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query ) {
 
             final Candidate candidate = filterResult.getValue();
             final CandidateResult candidateResult = candidate.getCandidateResult();
@@ -297,18 +317,40 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
             final UUID entityVersion = entity.getVersion();
             final Id entityId = entity.getId();
 
+            //entity is newer than ES version, could be a missed or slow index event
+            if ( UUIDComparator.staticCompare(entityVersion, candidateVersion) > 0 ) {
+
+               Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
+               Date entityTimeStamp = UUIDTimeStampToDate(entityVersion);
+
+               Map<String,String> fields = new HashMap<>();
+               for  (Field field : entity.getEntity().get().getFields()) {
+                   fields.put(field.getName(),String.valueOf(field.getValue()));
+               }
+
+               logger.warn( "Found stale entity on edge {} for entityId {} Entity version date = {}.  Candidate version date = {}. Will be returned in result set = {} Query = [{}] Entity fields = {}",
+                   searchEdge,
+                   entityId.getUuid(),
+                   DateUtils.instance.formatIso8601Date(entityTimeStamp),
+                   DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+                   keepStaleEntries,
+                   query,
+                   fields
+               );
+
+                if (!keepStaleEntries) {
+                    batch.deindex(searchEdge, entityId, candidateVersion);
+                    return;
+                }
+            }
 
 
-
-
-            //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ||
-                    !entity.getEntity().isPresent()  ||
-                    entity.getStatus() == MvccEntity.Status.DELETED ) {
+            // The entity is marked as deleted
+            if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
 
                 // when updating entities, we don't delete previous versions from ES so this action is expected
                 if(logger.isDebugEnabled()){
-                    logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
                         searchEdge, entityId, entityVersion);
                 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
index 6240028..6b6edfc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -42,6 +42,7 @@ public class CollectionSearch {
     private final Optional<String> cursor;
     private Level level = Level.ALL;
     private boolean analyzeOnly;
+    private boolean keepStaleEntries;
 
 
     public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String
@@ -103,4 +104,12 @@ public class CollectionSearch {
     public void setAnalyzeOnly(final boolean analyzeOnly){
         this.analyzeOnly = analyzeOnly;
     }
+
+    public boolean getKeepStaleEntries() {
+        return keepStaleEntries;
+    }
+
+    public void setKeepStaleEntries(final boolean keepStaleEntries){
+        this.keepStaleEntries = keepStaleEntries;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
index 7684050..e052e2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
 import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
 import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
@@ -58,8 +59,12 @@ public class CollectionServiceImpl implements CollectionService {
         final Optional<String> query = search.getQuery();
 
         final IdBuilder pipelineBuilder =
-            pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
-                                  .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+            pipelineBuilderFactory.create( applicationScope )
+                .withCursor( search.getCursor() )
+                .withLimit( search.getLimit() )
+                .keepStaleEntries(search.getKeepStaleEntries())
+                .query(query)
+                .fromId( search.getCollectionOwnerId() );
 
 
         final EntityBuilder results;
@@ -68,7 +73,7 @@ public class CollectionServiceImpl implements CollectionService {
             results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
         }
         else {
-            results = pipelineBuilder.searchCollection( collectionName, query.get(),search.getEntityType(), search.getAnalyzeOnly()).loadEntities();
+            results = pipelineBuilder.searchCollection( collectionName, query.get(),search).loadEntities();
         }
 
 
@@ -81,7 +86,6 @@ public class CollectionServiceImpl implements CollectionService {
 
         final ApplicationScope applicationScope = search.getApplicationScope();
         final String collectionName = search.getCollectionName();
-        final Optional<String> query = search.getQuery();
 
         final IdBuilder pipelineBuilder =
             pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
index f38cefa..010af2b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
@@ -21,10 +21,11 @@ import org.apache.usergrid.corepersistence.index.CollectionSettings;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
 
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.settings.IndexConsistency;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 
 
 import java.util.*;
@@ -42,38 +43,60 @@ public class CpCollectionUtils {
 
     public static final String SETTING_FIELDS = "fields";
     public static final String SETTING_QUEUE_INDEX = "queueIndex";
+    public static final String SETTING_INDEX_CONSISTENCY = "indexConsistency";
 
     private static Set<String> VALID_SETTING_NAMES = new HashSet<>();
 
     static {
         VALID_SETTING_NAMES.add(SETTING_FIELDS);
         VALID_SETTING_NAMES.add(SETTING_QUEUE_INDEX);
+        VALID_SETTING_NAMES.add(SETTING_INDEX_CONSISTENCY);
     }
 
     public static Set<String> getValidSettings() {
         return VALID_SETTING_NAMES;
     }
 
-    public static IndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+    // When running in debug mode we all some normally invalid index settings
+    // like update C* but not ES.
+    private static boolean debugMode = false;
+    public static boolean getDebugMode() {
+        return debugMode;
+    }
 
-        IndexingStrategy indexingStrategy = IndexingStrategy.DEFAULT;
-        String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
-        if (indexing != null) {
-            indexingStrategy = IndexingStrategy.get(indexing);
+    public static void setDebugMode(boolean set) {
+        debugMode = set;
+    }
+
+    public static Object validateValue(String name, Object value) {
+        if (SETTING_QUEUE_INDEX.equals(name)) {
+            return QueueIndexingStrategy.get(value.toString()).getName();
+        }
+        if (SETTING_INDEX_CONSISTENCY.equals(name)) {
+            return IndexConsistency.get(value.toString()).getName();
         }
-        return indexingStrategy;
+        return value;
     }
 
-    public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+    public static QueueIndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
 
+        QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.CONFIG;
         String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
-        if ("async".equals(indexing)) {
-            return Boolean.TRUE;
+        if (indexing != null) {
+            queueIndexingStrategy = QueueIndexingStrategy.get(indexing);
         }
-        if ("sync".equals(indexing)) {
-            return Boolean.FALSE;
+        return queueIndexingStrategy;
+    }
+
+
+    public static IndexConsistency getIndexConsistencyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+
+        IndexConsistency indexConsistency = IndexConsistency.STRICT;
+        String indexConsistencyString = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_INDEX_CONSISTENCY);
+        if ( indexConsistencyString != null) {
+            indexConsistency = IndexConsistency.get(indexConsistencyString);
         }
-        return null;
+        return indexConsistency;
     }
 
     public static boolean skipIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
index d062ef4..42afa67 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -138,7 +138,7 @@ public class CollectionDeleteTest extends AbstractCoreIT {
         logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME);
 
 
-        app.waitForQueueDrainAndRefreshIndex(5000);
+        app.waitForQueueDrainAndRefreshIndex(15000);
 
         final CollectionDeleteRequestBuilder builder =
             collectionDeleteService.getBuilder()

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index b444199..437f9bf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -153,7 +153,6 @@ public interface EntityIndex extends CPManager {
      */
     String[] getIndexes();
 
-
     /**
      * type of alias
      */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3d2f576..211cf70 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -773,6 +773,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
             return Health.valueOf( chr.getStatus().name() );
         }
         catch ( Exception ex ) {
+            ex.printStackTrace();
             logger.error( "Error connecting to ElasticSearch", ex.getMessage() );
         }
 
@@ -859,7 +860,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     }
 
 
-
     /**
      * Interface for operations.
      */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index ac7d10d..902c5d3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -484,6 +484,7 @@ public class EntityIndexTest extends BaseIT {
     }
 
 
+
     @Test
     public void deleteVerification() throws Throwable {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 4a12d14..4cb6f37 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -110,13 +110,12 @@ public interface LegacyQueueFig extends GuicyFig {
     @Default("900000") // 15 minutes
     int getMapMessageTimeout();
 
-    @Key("usergrid.queue.is.async")
-    @Default("true")
-    boolean isAsyncQueue();
-
-
     @Key("usergrid.queue.strategy")
     @Default("async")
     String getQueueStrategy();
 
+    @Key("usergrid.queue.test")
+    @Default("false")
+    String getQueueDebugMode();
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bc9be57..b18411d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.services.sqs.model.*;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -541,10 +542,9 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
         }
     }
 
-
     @Override
     public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
-        boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+        boolean sendAsync = async == null || async.booleanValue();
         if (sendAsync) {
             sendMessageToAllRegionsAsync(body);
         } else {
@@ -552,7 +552,6 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
         }
     }
 
-
     private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
         if ( sns == null ) {
             logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
@@ -634,8 +633,9 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
 
     @Override
     public void sendMessages( final List bodies ) throws IOException {
+        QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.get(fig.getQueueStrategy());
         for ( Object body : bodies ) {
-            if (fig.isAsyncQueue()) {
+            if (queueIndexingStrategy == QueueIndexingStrategy.ASYNC) {
                 sendMessageToLocalRegionAsync((Serializable) body);
             } else {
                 sendMessageToLocalRegionSync((Serializable) body);
@@ -682,7 +682,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
 
     @Override
     public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
-        boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+        boolean sendAsync = async.booleanValue();
         if (sendAsync) {
             sendMessageToLocalRegionAsync(body);
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
new file mode 100644
index 0000000..531716a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue.settings;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the consistency rules when returning results set between C* and ES
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum IndexConsistency {
+
+    STRICT("strict"),       // Result canidate must be exact match to be returned in result set
+    LATEST("latest");       // Result canidate must be exact match OR most recent version to be returned in result set
+
+    private String name;
+
+    private static final Map<String,IndexConsistency> NAME_MAP;
+
+    static {
+        Map<String,IndexConsistency> map = new HashMap<>();
+        for (IndexConsistency instance : IndexConsistency.values()) {
+            map.put(instance.getName(),instance);
+        }
+        NAME_MAP = Collections.unmodifiableMap(map);
+    }
+
+    IndexConsistency(String name) {
+        this.name = name;
+    }
+
+    public static IndexConsistency get(String name) {
+        IndexConsistency queueIndexingStrategy =  NAME_MAP.get(name);
+        if (queueIndexingStrategy == null) {
+            return LATEST;
+        }
+        return queueIndexingStrategy;
+    }
+
+
+    public String getName() {
+        return this.name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java
new file mode 100644
index 0000000..375de71
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue.settings;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the paths an index request can take
+ * between tomcat and ES.
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum QueueIndexingStrategy {
+
+    NOINDEX("debug_noindex"),        // Do not Index the entity (DEBUG only use for testing)
+    DIRECTONLY("debug_directonly"),  // Index request is sent directly to ES and not to AWS
+
+    DIRECT("direct"),          // Index request is sent directly to ES before sync ASW
+    SYNC("sync"),              // Index request is sent via a sync AWS to ES
+    ASYNC("async"),            // Index request is sent via an async AWS to ES
+    CONFIG("config");          // Follow the default setting of the fig
+
+    private String name;
+
+    private static final Map<String,QueueIndexingStrategy> NAME_MAP;
+
+    static {
+        Map<String,QueueIndexingStrategy> map = new HashMap<String,QueueIndexingStrategy>();
+        for (QueueIndexingStrategy instance : QueueIndexingStrategy.values()) {
+            map.put(instance.getName(),instance);
+        }
+        NAME_MAP = Collections.unmodifiableMap(map);
+    }
+
+    QueueIndexingStrategy(String name) {
+        this.name = name;
+    }
+
+    public static QueueIndexingStrategy get(String name) {
+        QueueIndexingStrategy queueIndexingStrategy =  NAME_MAP.get(name);
+        if (queueIndexingStrategy == null) {
+            return CONFIG;
+        }
+        return queueIndexingStrategy;
+    }
+
+
+    public String getName() {
+        return this.name;
+    }
+
+    public boolean shouldSendDirectToES() {
+        return  (this == QueueIndexingStrategy.DIRECT || this == QueueIndexingStrategy.DIRECTONLY);
+    }
+
+    public boolean shouldSendToAWS() {
+        // and is in same region.
+        return  (this != QueueIndexingStrategy.DIRECTONLY && this != QueueIndexingStrategy.NOINDEX);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java
new file mode 100644
index 0000000..77e06ed
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.rest.interceptors;
+
+import org.glassfish.jersey.server.ContainerRequest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import javax.inject.*;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.ext.*;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * If the request had an ACCEPT_ENCODING header containing 'gzip' then
+ * gzip the response and add CONTENT_ENCODING gzip header
+ *
+ * If the request had an CONTENT_ENCODING header containing 'gzip' then
+ *  unzip the request and remove the CONTENT_ENCODING gzip header
+ *
+ *  Created by peterajohnson on 11/1/17.
+ */
+@Provider
+public class GZIPInterceptor implements ReaderInterceptor, WriterInterceptor {
+
+    final private static String GZIP = "gzip";
+    @Inject
+    private javax.inject.Provider<ContainerRequest> requestProvider;
+
+    @Override
+    public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException {
+        ContainerRequest request = requestProvider.get();
+
+        if (request != null) {
+            List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING);
+            if (aeHeaders != null && aeHeaders.size() > 0) {
+                String acceptEncodingHeader = aeHeaders.get(0);
+                if (acceptEncodingHeader.contains(GZIP)) {
+                    OutputStream outputStream = context.getOutputStream();
+                    context.setOutputStream(new GZIPOutputStream(outputStream));
+                    context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP);
+                }
+            }
+        }
+        context.proceed();
+    }
+
+    @Override
+    public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException {
+        String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING);
+        if (GZIP.equalsIgnoreCase(encoding)) {
+            GZIPInputStream is = new GZIPInputStream(context.getInputStream());
+            context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING);
+            context.setInputStream(is);
+        }
+
+        return context.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/000eaaad/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
deleted file mode 100644
index f562475..0000000
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.rest.interceptors;
-
-import org.glassfish.jersey.server.ContainerRequest;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import javax.inject.*;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.ext.*;
-import javax.ws.rs.ext.Provider;
-
-/**
- * If the request had an ACCEPT_ENCODING header containing 'gzip' then
- * gzip the response and add CONTENT_ENCODING gzip header
- *
- * * If the request had an CONTENT_ENCODING header containing 'gzip' then
- *  unzip the request and remove the CONTENT_ENCODING gzip header
- *  Created by peterajohnson on 11/1/17.
- */
-@Provider
-public class GZIPWriterInterceptor implements ReaderInterceptor, WriterInterceptor {
-
-    final private static String GZIP = "gzip";
-    @Inject
-    private javax.inject.Provider<ContainerRequest> requestProvider;
-
-    @Override
-    public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException {
-        ContainerRequest request = requestProvider.get();
-
-        if (request != null) {
-            List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING);
-            if (aeHeaders != null && aeHeaders.size() > 0) {
-                String acceptEncodingHeader = aeHeaders.get(0);
-                if (acceptEncodingHeader.contains(GZIP)) {
-                    OutputStream outputStream = context.getOutputStream();
-                    context.setOutputStream(new GZIPOutputStream(outputStream));
-                    context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP);
-                }
-            }
-        }
-        context.proceed();
-    }
-
-    @Override
-    public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException {
-        String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING);
-        if (GZIP.equalsIgnoreCase(encoding)) {
-            GZIPInputStream is = new GZIPInputStream(context.getInputStream());
-            context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING);
-            context.setInputStream(is);
-        }
-
-        return context.proceed();
-    }
-}


Mime
View raw message