usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [07/50] [abbrv] incubator-usergrid git commit: Refactors operations into an easier-to-use builder pattern. The pipeline still needs some work.
Date Thu, 28 May 2015 12:53:12 GMT
Refactors operations into an easier-to-use builder pattern. The pipeline still needs some work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d54dffc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d54dffc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d54dffc

Branch: refs/heads/USERGRID-669
Commit: 6d54dffc4e9178b85349ec591275c9005ad121ed
Parents: 3a1784f
Author: Todd Nine <tnine@apigee.com>
Authored: Wed May 20 15:00:38 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed May 20 19:24:50 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  16 +-
 .../corepersistence/CpEntityManagerFactory.java |  12 +-
 .../corepersistence/CpRelationManager.java      |  93 +++++---
 .../pipeline/FilterPipeline.java                | 107 +++++++++
 .../pipeline/PipelineModule.java                |   2 -
 .../pipeline/PipelineOperation.java             |   2 +-
 .../pipeline/builder/CandidateBuilder.java      |  67 ++++++
 .../pipeline/builder/ConnectionBuilder.java     |  37 +++
 .../pipeline/builder/ConnectionRefBuilder.java  |  53 +++++
 .../pipeline/builder/EntityBuilder.java         |  51 ++++
 .../pipeline/builder/IdBuilder.java             | 147 ++++++++++++
 .../pipeline/builder/PipelineBuilder.java       | 100 ++++++++
 .../builder/PipelineBuilderFactory.java         |  35 +++
 .../pipeline/read/AbstractFilter.java           |   2 +-
 .../pipeline/read/AbstractPathFilter.java       |   2 +-
 .../pipeline/read/Collector.java                |  38 ---
 .../pipeline/read/CollectorFactory.java         |  38 ---
 .../corepersistence/pipeline/read/Filter.java   |  38 ---
 .../pipeline/read/FilterFactory.java            |  69 ++++--
 .../pipeline/read/FilterPipeline.java           | 132 -----------
 .../pipeline/read/ReadFilterFactoryImpl.java    | 136 -----------
 .../pipeline/read/ResultsPage.java              |  10 +-
 .../read/collect/AbstractCollector.java         |  46 ----
 .../read/collect/ConnectionRefFilter.java       |  68 ++++++
 .../read/collect/ConnectionRefResumeFilter.java |  86 +++++++
 .../read/collect/EntityResumeFilter.java        |   3 +-
 .../read/collect/ResultsPageCollector.java      |  35 ++-
 .../AbstractElasticSearchFilter.java            | 171 --------------
 .../pipeline/read/elasticsearch/Candidate.java  |  55 -----
 .../elasticsearch/CandidateEntityFilter.java    | 234 -------------------
 .../read/elasticsearch/CandidateIdFilter.java   | 191 ---------------
 .../ElasticSearchCollectionFilter.java          |  77 ------
 .../ElasticSearchConnectionFilter.java          |  73 ------
 .../ElasticsearchCursorSerializer.java          |  42 ----
 .../read/elasticsearch/Elasticsearchdiagram.jpg | Bin 316655 -> 0 bytes
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  82 -------
 .../read/graph/AbstractReadGraphFilter.java     | 147 ------------
 .../read/graph/EdgeCursorSerializer.java        |  42 ----
 .../pipeline/read/graph/EntityIdFilter.java     |  54 -----
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ------------
 .../pipeline/read/graph/GraphDiagram.jpg        | Bin 347711 -> 0 bytes
 .../graph/ReadGraphCollectionByIdFilter.java    |  49 ----
 .../read/graph/ReadGraphCollectionFilter.java   |  53 -----
 .../graph/ReadGraphConnectionByIdFilter.java    |  50 ----
 .../graph/ReadGraphConnectionByTypeFilter.java  | 100 --------
 .../read/graph/ReadGraphConnectionFilter.java   |  53 -----
 .../search/AbstractElasticSearchFilter.java     | 169 ++++++++++++++
 .../pipeline/read/search/Candidate.java         |  55 +++++
 .../read/search/CandidateEntityFilter.java      | 232 ++++++++++++++++++
 .../pipeline/read/search/CandidateIdFilter.java | 190 +++++++++++++++
 .../search/ElasticsearchCursorSerializer.java   |  40 ++++
 .../read/search/Elasticsearchdiagram.jpg        | Bin 0 -> 316655 bytes
 .../read/search/SearchCollectionFilter.java     |  77 ++++++
 .../read/search/SearchConnectionFilter.java     |  72 ++++++
 .../AbstractReadGraphEdgeByIdFilter.java        |  82 +++++++
 .../read/traverse/AbstractReadGraphFilter.java  | 146 ++++++++++++
 .../read/traverse/EdgeCursorSerializer.java     |  42 ++++
 .../pipeline/read/traverse/EntityIdFilter.java  |  53 +++++
 .../read/traverse/EntityLoadVerifyFilter.java   | 154 ++++++++++++
 .../pipeline/read/traverse/GraphDiagram.jpg     | Bin 0 -> 347711 bytes
 .../traverse/ReadGraphCollectionByIdFilter.java |  49 ++++
 .../traverse/ReadGraphCollectionFilter.java     |  53 +++++
 .../traverse/ReadGraphConnectionByIdFilter.java |  50 ++++
 .../ReadGraphConnectionByTypeFilter.java        |  99 ++++++++
 .../traverse/ReadGraphConnectionFilter.java     |  53 +++++
 .../results/ObservableQueryExecutor.java        |   2 +-
 .../pipeline/cursor/CursorTest.java             |   4 +-
 67 files changed, 2515 insertions(+), 2160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7a56631..be52547 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -38,8 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
@@ -180,8 +179,7 @@ public class CpEntityManager implements EntityManager {
 
     private final AsyncEventService indexService;
 
-    private final FilterFactory filterFactory;
-    private final CollectorFactory collectorFactory;
+    private final PipelineBuilderFactory filterFactory;
 
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
@@ -223,7 +221,7 @@ public class CpEntityManager implements EntityManager {
      */
     public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
                             final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig,
-                            final FilterFactory filterFactory, final CollectorFactory collectorFactory, final UUID applicationId ) {
+                            final PipelineBuilderFactory pipelineBuilderFactory,  final UUID applicationId ) {
         this.entityManagerFig = entityManagerFig;
 
 
@@ -232,10 +230,8 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
         Preconditions.checkNotNull( indexService, "indexService must not be null" );
-        Preconditions.checkNotNull( filterFactory, "filterFactory must not be null" );
-        Preconditions.checkNotNull( collectorFactory, "collectorFactory must not be null" );
-        this.filterFactory = filterFactory;
-        this.collectorFactory = collectorFactory;
+        Preconditions.checkNotNull( pipelineBuilderFactory, "filterFactory must not be null" );
+        this.filterFactory = pipelineBuilderFactory;
 
 
         this.managerCache = managerCache;
@@ -750,7 +746,7 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
 
         CpRelationManager relationManager =
-            new CpRelationManager( metricsFactory, managerCache, filterFactory, collectorFactory, indexService, this, entityManagerFig, applicationId, entityRef );
+            new CpRelationManager( managerCache, filterFactory, indexService, this, entityManagerFig, applicationId, entityRef );
         return relationManager;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5055538..baa1148 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,8 +35,7 @@ import org.apache.commons.lang.StringUtils;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.exception.ConflictException;
 import org.apache.usergrid.persistence.AbstractEntity;
@@ -126,8 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private final EntityIndex entityIndex;
     private final MetricsFactory metricsFactory;
     private final AsyncEventService indexService;
-    private final FilterFactory filterFactory;
-    private final CollectorFactory collectorFactory;
+    private final PipelineBuilderFactory pipelineBuilderFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
                                    final Injector injector ) {
@@ -141,8 +139,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncEventService.class );
-        this.filterFactory = injector.getInstance( FilterFactory.class );
-        this.collectorFactory = injector.getInstance( CollectorFactory.class );
+        this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
         this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 
@@ -203,7 +200,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private EntityManager _getEntityManager( UUID applicationId ) {
         EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig,
 
-            filterFactory,  collectorFactory, applicationId );
+
+            pipelineBuilderFactory, applicationId );
         return em;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6201fe8..1c34929 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,9 +31,11 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
@@ -52,7 +54,6 @@ import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.SimpleRoleRef;
 import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
@@ -121,13 +122,12 @@ public class CpRelationManager implements RelationManager {
     private final AsyncEventService indexService;
 
 
-    private final FilterFactory filterFactory;
-    private final CollectorFactory collectorFactory;
+    private final PipelineBuilderFactory pipelineBuilderFactory;
 
 
 
-    public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache,
-                              final FilterFactory filterFactory, final CollectorFactory collectorFactory, final AsyncEventService indexService,
+    public CpRelationManager(  final ManagerCache managerCache,
+                              final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService,
                               final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId,
                               final EntityRef headEntity ) {
 
@@ -147,8 +147,7 @@ public class CpRelationManager implements RelationManager {
         this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
-        this.filterFactory = filterFactory;
-        this.collectorFactory = collectorFactory;
+        this.pipelineBuilderFactory = pipelineBuilderFactory;
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading head entity {}:{} from app {}", new Object[] {
@@ -629,29 +628,23 @@ public class CpRelationManager implements RelationManager {
         query = adjustQuery( query );
 
 
-        final FilterPipeline<Id> filterPipeline =  new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ).withFilter(  filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) );
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() )
+                                  .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() );
 
 
-        final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline;
+        final EntityBuilder results;
 
         if ( query.isGraphSearch() ) {
-            entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) )
-                                            .withFilter( filterFactory.entityLoadFilter() );
+            results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
         }
         else {
             final String entityType = collection.getType();
-
-            entityFilterPipeline = filterPipeline.withFilter(
-                filterFactory.elasticSearchCollectionFilter( query.getQl().get(), collectionName, entityType ) )
-                                            .withFilter( filterFactory.candidateEntityFilter() );
+            results = pipelineBuilder.searchCollection( collectionName, entityType, query.getQl().get() ).loadEntities();
         }
 
 
-        final Observable<ResultsPage> resultsObservable =
-            entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() )
-                                .withCollector( collectorFactory.getResultsPageCollector() ).execute();
-
-        return new ObservableQueryExecutor( resultsObservable ).next();
+        return new ObservableQueryExecutor( results.build() ).next();
     }
 
 
@@ -923,7 +916,7 @@ public class CpRelationManager implements RelationManager {
 
         query = adjustQuery( query );
 
-        final String entityType = query.getEntityType();
+        final Optional<String> entityType = Optional.fromNullable( query.getEntityType() ) ;
         //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
         // -> 1.0 results
 
@@ -935,31 +928,57 @@ public class CpRelationManager implements RelationManager {
         // collector
 
 
-        final FilterPipeline<Id> filterPipeline =
-            new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() )
-                .withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) );
+        final IdBuilder
+            pipelineBuilder = pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() ).withLimit( query.getLimit() ).fromId(
+            cpHeadEntity.getId() );
+
+
+
+
+        if(query.getResultsLevel() == Level.REFS){
+            final Observable<ResultsPage<ConnectionRef>> results;
+
+            if(query.isGraphSearch()){
+
+               results = pipelineBuilder.traverseConnection( connection, entityType   ).loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+
+
+            }
+            else
+            {
+                results = pipelineBuilder.searchConnection( connection, query.getQl().get(),entityType) .loadIds().loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
+
+            }
+
+            throw new UnsupportedOperationException( "Implement me" );
+
+        }
+
+
+
+        if(query.getResultsLevel() == Level.IDS){
+
+            throw new UnsupportedOperationException( "Not yet implemented" );
+        }
+
+
+        //we want to load all entities
 
+        final Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> results;
 
-        final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline;
 
         if ( query.isGraphSearch() ) {
-            entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphConnectionFilter( connection ) )
-                                                 .withFilter( filterFactory.entityLoadFilter() );
+            results = pipelineBuilder.traverseConnection( connection, entityType ).loadEntities().build();
         }
 
         else {
 
-            entityFilterPipeline = filterPipeline.withFilter( filterFactory
-                .elasticSearchConnectionFilter( query.getQl().get(), connection, Optional.fromNullable( entityType ) ) )
-                                                 .withFilter( filterFactory.candidateEntityFilter() );
+            results = pipelineBuilder.searchConnection( connection,  query.getQl().get() , entityType).loadEntities().build();
         }
 
 
-        final Observable<ResultsPage> resultsObservable =
-            entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() )
-                                .withCollector( collectorFactory.getResultsPageCollector() ).execute();
 
-        return new ObservableQueryExecutor( resultsObservable ).next();
+        return new ObservableQueryExecutor( results ).next();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
new file mode 100644
index 0000000..089f47d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import rx.Observable;
+
+
+/**
+ * Pipeline for applying our UG domain specific filters.
+ *
+ * Modeled after an observable, with typing to allow input of specific filters
+ *
+ * @param <InputType> the input type in the current pipeline state
+ */
+public class FilterPipeline<InputType> {
+
+
+    private int idCount = 0;
+
+    private final ApplicationScope applicationScope;
+
+
+    private final RequestCursor requestCursor;
+    private int limit;
+
+    //Generics hell, intentionally without a generic, we check at the filter level
+    private Observable currentObservable;
+
+
+    /**
+     * Create our filter pipeline
+     */
+    public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+
+
+        ValidationUtils.validateApplicationScope( applicationScope );
+        Preconditions.checkNotNull( cursor, "cursor optional is required" );
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+
+        this.applicationScope = applicationScope;
+
+        //init our request cursor from the supplied cursor value (which may be absent)
+        this.requestCursor = new RequestCursor( cursor );
+
+        //set the default limit
+        this.limit = limit;
+
+        //set our observable to start at the application
+        final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
+        this.currentObservable = Observable.just( filter );
+    }
+
+
+    public <OutputType> FilterPipeline<OutputType> withFilter(
+        final PipelineOperation<? super InputType, ? extends OutputType> filter ) {
+
+
+
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+
+        filter.setContext( context );
+
+        //done for clarity
+        idCount++;
+
+        return ( FilterPipeline<OutputType> ) this;
+    }
+
+
+
+    /**
+     * Return the observable of the filter pipeline
+     */
+    public Observable<InputType> execute() {
+        return currentObservable;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index ef696bd..8ec8704 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.corepersistence.pipeline;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 
 import com.google.inject.AbstractModule;
@@ -44,6 +43,5 @@ public class PipelineModule extends AbstractModule {
             //Use Guice to create the builder since we don't really need to do anything
         //other than DI when creating the filters
        install( new FactoryModuleBuilder().build( FilterFactory.class ) );
-        install( new FactoryModuleBuilder().build( CollectorFactory.class ));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
index d2fa16c..3dda22e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -33,7 +33,7 @@ import rx.Observable;
  * @param <T> The input type of the filter value
  * @param <R> The output type of the filter value
  */
-public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+public interface PipelineOperation<T, R> extends Observable.Transformer<T, R> {
 
     void setContext(final PipelineContext pipelineContext);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
new file mode 100644
index 0000000..5cb2eab
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class CandidateBuilder {
+
+
+    private final FilterPipeline<FilterResult<Candidate>> filterPipeline;
+    private final FilterFactory filterFactory;
+
+
+    public CandidateBuilder( final FilterPipeline<FilterResult<Candidate>> filterPipeline,
+                             final FilterFactory filterFactory ) {
+        this.filterPipeline = filterPipeline;
+        this.filterFactory = filterFactory;
+    }
+
+
+    /**
+     * Validates all candidates for the versions by id and sets them
+     * @return
+     */
+    public IdBuilder loadIds(){
+
+        final FilterPipeline<FilterResult<Id>> newFilter = filterPipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() );
+
+        return new IdBuilder( newFilter, filterFactory );
+    }
+
+
+    /**
+     * Load all the candidates as entities and return them
+     * @return
+     */
+    public EntityBuilder loadEntities(){
+
+        final FilterPipeline<FilterResult<Entity>> newFilter = filterPipeline.withFilter( filterFactory.candidateEntityFilter() );
+
+        return new EntityBuilder(newFilter  );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
new file mode 100644
index 0000000..b4ea94e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.ConnectionRef;
+
+import rx.Observable;
+
+
+public class ConnectionBuilder {
+
+
+
+    public Observable<ResultsPage<ConnectionRef>> build(){
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
new file mode 100644
index 0000000..6c0ebc8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
+import org.apache.usergrid.persistence.ConnectionRef;
+
+import rx.Observable;
+
+
+/**
+ * A 1.0 compatibility state that finishes a pipeline of {@link ConnectionRef} results.  Should be removed as services are refactored
+ */
+@Deprecated
+public class ConnectionRefBuilder {
+
+    // The pipeline that build() finishes with a collector and then executes
+    private final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter;
+    /** @param connectionRefFilter the pipeline emitting connection ref results; stored as given, without a null check */
+    public ConnectionRefBuilder( final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter ) {
+       this.connectionRefFilter = connectionRefFilter;
+    }
+
+
+    /**
+     * Build our connection refs observable by adding a {@link ResultsPageCollector} to the pipeline and executing it
+     * @return whatever executing the pipeline returns, typed as an observable of connection ref pages
+     */
+    public Observable<ResultsPage<ConnectionRef>> build(){
+        return connectionRefFilter.withFilter( new ResultsPageCollector<>() ).execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
new file mode 100644
index 0000000..07b4586
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import rx.Observable;
+
+
+/**
+ * Builder to build our entity state; the final step for a pipeline of {@link Entity} results
+ */
+public class EntityBuilder {
+    // The pipeline of entity results handed in by the caller; build() does not read it yet
+    private final FilterPipeline<FilterResult<Entity>> filterPipeline;
+
+    /** @param filterPipeline the pipeline emitting entity results; stored as given, without a null check */
+    public EntityBuilder( final FilterPipeline<FilterResult<Entity>> filterPipeline ) {
+        this.filterPipeline = filterPipeline;
+    }
+
+
+    /**
+     * Build our results of entities.  NOTE(review): not implemented yet, the stored pipeline is never executed
+     * @return always null for now, so callers must not subscribe to the result blindly
+     */
+    public Observable<ResultsPage<Entity>> build(){
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
new file mode 100644
index 0000000..12a89ba
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder to transition from emitting Ids in the pipeline into other operations.  Each method adds a filter and returns the builder for the next stage
+ */
+public class IdBuilder {
+
+    // filterFactory creates most of the filters added below; filterPipeline is the pipeline built so far
+    private final FilterFactory filterFactory;
+    private final FilterPipeline<FilterResult<Id>> filterPipeline;
+
+    /** Stores the pipeline built so far and the factory used to extend it; neither argument is null-checked */
+    public IdBuilder( final FilterPipeline<FilterResult<Id>> filterPipeline, final FilterFactory filterFactory ) {
+        this.filterPipeline = filterPipeline;
+        this.filterFactory = filterFactory;
+    }
+
+
+    /**
+     * Load all the ids we encounter when traversing the graph as entities
+     * @return an EntityBuilder over this pipeline with the factory's entity load filter added
+     */
+    public EntityBuilder loadEntities() {
+        final FilterPipeline<FilterResult<Entity>> pipeline =
+            filterPipeline.withFilter( filterFactory.entityLoadFilter() );
+
+        return new EntityBuilder( pipeline );
+    }
+
+
+    /**
+     * Traverse all the collection edges from our input Id
+     * @param collectionName the collection name handed to the factory's read graph collection filter
+     * @return a new IdBuilder over the pipeline with that filter added
+     */
+    public IdBuilder traverseCollection( final String collectionName ) {
+        final FilterPipeline<FilterResult<Id>> newFilter =
+            filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
+
+        return new IdBuilder( newFilter, filterFactory );
+    }
+
+
+    /**
+     * Traverse all connection edges from our input Id
+     * @param connectionName The name of the connection
+     * @param entityType The optional type of the entity; when absent the untyped connection filter is used
+     * @return a new IdBuilder over the pipeline with the chosen connection filter added
+     */
+    public IdBuilder traverseConnection( final String connectionName, final Optional<String> entityType ) {
+
+        final PipelineOperation<FilterResult<Id>, FilterResult<Id>> filter;
+        // only restrict the traversal by type when the caller supplied one
+        if(entityType.isPresent()){
+            filter = filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType.get() );
+        }else{
+            filter = filterFactory.readGraphConnectionFilter( connectionName );
+        }
+
+
+        return new IdBuilder( filterPipeline.withFilter(filter ), filterFactory );
+    }
+
+
+    /**
+     * Search all collections from our inputId with the specified criteria
+     * @param collectionName  The name of the collection
+     * @param ql The user's query to execute
+     * @param entityType The type of the entity
+     * @return a CandidateBuilder over the pipeline with the collection search filter added
+     */
+    public CandidateBuilder searchCollection( final String collectionName, final String ql, final String entityType  ) {
+        // the factory takes the query first, then the collection name and entity type
+        final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchCollectionFilter(
+            ql, collectionName, entityType ) );
+
+        return new CandidateBuilder( newFilter, filterFactory );
+    }
+
+
+    /**
+     * Search all connections from our input Id and search their connections
+     * @param connectionName The connection name to search
+     * @param ql The query to execute
+     * @param entityType The optional type of entity.  If this is absent, all entity types in the connection will be searched
+     * @return a CandidateBuilder over the pipeline with the connection search filter added
+     */
+    public CandidateBuilder searchConnection( final String connectionName, final String ql ,  final Optional<String> entityType) {
+
+        // the factory takes the query first, then the connection name and optional entity type
+        final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchConnectionFilter(
+            ql, connectionName, entityType ) );
+
+        return new CandidateBuilder( newFilter, filterFactory );
+    }
+
+
+    /**
+     * Create connection refs from our ids.  This is a legacy operation
+     * @param sourceId the source id passed to the ConnectionRefFilter
+     * @param connectionType the connection type passed to the ConnectionRefFilter
+     * @return a ConnectionRefBuilder over the pipeline with the connection ref and resume filters added
+     */
+    @Deprecated
+    public ConnectionRefBuilder loadConnectionRefs(final Id sourceId, final String connectionType){
+        // NOTE(review): both filters are instantiated directly here instead of through filterFactory -- confirm that is intended
+        final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter = filterPipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType  ) ).withFilter(
+            new ConnectionRefResumeFilter() );
+        return new ConnectionRefBuilder(connectionRefFilter);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
new file mode 100644
index 0000000..488e9c1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+
+/**
+ * This is our root builder to build filter pipelines.  All operations should start with an instance of this class, and compose
+ * graph operations by traversing various builders to create our filter pipeline
+ */
+public class PipelineBuilder {
+
+
+    // Until withCursor/withLimit are called, no cursor is set and the limit is 10
+    private final ApplicationScope applicationScope;
+    private Optional<String> cursor = Optional.absent();
+    private int limit = 10;
+    private final FilterFactory filterFactory;
+
+
+    /** Create an instance of our I/O operations
+     * @param filterFactory the factory used to create filters and passed on to the builders created here
+     * @param applicationScope the scope given to the FilterPipeline created in fromId
+     */
+    @Inject
+    public PipelineBuilder( final FilterFactory filterFactory, @Assisted final ApplicationScope applicationScope ) {
+        this.filterFactory = filterFactory;
+        this.applicationScope = applicationScope;
+    }
+
+
+
+
+    /**
+     * Set the cursor to use in our filter pipeline
+     * @param cursor the optional cursor; the Optional itself must not be null
+     * @return this builder, for chaining
+     */
+    public PipelineBuilder withCursor(final Optional<String> cursor){
+        Preconditions.checkNotNull(cursor, "cursor must not be null");
+        this.cursor = cursor;
+        return this;
+    }
+
+
+    /**
+     * Set our limit
+     * @param limit the limit passed to the FilterPipeline created in fromId; not validated here
+     * @return this builder, for chaining
+     */
+    public PipelineBuilder withLimit(final int limit){
+        this.limit = limit;
+        return this;
+    }
+
+
+    /**
+     * Set our start point in our graph traversal to the specified entity id. A 1.0 compatibility API.  eventually this should be replaced with
+     * a call that will allow us to start traversing at the application node to any other node in the graph
+     *
+     * @param entityId the id handed to the factory's entity id filter
+     * @return an IdBuilder over a pipeline created from the scope, cursor and limit set at the time of this call
+     */
+    @Deprecated
+    public IdBuilder fromId(final Id entityId){
+        FilterPipeline<FilterResult<Id>>  filterPipeline =  new FilterPipeline( applicationScope, this.cursor,limit ).withFilter(  filterFactory.getEntityIdFilter( entityId ) );
+        // NOTE(review): FilterPipeline is instantiated above as a raw type, so the assignment is unchecked
+        return new IdBuilder( filterPipeline, filterFactory );
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
new file mode 100644
index 0000000..6cb515b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+public interface PipelineBuilderFactory {
+    // NOTE(review): presumably bound through Guice assisted injection, since PipelineBuilder's constructor is @Inject with an @Assisted scope -- confirm
+
+    /**
+     * Create our pipeline builder to allow us to build our pipeline
+     * @param applicationScope the application scope to create the builder for
+     * @return a PipelineBuilder for the given scope
+     */
+    PipelineBuilder create( final ApplicationScope applicationScope );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
index e4d5d44..64cf67f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -29,7 +29,7 @@ import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
  * @param <T> the input type
  * @param <R> The output Type
  */
-public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+public abstract class AbstractFilter<T, R> implements PipelineOperation<T, R> {
 
 
     protected PipelineContext pipelineContext;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..6dc4561 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -33,7 +33,7 @@ import com.google.common.base.Optional;
  * @param <R> The response type
  * @param <C> The cursor type
  */
-public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<FilterResult<T>, FilterResult<R>>  {
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
deleted file mode 100644
index e28ce44..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
-
-
-/**
- * A command that is used to reduce our stream of results into a stream of final batch outputs.  When used
- * no further transformation or encoding should occur.  Otherwise EdgePath data will be lost, and serialization cannot occur
- * across requests
- *
- * @param <T>  The input type
- * @param <R> The output type
- */
-public interface Collector<T, R> extends PipelineOperation<T,R> {
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
deleted file mode 100644
index dd200b5..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
-
-
-/**
- * A factory for generating collectors
- */
-public interface CollectorFactory {
-
-
-    /**
-     * Get the results page collector
-     * @return
-     */
-   ResultsPageCollector getResultsPageCollector();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
deleted file mode 100644
index ee01602..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
-
-
-/**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of FilterResult, and emits
- * an observable of FilterResults.  Filters should never emit groups or objects that represent collections.  Items should
- * always be emitted 1 at a time.  It is the responsibility of the collector to aggregate results.
- */
-public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {
-
-    /**
-     * Get the builder for the next phase
-     * @return
-     */
-//    B getNextBuilder();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index d297c2a..ca5695c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,18 +20,20 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -90,10 +92,9 @@ public interface FilterFactory {
      * @param query The query to use when querying the entities in the collection
      * @param collectionName The collection name to use when querying
      */
-    ElasticSearchCollectionFilter elasticSearchCollectionFilter( @Assisted( "query" ) final String query,
-                                                                 @Assisted( "collectionName" )
-                                                                 final String collectionName,
-                                                                 @Assisted( "entityType" ) final String entityType );
+    SearchCollectionFilter searchCollectionFilter( @Assisted( "query" ) final String query,
+                                                   @Assisted( "collectionName" ) final String collectionName,
+                                                   @Assisted( "entityType" ) final String entityType );
 
 
     /**
@@ -103,17 +104,16 @@ public interface FilterFactory {
      * @param connectionName The type of connection to query
      * @param connectedEntityType The type of entity in the connection.  Leave absent to query all entity types
      */
-    ElasticSearchConnectionFilter elasticSearchConnectionFilter( @Assisted( "query" ) final String query,
-                                                                 @Assisted( "connectionName" )
-                                                                 final String connectionName,
-                                                                 @Assisted( "connectedEntityType" )
-                                                                 final Optional<String> connectedEntityType );
+    SearchConnectionFilter searchConnectionFilter( @Assisted( "query" ) final String query,
+                                                   @Assisted( "connectionName" ) final String connectionName,
+                                                   @Assisted( "connectedEntityType" )
+                                                   final Optional<String> connectedEntityType );
 
 
     /**
      * Generate a new instance of the command with the specified parameters
      */
-    EntityLoadFilter entityLoadFilter();
+    EntityLoadVerifyFilter entityLoadFilter();
 
     /**
      * Get the collector for collection candidate results to entities
@@ -127,16 +127,37 @@ public interface FilterFactory {
     CandidateIdFilter candidateResultsIdVerifyFilter();
 
     /**
-     * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not doing full traversals
-     *
      * @param entityId The entity id to emit
+     *
+     * @deprecated A 1.0 api
+     *
+     * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not doing full traversals
      */
+    @Deprecated
     EntityIdFilter getEntityIdFilter( final Id entityId );
 
 
     /**
      * Create a new instance of our entity filter
-     * @return
      */
     EntityResumeFilter entityResumeFilter();
+
+    /**
+     * @deprecated A 1.0 api.  Creates a filter for resuming connection references
+     */
+    @Deprecated
+    ConnectionRefResumeFilter connectionRefResumeFilter();
+
+    /**
+     *
+     * Creates connection refs for 1.0 compatibility
+     *
+     * @param sourceId The source id
+     * @param connectionType The connection type
+     *
+     * @deprecated A 1.0 api.  Creates a filter for transforming incoming ids into connection refs
+     */
+    @Deprecated
+    ConnectionRefFilter connectionRefFilter( @Assisted( "sourceId" ) final Id sourceId,
+                                             @Assisted( "connectionType" ) final String connectionType );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
deleted file mode 100644
index f8bbdd8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import rx.Observable;
-
-
-/**
- * Pipeline for applying our UG domain specific filters.
- *
- * Modeled after an observable, with typing to allow input of specific filters
- *
- * @param InputType the input type in the current pipeline state
- */
-public class FilterPipeline<InputType> {
-
-
-    private int idCount = 0;
-
-    private final ApplicationScope applicationScope;
-
-
-    private final RequestCursor requestCursor;
-    private int limit;
-
-    //Generics hell, intentionally without a generic, we check at the filter level
-    private Observable currentObservable;
-
-
-    /**
-     * Create our filter pipeline
-     */
-    public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
-
-
-        ValidationUtils.validateApplicationScope( applicationScope );
-        Preconditions.checkNotNull( cursor, "cursor optional is required" );
-        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
-
-
-        this.applicationScope = applicationScope;
-
-        //init our cursor to empty
-        this.requestCursor = new RequestCursor( cursor );
-
-        //set the default limit
-        this.limit = limit;
-
-        //set our observable to start at the application
-        final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
-        this.currentObservable = Observable.just( filter );
-    }
-
-
-    public <OutputType> FilterPipeline<OutputType> withFilter(
-        final Filter<? super InputType, ? extends OutputType> filter ) {
-
-
-        setUp( filter );
-
-        return ( FilterPipeline<OutputType> ) this;
-    }
-
-
-    public <OutputType> FilterPipeline<OutputType> withCollector(
-        final Collector<? super InputType, ? extends OutputType> collector ) {
-
-
-        setUp( collector );
-
-        return ( FilterPipeline<OutputType> ) this;
-    }
-
-
-    private <OutputType> void setUp(
-        final PipelineOperation<? super InputType, ? extends OutputType> pipelineOperation ) {
-        setState( pipelineOperation );
-
-        currentObservable = currentObservable.compose( pipelineOperation );
-    }
-
-
-    /**
-     * Return the observable of the filter pipeline
-     */
-    public Observable<InputType> execute() {
-        return currentObservable;
-    }
-
-
-    /**
-     * Set the id of the state
-     */
-    private void setState( final PipelineOperation pipelineOperation ) {
-
-
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
-
-        pipelineOperation.setContext( context );
-
-        //done for clarity
-        idCount++;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
deleted file mode 100644
index 0f73fb9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class ReadFilterFactoryImpl { //implements ReadFilterFactory {
-
-//
-//    private final GraphManagerFactory graphManagerFactory;
-//    private final EntityIndexFactory entityIndexFactory;
-//    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-//
-//
-//    @Inject
-//    public ReadFilterFactoryImpl( final GraphManagerFactory graphManagerFactory,
-//                                  final EntityIndexFactory entityIndexFactory,
-//                                  final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-//
-//
-//        this.graphManagerFactory = graphManagerFactory;
-//        this.entityIndexFactory = entityIndexFactory;
-//        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-//    }
-//
-//
-//    @Override
-//    public ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ) {
-//        return new ReadGraphCollectionFilter( graphManagerFactory, collectionName );
-//    }
-//
-//
-//    @Override
-//    public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName,
-//                                                                        final Id targetId ) {
-//        return new ReadGraphCollectionByIdFilter( graphManagerFactory, collectionName, targetId );
-//    }
-//
-//
-//    @Override
-//    public ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ) {
-//        return new ReadGraphConnectionFilter( graphManagerFactory, connectionName );
-//    }
-//
-//
-//    @Override
-//    public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName,
-//                                                                       final String entityType ) {
-//        return new ReadGraphConnectionByTypeFilter( graphManagerFactory, connectionName, entityType );
-//    }
-//
-//
-//    @Override
-//    public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName,
-//                                                                        final Id targetId ) {
-//        return new ReadGraphConnectionByIdFilter( graphManagerFactory, connectionName, targetId );
-//    }
-//
-//
-//    @Override
-//    public EntityLoadCollector entityLoadCollector() {
-//        return new EntityLoadCollector( entityCollectionManagerFactory );
-//    }
-//
-//
-//    /**
-//     * TODO refactor these impls to use RX internally, as well as remove the query object
-//     */
-//    @Override
-//    public QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector(
-//        final String collectionName, final String query ) {
-//
-//        final Query queryObject = Query.fromQL( query );
-//
-//        final QueryCollectionElasticSearchCollectorFilter filter =
-//            new QueryCollectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory,
-//                collectionName, queryObject );
-//
-//        return filter;
-//    }
-//
-//
-//    @Override
-//    public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector(
-//        final String connectionName, final String query ) {
-//
-//        final Query queryObject = Query.fromQL( query );
-//
-//        final QueryConnectionElasticSearchCollectorFilter filter =
-//            new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory,
-//                connectionName, queryObject );
-//
-//        return filter;
-//    }
-//
-//
-//    @Override
-//    public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector(
-//        final String connectionName, final String connectionEntityType, final String query ) {
-//
-//        final Query queryObject = Query.fromQL( query );
-//        queryObject.setConnectionType( connectionEntityType );
-//
-//        final QueryConnectionElasticSearchCollectorFilter filter =
-//            new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory,
-//                connectionName, queryObject );
-//
-//        return filter;
-//    }
-//
-//
-//    @Override
-//    public EntityIdFilter getEntityIdFilter( final Id entityId ) {
-//        return new EntityIdFilter( entityId );
-//    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 1810d65..6b3a086 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -29,24 +29,26 @@ import org.apache.usergrid.persistence.model.entity.Entity;
 /**
  * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be
  * considered a "page" of results.  A hold over from 1.0.  We shouldn't need this when we fully move away from the EM/RM
+ *
+ * @param <T> the type of results page
  */
-public class ResultsPage {
+public class ResultsPage<T> {
 
-    private final List<Entity> entityList;
+    private final List<T> entityList;
 
     private final int limit;
 
     private final ResponseCursor responseCursor;
 
 
-    public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+    public ResultsPage( final List<T> entityList, final ResponseCursor responseCursor, final int limit ) {
         this.entityList = entityList;
         this.responseCursor = responseCursor;
         this.limit = limit;
     }
 
 
-    public List<Entity> getEntityList() {
+    public List<T> getEntityList() {
         return entityList;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
deleted file mode 100644
index 1c5175d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.collect;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractCollector<T, R> implements Collector<T, R> {
-
-
-    protected PipelineContext pipelineContext;
-
-
-    @Override
-    public void setContext( final PipelineContext pipelineContext ) {
-        this.pipelineContext = pipelineContext;
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java
new file mode 100644
index 0000000..392e33a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This class only exists for 1.0 compatibility, remove once services no longer need connection refs
+ */
+public class ConnectionRefFilter extends AbstractFilter<FilterResult<Id>, FilterResult<ConnectionRef>> {
+
+
+    private final Id sourceId;
+    private final String connectionType;
+
+
+    @Inject
+    public ConnectionRefFilter( @Assisted( "sourceId" ) final Id sourceId,
+                                @Assisted( "connectionType" ) final String connectionType ) {
+        this.sourceId = sourceId;
+        this.connectionType = connectionType;
+    }
+
+
+    @Override
+    public Observable<FilterResult<ConnectionRef>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
+
+        return filterResultObservable.map( targetResult -> {
+
+            final Id targetId = targetResult.getValue();
+            final ConnectionRef ref =
+                new ConnectionRefImpl( sourceId.getType(), sourceId.getUuid(), connectionType, targetId.getType(),
+                    targetId.getUuid() );
+
+            return new FilterResult<>( ref, Optional.<EdgePath>absent() );
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java
new file mode 100644
index 0000000..5c3a93e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
+ * it matches the Id that was set.   This is a 1.0 compatibility implementation, and should be removed when services no
+ * longer depend on connection refs
+ */
+public class ConnectionRefResumeFilter extends AbstractPathFilter<ConnectionRef, ConnectionRef, Id> {
+
+
+    @Override
+    public Observable<FilterResult<ConnectionRef>> call(
+        final Observable<FilterResult<ConnectionRef>> filterResultObservable ) {
+
+        //filter only the first id, then map into our path for our next pass
+
+
+        return filterResultObservable.skipWhile( filterResult -> {
+
+            final Optional<Id> startFromCursor = getSeekValue();
+
+
+            if ( !startFromCursor.isPresent() ) {
+                return false;
+            }
+
+            final ConnectedEntityRef ref = filterResult.getValue().getTargetRefs();
+
+            final Id entityId = startFromCursor.get();
+
+            return entityId.getUuid().equals( ref.getUuid() ) && entityId.getType().equals( ref.getType() );
+        } ).map( filterResult -> {
+
+
+            final ConnectionRef entity = filterResult.getValue();
+
+            final String type = entity.getTargetRefs().getType();
+            final UUID uuid = entity.getTargetRefs().getUuid();
+
+            final Id entityId = new SimpleId( uuid, type );
+
+            return createFilterResult( entity, entityId, filterResult.getPath() );
+        } );
+    }
+
+
+    @Override
+    protected CursorSerializer<Id> getCursorSerializer() {
+        return IdCursorSerializer.INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
index 2917b61..f545631 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java
@@ -22,7 +22,6 @@ package org.apache.usergrid.corepersistence.pipeline.read.collect;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -36,7 +35,7 @@ import rx.Observable;
  * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
  * it matches the Id that was set
  */
-public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> {
+public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id>  {
 
 
     @Override


Mime
View raw message