usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [01/50] [abbrv] incubator-usergrid git commit: Refactored generation of collection scope to be re-used with CpNamingUtils for consistency
Date Wed, 03 Dec 2014 19:59:11 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/no-source-in-es [created] c3fc1515b
  refs/heads/two-dot-o c2bc921db -> 6210092f0
  refs/heads/two-dot-o-events 32276938d -> 532a02cb3


Refactored generation of collection scope to be re-used with CpNamingUtils for consistency


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ed79642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ed79642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ed79642

Branch: refs/heads/two-dot-o-events
Commit: 9ed796423f15a8ef33137b113f6e04c578994b21
Parents: 68f4e0f
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Nov 20 14:15:24 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Nov 20 14:23:38 2014 -0700

----------------------------------------------------------------------
 .../migration/EntityDataMigration.java          | 139 +++++++++++++++++
 .../migration/EntityTypeMappingMigration.java   |  28 ++--
 .../migration/GraphShardVersionMigration.java   | 104 +++++++------
 .../rx/AllEntitiesInSystemObservable.java       |  27 ++--
 .../corepersistence/StaleIndexCleanupTest.java  |   5 +-
 .../migration/EntityTypeMappingMigrationIT.java |  74 +++++-----
 .../migration/GraphShardVersionMigrationIT.java | 148 +++++++++++--------
 .../rx/AllEntitiesInSystemObservableIT.java     |  23 ++-
 8 files changed, 364 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
new file mode 100644
index 0000000..79d31a8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.functions.Action1;
+
+
+/**
+ * Migration for migrating graph edges to the new Shards
+ */
+public class EntityDataMigration implements DataMigration {
+
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
+
+
+    private final MvccEntitySerializationStrategy v1Serialization;
+    private final MvccEntitySerializationStrategy v2Serialization;
+
+    private final ManagerCache managerCache;
+    private final Keyspace keyspace;
+
+
+    @Inject
+    public EntityDataMigration( @PreviousImpl final MvccEntitySerializationStrategy v1Serialization,
+                                @CurrentImpl final MvccEntitySerializationStrategy v2Serialization,
+                                final ManagerCache managerCache, final Keyspace keyspace ) {
+        this.v1Serialization = v1Serialization;
+        this.v2Serialization = v2Serialization;
+        this.managerCache = managerCache;
+        this.keyspace = keyspace;
+    }
+
+
+    @Override
+    public void migrate( final ProgressObserver observer ) throws Throwable {
+
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
+                new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+
+
+                    @Override
+                    public void call(
+                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+
+
+                        final UUID now = UUIDGenerator.newTimeUUID();
+
+                        final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
+
+
+                        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+                        for ( Id entityId : applicationEntityGroup.entityIds ) {
+
+                            CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
+                                    appScopeId, entityId.getType() );
+
+
+                            Iterator<MvccEntity> allVersions =
+                                    v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+
+                            while ( allVersions.hasNext() ) {
+                                final MvccEntity version = allVersions.next();
+
+                                final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+
+                                totalBatch.mergeShallow( versionBatch );
+
+                                if ( totalBatch.getRowCount() >= 50 ) {
+                                    try {
+                                        totalBatch.execute();
+                                    }
+                                    catch ( ConnectionException e ) {
+                                        throw new DataMigrationException( "Unable to migrate batches ", e );
+                                    }
+                                }
+                            }
+                        }
+
+                        try {
+                            totalBatch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new DataMigrationException( "Unable to migrate batches ", e );
+                        }
+                    }
+                } ).toBlocking().last();
+    }
+
+
+    @Override
+    public int getVersion() {
+        return Versions.VERSION_3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 1adfe73..8089dfd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -29,13 +29,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 
@@ -47,9 +44,6 @@ import rx.functions.Action1;
  */
 public class EntityTypeMappingMigration implements DataMigration {
 
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityTypeMappingMigration.class );
-
     private final ManagerCache managerCache;
 
 
@@ -65,25 +59,27 @@ public class EntityTypeMappingMigration implements DataMigration {
 
         final AtomicLong atomicLong = new AtomicLong();
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
 
 
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entityData ) {
+                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
 
-                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( entityData.applicationScope.getApplication() );
+                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() );
 
 
                                              final MapManager mapManager = managerCache.getMapManager( ms );
 
-                                             final UUID entityUuid = entityData.entityId.getUuid();
-                                             final String entityType = entityData.entityId.getType();
+                                             for(Id entityId: applicationEntityGroup.entityIds) {
+                                                 final UUID entityUuid = entityId.getUuid();
+                                                 final String entityType = entityId.getType();
 
-                                             mapManager.putString( entityUuid.toString(), entityType );
+                                                 mapManager.putString( entityUuid.toString(), entityType );
 
-                                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
-                                                 updateStatus( atomicLong, observer );
+                                                 if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+                                                     updateStatus( atomicLong, observer );
+                                                 }
                                              }
                                          }
                                      } ).toBlocking().lastOrDefault( null );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index ac4cd58..3b92570 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
@@ -64,8 +65,7 @@ public class GraphShardVersionMigration implements DataMigration {
 
     @Inject
     public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization,
-                                       final ManagerCache managerCache, final
-    Keyspace keyspace ) {
+                                       final ManagerCache managerCache, final Keyspace keyspace ) {
         this.v2Serialization = v2Serialization;
         this.managerCache = managerCache;
         this.keyspace = keyspace;
@@ -77,51 +77,71 @@ public class GraphShardVersionMigration implements DataMigration {
 
         final AtomicLong counter = new AtomicLong();
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache).flatMap(
-                new Func1<AllEntitiesInSystemObservable.EntityData, Observable<List<Edge>>>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).flatMap(
+                new Func1<AllEntitiesInSystemObservable.ApplicationEntityGroup, Observable<List<Edge>>>() {
 
 
                     @Override
-                    public Observable<List<Edge>> call( final AllEntitiesInSystemObservable.EntityData entityData ) {
-                        logger.info( "Migrating edges from node {} in scope {}", entityData.entityId,
-                                entityData.applicationScope );
-
-                        final GraphManager gm = managerCache.getGraphManager( entityData.applicationScope );
-
-                        //get each edge from this node as a source
-                        return EdgesFromSourceObservable.edgesFromSource( gm, entityData.entityId )
-
-                                //for each edge, re-index it in v2  every 1000 edges or less
-                                .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
-                                    @Override
-                                    public void call( final List<Edge> edges ) {
-
-                                        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                        for ( final Edge edge : edges ) {
-                                            logger.info( "Migrating meta for edge {}", edge );
-                                            final MutationBatch edgeBatch =
-                                                    v2Serialization.writeEdge( entityData.applicationScope, edge );
-                                            batch.mergeShallow( edgeBatch );
-                                        }
-
-                                        try {
-                                            batch.execute();
-                                        }
-                                        catch ( ConnectionException e ) {
-                                            throw new RuntimeException( "Unable to perform migration", e );
-                                        }
-
-                                        //update the observer so the admin can see it
-                                        final long newCount = counter.addAndGet( edges.size() );
-
-                                        observer.update( getVersion(), String.format("Currently running.  Rewritten %d edge types", newCount) );
-
-
-                                    }
-                                } );
+                    public Observable<List<Edge>> call(
+                            final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+
+                        //emit a stream of all ids from this group
+                        return Observable.from( applicationEntityGroup.entityIds )
+                                         .flatMap( new Func1<Id, Observable<List<Edge>>>() {
+
+
+                                             //for each id in the group, get it's edges
+                                             @Override
+                                             public Observable<List<Edge>> call( final Id id ) {
+                                                 logger.info( "Migrating edges from node {} in scope {}", id,
+                                                         applicationEntityGroup.applicationScope );
+
+                                                 final GraphManager gm = managerCache
+                                                         .getGraphManager( applicationEntityGroup.applicationScope );
+
+                                                 //get each edge from this node as a source
+                                                 return EdgesFromSourceObservable.edgesFromSource( gm, id )
+
+                                                         //for each edge, re-index it in v2  every 1000 edges or less
+                                                         .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
+                                                             @Override
+                                                             public void call( final List<Edge> edges ) {
+
+                                                                 final MutationBatch batch =
+                                                                         keyspace.prepareMutationBatch();
+
+                                                                 for ( final Edge edge : edges ) {
+                                                                     logger.info( "Migrating meta for edge {}", edge );
+                                                                     final MutationBatch edgeBatch = v2Serialization
+                                                                             .writeEdge(
+                                                                                     applicationEntityGroup
+                                                                                             .applicationScope,
+                                                                                     edge );
+                                                                     batch.mergeShallow( edgeBatch );
+                                                                 }
+
+                                                                 try {
+                                                                     batch.execute();
+                                                                 }
+                                                                 catch ( ConnectionException e ) {
+                                                                     throw new RuntimeException(
+                                                                             "Unable to perform migration", e );
+                                                                 }
+
+                                                                 //update the observer so the admin can see it
+                                                                 final long newCount =
+                                                                         counter.addAndGet( edges.size() );
+
+                                                                 observer.update( getVersion(), String.format(
+                                                                         "Currently running.  Rewritten %d edge types",
+                                                                         newCount ) );
+                                                             }
+                                                         } );
+                                             }
+                                         } );
                     }
                 } ).toBlocking().lastOrDefault( null );
+        ;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
index 291bbe9..771b81f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.rx;
 
 
+import java.util.List;
+
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -40,14 +42,17 @@ public class AllEntitiesInSystemObservable {
 
     /**
      * Return an observable that emits all entities in the system.
+     * @param managerCache the managerCache to use
+     * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup.  Note that if we exceed the buffer size
+     * you may be more than 1 ApplicationEntityGroup with the same application and different ids
      */
-    public static Observable<EntityData> getAllEntitiesInSystem( final ManagerCache managerCache) {
+    public static Observable<ApplicationEntityGroup> getAllEntitiesInSystem( final ManagerCache managerCache, final int bufferSize) {
         //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
         return ApplicationObservable.getAllApplicationIds( managerCache )
 
-                                    .flatMap( new Func1<Id, Observable<EntityData>>() {
+                                    .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
                                         @Override
-                                        public Observable<EntityData> call( final Id applicationId ) {
+                                        public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
 
                                             //set up our application scope and graph manager
                                             final ApplicationScope applicationScope = new ApplicationScopeImpl(
@@ -68,11 +73,11 @@ public class AllEntitiesInSystemObservable {
 
                                             //merge both the specified application node and the entity node
                                             // so they all get used
-                                            return Observable.merge( applicationNode, entityNodes )
-                                                             .map( new Func1<Id, EntityData>() {
+                                            return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
+                                                             .map( new Func1<List<Id>, ApplicationEntityGroup>() {
                                                                  @Override
-                                                                 public EntityData call( final Id id ) {
-                                                                     return new EntityData( applicationScope, id );
+                                                                 public ApplicationEntityGroup call( final List<Id> id ) {
+                                                                     return new ApplicationEntityGroup( applicationScope, id );
                                                                  }
                                                              } );
                                         }
@@ -83,14 +88,14 @@ public class AllEntitiesInSystemObservable {
     /**
      * Get the entity data.  Immutable bean for fast access
      */
-    public static final class EntityData {
+    public static final class ApplicationEntityGroup {
         public final ApplicationScope applicationScope;
-        public final Id entityId;
+        public final List<Id> entityIds;
 
 
-        public EntityData( final ApplicationScope applicationScope, final Id entityId ) {
+        public ApplicationEntityGroup( final ApplicationScope applicationScope, final List<Id> entityIds ) {
             this.applicationScope = applicationScope;
-            this.entityId = entityId;
+            this.entityIds = entityIds;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index fa9f9df..9d0c9e6 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.fasterxml.uuid.UUIDComparator;
 
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -220,9 +221,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         EntityManager em = app.getEntityManager();
 
-        CollectionScope cs = new CollectionScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                CpNamingUtils.getCollectionScopeNameFromEntityType( eref.getType() ) );
+        CollectionScope cs = getCollectionScopeNameFromEntityType(  new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
 
         EntityCollectionManagerFactory ecmf = CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index dafdb00..1f0665a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -46,7 +45,6 @@ import rx.functions.Action1;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 
@@ -75,7 +73,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
     @Test
     public void testIdMapping() throws Throwable {
 
-        assertEquals("version 1 expected", 1, entityTypeMappingMigration.getVersion());
+        assertEquals( "version 1 expected", 1, entityTypeMappingMigration.getVersion() );
 
         final EntityManager newAppEm = app.getEntityManager();
 
@@ -87,7 +85,6 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
 
 
-
         final Set<Id> allEntities = new HashSet<>();
         allEntities.addAll( type1Identities );
         allEntities.addAll( type2Identities );
@@ -106,38 +103,45 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         entityTypeMappingMigration.migrate( progressObserver );
 
 
-
-
-
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
                                              //ensure that each one has a type
-                                             try {
-
-                                                 final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
-                                                 final Entity returned = em.get( entity.entityId.getUuid() );
-
-                                                 //we seem to occasionally get phantom edges.  If this is the case we'll store the type _> uuid mapping, but we won't have anything to load
-                                                if(returned != null) {
-                                                    assertEquals( entity.entityId.getUuid(), returned.getUuid() );
-                                                    assertEquals( entity.entityId.getType(), returned.getType() );
-                                                }
-                                                else {
-                                                    final String type = managerCache.getMapManager( CpNamingUtils.getEntityTypeMapScope(
-                                                            entity.applicationScope.getApplication() ) )
-                                                            .getString( entity.entityId.getUuid().toString() );
-
-                                                    assertEquals(entity.entityId.getType(), type);
-                                                }
-                                             }
-                                             catch ( Exception e ) {
-                                                 throw new RuntimeException( "Unable to get entity " + entity.entityId
-                                                         + " by UUID, migration failed", e );
-                                             }
 
-                                             allEntities.remove( entity.entityId );
+                                             final EntityManager em = emf.getEntityManager(
+                                                     entity.applicationScope.getApplication().getUuid() );
+
+                                             for ( final Id id : entity.entityIds ) {
+                                                 try {
+                                                     final Entity returned = em.get( id.getUuid() );
+
+                                                     //we seem to occasionally get phantom edges.  If this is the
+                                                     // case we'll store the type _> uuid mapping, but we won't have
+                                                     // anything to load
+
+                                                     if ( returned != null ) {
+                                                         assertEquals( id.getUuid(), returned.getUuid() );
+                                                         assertEquals( id.getType(), returned.getType() );
+                                                     }
+                                                     else {
+                                                         final String type = managerCache.getMapManager( CpNamingUtils
+                                                                 .getEntityTypeMapScope(
+                                                                         entity.applicationScope.getApplication() ) )
+                                                                                         .getString( id.getUuid()
+                                                                                                       .toString() );
+
+                                                         assertEquals( id.getType(), type );
+                                                     }
+                                                 }
+                                                 catch ( Exception e ) {
+                                                     throw new RuntimeException( "Unable to get entity " + id
+                                                             + " by UUID, migration failed", e );
+                                                 }
+
+                                                 allEntities.remove( id );
+                                             }
                                          }
                                      } ).toBlocking().lastOrDefault( null );
 
@@ -145,9 +149,5 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
         assertEquals( "Every element should have been encountered", 0, allEntities.size() );
         assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
         assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index aab47a0..88c02cd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -32,12 +32,9 @@ import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -114,40 +111,41 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
         //read everything in previous version format and put it into our types.
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                         public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
 
-                                             /**
-                                              * Get our edge types from the source
-                                              */
-                                             gm.getEdgeTypesFromSource(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       sourceTypes.put( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
-
-
-                                             /**
-                                              * Get the edge types to the target
-                                              */
-                                             gm.getEdgeTypesToTarget(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       targetTypes.put( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
-
-                                             allEntities.remove( entity.entityId );
+                                             for(final Id id: entity.entityIds) {
+                                                 /**
+                                                  * Get our edge types from the source
+                                                  */
+                                                 gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( id, null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           sourceTypes.put( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+
+
+                                                 /**
+                                                  * Get the edge types to the target
+                                                  */
+                                                 gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
+                                                         null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           targetTypes.put( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+
+                                                 allEntities.remove( id );
+                                             }
                                          }
                                      } ).toBlocking().lastOrDefault( null );
 
@@ -169,42 +167,66 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
 
 
         //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
-                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
                                          @Override
-                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                         public void call(
+                                                 final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                                              final GraphManager gm =
                                                      managerCache.getGraphManager( entity.applicationScope );
 
-                                             /**
-                                              * Get our edge types from the source
-                                              */
-                                             gm.getEdgeTypesFromSource(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       sourceTypes.remove( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
-
-
-                                             /**
-                                              * Get the edge types to the target
-                                              */
-                                             gm.getEdgeTypesToTarget(
-                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
-                                               .doOnNext( new Action1<String>() {
-                                                   @Override
-                                                   public void call( final String s ) {
-                                                       targetTypes.remove( entity.entityId, s );
-                                                   }
-                                               } ).toBlocking().lastOrDefault( null );
+                                             for ( final Id id : entity.entityIds ) {
+                                                 /**
+                                                  * Get our edge types from the source
+                                                  */
+                                                 gm.getEdgeTypesFromSource(
+                                                         new SimpleSearchEdgeType( id, null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           sourceTypes.remove( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+
+
+                                                 /**
+                                                  * Get the edge types to the target
+                                                  */
+                                                 gm.getEdgeTypesToTarget(
+                                                         new SimpleSearchEdgeType( id, null, null ) )
+                                                   .doOnNext( new Action1<String>() {
+                                                       @Override
+                                                       public void call( final String s ) {
+                                                           targetTypes.remove( id, s );
+                                                       }
+                                                   } ).toBlocking().lastOrDefault( null );
+                                             }
+                                             }
                                          }
-                                     } ).toBlocking().lastOrDefault( null );
 
-        assertEquals( "All source types migrated", 0, sourceTypes.size() );
-        assertEquals( "All target types migrated", 0, targetTypes.size() );
+
+                                         ).
+
+
+                                         toBlocking()
+
+
+                                         .
+
+
+                                         lastOrDefault( null );
+
+
+                                         assertEquals( "All source types migrated",0,sourceTypes.size( )
+
+
+                                         );
+
+
+                                         assertEquals( "All target types migrated",0,targetTypes.size( )
+
+
+                                         );
+                                     }
     }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index 423dc1f..4d1c6c9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.corepersistence.rx;
 
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -33,13 +32,11 @@ import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import rx.functions.Action1;
 
@@ -95,26 +92,28 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
             @Override
-            public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+            public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
 
                 assertNotNull(entity);
                 assertNotNull(entity.applicationScope);
-                assertNotNull(entity.entityId);
+                assertNotNull(entity.entityIds);
 
                 //not from our test, don't check it
                 if(!applicationId.equals( entity.applicationScope.getApplication() )){
                     return;
                 }
 
+                for(Id id: entity.entityIds) {
 
-                //we should only emit each node once
-                if ( entity.entityId.getType().equals( type1 ) ) {
-                    assertTrue( "Element should be present on removal", type1Identities.remove( entity.entityId ) );
-                }
-                else if ( entity.entityId.getType().equals( type2 ) ) {
-                    assertTrue( "Element should be present on removal", type2Identities.remove( entity.entityId ) );
+                    //we should only emit each node once
+                    if ( id.getType().equals( type1 ) ) {
+                        assertTrue( "Element should be present on removal", type1Identities.remove( id ) );
+                    }
+                    else if ( id.getType().equals( type2 ) ) {
+                        assertTrue( "Element should be present on removal", type2Identities.remove( id ) );
+                    }
                 }
             }
         } ).toBlocking().lastOrDefault( null );


Mime
View raw message