usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [1/4] incubator-usergrid git commit: WIP overwrite
Date Wed, 15 Apr 2015 23:22:50 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-541 [created] 85b47ee30


WIP overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0ece8473
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0ece8473
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0ece8473

Branch: refs/heads/USERGRID-541
Commit: 0ece847325186957a4d139b7bcb3e50da903da90
Parents: 8bb8a4f
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Apr 15 14:21:29 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Apr 15 14:21:29 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 125 +++++++--------
 .../corepersistence/CpRelationManager.java      |  70 +++++----
 .../corepersistence/IndexQueueService.java      |  45 ++++++
 .../usergrid/corepersistence/IndexService.java  |  50 ++++++
 .../corepersistence/IndexServiceImpl.java       | 156 +++++++++++++++++++
 .../corepersistence/util/CpNamingUtils.java     |   4 +-
 .../graph/serialization/EdgesObservable.java    |  23 +++
 .../serialization/impl/EdgesObservableImpl.java |  68 +++++---
 .../persistence/index/EntityIndexBatch.java     |   3 +-
 9 files changed, 424 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2e7b7d8..d374e2f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -724,14 +724,14 @@ public class CpEntityManager implements EntityManager {
 
     @Override
     public void updateApplication( Map<String, Object> properties ) throws Exception
{
-        this.updateProperties(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
properties);
+        this.updateProperties( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId
), properties );
         this.application = get( applicationId, Application.class );
     }
 
 
     @Override
     public RelationManager getRelationManager( EntityRef entityRef ) {
-        Preconditions.checkNotNull(entityRef, "entityRef cannot be null");
+        Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
         CpRelationManager rmi = CpRelationManagerFactory.get(
             this, emf, applicationId, entityRef, null, metricsFactory
         );
@@ -819,7 +819,7 @@ public class CpEntityManager implements EntityManager {
     @Override
     public EntityRef getAlias( String aliasType, String alias ) throws Exception {
 
-        return getAlias(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), aliasType,
alias);
+        return getAlias( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), aliasType,
alias );
     }
 
 
@@ -848,7 +848,7 @@ public class CpEntityManager implements EntityManager {
                     new Object[] { ownerRef, collectionType, aliasValue } );
         }
 
-        return results.get(aliasValue);
+        return results.get( aliasValue );
     }
 
 
@@ -1226,7 +1226,7 @@ public class CpEntityManager implements EntityManager {
 
         ApplicationCF dictionaryCf = null;
 
-        boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary(entity.getType(),
dictionaryName);
+        boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary( entity.getType(),
dictionaryName );
 
         if ( entityHasDictionary ) {
             dictionaryCf = ENTITY_DICTIONARIES;
@@ -1279,7 +1279,7 @@ public class CpEntityManager implements EntityManager {
 
         Class<?> dictionaryCoType =
                 Schema.getDefaultSchema().getDictionaryValueType(entity.getType(), dictionaryName);
-        boolean coTypeIsBasic = ClassUtils.isBasicType(dictionaryCoType);
+        boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
 
         ByteBuffer[] columnNames = new ByteBuffer[elementNames.length];
         for ( int i = 0; i < elementNames.length; i++ ) {
@@ -1349,14 +1349,14 @@ public class CpEntityManager implements EntityManager {
     @Override
     public boolean isCollectionMember( EntityRef owner, String collectionName, EntityRef
entity ) throws Exception {
 
-        return getRelationManager( owner ).isCollectionMember(collectionName, entity);
+        return getRelationManager( owner ).isCollectionMember( collectionName, entity );
     }
 
 
     @Override
     public boolean isConnectionMember( EntityRef owner, String connectionName, EntityRef
entity ) throws Exception {
 
-        return getRelationManager( owner ).isConnectionMember(connectionName, entity);
+        return getRelationManager( owner ).isConnectionMember( connectionName, entity );
     }
 
 
@@ -1455,7 +1455,7 @@ public class CpEntityManager implements EntityManager {
     public ConnectionRef createConnection( EntityRef connectingEntity, String connectionType,
                                            EntityRef connectedEntityRef ) throws Exception
{
 
-        return getRelationManager( connectingEntity ).createConnection(connectionType, connectedEntityRef);
+        return getRelationManager( connectingEntity ).createConnection( connectionType, connectedEntityRef
);
     }
 
 
@@ -1465,7 +1465,7 @@ public class CpEntityManager implements EntityManager {
             throws Exception {
 
         return getRelationManager( connectingEntity )
-                .createConnection(pairedConnectionType, pairedEntity, connectionType, connectedEntityRef);
+                .createConnection( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef
);
     }
 
 
@@ -1523,7 +1523,7 @@ public class CpEntityManager implements EntityManager {
             String connectedEntityType, Level resultsLevel ) throws Exception {
 
         return getRelationManager( entityRef )
-                .getConnectedEntities(connectionType, connectedEntityType, resultsLevel);
+                .getConnectedEntities( connectionType, connectedEntityType, resultsLevel
);
     }
 
 
@@ -1532,7 +1532,7 @@ public class CpEntityManager implements EntityManager {
             String connectedEntityType, Level resultsLevel ) throws Exception {
 
         return getRelationManager( entityRef )
-                .getConnectingEntities(connectionType, connectedEntityType, resultsLevel);
+                .getConnectingEntities( connectionType, connectedEntityType, resultsLevel
);
     }
 
 
@@ -1540,21 +1540,21 @@ public class CpEntityManager implements EntityManager {
     public Results getConnectingEntities( EntityRef entityRef, String connectionType,
             String entityType, Level level, int count ) throws Exception {
 
-        return getRelationManager( entityRef ).getConnectingEntities(connectionType, entityType,
level, count);
+        return getRelationManager( entityRef ).getConnectingEntities( connectionType, entityType,
level, count );
     }
 
 
     @Override
     public Results searchConnectedEntities( EntityRef connectingEntity, Query query ) throws
Exception {
 
-        return getRelationManager( connectingEntity ).searchConnectedEntities(query);
+        return getRelationManager( connectingEntity ).searchConnectedEntities( query );
     }
 
 
     @Override
     public Set<String> getConnectionIndexes( EntityRef entity, String connectionType
) throws Exception {
 
-        return getRelationManager( entity ).getConnectionIndexes(connectionType);
+        return getRelationManager( entity ).getConnectionIndexes( connectionType );
     }
 
 
@@ -1799,8 +1799,8 @@ public class CpEntityManager implements EntityManager {
         permission = permission.toLowerCase();
         long timestamp = cass.createTimestamp();
         Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId
), be );
-        CassandraPersistenceUtils.addInsertToMutator(batch, ApplicationCF.ENTITY_DICTIONARIES,
-            getRolePermissionsKey(groupId, roleName), permission, ByteBuffer.allocate(0),
timestamp);
+        CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
+            getRolePermissionsKey( groupId, roleName ), permission, ByteBuffer.allocate(
0 ), timestamp );
 
         //Adding graphite metrics
         Timer.Context timeGroupRolePermission = entGrantGroupPermissionTimer.time();
@@ -1816,7 +1816,7 @@ public class CpEntityManager implements EntityManager {
         long timestamp = cass.createTimestamp();
         Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId
), be );
         CassandraPersistenceUtils.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
-                getRolePermissionsKey( groupId, roleName ), permission, timestamp );
+            getRolePermissionsKey( groupId, roleName ), permission, timestamp );
         //Adding graphite metrics
         Timer.Context timeRevokeGroupRolePermission = entRevokeGroupPermissionTimer.time();
         CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -2281,7 +2281,7 @@ public class CpEntityManager implements EntityManager {
     private MapManager getMapManagerForTypes() {
         Id mapOwner = new SimpleId( applicationId, TYPE_APPLICATION );
 
-        final MapScope ms = CpNamingUtils.getEntityTypeMapScope(mapOwner);
+        final MapScope ms = CpNamingUtils.getEntityTypeMapScope( mapOwner );
 
         MapManager mm = managerCache.getMapManager(ms);
 
@@ -2836,26 +2836,25 @@ public class CpEntityManager implements EntityManager {
 
         CpWalker walker = new CpWalker( );
 
-        walker.walkCollections(
-            this, getApplication(), collectionName, reverse, new CpVisitor() {
+        walker.walkCollections( this, getApplication(), collectionName, reverse, new CpVisitor()
{
 
-            @Override
-            public void visitCollectionEntry( EntityManager em, String collName, Entity entity
) {
+                @Override
+                public void visitCollectionEntry( EntityManager em, String collName, Entity
entity ) {
 
-                try {
-                    em.update( entity );
-                    po.onProgress( entity );
-                }
-                catch ( WriteOptimisticVerifyException wo ) {
-                    // swallow this, it just means this was already updated, which accomplishes
our task
-                    logger.warn( "Someone beat us to updating entity {} in collection {}.
 Ignoring.",
-                        entity.getName(), collName );
-                }
-                catch ( Exception ex ) {
-                    logger.error( "Error repersisting entity", ex );
+                    try {
+                        em.update( entity );
+                        po.onProgress( entity );
+                    }
+                    catch ( WriteOptimisticVerifyException wo ) {
+                        // swallow this, it just means this was already updated, which accomplishes
our task
+                        logger.warn( "Someone beat us to updating entity {} in collection
{}.  Ignoring.",
+                            entity.getName(), collName );
+                    }
+                    catch ( Exception ex ) {
+                        logger.error( "Error repersisting entity", ex );
+                    }
                 }
-            }
-        } );
+            } );
     }
 
 
@@ -2890,32 +2889,34 @@ public class CpEntityManager implements EntityManager {
 
     void indexEntityIntoCollection( final Edge edge,  final  org.apache.usergrid.persistence.model.entity.Entity
collectionMember) {
 
-        final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope()
);
-        final EntityIndexBatch batch = aie.createBatch();
-
-        // index member into entity collection | type scope
-        final IndexEdge collectionIndexScope = generateScopeFromSource( edge );
-        batch.index( collectionIndexScope, collectionMember );
-
-        //TODO REMOVE INDEX CODE
-        //        // index member into entity | all-types scope
-        //        IndexScope entityAllTypesScope = new IndexScopeImpl(
-        //                collectionEntity.getId(),
-        //                CpNamingUtils.ALL_TYPES, entityType );
-        //
-        //        batch.index(entityAllTypesScope, memberEntity);
-        //
-        //        // index member into application | all-types scope
-        //        IndexScope appAllTypesScope = new IndexScopeImpl(
-        //                getApplicationScope().getApplication(),
-        //                CpNamingUtils.ALL_TYPES, entityType );
-        //
-        //        batch.index(appAllTypesScope, memberEntity);
-
-        //Adding graphite metrics
-        Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time();
-        batch.execute();
-        timeIndexEntityCollection.stop();
+        throw new UnsupportedOperationException( "Use the new interface!" );
+//
+//        final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope()
);
+//        final EntityIndexBatch batch = aie.createBatch();
+//
+//        // index member into entity collection | type scope
+//        final IndexEdge collectionIndexScope = generateScopeFromSource( edge );
+//        batch.index( collectionIndexScope, collectionMember );
+//
+//        //TODO REMOVE INDEX CODE
+//        //        // index member into entity | all-types scope
+//        //        IndexScope entityAllTypesScope = new IndexScopeImpl(
+//        //                collectionEntity.getId(),
+//        //                CpNamingUtils.ALL_TYPES, entityType );
+//        //
+//        //        batch.index(entityAllTypesScope, memberEntity);
+//        //
+//        //        // index member into application | all-types scope
+//        //        IndexScope appAllTypesScope = new IndexScopeImpl(
+//        //                getApplicationScope().getApplication(),
+//        //                CpNamingUtils.ALL_TYPES, entityType );
+//        //
+//        //        batch.index(appAllTypesScope, memberEntity);
+//
+//        //Adding graphite metrics
+//        Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time();
+//        batch.execute();
+//        timeIndexEntityCollection.stop();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 1ae1db1..3ddfb07 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -260,40 +260,42 @@ public class CpRelationManager implements RelationManager {
         final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
 
 
-        final GraphManager gm = managerCache.getGraphManager( applicationScope );
-
-        // loop through all types of edge to target
-
-
-        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
-
-        final EntityIndexBatch entityIndexBatch = ei.createBatch();
-
-        final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(),
null, null ) )
-
-            // for each edge type, emit all the edges of that type
-            .flatMap( etype -> gm.loadEdgesToTarget(
-                new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, null ) ) )
-
-                //for each edge we receive index and add to the batch
-            .doOnNext( edge -> {
-                // reindex the entity in the source entity's collection or connection index
-
-                IndexEdge indexScope = generateScopeFromSource( edge );
-
-                entityIndexBatch.index( indexScope, cpEntity );
-
-            } ).doOnCompleted( () -> {
-                    Timer.Context timeElasticIndexBatch = updateCollectionTimer.time();
-                    entityIndexBatch.execute();
-                    timeElasticIndexBatch.stop();
-              } ).count().toBlocking().lastOrDefault( 0 );
-
-        //Adding graphite metrics
-
-
-        logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count
);
+        throw new UnsupportedOperationException( "Use the new interface" );
+
+//        final GraphManager gm = managerCache.getGraphManager( applicationScope );
+//
+//        // loop through all types of edge to target
+//
+//
+//        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope
);
+//
+//        final EntityIndexBatch entityIndexBatch = ei.createBatch();
+//
+//        final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(),
null, null ) )
+//
+//            // for each edge type, emit all the edges of that type
+//            .flatMap( etype -> gm.loadEdgesToTarget(
+//                new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
+//                    SearchByEdgeType.Order.DESCENDING, null ) ) )
+//
+//                //for each edge we receive index and add to the batch
+//            .doOnNext( edge -> {
+//                // reindex the entity in the source entity's collection or connection index
+//
+//                IndexEdge indexScope = generateScopeFromSource( edge );
+//
+//                entityIndexBatch.index( indexScope, cpEntity );
+//
+//            } ).doOnCompleted( () -> {
+//                    Timer.Context timeElasticIndexBatch = updateCollectionTimer.time();
+//                    entityIndexBatch.execute();
+//                    timeElasticIndexBatch.stop();
+//              } ).count().toBlocking().lastOrDefault( 0 );
+//
+//        //Adding graphite metrics
+//
+//
+//        logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes",
count );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
new file mode 100644
index 0000000..30bb20d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Low level queue service for indexing entities
+ */
+public interface IndexQueueService {
+
+
+    /**
+     * Queue an entity to be index asynchronously
+     * @param applicationScope
+     * @param entityId
+     * @param version
+     */
+    void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final
UUID version );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
new file mode 100644
index 0000000..84b59e0
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Our low level indexing service operations
+ */
+public interface IndexService {
+
+
+    /**
+     *  Perform an index update of the entity's state from Cassandra
+     *
+     * @param applicationScope The scope of the entity
+     * @param entity The entity
+     *
+     * @return An observable with the count of every
+     */
+    Observable<Integer> indexEntity( final ApplicationScope applicationScope, final
Entity entity );
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
new file mode 100644
index 0000000..005deb2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.impl.IndexIdentifierImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.apache.usergrid.utils.InflectionUtils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.functions.Func1;
+import rx.observables.MathObservable;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+
+
+/**
+ * Implementation of the indexing service
+ */
+@Singleton
+public class IndexServiceImpl implements IndexService {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+    private final EdgesObservable edgesObservable;
+    private final SerializationFig serializationFig;
+
+
+    @Inject
+    public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory
entityIndexFactory,
+                             final EdgesObservable edgesObservable, final SerializationFig
serializationFig ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+        this.edgesObservable = edgesObservable;
+        this.serializationFig = serializationFig;
+    }
+
+
+    @Override
+    public Observable<Integer> indexEntity( final ApplicationScope applicationScope,
final Entity entity ) {
+
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+
+        // loop through all types of edge to target
+
+
+        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex(
applicationScope );
+
+
+        //get all the source edges for an entity
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entity.getId()
);
+
+
+        final Id entityId = entity.getId();
+
+        final Observable<IndexEdge> targetIndexEdges = edgesToTarget.map( edge ->
generateScopeToTarget( edge ) );
+
+
+
+        //we might or might not need to index from target-> source
+
+
+        final Observable<IndexEdge>
+            targetSizes = getIndexEdgesToTarget( gm, ei, entity );
+
+
+        final Observable<IndexIdentifierImpl.IndexOperationMessage> observable = Observable.merge(
targetIndexEdges,
+            targetSizes ).buffer( serializationFig.getBufferSize() ).flatMap( buffer ->
+            Observable.from(buffer).collect( () -> ei.createBatch(), ( batch, indexEdge
) -> batch.index( indexEdge, entity ) ).flatMap( batch -> Observable.from( batch.execute()
) );
+
+
+
+
+
+        final Observable<IndexIdentifierImpl.IndexOperationMessage> futures = Observable.merge()
+        return MathObservable.sumInteger( sourceSizes );
+    }
+
+
+    /**
+     * Get index edgs to the target
+     * @param graphManager
+     * @param ei The application entity index
+     * @param entity The entity
+     * @return
+     */
+    private Observable<IndexEdge> getIndexEdgesToTarget(
+        final GraphManager graphManager, final ApplicationEntityIndex ei, final Entity entity
 ) {
+
+        final Id entityId = entity.getId();
+        final String collectionName = InflectionUtils.pluralize( entityId.getType() );
+
+
+        final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE,
collectionName );
+
+        //nothing to do
+        if ( collection == null ) {
+            return Observable.empty();
+        }
+
+
+        final String linkedCollection = collection.getLinkedCollection();
+
+        /**
+         * Nothing to link
+         */
+        if ( linkedCollection == null ) {
+            return Observable.empty();
+        }
+
+
+        /**
+         * An observable of sizes as we execute batches
+         */
+       return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection
).map( edge -> generateScopeFromSource( edge ) );
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 0376780..c60b86e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -157,7 +157,7 @@ public class CpNamingUtils {
      */
     public static SearchEdge createCollectionSearchEdge( final Id sourceId, final String
connectionType ) {
         return new SearchEdgeImpl( sourceId, getEdgeTypeFromCollectionName( connectionType
),
-            SearchEdge.NodeType.SOURCE );
+            SearchEdge.NodeType.TARGET );
     }
 
 
@@ -181,7 +181,7 @@ public class CpNamingUtils {
      */
     public static SearchEdge createConnectionSearchEdge( final Id sourceId, final String
connectionType ) {
         return new SearchEdgeImpl( sourceId, getEdgeTypeFromConnectionType( connectionType
),
-            SearchEdge.NodeType.SOURCE );
+            SearchEdge.NodeType.TARGET );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 2206953..fa73991 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -28,6 +28,29 @@ import rx.Observable;
  * Get all edges from source
  */
 public interface EdgesObservable {
+
+    /**
+     * Return an observable of all edges from a source
+     * @param gm
+     * @param sourceNode
+     * @return
+     */
     Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode);
+
+    /**
+     * Get all edges from the source node with the target type
+     * @param gm
+     * @param sourceNode
+     * @param targetType
+     * @return
+     */
+    Observable<Edge> getEdgesFromSource(final GraphManager gm, final Id sourceNode,
final String targetType );
+
+    /**
+     * Return an observable of all edges to a target
+     * @param gm
+     * @param targetNode
+     * @return
+     */
     Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 371cf1d..2264cbd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -19,62 +19,88 @@
  */
 package org.apache.usergrid.persistence.graph.serialization.impl;
 
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import rx.Observable;
 import rx.functions.Func1;
 
+
 /**
  * Emits the edges that are edges from the specified source node
  */
 public class EdgesObservableImpl implements EdgesObservable {
 
-    private static final Logger logger = LoggerFactory.getLogger(EdgesObservableImpl.class);
-    public EdgesObservableImpl(){
+    private static final Logger logger = LoggerFactory.getLogger( EdgesObservableImpl.class
);
+
+
+    public EdgesObservableImpl() {
 
     }
 
+
     /**
      * Get all edges from the source
      */
     @Override
-    public  Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType(
sourceNode, null, null ) );
+    public Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode
) {
+        final Observable<String> edgeTypes =
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null )
);
 
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
+        return edgeTypes.flatMap(  edgeType -> {
 
                 logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode
);
 
-                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType,
Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, null ) );
-            }
+                return gm.loadEdgesFromSource(
+                    new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        null ) );
         } );
     }
+
+
+    @Override
+    public Observable<Edge> getEdgesFromSource( final GraphManager gm, final Id sourceNode,
final String targetType ) {
+
+        final Observable<String> edgeTypes =
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null )
);
+
+
+        return edgeTypes.flatMap( edgeType -> {
+
+            logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
+
+            return gm.loadEdgesFromSourceByType(
+                new SimpleSearchByIdType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    targetType, null ) );
+        } );
+    }
+
+
     /**
      * Get all edges from the source
      */
     @Override
-    public  Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode)
{
-        Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
targetNode, null, null ) );
+    public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode
) {
+        final Observable<String> edgeTypes =
+            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
 
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
+        return edgeTypes.flatMap( edgeType -> {
 
-                logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
+            logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode );
 
-                return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType,
Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, null ) );
-            }
+            return gm.loadEdgesToTarget(
+                new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    null ) );
         } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0ece8473/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index da05d39..de28680 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index;/*
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.IndexIdentifierImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -65,7 +66,7 @@ public interface EntityIndexBatch {
      * Execute the batch
      * @return future to guarantee execution
      */
-    BetterFuture execute();
+    BetterFuture<IndexIdentifierImpl.IndexOperationMessage> execute();
 
     /**
      * Get the number of operations in the batch


Mime
View raw message