usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [04/12] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.
Date Mon, 04 May 2015 17:32:54 GMT
Massive refactor.  Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd983d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd983d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd983d66

Branch: refs/heads/USERGRID-609
Commit: cd983d66260222985431a775454183c2ed2305ea
Parents: 6d4847a
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Apr 30 17:40:52 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Apr 30 17:40:52 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   5 +-
 .../corepersistence/pipeline/Pipeline.java      |   9 +-
 .../pipeline/PipelineContext.java               |  16 +-
 .../pipeline/PipelineOperation.java             |  39 ++++
 .../pipeline/PipelineResult.java                |  57 -----
 .../pipeline/cursor/ResponseCursor.java         |  81 ++++---
 .../pipeline/read/AbstractFilter.java           |  45 ++++
 .../pipeline/read/AbstractPathFilter.java       | 109 +++++++++
 .../read/AbstractPipelineOperation.java         |  44 ----
 .../pipeline/read/AbstractSeekingFilter.java    | 102 --------
 .../pipeline/read/CandidateResultsFilter.java   |  31 ---
 .../pipeline/read/Collector.java                |  13 +-
 .../pipeline/read/CollectorFactory.java         |  12 +-
 .../corepersistence/pipeline/read/EdgePath.java |  79 +++++++
 .../corepersistence/pipeline/read/Filter.java   |   9 +-
 .../pipeline/read/FilterFactory.java            |  31 ++-
 .../pipeline/read/FilterResult.java             |  56 +++++
 .../pipeline/read/PipelineOperation.java        |  38 ---
 .../pipeline/read/ReadPipelineBuilder.java      |   5 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  75 +++---
 .../pipeline/read/ResultsPage.java              |  26 ++-
 .../read/collect/AbstractCollector.java         |  46 ++++
 .../read/collect/ResultsPageCollector.java      |  80 +++++++
 .../AbstractElasticSearchFilter.java            |  47 ++--
 .../pipeline/read/elasticsearch/Candidate.java  |  55 +++++
 .../elasticsearch/CandidateEntityFilter.java    | 234 +++++++++++++++++++
 .../read/elasticsearch/CandidateIdFilter.java   | 201 ++++++++++++++++
 .../CandidateResultsEntityResultsCollector.java | 217 -----------------
 .../CandidateResultsIdVerifyFilter.java         | 193 ---------------
 .../impl/CollectionRefsVerifier.java            |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  65 ------
 .../impl/ConnectionRefsVerifier.java            |  59 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  73 ------
 .../impl/ElasticSearchQueryExecutor.java        | 224 ------------------
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ----------
 .../elasticsearch/impl/FilteringLoader.java     | 219 -----------------
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ----
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ----
 .../impl/ResultsLoaderFactory.java              |  41 ----
 .../elasticsearch/impl/ResultsVerifier.java     |  52 -----
 .../elasticsearch/impl/VersionVerifier.java     |  85 -------
 .../pipeline/read/entity/EntityIdFilter.java    |  53 -----
 .../read/entity/EntityLoadCollector.java        |  94 --------
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  12 +-
 .../read/graph/AbstractReadGraphFilter.java     |  15 +-
 .../pipeline/read/graph/EntityIdFilter.java     |  54 +++++
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ++++++++++++
 .../graph/ReadGraphConnectionByTypeFilter.java  |  20 +-
 .../results/ObservableQueryExecutor.java        |  24 +-
 .../pipeline/cursor/CursorTest.java             |  20 +-
 .../persistence/index/CandidateResults.java     |  11 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   7 +-
 52 files changed, 1402 insertions(+), 2096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3119934..2790ee1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,7 +32,6 @@ import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
@@ -648,7 +647,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }
@@ -917,7 +916,7 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-        final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute();
+        final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute();
 
         return new ObservableQueryExecutor( resultsObservable ).next();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index bc93b6c..df6a218 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
@@ -47,7 +46,6 @@ public class Pipeline<R> {
     private final List<PipelineOperation> idPipelineOperationList;
     private final Collector<?, R> collector;
     private final RequestCursor requestCursor;
-    private final ResponseCursor responseCursor;
 
     private final int limit;
 
@@ -69,7 +67,6 @@ public class Pipeline<R> {
         this.limit = limit;
 
         this.requestCursor = new RequestCursor( cursor );
-        this.responseCursor = new ResponseCursor();
     }
 
 
@@ -77,7 +74,7 @@ public class Pipeline<R> {
      * Execute the pipline construction, returning an observable of results
      * @return
      */
-    public Observable<PipelineResult<R>> execute(){
+    public Observable<R> execute(){
 
 
         Observable traverseObservable = Observable.just( applicationScope.getApplication() );
@@ -99,7 +96,7 @@ public class Pipeline<R> {
 
 
         //append the optional cursor into the response for the caller to use
-        return response.map( result -> new PipelineResult<>( result, responseCursor ) );
+        return response;
     }
 
 
@@ -111,7 +108,7 @@ public class Pipeline<R> {
     private void setState( final PipelineOperation pipelineOperation ) {
 
 
-        final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor,
+        final PipelineContext context = new PipelineContext( applicationScope, requestCursor,
             limit, idCount );
 
         pipelineOperation.setContext( context );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 325f876..018abb7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -38,16 +38,13 @@ public class PipelineContext {
     private final int id;
     private final ApplicationScope applicationScope;
     private final RequestCursor requestCursor;
-    private final ResponseCursor responseCursor;
     private final int limit;
 
 
-    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor,
-                            final ResponseCursor responseCursor, final int limit, final int id ) {
+    public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
 
         this.applicationScope = applicationScope;
         this.requestCursor = requestCursor;
-        this.responseCursor = responseCursor;
         this.limit = limit;
         this.id = id;
     }
@@ -64,7 +61,7 @@ public class PipelineContext {
 
 
     /**
-     * Get our cursor value if present
+     * Get our cursor value if present from our pipeline
      * @param serializer
      */
     public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) {
@@ -73,15 +70,6 @@ public class PipelineContext {
         return Optional.fromNullable( value );
     }
 
-
-    /**
-     * Set the cursor value into our resposne
-     */
-    public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) {
-        responseCursor.setCursor( id, value, serializer );
-    }
-
-
     /**
      * Get the limit for this execution
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
new file mode 100644
index 0000000..d2fa16c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+
+import rx.Observable;
+
+
+/**
+ * Interface for filtering commands.  All filters must take an observable of ids as an input.  Output is then determined by subclasses.
+ * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
+ * pipeline
+ * @param <T> The input type of the filter value
+ * @param <R> The output type of the filter value
+ */
+public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> {
+
+    void setContext(final PipelineContext pipelineContext);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
deleted file mode 100644
index fe8604e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Intermediate observable that will return results, as well as an optional cursor
- * @param <R>
- */
-public class PipelineResult<R> {
-
-
-    private final R result;
-
-    private final ResponseCursor responseCursor;
-
-
-    public PipelineResult( final R result, final ResponseCursor responseCursor ) {
-        this.result = result;
-        this.responseCursor = responseCursor;
-    }
-
-
-    /**
-     * If the user requests our cursor, return the cursor
-     * @return
-     */
-    public Optional<String> getCursor(){
-        return this.responseCursor.encodeAsString();
-    }
-
-    public R getResult(){
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index f1c8c24..dbd8b88 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,12 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.io.Serializable;
 import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
 
-import com.fasterxml.jackson.core.Base64Variant;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,71 +39,72 @@ public class ResponseCursor {
 
     private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper();
 
+
     /**
-     * We use a map b/c some indexes might be skipped
+     * The pointer to the first edge path.  Evaluation is lazily performed in the case the caller does not care about
+     * the cursor.
      */
-    private Map<Integer, CursorEntry<?>> cursors = new HashMap<>();
+    private final Optional<EdgePath> edgePath;
 
+    private Optional<String> encodedValue = null;
 
-    /**
-     * Set the possible cursor value into the index. DOES NOT parse the cursor.  This is intentional for performance
-     */
-    public <T extends Serializable> void setCursor( final int id, final T cursor,
-                                                    final CursorSerializer<T> serializer ) {
 
-        final CursorEntry<T> newEntry = new CursorEntry<>( cursor, serializer );
-        cursors.put( id, newEntry );
-    }
+    public ResponseCursor( final Optional<EdgePath> edgePath ) {this.edgePath = edgePath;}
 
 
     /**
-     * now we're done, encode as a string
+     * Lazily encoded deliberately.  If the user doesn't care about a cursor and is using streams, we don't want to take the
+     * time to calculate it
      */
     public Optional<String> encodeAsString() {
-        try {
 
-            if(cursors.isEmpty()){
-                return Optional.absent();
-            }
+        //always return cached if we are called 2x
+        if ( encodedValue != null ) {
+            return encodedValue;
+        }
+
+        if ( !edgePath.isPresent() ) {
+            encodedValue = Optional.absent();
+            return encodedValue;
+        }
+
+
+        try {
 
+            //an edge path is present (the absent case short circuits above), so serialize it
 
             final ObjectNode map = MAPPER.createObjectNode();
 
-            for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) {
 
-                final CursorEntry cursorEntry = entry.getValue();
+            Optional<EdgePath> current = edgePath;
 
-                final JsonNode serialized = cursorEntry.serializer.toNode( MAPPER, cursorEntry.cursor );
 
-                map.put( entry.getKey().toString(), serialized );
-            }
+            //traverse each edge and add them to our json
+            do {
+
+                final EdgePath edgePath = current.get();
+                final Object cursorValue = edgePath.getCursorValue();
+                final CursorSerializer serializer = edgePath.getSerializer();
+                final int filterId = edgePath.getFilterId();
+
+                final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+                map.put( String.valueOf( filterId ), serialized );
 
+                current = current.get().getPrevious();
+            }
+            while ( current.isPresent() );
 
-            final byte[] output = MAPPER.writeValueAsBytes(map);
+            final byte[] output = MAPPER.writeValueAsBytes( map );
 
             //generate a base64 url save string
             final String value = Base64.getUrlEncoder().encodeToString( output );
 
-            return Optional.of( value );
-
+            encodedValue =  Optional.of( value );
         }
         catch ( JsonProcessingException e ) {
             throw new CursorParseException( "Unable to serialize cursor", e );
         }
-    }
 
-
-    /**
-     * Interal pointer to the cursor and it's serialzed value
-     */
-    private static final class CursorEntry<T> {
-        private final T cursor;
-        private final CursorSerializer<T> serializer;
-
-
-        private CursorEntry( final T cursor, final CursorSerializer<T> serializer ) {
-            this.cursor = cursor;
-            this.serializer = serializer;
-        }
+        return encodedValue;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
new file mode 100644
index 0000000..e4d5d44
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
new file mode 100644
index 0000000..c68dc4a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Abstract class for filters to extend that require a cursor
+ * @param <T> The input type
+ * @param <R> The response type
+ * @param <C> The cursor type
+ */
+public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> {
+
+
+
+    //TODO not a big fan of this, but not sure how to build resume otherwise
+    private CursorSeek<C> cursorSeek;
+
+
+    /**
+     * Return the parsed value of the cursor from the last request, if it exists
+     */
+    protected Optional<C> getSeekValue() {
+
+        if(cursorSeek == null) {
+            final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
+            cursorSeek = new CursorSeek<>( cursor );
+        }
+
+        return cursorSeek.getSeekValue();
+
+    }
+
+
+    /**
+     * Creates a filter result wrapping the emitted value and a new edge path that holds the cursor value and links to the parent path
+     */
+    protected FilterResult<R> createFilterResult( final R emit, final C cursorValue, final Optional<EdgePath> parent ){
+
+
+        //create a current path, and append our parent path to it
+        final EdgePath<C> newEdgePath =
+            new EdgePath<>( pipelineContext.getId(), cursorValue, getCursorSerializer(), parent );
+
+        //emit our value with the parent path
+        return new FilterResult<>( emit, Optional.of( newEdgePath ) );
+
+    }
+
+
+    /**
+     * Return the class to be used when parsing the cursor
+     */
+    protected abstract CursorSerializer<C> getCursorSerializer();
+
+
+    /**
+     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
+     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
+     */
+    protected static class CursorSeek<C> {
+
+        private Optional<C> seek;
+
+        private CursorSeek(final Optional<C> cursorValue){
+            seek = cursorValue;
+        }
+
+
+        /**
+         * Get the seek value to use when searching
+         * @return
+         */
+        public Optional<C> getSeekValue(){
+            final Optional<C> toReturn = seek;
+
+            seek = Optional.absent();
+
+            return toReturn;
+        }
+
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
deleted file mode 100644
index 8d7f106..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-
-/**
- * Basic functionality for our commands to handle cursor IO
- * @param <T> the input type
- * @param <R> The output Type
- */
-public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> {
-
-
-    protected PipelineContext pipelineContext;
-
-
-    @Override
-    public void setContext( final PipelineContext pipelineContext ) {
-        this.pipelineContext = pipelineContext;
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
deleted file mode 100644
index c23a1b7..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import java.io.Serializable;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Abstract class for filters to extend that require a cursor
- * @param <T> The input type
- * @param <R> The response type
- * @param <C> The cursor type
- */
-public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> {
-
-
-
-    //TODO not a big fan of this, but not sure how to build resume otherwise
-    private CursorSeek<C> cursorSeek;
-
-
-    /**
-     * Return the parsed value of the cursor from the last request, if it exists
-     */
-    protected Optional<C> getSeekValue() {
-
-        if(cursorSeek == null) {
-            final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() );
-            cursorSeek = new CursorSeek<>( cursor );
-        }
-
-        return cursorSeek.getSeekValue();
-
-    }
-
-
-    /**
-     * Sets the cursor into our pipeline context
-     * @param newValue
-     */
-    protected void setCursor(final C newValue){
-        pipelineContext.setCursorValue( newValue, getCursorSerializer() );
-    }
-
-
-    /**
-     * Return the class to be used when parsing the cursor
-     */
-    protected abstract CursorSerializer<C> getCursorSerializer();
-
-
-    /**
-     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
-     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
-     */
-    protected static class CursorSeek<C> {
-
-        private Optional<C> seek;
-
-        private CursorSeek(final Optional<C> cursorValue){
-            seek = cursorValue;
-        }
-
-
-        /**
-         * Get the seek value to use when searching
-         * @return
-         */
-        public Optional<C> getSeekValue(){
-            final Optional<C> toReturn = seek;
-
-            seek = Optional.absent();
-
-            return toReturn;
-        }
-
-
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
deleted file mode 100644
index 4e6d06e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of ids, and emits
- * an observable of ids
- */
-public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
index 69d929c..e28ce44 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java
@@ -20,11 +20,18 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+
+
 /**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
+ * A command that is used to reduce our stream of results into a stream of final batch outputs.  When used
+ * no further transformation or encoding should occur.  Otherwise EdgePath data will be lost, and serialization cannot occur
+ * across requests
+ *
+ * @param <T>  The input type
+ * @param <R> The output type
  */
-public interface Collector<T, R> extends PipelineOperation<T, R> {
+public interface Collector<T, R> extends PipelineOperation<T,R> {
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
index 6893b34..dd200b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java
@@ -20,8 +20,7 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 
 
 /**
@@ -29,16 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollec
  */
 public interface CollectorFactory {
 
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    EntityLoadCollector entityLoadCollector();
 
     /**
-     * Get the collector for collection candidate results to entities
+     * Get the results page collector
      * @return
      */
-    CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector();
-
+   ResultsPageCollector getResultsPageCollector();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
new file mode 100644
index 0000000..c560fad
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/EdgePath.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A path from our input element to our emitted element.  A list of EdgePaths comprises a path through the graph.  The chain of edge paths will result
+ * in a cursor when aggregated.  For example, given the following graph traversal:
+ *
+ * applicationId(1) - "users" -> userId(2) - "devices" -> deviceId(3), there would be 2 EdgePaths:
+ *
+ *  EdgePath("users"->userId(2)) <- parent - EdgePath("devices" -> deviceId(3))
+ */
+public class EdgePath<C> {
+
+
+    private final int filterId;
+    private final C cursorValue;
+    private final CursorSerializer<C> serializer;
+    private final Optional<EdgePath> previous;
+
+
+    /**
+     *
+     * @param filterId The id of the filter that generated this path
+     * @param cursorValue The value to resume seeking on the path
+     * @param serializer The serializer to serialize the value
+     * @param parent The parent graph path edge to reach this path
+     */
+    public EdgePath( final int filterId, final C cursorValue, final CursorSerializer<C> serializer,
+                     final Optional<EdgePath> parent ) {
+        this.filterId = filterId;
+        this.cursorValue = cursorValue;
+        this.serializer = serializer;
+        this.previous = parent;
+    }
+
+
+    public C getCursorValue() {
+        return cursorValue;
+    }
+
+
+    public int getFilterId() {
+        return filterId;
+    }
+
+
+    public Optional<EdgePath> getPrevious() {
+        return previous;
+    }
+
+
+    public CursorSerializer<C> getSerializer() {
+        return serializer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index ace62db..054a85a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -20,11 +20,12 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 
 
 /**
- * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of ids, and emits
- * an observable of ids
+ * Traverses edges in the graph.  Either by query or graph traversal.  Take an observable of FilterResult, and emits
+ * an observable of FilterResults.  Filters should never emit groups or objects that represent collections.  Items should
+ * always be emitted 1 at a time.  It is the responsibility of the collector to aggregate results.
  */
-public interface Filter<T, R> extends PipelineOperation<T, R> {}
+public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 078d981..c465516 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -20,10 +20,12 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
@@ -43,6 +45,7 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param collectionName The collection name to use when reading the graph
      */
     ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName );
@@ -57,12 +60,14 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param connectionName The connection name to use when traversing the graph
      */
     ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param connectionName The connection name to use when traversing the graph
      * @param entityType The entity type to use when traversing the graph
      */
@@ -72,13 +77,15 @@ public interface FilterFactory {
 
     /**
      * Read a connection directly between two identifiers
+     *
      * @param connectionName The connection name to use when traversing the graph
-     * @param targetId  The target Id to use when traversing the graph
+     * @param targetId The target Id to use when traversing the graph
      */
     ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId );
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param query The query to use when querying the entities in the collection
      * @param collectionName The collection name to use when querying
      */
@@ -90,6 +97,7 @@ public interface FilterFactory {
 
     /**
      * Generate a new instance of the command with the specified parameters
+     *
      * @param query The query to use when querying the entities in the connection
      * @param connectionName The type of connection to query
      * @param connectedEntityType The type of entity in the connection.  Leave absent to query all entity types
@@ -102,13 +110,24 @@ public interface FilterFactory {
 
 
     /**
-     * Get a candidate ids verifier for collection results.  Should be inserted into pipelines where a query filter is an intermediate step,
-     * not a final filter before collectors
+     * Generate a new instance of the command with the specified parameters
+     */
+    EntityLoadFilter entityLoadFilter();
+
+    /**
+     * Get the filter that loads entities from candidate results
      */
-    CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter();
+    CandidateEntityFilter candidateEntityFilter();
+
+    /**
+     * Get a candidate ids verifier for collection results.  Should be inserted into pipelines where a query filter is
+     * an intermediate step, not a final filter before collectors
+     */
+    CandidateIdFilter candidateResultsIdVerifyFilter();
 
     /**
      * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not doing full traversals
+     *
      * @param entityId The entity id to emit
      */
     EntityIdFilter getEntityIdFilter( final Id entityId );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
new file mode 100644
index 0000000..3c41a2b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A bean that is passed between filters with immutable cursor state
+ * @param <T>
+ */
+public class FilterResult<T> {
+    private final T value;
+    private final Optional<EdgePath> path;
+
+
+    /**
+     * Create a new immutable filter result
+     * @param value The value the filter emits
+     * @param path The path to this value, if created
+     */
+    public FilterResult( final T value, final Optional<EdgePath> path ) {
+        this.value = value;
+        this.path = path;
+    }
+
+
+    public T getValue() {
+        return value;
+    }
+
+
+    public Optional<EdgePath> getPath() {
+        return path;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
deleted file mode 100644
index 28bba36..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
-
-import rx.Observable;
-
-
-/**
- * Interface for filtering commands.  All filters must take an observable of Id's as an input.  Output is then determined by subclasses.
-  * This takes an input of Id, performs some operation, and emits values for further processing in the Observable
-  * pipeline
- * @param <T> The input type
- * @param <R>
- */
-public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> {
-
-    void setContext(final PipelineContext pipelineContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index 25ab03e..d0e87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -20,9 +20,6 @@
 package org.apache.usergrid.corepersistence.pipeline.read;
 
 
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -103,5 +100,5 @@ public interface ReadPipelineBuilder {
      * Load our entity results when our previous filter calls graph
      * @return
      */
-    Observable<PipelineResult<ResultsPage>> execute();
+    Observable<ResultsPage> execute();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 4ecfb47..ffb9f7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
-import org.apache.usergrid.corepersistence.pipeline.PipelineResult;
-import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter;
+import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,6 +52,8 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
     private final ApplicationScope applicationScope;
 
+    private final CollectorFactory collectorFactory;
+
 
     /**
      * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are
@@ -70,6 +72,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         this.filterFactory = filterFactory;
 
         this.applicationScope = applicationScope;
+        this.collectorFactory = collectorFactory;
 
         //init our cursor to empty
         this.cursor = Optional.absent();
@@ -78,7 +81,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         this.limit = DEFAULT_LIMIT;
 
 
-        this.collectorState = new CollectorState( collectorFactory );
+        this.collectorState = new CollectorState( );
 
         this.filters = new ArrayList<>();
     }
@@ -120,7 +123,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
 
-        this.collectorState.setEntityLoaderCollector();
+        this.collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -132,7 +135,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphCollectionFilter( collectionName ) );
 
-        this.collectorState.setEntityLoaderCollector();
+        this.collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -147,7 +150,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) );
 
-        this.collectorState.setCandidateResultsEntityResultsCollector();
+        this.collectorState.setCandidateEntityFilter();
 
         return this;
     }
@@ -159,7 +162,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         ValidationUtils.verifyIdentity( entityId );
 
         filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -169,7 +172,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
     public ReadPipelineBuilder getConnection( final String connectionName ) {
         Preconditions.checkNotNull( connectionName, "connectionName must not be null" );
         filters.add( filterFactory.readGraphConnectionFilter( connectionName ) );
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
 
         return this;
     }
@@ -182,7 +185,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
         filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) );
 
-        collectorState.setEntityLoaderCollector();
+        collectorState.setIdEntityLoaderFilter();
         return this;
     }
 
@@ -196,17 +199,25 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
         Preconditions.checkNotNull( query, "query must not be null" );
 
         filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) );
-        collectorState.setCandidateResultsEntityResultsCollector();
+        collectorState.setCandidateEntityFilter();
         return this;
     }
 
 
     @Override
-    public Observable<PipelineResult<ResultsPage>> execute() {
+    public Observable<ResultsPage> execute() {
 
         ValidationUtils.validateApplicationScope( applicationScope );
 
-        final Collector<?, ResultsPage> collector = collectorState.getCollector();
+
+        //add our last filter that will generate entities
+        final Filter<?, Entity> finalFilter = collectorState.getFinalFilter();
+
+        filters.add( finalFilter );
+
+
+        //execute our collector
+        final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector();
 
         Preconditions.checkNotNull( collector,
             "You have not specified an operation that creates a collection filter.  This is required for loading "
@@ -229,46 +240,52 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
      * A mutable state for our collectors.  Rather than create a new instance each time, we create a singleton
      * collector
      */
-    private static final class CollectorState {
-        private final CollectorFactory collectorFactory;
+    private final class CollectorState {
+
 
-        private EntityLoadCollector entityLoadCollector;
+        private EntityLoadFilter entityLoadCollector;
 
-        private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector;
+        private CandidateEntityFilter candidateEntityFilter;
 
+        private Filter entityLoadFilter;
 
-        private Collector<?, ResultsPage> collector = null;
 
 
-        private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;}
+        private CollectorState( ){}
 
 
-        public void setEntityLoaderCollector() {
+        /**
+         * Set our final filter to be a load entity by Id filter
+         */
+        public void setIdEntityLoaderFilter() {
             if ( entityLoadCollector == null ) {
-                entityLoadCollector = collectorFactory.entityLoadCollector();
+                entityLoadCollector = filterFactory.entityLoadFilter();
             }
 
 
-            collector = entityLoadCollector;
+            entityLoadFilter = entityLoadCollector;
         }
 
 
-        public void setCandidateResultsEntityResultsCollector() {
-            if ( candidateResultsEntityResultsCollector == null ) {
-                candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector();
+        /**
+         * Set our final filter to be a load entity by candidate filter
+         */
+        public void setCandidateEntityFilter() {
+            if ( candidateEntityFilter == null ) {
+                candidateEntityFilter = filterFactory.candidateEntityFilter();
             }
 
-            collector = candidateResultsEntityResultsCollector;
+            entityLoadFilter = candidateEntityFilter;
         }
 
 
         public void clear() {
-            collector = null;
+            entityLoadFilter = null;
         }
 
 
-        public Collector<?, ResultsPage> getCollector() {
-            return collector;
+        public Filter<?, Entity> getFinalFilter() {
+            return entityLoadFilter;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 198ac67..1810d65 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -22,18 +22,28 @@ package org.apache.usergrid.corepersistence.pipeline.read;
 
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 
 /**
- * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be considered a "page" of results.
+ * An encapsulation of entities as a group of responses.  Ordered by the requesting filters.  Each set should be
+ * considered a "page" of results.  A holdover from 1.0.  We shouldn't need this when we fully move away from the EM/RM.
  */
 public class ResultsPage {
 
     private final List<Entity> entityList;
 
+    private final int limit;
 
-    public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;}
+    private final ResponseCursor responseCursor;
+
+
+    public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) {
+        this.entityList = entityList;
+        this.responseCursor = responseCursor;
+        this.limit = limit;
+    }
 
 
     public List<Entity> getEntityList() {
@@ -43,9 +53,15 @@ public class ResultsPage {
 
     /**
      * Return true if the results page is empty
-     * @return
      */
-    public boolean isEmpty(){
-        return entityList == null || entityList.isEmpty();
+    public boolean hasMoreResults() {
+        return entityList != null && entityList.size() == limit;
+    }
+
+
+
+
+    public ResponseCursor getResponseCursor() {
+        return responseCursor;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
new file mode 100644
index 0000000..1c5175d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+
+
+/**
+ * Basic functionality for our collectors: holds the pipeline context supplied via setContext
+ * @param <T> the input type
+ * @param <R> The output Type
+ */
+public abstract class AbstractCollector<T, R> implements Collector<T, R> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
new file mode 100644
index 0000000..84654aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
+import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Takes entities and collects them into results.  This mostly exists for 1.0 compatibility.  Eventually this will
+ * become the only collector in our pipeline and be used when rendering results, both on GET, PUT and POST.
+ */
+public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
+    implements Collector<Entity, ResultsPage> {
+
+    /** Buffers incoming results by the context limit and emits one ResultsPage, with a cursor, per buffer. */
+    @Override
+    public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+
+        final int limit = pipelineContext.getLimit();
+
+        return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
+            () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
+                collector.add( entity );
+            } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+            new ResponseCursor( resultsPageCollector.lastPath ), limit ) );
+    }
+
+
+    /**
+     * A collector that will aggregate our results together
+     */
+    private static class ResultsPageWithCursorCollector {
+
+
+        private final List<Entity> results;
+        //starts absent so that a collector with no results never hands a null Optional to the cursor
+        private Optional<EdgePath> lastPath = Optional.absent();
+
+
+        private ResultsPageWithCursorCollector( final int limit ) {
+            this.results = new ArrayList<>( limit );
+        }
+
+
+        public void add( final FilterResult<Entity> result ) {
+            this.results.add( result.getValue() );
+            this.lastPath = result.getPath();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
index eac8a65..004a696 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
@@ -24,11 +24,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
 import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
@@ -44,8 +46,8 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer>
-    implements CandidateResultsFilter {
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
+    implements Filter<Id, Candidate> {
 
     private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
 
@@ -66,7 +68,7 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
 
     @Override
-    public Observable<CandidateResults> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
 
         //get the graph manager
         final ApplicationEntityIndex applicationEntityIndex =
@@ -80,12 +82,12 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return observable.flatMap( idFilterResult -> {
 
-            final SearchEdge searchEdge = getSearchEdge( id );
+            final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
 
 
-            final Observable<CandidateResults> candidates = Observable.create( subscriber -> {
+            final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
 
                 //our offset to our start value.  This will be set the first time we emit
                 //after we receive new ids, we want to reset this to 0
@@ -98,19 +100,14 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
 
                 subscriber.onStart();
 
-                //emit while we have values from ES
-                while ( true ) {
+                //emit while we have values from ES and someone is subscribed
+                while ( !subscriber.isUnsubscribed() ) {
 
 
                     try {
                         final CandidateResults candidateResults =
                             applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
 
-                        currentOffSet += candidateResults.size();
-
-                        //set the cursor for the next value
-                        setCursor( currentOffSet );
-
                         /**
                          * No candidates, we're done
                          */
@@ -119,7 +116,25 @@ public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<
                             return;
                         }
 
-                        subscriber.onNext( candidateResults );
+
+                        for( CandidateResult candidateResult: candidateResults){
+
+                            //our subscriber unsubscribed, break out
+                            if(subscriber.isUnsubscribed()){
+                              return;
+                            }
+
+                            final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+                            final FilterResult<Candidate>
+                                result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+                            subscriber.onNext( result );
+
+                            currentOffSet++;
+                        }
+
+
                     }
                     catch ( Throwable t ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
new file mode 100644
index 0000000..ab9d5d9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * A candidate for further processing. This holds the original candidate result, as well as the search edge it was found in
+ */
+public class Candidate {
+
+    private final CandidateResult candidateResult;
+    private final SearchEdge searchEdge;
+
+
+    /**
+     * Create a new Candidate for further processing
+     * @param candidateResult  The candidate result
+     * @param searchEdge The search edge this was searched on
+     */
+    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+        this.candidateResult = candidateResult;
+        this.searchEdge = searchEdge;
+    }
+
+    /** @return the candidate result passed to the constructor */
+    public CandidateResult getCandidateResult() {
+        return candidateResult;
+    }
+
+    /** @return the search edge passed to the constructor */
+    public SearchEdge getSearchEdge() {
+        return searchEdge;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
new file mode 100644
index 0000000..d30917c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities for incoming Candidate emissions, verifies them against the loaded entity set, then streams them on.
+ * Performs internal buffering for efficiency.  Note that all entities may not be emitted if our load crosses page boundaries.  It is up to the
+ * collector to determine when to stop streaming entities.
+ */
+public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
+    implements Filter<Candidate, Entity> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                  final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+    /** Buffers candidates by the context limit, loads their entities per buffer, and emits those that verify. */
+    @Override
+    public Observable<FilterResult<Entity>> call(
+        final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        //buffer them to get a page size we can make 1 network hop
+        final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
+
+            //load them
+            .flatMap( candidateResults -> {
+                    //flatten to a list of ids to load
+                    final Observable<List<Id>> candidateIds =
+                        Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
+
+                    //load the ids
+                    final Observable<EntitySet> entitySetObservable =
+                        candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
+
+                    //now we have a collection, validate our candidate set is correct.
+
+                    return entitySetObservable.map(
+                        entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
+                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
+                                              .flatMap(
+                                                  entityCollector -> Observable.from( entityCollector.getResults() ) );
+                } );
+
+        //if we filter all our results, we want to continue to try the next page
+        return searchIdSetObservable;
+    }
+
+
+
+
+    /**
+     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
+     */
+    private static final class EntityVerifier {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+        private final List<FilterResult<Entity>> results;
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final EntitySet entitySet;
+
+
+        public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
+                               final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.entitySet = entitySet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( entitySet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+            //execute the batch that validate() may have added deindex requests to
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Entity>> getResults() {
+            return results;
+        }
+
+
+        public EntityIndexBatch getBatch() {
+            return batch;
+        }
+
+        /** Compares one candidate with its loaded entity and adds it to results only when the versions are the same. */
+        private void validate( final FilterResult<Candidate> filterResult ) {
+
+            final Candidate candidate = filterResult.getValue();
+            final CandidateResult candidateResult = candidate.getCandidateResult();
+            final SearchEdge searchEdge = candidate.getSearchEdge();
+            final Id candidateId = candidateResult.getId();
+            final UUID candidateVersion = candidateResult.getVersion();
+
+
+            final MvccEntity entity = entitySet.getEntity( candidateId );
+
+
+            //doesn't exist warn and drop
+            if ( entity == null ) {
+                logger.warn(
+                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue",
+                    candidateId, candidateVersion );
+
+
+                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+
+            }
+
+
+            final UUID entityVersion = entity.getVersion();
+            final Id entityId = entity.getId();
+
+
+
+
+
+            //entity is newer than ES version, could be an update or the entity is marked as deleted
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+                //the stale document was returned by the index under the candidate's version, so that is what we remove
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, candidateVersion } );
+                batch.deindex( searchEdge, entityId, candidateVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+
+                  //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
+
+                return;
+            }
+
+            //they're the same add it
+
+            final Entity returnEntity = entity.getEntity().get();
+
+            final Optional<EdgePath> parent = filterResult.getPath();
+
+            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
+
+            results.add( toReturn );
+        }
+    }
+}


Mime
View raw message