usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [01/12] incubator-usergrid git commit: Refactor of pipeline to support type mapping for clarity
Date Tue, 26 May 2015 23:06:20 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-608 29a4009eb -> 87963740a


Refactor of pipeline to support type mapping for clarity


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3a1784f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3a1784f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3a1784f0

Branch: refs/heads/USERGRID-608
Commit: 3a1784f0455acae20c7dfbde61e9493d572ad549
Parents: 5b1dfa1
Author: Todd Nine <tnine@apigee.com>
Authored: Tue May 19 17:05:04 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue May 19 18:07:47 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  32 +-
 .../corepersistence/CpEntityManagerFactory.java |  13 +-
 .../corepersistence/CpRelationManager.java      | 158 +++++-----
 .../corepersistence/pipeline/Pipeline.java      | 121 --------
 .../pipeline/PipelineBuilderFactory.java        |  39 ---
 .../pipeline/PipelineModule.java                |   9 -
 .../pipeline/PipelineOperations.java            |  30 ++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |   5 +-
 .../pipeline/read/FilterPipeline.java           | 132 +++++++++
 .../pipeline/read/ReadPipelineBuilder.java      | 104 -------
 .../pipeline/read/ReadPipelineBuilderImpl.java  | 296 -------------------
 .../pipeline/read/collect/EntityFilter.java     |  68 -----
 .../read/collect/EntityResumeFilter.java        |  68 +++++
 .../read/elasticsearch/CandidateIdFilter.java   |  46 ++-
 15 files changed, 380 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 63018cb..7a56631 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -17,14 +17,29 @@ package org.apache.usergrid.corepersistence;
 
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
@@ -165,7 +180,8 @@ public class CpEntityManager implements EntityManager {
 
     private final AsyncEventService indexService;
 
-    private PipelineBuilderFactory pipelineBuilderFactory;
+    private final FilterFactory filterFactory;
+    private final CollectorFactory collectorFactory;
 
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
@@ -207,7 +223,7 @@ public class CpEntityManager implements EntityManager {
      */
     public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
                             final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig,
-                            final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) {
+                            final FilterFactory filterFactory, final CollectorFactory collectorFactory, final UUID applicationId ) {
         this.entityManagerFig = entityManagerFig;
 
 
@@ -216,8 +232,10 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
         Preconditions.checkNotNull( indexService, "indexService must not be null" );
-        Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" );
-        this.pipelineBuilderFactory = pipelineBuilderFactory;
+        Preconditions.checkNotNull( filterFactory, "filterFactory must not be null" );
+        Preconditions.checkNotNull( collectorFactory, "collectorFactory must not be null" );
+        this.filterFactory = filterFactory;
+        this.collectorFactory = collectorFactory;
 
 
         this.managerCache = managerCache;
@@ -732,7 +750,7 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
 
         CpRelationManager relationManager =
-            new CpRelationManager( metricsFactory, managerCache, pipelineBuilderFactory, indexService, this, entityManagerFig, applicationId, entityRef );
+            new CpRelationManager( metricsFactory, managerCache, filterFactory, collectorFactory, indexService, this, entityManagerFig, applicationId, entityRef );
         return relationManager;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 63e2869..5055538 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,7 +35,8 @@ import org.apache.commons.lang.StringUtils;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.exception.ConflictException;
 import org.apache.usergrid.persistence.AbstractEntity;
@@ -125,7 +126,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private final EntityIndex entityIndex;
     private final MetricsFactory metricsFactory;
     private final AsyncEventService indexService;
-    private final PipelineBuilderFactory pipelineBuilderFactory;
+    private final FilterFactory filterFactory;
+    private final CollectorFactory collectorFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
                                    final Injector injector ) {
@@ -139,7 +141,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncEventService.class );
-        this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
+        this.filterFactory = injector.getInstance( FilterFactory.class );
+        this.collectorFactory = injector.getInstance( CollectorFactory.class );
         this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 
@@ -198,7 +201,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
 
     private EntityManager _getEntityManager( UUID applicationId ) {
-        EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, pipelineBuilderFactory,  applicationId );
+        EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig,
+
+            filterFactory,  collectorFactory, applicationId );
         return em;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 4993d88..6201fe8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -17,16 +17,23 @@
 package org.apache.usergrid.corepersistence;
 
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
-import org.apache.usergrid.persistence.graph.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
+import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterPipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
@@ -49,6 +56,10 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
@@ -63,7 +74,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.MapUtils;
 
-import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
@@ -97,7 +107,6 @@ public class CpRelationManager implements RelationManager {
     private final EntityManagerFig entityManagerFig;
 
     private ManagerCache managerCache;
-    private final PipelineBuilderFactory pipelineBuilderFactory;
 
     private EntityManager em;
 
@@ -111,13 +120,16 @@ public class CpRelationManager implements RelationManager {
 
     private final AsyncEventService indexService;
 
-    private MetricsFactory metricsFactory;
-    private Timer updateCollectionTimer;
+
+    private final FilterFactory filterFactory;
+    private final CollectorFactory collectorFactory;
+
 
 
     public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache,
-                              final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService,
-                              final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId, final EntityRef headEntity) {
+                              final FilterFactory filterFactory, final CollectorFactory collectorFactory, final AsyncEventService indexService,
+                              final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId,
+                              final EntityRef headEntity ) {
 
 
         Assert.notNull( em, "Entity manager cannot be null" );
@@ -134,11 +146,9 @@ public class CpRelationManager implements RelationManager {
         this.headEntity = headEntity;
         this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
-        this.pipelineBuilderFactory = pipelineBuilderFactory;
 
-        this.metricsFactory = metricsFactory;
-        this.updateCollectionTimer =
-            metricsFactory.getTimer( CpRelationManager.class, "relation.manager.es.update.collection" );
+        this.filterFactory = filterFactory;
+        this.collectorFactory = collectorFactory;
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading head entity {}:{} from app {}", new Object[] {
@@ -162,7 +172,7 @@ public class CpRelationManager implements RelationManager {
     public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
         GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
+        String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
 
         logger.debug( "getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", new Object[] {
             edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid()
@@ -198,7 +208,7 @@ public class CpRelationManager implements RelationManager {
 
 
     private Map<EntityRef, Set<String>> getContainers() {
-        return getContainers(-1, null, null);
+        return getContainers( -1, null, null );
     }
 
 
@@ -215,14 +225,14 @@ public class CpRelationManager implements RelationManager {
 
         Observable<Edge> edges =
             gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
-              .flatMap(new Func1<String, Observable<Edge>>() {
+              .flatMap( new Func1<String, Observable<Edge>>() {
                   @Override
-                  public Observable<Edge> call(final String edgeType) {
+                  public Observable<Edge> call( final String edgeType ) {
                       return gm.loadEdgesToTarget(
-                          new SimpleSearchByEdgeType(cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
-                              SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                          new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
+                              SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
                   }
-              });
+              } );
 
         //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memmory
         if ( limit > -1 ) {
@@ -250,7 +260,7 @@ public class CpRelationManager implements RelationManager {
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
 
-        String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType(connectionType);
+        String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType );
 
         logger.debug( "isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] {
             edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid()
@@ -271,13 +281,13 @@ public class CpRelationManager implements RelationManager {
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
 
-        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
+        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
 
         logger.debug( "isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] {
             edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid()
         } );
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
         Observable<Edge> edges = gm.loadEdgeVersions(
             new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId,
                 Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
@@ -343,7 +353,8 @@ public class CpRelationManager implements RelationManager {
             return null;
         }
 
-        return addToCollection( collectionName, itemRef, ( collection != null && collection.getLinkedCollection() != null ) );
+        return addToCollection( collectionName, itemRef,
+            ( collection != null && collection.getLinkedCollection() != null ) );
     }
 
 
@@ -403,7 +414,7 @@ public class CpRelationManager implements RelationManager {
             logger.debug( "Wrote edge {}", edge );
         }
 
-        indexService.queueNewEdge(applicationScope, memberEntity, edge);
+        indexService.queueNewEdge( applicationScope, memberEntity, edge );
 
 
         if ( logger.isDebugEnabled() ) {
@@ -518,7 +529,8 @@ public class CpRelationManager implements RelationManager {
 
 
         //run our delete
-        final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() );
+        final Edge collectionToItemEdge =
+            createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() );
         gm.markEdge( collectionToItemEdge ).toBlocking().last();
 
 
@@ -575,7 +587,7 @@ public class CpRelationManager implements RelationManager {
                 results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false );
             }
             else {
-                results = em.getTargetEntities(headEntity, srcRelationName, null, Level.REFS);
+                results = em.getTargetEntities( headEntity, srcRelationName, null, Level.REFS );
             }
 
             if ( ( results != null ) && ( results.size() > 0 ) ) {
@@ -617,50 +629,54 @@ public class CpRelationManager implements RelationManager {
         query = adjustQuery( query );
 
 
+        final FilterPipeline<Id> filterPipeline =  new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ).withFilter(  filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) );
 
-        final ReadPipelineBuilder readPipelineBuilder =
-            pipelineBuilderFactory.createReadPipelineBuilder(applicationScope);
 
-        //set our fields applicable to both operations
-        readPipelineBuilder.withCursor(query.getCursor());
-        readPipelineBuilder.withLimit( Optional.of(query.getLimit()));
-
-        //TODO, this should be removed when the CP relation manager is removed
-        readPipelineBuilder.setStartId( cpHeadEntity.getId() );
+        final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline;
 
         if ( query.isGraphSearch() ) {
-            readPipelineBuilder.getCollection( collectionName );
+            entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) )
+                                            .withFilter( filterFactory.entityLoadFilter() );
         }
         else {
             final String entityType = collection.getType();
 
-            readPipelineBuilder.getCollectionWithQuery( collectionName, entityType, query.getQl().get() );
+            entityFilterPipeline = filterPipeline.withFilter(
+                filterFactory.elasticSearchCollectionFilter( query.getQl().get(), collectionName, entityType ) )
+                                            .withFilter( filterFactory.candidateEntityFilter() );
         }
 
 
-        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable =
+            entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() )
+                                .withCollector( collectorFactory.getResultsPageCollector() ).execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }
 
+
     @Override
-    public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults ) throws Exception {
+    public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults )
+        throws Exception {
         Results results;
         long maxLength = entityManagerFig.pollForRecordsTimeout();
         long sleepTime = entityManagerFig.sleep();
         boolean found;
         long current = System.currentTimeMillis(), length = 0;
         do {
-            results = searchCollection(collectionName, query);
+            results = searchCollection( collectionName, query );
             length = System.currentTimeMillis() - current;
             found = expectedResults == results.size();
-            if(found){
+            if ( found ) {
                 break;
             }
-            Thread.sleep(sleepTime);
-        }while (!found && length <= maxLength);
-        if(logger.isInfoEnabled()){
-            logger.info(String.format("Consistent Search finished in %s,  results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
+            Thread.sleep( sleepTime );
+        }
+        while ( !found && length <= maxLength );
+        if ( logger.isInfoEnabled() ) {
+            logger.info( String
+                .format( "Consistent Search finished in %s,  results=%s, expected=%s...dumping stack", length,
+                    results.size(), expectedResults ) );
             Thread.dumpStack();
         }
         return results;
@@ -836,7 +852,7 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    public Results getTargetEntities(String connectionType, String connectedEntityType, Level level)
+    public Results getTargetEntities( String connectionType, String connectedEntityType, Level level )
         throws Exception {
 
         //until this is refactored properly, we will delegate to a search by query
@@ -849,20 +865,19 @@ public class CpRelationManager implements RelationManager {
         query.setEntityType( connectedEntityType );
         query.setResultsLevel( level );
 
-        return searchTargetEntities(query);
+        return searchTargetEntities( query );
     }
 
 
     @Override
-    public Results getSourceEntities(String connType, String fromEntityType, Level resultsLevel)
-        throws Exception {
+    public Results getSourceEntities( String connType, String fromEntityType, Level resultsLevel ) throws Exception {
 
-        return getSourceEntities(connType, fromEntityType, resultsLevel, -1);
+        return getSourceEntities( connType, fromEntityType, resultsLevel, -1 );
     }
 
 
     @Override
-    public Results getSourceEntities(String connType, String fromEntityType, Level level, int count)
+    public Results getSourceEntities( String connType, String fromEntityType, Level level, int count )
         throws Exception {
 
         // looking for edges to the head entity
@@ -895,7 +910,7 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    public Results searchTargetEntities(Query query) throws Exception {
+    public Results searchTargetEntities( Query query ) throws Exception {
 
         Preconditions.checkNotNull( query, "query cannot be null" );
 
@@ -909,37 +924,40 @@ public class CpRelationManager implements RelationManager {
         query = adjustQuery( query );
 
         final String entityType = query.getEntityType();
-        //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector -> 1.0 results
+        //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
+        // -> 1.0 results
 
         //  startid -- graph edge load -- entity load (verify) from ids -> results page collector
         // startid -- eq query candiddate -- entity load (verify) from canddiates -> results page collector
 
         //startid -- graph edge load -- entity id verify --> filter to connection ref --> connection ref collector
-        //startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref collector
+        //startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref
+        // collector
+
 
-        final ReadPipelineBuilder readPipelineBuilder =
-            pipelineBuilderFactory.createReadPipelineBuilder(applicationScope);
-        //readPipelineBuilder.startId().load().collect()
+        final FilterPipeline<Id> filterPipeline =
+            new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() )
+                .withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) );
 
-        //set our fields applicable to both operations
-        readPipelineBuilder
-            .withCursor(query.getCursor())
-            .withLimit(Optional.of(query.getLimit()))
-                //TODO, this should be removed when the CP relation manager is removed
-            .setStartId( cpHeadEntity.getId() );
+
+        final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline;
 
         if ( query.isGraphSearch() ) {
-           // if(query.getResultsLevel() == Level.ALL_PROPERTIES)
-           readPipelineBuilder.getConnection( connection );
-            //else
+            entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphConnectionFilter( connection ) )
+                                                 .withFilter( filterFactory.entityLoadFilter() );
         }
+
         else {
-            readPipelineBuilder.getConnectionWithQuery( connection, Optional.fromNullable( entityType ),
-                query.getQl().get() );
+
+            entityFilterPipeline = filterPipeline.withFilter( filterFactory
+                .elasticSearchConnectionFilter( query.getQl().get(), connection, Optional.fromNullable( entityType ) ) )
+                                                 .withFilter( filterFactory.candidateEntityFilter() );
         }
 
 
-        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable =
+            entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() )
+                                .withCollector( collectorFactory.getResultsPageCollector() ).execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
deleted file mode 100644
index 26cf346..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * A pipeline that will allow us to build a traversal command for execution
- *
- * See http://martinfowler.com/articles/collection-pipeline/ for some examples
- *
- * TODO: Re work the cursor and limit phases.  They need to be lazily evaluated, not added on build time
- */
-public class Pipeline<R> {
-
-
-    private final ApplicationScope applicationScope;
-    private final List<PipelineOperation> idPipelineOperationList;
-    private final Collector<?, R> collector;
-    private final RequestCursor requestCursor;
-
-    private final int limit;
-
-
-    private int idCount = 0;
-
-
-    /**
-     * Our first pass, where we implement our start point as an Id until we can use this to perform our entire
-     * traversal.  Eventually as we untangle the existing Query service nightmare, the sourceId will be remove and
-     * should only be traversed from the root application
-     */
-    public Pipeline( final ApplicationScope applicationScope, final List<PipelineOperation> pipelineOperations,
-                     final Collector<?, R> collector, final Optional<String> cursor, final int limit ) {
-
-        this.applicationScope = applicationScope;
-        this.idPipelineOperationList = pipelineOperations;
-        this.collector = collector;
-        this.limit = limit;
-
-        this.requestCursor = new RequestCursor( cursor );
-    }
-
-
-    /**
-     * Execute the pipline construction, returning an observable of results
-     * @return
-     */
-    public Observable<R> execute(){
-
-
-        Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() ));
-
-        //build our traversal commands
-        for ( PipelineOperation pipelineOperation : idPipelineOperationList ) {
-            setState( pipelineOperation );
-
-            //TODO, see if we can wrap this observable in our ObservableTimer so we can see how long each filter takes
-
-
-            traverseObservable = traverseObservable.compose( pipelineOperation );
-        }
-
-
-        setState( collector );
-
-        final Observable<R> response =  traverseObservable.compose( collector );
-
-
-        //append the optional cursor into the response for the caller to use
-        return response;
-    }
-
-
-
-
-    /**
-     * Set the id of the state
-     */
-    private void setState( final PipelineOperation pipelineOperation ) {
-
-
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor,
-            limit, idCount );
-
-        pipelineOperation.setContext( context );
-
-        //done for clarity
-        idCount++;
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java
deleted file mode 100644
index 9916bc1..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-
-/**
- * Factory for creating pipeline builders
- */
-public interface PipelineBuilderFactory {
-
-
-    /**
-     * Create a read pipeline builder
-     * @param applicationScope
-     * @return
-     */
-    ReadPipelineBuilder createReadPipelineBuilder( final ApplicationScope applicationScope );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index 3018718..ef696bd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -22,9 +22,6 @@ package org.apache.usergrid.corepersistence.pipeline;
 
 import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactoryImpl;
-import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
-import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilderImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -42,12 +39,6 @@ public class PipelineModule extends AbstractModule {
 //       bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class );
 
 
-          //Use Guice to create the builder since we don't really need to do anything
-        //other than DI when creating the filters
-       install( new FactoryModuleBuilder().implement( ReadPipelineBuilder.class, ReadPipelineBuilderImpl.class )
-                                          .build( PipelineBuilderFactory.class ) );
-
-
 //        install( new Factory)
 
             //Use Guice to create the builder since we don't really need to do anything

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java
new file mode 100644
index 0000000..3929a97
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+public interface PipelineOperations {
+
+    /**
+     * Add the pipeline operation to the set of operations
+     * @param po
+     */
+    void add( PipelineOperation po );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index 054a85a..ee01602 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -28,4 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
  * an observable of FilterResults.  Filters should never emit groups or objects that represent collections.  Items should
  * always be emitted 1 at a time.  It is the responsibility of the collector to aggregate results.
  */
-public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {}
+public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {
+
+    /**
+     * Get the builder for the next phase
+     * @return
+     */
+//    B getNextBuilder();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index a2f1605..d297c2a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,8 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
@@ -139,5 +138,5 @@ public interface FilterFactory {
      * Create a new instance of our entity filter
      * @return
      */
-    EntityFilter entityFilter();
+    EntityResumeFilter entityResumeFilter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
new file mode 100644
index 0000000..f8bbdd8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import rx.Observable;
+
+
+/**
+ * Pipeline for applying our UG domain specific filters.
+ *
+ * Modeled after an observable, with typing to allow input of specific filters
+ *
+ * @param <InputType> the input type in the current pipeline state
+ */
+public class FilterPipeline<InputType> {
+
+
+    private int idCount = 0;
+
+    private final ApplicationScope applicationScope;
+
+
+    private final RequestCursor requestCursor;
+    private int limit;
+
+    //Generics hell, intentionally without a generic, we check at the filter level
+    private Observable currentObservable;
+
+
+    /**
+     * Create our filter pipeline
+     */
+    public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+
+
+        ValidationUtils.validateApplicationScope( applicationScope );
+        Preconditions.checkNotNull( cursor, "cursor optional is required" );
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+
+        this.applicationScope = applicationScope;
+
+        //init our cursor to empty
+        this.requestCursor = new RequestCursor( cursor );
+
+        //set the default limit
+        this.limit = limit;
+
+        //set our observable to start at the application
+        final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
+        this.currentObservable = Observable.just( filter );
+    }
+
+
+    public <OutputType> FilterPipeline<OutputType> withFilter(
+        final Filter<? super InputType, ? extends OutputType> filter ) {
+
+
+        setUp( filter );
+
+        return ( FilterPipeline<OutputType> ) this;
+    }
+
+
+    public <OutputType> FilterPipeline<OutputType> withCollector(
+        final Collector<? super InputType, ? extends OutputType> collector ) {
+
+
+        setUp( collector );
+
+        return ( FilterPipeline<OutputType> ) this;
+    }
+
+
+    private <OutputType> void setUp(
+        final PipelineOperation<? super InputType, ? extends OutputType> pipelineOperation ) {
+        setState( pipelineOperation );
+
+        currentObservable = currentObservable.compose( pipelineOperation );
+    }
+
+
+    /**
+     * Return the observable of the filter pipeline
+     */
+    public Observable<InputType> execute() {
+        return currentObservable;
+    }
+
+
+    /**
+     * Build the context (scope, cursor, limit and running id) for the operation, assign it, then advance the id
+     */
+    private void setState( final PipelineOperation pipelineOperation ) {
+
+
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+
+        pipelineOperation.setContext( context );
+
+        //done for clarity
+        idCount++;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
deleted file mode 100644
index d0e87b3..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * An instance of a pipeline builder for building commands on our read pipline
- *
- * Each invocation of the method will assemble the underlying pipe and updating it's state
- *
- * Results are added by invoking execute.
- */
-public interface ReadPipelineBuilder {
-
-
-    /**
-     * Set the cursor
-     * @param cursor
-     */
-    ReadPipelineBuilder withCursor(final Optional<String> cursor);
-
-    /**
-     * Set the limit of our page sizes
-     * @param limit
-     * @return
-     */
-    ReadPipelineBuilder withLimit(final Optional<Integer> limit);
-
-    /**
-     * An operation to bridge 2.0-> 1.0.  Should be removed when everyone uses the pipeline
-     * @param id
-     * @return
-     */
-    ReadPipelineBuilder setStartId(final Id id);
-
-
-    /**
-     * Add a get entity to the pipeline
-     */
-    ReadPipelineBuilder getEntityViaCollection( final String collectionName, final Id entityId );
-
-
-    /**
-     * Add get Collection from our previous source
-     */
-    ReadPipelineBuilder getCollection( final String collectionName );
-
-    /**
-     * Get all entities with a query
-     */
-    ReadPipelineBuilder getCollectionWithQuery( final String collectionName,final String entityType,  final String query);
-
-    /**
-     * Get an entity via the connection name and entity Id
-     */
-    ReadPipelineBuilder getEntityViaConnection( final String connectionName, final Id entityId );
-
-    /**
-     * Get all entities in a connection by the connection name
-     */
-    ReadPipelineBuilder getConnection( final String connectionName );
-
-    /**
-     * Get all entities in a connection of the specified connection type
-     */
-    ReadPipelineBuilder getConnection( final String connectionName, final String entityType );
-
-    /**
-     * Get all entities in a connection with a query and a target entity type
-     */
-    ReadPipelineBuilder getConnectionWithQuery( final String connectionName, final Optional<String> entityType,
-                                                final String query );
-
-
-    /**
-     * Load our entity results when our previous filter calls graph
-     * @return
-     */
-    Observable<ResultsPage> execute();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
deleted file mode 100644
index 28446ad..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.Pipeline;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * An implementation of our builder for piplines
- */
-public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
-
-    private static final int DEFAULT_LIMIT = 10;
-
-    private final FilterFactory filterFactory;
-
-    private final CollectorState collectorState;
-
-    private final ApplicationScope applicationScope;
-
-    private final CollectorFactory collectorFactory;
-
-
-    /**
-     * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are
-     * rendered
-     */
-    private List<Filter> filters;
-
-
-    private Optional<String> cursor;
-    private int limit;
-
-
-    @Inject
-    public ReadPipelineBuilderImpl( final FilterFactory filterFactory, final CollectorFactory collectorFactory,
-                                    @Assisted final ApplicationScope applicationScope ) {
-        this.filterFactory = filterFactory;
-
-        this.applicationScope = applicationScope;
-        this.collectorFactory = collectorFactory;
-
-        //init our cursor to empty
-        this.cursor = Optional.absent();
-
-        //set the default limit
-        this.limit = DEFAULT_LIMIT;
-
-
-        this.collectorState = new CollectorState( );
-
-        this.filters = new ArrayList<>();
-    }
-
-
-    @Override
-    public ReadPipelineBuilder withCursor( final Optional<String> cursor ) {
-        Preconditions.checkNotNull( cursor, "cursor must not be null" );
-        this.cursor = cursor;
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder withLimit( final Optional<Integer> limit ) {
-        Preconditions.checkNotNull( limit, "limit must not be null" );
-        this.limit = limit.or( DEFAULT_LIMIT );
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder setStartId( final Id id ) {
-        ValidationUtils.verifyIdentity( id );
-
-        filters.add( filterFactory.getEntityIdFilter( id ) );
-
-        this.collectorState.clear();
-
-
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getEntityViaCollection( final String collectionName, final Id entityId ) {
-        Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
-        ValidationUtils.verifyIdentity( entityId );
-
-        filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
-
-        this.collectorState.setIdEntityLoaderFilter();
-
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getCollection( final String collectionName ) {
-        Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
-
-        filters.add( filterFactory.readGraphCollectionFilter( collectionName ) );
-
-        this.collectorState.setIdEntityLoaderFilter();
-
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String entityType,  final String query ) {
-        Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
-        Preconditions.checkNotNull( query, "query must not be null" );
-
-        //TODO, this should really be 2 a TraverseFilter with an entityLoad collector
-
-        filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) );
-
-        this.collectorState.setCandidateEntityFilter();
-
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getEntityViaConnection( final String connectionName, final Id entityId ) {
-        Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
-        ValidationUtils.verifyIdentity( entityId );
-
-        filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
-        collectorState.setIdEntityLoaderFilter();
-
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getConnection( final String connectionName ) {
-        Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
-        filters.add( filterFactory.readGraphConnectionFilter( connectionName ) );
-        collectorState.setIdEntityLoaderFilter();
-
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getConnection( final String connectionName, final String entityType ) {
-        Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
-        Preconditions.checkNotNull( connectionName, "entityType must not be null" );
-
-        filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) );
-
-        collectorState.setIdEntityLoaderFilter();
-        return this;
-    }
-
-
-    @Override
-    public ReadPipelineBuilder getConnectionWithQuery( final String connectionName, final Optional<String> entityType,
-                                                       final String query ) {
-
-        Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
-        Preconditions.checkNotNull( connectionName, "entityType must not be null" );
-        Preconditions.checkNotNull( query, "query must not be null" );
-
-        filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) );
-        collectorState.setCandidateEntityFilter();
-        return this;
-    }
-
-
-    @Override
-    public Observable<ResultsPage> execute() {
-
-        ValidationUtils.validateApplicationScope( applicationScope );
-
-
-        //add our last filter that will generate entities
-        final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter();
-
-        filters.add( entityLoadFilter );
-
-        //add the filter that skips the first result on resume
-        final Filter<Entity, Entity>  cursorEntityFilter = filterFactory.entityFilter();
-
-        filters.add( cursorEntityFilter );
-
-
-        //execute our collector
-        final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector();
-
-        Preconditions.checkNotNull( collector,
-            "You have not specified an operation that creates a collection filter.  This is required for loading "
-                + "results" );
-
-
-        Preconditions.checkNotNull( cursor, "A cursor should be initialized even if absent" );
-
-        Preconditions.checkArgument( limit > 0, "limit must be > than 0" );
-
-
-        Pipeline pipeline = new Pipeline( applicationScope, filters, collector, cursor, limit );
-
-
-        return pipeline.execute();
-    }
-
-
-    /**
-     * A mutable state for our collectors.  Rather than create a new instance each time, we create a singleton
-     * collector
-     */
-    private final class CollectorState {
-
-
-        private EntityLoadFilter entityLoadCollector;
-
-        private CandidateEntityFilter candidateEntityFilter;
-
-        private Filter entityLoadFilter;
-
-
-
-        private CollectorState( ){}
-
-
-        /**
-         * Set our final filter to be a load entity by Id filter
-         */
-        public void setIdEntityLoaderFilter() {
-            if ( entityLoadCollector == null ) {
-                entityLoadCollector = filterFactory.entityLoadFilter();
-            }
-
-
-            entityLoadFilter = entityLoadCollector;
-        }
-
-
-        /**
-         * Set our final filter to be a load entity by candidate filter
-         */
-        public void setCandidateEntityFilter() {
-            if ( candidateEntityFilter == null ) {
-                candidateEntityFilter = filterFactory.candidateEntityFilter();
-            }
-
-            entityLoadFilter = candidateEntityFilter;
-        }
-
-
-        public void clear() {
-            entityLoadFilter = null;
-        }
-
-
-        public Filter<?, Entity> getFinalFilter() {
-            return entityLoadFilter;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
deleted file mode 100644
index daf2e7f..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.collect;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
- * it matches the Id that was set
- */
-public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
-
-
-    @Override
-    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
-
-        //filter only the first id, then map into our path for our next pass
-
-
-        return filterResultObservable.skipWhile( filterResult -> {
-
-            final Optional<Id> startFromCursor = getSeekValue();
-
-            return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() );
-        } ).map( filterResult -> {
-
-
-            final Entity entity = filterResult.getValue();
-            final Id entityId = entity.getId();
-
-            return createFilterResult( entity, entityId, filterResult.getPath() );
-        } );
-    }
-
-
-    @Override
-    protected CursorSerializer<Id> getCursorSerializer() {
-        return IdCursorSerializer.INSTANCE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
new file mode 100644
index 0000000..2917b61
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
+ * it matches the Id that was set
+ */
+public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
+
+
+    @Override
+    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+        //filter only the first id, then map into our path for our next pass
+
+
+        return filterResultObservable.skipWhile( filterResult -> {
+
+            final Optional<Id> startFromCursor = getSeekValue();
+
+            return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() );
+        } ).map( filterResult -> {
+
+
+            final Entity entity = filterResult.getValue();
+            final Id entityId = entity.getId();
+
+            return createFilterResult( entity, entityId, filterResult.getPath() );
+        } );
+    }
+
+
+    @Override
+    protected CursorSerializer<Id> getCursorSerializer() {
+        return IdCursorSerializer.INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
index 56e1c1c..0e87141 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -50,11 +49,10 @@ import rx.Observable;
 
 
 /**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions
- * Input is a batch of candidate results, output is a stream of validated Ids
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions.  Input is a batch
+ * of candidate results; output is a stream of validated Ids.
  */
-public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
-    implements Filter<Candidate, Id> {
+public class CandidateIdFilter extends AbstractFilter<Candidate, Id> implements Filter<Candidate, Id> {
 
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EntityIndexFactory entityIndexFactory;
@@ -68,9 +66,8 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
     }
 
 
-
     @Override
-      public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
 
 
         /**
@@ -87,32 +84,28 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
         final ApplicationEntityIndex applicationIndex =
             entityIndexFactory.createApplicationEntityIndex( applicationScope );
 
-        final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap(
-            candidateResults -> {
-                //flatten toa list of ids to load
-                final Observable<List<Id>> candidateIds =
-                    Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() )
-                              .toList();
+        final Observable<FilterResult<Id>> searchIdSetObservable =
+            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
+                    //flatten to a list of ids to load
+                    final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
+                        candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
 
-                //load the ids
-                final Observable<VersionSet> versionSetObservable =
-                    candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+                    //load the ids
+                    final Observable<VersionSet> versionSetObservable =
+                        candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
 
-                //now we have a collection, validate our canidate set is correct.
+                    //now we have a collection, validate our candidate set is correct.
 
-                return versionSetObservable.map(
-                    entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                                           .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                    return versionSetObservable.map(
+                        entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
+                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
                         entityCollector -> Observable.from( entityCollector.collectResults() ) );
-        } );
+                } );
 
         return searchIdSetObservable;
     }
 
 
-
-
-
     /**
      * Map a new cp entity to an old entity.  May be null if not present
      */
@@ -155,7 +148,6 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
 
         /**
          * Validate each candidate results vs the data loaded from cass
-         * @param filterCandidate
          */
         private void validate( final FilterResult<Candidate> filterCandidate ) {
 
@@ -191,11 +183,9 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
 
             //they're the same add it
 
-            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath()  );
+            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
 
             results.add( result );
         }
     }
-
-
 }


Mime
View raw message