usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [12/25] incubator-usergrid git commit: Refactored index scope generation to be more consistent and clean
Date Wed, 01 Apr 2015 20:44:47 GMT
Refactored index scope generation to be more consistent and clean

Moved some newly private methods to test utils

Added onStart event to the observable iterator

Removed the group-by-collection-type logic from the filtering loader


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7c356d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7c356d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7c356d8b

Branch: refs/heads/two-dot-o-dev
Commit: 7c356d8b2ab256bb20c498c11924820441f5495f
Parents: cf3f7ab
Author: Todd Nine <tnine@apigee.com>
Authored: Mon Mar 30 09:45:46 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon Mar 30 12:11:44 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  29 +---
 .../corepersistence/CpRelationManager.java      |  61 +++------
 .../usergrid/corepersistence/CpWalker.java      |   4 +-
 .../events/EntityDeletedHandler.java            |  31 ++---
 .../events/EntityVersionCreatedHandler.java     |   2 +-
 .../events/EntityVersionDeletedHandler.java     |  87 +++++++++---
 .../results/FilteringLoader.java                | 108 ++++++---------
 .../corepersistence/util/CpNamingUtils.java     | 134 ++++++++++++++++---
 .../corepersistence/StaleIndexCleanupTest.java  |  10 +-
 .../rx/EdgesFromSourceObservableIT.java         |   9 +-
 .../rx/EdgesToTargetObservableIT.java           |  53 ++++----
 .../apache/usergrid/utils/EdgeTestUtils.java    |  50 +++++++
 .../persistence/core/rx/ObservableIterator.java |   2 +
 13 files changed, 347 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index c45c390..a7dda13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -69,7 +69,6 @@ import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.entities.Event;
 import org.apache.usergrid.persistence.entities.Group;
@@ -80,10 +79,8 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
 
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -129,7 +125,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
-
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-
-            throw hre;
-        }
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity );
@@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager {
     @Override
     public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
 
-        IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
-                getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-
-
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
         //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
 
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-        }
 
         // Index CP entity into default collection scope
         //        IndexScope defaultIndexScope = new IndexScopeImpl(
@@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager {
         final EntityIndexBatch batch = aie.createBatch();
 
         // index member into entity collection | type scope
-        IndexScope collectionIndexScope = new IndexScopeImpl( collectionEntity.getId(),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
-
+        IndexScope collectionIndexScope = generateScopeFromCollection( collectionEntity.getId(), collName );
         batch.index( collectionIndexScope, memberEntity );
 
         //TODO REMOVE INDEX CODE

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3427684..b76f38f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -79,7 +79,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -118,6 +117,11 @@ import rx.functions.Func1;
 import static java.util.Arrays.asList;
 
 import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager {
                 final EntityRef eref =
                     new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                String name;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
-                    name = CpNamingUtils.getConnectionType( edge.getType() );
-                }
-                else {
-                    name = CpNamingUtils.getCollectionName( edge.getType() );
-                }
+                String name = getNameFromEdgeType(edge.getType());
                 addMapSet( entityRefSetMap, eref, name );
             }
          ).toBlocking().last();
@@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager {
                     @Override
                     public void call( final Edge edge ) {
 
-                        EntityRef sourceEntity =
-                                new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
                         // reindex the entity in the source entity's collection or connection index
 
-                        IndexScope indexScope;
-                        if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
-
-                            String collName = CpNamingUtils.getCollectionName( edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()),
-                                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
-                        }
-                        else {
+                        IndexScope indexScope = generateScopeFromSource(edge);
 
-                            String connName = CpNamingUtils.getConnectionType( edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
-                                CpNamingUtils.getConnectionScopeName( connName ) );
-                        }
 
                         entityIndexBatch.index( indexScope, cpEntity );
 
@@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager {
         Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             String edgeType = iter.next();
-            indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
+            indexes.add( getNameFromEdgeType( edgeType ) );
         }
 
         return indexes;
@@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // remove item from collection index
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
 
         batch.deindex( indexScope, memberEntity );
 
         // remove collection from item index
-        IndexScope itemScope = new IndexScopeImpl(
-            memberEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName(
-                    Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) );
+        IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(),
+            Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) );
 
 
         batch.deindex( itemScope, cpHeadEntity );
 
-        BetterFuture future = batch.execute();
+        batch.execute();
 
         // remove edge from collection to item
         GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager {
                     + "' of " + headEntity.getType() + ":" + headEntity .getUuid() );
         }
 
-        final IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
 
         final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
@@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager {
         EntityIndexBatch batch = ei.createBatch();
 
         // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType );
 
         batch.index( indexScope, targetEntity );
 
@@ -1208,10 +1184,8 @@ public class CpRelationManager implements RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // Deindex the connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-            new SimpleId( connectingEntityRef.getUuid(),
-                connectingEntityRef.getType() ),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        final Id cpId =  createId( connectingEntityRef );
+        IndexScope indexScope = generateScopeFromConnection( cpId, connectionType );
         batch.deindex( indexScope, targetEntity );
 
         // Deindex the connection in app|source|type context
@@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager {
 
         headEntity = em.validate( headEntity );
 
-        final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connection ) );
+        final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection );
 
         final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index c14447d..3f2c9d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -36,6 +36,8 @@ import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+
 
 /**
  * Takes a visitor to all collections and entities.
@@ -128,7 +130,7 @@ public class CpWalker {
                 if ( entity == null ) {
                     return;
                 }
-                String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+                String collName = getNameFromEdgeType( edgeValue.getType() );
                 visitor.visitCollectionEntry( em, collName, entity );
             } ).subscribeOn( Schedulers.io() );
         }, 100 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 78c1ca7..57d69bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted {
             return;
         }
 
-
-
-        if(logger.isDebugEnabled()) {
-            logger.debug(
-                "Handling deleted event for entity {}:{} v {} " + " app: {}",
-                new Object[] {
-                    entityId.getType(), entityId.getUuid(), version,
-                    scope.getApplication()
-                } );
-        }
-
-        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
-        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
-
-        throw new NotImplementedException( "Fix this" );
+//        This is a NO-OP now, it's handled by the EntityVersionDeletedHandler
+
+//
+//        if(logger.isDebugEnabled()) {
+//            logger.debug(
+//                "Handling deleted event for entity {}:{} v {} " + " app: {}",
+//                new Object[] {
+//                    entityId.getType(), entityId.getUuid(), version,
+//                    scope.getApplication()
+//                } );
+//        }
+//
+//        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+//        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+
+//        throw new NotImplementedException( "Fix this" );
 
         //read all edges to this node and de-index them
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index c000500..0163fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
 
     @Override
     public void versionCreated( final ApplicationScope scope, final Entity entity ) {
-        //not op, we're not migrating properly to this.  Make this an event
+        //not op, we're not migrating properly to this.  Make this an event At the moment this is happening on write
 
 //        // This check is for testing purposes and for a test that to be able to dynamically turn
 //        // off and on delete previous versions so that it can test clean-up on read.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 4fa5ce1..a2e9b30 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,18 +24,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import org.apache.usergrid.exception.NotImplementedException;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import rx.Observable;
+
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
 
 
 /**
@@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
 
 
     private final EntityManagerFactory emf;
+    private final EdgesObservable edgesObservable;
+    private final SerializationFig serializationFig;
 
 
     @Inject
-    public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+    public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable,
+                                        final SerializationFig serializationFig ) {
+        this.emf = emf;
+        this.edgesObservable = edgesObservable;
+        this.serializationFig = serializationFig;
+    }
 
 
     @Override
@@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
         }
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
-                    + "  app: {}", new Object[] {
-                    entityVersions.size(), entityId.getType(), entityId.getUuid(),
-                    scope.getApplication()
-                } );
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "  app: {}", new Object[] {
+                entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getApplication()
+            } );
         }
 
         CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
 
         final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+        final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope );
+
+
+        //create an observable of all scopes to deIndex
+        //remove all indexes pointing to this
+        final Observable<IndexScope> targetScopes =  edgesObservable.edgesToTarget( gm, entityId ).map(
+            edge -> generateScopeFromSource( edge) );
+
+
+        //Remove all double indexes
+        final Observable<IndexScope> sourceScopes = edgesObservable.edgesFromSource( gm, entityId ).map(
+                    edge -> generateScopeToTarget( edge ) );
+
+
+        //create a stream of scopes
+        final Observable<IndexScopeVersion> versions = Observable.merge( targetScopes, sourceScopes ).flatMap(
+            indexScope -> Observable.from( entityVersions )
+                                    .map( version -> new IndexScopeVersion( indexScope, version ) ) );
+
+        //create a set of batches
+        final Observable<EntityIndexBatch> batches = versions.buffer( serializationFig.getBufferSize() ).flatMap(
+            bufferedVersions -> Observable.from( bufferedVersions ).collect( () -> ei.createBatch(),
+                ( EntityIndexBatch batch, IndexScopeVersion version ) -> {
+                    //deindex in this batch
+                    batch.deindex( version.scope, version.version.getEntityId(), version.version.getVersion() );
+                } ) );
 
 
-        throw new NotImplementedException( "Fix this" );
 
+        //execute the batches
+        batches.doOnNext( batch -> batch.execute() ).toBlocking().last();
 
-//        final IndexScope indexScope =
-//            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
-//                scope.getName() );
-//
-//        //create our batch, and then collect all of them into a single batch
-//        Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
-//            entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-//        } )
-//            //after our batch is collected, execute it
-//            .doOnNext( entityIndexBatch -> {
-//                entityIndexBatch.execute();
-//            } ).toBlocking().last();
+    }
+
+
+
+
+
+    private static final class IndexScopeVersion{
+        private final IndexScope scope;
+        private final MvccLogEntry version;
+
+
+        private IndexScopeVersion( final IndexScope scope, final MvccLogEntry version ) {
+            this.scope = scope;
+            this.version = version;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 2eb6675..c12bb2c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -48,7 +46,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.fasterxml.uuid.UUIDComparator;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
 
 
 public class FilteringLoader implements ResultsLoader {
@@ -64,16 +61,14 @@ public class FilteringLoader implements ResultsLoader {
 
     /**
      * Create an instance of a filter loader
+     *
      * @param managerCache The manager cache to load
-     * @param resultsVerifier  The verifier to verify the candidate results
+     * @param resultsVerifier The verifier to verify the candidate results
      * @param applicationScope The application scope to perform the load
      * @param indexScope The index scope used in the search
      */
-    protected FilteringLoader(
-            final ManagerCache managerCache,
-            final ResultsVerifier resultsVerifier,
-            final ApplicationScope applicationScope,
-            final IndexScope indexScope ) {
+    protected FilteringLoader( final ManagerCache managerCache, final ResultsVerifier resultsVerifier,
+                               final ApplicationScope applicationScope, final IndexScope indexScope ) {
 
         this.managerCache = managerCache;
         this.resultsVerifier = resultsVerifier;
@@ -90,7 +85,7 @@ public class FilteringLoader implements ResultsLoader {
     public Results loadResults( final CandidateResults crs ) {
 
 
-        if(crs.size() == 0){
+        if ( crs.size() == 0 ) {
             return new Results();
         }
 
@@ -101,11 +96,6 @@ public class FilteringLoader implements ResultsLoader {
         // Maps the entity ids to our candidates
         final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
 
-        // Groups all candidate results by types.  When search connections there will be multiple
-        // types, so we want to batch fetch them more efficiently
-
-        final HashMultimap<String, CandidateResult> groupedByScopes =
-                HashMultimap.create( crs.size(), crs.size() );
 
         final Iterator<CandidateResult> iter = crs.iterator();
 
@@ -119,9 +109,6 @@ public class FilteringLoader implements ResultsLoader {
 
             final CandidateResult currentCandidate = iter.next();
 
-            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
-                    currentCandidate.getId().getType() );
-
             final Id entityId = currentCandidate.getId();
 
             //check if we've seen this candidate by id
@@ -131,7 +118,6 @@ public class FilteringLoader implements ResultsLoader {
             if ( previousMax == null ) {
                 maxCandidateMapping.put( entityId, currentCandidate );
                 orderIndex.put( entityId, i );
-                groupedByScopes.put( collectionType, currentCandidate );
                 continue;
             }
 
@@ -146,12 +132,12 @@ public class FilteringLoader implements ResultsLoader {
             final CandidateResult toKeep;
 
             //current is newer than previous.  Remove previous and keep current
-            if(UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ){
+            if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
                 toRemove = previousMax;
                 toKeep = currentCandidate;
             }
             //previously seen value is newer than current.  Remove the current and keep the previously seen value
-            else{
+            else {
                 toRemove = currentCandidate;
                 toKeep = previousMax;
             }
@@ -160,17 +146,13 @@ public class FilteringLoader implements ResultsLoader {
 
 
             //de-index it
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
-                new Object[] {
-                    entityId.getUuid(),
-                    entityId.getType(),
-                    toRemove.getVersion(),
-                    toKeep.getVersion() } );
+            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion()
+                } );
 
             //deindex this document, and remove the previous maxVersion
             //we have to deindex this from our ownerId, since this is what gave us the reference
             indexBatch.deindex( indexScope, toRemove );
-            groupedByScopes.remove( collectionType, toRemove );
 
 
             //TODO, fire the entity repair cleanup task here instead of de-indexing
@@ -178,7 +160,6 @@ public class FilteringLoader implements ResultsLoader {
             //replace the value with a more current version
             maxCandidateMapping.put( entityId, toKeep );
             orderIndex.put( entityId, i );
-            groupedByScopes.put( collectionType, toKeep );
         }
 
 
@@ -187,57 +168,52 @@ public class FilteringLoader implements ResultsLoader {
 
         final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
 
-        for ( final String scopeName : groupedByScopes.keySet() ) {
-
 
-            final Set<CandidateResult> candidateResults = groupedByScopes.get( scopeName );
-
-            final Collection<Id> idsToLoad =
-                    Collections2.transform( candidateResults, new Function<CandidateResult, Id>() {
-                        @Nullable
-                        @Override
-                        public Id apply( @Nullable final CandidateResult input ) {
-                            //NOTE this is never null, we won't need to check
-                           return input.getId();
-                        }
-                    } );
+        final Collection<Id> idsToLoad =
+            Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() {
+                @Nullable
+                @Override
+                public Id apply( @Nullable final CandidateResult input ) {
+                    //NOTE this is never null, we won't need to check
+                    return input.getId();
+                }
+            } );
 
 
-            //now using the scope, load the collection
+        //now using the scope, load the collection
 
-            // Get the collection scope and batch load all the versions.  We put all entities in
-            // app/app for easy retrieval/ unless persistence changes, we never want to read from
-            // any scope other than the app, app, scope name scope
-//            final CollectionScope collScope = new CollectionScopeImpl(
-//                applicationScope.getApplication(), applicationScope.getApplication(), scopeName);
+        // Get the collection scope and batch load all the versions.  We put all entities in
+        // app/app for easy retrieval/ unless persistence changes, we never want to read from
+        // any scope other than the app, app, scope name scope
+        //            final CollectionScope collScope = new CollectionScopeImpl(
+        //                applicationScope.getApplication(), applicationScope.getApplication(), scopeName);
 
-            final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope);
+        final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope );
 
 
-            //load the results into the loader for this scope for validation
-            resultsVerifier.loadResults( idsToLoad, ecm );
+        //load the results into the loader for this scope for validation
+        resultsVerifier.loadResults( idsToLoad, ecm );
 
-            //now let the loader validate each candidate.  For instance, the "max" in this candidate
-            //could still be a stale result, so it needs validated
-            for ( final Id requestedId : idsToLoad ) {
+        //now let the loader validate each candidate.  For instance, the "max" in this candidate
+        //could still be a stale result, so it needs to be validated
+        for ( final Id requestedId : idsToLoad ) {
 
-                final CandidateResult cr = maxCandidateMapping.get( requestedId );
+            final CandidateResult cr = maxCandidateMapping.get( requestedId );
 
-                //ask the loader if this is valid, if not discard it and de-index it
-                if ( !resultsVerifier.isValid( cr ) ) {
-                    indexBatch.deindex( indexScope, cr );
-                    continue;
-                }
+            //ask the loader if this is valid, if not discard it and de-index it
+            if ( !resultsVerifier.isValid( cr ) ) {
+                indexBatch.deindex( indexScope, cr );
+                continue;
+            }
 
-                //if we get here we're good, we need to add this to our results
-                final int candidateIndex = orderIndex.get( requestedId );
+            //if we get here we're good, we need to add this to our results
+            final int candidateIndex = orderIndex.get( requestedId );
 
-                sortedResults.put( candidateIndex, requestedId );
-            }
+            sortedResults.put( candidateIndex, requestedId );
         }
 
 
-         // NOTE DO NOT execute the batch here.
+        // NOTE DO NOT execute the batch here.
         // It changes the results and we need consistent paging until we aggregate all results
         return resultsVerifier.getResults( sortedResults.values() );
     }
@@ -247,6 +223,4 @@ public class FilteringLoader implements ResultsLoader {
     public void postProcess() {
         this.indexBatch.execute();
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 2e9fb55..652742b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util;
 
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.clearspring.analytics.util.Preconditions;
+
 
 /**
  * Utilises for constructing standard naming conventions for collections and connections
@@ -80,66 +86,150 @@ public class CpNamingUtils {
      * @param type
      * @return
      */
-    public static String getCollectionScopeNameFromEntityType( String type ) {
+    private static String getCollectionScopeNameFromEntityType( String type ) {
         String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
         return csn.toLowerCase();
     }
 
 
-    public static String getCollectionScopeNameFromCollectionName( String name ) {
+    private static String getCollectionScopeNameFromCollectionName( String name ) {
         String csn = EDGE_COLL_SUFFIX + name;
         return csn.toLowerCase();
     }
 
 
-    public static String getConnectionScopeName( String connectionType ) {
+    private static String getConnectionScopeName( String connectionType ) {
         String csn = EDGE_CONN_SUFFIX + connectionType ;
         return csn.toLowerCase();
     }
 
 
-    public static boolean isCollectionEdgeType( String type ) {
+    /**
+     * Get the index scope for the edge from the source
+     * @param edge
+     * @return
+     */
+    public static IndexScope generateScopeFromSource(final Edge edge ){
+
+
+        final Id nodeId = edge.getSourceNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+
+
+
+    /**
+     * Get the index scope for the edge to the target
+     * @param edge
+     * @return
+     */
+    public static IndexScope generateScopeToTarget(final Edge edge ){
+
+
+
+        final Id nodeId = edge.getTargetNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+    /**
+     * Generate either the collection name or connection name from the edgeName
+     * @param edgeName
+     * @return
+     */
+    public static String getNameFromEdgeType(final String edgeName){
+
+
+        if(isCollectionEdgeType( edgeName )){
+           return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) );
+        }
+
+        return getConnectionScopeName(getConnectionType( edgeName )  );
+
+    }
+
+
+    /**
+     * Get the index scope from the collection name
+     * @param nodeId The source or target node id
+     * @param collectionName The name of the collection.  Ex "users"
+     * @return
+     */
+    public static IndexScope generateScopeFromCollection( final Id nodeId, final String collectionName ){
+        return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) );
+    }
+
+
+    /**
+     * Get the scope from the connection
+     * @param nodeId
+     * @param connectionName
+     * @return
+     */
+    public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){
+        return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) );
+    }
+
+
+    /**
+     * Create an Id object from the entity ref
+     * @param entityRef
+     * @return
+     */
+    public static Id createId(final EntityRef entityRef){
+      return new SimpleId( entityRef.getUuid(), entityRef.getType() );
+    }
+
+    private static boolean isCollectionEdgeType( String type ) {
         return type.startsWith( EDGE_COLL_SUFFIX );
     }
 
 
-    public static boolean isConnectionEdgeType( String type ) {
+    private static boolean isConnectionEdgeType( String type ) {
         return type.startsWith( EDGE_CONN_SUFFIX );
     }
 
 
-    static public String  getConnectionType( String edgeType ) {
+
+    private static  String  getConnectionType( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
-    static public String getCollectionName( String edgeType ) {
+    private static String getCollectionName( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
+    /**
+     * Generate a standard edge name for our graph using the connection name
+     * @param connectionType The type of connection made
+     * @return
+     */
     public static String getEdgeTypeFromConnectionType( String connectionType ) {
-
-        if ( connectionType != null ) {
-            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
-            return csn;
-        }
-
-        return null;
+        return  (EDGE_CONN_SUFFIX  + "|" + connectionType).toLowerCase();
     }
 
 
+    /**
+     * Generate a standard edge name for our graph using the collection name
+     * @param collectionName
+     * @return
+     */
     public static String getEdgeTypeFromCollectionName( String collectionName ) {
-
-        if ( collectionName != null ) {
-            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
-            return csn;
-        }
-
-
-        return null;
+        return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5c166c5..f743f0b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.fasterxml.uuid.UUIDComparator;
@@ -60,7 +61,9 @@ import com.google.inject.Injector;
 import net.jcip.annotations.NotThreadSafe;
 
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         EntityManager em = app.getEntityManager();
 
-        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance( EntityIndexFactory.class );
+        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance(
+            EntityIndexFactory.class );
 
         ApplicationScope as = new ApplicationScopeImpl(
             new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) );
         ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as);
 
-        IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION);
+        IndexScope is = generateScopeFromCollection(rootId, collName );
         Query rcq = Query.fromQL( query );
 
         // TODO: why does this have no effect; max we ever get is 1000 entities

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 50c2cd9..3bfe460 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
                 final Id source = edge.getSourceNode();
 
                 //test if we're a collection, if so
-                if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    final String collectionName = CpNamingUtils.getCollectionName( edgeType );
+                if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                    final String collectionName = EdgeTestUtils.getNameForEdge( edgeType );
 
                     assertEquals("application source returned", createdApplication.getUuid(), source.getUuid());
 
@@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
 
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
+                if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
                     fail( "Only connection edges should be encountered" );
                 }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+                final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
                 assertEquals( "Same connection type expected", "likes", connectionType );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9f1bb17..6d228b2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                //test if we're a collection, if so remove ourselves fro the types
-                if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    fail( "Connections should be the only type encountered" );
-                }
+            //test if we're a collection, if so remove ourselves from the types
+            if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                fail( "Connections should be the only type encountered" );
+            }
 
 
-                final String collectionType = CpNamingUtils.getCollectionName( edgeType );
+            final String collectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                if ( collectionType.equals( type1 ) ) {
-                    assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
-                }
-                else if ( collectionType.equals( type2 ) ) {
-                    assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
-                }
+            if ( collectionType.equals( type1 ) ) {
+                assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
+            }
+            else if ( collectionType.equals( type2 ) ) {
+                assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
+            }
 
 
-            }
         } ).toBlocking().lastOrDefault( null );
 
 
@@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
-                    fail( "Only connection edges should be encountered" );
-                }
+            if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
+                fail( "Only connection edges should be encountered" );
+            }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+            final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                assertEquals( "Same connection type expected", "likes", connectionType );
+            assertEquals( "Same connection type expected", "likes", connectionType );
 
 
-                assertTrue( "Element should be present on removal", connections.remove( target ) );
-            }
+            assertTrue( "Element should be present on removal", connections.remove( target ) );
         } ).toBlocking().lastOrDefault( null );
 
         assertEquals( "Every connection should have been encountered", 0, connections.size() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
new file mode 100644
index 0000000..f217338
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.utils;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EdgeTestUtils {
+
+    /**
+     * Get the name for an edge
+     */
+    public static String getNameForEdge( final String edgeName ) {
+        final String[] parts = edgeName.split( "\\|" );
+
+        assertEquals( "there should be 2 parts", parts.length, 2 );
+
+        return parts[1];
+    }
+
+
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX );
+    }
+
+
+    public static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 2bd1edb..84a7fc3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
 
         try {
+            subscriber.onStart();
+
             //get our iterator and push data to the observer
             final Iterator<T> itr = getIterator();
 


Mime
View raw message