usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [07/16] incubator-usergrid git commit: Added factory for RX I/O Scheduler.
Date Tue, 21 Apr 2015 15:55:16 GMT
Added factory for RX I/O Scheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9a1b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9a1b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9a1b02

Branch: refs/heads/USERGRID-578
Commit: 9b9a1b02f877ee6f35d7c3a7ba74001a17c237f1
Parents: 48d8060
Author: Todd Nine <tnine@apigee.com>
Authored: Mon Apr 20 08:01:12 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon Apr 20 10:36:06 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   5 +-
 .../corepersistence/CpEntityManager.java        |   9 +-
 .../corepersistence/CpRelationManager.java      |  57 +-------
 .../index/AsyncIndexProvider.java               |  15 +--
 .../index/AsyncIndexService.java                |   5 +-
 .../index/InMemoryAsyncIndexService.java        |  34 ++---
 .../corepersistence/index/IndexServiceImpl.java |   9 +-
 .../index/SQSAsyncIndexService.java             |   4 +-
 .../EntityCollectionManagerFactoryImpl.java     |  15 ++-
 .../impl/EntityCollectionManagerImpl.java       |  38 +++---
 .../mvcc/stage/write/RollbackAction.java        |   2 -
 .../persistence/core/guice/CommonModule.java    |  12 ++
 .../persistence/core/rx/RxSchedulerFig.java     |  60 +++++++++
 .../persistence/core/rx/RxTaskScheduler.java    |  40 ++++++
 .../core/rx/RxTaskSchedulerImpl.java            | 129 +++++++++++++++++++
 .../index/impl/IndexRefreshCommandImpl.java     |  14 +-
 .../persistence/index/impl/IndexingUtils.java   |   6 +-
 .../persistence/index/usergrid-mappings.json    |   4 +-
 18 files changed, 314 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 4758456..6ebff53 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,9 @@ import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugi
 import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
@@ -144,7 +147,7 @@ public class CoreModule  extends AbstractModule {
          *****/
 
 
-        bind(IndexService.class).to(IndexServiceImpl.class);
+        bind(IndexService.class).to( IndexServiceImpl.class );
         //bind the queue provider
 
         bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 844892a..6c3989d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -620,9 +620,8 @@ public class CpEntityManager implements EntityManager {
         }
 
         // update in all containing collections and connection indexes
-        CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity );
-        rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
-        timer.stop();
+
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
     }
 
 
@@ -1067,9 +1066,7 @@ public class CpEntityManager implements EntityManager {
 
         //Adding graphite metrics
 
-        // update in all containing collections and connection indexes
-        CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef );
-        rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index db9816e..be8605e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -255,47 +255,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    public void updateContainingCollectionAndCollectionIndexes(
-        final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
-
-
-        throw new UnsupportedOperationException( "Use the new interface" );
-
-//        final GraphManager gm = managerCache.getGraphManager( applicationScope );
-//
-//        // loop through all types of edge to target
-//
-//
-//        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
-//
-//        final EntityIndexBatch entityIndexBatch = ei.createBatch();
-//
-//        final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) )
-//
-//            // for each edge type, emit all the edges of that type
-//            .flatMap( etype -> gm.loadEdgesToTarget(
-//                new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
-//                    SearchByEdgeType.Order.DESCENDING, null ) ) )
-//
-//                //for each edge we receive index and add to the batch
-//            .doOnNext( edge -> {
-//                // reindex the entity in the source entity's collection or connection index
-//
-//                IndexEdge indexScope = generateScopeFromSource( edge );
-//
-//                entityIndexBatch.index( indexScope, cpEntity );
-//
-//            } ).doOnCompleted( () -> {
-//                    Timer.Context timeElasticIndexBatch = updateCollectionTimer.time();
-//                    entityIndexBatch.execute();
-//                    timeElasticIndexBatch.stop();
-//              } ).count().toBlocking().lastOrDefault( 0 );
-//
-//        //Adding graphite metrics
-//
-//
-//        logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
-    }
 
 
     @Override
@@ -473,8 +432,6 @@ public class CpRelationManager implements RelationManager {
         gm.writeEdge( edge ).toBlocking().last();
 
 
-        //This is broken and needs fixed updateContainingCollectionAndCollectionIndexes See USERGRID-541
-
 
         //perform indexing
 
@@ -482,7 +439,7 @@ public class CpRelationManager implements RelationManager {
             logger.debug( "Wrote edge {}", edge );
         }
 
-        indexService.queueEntityIndexUpdate( applicationScope, memberEntity.getId(), memberEntity.getVersion() );
+        indexService.queueEntityIndexUpdate( applicationScope, memberEntity);
 
 
         if ( logger.isDebugEnabled() ) {
@@ -490,17 +447,7 @@ public class CpRelationManager implements RelationManager {
                 itemRef.getUuid().toString(), itemRef.getType(), collName
             } );
         }
-        //        logger.debug("With head entity scope is {}:{}:{}", new Object[] {
-        //            headEntityScope.getApplication().toString(),
-        //            headEntityScope.getOwner().toString(),
-        //            headEntityScope.getName()});
-
-        if ( connectBack && collection != null && collection.getLinkedCollection()
!= null ) {
-            throw new UnsupportedOperationException( "Implement me directly in graph " );
-//            getRelationManager( itemEntity )
-//                .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity,
false );
-//            getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(),
headEntity, false );
-        }
+
 
         return itemEntity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
index 8257c94..77b4990 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
@@ -37,24 +37,23 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService>
{
 
     private final QueryFig queryFig;
 
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final QueueManagerFactory queueManagerFactory;
     private final MetricsFactory metricsFactory;
     private final IndexService indexService;
+    private final RxTaskScheduler rxTaskScheduler;
 
     private AsyncIndexService asyncIndexService;
 
 
     @Inject
-    public AsyncIndexProvider( final QueryFig queryFig,
-                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                               final QueueManagerFactory queueManagerFactory, final MetricsFactory
metricsFactory,
-                               final IndexService indexService ) {
+    public AsyncIndexProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory,
final
+    MetricsFactory metricsFactory,
+                               final IndexService indexService, final RxTaskScheduler rxTaskScheduler
) {
         this.queryFig = queryFig;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
         this.indexService = indexService;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -77,7 +76,7 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService>
{
 
         switch ( impl ) {
             case LOCAL:
-                return new InMemoryAsyncIndexService( indexService, entityCollectionManagerFactory
);
+                return new InMemoryAsyncIndexService( indexService, rxTaskScheduler );
             case SQS:
                 return new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory
);
             default:

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
index d1f2fb6..06310ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java
@@ -40,8 +40,7 @@ public interface AsyncIndexService {
      * We will return a distributed future.  For SQS impls, this will return immediately, and the result will not be available.
      * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
      * @param applicationScope
-     * @param entityId
-     * @param version
+     * @param entity The entity to index
      */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId,
final UUID version );
+    void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
index fdc9c65..3e2a271 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java
@@ -20,56 +20,40 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 
 
 @Singleton
 public class InMemoryAsyncIndexService implements AsyncIndexService {
 
     private final IndexService indexService;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     @Inject
-    public InMemoryAsyncIndexService( final IndexService indexService,
-                                      final EntityCollectionManagerFactory entityCollectionManagerFactory
) {this.indexService = indexService;
+    public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler
rxTaskScheduler ) {
+        this.indexService = indexService;
 
 
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
     @Override
-    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id
entityId,
-                                        final UUID version ) {
-
-        final IndexEntityEvent event = new IndexEntityEvent( applicationScope, entityId,
version );
+    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity
toIndex ) {
 
         //process the entity immediately
         //only process the same version, otherwise ignore
 
-        getEntity( applicationScope, entityId).filter( entity-> version.equals(entity.hasVersion()
)).doOnNext( entity -> {
-           indexService.indexEntity( applicationScope, entity );
-        } ).subscribeOn( Schedulers.io() ).subscribe();
-
-
-    }
-
-    private Observable<Entity> getEntity( final ApplicationScope applicationScope,
final Id entityId){
-
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(
applicationScope );
-        return ecm.load( entityId );
+        Observable.just( toIndex ).doOnNext( entity -> {
+            indexService.indexEntity( applicationScope, entity );
+        } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 2a7533a..873e2b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -83,8 +83,8 @@ public class IndexServiceImpl implements IndexService {
         //we always index in the target scope
         final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId
);
 
-        //we may have to index
-        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge ->
generateScopeToTarget( edge ) );
+        //we may have to index  we're indexing from source->target here
+        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge ->
generateScopeFromSource( edge ) );
 
 
         //we might or might not need to index from target-> source
@@ -141,8 +141,9 @@ public class IndexServiceImpl implements IndexService {
 
         /**
          * An observable of sizes as we execute batches
+         *
+         * we're indexing from target->source here
          */
-        return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection
)
-                              .map( edge -> generateScopeFromSource( edge ) );
+        return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection
).map( edge -> generateScopeToTarget( edge ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
index cad20bd..dfcb97a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -256,8 +257,7 @@ public class SQSAsyncIndexService implements AsyncIndexService {
 
 
     @Override
-    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id
entityId,
-                                        final UUID version ) {
+    public void queueEntityIndexUpdate( final ApplicationScope applicationScope,  final Entity
entity) {
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 9191c06..761c4b5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerial
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
 
@@ -74,6 +75,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final TaskExecutor taskExecutor;
     private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
+    private final RxTaskScheduler rxTaskScheduler;
 
     private LoadingCache<ApplicationScope, EntityCollectionManager> ecmCache =
         CacheBuilder.newBuilder().maximumSize( 1000 )
@@ -84,7 +86,8 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                 writeStart, writeVerifyUnique,
                                 writeOptimisticVerify, writeCommit, rollback, markStart,
markCommit,
                                 entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory,
taskExecutor, scope, metricsFactory );
+                                mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory,
taskExecutor, scope, metricsFactory,
+                                rxTaskScheduler );
 
 
                             final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig,
target  );
@@ -95,8 +98,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
 
 
     @Inject
-    public EntityCollectionManagerFactoryImpl( final WriteStart writeStart,
-                                               final WriteUniqueVerify writeVerifyUnique,
+    public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final WriteUniqueVerify
writeVerifyUnique,
                                                final WriteOptimisticVerify writeOptimisticVerify,
                                                final WriteCommit writeCommit, final RollbackAction
rollback,
                                                final MarkStart markStart, final MarkCommit
markCommit,
@@ -105,9 +107,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace,
                                                final EntityVersionTaskFactory entityVersionTaskFactory,
-                                               @CollectionTaskExecutor final TaskExecutor
taskExecutor,
-                                              final EntityCacheFig entityCacheFig,
-                                               MetricsFactory metricsFactory) {
+                                               @CollectionTaskExecutor final TaskExecutor
taskExecutor, final
+                                                   EntityCacheFig entityCacheFig,
+                                               MetricsFactory metricsFactory, final RxTaskScheduler
rxTaskScheduler ) {
 
         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -124,6 +126,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.taskExecutor = taskExecutor;
         this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
     @Override
     public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope)
{

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index f0b070c..6f10e86 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -49,6 +49,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -109,6 +110,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
     private final EntityVersionTaskFactory entityVersionTaskFactory;
     private final TaskExecutor taskExecutor;
 
+    private final RxTaskScheduler rxTaskScheduler;
+
     private final Keyspace keyspace;
     private final Timer writeTimer;
     private final Meter writeMeter;
@@ -125,26 +128,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
 
 
     @Inject
-    public EntityCollectionManagerImpl(
-        final WriteStart                    writeStart,
-        final WriteUniqueVerify                    writeVerifyUnique,
-        final WriteOptimisticVerify                writeOptimisticVerify,
-        final WriteCommit                          writeCommit,
-        final RollbackAction                       rollback,
-        final MarkStart                            markStart,
-        final MarkCommit                           markCommit,
-        final MvccEntitySerializationStrategy entitySerializationStrategy,
-        final UniqueValueSerializationStrategy     uniqueValueSerializationStrategy,
-        final MvccLogEntrySerializationStrategy    mvccLogEntrySerializationStrategy,
-        final Keyspace                             keyspace,
-        final EntityVersionTaskFactory entityVersionTaskFactory,
-        @CollectionTaskExecutor final TaskExecutor taskExecutor,
-        @Assisted final ApplicationScope applicationScope,
-        final MetricsFactory metricsFactory
-
-    ) {
+    public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify
writeVerifyUnique,
+                                        final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit
+                                                writeCommit,
+                                        final RollbackAction rollback, final MarkStart markStart,
+                                        final MarkCommit markCommit, final MvccEntitySerializationStrategy
entitySerializationStrategy,
+                                        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+                                        final Keyspace keyspace, final EntityVersionTaskFactory
entityVersionTaskFactory,
+                                        @CollectionTaskExecutor final TaskExecutor taskExecutor,
@Assisted final ApplicationScope applicationScope,
+                                        final MetricsFactory metricsFactory,
+
+                                        final RxTaskScheduler rxTaskScheduler ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        this.rxTaskScheduler = rxTaskScheduler;
 
         ValidationUtils.validateApplicationScope( applicationScope );
 
@@ -453,13 +451,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
                     public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent
) {
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn(
Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn(
rxTaskScheduler.getAsyncIOScheduler() )
                                           .doOnNext( writeVerifyUnique );
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic
=
-                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn(
Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn(
rxTaskScheduler.getAsyncIOScheduler() )
                                           .doOnNext( writeOptimisticVerify );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
index cd15c26..8342e55 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java
@@ -49,7 +49,6 @@ public class RollbackAction implements Action1<Throwable> {
 
     private static final Logger log = LoggerFactory.getLogger( RollbackAction.class );
 
-    private final Scheduler scheduler;
     private final UniqueValueSerializationStrategy uniqueValueStrat;
     private final MvccLogEntrySerializationStrategy logEntryStrat;
 
@@ -58,7 +57,6 @@ public class RollbackAction implements Action1<Throwable> {
     public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat,
                            UniqueValueSerializationStrategy uniqueValueStrat ) {
 
-        scheduler = Schedulers.io();
         this.uniqueValueStrat = uniqueValueStrat;
         this.logEntryStrat = logEntryStrat;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index bc84b6b..f266643 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -36,6 +36,9 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl;
+import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
@@ -86,6 +89,15 @@ public class CommonModule extends AbstractModule {
         //do multibindings for migrations
         //create the empty multibinder so other plugins can use it
          Multibinder.newSetBinder( binder(), MigrationPlugin.class);
+
+
+        /**
+         * RX java scheduler configuration
+         */
+
+        install ( new GuicyFigModule( RxSchedulerFig.class ));
+
+        bind( RxTaskScheduler.class).to( RxTaskSchedulerImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
new file mode 100644
index 0000000..7986132
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.rx;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * GuicyFig configuration for the RX I/O scheduler: the thread pool size and the pool name.
+ */
+public interface RxSchedulerFig extends GuicyFig {
+
+
+    /**
+     * Maximum number of threads in the I/O scheduler thread pool. The same value bounds
+     * the pool's task queue, which provides simple backpressure
+     */
+    public static final String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+
+
+    /**
+     * Name of the I/O scheduler thread pool. Used as the prefix of its thread names
+     * and in its log messages
+     */
+    public static final String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+
+
+
+
+    @Default( "100" )
+    @Key( IO_SCHEDULER_THREADS )
+    int getMaxIoThreads();
+
+    @Default( "Usergrid-RxIOPool" )
+    @Key(IO_SCHEDULER_NAME)
+    String getIoSchedulerName();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
new file mode 100644
index 0000000..d6cc5e8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.rx;
+
+
+import org.apache.usergrid.persistence.core.task.Task;
+
+import rx.Scheduler;
+
+
+/**
+ * An interface for returning task schedulers
+ */
+public interface RxTaskScheduler {
+
+    /**
+     * Get the scheduler for tasks that perform blocking I/O
+     * @return the scheduler on which blocking I/O work should be run
+     */
+    Scheduler getAsyncIOScheduler();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
new file mode 100644
index 0000000..219cde6
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.rx;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Scheduler;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * An implementation of the task scheduler that allows us to control the number of I/O threads
+ */
+@Singleton
+public class RxTaskSchedulerImpl implements RxTaskScheduler {
+
+    private static final Logger log = LoggerFactory.getLogger( RxTaskSchedulerImpl.class
);
+
+    private final Scheduler scheduler;
+    private final String poolName;
+
+    @Inject
+    public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){
+
+        this.poolName = schedulerFig.getIoSchedulerName();
+
+        final int poolSize = schedulerFig.getMaxIoThreads();
+
+
+        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
+
+
+        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
+
+
+        this.scheduler = Schedulers.from(threadPool);
+
+
+    }
+
+
+    @Override
+    public Scheduler getAsyncIOScheduler() {
+        return scheduler;
+    }
+
+
+    /**
+     * Create a bounded thread pool that will reject work when its I/O task queue is full
+     */
+    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize
) {
+
+            super( 1, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory(
),
+                    new RejectedHandler() );
+        }
+    }
+
+
+    /**
+     * Thread factory that will name and count threads for easier debugging
+     */
+    private final class CountingThreadFactory implements ThreadFactory {
+
+        private final AtomicLong threadCounter = new AtomicLong();
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+            final long newValue = threadCounter.incrementAndGet();
+
+            Thread t = new Thread( r, poolName + "-" + newValue );
+
+            t.setDaemon( true );
+
+            return t;
+        }
+    }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private final class RejectedHandler implements RejectedExecutionHandler {
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor
) {
+            log.warn( "{} task queue full, rejecting task {}", poolName, r );
+
+            //TODO T.N. do we want to run this on the caller thread?
+
+            throw new RejectedExecutionException( "Unable to run task, queue full" );
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 07150bb..c997e5a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.index.AliasedEntityIndex;
@@ -48,7 +49,6 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 import rx.util.async.Async;
 
 
@@ -64,12 +64,13 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
     private final IndexBufferConsumer producer;
     private final IndexFig indexFig;
     private final Timer timer;
-
+    private final RxTaskScheduler rxTaskScheduler;
 
     @Inject
     public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider,
                                     IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory
metricsFactory,
-                                    final IndexCache indexCache ) {
+                                    final IndexCache indexCache, final RxTaskScheduler rxTaskScheduler
) {
+
 
 
         this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, "index.refresh.timer"
);
@@ -78,6 +79,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
         this.producer = producer;
         this.indexFig = indexFig;
         this.indexCache = indexCache;
+        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
@@ -141,14 +143,14 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand
{
                 logger.error( "Failed during refresh search for " + uuid, ee );
                 throw new RuntimeException( "Failed during refresh search for " + uuid, ee
);
             }
-        }, Schedulers.io() ).call();
+        }, rxTaskScheduler.getAsyncIOScheduler() ).call();
 
 
         return future.doOnNext( found -> {
             if ( !found.hasFinished() ) {
-                logger.error(String.format("Couldn't find record during refresh uuid: {}
took ms:{} ", uuid, found.getExecutionTime()));
+                logger.error("Couldn't find record during refresh uuid: {} took ms:{} ",
uuid, found.getExecutionTime());
             }else{
-                logger.info(String.format("found record during refresh uuid: {} took ms:{}
", uuid, found.getExecutionTime()));
+                logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
             }
         } ).doOnCompleted(() -> {
             //clean up our data

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 7c33084..de5e29d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -54,11 +54,11 @@ public class IndexingUtils {
 
     public static final String ENTITY_TYPE_FIELDNAME = "entityType";
 
-    public static final String EDGE_NODE_ID_FIELDNAME = "edgeNodeId";
+    public static final String EDGE_NODE_ID_FIELDNAME = "nodeId";
 
     public static final String EDGE_NAME_FIELDNAME = "edgeName";
 
-    public static final String EDGE_NODE_TYPE_FIELDNAME = "edgeType";
+    public static final String EDGE_NODE_TYPE_FIELDNAME = "entityNodeType";
 
     public static final String EDGE_TIMESTAMP_FIELDNAME = "edgeTimestamp";
 
@@ -107,8 +107,6 @@ public class IndexingUtils {
         idString( sb, scope.getNodeId() );
         sb.append( FIELD_SEPERATOR );
         sb.append( scope.getEdgeName() );
-        sb.append( FIELD_SEPERATOR );
-        sb.append( scope.getNodeType() );
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
index 4da1902..d4c34cd 100644
--- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
+++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json
@@ -21,7 +21,7 @@
         "index": "not_analyzed",
         "doc_values": true
       },
-      "edgeNodeId": {
+      "nodeId": {
         "type": "string",
         "index": "not_analyzed",
         "doc_values": true
@@ -31,7 +31,7 @@
         "index": "not_analyzed",
         "doc_values": true
       },
-      "edgeType": {
+      "entityNodeType": {
         "type": "string",
         "index": "not_analyzed",
         "doc_values": true


Mime
View raw message