usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [14/50] [abbrv] incubator-usergrid git commit: [USERGRID-608] Fixed signature to make method self documenting Added fix for UniqueCleanup.java where it was comparing things to the wrong standard. Added fixes for IndexServiceImpl so that it can properly f
Date Thu, 28 May 2015 12:53:19 GMT
[USERGRID-608] Fixed signature to make method self documenting
Added fix for UniqueCleanup.java where it was comparing things to the wrong standard.
Added fixes for IndexServiceImpl so that it can properly follow the delete async process it
is called from.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/00aa293d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/00aa293d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/00aa293d

Branch: refs/heads/USERGRID-669
Commit: 00aa293d0c161a056c8f8e10d3f5880eeac2df5b
Parents: 630cb4a
Author: GERey <greyes@apigee.com>
Authored: Fri May 22 12:49:01 2015 -0700
Committer: GERey <greyes@apigee.com>
Committed: Fri May 22 12:49:01 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java |  82 +++++--------
 .../usergrid/persistence/EntityManagerIT.java   |  67 +++++------
 .../mvcc/stage/delete/UniqueCleanup.java        | 120 ++++++++-----------
 .../index/ApplicationEntityIndex.java           |   2 +-
 .../impl/EsApplicationEntityIndexImpl.java      |   2 +-
 .../persistence/index/impl/EntityIndexTest.java |   8 +-
 6 files changed, 121 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 7d7d0d9..5e2a5ea 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -33,6 +34,7 @@ import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.CandidateResult;
@@ -41,6 +43,7 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -52,9 +55,8 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.functions.Func1;
-import rx.observables.ConnectableObservable;
 
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromTarget;
 import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
@@ -188,14 +190,15 @@ public class IndexServiceImpl implements IndexService {
 
                 CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource,
targetId, 1000, 0 );
 
-                //Should loop thorugh and query for all documents and if there are no documents
then the loop should exit.
-                do{
+                //Should loop thorugh and query for all documents and if there are no documents
then the loop should
+                // exit.
+                do {
                     batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed,
batch );
-                    if(!targetEdgesToBeDeindexed.getOffset().isPresent())
-                        break;
-                    targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId,
1000, targetEdgesToBeDeindexed.getOffset().get() );
-                }while(!targetEdgesToBeDeindexed.isEmpty());
-
+                    if ( !targetEdgesToBeDeindexed.getOffset().isPresent() ) break;
+                    targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId,
1000,
+                        targetEdgesToBeDeindexed.getOffset().get() );
+                }
+                while ( !targetEdgesToBeDeindexed.isEmpty() );
 
 
                 final IndexEdge fromTarget = generateScopeFromTarget( edge );
@@ -203,12 +206,13 @@ public class IndexServiceImpl implements IndexService {
 
                 CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget,
sourceId, 1000, 0 );
 
-                do{
+                do {
                     batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed,
batch );
-                    if(!sourceEdgesToBeDeindexed.getOffset().isPresent())
-                        break;
-                    sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId,
1000, sourceEdgesToBeDeindexed.getOffset().get()  );
-                }while(!sourceEdgesToBeDeindexed.isEmpty());
+                    if ( !sourceEdgesToBeDeindexed.getOffset().isPresent() ) break;
+                    sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId,
1000,
+                        sourceEdgesToBeDeindexed.getOffset().get() );
+                }
+                while ( !sourceEdgesToBeDeindexed.isEmpty() );
 
                 return batch.execute();
             } );
@@ -216,52 +220,33 @@ public class IndexServiceImpl implements IndexService {
         return ObservableTimer.time( batches, addTimer );
     }
 
+    //This should look up the entityId and delete any documents with a timestamp that comes
before
+    //The edges that are connected will be compacted away from the graph.
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope
applicationScope,
                                                                   final Id entityId, final
UUID markedVersion ) {
 
         //bootstrap the lower modules from their caches
-        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
         final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex(
applicationScope );
 
-        //we always index in the target scope
-        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId
);
-
-        //we may have to index  we're indexing from source->target here
-        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge ->
generateScopeFromSource( edge ) );
-
-
-        //we might or might not need to index from target-> source
-        final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, entityId
);
-
-        //merge the edges together
-        final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex,
targetSizes);
-        //do our observable for batching
-        //try to send a whole batch if we can
-
+        CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion
);
 
+        //not actually sure about the timestamp but ah well. works.
+        SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
+            CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType()
) ), entityId,
+            entityId.getUuid().timestamp() ) );
 
-        //loop through candidateResults and deindex every single result that comeback.
 
-        //do our observable for batching
-        //try to send a whole batch if we can
-        final Observable<IndexOperationMessage>  batches =  observable.buffer( indexFig.getIndexBatchSize()
)
-
-            //map into batches based on our buffer size
-            .flatMap( buffer -> Observable.from( buffer )
+        final Observable<IndexOperationMessage>  batches = Observable.from( crs )
                 //collect results into a single batch
-                .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
-                    //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity
);
-                    //TODO: refactor into stages of observable, also need a loop to get entities
until we recieve nothing back.
-                    CandidateResults crs = ei.getAllEntityVersionBeforeMark( entityId, markedVersion,
1000, 0 );
-                    for(CandidateResult cr: crs){
-                        batch.deindex( indexEdge, cr);
-                    }
+                .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> {
+                    logger.debug( "Deindexing on edge {} for entity {} added to batch",searchEdge
, entityId );
+                    batch.deindex( searchEdge, candidateResult );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> batch.execute() ) );
+                .flatMap( batch -> batch.execute() );
 
-        return ObservableTimer.time( batches, indexTimer );
+        return ObservableTimer.time(batches, indexTimer);
     }
 
 
@@ -274,7 +259,7 @@ public class IndexServiceImpl implements IndexService {
      */
     private Observable<IndexEdge> getIndexEdgesAsTarget( final GraphManager graphManager,
final Id entityId ) {
 
-        final String collectionName = InflectionUtils.pluralize( entityId.getType() );
+            final String collectionName = InflectionUtils.pluralize( entityId.getType() );
 
 
         final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE,
collectionName );
@@ -311,8 +296,7 @@ public class IndexServiceImpl implements IndexService {
     public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge edge,CandidateResults
edgesToBeDeindexed, EntityIndexBatch batch){
         Iterator itr = edgesToBeDeindexed.iterator();
         while( itr.hasNext() ) {
-            CandidateResult cr = ( CandidateResult ) itr.next();
-            batch.deindex( edge, cr.getId(), cr.getVersion() );
+            batch.deindex( edge, ( CandidateResult ) itr.next());
         }
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
index faf22e5..c70a141 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
@@ -46,7 +46,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 
-
 public class EntityManagerIT extends AbstractCoreIT {
     private static final Logger LOG = LoggerFactory.getLogger( EntityManagerIT.class );
 
@@ -74,13 +73,12 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         user = em.get( user );
         assertNotNull( user );
-        assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username"));
+        assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username"
) );
         assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty(
"email" ) );
 
         app.refreshIndex();
 
-        EntityRef userRef = em.getAlias(
-            new SimpleEntityRef("application", applicationId), "users", "edanuff" );
+        EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId
), "users", "edanuff" );
 
         assertNotNull( userRef );
         assertEquals( "userRef.id not expected value", user.getUuid(), userRef.getUuid()
);
@@ -89,7 +87,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         LOG.info( "user.username: " + user.getProperty( "username" ) );
         LOG.info( "user.email: " + user.getProperty( "email" ) );
 
-        final Query query = Query.fromQL("username = 'edanuff'");
+        final Query query = Query.fromQL( "username = 'edanuff'" );
 
         Results results = em.searchCollection( em.getApplicationRef(), "users", query );
         assertNotNull( results );
@@ -109,8 +107,8 @@ public class EntityManagerIT extends AbstractCoreIT {
         assertEquals( 1, results.size() );
         user = results.getEntity();
         assertNotNull( user );
-        assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username"));
-        assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty(
"email"));
+        assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username"
) );
+        assertEquals( "user.email not expected value", "ed@anuff.com", user.getProperty(
"email" ) );
 
         LOG.info( "user.username: " + user.getProperty( "username" ) );
         LOG.info( "user.email: " + user.getProperty( "email" ) );
@@ -141,7 +139,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         i = 0;
         for ( Entity entity : things ) {
 
-            Entity thing = em.get( new SimpleEntityRef( "thing", entity.getUuid()));
+            Entity thing = em.get( new SimpleEntityRef( "thing", entity.getUuid() ) );
             assertNotNull( "thing should not be null", thing );
             assertFalse( "thing id not valid", thing.getUuid().equals( new UUID( 0, 0 ) )
);
             assertEquals( "name not expected value", "thing" + i, thing.getProperty( "name"
) );
@@ -153,7 +151,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         for ( Entity entity : things ) {
             ids.add( entity.getUuid() );
 
-            Entity en = em.get( new SimpleEntityRef( "thing", entity.getUuid()));
+            Entity en = em.get( new SimpleEntityRef( "thing", entity.getUuid() ) );
             String type = en.getType();
             assertEquals( "type not expected value", "thing", type );
 
@@ -236,7 +234,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         properties.put( "name", "testprop" );
         Entity thing = em.create( "thing", properties );
 
-        Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid()));
+        Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid() ) );
         assertNotNull( "entity should not be null", entity );
         em.setProperty( entity, "alpha", 1L );
         em.setProperty( entity, "beta", 2L );
@@ -246,9 +244,9 @@ public class EntityManagerIT extends AbstractCoreIT {
         assertNotNull( "properties should not be null", props );
         assertEquals( "wrong number of properties", 8, props.size() );
 
-        assertEquals( "wrong value for property alpha", (long) 1, props.get( "alpha" ) );
-        assertEquals( "wrong value for property beta", (long) 2, props.get( "beta" ) );
-        assertEquals( "wrong value for property gamma", (long) 3, props.get( "gamma" ) );
+        assertEquals( "wrong value for property alpha", ( long ) 1, props.get( "alpha" )
);
+        assertEquals( "wrong value for property beta", ( long ) 2, props.get( "beta" ) );
+        assertEquals( "wrong value for property gamma", ( long ) 3, props.get( "gamma" )
);
 
         for ( Entry<String, Object> entry : props.entrySet() ) {
             LOG.info( entry.getKey() + " : " + entry.getValue() );
@@ -277,6 +275,8 @@ public class EntityManagerIT extends AbstractCoreIT {
         Entity thing = em.create( "thing", properties );
         LOG.info( "Entity created" );
 
+        app.refreshIndex();
+
         LOG.info( "Starting entity delete" );
         em.delete( thing );
         LOG.info( "Entity deleted" );
@@ -286,11 +286,10 @@ public class EntityManagerIT extends AbstractCoreIT {
         // now search by username, no results should be returned
 
 
-        final Query emailQuery = Query.fromQL( "name = '" + name +"'" );
+        final Query emailQuery = Query.fromQL( "name = '" + name + "'" );
 
 
-        Results r = em.searchCollection( em.getApplicationRef(), "thing",
-               emailQuery );
+        Results r = em.searchCollection( em.getApplicationRef(), "thing", emailQuery );
 
         assertEquals( 0, r.size() );
     }
@@ -323,8 +322,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         // now search by username, no results should be returned
 
         final Query query = Query.fromQL( "username = '" + name + "'" );
-        Results r = em.searchCollection( em.getApplicationRef(), "users",
-            query );
+        Results r = em.searchCollection( em.getApplicationRef(), "users", query );
 
         assertEquals( 0, r.size() );
 
@@ -342,7 +340,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         final Query userNameQuery = Query.fromQL( "username = '" + name + "'" );
 
-        r = em.searchCollection( em.getApplicationRef(), "users", userNameQuery);
+        r = em.searchCollection( em.getApplicationRef(), "users", userNameQuery );
 
         assertEquals( 1, r.size() );
 
@@ -350,7 +348,7 @@ public class EntityManagerIT extends AbstractCoreIT {
     }
 
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings( "unchecked" )
     @Test
     public void testJson() throws Exception {
         LOG.info( "EntityDaoTest.testProperties" );
@@ -361,7 +359,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         properties.put( "name", "testprop" );
         Entity thing = em.create( "thing", properties );
 
-        Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid()));
+        Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid() ) );
         assertNotNull( "entity should not be null", entity );
 
         Map<String, Object> json = new LinkedHashMap<String, Object>();
@@ -384,7 +382,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
 
     @Test
-    @Ignore("There is a concurrency issue due to counters not being thread safe: see USERGRID-1753")
+    @Ignore( "There is a concurrency issue due to counters not being thread safe: see USERGRID-1753"
)
     public void testEntityCounters() throws Exception {
         LOG.info( "EntityManagerIT#testEntityCounters" );
         EntityManager em = app.getEntityManager();
@@ -394,17 +392,16 @@ public class EntityManagerIT extends AbstractCoreIT {
         organizationEntity.setProperty( "name", "testCounterOrg" );
         organizationEntity = em.create( organizationEntity );
 
-        Entity appInfo = setup.getEmf().createApplicationV2(
-                "testCounterOrg", "testEntityCounters" + UUIDGenerator.newTimeUUID()  );
-        UUID applicationId = UUIDUtils.tryExtractUUID(
-            appInfo.getProperty(PROPERTY_APPLICATION_ID).toString());
+        Entity appInfo =
+            setup.getEmf().createApplicationV2( "testCounterOrg", "testEntityCounters" +
UUIDGenerator.newTimeUUID() );
+        UUID applicationId = UUIDUtils.tryExtractUUID( appInfo.getProperty( PROPERTY_APPLICATION_ID
).toString() );
 
         Map<String, Object> properties = new LinkedHashMap<String, Object>();
         properties.put( "name", "testEntityCounters" );
         Entity applicationEntity = em.create( applicationId, CpNamingUtils.APPLICATION_INFO,
properties );
 
         em.createConnection( new SimpleEntityRef( "group", organizationEntity.getUuid() ),
"owns",
-                new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) );
+            new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) );
 
         em = setup.getEmf().getEntityManager( applicationId );
         properties = new LinkedHashMap<String, Object>();
@@ -457,7 +454,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         // now search by username, no results should be returned
 
-        EntityRef appRef = em.get( new SimpleEntityRef("application", app.getId() ) );
+        EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) );
 
         app.refreshIndex();
 
@@ -481,7 +478,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         properties.put( "email", "test@foo.bar" );
         Entity created = em.create( "user", properties );
 
-        Entity returned = em.get( new SimpleEntityRef( "user", created.getUuid()));
+        Entity returned = em.get( new SimpleEntityRef( "user", created.getUuid() ) );
 
         assertNotNull( created );
         assertNotNull( returned );
@@ -503,13 +500,13 @@ public class EntityManagerIT extends AbstractCoreIT {
         properties.put( "name", "one" );
         Entity saved = em.create( "thing", properties );
 
-        Entity thingOne = em.get( new SimpleEntityRef("thing", saved.getUuid()));
+        Entity thingOne = em.get( new SimpleEntityRef( "thing", saved.getUuid() ) );
         assertNotNull( "entity should not be null", thingOne );
         assertEquals( "one", thingOne.getProperty( "name" ).toString() );
 
         em.setProperty( thingOne, "name", "two", true );
 
-        Entity thingTwo = em.get( new SimpleEntityRef("thing",saved.getUuid()));
+        Entity thingTwo = em.get( new SimpleEntityRef( "thing", saved.getUuid() ) );
 
         assertEquals( "two", thingTwo.getProperty( "name" ) );
     }
@@ -527,7 +524,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         userProps.put( "email", "test@foo.bar" );
         Entity createdUser = em.create( "user", userProps );
 
-        Entity returnedUser = em.get( new SimpleEntityRef("user",createdUser.getUuid()));
+        Entity returnedUser = em.get( new SimpleEntityRef( "user", createdUser.getUuid()
) );
 
         assertNotNull( createdUser );
         assertNotNull( returnedUser );
@@ -539,7 +536,7 @@ public class EntityManagerIT extends AbstractCoreIT {
         userProps2.put( "email", "test2@foo.bar" );
         Entity createdUser2 = em.create( "user", userProps2 );
 
-        Entity returnedUser2 = em.get( new SimpleEntityRef("user",createdUser2.getUuid()));
+        Entity returnedUser2 = em.get( new SimpleEntityRef( "user", createdUser2.getUuid()
) );
 
         assertNotNull( createdUser2 );
         assertNotNull( returnedUser2 );
@@ -553,7 +550,7 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         app.refreshIndex();
 
-        Entity returnedDevice = em.get( new SimpleEntityRef("device", createdDevice.getUuid()));
+        Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid()
) );
 
         assertNotNull( createdDevice );
         assertNotNull( returnedDevice );
@@ -585,6 +582,6 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         app.refreshIndex();
 
-        assertNotNull( em.get( user.getUuid() ));
+        assertNotNull( em.get( user.getUuid() ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 3e3e531..affd82c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -20,8 +20,10 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
 
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
 import org.slf4j.Logger;
@@ -82,84 +84,62 @@ public class UniqueCleanup
         final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable
) {
 
         final Observable<CollectionIoEvent<MvccEntity>> outputObservable =
-            collectionIoEventObservable.doOnNext( mvccEntityCollectionIoEvent -> {
+            collectionIoEventObservable.flatMap( mvccEntityCollectionIoEvent -> {
 
                 final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
                 final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
                 final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
 
-                Iterator<UniqueValue> uniqueFields = uniqueValueSerializationStrategy.getAllUniqueFields(
-                    applicationScope, entityId );
-
-                final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                while(uniqueFields.hasNext()){
-                    UniqueValue uniqueValue = uniqueFields.next();
-                    final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
-                    //TODO: should this be equals? That way we clean up the one marked as
well
-                    if(UUIDComparator.staticCompare( entityVersion, uniqueValueVersion )
>= 0){
-                        logger
-                            .debug( "Deleting value:{} from application scope: {} ", uniqueValue,
applicationScope );
-                        uniqueCleanupBatch.mergeShallow(
-                            uniqueValueSerializationStrategy.delete( applicationScope,uniqueValue
));
-
-                    }
-                }
-
-                try {
-                    uniqueCleanupBatch.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to execute batch mutation", e );
-                }
-
-
 
                 //TODO Refactor this logic into a a class that can be invoked from anywhere
-//                //iterate all unique values
-//                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup
=
-//                    Observable.create( new ObservableIterator<UniqueValue>( "Unique
value load" ) {
-//                        @Override
-//                        protected Iterator<UniqueValue> getIterator() {
-//                            return uniqueValueSerializationStrategy.getAllUniqueFields(
applicationScope, entityId );
-//                        }
-//                    } )
-//
-//                        //skip  versions > the specified version
-//                        //TODO: does this emit for every version before the staticComparator?
-//                        .skipWhile( uniqueValue -> {
-//
-//                            logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion
);
-//                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
-//                            //TODO: should this be equals? That way we clean up the one
marked as well
-//                            return UUIDComparator.staticCompare( entityVersion,uniqueValueVersion
) > 0;
-//                        } )
-//
-//                            //buffer our buffer size, then roll them all up in a single
batch mutation
-//                        .buffer( serializationFig.getBufferSize() )
-//
-//                            //roll them up
-//                        .doOnNext( uniqueValues -> {
-//                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-//
-//
-//                            for ( UniqueValue value : uniqueValues ) {
-//                                logger
-//                                    .debug( "Deleting value:{} from application scope:
{} ", value, applicationScope );
-//                                uniqueCleanupBatch
-//                                    .mergeShallow( uniqueValueSerializationStrategy.delete(
applicationScope, value ) );
-//                            }
-//
-//                            try {
-//                                uniqueCleanupBatch.execute();
-//                            }
-//                            catch ( ConnectionException e ) {
-//                                throw new RuntimeException( "Unable to execute batch mutation",
e );
-//                            }
-//                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent
);
+                //iterate all unique values
+                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup
=
+                    Observable.create( new ObservableIterator<UniqueValue>( "Unique
value load" ) {
+                        @Override
+                        protected Iterator<UniqueValue> getIterator() {
+                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope,
entityId );
+                        }
+                    } )
+
+                        //skip  versions > the specified version
+                        //TODO: does this emit for every version before the staticComparator?
+                        .skipWhile( uniqueValue -> {
+
+                            logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion
);
+                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+                            //TODO: should this be equals? That way we clean up the one marked
as well
+                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion
) > 0;
+                        } )
+
+                            //buffer our buffer size, then roll them all up in a single batch
mutation
+                        .buffer( serializationFig.getBufferSize() )
+
+                            //roll them up
+
+                        .doOnNext( uniqueValues -> {
+                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+                            for ( UniqueValue value : uniqueValues ) {
+                                logger
+                                    .debug( "Deleting value:{} from application scope: {}
", value, applicationScope );
+                                uniqueCleanupBatch
+                                    .mergeShallow( uniqueValueSerializationStrategy.delete(
applicationScope, value ) );
+                            }
+
+                            try {
+                                uniqueCleanupBatch.execute();
+                            }
+                            catch ( ConnectionException e ) {
+                                throw new RuntimeException( "Unable to execute batch mutation",
e );
+                            }
+                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent
);
+
+                return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer );
             } );
 
-        return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
+
+
+        return outputObservable;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 0e0e033..9ce65e9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -68,7 +68,7 @@ public interface ApplicationEntityIndex {
      * @param offset The offset to page the query on.
      * @return
      */
-    CandidateResults getAllEntityVersionBeforeMark(final Id entityId, final UUID markedVersion
,final int limit, final int offset);
+    CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID
markedVersion );
 
     /**
      * delete all application records

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index ad47348..99e5525 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -228,7 +228,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex
{
 
 
     @Override
-    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID
markedVersion) {
+    public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final
UUID markedVersion ) {
 
         Preconditions.checkNotNull( entityId, "entityId cannot be null" );
         //TODO: check to see if there is some version verifcation. I know there is but i
forget where.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 79fc14a..00ddc17 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.time.StopWatch;
 
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
@@ -385,13 +384,13 @@ public class EntityIndexTest extends BaseIT {
         Entity entity = EntityIndexMapUtils.fromMap( entityMap );
         EntityUtils.setId( entity, new SimpleId( "fastcar" ) );
         EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
-        entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID()));
+        entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID()
) );
 
         entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last();
         ei.refreshAsync().toBlocking().first();
 
         CandidateResults candidateResults = entityIndex
-            .search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name
contains 'Ferrari*'", 10, 0 );
+            .search( searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name
contains 'Ferrari*'", 10, 0 );
         assertEquals( 1, candidateResults.size() );
 
         EntityIndexBatch batch = entityIndex.createBatch();
@@ -444,7 +443,8 @@ public class EntityIndexTest extends BaseIT {
         ei.refreshAsync().toBlocking().first();
 
         CandidateResults candidateResults = entityIndex
-            .getAllEntityVersionBeforeMark( entity[versionToSearchFor].getId(), entity[versionToSearchFor].getVersion());
+            .getAllEntityVersionsBeforeMarkedVersion( entity[versionToSearchFor].getId(),
+                entity[versionToSearchFor].getVersion() );
         assertEquals( 501, candidateResults.size() );
     }
 


Mime
View raw message