usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [03/16] incubator-usergrid git commit: Initial pass at moving queues to core
Date Tue, 21 Apr 2015 15:55:12 GMT
Initial pass at moving queues to core


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/84d779fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/84d779fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/84d779fc

Branch: refs/heads/USERGRID-578
Commit: 84d779fc323b0f6c130c03a8982a39f5d5025b17
Parents: 8bb8a4f
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Apr 15 14:21:29 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Fri Apr 17 11:58:28 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |   8 +
 .../usergrid/corepersistence/CoreModule.java    |  44 ++-
 .../corepersistence/CpEntityManager.java        | 167 +++++-----
 .../corepersistence/CpEntityManagerFactory.java |  18 +-
 .../corepersistence/CpManagerCache.java         |   3 +
 .../corepersistence/CpRelationManager.java      | 102 +++----
 .../CpRelationManagerFactory.java               |  46 ---
 .../usergrid/corepersistence/GuiceFactory.java  |  19 +-
 .../events/EntityDeletedHandler.java            |   5 +-
 .../events/EntityVersionCreatedHandler.java     |   8 +-
 .../events/EntityVersionDeletedHandler.java     |  19 +-
 .../corepersistence/index/BufferQueue.java      |  68 +++++
 .../index/BufferQueueInMemoryImpl.java          | 116 +++++++
 .../index/BufferQueueSQSImpl.java               | 306 +++++++++++++++++++
 .../index/IndexQueueService.java                |  45 +++
 .../corepersistence/index/IndexService.java     |  50 +++
 .../corepersistence/index/IndexServiceImpl.java | 148 +++++++++
 .../corepersistence/index/QueryFig.java         |  98 ++++++
 .../corepersistence/index/QueueProvider.java    | 112 +++++++
 .../index/SQSIndexQueueServiceImpl.java         |  35 +++
 .../corepersistence/util/CpNamingUtils.java     |   4 +-
 .../usergrid/persistence/EntityManager.java     |   2 -
 .../persistence/GuiceAdapterBeanFactory.java    | 114 +++++++
 .../usergrid/persistence/PersistenceModule.java | 162 ++++++++++
 .../corepersistence/TestIndexModule.java        |  35 +++
 .../index/BufferQueueSQSImplTest.java           | 179 +++++++++++
 .../persistence/core/aws/NoAWSCredsRule.java    |  98 ++++++
 .../graph/serialization/EdgesObservable.java    |  23 ++
 .../serialization/impl/EdgesObservableImpl.java |  68 +++--
 .../persistence/index/EntityIndexBatch.java     |   3 +-
 .../usergrid/persistence/index/IndexFig.java    | 104 ++-----
 .../persistence/index/guice/IndexModule.java    |   5 +-
 .../persistence/index/guice/QueueProvider.java  | 116 -------
 .../persistence/index/impl/BufferQueue.java     |  66 ----
 .../index/impl/BufferQueueInMemoryImpl.java     | 115 -------
 .../index/impl/BufferQueueSQSImpl.java          | 306 -------------------
 .../impl/EsApplicationEntityIndexImpl.java      |  12 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |  16 +-
 .../index/impl/EsEntityIndexFactoryImpl.java    |   8 +-
 .../index/impl/EsEntityIndexImpl.java           |  50 +--
 .../index/impl/EsIndexBufferConsumerImpl.java   | 237 ++++++--------
 .../index/impl/EsIndexBufferProducerImpl.java   |  59 ----
 .../index/impl/FailureMonitorImpl.java          |  26 --
 .../index/impl/FlushBufferQueue.java            |  23 ++
 .../index/impl/IndexBufferConsumer.java         |  15 +-
 .../index/impl/IndexBufferProducer.java         |  32 --
 .../persistence/index/impl/IndexIdentifier.java |  46 +++
 .../index/impl/IndexIdentifierImpl.java         | 118 +------
 .../index/impl/IndexOperationMessage.java       | 139 +++++++++
 .../index/impl/IndexRefreshCommandImpl.java     |  12 +-
 .../index/migration/LegacyIndexIdentifier.java  |   4 +-
 .../index/guice/TestIndexModule.java            |  22 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 175 -----------
 .../persistence/queue/NoAWSCredsRule.java       |  98 ------
 .../persistence/queue/QueueManagerTest.java     |   3 +-
 stack/services/pom.xml                          |  50 +--
 .../notifications/NotifiersServiceIT.java       |  19 +-
 57 files changed, 2232 insertions(+), 1749 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 6f3c381..3504be4 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -163,6 +163,14 @@
 
   <dependencies>
 
+
+      <!-- dependent on wiring guice and spring -->
+      <dependency>
+          <groupId>com.google.inject.extensions</groupId>
+          <artifactId>guice-spring</artifactId>
+          <version>4.0-beta5</version>
+      </dependency>
+
     <!-- Apache Dependencies -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 7859ffc..7e8af87 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,7 +16,13 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.corepersistence.index.BufferQueue;
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
+import org.apache.usergrid.corepersistence.index.QueryFig;
+import org.apache.usergrid.corepersistence.index.QueueProvider;
 import org.apache.usergrid.corepersistence.migration.*;
+import org.apache.usergrid.persistence.PersistenceModule;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.safehaus.guicyfig.GuicyFigModule;
 import org.springframework.context.ApplicationContext;
@@ -56,25 +62,18 @@ import com.google.inject.multibindings.Multibinder;
  */
 public class CoreModule  extends AbstractModule {
 
-    /**
-     * TODO this is a circular dependency, and should be refactored
-     */
-    private LazyEntityManagerFactoryProvider lazyEntityManagerFactoryProvider;
 
-    public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
 
+    public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
 
 
-    public CoreModule( final ApplicationContext context ) {
-        this.lazyEntityManagerFactoryProvider = new LazyEntityManagerFactoryProvider( context );
-    }
 
     @Override
     protected void configure() {
 
 
-        //See TODO, this is fugly
-        bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
+//        //See TODO, this is fugly
+//        bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
 
         install( new CommonModule());
         install( new CollectionModule() {
@@ -141,31 +140,24 @@ public class CoreModule  extends AbstractModule {
         plugins.addBinding().to( AppInfoMigrationPlugin.class );
         plugins.addBinding().to( MigrationModuleVersionPlugin.class );
 
-        bind(AllApplicationsObservable.class).to(AllApplicationsObservableImpl.class);
+        bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class );
 
-        install(new GuicyFigModule(ApplicationIdCacheFig.class));
-
-    }
 
+        /*****
+         * Indexing service
+         *****/
 
-    /**
-     * TODO, this is a hack workaround due to the guice/spring EMF circular dependency
-     * Once the entity managers have been refactored and moved into guice, remove this dependency.
-     *
-     */
-    public static class LazyEntityManagerFactoryProvider implements Provider<EntityManagerFactory>{
 
-        private final ApplicationContext context;
+        bind(IndexService.class).to(IndexServiceImpl.class);
+        //bind the queue provider
 
+        bind( BufferQueue.class).toProvider( QueueProvider.class );
 
-        public LazyEntityManagerFactoryProvider( final ApplicationContext context ) {this.context = context;}
+        install( new GuicyFigModule( QueryFig.class ) );
 
 
+        install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
 
-        @Override
-        public EntityManagerFactory get() {
-            return this.context.getBean( EntityManagerFactory.class );
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2e7b7d8..a615a43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -80,9 +80,6 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -125,7 +122,6 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -172,7 +168,6 @@ public class CpEntityManager implements EntityManager {
     private UUID applicationId;
     private Application application;
 
-    private CpEntityManagerFactory emf;
 
     private ManagerCache managerCache;
 
@@ -212,29 +207,34 @@ public class CpEntityManager implements EntityManager {
 //    private LoadingCache<EntityScope, org.apache.usergrid.persistence.model.entity.Entity> entityCache;
 
 
-    public CpEntityManager() {
-
-    }
-
-    @Override
-    public void init( EntityManagerFactory emf, UUID applicationId ) {
+    /**
+     * Fugly, make this part of DI
+     * @param cass
+     * @param counterUtils
+     * @param managerCache
+     * @param metricsFactory
+     * @param applicationId
+     */
+    public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) {
 
-        Preconditions.checkNotNull( emf, "emf must not be null" );
+        Preconditions.checkNotNull( cass, "cass must not be null" );
+        Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" );
+        Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
 
-        this.emf = ( CpEntityManagerFactory ) emf;
-        this.managerCache = this.emf.getManagerCache();
+
+        this.managerCache = managerCache;
         this.applicationId = applicationId;
 
         applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
         ecm =  managerCache.getEntityCollectionManager( applicationScope );
 
-        this.cass = this.emf.getCassandraService();
-        this.counterUtils = this.emf.getCounterUtils();
+        this.cass = cass;
+        this.counterUtils = counterUtils;
 
         //Timer Setup
-        this.metricsFactory = this.emf.getMetricsFactory();
+        this.metricsFactory = metricsFactory;
         this.aggCounterTimer =this.metricsFactory.getTimer( CpEntityManager.class,
             "cp.entity.get.aggregate.counters.timer" );
         this.entCreateTimer =this.metricsFactory.getTimer( CpEntityManager.class, "cp.entity.create.timer" );
@@ -724,18 +724,18 @@ public class CpEntityManager implements EntityManager {
 
     @Override
     public void updateApplication( Map<String, Object> properties ) throws Exception {
-        this.updateProperties(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), properties);
+        this.updateProperties( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), properties );
         this.application = get( applicationId, Application.class );
     }
 
 
     @Override
     public RelationManager getRelationManager( EntityRef entityRef ) {
-        Preconditions.checkNotNull(entityRef, "entityRef cannot be null");
-        CpRelationManager rmi = CpRelationManagerFactory.get(
-            this, emf, applicationId, entityRef, null, metricsFactory
-        );
-        return rmi;
+        Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
+
+        CpRelationManager relationManager =
+            new CpRelationManager( metricsFactory, managerCache, this, applicationId, entityRef );
+        return relationManager;
     }
 
 
@@ -819,7 +819,7 @@ public class CpEntityManager implements EntityManager {
     @Override
     public EntityRef getAlias( String aliasType, String alias ) throws Exception {
 
-        return getAlias(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), aliasType, alias);
+        return getAlias( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), aliasType, alias );
     }
 
 
@@ -848,7 +848,7 @@ public class CpEntityManager implements EntityManager {
                     new Object[] { ownerRef, collectionType, aliasValue } );
         }
 
-        return results.get(aliasValue);
+        return results.get( aliasValue );
     }
 
 
@@ -1226,7 +1226,7 @@ public class CpEntityManager implements EntityManager {
 
         ApplicationCF dictionaryCf = null;
 
-        boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary(entity.getType(), dictionaryName);
+        boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
 
         if ( entityHasDictionary ) {
             dictionaryCf = ENTITY_DICTIONARIES;
@@ -1279,7 +1279,7 @@ public class CpEntityManager implements EntityManager {
 
         Class<?> dictionaryCoType =
                 Schema.getDefaultSchema().getDictionaryValueType(entity.getType(), dictionaryName);
-        boolean coTypeIsBasic = ClassUtils.isBasicType(dictionaryCoType);
+        boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
 
         ByteBuffer[] columnNames = new ByteBuffer[elementNames.length];
         for ( int i = 0; i < elementNames.length; i++ ) {
@@ -1349,14 +1349,14 @@ public class CpEntityManager implements EntityManager {
     @Override
     public boolean isCollectionMember( EntityRef owner, String collectionName, EntityRef entity ) throws Exception {
 
-        return getRelationManager( owner ).isCollectionMember(collectionName, entity);
+        return getRelationManager( owner ).isCollectionMember( collectionName, entity );
     }
 
 
     @Override
     public boolean isConnectionMember( EntityRef owner, String connectionName, EntityRef entity ) throws Exception {
 
-        return getRelationManager( owner ).isConnectionMember(connectionName, entity);
+        return getRelationManager( owner ).isConnectionMember( connectionName, entity );
     }
 
 
@@ -1455,7 +1455,7 @@ public class CpEntityManager implements EntityManager {
     public ConnectionRef createConnection( EntityRef connectingEntity, String connectionType,
                                            EntityRef connectedEntityRef ) throws Exception {
 
-        return getRelationManager( connectingEntity ).createConnection(connectionType, connectedEntityRef);
+        return getRelationManager( connectingEntity ).createConnection( connectionType, connectedEntityRef );
     }
 
 
@@ -1465,7 +1465,7 @@ public class CpEntityManager implements EntityManager {
             throws Exception {
 
         return getRelationManager( connectingEntity )
-                .createConnection(pairedConnectionType, pairedEntity, connectionType, connectedEntityRef);
+                .createConnection( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
     }
 
 
@@ -1523,7 +1523,7 @@ public class CpEntityManager implements EntityManager {
             String connectedEntityType, Level resultsLevel ) throws Exception {
 
         return getRelationManager( entityRef )
-                .getConnectedEntities(connectionType, connectedEntityType, resultsLevel);
+                .getConnectedEntities( connectionType, connectedEntityType, resultsLevel );
     }
 
 
@@ -1532,7 +1532,7 @@ public class CpEntityManager implements EntityManager {
             String connectedEntityType, Level resultsLevel ) throws Exception {
 
         return getRelationManager( entityRef )
-                .getConnectingEntities(connectionType, connectedEntityType, resultsLevel);
+                .getConnectingEntities( connectionType, connectedEntityType, resultsLevel );
     }
 
 
@@ -1540,21 +1540,21 @@ public class CpEntityManager implements EntityManager {
     public Results getConnectingEntities( EntityRef entityRef, String connectionType,
             String entityType, Level level, int count ) throws Exception {
 
-        return getRelationManager( entityRef ).getConnectingEntities(connectionType, entityType, level, count);
+        return getRelationManager( entityRef ).getConnectingEntities( connectionType, entityType, level, count );
     }
 
 
     @Override
     public Results searchConnectedEntities( EntityRef connectingEntity, Query query ) throws Exception {
 
-        return getRelationManager( connectingEntity ).searchConnectedEntities(query);
+        return getRelationManager( connectingEntity ).searchConnectedEntities( query );
     }
 
 
     @Override
     public Set<String> getConnectionIndexes( EntityRef entity, String connectionType ) throws Exception {
 
-        return getRelationManager( entity ).getConnectionIndexes(connectionType);
+        return getRelationManager( entity ).getConnectionIndexes( connectionType );
     }
 
 
@@ -1799,8 +1799,8 @@ public class CpEntityManager implements EntityManager {
         permission = permission.toLowerCase();
         long timestamp = cass.createTimestamp();
         Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
-        CassandraPersistenceUtils.addInsertToMutator(batch, ApplicationCF.ENTITY_DICTIONARIES,
-            getRolePermissionsKey(groupId, roleName), permission, ByteBuffer.allocate(0), timestamp);
+        CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
+            getRolePermissionsKey( groupId, roleName ), permission, ByteBuffer.allocate( 0 ), timestamp );
 
         //Adding graphite metrics
         Timer.Context timeGroupRolePermission = entGrantGroupPermissionTimer.time();
@@ -1816,7 +1816,7 @@ public class CpEntityManager implements EntityManager {
         long timestamp = cass.createTimestamp();
         Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
         CassandraPersistenceUtils.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
-                getRolePermissionsKey( groupId, roleName ), permission, timestamp );
+            getRolePermissionsKey( groupId, roleName ), permission, timestamp );
         //Adding graphite metrics
         Timer.Context timeRevokeGroupRolePermission = entRevokeGroupPermissionTimer.time();
         CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -2281,7 +2281,7 @@ public class CpEntityManager implements EntityManager {
     private MapManager getMapManagerForTypes() {
         Id mapOwner = new SimpleId( applicationId, TYPE_APPLICATION );
 
-        final MapScope ms = CpNamingUtils.getEntityTypeMapScope(mapOwner);
+        final MapScope ms = CpNamingUtils.getEntityTypeMapScope( mapOwner );
 
         MapManager mm = managerCache.getMapManager(ms);
 
@@ -2836,26 +2836,25 @@ public class CpEntityManager implements EntityManager {
 
         CpWalker walker = new CpWalker( );
 
-        walker.walkCollections(
-            this, getApplication(), collectionName, reverse, new CpVisitor() {
+        walker.walkCollections( this, getApplication(), collectionName, reverse, new CpVisitor() {
 
-            @Override
-            public void visitCollectionEntry( EntityManager em, String collName, Entity entity ) {
+                @Override
+                public void visitCollectionEntry( EntityManager em, String collName, Entity entity ) {
 
-                try {
-                    em.update( entity );
-                    po.onProgress( entity );
-                }
-                catch ( WriteOptimisticVerifyException wo ) {
-                    // swallow this, it just means this was already updated, which accomplishes our task
-                    logger.warn( "Someone beat us to updating entity {} in collection {}.  Ignoring.",
-                        entity.getName(), collName );
-                }
-                catch ( Exception ex ) {
-                    logger.error( "Error repersisting entity", ex );
+                    try {
+                        em.update( entity );
+                        po.onProgress( entity );
+                    }
+                    catch ( WriteOptimisticVerifyException wo ) {
+                        // swallow this, it just means this was already updated, which accomplishes our task
+                        logger.warn( "Someone beat us to updating entity {} in collection {}.  Ignoring.",
+                            entity.getName(), collName );
+                    }
+                    catch ( Exception ex ) {
+                        logger.error( "Error repersisting entity", ex );
+                    }
                 }
-            }
-        } );
+            } );
     }
 
 
@@ -2890,32 +2889,34 @@ public class CpEntityManager implements EntityManager {
 
     void indexEntityIntoCollection( final Edge edge,  final  org.apache.usergrid.persistence.model.entity.Entity collectionMember) {
 
-        final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope() );
-        final EntityIndexBatch batch = aie.createBatch();
-
-        // index member into entity collection | type scope
-        final IndexEdge collectionIndexScope = generateScopeFromSource( edge );
-        batch.index( collectionIndexScope, collectionMember );
-
-        //TODO REMOVE INDEX CODE
-        //        // index member into entity | all-types scope
-        //        IndexScope entityAllTypesScope = new IndexScopeImpl(
-        //                collectionEntity.getId(),
-        //                CpNamingUtils.ALL_TYPES, entityType );
-        //
-        //        batch.index(entityAllTypesScope, memberEntity);
-        //
-        //        // index member into application | all-types scope
-        //        IndexScope appAllTypesScope = new IndexScopeImpl(
-        //                getApplicationScope().getApplication(),
-        //                CpNamingUtils.ALL_TYPES, entityType );
-        //
-        //        batch.index(appAllTypesScope, memberEntity);
-
-        //Adding graphite metrics
-        Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time();
-        batch.execute();
-        timeIndexEntityCollection.stop();
+        throw new UnsupportedOperationException( "Use the new interface!" );
+//
+//        final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope() );
+//        final EntityIndexBatch batch = aie.createBatch();
+//
+//        // index member into entity collection | type scope
+//        final IndexEdge collectionIndexScope = generateScopeFromSource( edge );
+//        batch.index( collectionIndexScope, collectionMember );
+//
+//        //TODO REMOVE INDEX CODE
+//        //        // index member into entity | all-types scope
+//        //        IndexScope entityAllTypesScope = new IndexScopeImpl(
+//        //                collectionEntity.getId(),
+//        //                CpNamingUtils.ALL_TYPES, entityType );
+//        //
+//        //        batch.index(entityAllTypesScope, memberEntity);
+//        //
+//        //        // index member into application | all-types scope
+//        //        IndexScope appAllTypesScope = new IndexScopeImpl(
+//        //                getApplicationScope().getApplication(),
+//        //                CpNamingUtils.ALL_TYPES, entityType );
+//        //
+//        //        batch.index(appAllTypesScope, memberEntity);
+//
+//        //Adding graphite metrics
+//        Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time();
+//        batch.execute();
+//        timeIndexEntityCollection.stop();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 9907f91..e03ed47 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -156,14 +156,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     }
 
 
-    public ManagerCache getManagerCache() {
-
-        if ( managerCache == null ) {
-            managerCache = injector.getInstance( ManagerCache.class );
-        }
-        return managerCache;
-    }
-
     private Observable<EntityIdScope> getAllEntitiesObservable(){
       return injector.getInstance( Key.get(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){})).getData();
     }
@@ -184,16 +176,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private EntityManager _getEntityManager( UUID applicationId ) {
 
-        EntityManager em = new CpEntityManager();
-        em.init( this ,applicationId );
-
+        EntityManager em = new CpEntityManager(cassandraService, counterUtils, managerCache, metricsFactory, applicationId );
         return em;
     }
 
-    public MetricsFactory getMetricsFactory(){
-        return metricsFactory;
-    }
-
     @Override
     public Entity createApplicationV2(String organizationName, String name) throws Exception {
         return createApplicationV2( organizationName, name, null );
@@ -746,7 +732,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public Health getEntityStoreHealth() {
 
         // could use any collection scope here, does not matter
-        EntityCollectionManager ecm = getManagerCache().getEntityCollectionManager(
+        EntityCollectionManager ecm = managerCache.getEntityCollectionManager(
             new ApplicationScopeImpl( new SimpleId( CpNamingUtils.MANAGEMENT_APPLICATION_ID, "application" ) ) );
 
         return ecm.getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 4cae31e..f4fee0c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -30,6 +30,9 @@ import org.apache.usergrid.persistence.map.MapScope;
 import com.google.inject.Inject;
 
 
+/**
+ * Cache for managing our other managers.  Now just a delegate.  Needs refactored away
+ */
 public class CpManagerCache implements ManagerCache {
 
     private final EntityCollectionManagerFactory ecmf;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 1ae1db1..3c72b60 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -107,9 +107,6 @@ public class CpRelationManager implements RelationManager {
 
     private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
 
-
-    private CpEntityManagerFactory emf;
-
     private ManagerCache managerCache;
 
     private EntityManager em;
@@ -126,25 +123,18 @@ public class CpRelationManager implements RelationManager {
     private Timer updateCollectionTimer;
 
 
-    public CpRelationManager() {}
-
-
-    public CpRelationManager init( EntityManager em, CpEntityManagerFactory emf, UUID applicationId,
-                                   EntityRef headEntity, IndexBucketLocator indexBucketLocator,
-                                   MetricsFactory metricsFactory ) {
+    public CpRelationManager(final MetricsFactory metricsFactory, final ManagerCache managerCache, final EntityManager em, final UUID applicationId, final EntityRef headEntity ) {
 
         Assert.notNull( em, "Entity manager cannot be null" );
-        Assert.notNull( emf, "Entity manager factory cannot be null" );
         Assert.notNull( applicationId, "Application Id cannot be null" );
         Assert.notNull( headEntity, "Head entity cannot be null" );
         Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
         // TODO: this assert should not be failing
         //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
         this.em = em;
-        this.emf = emf;
         this.applicationId = applicationId;
         this.headEntity = headEntity;
-        this.managerCache = emf.getManagerCache();
+        this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
         this.metricsFactory = metricsFactory;
@@ -165,7 +155,7 @@ public class CpRelationManager implements RelationManager {
         Assert.notNull( cpHeadEntity, String
             .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) );
 
-        return this;
+
     }
 
 
@@ -260,40 +250,42 @@ public class CpRelationManager implements RelationManager {
         final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
 
 
-        final GraphManager gm = managerCache.getGraphManager( applicationScope );
-
-        // loop through all types of edge to target
-
-
-        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
-
-        final EntityIndexBatch entityIndexBatch = ei.createBatch();
-
-        final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) )
-
-            // for each edge type, emit all the edges of that type
-            .flatMap( etype -> gm.loadEdgesToTarget(
-                new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, null ) ) )
-
-                //for each edge we receive index and add to the batch
-            .doOnNext( edge -> {
-                // reindex the entity in the source entity's collection or connection index
-
-                IndexEdge indexScope = generateScopeFromSource( edge );
-
-                entityIndexBatch.index( indexScope, cpEntity );
-
-            } ).doOnCompleted( () -> {
-                    Timer.Context timeElasticIndexBatch = updateCollectionTimer.time();
-                    entityIndexBatch.execute();
-                    timeElasticIndexBatch.stop();
-              } ).count().toBlocking().lastOrDefault( 0 );
-
-        //Adding graphite metrics
-
-
-        logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
+        throw new UnsupportedOperationException( "Use the new interface" );
+
+//        final GraphManager gm = managerCache.getGraphManager( applicationScope );
+//
+//        // loop through all types of edge to target
+//
+//
+//        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
+//
+//        final EntityIndexBatch entityIndexBatch = ei.createBatch();
+//
+//        final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) )
+//
+//            // for each edge type, emit all the edges of that type
+//            .flatMap( etype -> gm.loadEdgesToTarget(
+//                new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
+//                    SearchByEdgeType.Order.DESCENDING, null ) ) )
+//
+//                //for each edge we receive index and add to the batch
+//            .doOnNext( edge -> {
+//                // reindex the entity in the source entity's collection or connection index
+//
+//                IndexEdge indexScope = generateScopeFromSource( edge );
+//
+//                entityIndexBatch.index( indexScope, cpEntity );
+//
+//            } ).doOnCompleted( () -> {
+//                    Timer.Context timeElasticIndexBatch = updateCollectionTimer.time();
+//                    entityIndexBatch.execute();
+//                    timeElasticIndexBatch.stop();
+//              } ).count().toBlocking().lastOrDefault( 0 );
+//
+//        //Adding graphite metrics
+//
+//
+//        logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
     }
 
 
@@ -494,9 +486,10 @@ public class CpRelationManager implements RelationManager {
         //            headEntityScope.getName()});
 
         if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
-            getRelationManager( itemEntity )
-                .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
-            getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false );
+            throw new UnsupportedOperationException( "Implement me directly in graph " );
+//            getRelationManager( itemEntity )
+//                .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
+//            getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false );
         }
 
         return itemEntity;
@@ -555,7 +548,8 @@ public class CpRelationManager implements RelationManager {
             addToCollection( collName, itemEntity );
 
             if ( collection != null && collection.getLinkedCollection() != null ) {
-                getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity );
+                throw new UnsupportedOperationException( "Implement me directly in graph " );
+//                getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity );
             }
         }
 
@@ -1045,12 +1039,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    private CpRelationManager getRelationManager( EntityRef headEntity ) {
-        CpRelationManager rmi = new CpRelationManager();
-        rmi.init( em, emf, applicationId, headEntity, null, metricsFactory );
-        return rmi;
-    }
-
 
     /** side effect: converts headEntity into an Entity if it is an EntityRef! */
     private Entity getHeadEntity() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java
deleted file mode 100644
index 4223f37..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.corepersistence;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-
-import java.util.UUID;
-
-/**
- * Factory to return and init relation manager instances
- */
-public class CpRelationManagerFactory {
-
-
-    public static CpRelationManager get( EntityManager em,
-                                         CpEntityManagerFactory emf,
-                                         UUID applicationId,
-                                         EntityRef headEntity,
-                                         IndexBucketLocator indexBucketLocator,
-                                         MetricsFactory metricsFactory){
-        CpRelationManager relationManager = new CpRelationManager();
-        relationManager.init(em,emf,applicationId,headEntity,indexBucketLocator,metricsFactory);
-        return relationManager;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
index 566430f..3a08034 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
@@ -31,8 +31,7 @@ import org.springframework.context.ApplicationContextAware;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.usergrid.persistence.cassandra.CassandraService;
-import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.PersistenceModule;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -47,7 +46,7 @@ import me.prettyprint.cassandra.service.CassandraHostConfigurator;
  * Factory for configuring Guice then returning it
  */
 @Singleton
-public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAware {
+public class GuiceFactory implements FactoryBean<Injector> {
 
     private static final Logger logger = LoggerFactory.getLogger( GuiceFactory.class );
 
@@ -55,16 +54,16 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw
 
     private final Properties systemProperties;
 
-
     private ApplicationContext applicationContext;
 
     private Injector injector;
 
 
 
-    public GuiceFactory( final CassandraHostConfigurator chc, final Properties systemProperties  ) {
+    public GuiceFactory( final ApplicationContext applicationContext, final CassandraHostConfigurator chc, final Properties systemProperties  ) {
         this.chc = chc;
         this.systemProperties = systemProperties;
+        this.applicationContext = applicationContext;
     }
 
 
@@ -128,8 +127,10 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw
             throw new RuntimeException( "Fatal error loading configuration.", e );
         }
 
-        //this is seriously fugly, and needs removed
-        injector = Guice.createInjector( new CoreModule( applicationContext ) );
+
+
+        //this is seriously fugly, and needs removed we shouldn't be mixing spring and guice
+        injector = Guice.createInjector( new CoreModule(  ), new PersistenceModule( applicationContext ) );
 
         return injector;
     }
@@ -147,8 +148,4 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw
     }
 
 
-    @Override
-    public void setApplicationContext( final ApplicationContext applicationContext ) throws BeansException {
-        this.applicationContext = applicationContext;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 6a37144..2de69ed 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -42,11 +42,8 @@ public class EntityDeletedHandler implements EntityDeleted {
     private static final Logger logger = LoggerFactory.getLogger( EntityDeletedHandler.class );
 
 
-    private final EntityManagerFactory emf;
-
-
     @Inject
-    public EntityDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+    public EntityDeletedHandler( ) {}
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index 0163fc2..20bdd55 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -41,15 +41,11 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
     private static final Logger logger = LoggerFactory.getLogger(EntityVersionCreatedHandler.class );
 
 
-    private final EntityManagerFactory emf;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
 
 
     @Inject
-    public EntityVersionCreatedHandler( final EntityManagerFactory emf,
-                                        final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-        this.emf = emf;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    public EntityVersionCreatedHandler(  ) {
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 700851a..22f599e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -30,9 +30,11 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -56,17 +58,20 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
     private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class );
 
 
-    private final EntityManagerFactory emf;
     private final EdgesObservable edgesObservable;
     private final SerializationFig serializationFig;
+    private final EntityIndexFactory entityIndexFactory;
+    private final GraphManagerFactory graphManagerFactory;
 
 
     @Inject
-    public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable,
-                                        final SerializationFig serializationFig ) {
-        this.emf = emf;
+    public EntityVersionDeletedHandler( final EdgesObservable edgesObservable, final SerializationFig serializationFig,
+                                        final EntityIndexFactory entityIndexFactory,
+                                        final GraphManagerFactory graphManagerFactory ) {
         this.edgesObservable = edgesObservable;
         this.serializationFig = serializationFig;
+        this.entityIndexFactory = entityIndexFactory;
+        this.graphManagerFactory = graphManagerFactory;
     }
 
 
@@ -87,10 +92,8 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
             } );
         }
 
-        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
-
-        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
-        final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope );
+        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( scope );
+        final GraphManager gm = graphManagerFactory.createEdgeManager(  scope );
 
 
         //create an observable of all scopes to deIndex

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java
new file mode 100644
index 0000000..fc1bdb7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
+
+/**
+ * A temporary interface of our buffer Q to decouple of producer and consumer;
+ */
+public interface BufferQueue {
+
+    /**
+     * Offer the indexoperation message.  Some queues may support not returning the future until ack or fail.
+     * Other queues may return the future after ack on the offer.  See the implementation documentation for details.
+     * @param operation
+     */
+    public void offer(final IndexOperationMessage operation);
+
+
+    /**
+     * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed.
+     * May return less than the take size, but will never return null
+     *
+     * @param takeSize
+     * @param timeout
+     * @param timeUnit
+     * @return A null safe lid
+     */
+    public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
+
+
+    /**
+     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented.
+     * This will set the future as done in in memory operations
+     *
+     * @param messages
+     */
+    public void ack(final List<IndexOperationMessage> messages);
+
+    /**
+     * Mark these message as failed.  Set the exception in the future on local operation
+     *
+     * @param messages
+     */
+    public void fail(final List<IndexOperationMessage> messages, final Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java
new file mode 100644
index 0000000..0e43da3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueInMemoryImpl implements BufferQueue {
+
+
+    private final QueryFig fig;
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
+
+    @Inject
+    public BufferQueueInMemoryImpl( final QueryFig fig ) {
+        this.fig = fig;
+        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        try {
+            messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS );
+        }
+        catch ( InterruptedException e ) {
+            throw new RuntimeException("Unable to offer message to queue", e);
+        }
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+        try {
+
+
+            messages.drainTo( response, takeSize );
+
+            //we got something, go process it
+            if ( response.size() > 0 ) {
+                return response;
+            }
+
+
+            final IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+
+            if ( polled != null ) {
+                response.add( polled );
+
+                //try to add more
+                messages.drainTo( response, takeSize - 1 );
+            }
+        }
+        catch ( InterruptedException e ) {
+            //swallow
+        }
+
+
+        return response;
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+        //if we have a future ack it
+        for ( final IndexOperationMessage op : messages ) {
+            op.done();
+        }
+    }
+
+
+    @Override
+    public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+
+
+        for ( final IndexOperationMessage op : messages ) {
+            final BetterFuture<IndexOperationMessage> future = op.getFuture();
+
+            if ( future != null ) {
+                future.setError( t );
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java
new file mode 100644
index 0000000..d955014
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * This is experimental at best.  Our SQS size limit is a problem.  We shouldn't use this for index operation. Only for
+ * performing
+ */
+@Singleton
+public class BufferQueueSQSImpl implements BufferQueue {
+
+    private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
+
+    /** Hacky, copied from CPEntityManager b/c we can't access it here */
+    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+    /**
+     * The name to put in the map
+     */
+    public static final String MAP_NAME = "esqueuedata";
+
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+
+    static {
+        SMILE_FACTORY.delegateToTextual( true );
+    }
+
+
+    private final QueueManager queue;
+    private final MapManager mapManager;
+    private final QueryFig queryFig;
+    private final ObjectMapper mapper;
+    private final Meter readMeter;
+    private final Timer readTimer;
+    private final Meter writeMeter;
+    private final Timer writeTimer;
+
+
+    @Inject
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig,
+                               final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        final QueueScope queueScope =
+            new QueueScopeImpl( QUEUE_NAME );
+
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
+        this.queryFig = queryFig;
+
+        final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
+
+        this.mapManager = mapManagerFactory.createMapManager( scope );
+
+
+        this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
+        this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
+
+        this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
+        this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
+
+        this.mapper = new ObjectMapper( SMILE_FACTORY );
+        //pretty print, disabling for speed
+        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+
+        //no op
+        if(operation.isEmpty()){
+            operation.getFuture().done();
+            return;
+        }
+
+        final Timer.Context timer = this.writeTimer.time();
+        this.writeMeter.mark();
+
+        final UUID identifier = UUIDGenerator.newTimeUUID();
+
+        try {
+
+            final String payLoad = toString( operation );
+
+            //write to cassandra
+            this.mapManager.putString( identifier.toString(), payLoad, TTL );
+
+            //signal to SQS
+            this.queue.sendMessage( identifier );
+            operation.done();
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        //SQS doesn't support more than 10
+
+        final int actualTake = Math.min( 10, takeSize );
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+
+            List<QueueMessage> messages = queue
+                .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                    String.class );
+
+
+
+            final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
+
+            final List<String> mapEntries = new ArrayList<>( messages.size() );
+
+
+            if(messages.size() == 0){
+                return response;
+            }
+
+            //add all our keys  for a single round trip
+            for ( final QueueMessage message : messages ) {
+                mapEntries.add( message.getBody().toString() );
+            }
+
+            //look up the values
+            final Map<String, String> storedCommands = mapManager.getStrings( mapEntries );
+
+
+            //load them into our response
+            for ( final QueueMessage message : messages ) {
+
+                final String key = getMessageKey( message );
+
+                //now see if the key was there
+                final String payload = storedCommands.get( key );
+
+                //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
+                // a DLQ
+
+                if ( payload == null ) {
+                    continue;
+                }
+
+                final IndexOperationMessage messageBody;
+
+                try {
+                    messageBody = fromString( payload );
+                }
+                catch ( IOException e ) {
+                    logger.error( "Unable to deserialize message from string.  This is a bug", e );
+                    throw new RuntimeException( "Unable to deserialize message from string.  This is a bug", e );
+                }
+
+                SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
+
+                response.add( operation );
+            }
+
+            readMeter.mark( response.size() );
+            return response;
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+
+        //nothing to do
+        if ( messages.size() == 0 ) {
+            return;
+        }
+
+        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+        for ( IndexOperationMessage ioe : messages ) {
+
+
+            final SqsIndexOperationMessage sqsIndexOperationMessage =   ( SqsIndexOperationMessage ) ioe;
+
+            final String key = getMessageKey( sqsIndexOperationMessage.getMessage() );
+
+            //remove it from the map
+            mapManager.delete( key  );
+
+            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
+        }
+
+        queue.commitMessages( toAck );
+    }
+
+
+    @Override
+    public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+        //no op, just let it retry after the queue timeout
+    }
+
+
+    /** Read the object from Base64 string. */
+    private IndexOperationMessage fromString( String s ) throws IOException {
+        IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
+        return o;
+    }
+
+
+    /** Write the object to a Base64 string. */
+    private String toString( IndexOperationMessage o ) throws IOException {
+        return mapper.writeValueAsString( o );
+    }
+
+    private String getMessageKey(final QueueMessage message){
+        return message.getBody().toString();
+    }
+
+    /**
+     * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
+     */
+    public class SqsIndexOperationMessage extends IndexOperationMessage {
+
+        private final QueueMessage message;
+
+
+        public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) {
+            this.message = message;
+            this.addAllDeIndexRequest( source.getDeIndexRequests() );
+            this.addAllIndexRequest( source.getIndexRequests() );
+        }
+
+
+        /**
+         * Get the message from our queue
+         */
+        public QueueMessage getMessage() {
+            return message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java
new file mode 100644
index 0000000..8d7f222
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Low level queue service for indexing entities
+ */
+public interface IndexQueueService {
+
+
+    /**
+     * Queue an entity to be index asynchronously
+     * @param applicationScope
+     * @param entityId
+     * @param version
+     */
+    void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
new file mode 100644
index 0000000..2bf073c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Our low level indexing service operations
+ */
+public interface IndexService {
+
+
+    /**
+     *  Perform an index update of the entity's state from Cassandra
+     *
+     * @param applicationScope The scope of the entity
+     * @param entity The entity
+     *
+     * @return An observable with the count of every
+     */
+    Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity );
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
new file mode 100644
index 0000000..2a7533a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.apache.usergrid.utils.InflectionUtils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+
+
+/**
+ * Implementation of the indexing service
+ */
+@Singleton
+public class IndexServiceImpl implements IndexService {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+    private final EdgesObservable edgesObservable;
+    private final IndexFig indexFig;
+
+
+    @Inject
+    public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory,
+                             final EdgesObservable edgesObservable, IndexFig indexFig ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+        this.edgesObservable = edgesObservable;
+        this.indexFig = indexFig;
+    }
+
+
+    @Override
+    public Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity ) {
+
+
+        //bootstrap the lower modules from their caches
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+
+        final Id entityId = entity.getId();
+
+
+        //we always index in the target scope
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+
+        //we may have to index
+        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeToTarget( edge ) );
+
+
+        //we might or might not need to index from target-> source
+
+
+        final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, entityId );
+
+
+        final Observable<IndexOperationMessage> observable =
+            //try to send a whole batch if we can
+            Observable.merge( sourceEdgesToIndex, targetSizes ).buffer( indexFig.getIndexBatchSize() )
+
+                //map into batches based on our buffer size
+                .flatMap( buffer -> Observable.from( buffer ).collect( () -> ei.createBatch(),
+                    ( batch, indexEdge ) -> batch.index( indexEdge, entity ) )
+                    //return the future from the batch execution
+                    .flatMap( batch -> Observable.from( batch.execute() ) ) );
+
+        observable.toBlocking().last();
+
+
+        return Observable.just( 0 );
+    }
+
+
+    /**
+     * Get index edgs to the target
+     *
+     * @param graphManager The graph manager
+     * @param entityId The entitie's id
+     */
+    private Observable<IndexEdge> getIndexEdgesToTarget( final GraphManager graphManager, final Id entityId ) {
+
+        final String collectionName = InflectionUtils.pluralize( entityId.getType() );
+
+
+        final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE, collectionName );
+
+        //nothing to do
+        if ( collection == null ) {
+            return Observable.empty();
+        }
+
+
+        final String linkedCollection = collection.getLinkedCollection();
+
+        /**
+         * Nothing to link
+         */
+        if ( linkedCollection == null ) {
+            return Observable.empty();
+        }
+
+
+        /**
+         * An observable of sizes as we execute batches
+         */
+        return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection )
+                              .map( edge -> generateScopeFromSource( edge ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
new file mode 100644
index 0000000..a7d2450
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Application id cache fig
+ */
+public interface QueryFig extends GuicyFig {
+
+
+    /**
+     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
+     * backpressure
+     */
+    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+
+    /**
+     * The number of worker threads to consume from the queue
+     */
+    public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
+    /**
+     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
+     */
+    public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
+
+
+    /**
+     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
+     */
+    public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout";
+
+    /**
+     * Amount of time to wait when reading from the queue
+     */
+    public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
+    /**
+     * Amount of time to wait when reading from the queue in milliseconds
+     */
+    public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
+
+
+    String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";
+
+
+    @Default( "1000" )
+    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+    long getFailureRetryTime();
+
+    //give us 60 seconds to process the message
+    @Default( "60" )
+    @Key( INDEX_QUEUE_READ_TIMEOUT )
+    int getIndexQueueTimeout();
+
+    @Default( "2" )
+    @Key( ELASTICSEARCH_WORKER_COUNT )
+    int getWorkerCount();
+
+    @Default( "LOCAL" )
+    @Key( ELASTICSEARCH_QUEUE_IMPL )
+    String getQueueImplementation();
+
+    @Default( "1000" )
+    @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT )
+    long getQueueOfferTimeout();
+
+    /**
+     * size of the buffer to build up before you send results
+     */
+    @Default( "1000" )
+    @Key( INDEX_QUEUE_SIZE )
+    int getIndexQueueSize();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java
new file mode 100644
index 0000000..d3920db
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+
+/**
+ * A provider to allow users to configure their queue impl via properties
+ */
+@Singleton
+public class QueueProvider implements Provider<BufferQueue> {
+
+    private final QueryFig queryFig;
+
+    private final QueueManagerFactory queueManagerFactory;
+    private final MapManagerFactory mapManagerFactory;
+    private final MetricsFactory metricsFactory;
+
+    private BufferQueue bufferQueue;
+
+
+    @Inject
+    public QueueProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory,
+                          final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        this.queryFig = queryFig;
+
+
+        this.queueManagerFactory = queueManagerFactory;
+        this.mapManagerFactory = mapManagerFactory;
+        this.metricsFactory = metricsFactory;
+    }
+
+
+    @Override
+    @Singleton
+    public BufferQueue get() {
+        if ( bufferQueue == null ) {
+            bufferQueue = getQueue();
+        }
+
+
+        return bufferQueue;
+    }
+
+
+    private BufferQueue getQueue() {
+        final String value = queryFig.getQueueImplementation();
+
+        final Implementations impl = Implementations.valueOf( value );
+
+        switch ( impl ) {
+            case LOCAL:
+                return new BufferQueueInMemoryImpl( queryFig );
+            case SQS:
+                return new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory );
+            default:
+                throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" );
+        }
+    }
+
+
+    private String getErrorValues() {
+        String values = "";
+
+        for ( final Implementations impl : Implementations.values() ) {
+            values += impl + ", ";
+        }
+
+        values = values.substring( 0, values.length() - 2 );
+
+        return values;
+    }
+
+
+    /**
+     * Different implementations
+     */
+    public static enum Implementations {
+        LOCAL,
+        SQS;
+
+
+        public String asString() {
+            return toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java
new file mode 100644
index 0000000..42f36b1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class SQSIndexQueueServiceImpl implements IndexQueueService {
+
+    @Override
+    public void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 0376780..c60b86e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -157,7 +157,7 @@ public class CpNamingUtils {
      */
     public static SearchEdge createCollectionSearchEdge( final Id sourceId, final String connectionType ) {
         return new SearchEdgeImpl( sourceId, getEdgeTypeFromCollectionName( connectionType ),
-            SearchEdge.NodeType.SOURCE );
+            SearchEdge.NodeType.TARGET );
     }
 
 
@@ -181,7 +181,7 @@ public class CpNamingUtils {
      */
     public static SearchEdge createConnectionSearchEdge( final Id sourceId, final String connectionType ) {
         return new SearchEdgeImpl( sourceId, getEdgeTypeFromConnectionType( connectionType ),
-            SearchEdge.NodeType.SOURCE );
+            SearchEdge.NodeType.TARGET );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index 7929b12..fc8b3d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -689,8 +689,6 @@ public interface EntityManager {
     /** @return the cass */
     CassandraService getCass();
 
-    public void init( EntityManagerFactory emf,  UUID applicationId);
-
     /** For testing purposes */
     public void flushManagerCaches();
 


Mime
View raw message