usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [2/2] git commit: Change query result building logic to discard stale CandidateResults in all cases, and to do repair by reindexing each stale candidate found.
Date Thu, 09 Oct 2014 17:04:39 GMT
Change query result building logic to discard stale CandidateResults in all cases, and to
repair the index by de-indexing each stale candidate found.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/81d4e0ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/81d4e0ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/81d4e0ea

Branch: refs/heads/two-dot-o
Commit: 81d4e0ea24d7f18e60718fefc33086ced5f5900c
Parents: 159e5fd
Author: Dave Johnson <dmjohnson@apigee.com>
Authored: Thu Oct 9 11:33:53 2014 -0400
Committer: Dave Johnson <dmjohnson@apigee.com>
Committed: Thu Oct 9 11:33:53 2014 -0400

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 179 +++++++++++--------
 .../corepersistence/StaleIndexCleanupTest.java  |  43 ++++-
 2 files changed, 136 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 5f595f4..bcfe215 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -1542,118 +1542,143 @@ public class CpRelationManager implements RelationManager {
     }
     
 
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     * 
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     * @param collName Name of collection or null if querying all types
+     */
     private Results buildResults(Query query, CandidateResults crs, String collName ) {
 
         logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
 
         Results results = null;
 
-        if ( query.getLevel().equals( Level.IDS )) {
+        EntityIndex index = managerCache.getEntityIndex(applicationScope);
+        EntityIndexBatch indexBatch = index.createBatch();
 
-            // TODO: add stale entity logic here
-            
-            // TODO: replace this with List<Id> someday
-            List<UUID> ids = new ArrayList<UUID>();
-            Iterator<CandidateResult> iter = crs.iterator();
-            while ( iter.hasNext() ) {
-                ids.add( iter.next().getId().getUuid() );
+
+        // map of the latest versions, we will discard stale indexes
+        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
+
+        Iterator<CandidateResult> iter = crs.iterator();
+        while ( iter.hasNext() ) {
+
+            CandidateResult cr = iter.next();
+
+            CollectionScope collScope = new CollectionScopeImpl( 
+                applicationScope.getApplication(), 
+                applicationScope.getApplication(), 
+                CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType()
));
+
+            EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
+
+            UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Getting version for entity {} from scope\n   app {}\n   owner
{}\n   name {}", 
+                new Object[] { 
+                    cr.getId(),
+                    collScope.getApplication(), 
+                    collScope.getOwner(), 
+                    collScope.getName() 
+                });
+            }
+
+            if ( latestVersion == null ) {
+                logger.error("Version for Entity {}:{} not found", 
+                        cr.getId().getType(), cr.getId().getUuid());
+                continue;
             }
-            results = Results.fromIdList( ids );
 
-        } else if ( query.getLevel().equals( Level.REFS )) {
+            if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
+                logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest
v:{}", 
+                    new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
+                        cr.getVersion(), latestVersion});
 
-            // TODO: add stale entity logic here
-            
-            if ( crs.size() == 1 ) {
-                CandidateResult cr = crs.iterator().next();
-                results = Results.fromRef( 
-                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+                IndexScope indexScope = new IndexScopeImpl(
+                    cpHeadEntity.getId(),
+                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType()
));
+                indexBatch.deindex( indexScope, cr);
+
+                continue;
+            }
+
+            CandidateResult alreadySeen = latestVersions.get( cr.getId() ); 
+
+            if ( alreadySeen == null ) { // never seen it, so add to map
+                latestVersions.put( cr.getId(), cr );
 
             } else {
+                // we seen this id before, only add entity if we now have newer version
+                if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
 
-                List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-                Iterator<CandidateResult> iter = crs.iterator();
-                while ( iter.hasNext() ) {
-                    Id id = iter.next().getId();
-                    entityRefs.add( new SimpleEntityRef( id.getType(), id.getUuid() ));
-                } 
-                results = Results.fromRefList(entityRefs);
+                    latestVersions.put( cr.getId(), cr);
+
+                    IndexScope indexScope = new IndexScopeImpl(
+                        cpHeadEntity.getId(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType()
));
+                    indexBatch.deindex( indexScope, alreadySeen);
+                }
             }
+        }
 
-        } else {
+        indexBatch.execute();
 
-            // first, build map of latest versions of entities
-            Map<Id, org.apache.usergrid.persistence.model.entity.Entity> latestVersions
= 
-                new LinkedHashMap<Id, org.apache.usergrid.persistence.model.entity.Entity>();
+        if (query.getLevel().equals(Level.IDS)) {
 
-            Iterator<CandidateResult> iter = crs.iterator();
-            while ( iter.hasNext() ) {
+            List<UUID> ids = new ArrayList<UUID>();
+            for ( Id id : latestVersions.keySet() ) {
+                CandidateResult cr = latestVersions.get(id);
+                ids.add( cr.getId().getUuid() );
+            }
+            results = Results.fromIdList(ids);
+
+        } else if (query.getLevel().equals(Level.REFS)) {
+
+            List<EntityRef> refs = new ArrayList<EntityRef>();
+            for ( Id id : latestVersions.keySet() ) {
+                CandidateResult cr = latestVersions.get(id);
+                refs.add( new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+            }
+            results = Results.fromRefList( refs );
 
-                CandidateResult cr = iter.next();
+        } else {
+
+            List<Entity> entities = new ArrayList<Entity>();
+            for (Id id : latestVersions.keySet()) {
+
+                CandidateResult cr = latestVersions.get(id);
 
                 CollectionScope collScope = new CollectionScopeImpl( 
                     applicationScope.getApplication(), 
                     applicationScope.getApplication(), 
                     CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType()
));
-                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Loading entity {} from scope\n   app {}\n   owner {}\n
  name {}", 
-                    new Object[] { 
-                        cr.getId(),
-                        collScope.getApplication(), 
-                        collScope.getOwner(), 
-                        collScope.getName() 
-                    });
-                }
+                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-                org.apache.usergrid.persistence.model.entity.Entity e =
-                    ecm.load( cr.getId() ).toBlockingObservable().last();
+                org.apache.usergrid.persistence.model.entity.Entity e = 
+                        ecm.load( cr.getId() ).toBlocking().lastOrDefault(null);
 
                 if ( e == null ) {
-                    logger.error("Entity {}:{} not found", cr.getId().getType(), cr.getId().getUuid());
+                    logger.error("Entity {}:{} not found", 
+                            cr.getId().getType(), cr.getId().getUuid());
                     continue;
                 }
 
-                if ( cr.getVersion().compareTo( e.getVersion()) < 0 )  {
-                    logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest
v:{}", 
-                        new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
-                            cr.getVersion(), e.getVersion()});
-                    continue;
-                }
-
-                org.apache.usergrid.persistence.model.entity.Entity alreadySeen = 
-                    latestVersions.get( e.getId() ); 
-                if ( alreadySeen == null ) { // never seen it, so add to map
-                    latestVersions.put( e.getId(), e);
-
-                } else {
-                    // we seen this id before, only add entity if newer version
-                    if ( e.getVersion().compareTo( alreadySeen.getVersion() ) > 0 ) {
-                        latestVersions.put( e.getId(), e);
-                    }
-                }
-            }
-
-            // now build collection of old-school entities
-            List<Entity> entities = new ArrayList<Entity>();
-            for ( Id id : latestVersions.keySet() ) {
-
-                org.apache.usergrid.persistence.model.entity.Entity e =
-                    latestVersions.get( id );
-
                 Entity entity = EntityFactory.newEntity(
-                    e.getId().getUuid(), e.getField("type").getValue().toString() );
+                        e.getId().getUuid(), e.getField("type").getValue().toString());
 
-                Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
-                entity.addProperties( entityMap ); 
-                entities.add( entity );
+                Map<String, Object> entityMap = CpEntityMapUtils.toMap(e);
+                entity.addProperties(entityMap);
+                entities.add(entity);
             }
 
-            if ( entities.size() == 1 ) {
-                results = Results.fromEntity( entities.get(0));
+            if (entities.size() == 1) {
+                results = Results.fromEntity(entities.get(0));
             } else {
-                results = Results.fromEntities( entities );
+                results = Results.fromEntities(entities);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5fc9af3..c5d5782 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -53,6 +54,9 @@ import org.slf4j.LoggerFactory;
 public class StaleIndexCleanupTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class
);
 
+    private static final long writeDelayMs = 80;
+    //private static final long readDelayMs = 7;
+
 
     @Test
     public void testUpdateVersioning() throws Exception {
@@ -92,45 +96,66 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         logger.info("Started testStaleIndexCleanup()");
 
-        final EntityManager em = app.getEntityManager();
+        // TODO: turn off post processing stuff that cleans up stale entities 
 
-        final List<Entity> things = new ArrayList<Entity>();
+        final EntityManager em = app.getEntityManager();
 
-        int numEntities = 1;
-        int numUpdates = 3;
+        int numEntities = 100;
+        int numUpdates = 10;
 
-        // create 100 entities
+        // create lots of entities
+        final List<Entity> things = new ArrayList<Entity>();
         for ( int i=0; i<numEntities; i++) {
             final String thingName = "thing" + i;
             things.add( em.create("thing", new HashMap<String, Object>() {{
                 put("name", thingName);
             }}));
+            Thread.sleep( writeDelayMs );
         }
         em.refreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "select *");
         Assert.assertEquals( numEntities, crs.size() );
 
-        // update each one 10 times
+        // update each one a bunch of times
+        int count = 0;
         for ( Entity thing : things ) {
 
             for ( int j=0; j<numUpdates; j++) {
+
                 Entity toUpdate = em.get( thing.getUuid() );
                 thing.setProperty( "property"  + j, RandomStringUtils.randomAlphanumeric(10));
                 em.update(toUpdate);
+
+                Thread.sleep( writeDelayMs );
                 em.refreshIndex();
+                count++;
+
+                if ( count % 100 == 0 ) {
+                    logger.info("Updated {} of {} times", count, numEntities * numUpdates);
+                }
             }
         }
 
-        // new query for total number of result candidates = 1000
+        // query Core Persistence directly for total number of result candidates
+        // should be entities X updates because of stale indexes 
         crs = queryCollectionCp("things", "select *");
         Assert.assertEquals( numEntities * numUpdates, crs.size() );
 
-        // query for results, should be 100 (and it triggers background clean up of stale
indexes)
+        // query EntityManager for results
+        // should return 100 becuase it filters out the stale entities
+        Query q = Query.fromQL("select *");
+        q.setLimit( 10000 );
+        Results results = em.searchCollection( em.getApplicationRef(), "things", q);
+        assertEquals( numEntities, results.size() );
 
+        // EntityManager should have kicked off a batch cleanup of those stale indexes
         // wait a second for batch cleanup to complete
+        Thread.sleep(600);
 
-        // query for total number of result candidates = 1000
+        // query for total number of result candidates = 100
+        crs = queryCollectionCp("things", "select *");
+        Assert.assertEquals( numEntities, crs.size() );
     }
 
 


Mime
View raw message