usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [07/12] incubator-usergrid git commit: Fixes graph cursor resume state.
Date Mon, 04 May 2015 17:32:57 GMT
Fixes graph cursor resume state.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/118711ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/118711ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/118711ae

Branch: refs/heads/USERGRID-609
Commit: 118711aee83875f72a0b058b784218b211748d4e
Parents: 413f023
Author: Todd Nine <tnine@apigee.com>
Authored: Mon May 4 09:59:00 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon May 4 09:59:00 2015 -0600

----------------------------------------------------------------------
 .../read/graph/AbstractReadGraphFilter.java     | 52 ++++++++++++++++++--
 1 file changed, 49 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/118711ae/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 503fcf9..303bc5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -62,6 +63,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
 
 
         final String edgeName = getEdgeTypeName();
+        final EdgeState edgeCursorState = new EdgeState();
 
 
         //return all ids that are emitted from this edge
@@ -80,15 +82,30 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
             return graphManager.loadEdgesFromSource( search )
-                //set our cursor every edge we traverse
+                //set the edge state for cursors
+                .doOnNext( edge -> edgeCursorState.update( edge ) )
 
-                    //map our id from the target edge
-                .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath()
) );
+                    //map our id from the target edge  and set our cursor every edge we traverse
+                .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
+                    previousFilterValue.getPath() ) );
         } );
     }
 
 
     @Override
+    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
+                                                   final Optional<EdgePath> parent
) {
+
+        //if it's our first pass, there's no cursor to generate
+        if(cursorValue == null){
+            return new FilterResult<>( emit, parent );
+        }
+
+        return super.createFilterResult( emit, cursorValue, parent );
+    }
+
+
+    @Override
     protected CursorSerializer<Edge> getCursorSerializer() {
         return EdgeCursorSerializer.INSTANCE;
     }
@@ -98,4 +115,33 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id,
Id,
      * Get the edge type name we should use when traversing
      */
     protected abstract String getEdgeTypeName();
+
+
+    /**
+     * Wrapper class. Because edges seek > the last returned, we need to keep our n-1
value. This will be our cursor We
+     * always try to seek to the same position as we ended.  Since we don't deal with a persistent
read result, if we
+     * seek to a value = to our last, we may skip data.
+     */
+    private final class EdgeState {
+
+        private Edge cursorEdge = null;
+        private Edge currentEdge = null;
+
+
+        /**
+         * Update the pointers
+         */
+        private void update( final Edge newEdge ) {
+            cursorEdge = currentEdge;
+            currentEdge = newEdge;
+        }
+
+
+        /**
+         * Get the edge to use in cursors for resume
+         */
+        private Edge getCursorEdge() {
+            return cursorEdge;
+        }
+    }
 }


Mime
View raw message