Fixed tests. Now need to implement delete and test with concurrent writes/removes Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e578bb63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e578bb63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e578bb63 Branch: refs/heads/optimistic-tx-semantics Commit: e578bb639f71dff71b87e56fb082c8f85972967a Parents: feea5ba Author: Todd Nine Authored: Tue Feb 11 12:13:02 2014 -0700 Committer: Todd Nine Committed: Tue Feb 11 12:13:02 2014 -0700 ---------------------------------------------------------------------- .../persistence/graph/impl/EdgeManagerImpl.java | 52 ++++-- .../persistence/graph/EdgeManagerIT.java | 164 +++++++++++-------- 2 files changed, 133 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e578bb63/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java index 67cd8e8..e3afd2a 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java @@ -67,8 +67,7 @@ public class EdgeManagerImpl implements EdgeManager { @Inject public EdgeManagerImpl( final EdgeWriteStage edgeWriteStage, final Scheduler scheduler, final EdgeMetadataSerialization edgeMetadataSerialization, - final EdgeSerialization edgeSerialization, - @Assisted final OrganizationScope scope ) { + final 
EdgeSerialization edgeSerialization, @Assisted final OrganizationScope scope ) { this.edgeWriteStage = edgeWriteStage; this.scheduler = scheduler; this.edgeMetadataSerialization = edgeMetadataSerialization; @@ -90,53 +89,71 @@ public class EdgeManagerImpl implements EdgeManager { @Override public void deleteEdge( final Edge edge ) { - throw new UnsupportedOperationException("Not yet implemented"); + throw new UnsupportedOperationException( "Not yet implemented" ); } @Override public Observable loadEdgesFromSource( final SearchByEdgeType search ) { - Observable iterator = Observable.create( new ObservableIterator() { + Observable iterator = Observable.create( new ObservableIterator() { @Override protected Iterator getIterator() { return edgeSerialization.getEdgesFromSource( scope, search ); } } ); - return filter(iterator, search.getMaxVersion()); + return filter( iterator, search.getMaxVersion() ); } @Override public Observable loadEdgesToTarget( final SearchByEdgeType search ) { + Observable iterator = Observable.create( new ObservableIterator() { + @Override + protected Iterator getIterator() { + return edgeSerialization.getEdgesToTarget( scope, search ); + } + } ); - - return null; //To change body of implemented methods use File | Settings | File Templates. + return filter( iterator, search.getMaxVersion() ); } @Override public Observable loadEdgesFromSourceByType( final SearchByIdType search ) { - return null; //To change body of implemented methods use File | Settings | File Templates. + Observable iterator = Observable.create( new ObservableIterator() { + @Override + protected Iterator getIterator() { + return edgeSerialization.getEdgesFromSourceByTargetType( scope, search ); + } + } ); + + return filter( iterator, search.getMaxVersion() ); } @Override public Observable loadEdgesToTargetByType( final SearchByIdType search ) { - return null; //To change body of implemented methods use File | Settings | File Templates. 
+ Observable iterator = Observable.create( new ObservableIterator() { + @Override + protected Iterator getIterator() { + return edgeSerialization.getEdgesToTargetBySourceType( scope, search ); + } + } ); + + return filter( iterator, search.getMaxVersion() ); } @Override public Observable getEdgeTypesFromSource( final SearchEdgeType search ) { - return Observable.create( new ObservableIterator() { + return Observable.create( new ObservableIterator() { @Override protected Iterator getIterator() { return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search ); } } ); - } @@ -173,12 +190,15 @@ public class EdgeManagerImpl implements EdgeManager { } ); } - private Observable filter(final Observable observable, final UUID maxVersion){ - if(maxVersion == null){ - return observable; - } - return observable.filter( new Func1() { + /** + * If no max version is specified, just return the original observable. If one is specified, only edges whose version is <= the max version are returned. + * @param observable + * @param maxVersion + * @return + */ + private Observable filter( final Observable observable, final UUID maxVersion ) { + return observable.filter( new Func1() { @Override public Boolean call( final Edge edge ) { //our edge version needs to be <= max Version http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e578bb63/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java index 2308019..0bb38d2 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerIT.java @@ -47,6 +47,7 @@ import com.google.inject.Inject; import rx.Observable; import static 
org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge; +import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId; import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdge; import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createSearchByEdgeAndId; import static org.junit.Assert.assertEquals; @@ -234,18 +235,21 @@ public class EdgeManagerIT { EdgeManager em = emf.createEdgeManager( scope ); + final Id sourceId = createId( "source" ); - Edge edge1 = createEdge( "source", "test", "target" ); - em.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null );; - Edge edge2 = createEdge( "source", "test", "target" ); + Edge edge1 = createEdge( sourceId, "test", createId("target") ); - em.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null );; + em.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null ); - Edge edge3 = createEdge( "source", "test", "target" ); + Edge edge2 = createEdge( sourceId, "test", createId("target") ); - em.writeEdge( edge3 ).toBlockingObservable().singleOrDefault( null );; + em.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null ); + + Edge edge3 = createEdge( sourceId, "test", createId("target") ); + + em.writeEdge( edge3 ).toBlockingObservable().singleOrDefault( null ); //now test retrieving it @@ -256,20 +260,20 @@ public class EdgeManagerIT { Observable edges = em.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator returned = edges.toBlockingObservable().next().iterator(); + Iterator returned = edges.toBlockingObservable().getIterator(); - //now start from the 2nd edge + //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first assertEquals( "Correct edge returned", edge1, returned.next() ); - assertEquals( "Correct edge returned", edge2, returned.next() ); + assertFalse("No more edges", 
returned.hasNext()); - search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getVersion(), edge2 ); + search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getVersion(), edge2 ); edges = em.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" - returned = edges.toBlockingObservable().next().iterator(); + returned = edges.toBlockingObservable().getIterator(); assertEquals( "Paged correctly", edge3, returned.next() ); @@ -284,17 +288,19 @@ public class EdgeManagerIT { EdgeManager em = emf.createEdgeManager( scope ); - Edge edge1 = createEdge( "source", "test", "target" ); + final Id targetId = createId( "target" ); - em.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null );; + Edge edge1 = createEdge( createId("source"), "test", targetId ); - Edge edge2 = createEdge( "source", "test", "target" ); + em.writeEdge( edge1 ).toBlockingObservable().singleOrDefault( null ); - em.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null );; + Edge edge2 = createEdge( createId("source"), "test", targetId ); - Edge edge3 = createEdge( "source", "test", "target" ); + em.writeEdge( edge2 ).toBlockingObservable().singleOrDefault( null ); - em.writeEdge( edge3 ).toBlockingObservable().singleOrDefault( null );; + Edge edge3 = createEdge( createId("source"), "test", targetId ); + + em.writeEdge( edge3 ).toBlockingObservable().singleOrDefault( null ); //now test retrieving it @@ -305,20 +311,21 @@ public class EdgeManagerIT { Observable edges = em.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator returned = edges.toBlockingObservable().next().iterator(); + Iterator returned = edges.toBlockingObservable().getIterator(); - //now start from the 2nd edge + //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first assertEquals( "Correct edge returned", edge1, returned.next() ); - 
assertEquals( "Correct edge returned", edge2, returned.next() ); - search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge1.getVersion(), edge2 ); + assertFalse("No more edges", returned.hasNext()); + + search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getVersion(), edge2 ); edges = em.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" - returned = edges.toBlockingObservable().next().iterator(); + returned = edges.toBlockingObservable().getIterator(); assertEquals( "Paged correctly", edge3, returned.next() ); @@ -469,7 +476,7 @@ public class EdgeManagerIT { SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getVersion(), null ); - Observable edges = em.loadEdgesFromSource( search ); + Observable edges = em.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlockingObservable().single(); @@ -479,7 +486,7 @@ public class EdgeManagerIT { SearchByIdType searchById = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getVersion(), edge.getSourceNode().getType(), null ); - edges = em.loadEdgesFromSourceByType( searchById ); + edges = em.loadEdgesToTargetByType( searchById ); //implicitly blows up if more than 1 is returned from "single" returned = edges.toBlockingObservable().single(); @@ -511,7 +518,7 @@ public class EdgeManagerIT { @Test - public void testWriteReadEdgeTypesTargetTypes() { + public void testWriteReadEdgeTypesSourceTypes() { final EdgeManager em = emf.createEdgeManager( scope ); @@ -525,7 +532,7 @@ public class EdgeManagerIT { Edge testTarget2Edge = new SimpleEdge( sourceId, "test", targetId2, UUIDGenerator.newTimeUUID() ); - em.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );; + em.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null ); Edge test2TargetEdge = new SimpleEdge( sourceId, "test2", targetId1, 
UUIDGenerator.newTimeUUID() ); @@ -538,34 +545,37 @@ public class EdgeManagerIT { em.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null ) ); - List results = edges.toList().toBlockingObservable().single(); + Iterator results = edges.toBlockingObservable().getIterator(); + + + assertEquals( "Edges correct", "test", results.next() ); - assertEquals( "Size correct", 2, results.size() ); + assertEquals( "Edges correct", "test2", results.next() ); - assertTrue( "Edges correct", results.contains( "test" ) ); + assertFalse("No more edges", results.hasNext()); - assertTrue( "Edges correct", results.contains( "test2" ) ); //now test sub edges edges = em.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test", null ) ); - results = edges.toList().toBlockingObservable().single(); + results = edges.toBlockingObservable().getIterator(); + - assertEquals( "Size correct", 2, results.size() ); + assertEquals( "Types correct", targetId1.getType(), results.next()); - assertTrue( "Types correct", results.contains( targetId1.getType() ) ); + assertEquals( "Types correct", targetId2.getType() , results.next() ); - assertTrue( "Types correct", results.contains( targetId2.getType() ) ); + assertFalse( "No results", results.hasNext()); //now get types for test2 edges = em.getIdTypesFromSource( new SimpleSearchIdType( testTargetEdge.getSourceNode(), "test2", null ) ); - results = edges.toList().toBlockingObservable().single(); + results = edges.toBlockingObservable().getIterator(); - assertEquals( "Size correct", 1, results.size() ); + assertEquals( "Types correct", targetId1.getType(), results.next( ) ); - assertTrue( "Types correct", results.contains( targetId1.getType() ) ); + assertFalse( "No results", results.hasNext()); //now delete our edges, we shouldn't get anything back em.deleteEdge( testTargetEdge ); @@ -575,14 +585,14 @@ public class EdgeManagerIT { edges = em.getEdgeTypesToTarget( new SimpleSearchEdgeType( 
testTargetEdge.getSourceNode(), null ) ); - results = edges.toList().toBlockingObservable().single(); + results = edges.toBlockingObservable().getIterator(); - assertEquals( "No results", 0, results.size() ); + assertFalse( "No results", results.hasNext()); } @Test - public void testWriteReadEdgeTypesSourceTypes() { + public void testWriteReadEdgeTypesTargetTypes() { final EdgeManager em = emf.createEdgeManager( scope ); @@ -593,16 +603,16 @@ public class EdgeManagerIT { Edge testTargetEdge = new SimpleEdge( sourceId1, "test", targetId1, UUIDGenerator.newTimeUUID() ); - em.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );; + em.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null ); Edge testTarget2Edge = new SimpleEdge( sourceId2, "test", targetId1, UUIDGenerator.newTimeUUID() ); - em.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );; + em.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null ); Edge test2TargetEdge = new SimpleEdge( sourceId1, "test2", targetId1, UUIDGenerator.newTimeUUID() ); - em.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );; + em.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null ); //get our 2 edge types @@ -611,36 +621,40 @@ public class EdgeManagerIT { Observable edges = em.getEdgeTypesToTarget( edgeTypes ); - List results = edges.toList().toBlockingObservable().single(); + Iterator results = edges.toBlockingObservable().getIterator(); - assertEquals( "Size correct", 2, results.size() ); - assertTrue( "Edges correct", results.contains( "test" ) ); + assertEquals( "Edges correct", "test", results.next() ); - assertTrue( "Edges correct", results.contains( "test2" ) ); + assertEquals( "Edges correct","test2", results.next()); + + assertFalse("No more edges", results.hasNext()); //now test sub edges - edges = em.getEdgeTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", 
null ) ); + edges = em.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null ) ); + + results = edges.toBlockingObservable().getIterator(); - results = edges.toList().toBlockingObservable().single(); + assertEquals( "Types correct", sourceId1.getType(), results.next()); - assertEquals( "Size correct", 2, results.size() ); + assertEquals( "Types correct", sourceId2.getType(), results.next()); - assertTrue( "Types correct", results.contains( sourceId1.getType() ) ); + assertFalse("No more edges", results.hasNext()); - assertTrue( "Types correct", results.contains( sourceId2.getType() ) ); //now get types for test2 edges = em.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test2", null ) ); - results = edges.toList().toBlockingObservable().single(); + results = edges.toBlockingObservable().getIterator(); + + + assertEquals( "Types correct", sourceId1.getType(), results.next() ); - assertEquals( "Size correct", 1, results.size() ); + assertFalse("No more edges", results.hasNext()); - assertTrue( "Types correct", results.contains( sourceId1.getType() ) ); em.deleteEdge( testTargetEdge ); @@ -650,14 +664,14 @@ public class EdgeManagerIT { edges = em.getEdgeTypesFromSource( new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null ) ); - results = edges.toList().toBlockingObservable().single(); + results = edges.toBlockingObservable().getIterator(); - assertEquals( "No results", 0, results.size() ); + assertEquals( "No results",results.hasNext() ); } @Test - public void testWriteReadEdgeTypesTargetTypesPaging() { + public void testWriteReadEdgeTypesSourceTypesPaging() { final EdgeManager em = emf.createEdgeManager( scope ); @@ -684,25 +698,28 @@ public class EdgeManagerIT { //get our 2 edge types SearchEdgeType edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null ); - Observable edges = em.getEdgeTypesToTarget( edgeTypes ); + Observable edges = em.getEdgeTypesFromSource( edgeTypes 
); Iterator results = edges.toBlockingObservable().getIterator(); assertEquals( "Edges correct", "test", results.next() ); + assertEquals( "Edges correct", "test2", results.next() ); + assertFalse("No more edges", results.hasNext()); //now load the next page - edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test" ); + edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), "test" ); - edges = em.getEdgeTypesToTarget( edgeTypes ); + edges = em.getEdgeTypesFromSource( edgeTypes ); results = edges.toBlockingObservable().getIterator(); assertEquals( "Edges correct", "test2", results.next() ); + assertFalse("No more edges", results.hasNext()); //now test sub edges @@ -713,6 +730,9 @@ public class EdgeManagerIT { assertEquals( "Types correct", targetId1.getType(), results.next() ); + assertEquals( "Types correct", targetId2.getType(), results.next() ); + assertFalse("No more edges", results.hasNext()); + //now get the next page @@ -729,7 +749,7 @@ public class EdgeManagerIT { @Test - public void testWriteReadEdgeTypesSourceTypesPaging() { + public void testWriteReadEdgeTypesTargetTypesPaging() { final EdgeManager em = emf.createEdgeManager( scope ); @@ -755,19 +775,23 @@ public class EdgeManagerIT { //get our 2 edge types SearchEdgeType edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), null ); - Observable edges = em.getEdgeTypesFromSource( edgeTypes ); + Observable edges = em.getEdgeTypesToTarget( edgeTypes ); Iterator results = edges.toBlockingObservable().getIterator(); assertEquals( "Edges correct", "test", results.next() ); + assertEquals( "Edges correct", "test2", results.next() ); + + assertFalse("No more edges", results.hasNext()); + //now load the next page - edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getTargetNode(), "test" ); + edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), "test" ); - edges = em.getEdgeTypesFromSource( edgeTypes ); + edges = em.getEdgeTypesToTarget( 
edgeTypes ); results = edges.toBlockingObservable().getIterator(); @@ -776,18 +800,24 @@ public class EdgeManagerIT { assertEquals( "Edges correct", "test2", results.next() ); + assertFalse("No more edges", results.hasNext()); + //now test sub edges - edges = em.getEdgeTypesFromSource( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null ) ); + edges = em.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", null ) ); results = edges.toBlockingObservable().getIterator(); assertEquals( "Types correct", sourceId1.getType(), results.next() ); + assertEquals( "Types correct", sourceId2.getType(), results.next() ); + + assertFalse("No more edges", results.hasNext()); + //now get the next page - edges = em.getEdgeTypesFromSource( + edges = em.getIdTypesToTarget( new SimpleSearchIdType( testTargetEdge.getTargetNode(), "test", sourceId1.getType() ) ); results = edges.toBlockingObservable().getIterator();