usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [3/3] usergrid git commit: Switch DISTRIBUTED database queueing to default not cache in memory as the in memory impl causes duplicate message processing quite often at the moment.
Date Sun, 02 Apr 2017 23:23:20 GMT
Switch DISTRIBUTED database queueing to not cache in memory by default, as the in-memory impl causes duplicate message processing quite often at the moment.

 - includes making all the tests work without in-memory queue fronting the database queue which really means adding some more delay in tests
 - the tests are actually faster now because the original refreshIndex() created and queried random entities which took longer in most cases
 - uncommented the checkReceipts function in Notification tests as these are now passing, with an added fix for duplicate receipt creation
 - some logging updates in the distributed database queueing impl (Qakka)
 - increased the default take to 500 from the queue when DISTRIBUTED database queueing is configured ( which is the default now )
 - Notifications Queue Listener thread names now include a random identifier
 - reduced the DISTRIBUTED database queueing default long poll to 1 second from 5 seconds


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d3e988bc
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d3e988bc
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d3e988bc

Branch: refs/heads/master
Commit: d3e988bcbb7eb417c84cfda7396ec3506521aa37
Parents: 8b63aae
Author: Michael Russo <russomichael@google.com>
Authored: Sun Apr 2 16:14:05 2017 -0700
Committer: Michael Russo <russomichael@google.com>
Committed: Sun Apr 2 16:14:05 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 44 +---------
 .../asyncevents/AsyncIndexProvider.java         |  5 ++
 .../java/org/apache/usergrid/Application.java   |  4 +-
 .../org/apache/usergrid/CoreApplication.java    | 22 ++---
 .../org/apache/usergrid/CoreITSetupImpl.java    | 20 ++++-
 .../org/apache/usergrid/TestEntityIndex.java    |  1 +
 .../corepersistence/AggregationServiceTest.java | 15 ++--
 .../corepersistence/StaleIndexCleanupTest.java  | 17 ++--
 .../persistence/ApplicationServiceIT.java       |  6 +-
 .../usergrid/persistence/CollectionIT.java      | 74 ++++++++--------
 .../usergrid/persistence/CountingMutatorIT.java |  4 +-
 .../persistence/EntityConnectionsIT.java        | 16 ++--
 .../usergrid/persistence/EntityManagerIT.java   | 20 ++---
 .../org/apache/usergrid/persistence/GeoIT.java  | 32 +++----
 .../persistence/GeoQueryBooleanTest.java        |  4 +-
 .../apache/usergrid/persistence/IndexIT.java    | 18 ++--
 .../usergrid/persistence/PathQueryIT.java       |  7 +-
 .../usergrid/persistence/PermissionsIT.java     |  6 +-
 .../usergrid/persistence/RebuildIndexTest.java  | 26 +++---
 .../cassandra/EntityManagerFactoryImplIT.java   | 13 +--
 .../persistence/query/ConnectionHelper.java     |  2 +-
 .../query/IntersectionTransitivePagingIT.java   |  2 +-
 .../query/IntersectionUnionPagingIT.java        |  2 +-
 .../persistence/query/IteratingQueryIT.java     | 32 +++----
 .../persistence/query/NotSubPropertyIT.java     |  2 +-
 .../persistence/query/ParenthesisProblemIT.java |  2 +-
 .../resources/usergrid-custom-test.properties   |  3 +
 .../usergrid/persistence/qakka/QakkaFig.java    |  4 +-
 .../qakka/core/impl/InMemoryQueue.java          |  4 +-
 .../core/impl/QueueMessageManagerImpl.java      |  9 +-
 .../distributed/actors/QueueActorHelper.java    | 15 +++-
 .../distributed/actors/QueueActorRouter.java    |  2 +-
 .../distributed/actors/ShardAllocator.java      |  4 +-
 .../impl/DistributedQueueServiceImpl.java       | 26 ++++--
 .../distributed/actors/ShardAllocatorTest.java  |  3 +
 .../queue/src/test/resources/qakka.properties   |  5 +-
 .../usergrid/rest/CollectionMetadataIT.java     |  4 +-
 .../apache/usergrid/rest/NotificationsIT.java   | 14 ++-
 .../apache/usergrid/rest/PartialUpdateTest.java |  6 +-
 .../apache/usergrid/rest/SystemResourceIT.java  |  3 +-
 .../rest/applications/ApplicationCreateIT.java  |  2 +-
 .../rest/applications/ApplicationDeleteIT.java  |  6 +-
 .../applications/ApplicationResourceIT.java     | 12 +--
 .../applications/assets/AssetResourceIT.java    | 26 +++---
 .../applications/assets/AwsAssetResourceIT.java | 22 ++---
 .../collection/BrowserCompatibilityTest.java    |  2 +-
 .../collection/CollectionsResourceIT.java       | 70 +++++++--------
 .../collection/DuplicateNameIT.java             |  2 +-
 .../activities/ActivityResourceIT.java          |  8 +-
 .../collection/activities/PutTest.java          |  6 +-
 .../collection/devices/DevicesResourceIT.java   | 12 +--
 .../collection/groups/GroupResourceIT.java      | 39 +++++----
 .../collection/paging/PagingResourceIT.java     | 10 +--
 .../users/ConnectionResourceTest.java           | 22 ++---
 .../collection/users/OwnershipResourceIT.java   | 24 +++---
 .../collection/users/PermissionsResourceIT.java | 58 ++++++-------
 .../collection/users/RetrieveUsersTest.java     |  8 +-
 .../collection/users/UserResourceIT.java        | 89 ++++++++++----------
 .../applications/events/EventsResourceIT.java   |  6 +-
 .../applications/queries/BasicGeoTests.java     |  4 +-
 .../applications/queries/GeoPagingTest.java     | 13 +--
 .../applications/queries/MatrixQueryTests.java  |  5 +-
 .../rest/applications/queries/OrderByTest.java  |  6 +-
 .../applications/queries/QueryTestBase.java     |  2 +-
 .../queries/SelectMappingsQueryTest.java        | 14 +--
 .../usergrid/rest/management/AccessTokenIT.java |  8 +-
 .../usergrid/rest/management/AdminUsersIT.java  | 33 ++++----
 .../rest/management/ExportResourceIT.java       |  2 +-
 .../rest/management/ImportResourceIT.java       | 14 +--
 .../rest/management/ManagementResourceIT.java   | 18 ++--
 .../rest/management/OrganizationsIT.java        | 16 ++--
 .../rest/management/RegistrationIT.java         |  4 +-
 .../rest/test/resource/AbstractRestIT.java      | 12 ++-
 .../resources/usergrid-custom-test.properties   |  7 +-
 .../services/notifications/QueueListener.java   | 15 ++--
 .../services/notifications/gcm/GCMAdapter.java  |  3 +-
 .../org/apache/usergrid/ServiceApplication.java |  2 +-
 .../apache/usergrid/management/EmailFlowIT.java |  2 +-
 .../org/apache/usergrid/management/RoleIT.java  |  5 +-
 .../usergrid/services/CollectionServiceIT.java  |  2 +-
 .../usergrid/services/GroupServiceIT.java       |  5 +-
 .../usergrid/services/ServiceInvocationIT.java  |  6 +-
 .../AbstractServiceNotificationIT.java          | 11 +--
 .../apns/NotificationsServiceIT.java            | 40 +++++----
 .../gcm/NotificationsServiceIT.java             | 15 ++--
 .../resources/usergrid-custom-test.properties   | 15 ++--
 86 files changed, 605 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1071842..cdb4fc7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -3089,52 +3089,16 @@ public class CpEntityManager implements EntityManager {
         managerCache.getEntityIndex(applicationScope).addIndex(newIndexName, shards, replicas, writeConsistency);
     }
 
+
     @Override
     public void initializeIndex(){
         managerCache.getEntityIndex(applicationScope).initialize();
     }
-    /**
-     * TODO, these 3 methods are super janky.  During refactoring we should clean this model up
-     */
-    public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
-        try {
-            long start = System.currentTimeMillis();
-            // refresh special indexes without calling EntityManager refresh because stack overflow
-            Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
-            map.put("some prop", "test");
-            boolean hasFinished = false;
-            Entity refreshEntity = create("refresh", map);
-            EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo
-                = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
 
-            try {
-                for (int i = 0; i < 20; i++) {
-                    if (searchCollection(
-                        new SimpleEntityRef(
-                            org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
-                        InflectionUtils.pluralize("refresh"),
-                        Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
-                    ).size() > 0
-                        ) {
-                        hasFinished = true;
-                        break;
-                    }
-                    int sleepTime = 500;
-                    logger.info("Sleeping {} ms during refreshIndex", sleepTime);
-                    Thread.sleep(sleepTime);
 
-                    indexRefreshCommandInfo
-                        = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
-                }
-                if(!hasFinished){
-                    throw new RuntimeException("Did not find entity {} during refresh. uuid->"+refreshEntity.getUuid());
-                }
-            }finally {
-                delete(refreshEntity);
-            }
-            Thread.sleep(100);
-
-            return indexRefreshCommandInfo;
+    public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
+        try {
+            return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
         } catch (Exception e) {
             throw new RuntimeException("refresh failed",e);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 81960f5..2ba6c0b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -36,6 +36,7 @@ import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
+import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.DISTRIBUTED;
 import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL;
 
 
@@ -121,6 +122,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             asyncEventService.MAX_TAKE = 1000;
         }
 
+        if ( impl.equals( DISTRIBUTED )) {
+            asyncEventService.MAX_TAKE = 500;
+        }
+
         return asyncEventService;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/Application.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/Application.java b/stack/core/src/test/java/org/apache/usergrid/Application.java
index 378a4f7..102ee9c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/Application.java
+++ b/stack/core/src/test/java/org/apache/usergrid/Application.java
@@ -152,7 +152,9 @@ public interface Application extends TestRule {
 
     public void remove( EntityRef entityRef ) throws Exception;
 
-    public void refreshIndex();
+    public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis);
+
+    public void waitForQueueDrainAndRefreshIndex();
 
     /**
      * Get the entity manager

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 9046f02..f505ead 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -181,8 +181,8 @@ public class CoreApplication implements Application, TestRule {
 
         logger.info( "Created new application {} in organization {}", appName, orgName );
 
-//        //wait for the index before proceeding
-//        em.refreshIndex();
+        //wait for the index before proceeding
+        waitForQueueDrainAndRefreshIndex(500);
 
     }
 
@@ -223,19 +223,21 @@ public class CoreApplication implements Application, TestRule {
         return em.get( new SimpleEntityRef( type, id ) );
     }
 
-
     @Override
-    public synchronized void refreshIndex() {
-        //Insert test entity and find it
-        setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid());
-
-        if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
-            setup.getEmf().refreshIndex(em.getApplicationId());
+    public synchronized void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) {
+        try{
+            Thread.sleep(waitTimeMillis);
+        } catch (InterruptedException e ){
+            //noop
         }
-
         em.refreshIndex();
     }
 
+    @Override
+    public synchronized void waitForQueueDrainAndRefreshIndex() {
+        waitForQueueDrainAndRefreshIndex(750);
+    }
+
 
     @Override
     public EntityManager getEntityManager() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
index 64b001c..bd6ae3e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java
@@ -156,16 +156,28 @@ public class CoreITSetupImpl implements CoreITSetup, TestEntityIndex {
     @Override
     public void refresh(UUID appId){
         try {
-            Thread.sleep(50);
-        } catch (InterruptedException ie){
 
+            Thread.sleep(125);
+
+        } catch (InterruptedException ie){
+            //noop
         }
+
         emf.refreshIndex(appId);
 
+    }
+
+    @Override
+    public void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis){
         try {
-            Thread.sleep(50);
-        } catch (InterruptedException ie){
 
+            Thread.sleep(waitTimeMillis);
+
+        } catch (InterruptedException ie){
+            //noop
         }
+
+        emf.refreshIndex(appId);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
index 7da187a..e5e979e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
+++ b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java
@@ -26,4 +26,5 @@ import java.util.UUID;
  */
 public interface TestEntityIndex {
     void refresh(UUID appId);
+    void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
index 9f1c9a4..55ce26e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java
@@ -48,8 +48,8 @@ public class AggregationServiceTest extends AbstractCoreIT {
         props.put("name", "myname");
         Entity entity1 = this.app.getEntityManager().create("test", props);
         Entity entity2 = this.app.getEntityManager().create("test2", props);
-        this.app.refreshIndex();
-        Thread.sleep(500);
+
+        this.app.waitForQueueDrainAndRefreshIndex(500);
 
         long sum = aggregationService.getApplicationSize(applicationScope);
 
@@ -57,23 +57,24 @@ public class AggregationServiceTest extends AbstractCoreIT {
         Assert.assertTrue(sum > (entity1.getSize() + entity2.getSize()));
 
         long sum1 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
-        Assert.assertEquals(sum1, entity1.getSize());
+        Assert.assertEquals(entity1.getSize(), sum1);
 
         long sum2 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "test2s"));
-        Assert.assertEquals(sum2, entity2.getSize());
+        Assert.assertEquals(entity2.getSize(), sum2);
 
         props = new HashMap<>();
         props.put("test", 1234);
         props.put("name", "myname2");
         Entity entity3 = this.app.getEntityManager().create("test", props);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex(500);
+
         long sum3 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests"));
-        Assert.assertEquals(sum3, entity1.getSize() + entity3.getSize());
+        Assert.assertEquals(entity1.getSize() + entity3.getSize(), sum3);
 
         Map<String,Long> sumEach = aggregationService.getEachCollectionSize(applicationScope);
         Assert.assertTrue(sumEach.containsKey("tests") && sumEach.containsKey("test2s"));
-        Assert.assertEquals(sum3, (long) sumEach.get("tests"));
+        Assert.assertEquals((long) sumEach.get("tests"), sum3);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index abe2615..0abd7a2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -105,7 +104,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         Entity thing = em.create("thing", new HashMap<String, Object>() {{
             put("name", "thing1");
         }});
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
         assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
@@ -116,7 +115,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         em.updateProperties(thing, new HashMap<String, Object>() {{
             put("stuff", "widget");
         }});
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
@@ -161,7 +160,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             }}));
             Thread.sleep( writeDelayMs );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "thing", "select *");
         Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
@@ -210,7 +209,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         Thread.sleep(250); // delete happens asynchronously, wait for some time
 
         //refresh the app index
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(250); // refresh happens asynchronously, wait for some time
 
@@ -231,7 +230,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                }
             });
             //refresh the app index
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
 
             crs = queryCollectionCp("things", "thing", "select *");
 
@@ -265,7 +264,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                 put("name", dogName);
             }}));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *");
         Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size());
@@ -288,7 +287,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             }
 
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // wait for indexes to be cleared for the deleted entities
         count = 0;
@@ -296,7 +295,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         do {
             //trigger the repair
             queryCollectionEm("dogs", "select * order by created");
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
             crs = queryCollectionCp("dogs", "dog", "select *");
         } while ( crs.size() != numEntities && count++ < 15 );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index 9ad90eb..547691f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -23,7 +23,6 @@ import com.google.common.base.Optional;
 import com.google.inject.Injector;
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.service.AggregationService;
 import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -32,7 +31,6 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.junit.Assert;
@@ -65,7 +63,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             map.put("somekey", UUID.randomUUID());
            Entity entity = entityManager.create("tests", map);
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(500);
         ApplicationScope appScope  = CpNamingUtils.getApplicationScope(entityManager.getApplicationId());
         Observable<Id> ids =
@@ -76,7 +74,7 @@ public class ApplicationServiceIT extends AbstractCoreIT {
             this.app.getApplicationService().deleteAllEntities(appScope, 5);
         count = ids.count().toBlocking().last();
         Assert.assertEquals(count, 5);
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         Injector injector = SpringResource.getInstance().getBean(Injector.class);
         GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class);
         GraphManager graphManager = factory.createEdgeManager(appScope);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 3305e0e..f484f4f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -132,7 +132,7 @@ public class CollectionIT extends AbstractCoreIT {
         activity3 = app.get( activity3.getUuid(), activity3.getType() );
         app.addToCollection( user, "activities", activity3 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // empty query
         Query query = new Query();
@@ -259,7 +259,7 @@ public class CollectionIT extends AbstractCoreIT {
         activity3 = app.get(activity3.getUuid(), activity3.getType());
         app.addToCollection(user, "activities", activity3);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // empty query
         Query query = new Query();
@@ -295,7 +295,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -315,7 +315,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.update( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // search with the old username, should be no results
         query = Query.fromQL( "firstname = '" + firstName + "'" );
@@ -354,7 +354,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "middlename = '" + middleName + "'" );
@@ -386,7 +386,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "lastname = '" + lastName + "'" );
@@ -434,7 +434,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put("nickname", "ed");
         em.updateProperties(user1, properties);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         final Query query = Query.fromQL( "nickname = 'ed'" );
@@ -469,7 +469,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity group = em.create( "group", properties );
         assertNotNull( group );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         final Query query = Query.fromQL( "name = '" + groupName + "'" );
@@ -501,7 +501,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity group = em.create( "group", properties );
         assertNotNull( group );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
 
@@ -559,7 +559,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.addToCollection( user, "activities", em.create( "activity", properties ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Query query = Query.fromQL( "verb = 'post'" );
 
@@ -593,7 +593,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = new Query();
@@ -636,7 +636,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = new Query();
@@ -677,7 +677,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "orquerygame", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // EntityRef
         Query query = Query
@@ -756,7 +756,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = Query.fromQL(
@@ -817,7 +817,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // simple not
         Query query = Query.fromQL( "select * where NOT keywords contains 'game'" );
@@ -893,7 +893,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity entity2 = em.create( "game", properties );
         assertNotNull( entity2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
 
         // search for games without sub-field Foo should returned zero entities
@@ -949,7 +949,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put("keywords", "Action, New");
         em.create( "game", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -980,7 +980,7 @@ public class CollectionIT extends AbstractCoreIT {
         properties.put( "keywords", "Action, New" );
         Entity thirdGame = em.create( "game", properties );
 
-        app.refreshIndex();//need to track all batches then resolve promises
+        app.waitForQueueDrainAndRefreshIndex();//need to track all batches then resolve promises
 
         Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
         Results r = em.searchCollection( em.getApplicationRef(), "games", query );
@@ -1011,7 +1011,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = new Query();
         query.setLimit( 50 );
@@ -1039,7 +1039,7 @@ public class CollectionIT extends AbstractCoreIT {
             numDeleted++;
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // wait for indexes to be cleared
         Thread.sleep(1000); //TODO find why we have to wait.  This is a bug
@@ -1096,7 +1096,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         int pageSize = 10;
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         final Query query = Query.fromQL( "index < " + size * 2 + " order by index asc" );
 
         Results r = null;
@@ -1147,7 +1147,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         int pageSize = 10;
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * where index >= " + size / 2 + " sort by index asc" );
         query.setLimit( pageSize );
@@ -1201,7 +1201,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int pageSize = 10;
 
@@ -1254,7 +1254,7 @@ public class CollectionIT extends AbstractCoreIT {
             entityIds.add( created.getUuid() );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int pageSize = 5;
 
@@ -1310,7 +1310,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", root );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "rootprop1 = 'simpleprop'" );
         Entity entity;
@@ -1357,7 +1357,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", jsonData );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "intprop = 10" );
 
@@ -1416,7 +1416,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         Entity saved = em.create( "test", props );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "myString = 'My simple string'" );
 
@@ -1441,7 +1441,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select username, email where username = 'edanuff'";
         Query query = Query.fromQL( s );
@@ -1471,7 +1471,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select {name: username, email: email} where username = 'edanuff'";
         Query query = Query.fromQL( s );
@@ -1503,7 +1503,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         final Entity entity = em.create( "user", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         String s = "select * where username = 'ed@anuff.com'";
         Query query = Query.fromQL( s );
@@ -1525,7 +1525,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.createConnection( foo, "testconnection", entity );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now query via the testConnection, this should work
 
@@ -1569,7 +1569,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "loveobject", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         location = new LinkedHashMap<String, Object>();
         location.put( "Place", "Via Pietro Maroncelli, 48, 62012 Santa Maria Apparente Province of Macerata, Italy" );
@@ -1587,7 +1587,7 @@ public class CollectionIT extends AbstractCoreIT {
 
         em.create( "loveobject", properties );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // String s = "select * where Flag = 'requested'";
         // String s = "select * where Flag = 'requested' and NOT Recipient.Username =
@@ -1632,7 +1632,7 @@ public class CollectionIT extends AbstractCoreIT {
             createdEntities.add( created );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getCollection( em.getApplicationRef(), "users", null, 50, Level.ALL_PROPERTIES, false );
 
@@ -1729,7 +1729,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();
@@ -1763,7 +1763,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity game2 = em.create( "game", properties );
         assertNotNull( game2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();
@@ -1797,7 +1797,7 @@ public class CollectionIT extends AbstractCoreIT {
         Entity createUser2 = em.create( user2 );
         assertNotNull( createUser2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // overlap
         Query query = new Query();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
index 63c7cb8..596ec7c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
@@ -74,7 +74,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
         properties.put( "username", "testuser" );
         properties.put( "email", "test@foo.bar" );
         Entity created = em.create( "user", properties );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Entity returned = em.get( created.getUuid() );
 
@@ -89,7 +89,7 @@ public class CountingMutatorIT extends AbstractCoreIT {
 
 
             Entity connectedEntity = em.create( "user", connectedProps );
-            app.refreshIndex();
+            app.waitForQueueDrainAndRefreshIndex();
 
             // Connect from our new entity to our root one so it's updated when paging
             em.createConnection( connectedEntity, "following", returned );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
index 296bf53..e1e24c4 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
@@ -64,7 +64,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         assertEquals( 1, connectionTypes.size());
         assertEquals("likes", connectionTypes.iterator().next());
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.IDS);
 
@@ -128,7 +128,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catB.getUuid() + "\n" );
         em.createConnection( awardA, "awarded", catB );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // List forward connections for cat A
 
@@ -149,7 +149,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catA.getUuid() + "\n" );
         em.createConnection( awardA, "awarded", catA );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // List forward connections for cat A
 // Not valid with current usages
@@ -256,7 +256,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 
         em.createConnection( secondUserEntity, "likes", arrogantbutcher );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", "restaurant", Level.IDS);
 
@@ -310,7 +310,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 
         em.createConnection( fredEntity, "likes", wilmaEntity );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
 //        // search for "likes" edges from fred
 //        assertEquals( 1,
@@ -363,7 +363,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         em.createConnection( fredEntity, "likes", JohnEntity );
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now query via the testConnection, this should work
 
@@ -410,7 +410,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         }
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.ALL_PROPERTIES) ;
 
@@ -453,7 +453,7 @@ public class EntityConnectionsIT extends AbstractCoreIT {
 //
 //        em.createConnection( fredEntity, "likes", wilmaEntity );
 //
-//        app.refreshIndex();
+//        app.waitForQueueDrainAndRefreshIndex();
 //
 ////        // search for "likes" edges from fred
 ////        assertEquals( 1,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
index cb3a728..e1e4a05 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
@@ -75,7 +75,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) );
         assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty( "email" ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId ), "users", "edanuff" );
 
@@ -274,13 +274,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity thing = em.create( "thing", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         logger.info( "Starting entity delete" );
         em.delete( thing );
         logger.info( "Entity deleted" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now search by username, no results should be returned
 
@@ -310,13 +310,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity user = em.create( "user", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         logger.info( "Starting entity delete" );
         em.delete( user );
         logger.info( "Entity deleted" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // now search by username, no results should be returned
 
@@ -335,7 +335,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         user = em.create( "user", properties );
         logger.info( "Entity created" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Query userNameQuery = Query.fromQL( "username = '" + name + "'" );
 
@@ -456,7 +456,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Results r = em.getCollection( appRef, "things", null, 10, Level.ALL_PROPERTIES, false );
 
@@ -548,7 +548,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         Entity createdDevice = em.createItemInCollection( createdUser, "devices", "device", device );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid() ) );
 
@@ -580,7 +580,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity user = em.create( "robot", properties );
         assertNotNull( user );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         assertNotNull( em.get( user.getUuid() ) );
     }
@@ -608,7 +608,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         em.addToCollection(appRef, "fluffies", entityRef);
         em.addToCollection(appRef, "fluffies", entityRef);
 
-        //app.refreshIndex();
+        //app.waitForQueueDrainAndRefreshIndex();
 
         Results results = em.getCollection(appRef,
             "fluffies", null, 10, Level.ALL_PROPERTIES, true);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index b7d708e..df77084 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -96,7 +96,7 @@ public class GeoIT extends AbstractCoreIT {
         assertNotNull(hotel);
 
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //2. Query with a globally large distance to verify location
 
@@ -142,7 +142,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //2. Query with a globally large distance to verify location
         Query query = Query.fromQL("select * where location within " + Integer.MAX_VALUE + " of 0, 0");
@@ -154,7 +154,7 @@ public class GeoIT extends AbstractCoreIT {
         user.getDynamicProperties().remove("location");
         em.updateProperties(user, properties);
         em.update(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //4. Repeat the query, expecting no results
         listResults = em.searchCollection(em.getApplicationRef(), "users", query);
@@ -188,7 +188,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final double lat = 37.776753;
         final double lon = -122.407846;
@@ -234,7 +234,7 @@ public class GeoIT extends AbstractCoreIT {
         }};
         Entity user = em.create("user", properties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final double lat = 37.776753;
         final double lon = -122.407846;
@@ -284,12 +284,12 @@ public class GeoIT extends AbstractCoreIT {
 
         Entity user = em.create("user", userProperties);
         assertNotNull(user);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //3. Create a connection between the user and the entity
         em.createConnection(user, "likes", restaurant);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //4. Test that the user is within 2000m of the entity
         Results emSearchResults = em.searchTargetEntities(user,
             Query.fromQL("location within 5000 of "
@@ -326,7 +326,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
             logger.debug("Entity {} created", entity.getProperty("name"));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //2. validate the size of the result
         Query query = new Query();
         Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -367,7 +367,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
             logger.debug("Entity {} created", entity.getProperty("name"));
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //2. validate the size of the result
         Query query = new Query();
         Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query);
@@ -540,7 +540,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
          // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
         // just to be save
@@ -596,7 +596,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(2000);
 
         // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
@@ -669,7 +669,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
          // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers
         // just to be save
@@ -729,7 +729,7 @@ public class GeoIT extends AbstractCoreIT {
             created.add(e);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int startDelta = size - min;
 
@@ -794,7 +794,7 @@ public class GeoIT extends AbstractCoreIT {
             em.create("store", data);
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //do a direct geo iterator test.  We need to make sure that we short circuit on the correct tile.
 
@@ -838,7 +838,7 @@ public class GeoIT extends AbstractCoreIT {
             assertNotNull(entity);
         }
         //3. refresh the index
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         //4. return the entity manager
         return em;
     }
@@ -857,7 +857,7 @@ public class GeoIT extends AbstractCoreIT {
         latlong.put("longitude", longitude);
 
         em.setProperty(entity, "location", latlong);
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
index 9a3f5a6..609f977 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java
@@ -79,7 +79,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
         Entity user2 = em.create( "user", properties );
         assertNotNull( user2 );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // define center point about 300m from that location
         final double lat = 37.774277;
@@ -158,7 +158,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT {
         Entity userFred = em.create( "user", properties );
         assertNotNull( userFred );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // define center point about 300m from that location
         final double lat = 37.774277;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
index d62f88e..5933b57 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
@@ -60,7 +60,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         int i = 0;
 
@@ -133,7 +133,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "name < 'delta' order by name asc" );
         Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -261,7 +261,7 @@ public class IndexIT extends AbstractCoreIT {
             em.create( "item", properties );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "group = 1 order by name desc" );
         Results r = em.searchCollection( em.getApplicationRef(), "items", query );
@@ -290,7 +290,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.create("names", entity1);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL("select status where status = 'pickled'");
@@ -338,7 +338,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.createConnection( entity2Ref, "connecting", entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -357,7 +357,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.update( entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //query and check the status has been updated, shouldn't return results
         query = Query.fromQL( "select * where status = 'pickled'" );
@@ -413,7 +413,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.createConnection( entity2Ref, "connecting", entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //should return valid values
         Query query = Query.fromQL( "select * where status = 'pickled'" );
@@ -432,7 +432,7 @@ public class IndexIT extends AbstractCoreIT {
 
         em.update( entity1Ref );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         //query and check the status has been updated, shouldn't return results
         query = Query.fromQL( "select * where status = 'pickled'" );
@@ -500,7 +500,7 @@ public class IndexIT extends AbstractCoreIT {
         }};
         em.create("names", entity2);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // simple single-field select mapping
         {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
index e6ecf97..329a5be 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
@@ -28,7 +28,6 @@ import java.util.UUID;
 import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Query.Level;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -63,7 +62,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
 
@@ -135,7 +134,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // pick an arbitrary group, ensure it has 7 users
         Results ru = em.getCollection( groups.get( 2 ), "users", null, 20, Level.IDS, false );
@@ -152,7 +151,7 @@ public class PathQueryIT extends AbstractCoreIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         // pick an arbitrary user, ensure it has 7 devices
         Results rd = em.getCollection( users.get( 6 ), "devices", null, 20, Level.IDS, false );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
index 11f0692..1072d29 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java
@@ -28,8 +28,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang3.RandomStringUtils;
-
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.entities.Role;
 import org.apache.usergrid.persistence.Query.Level;
@@ -147,7 +145,7 @@ public class PermissionsIT extends AbstractCoreIT {
         dump( "group roles", roles );
 
         em.deleteGroupRole( group.getUuid(), "author" );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
 
         roles = em.getGroupRoles( group.getUuid() );
@@ -156,7 +154,7 @@ public class PermissionsIT extends AbstractCoreIT {
 
         em.addUserToGroupRole( user.getUuid(), group.getUuid(), "admin" );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Results r = em.getUsersInGroupRole( group.getUuid(), "admin", Level.ALL_PROPERTIES );
         dump( "entities", r.getEntities() );
         assertEquals( "proper number of users in group role not set", 1, r.size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 383d620..a7759de 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -125,7 +125,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -163,6 +163,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         waitForRebuild( status, reIndexService );
 
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read the catherder collection and not the catshepard
 
@@ -233,7 +234,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
         }
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(15000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -247,6 +248,8 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         deleteIndex( em.getApplicationId() );
 
+        app.waitForQueueDrainAndRefreshIndex();
+
         // ----------------- test that we can read them, should fail
 
         // deleting sytem app index will interfere with other concurrently running tests
@@ -283,7 +286,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -292,7 +294,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(15000);
         readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
     }
 
@@ -343,7 +345,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
 
         logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -392,7 +394,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -401,7 +402,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(5000);
         results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 );
         assertEquals(results.size(),3);
         q = Query.fromQL("select * where location within 100 of "+lat+", "+lon);
@@ -435,7 +436,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         final Entity secondEntity = em.create( "thing",  entityData);
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex(5000);
 
         // ----------------- test that we can read them, should work fine
 
@@ -493,7 +494,6 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
             logger.info( "Rebuilt index" );
 
-            app.refreshIndex();
         }
         catch ( Exception ex ) {
             logger.error( "Error rebuilding index", ex );
@@ -502,7 +502,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
-        Thread.sleep( 2000 );
+        app.waitForQueueDrainAndRefreshIndex(5000);
         countEntities( em, collectionName, 1 );
     }
 
@@ -547,14 +547,14 @@ public class RebuildIndexTest extends AbstractCoreIT {
         );
 
         ei.deleteApplication().toBlocking().lastOrDefault( null );
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
     }
 
 
     private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
         throws Exception {
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
         Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
@@ -593,7 +593,7 @@ public class RebuildIndexTest extends AbstractCoreIT {
     private int countEntities( EntityManager em, String collectionName, int expectedEntities)
            throws Exception {
 
-           app.refreshIndex();
+           app.waitForQueueDrainAndRefreshIndex();
 
            Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
            Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index d287d7e..3652b6f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -26,11 +26,8 @@ import java.util.UUID;
 import org.apache.usergrid.Application;
 import org.apache.usergrid.CoreApplication;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.utils.UUIDUtils;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,8 +40,6 @@ import org.apache.usergrid.persistence.cassandra.util.TraceTagManager;
 import org.apache.usergrid.persistence.cassandra.util.TraceTagReporter;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.setup.ConcurrentProcessSingleton;
-import rx.functions.Func0;
-import rx.functions.Func1;
 import rx.functions.Func2;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -140,7 +135,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
             Thread.sleep( 500 );
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
 
         // wait for it to appear in delete apps list
@@ -164,7 +159,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
         // delete the application
         setup.getEmf().deleteApplication(deletedAppId);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         found = findApps.call( deletedAppId, emf.getDeletedApplications() );
 
@@ -196,14 +191,14 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
             }
         }while (status.getStatus()!= ReIndexService.Status.COMPLETE);
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         // test to see that app now works and is happy
 
         // it should not be found in the deleted apps collection
         found = findApps.call( deletedAppId, emf.getDeletedApplications());
         assertFalse("Restored app found in deleted apps collection", found);
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         apps = setup.getEmf().getApplications();
         found = findApps.call(deletedAppId, apps);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
index e5c84f8..1f53c0a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java
@@ -76,7 +76,7 @@ public class ConnectionHelper extends CollectionIoHelper {
     @Override
     public Results getResults( Query query ) throws Exception {
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         query.setConnectionType( CONNECTION );
         query.setEntityType( "test" );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
index cea3b35..dcc8eae 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java
@@ -131,7 +131,7 @@ public class IntersectionTransitivePagingIT{
 
 
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(1000);
         return expected;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
index 4d60164..3403dc8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java
@@ -135,7 +135,7 @@ public class IntersectionUnionPagingIT {
 
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
index b2003fe..dac3f68 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
@@ -298,7 +298,7 @@ public class IteratingQueryIT {
             //we have to sleep, or we kill embedded cassandra
 
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         Thread.sleep(1000);
         long stop = System.currentTimeMillis();
 
@@ -367,7 +367,7 @@ public class IteratingQueryIT {
                 expected.add( name );
             }
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -438,7 +438,7 @@ public class IteratingQueryIT {
             }
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -551,7 +551,7 @@ public class IteratingQueryIT {
                 expectedResults.add( name );
             }
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -623,7 +623,7 @@ public class IteratingQueryIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -689,7 +689,7 @@ public class IteratingQueryIT {
             }
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -742,7 +742,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -803,7 +803,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -865,7 +865,7 @@ public class IteratingQueryIT {
             expected.add( name );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -924,7 +924,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Thread.sleep(500);
         long stop = System.currentTimeMillis();
@@ -987,7 +987,7 @@ public class IteratingQueryIT {
             expected.add( name );
         }
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
         long stop = System.currentTimeMillis();
 
         logger.info( "Writes took {} ms", stop - start );
@@ -1050,7 +1050,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
             expected.add( name );
         }
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -1110,7 +1110,7 @@ public class IteratingQueryIT {
             io.writeEntity( entity );
         }
 
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 
@@ -1216,7 +1216,7 @@ public class IteratingQueryIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query = Query.fromQL( "select * order by boolean desc, index asc" );
         query.setLimit( queryLimit );
@@ -1322,7 +1322,7 @@ public class IteratingQueryIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         Query query =
             Query.fromQL( "select * where intersect = true OR intersect2 = true order by created, intersect desc" );
@@ -1384,7 +1384,7 @@ public class IteratingQueryIT {
 
             io.writeEntity( entity );
         }
-        this.app.refreshIndex();
+        this.app.waitForQueueDrainAndRefreshIndex();
 
         long stop = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
index f7308da..3f5573f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java
@@ -132,7 +132,7 @@ public class NotSubPropertyIT {
 
         logger.info( "Writes took {} ms", stop - start );
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         return expected;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
index 60c1622..89641a8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java
@@ -72,7 +72,7 @@ public class ParenthesisProblemIT extends AbstractCoreIT {
             put("age",1);
         }});
 
-        app.refreshIndex();
+        app.waitForQueueDrainAndRefreshIndex();
 
         final Results entities = em.searchCollection( em.getApplicationRef(), "cats", Query.fromQL(query));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index c544967..8f9058d 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -49,6 +49,9 @@ collection.uniquevalues.authoritative.region=us-east
 # Queueing Test Settings
 # Reduce the long polling time for the tests
 queue.long.polling.time.millis=50
+elasticsearch.worker_count=8
+elasticsearch.worker_count_utility=8
+queue.get.timeout.seconds=8
 
 # --- End: Usergrid cluster/actor system settings
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 061807b..778274e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -165,7 +165,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
     long getMaxShardSize();
 
     @Key(QUEUE_LONG_POLL_TIME_MILLIS)
-    @Default("5000")
+    @Default("1000")
     long getLongPollTimeMillis();
 
     /** Max time-to-live for queue message and payload data */
@@ -174,7 +174,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
     int getMaxTtlSeconds();
 
     @Key(QUEUE_IN_MEMORY)
-    @Default("true")
+    @Default("false") // in memory not ready yet; leave this to false else msgs could be processed more than once
     boolean getInMemoryCache();
 
     @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC)

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 09bb8de..fa5ee0b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -59,7 +59,7 @@ public class InMemoryQueue {
         }
     }
 
-    public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
+    synchronized public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) {
 
         UUID newest = newestByQueueName.get( queueName );
         if ( newest == null ) {
@@ -76,7 +76,7 @@ public class InMemoryQueue {
         getQueue( queueName ).add( databaseQueueMessage );
     }
 
-    public UUID getNewest( String queueName ) {
+    synchronized public UUID getNewest( String queueName ) {
         if ( getQueue( queueName ).isEmpty() ) {
             newestByQueueName.remove( queueName );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index ac2857f..fd4257b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 
 @Singleton
@@ -188,7 +189,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
                         queueMessage.setData( json );
 
                     } catch (UnsupportedEncodingException e) {
-                        logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e);
+                        logger.error("Error decoding data for messageId=" + queueMessage.getMessageId(), e);
                     }
                 } else {
                     try {
@@ -201,6 +202,12 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
                 }
 
                 queueMessages.add( queueMessage );
+            } else if ( (System.currentTimeMillis() - dbMessage.getQueuedAt()) > TimeUnit.HOURS.toMillis(2) ) {
+                logger.warn("Queue Message does not have corresponding data after 2 hours, removing from queue - " +
+                    "queueName: {}, region: {}, queueMessageId: {}", dbMessage.getQueueName(), dbMessage.getRegion(),
+                    dbMessage.getQueueMessageId());
+                queueMessageSerialization.deleteMessage(dbMessage.getQueueName(), dbMessage.getRegion(),
+                    dbMessage.getShardId(), dbMessage.getType(), dbMessage.getQueueMessageId());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 89c79ec..eb26b69 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -23,6 +23,7 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -177,7 +178,7 @@ public class QueueActorHelper {
             }
         }
 
-        newestFetchedUuid.put( queueName, since );
+        updateUUIDPointer(queueName, since);
 
 //        Shard currentShard = multiShardIterator.getCurrentShard();
 //        if ( currentShard != null ) {
@@ -279,7 +280,7 @@ public class QueueActorHelper {
     }
 
 
-    void queueRefresh( String queueName ) {
+    synchronized void queueRefresh( String queueName ) {
 
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
 
@@ -327,7 +328,7 @@ public class QueueActorHelper {
 
                 startingShards.put( shardKey, shardId );
 
-                lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+                updateLastRefreshedTime(queueName);
 
                 if ( count > 0 ) {
                     Object shard = shardIdOptional.isPresent() ? shardIdOptional.get() : "null";
@@ -346,4 +347,12 @@ public class QueueActorHelper {
         return queueName + "_" + type + region;
     }
 
+    private synchronized void updateUUIDPointer(String queueName, UUID newUUIDPointer){
+        newestFetchedUuid.put( queueName, newUUIDPointer );
+    }
+
+    private synchronized void updateLastRefreshedTime(String queueName){
+        lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 1ff8502..cbc7245 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -133,7 +133,7 @@ public class QueueActorRouter extends UntypedActor {
                     getContext().dispatcher(),
                     getSelf() );
                 shardAllocationSchedulersByQueueName.put( queueName, scheduler );
-                logger.debug( "Created shard allocater for queue {}", queueName );
+                logger.debug( "Created shard allocator for queue {}", queueName );
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 19059e6..75c1c22 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -139,8 +139,8 @@ public class ShardAllocator extends UntypedActor {
                 shardSerialization.createShard( newShard );
                 shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 );
 
-                logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
-                        this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+                logger.info("Allocated new shard for queue, newShardID: {}, queueName: {}, shardMessageCount: {}, usedPercent: {}%",
+                    newShard.getShardId(), queueName, counterValue, (long)((double)counterValue/(double)qakkaFig.getMaxShardSize()*100) );
 
             } else {
 //                logger.debug("No new shard for queue {} counterValue {} of max {}",


Mime
View raw message