usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [37/44] usergrid git commit: Changes implementation to update a connection to have the timestamp it was last POSTed.
Date Sat, 26 Sep 2015 01:42:42 GMT
Changes implementation to update a connection to have the timestamp it was last POSTed.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/079ba976
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/079ba976
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/079ba976

Branch: refs/heads/USERGRID-933
Commit: 079ba9760f3e8d72dbf08610360d089b9ff62d47
Parents: 0c12abb
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Sep 22 16:59:25 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Sep 22 16:59:25 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 34 ++++++++++++--------
 .../service/ConnectionServiceImpl.java          |  5 +--
 .../service/ConnectionServiceImplTest.java      |  4 +--
 3 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/079ba976/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 1da6f75..21a2ee7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
 import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
@@ -62,7 +61,6 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
@@ -697,25 +695,35 @@ public class CpRelationManager implements RelationManager {
         final GraphManager gm = managerCache.getGraphManager( applicationScope );
 
 
-        //check if the edge exists
+        //write new edge
 
+        gm.writeEdge( edge ).subscribe();
 
-        final SearchByEdge searchByEdge = new SimpleSearchByEdge(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() );
+        indexService.queueNewEdge( applicationScope, targetEntity, edge );
 
 
-        //only take 1 and count it.  If we don't have anything, create the edge
-        final int count = gm.loadEdgeVersions( searchByEdge ).take( 1 ).count().toBlocking().last();
+        //now read all older versions of an edge, and remove them.  Finally calling delete
+        final SearchByEdge searchByEdge =
+            new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
+                SearchByEdgeType.Order.DESCENDING, Optional.absent() );
 
-        if(count == 0) {
-            if(logger.isDebugEnabled()) {
-                logger.debug( "No edge exists between {} and {} of type {}.  Creating",
-                    new Object[] { edge.getSourceNode(), edge.getTargetNode(), edge.getType() } );
+
+        //load our versions, only retain the most recent one
+        gm.loadEdgeVersions( searchByEdge ).skip( 1 ).flatMap( edgeToDelete -> {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Marking edge {} for deletion", edgeToDelete );
+            }
+            return gm.markEdge( edge );
+        } ).lastOrDefault( null ).doOnNext( lastEdge -> {
+            //no op if we hit our default
+            if(lastEdge == null){
+                return;
             }
 
-            gm.writeEdge( edge ).toBlocking().last();
+            //queue up async processing
+            indexService.queueDeleteEdge( applicationScope, lastEdge );
+        }).subscribe();
 
-            indexService.queueNewEdge( applicationScope, targetEntity, edge );
-        }
 
         return connection;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/079ba976/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 1b36321..927d292 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -181,9 +181,10 @@ public class ConnectionServiceImpl implements ConnectionService {
 
                     logger.debug( "Found edge {}, searching for multiple versions of edge", edge );
 
+                    //keep only the most recent
                     final SearchByEdge searchByEdge =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), 0,
-                            SearchByEdgeType.Order.ASCENDING, Optional.absent() );
+                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
+                            SearchByEdgeType.Order.DESCENDING, Optional.absent() );
                     return gm.loadEdgeVersions( searchByEdge )
                         //skip the first version since it's the one we want to retain
                         .skip( 1 )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/079ba976/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index a2c18b1..326e128 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -196,7 +196,7 @@ public class ConnectionServiceImplTest {
         //check our oldest was deleted first
         assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() );
 
-        assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() );
+        assertEdgeData( written1, deletedConnections.get( 1 ).getEdge() );
 
         assertEquals( "2 edges deleted", 2, deletedConnections.size() );
 
@@ -213,7 +213,7 @@ public class ConnectionServiceImplTest {
 
         assertEquals( 1, edges.size() );
 
-        assertEquals( written1, edges.get( 0 ) );
+        assertEquals( written3, edges.get( 0 ) );
     }
 
 


Mime
View raw message