usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [2/4] incubator-usergrid git commit: WIP overwrite
Date Fri, 10 Apr 2015 00:25:49 GMT
WIP overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3d036e1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3d036e1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3d036e1d

Branch: refs/heads/USERGRID-536
Commit: 3d036e1dcb94cfa03bd702250ebfcca962fe3b54
Parents: fe76d71
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Apr 9 12:14:43 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Apr 9 12:14:43 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 1061 ++----------------
 .../corepersistence/util/CpNamingUtils.java     |  230 ++--
 2 files changed, 216 insertions(+), 1075 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3d036e1d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index bb9928a..a3ef1f5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -18,9 +18,7 @@ package org.apache.usergrid.corepersistence;
 
 
 import java.nio.ByteBuffer;
-import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -45,7 +43,8 @@ import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.PagingResultsIterator;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Query.Level;
 import org.apache.usergrid.persistence.RelationManager;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.RoleRef;
@@ -54,22 +53,13 @@ import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.SimpleRoleRef;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.cassandra.IndexUpdate;
-import org.apache.usergrid.persistence.cassandra.QueryProcessorImpl;
-import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
-import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
-import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
-import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
-import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
-import org.apache.usergrid.persistence.geo.EntityLocationRef;
-import org.apache.usergrid.persistence.geo.model.Point;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
@@ -77,57 +67,32 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.query.Identifier;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Query.Level;
+import org.apache.usergrid.persistence.index.query.SortPredicate;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.query.ir.AllNode;
-import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
-import org.apache.usergrid.persistence.query.ir.QueryNode;
-import org.apache.usergrid.persistence.query.ir.QuerySlice;
-import org.apache.usergrid.persistence.query.ir.SearchVisitor;
-import org.apache.usergrid.persistence.query.ir.WithinNode;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory;
-import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator;
-import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
-import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
-import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
-import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
-import org.apache.usergrid.utils.IndexUtils;
 import org.apache.usergrid.utils.MapUtils;
-import org.apache.usergrid.utils.UUIDUtils;
 
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.mutation.Mutator;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
 
-import static java.util.Arrays.asList;
-
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
-import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
@@ -136,28 +101,13 @@ import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
 import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
 import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
 import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
 import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
 import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
 import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.Serializers.be;
 import static org.apache.usergrid.utils.ClassUtils.cast;
-import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
 import static org.apache.usergrid.utils.InflectionUtils.singularize;
 import static org.apache.usergrid.utils.MapUtils.addMapSet;
-import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
 
 
 /**
@@ -327,7 +277,6 @@ public class CpRelationManager implements RelationManager {
                       return gm.loadEdgesToTarget(
                           new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
                               SearchByEdgeType.Order.DESCENDING, null ) );
-
                   }
               } );
 
@@ -399,7 +348,7 @@ public class CpRelationManager implements RelationManager {
 
                         // reindex the entity in the source entity's collection or connection index
 
-                        IndexScope indexScope = generateScopeFromSource(edge);
+                        IndexEdge indexScope = generateScopeFromSource(edge);
 
 
                         entityIndexBatch.index( indexScope, cpEntity );
@@ -483,12 +432,9 @@ public class CpRelationManager implements RelationManager {
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        Observable<Edge> edgesToTarget = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-            targetId,
-            CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
-            System.currentTimeMillis(),
-            SearchByEdgeType.Order.DESCENDING,
-            null ) ); // last
+        Observable<Edge> edgesToTarget = gm.loadEdgesToTarget(
+            new SimpleSearchByEdgeType( targetId, CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
+                System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) ); // last
 
         Iterator<Edge> iterator = edgesToTarget.toBlocking().getIterator();
         int count = 0;
@@ -502,24 +448,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    private boolean moreThanOneOutboundConnection( EntityRef source, String connectionType ) {
-
-        Id sourceId = new SimpleId( source.getUuid(), source.getType() );
-
-        GraphManager gm = managerCache.getGraphManager( applicationScope );
-
-        Observable<Edge> edgesFromSource = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
-            sourceId,
-            CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
-            System.currentTimeMillis(),
-            SearchByEdgeType.Order.DESCENDING,
-            null ) ); // last
-
-        int count = edgesFromSource.take( 2 ).count().toBlocking().last();
-
-        return count > 1;
-    }
-
 
     @Override
     public Set<String> getCollections() throws Exception {
@@ -529,7 +457,7 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager( applicationScope );
 
         Observable<String> str = gm.getEdgeTypesFromSource(
-                new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
+            new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
 
         Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
@@ -542,19 +470,22 @@ public class CpRelationManager implements RelationManager {
 
 
     @Override
-    public Results getCollection( String collectionName,
-            UUID startResult,
-            int count,
-            Level resultsLevel,
-            boolean reversed ) throws Exception {
+    public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
+                                  boolean reversed ) throws Exception {
 
-        Query query = Query.fromQL( "select *" );
-        query.setLimit( count );
-        query.setReversed( reversed );
+
+        final String ql;
 
         if ( startResult != null ) {
-            query.addGreaterThanEqualFilter( "created", startResult.timestamp() );
+            ql = "select * where created > " + startResult.timestamp();
         }
+        else {
+            ql = "select *";
+        }
+
+        Query query = Query.fromQL( ql );
+        query.setLimit( count );
+        query.setReversed( reversed );
 
         return searchCollection( collectionName, query );
     }
@@ -632,26 +563,15 @@ public class CpRelationManager implements RelationManager {
                 } );
         }
 
-        String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName );
-
-        UUID timeStampUuid = memberEntity.getId().getUuid() != null
-                && UUIDUtils.isTimeBased( memberEntity.getId().getUuid() )
-                ?  memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
-
-        long uuidHash = UUIDUtils.getUUIDLong( timeStampUuid );
 
         // create graph edge connection from head entity to member entity
-        Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
+        final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() );
         GraphManager gm = managerCache.getGraphManager( applicationScope );
         gm.writeEdge( edge ).toBlocking().last();
 
 
         if(logger.isDebugEnabled()) {
-            logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] {
-                edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(),
-                memberEntity.getId().getUuid(), applicationScope.getApplication().getType(),
-                applicationScope.getApplication().getUuid()
-            } );
+            logger.debug( "Wrote edge {}", edge );
         }
 
         ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
@@ -775,40 +695,33 @@ public class CpRelationManager implements RelationManager {
         org.apache.usergrid.persistence.model.entity.Entity memberEntity =
             ((CpEntityManager)em).load( entityId );
 
+
+
+        // remove edge from collection to item
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
+
+
+        //run our delete
+        final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() );
+        gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
+
+
+        /**
+         * Remove from the index
+         *
+         */
+
         final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
         final EntityIndexBatch batch = ei.createBatch();
 
         // remove item from collection index
-        IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
+        SearchEdge indexScope = createCollectionSearchEdge( cpHeadEntity.getId(), collName );
 
         batch.deindex( indexScope, memberEntity );
 
-        // remove collection from item index
-        IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(),
-            Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) );
-
-
-        batch.deindex( itemScope, cpHeadEntity );
 
         batch.execute();
 
-        // remove edge from collection to item
-        GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Edge collectionToItemEdge = new SimpleEdge(
-                cpHeadEntity.getId(),
-                CpNamingUtils.getEdgeTypeFromCollectionName( collName ),
-                memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) );
-        gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
-
-        // remove edge from item to collection
-        Edge itemToCollectionEdge = new SimpleEdge(
-                memberEntity.getId(),
-                CpNamingUtils.getEdgeTypeFromCollectionName(
-                        Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ),
-                cpHeadEntity.getId(),
-                UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) );
-
-        gm.deleteEdge( itemToCollectionEdge ).toBlocking().last();
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -884,15 +797,14 @@ public class CpRelationManager implements RelationManager {
                     + "' of " + headEntity.getType() + ":" + headEntity .getUuid() );
         }
 
-        final IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
+
+        final SearchEdge searchEdge = createCollectionSearchEdge( cpHeadEntity.getId(), collName );
 
         final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
         final SearchTypes types = SearchTypes.fromTypes( collection.getType() );
 
-        logger.debug( "Searching scope {}:{}",
-
-            indexScope.getOwner().toString(), indexScope.getName() );
+        logger.debug( "Searching scope {}", searchEdge);
 
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
@@ -902,7 +814,7 @@ public class CpRelationManager implements RelationManager {
 
 
         //execute the query and return our next result
-        final QueryExecutor executor = new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, indexScope, types, query );
+        final QueryExecutor executor = new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, searchEdge, types, query );
 
         return executor.next();
     }
@@ -955,9 +867,9 @@ public class CpRelationManager implements RelationManager {
         EntityIndexBatch batch = ei.createBatch();
 
         // Index the new connection in app|source|type context
-        IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType );
+        IndexEdge edgeIndex = generateScopeFromSource( edge );
 
-        batch.index( indexScope, targetEntity );
+        batch.index( edgeIndex, targetEntity );
 
         // Index the new connection in app|scope|all-types context
         //TODO REMOVE INDEX CODE
@@ -965,129 +877,11 @@ public class CpRelationManager implements RelationManager {
 //        batch.index( allTypesIndexScope, targetEntity );
 
 
-        BetterFuture future = batch.execute();
-
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-        batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() );
-        //Added Graphite Metrics
-        Timer.Context timeElasticIndexBatch = createConnectionTimer.time();
-        batchExecute( m, CassandraService.RETRY_COUNT );
-        timeElasticIndexBatch.stop();
-
 
         return connection;
     }
 
 
-    @SuppressWarnings( "unchecked" )
-    public Mutator<ByteBuffer> batchUpdateEntityConnection(
-            Mutator<ByteBuffer> batch,
-            boolean disconnect,
-            ConnectionRefImpl conn,
-            UUID timestampUuid ) throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        Entity connectedEntity = em.get(new SimpleEntityRef(
-                conn.getConnectedEntityType(), conn.getConnectedEntityId() ) );
-
-        if ( connectedEntity == null ) {
-            return batch;
-        }
-
-        // Create connection for requested params
-
-        if ( disconnect ) {
-
-            addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
-                        conn.getConnectionType() ),
-                asList(conn.getConnectedEntityId(), conn.getConnectedEntityType() ), timestamp );
-
-            addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
-                key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-                        conn.getConnectionType() ),
-                asList(conn.getConnectingEntityId(), conn.getConnectingEntityType() ), timestamp );
-
-            // delete the connection path if there will be no connections left
-
-            // check out outbound edges of the given type.  If we have more than the 1 specified,
-            // we shouldn't delete the connection types from our outbound index
-            if ( !moreThanOneOutboundConnection(conn.getConnectingEntity(), conn.getConnectionType() ) ) {
-
-                addDeleteToMutator(batch, ENTITY_DICTIONARIES,
-                        key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
-                        conn.getConnectionType(), timestamp );
-            }
-
-            //check out inbound edges of the given type.  If we have more than the 1 specified,
-            // we shouldn't delete the connection types from our outbound index
-            if ( !moreThanOneInboundConnection(conn.getConnectingEntity(), conn.getConnectionType() ) ) {
-
-                addDeleteToMutator(batch, ENTITY_DICTIONARIES,
-                    key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
-                    conn.getConnectionType(), timestamp );
-        }
-        }
-        else {
-
-            addInsertToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
-                            conn.getConnectionType() ),
-                    asList(conn.getConnectedEntityId(), conn.getConnectedEntityType() ), timestamp,
-                    timestamp );
-
-            addInsertToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
-                    key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
-                            conn.getConnectionType() ),
-                    asList(conn.getConnectingEntityId(), conn.getConnectingEntityType() ), timestamp,
-                    timestamp );
-
-            // Add connection type to connections set
-            addInsertToMutator(batch, ENTITY_DICTIONARIES,
-                    key(conn.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
-                    conn.getConnectionType(), null, timestamp );
-
-            // Add connection type to connections set
-            addInsertToMutator(batch, ENTITY_DICTIONARIES,
-                    key(conn.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
-                    conn.getConnectionType(), null, timestamp );
-        }
-
-        // Add indexes for the connected entity's list properties
-
-        // Get the names of the list properties in the connected entity
-        Set<String> dictionaryNames = em.getDictionaryNames( connectedEntity );
-
-        // For each list property, get the values in the list and
-        // update the index with those values
-
-        Schema schema = getDefaultSchema();
-
-        for ( String dictionaryName : dictionaryNames ) {
-
-            boolean has_dictionary = schema.hasDictionary(
-                    connectedEntity.getType(), dictionaryName );
-
-            boolean dictionary_indexed = schema.isDictionaryIndexedInConnections(
-                    connectedEntity.getType(), dictionaryName );
-
-            if ( dictionary_indexed || !has_dictionary ) {
-                Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
-                for ( Object elementValue : elementValues ) {
-                    IndexUpdate indexUpdate = batchStartIndexUpdate(
-                            batch, connectedEntity, dictionaryName, elementValue,
-                            timestampUuid, has_dictionary, true, disconnect, false );
-                    batchUpdateConnectionIndex(indexUpdate, conn );
-                }
-            }
-        }
-
-        return batch;
-    }
-
-
     @Override
     public ConnectionRef createConnection(
             String pairedConnectionType,
@@ -1138,16 +932,6 @@ public class CpRelationManager implements RelationManager {
     public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
 
         // First, clean up the dictionary records of the connection
-        Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-        batchUpdateEntityConnection(
-                m, true, ( ConnectionRefImpl ) connectionRef, UUIDGenerator.newTimeUUID() );
-
-        //Added Graphite Metrics
-        Timer.Context timeDeleteConnections = cassConnectionDelete.time();
-        batchExecute( m, CassandraService.RETRY_COUNT );
-        timeDeleteConnections.stop();
-
         EntityRef connectingEntityRef = connectionRef.getConnectingEntity();  // source
         EntityRef connectedEntityRef = connectionRef.getConnectedEntity();  // target
 
@@ -1177,30 +961,26 @@ public class CpRelationManager implements RelationManager {
                 targetEntity.getId(),
                 System.currentTimeMillis() );
 
+
+
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.deleteEdge( edge ).toBlocking().last();
 
-        final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
-        final EntityIndexBatch batch = ei.createBatch();
 
-        // Deindex the connection in app|source|type context
-        final Id cpId =  createId( connectingEntityRef );
-        IndexScope indexScope = generateScopeFromConnection( cpId, connectionType );
-        batch.deindex( indexScope, targetEntity );
+        final Id sourceId = new SimpleId( connectingEntityRef.getUuid(),  connectingEntityRef.getType() );
 
-        // Deindex the connection in app|source|type context
-        //TODO REMOVE INDEX CODE
-//        IndexScope allTypesIndexScope = new IndexScopeImpl(
-//            new SimpleId( connectingEntityRef.getUuid(),
-//                connectingEntityRef.getType() ),
-//                CpNamingUtils.ALL_TYPES, entityType );
-//
-//        batch.deindex( allTypesIndexScope, targetEntity );
+        final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );
+
+        //delete all the edges
+        final Edge lastEdge = gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking().lastOrDefault(null);
+
+        if(lastEdge != null) {
+            final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
+            final EntityIndexBatch batch = ei.createBatch();
+
+            SearchEdge indexScope = createSearchEdgeFromSource( lastEdge );
+            batch.deindex( indexScope, targetEntity );
+        }
 
-        //Added Graphite Metrics
-        Timer.Context timeDeleteConnection = esDeleteConnectionTimer.time();
-        batch.execute();
-        timeDeleteConnection.stop();
 
     }
 
@@ -1302,21 +1082,15 @@ public class CpRelationManager implements RelationManager {
 
         Preconditions.checkNotNull( connection, "connection must be specified" );
 
-//        if ( query == null ) {
-//            query = new Query();
-//        }
-
         headEntity = em.validate( headEntity );
 
-        final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection );
+        final SearchEdge indexScope = createConnectionSearchEdge( cpHeadEntity.getId(), connection );
 
         final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() );
 
         ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
 
-        logger.debug( "Searching connections from the scope {}:{} with types {}", new Object[] {
-                        indexScope.getOwner().toString(), indexScope.getName(), searchTypes
-                } );
+        logger.debug( "Searching {}", indexScope );
 
         query = adjustQuery( query );
 
@@ -1326,18 +1100,15 @@ public class CpRelationManager implements RelationManager {
         final QueryExecutor executor = new ElasticSearchQueryExecutor(resultsLoaderFactory, ei, applicationScope, indexScope, searchTypes, query);
 
         return executor.next();
-//        CandidateResults crs = ei.search( indexScope, searchTypes, query );
-
-//        return buildConnectionResults( indexScope, query, crs, connection );
     }
 
 
     private Query adjustQuery( Query query ) {
 
         // handle the select by identifier case
-        if ( query.getRootOperand() == null ) {
+        if(query.getQl() == null){
 
-            // a name alias or email alias was specified
+        // a name alias or email alias was specified
             if ( query.containsSingleNameOrEmailIdentifier() ) {
 
                 Identifier ident = query.getSingleIdentifier();
@@ -1346,29 +1117,33 @@ public class CpRelationManager implements RelationManager {
                 // This is fulgy to put here, but required.
                 if ( query.getEntityType().equals( User.ENTITY_TYPE ) && ident.isEmail() ) {
 
-                    Query newQuery = Query.fromQL( "select * where email='"
-                            + query.getSingleNameOrEmailIdentifier() + "'" );
-                    query.setRootOperand( newQuery.getRootOperand() );
+                    final String  newQuery = "select * where email='"
+                            + query.getSingleNameOrEmailIdentifier() + "'" ;
+
+                    query.setQl( newQuery  );
+
                 }
 
                 // use the ident with the default alias. could be an email
                 else {
 
-                    Query newQuery = Query.fromQL( "select * where name='"
-                            + query.getSingleNameOrEmailIdentifier() + "'" );
-                    query.setRootOperand( newQuery.getRootOperand() );
+                    final String newQuery = "select * where name='"
+                            + query.getSingleNameOrEmailIdentifier() + "'" ;
+                    query.setQl( newQuery );
                 }
             }
             else if ( query.containsSingleUuidIdentifier() ) {
 
-                Query newQuery = Query.fromQL(
-                        "select * where uuid='" + query.getSingleUuidIdentifier() + "'" );
-                query.setRootOperand( newQuery.getRootOperand() );
+                //TODO, this shouldn't even come from ES, it should look up the entity directly
+                final String newQuery =
+                        "select * where uuid='" + query.getSingleUuidIdentifier() + "'" ;
+                query.setQl( newQuery );
             }
         }
 
         if ( query.isReversed() ) {
 
+            SortPredicate
             Query.SortPredicate desc =
                 new Query.SortPredicate( PROPERTY_CREATED, Query.SortDirection.DESCENDING );
 
@@ -1589,673 +1364,7 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    public IndexUpdate batchStartIndexUpdate(
-            Mutator<ByteBuffer> batch, Entity entity, String entryName,
-            Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
-             boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed )
-            throws Exception {
-        return batchStartIndexUpdate( batch, entity, entryName, entryValue, timestampUuid,
-                schemaHasProperty, isMultiValue, removeListEntry, fulltextIndexed, false );
-    }
-
-
-    public IndexUpdate batchStartIndexUpdate(
-        Mutator<ByteBuffer> batch, Entity entity, String entryName,
-        Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
-        boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed,
-            boolean skipRead ) throws Exception {
-
-        long timestamp = getTimestampInMicros( timestampUuid );
-
-        IndexUpdate indexUpdate = new IndexUpdate( batch, entity, entryName, entryValue,
-                schemaHasProperty, isMultiValue, removeListEntry, timestampUuid );
-
-        // entryName = entryName.toLowerCase();
-
-        // entity_id,connection_type,connected_entity_id,prop_name
-
-        if ( !skipRead ) {
-
-            List<HColumn<ByteBuffer, ByteBuffer>> entries = null;
-
-            if ( isMultiValue && validIndexableValue( entryValue ) ) {
-                entries = cass.getColumns(
-                    cass.getApplicationKeyspace( applicationId ),
-                        ENTITY_INDEX_ENTRIES,
-                        entity.getUuid(),
-                        new DynamicComposite(
-                            entryName,
-                            indexValueCode( entryValue ),
-                            toIndexableValue( entryValue ) ),
-                        setGreaterThanEqualityFlag(
-                            new DynamicComposite(
-                                entryName, indexValueCode( entryValue ),
-                                toIndexableValue( entryValue ) ) ),
-                        INDEX_ENTRY_LIST_COUNT,
-                        false );
-            }
-            else {
-                entries = cass.getColumns(
-                    cass.getApplicationKeyspace( applicationId ),
-                    ENTITY_INDEX_ENTRIES,
-                    entity.getUuid(),
-                    new DynamicComposite( entryName ),
-                    setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ),
-                    INDEX_ENTRY_LIST_COUNT,
-                    false );
-            }
-
-            if ( logger.isDebugEnabled() ) {
-                logger.debug( "Found {} previous index entries for {} of entity {}", new Object[] {
-                        entries.size(), entryName, entity.getUuid()
-                } );
-            }
-
-            // Delete all matching entries from entry list
-            for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) {
-                UUID prev_timestamp = null;
-                Object prev_value = null;
-                String prev_obj_path = null;
-
-                // new format:
-                // composite(entryName,
-                // value_code,prev_value,prev_timestamp,prev_obj_path) = null
-                DynamicComposite composite =
-                        DynamicComposite.fromByteBuffer( entry.getName().duplicate() );
-                prev_value = composite.get( 2 );
-                prev_timestamp = ( UUID ) composite.get( 3 );
-                if ( composite.size() > 4 ) {
-                    prev_obj_path = ( String ) composite.get( 4 );
-                }
-
-                if ( prev_value != null ) {
-
-                    String entryPath = entryName;
-                    if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) {
-                        entryPath = entryName + "." + prev_obj_path;
-                    }
-
-                    indexUpdate.addPrevEntry(
-                            entryPath, prev_value, prev_timestamp, entry.getName().duplicate() );
-
-                    // composite(property_value,connected_entity_id,entry_timestamp)
-                    // addDeleteToMutator(batch, ENTITY_INDEX_ENTRIES,
-                    // entity.getUuid(), entry.getName(), timestamp);
-
-                }
-                else {
-                    logger.error( "Unexpected condition - deserialized property value is null" );
-                }
-            }
-        }
-
-        if ( !isMultiValue || ( isMultiValue && !removeListEntry ) ) {
-
-            List<Map.Entry<String, Object>> list =
-                    IndexUtils.getKeyValueList( entryName, entryValue, fulltextIndexed );
-
-            if ( entryName.equalsIgnoreCase( "location" ) && ( entryValue instanceof Map ) ) {
-                @SuppressWarnings( "rawtypes" ) double latitude =
-                        MapUtils.getDoubleValue( ( Map ) entryValue, "latitude" );
-                @SuppressWarnings( "rawtypes" ) double longitude =
-                        MapUtils.getDoubleValue( ( Map ) entryValue, "longitude" );
-                list.add( new AbstractMap.SimpleEntry<String, Object>( "location.coordinates",
-                        latitude + "," + longitude ) );
-            }
-
-            for ( Map.Entry<String, Object> indexEntry : list ) {
-
-                if ( validIndexableValue( indexEntry.getValue() ) ) {
-                    indexUpdate.addNewEntry(
-                            indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) );
-                }
-            }
-
-            if ( isMultiValue ) {
-                addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
-                        asList( entryName,
-                            indexValueCode( entryValue ),
-                            toIndexableValue( entryValue ),
-                            indexUpdate.getTimestampUuid() ),
-                        null, timestamp );
-            }
-            else {
-                // int i = 0;
-
-                for ( Map.Entry<String, Object> indexEntry : list ) {
-
-                    String name = indexEntry.getKey();
-                    if ( name.startsWith( entryName + "." ) ) {
-                        name = name.substring( entryName.length() + 1 );
-                    }
-                    else if ( name.startsWith( entryName ) ) {
-                        name = name.substring( entryName.length() );
-                    }
-
-                    byte code = indexValueCode( indexEntry.getValue() );
-                    Object val = toIndexableValue( indexEntry.getValue() );
-                    addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
-                            asList( entryName, code, val, indexUpdate.getTimestampUuid(), name ),
-                            null, timestamp );
-
-                    indexUpdate.addIndex( indexEntry.getKey() );
-                }
-            }
-
-            indexUpdate.addIndex( entryName );
-        }
-
-        return indexUpdate;
-    }
-
-
-    /**
-     * Batch update backward connections set indexes.
-     *
-     * @param indexUpdate The index to update in the dictionary
-     *
-     * @return The index update
-     *
-     * @throws Exception the exception
-     */
-    public IndexUpdate batchUpdateBackwardConnectionsDictionaryIndexes(
-            IndexUpdate indexUpdate ) throws Exception {
-
-        logger.debug( "batchUpdateBackwardConnectionsListIndexes" );
-
-        boolean entityHasDictionary = getDefaultSchema()
-                .isDictionaryIndexedInConnections(
-                        indexUpdate.getEntity().getType(), indexUpdate.getEntryName() );
-
-        if ( !entityHasDictionary ) {
-            return indexUpdate;
-        }
-
-
-        return doBackwardConnectionsUpdate( indexUpdate );
-    }
-
-
-    /**
-     * Search each reverse connection type in the graph for connections.
-     * If one is found, update the index appropriately
-     *
-     * @param indexUpdate The index update to use
-     *
-     * @return The updated index update
-     */
-    private IndexUpdate doBackwardConnectionsUpdate( IndexUpdate indexUpdate ) throws Exception {
-        final Entity targetEntity = indexUpdate.getEntity();
-
-        logger.debug( "doBackwardConnectionsUpdate" );
-
-        final ConnectionTypesIterator connectionTypes =
-                new ConnectionTypesIterator( cass, applicationId, targetEntity.getUuid(), false, 100 );
-
-        for ( String connectionType : connectionTypes ) {
-
-            PagingResultsIterator itr =
-                    getReversedConnectionsIterator( targetEntity, connectionType );
-
-            for ( Object connection : itr ) {
-
-                final ConnectedEntityRef sourceEntity = ( ConnectedEntityRef ) connection;
-
-                //we need to create a connection ref from the source entity (found via reverse edge)
-                // to the entity we're about to update.  This is the index that needs updated
-                final ConnectionRefImpl connectionRef =
-                        new ConnectionRefImpl( sourceEntity, connectionType, indexUpdate.getEntity() );
-
-                batchUpdateConnectionIndex( indexUpdate, connectionRef );
-            }
-        }
-
-        return indexUpdate;
-    }
-
-
-    /**
-     * Batch update connection index.
-     *
-     * @param indexUpdate The update operation to perform
-     * @param connection The connection to update
-     *
-     * @return The index with the batch mutation udpated
-     *
-     * @throws Exception the exception
-     */
-    public IndexUpdate batchUpdateConnectionIndex(
-            IndexUpdate indexUpdate, ConnectionRefImpl connection ) throws Exception {
-
-        logger.debug( "batchUpdateConnectionIndex" );
-
-        // UUID connection_id = connection.getUuid();
-
-        UUID[] index_keys = connection.getIndexIds();
-
-        // Delete all matching entries from entry list
-        for ( IndexUpdate.IndexEntry entry : indexUpdate.getPrevEntries() ) {
-
-            if ( entry.getValue() != null ) {
-
-                batchDeleteConnectionIndexEntries( indexUpdate, entry, connection, index_keys );
-
-                if ( "location.coordinates".equals( entry.getPath() ) ) {
-                    EntityLocationRef loc =
-                        new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
-                        entry.getValue().toString() );
-                    batchDeleteLocationInConnectionsIndex(
-                        indexUpdate.getBatch(), indexBucketLocator, applicationId,
-                        index_keys, entry.getPath(), loc );
-                }
-            }
-            else {
-                logger.error( "Unexpected condition - deserialized property value is null" );
-            }
-        }
-
-        if ( ( indexUpdate.getNewEntries().size() > 0 )
-                && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
-                && !indexUpdate.isRemoveListEntry() ) ) ) {
-
-            for ( IndexUpdate.IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
-
-                batchAddConnectionIndexEntries( indexUpdate, indexEntry, connection, index_keys );
-
-                if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
-                    EntityLocationRef loc =
-                            new EntityLocationRef(
-                        indexUpdate.getEntity(),
-                        indexEntry.getTimestampUuid(),
-                        indexEntry.getValue().toString() );
-                    batchStoreLocationInConnectionsIndex(
-                            indexUpdate.getBatch(), indexBucketLocator, applicationId,
-                            index_keys, indexEntry.getPath(), loc );
-                }
-            }
-
-      /*
-       * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
-       * Schema.INDEXES_SET), indexEntry.getKey(), null, false, timestamp); }
-       *
-       * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
-       * Schema.INDEXES_SET), entryName, null, false, timestamp);
-       */
-        }
-
-        for ( String index : indexUpdate.getIndexesSet() ) {
-            addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
-                    key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES), index, null,
-                    indexUpdate.getTimestamp() );
-        }
-
-        return indexUpdate;
-    }
-
-
-    /**
-     * Get a paging results iterator.  Should return an iterator for all results
-     *
-     * @param targetEntity The target entity search connections from
-     *
-     * @return connectionType The name of the edges to search
-     */
-    private PagingResultsIterator getReversedConnectionsIterator(
-            EntityRef targetEntity, String connectionType ) throws Exception {
 
-        return new PagingResultsIterator(
-                getConnectingEntities( targetEntity, connectionType, null, Level.REFS ) );
-    }
-
-
-    /**
-     * Get all edges that are to the targetEntity
-     *
-     * @param targetEntity The target entity to search edges in
-     * @param connectionType The type of connection.  If not specified, all connections are returned
-     * @param connectedEntityType The connected entity type, if not specified all types are returned
-     * @param resultsLevel The results level to return
-     */
-    private Results getConnectingEntities(
-            EntityRef targetEntity, String connectionType, String connectedEntityType,
-            Level resultsLevel ) throws Exception {
 
-        return getConnectingEntities(
-                targetEntity, connectionType, connectedEntityType, resultsLevel, 0);
-    }
-
-
-    /**
-     * Get all edges that are to the targetEntity
-     *
-     * @param targetEntity The target entity to search edges in
-     * @param connectionType The type of connection.  If not specified, all connections are returned
-     * @param connectedEntityType The connected entity type, if not specified all types are returned
-     * @param count result limit
-     */
-    private Results getConnectingEntities( EntityRef targetEntity, String connectionType,
-            String connectedEntityType, Level level, int count) throws Exception {
-
-        Query query = new Query();
-        query.setResultsLevel( level );
-        query.setLimit( count );
 
-        final ConnectionRefImpl connectionRef = new ConnectionRefImpl(
-                new SimpleEntityRef( connectedEntityType, null ), connectionType, targetEntity );
-        final ConnectionResultsLoaderFactory factory =
-                new ConnectionResultsLoaderFactory( connectionRef );
-
-        QueryProcessorImpl qp = new QueryProcessorImpl( query, null, em, factory );
-        SearchConnectionVisitor visitor = new SearchConnectionVisitor( qp, connectionRef, false );
-
-        return qp.getResults( visitor );
-    }
-
-
-    public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries(
-            IndexUpdate indexUpdate,
-            IndexUpdate.IndexEntry entry,
-            ConnectionRefImpl connection,
-            UUID[] index_keys ) throws Exception {
-
-        logger.debug( "batchDeleteConnectionIndexEntries" );
-
-        // entity_id,prop_name
-        Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
-                indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                        index_keys[ConnectionRefImpl.ALL], entry.getPath() ) );
-
-        // entity_id,entity_type,prop_name
-        Object entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,prop_name
-        Object connection_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,entity_type,prop_name
-        Object connection_type_and_entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
-
-        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
-                        connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
-                indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
-                indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entry_timestamp)
-        addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
-                entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
-
-        return indexUpdate.getBatch();
-    }
-
-
-    public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexUpdate.IndexEntry entry,
-                                                               ConnectionRefImpl conn, UUID[] index_keys ) {
-
-        logger.debug( "batchAddConnectionIndexEntries" );
-
-        // entity_id,prop_name
-        Object property_index_key = key( index_keys[ConnectionRefImpl.ALL],
-                INDEX_CONNECTIONS, entry.getPath(),
-                indexBucketLocator.getBucket( applicationId,
-                        IndexBucketLocator.IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
-                        entry.getPath() ) );
-
-        // entity_id,entity_type,prop_name
-        Object entity_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,prop_name
-        Object connection_type_prop_index_key =
-                key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
-
-        // entity_id,connection_type,entity_type,prop_name
-        Object connection_type_and_entity_type_prop_index_key =
-            key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE],
-                INDEX_CONNECTIONS, entry.getPath(),
-                        indexBucketLocator.getBucket( applicationId, IndexBucketLocator.IndexType.CONNECTION,
-                                index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
-
-        // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
-                entry.getIndexComposite( conn.getConnectedEntityId(), conn.getConnectionType(),
-                        conn.getConnectedEntityType() ), conn.getUuid(), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
-            entry.getIndexComposite( conn.getConnectedEntityId(), conn.getConnectionType() ),
-            conn.getUuid(), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
-            entry.getIndexComposite( conn.getConnectedEntityId(), conn.getConnectedEntityType() ),
-            conn.getUuid(), indexUpdate.getTimestamp() );
-
-        // composite(property_value,connected_entity_id,entry_timestamp)
-        addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX,
-            connection_type_and_entity_type_prop_index_key,
-            entry.getIndexComposite( conn.getConnectedEntityId() ), conn.getUuid(),
-            indexUpdate.getTimestamp() );
-
-        return indexUpdate.getBatch();
-    }
-
-
-    /**
-     * Simple search visitor that performs all the joining
-     *
-     * @author tnine
-     */
-    private class SearchConnectionVisitor extends SearchVisitor {
-
-        private final ConnectionRefImpl connection;
-
-        /** True if we should search from source->target edges.
-         * False if we should search from target<-source edges */
-        private final boolean outgoing;
-
-
-        /**
-         * @param queryProcessor They query processor to use
-         * @param connection The connection refernce
-         * @param outgoing The direction to search.  True if we should search from source->target
-         * edges.  False if we * should search from target<-source edges
-         */
-        public SearchConnectionVisitor( QueryProcessorImpl queryProcessor, ConnectionRefImpl connection,
-                                        boolean outgoing ) {
-            super( queryProcessor );
-            this.connection = connection;
-            this.outgoing = outgoing;
-        }
-
-
-        /* (non-Javadoc)
-     * @see org.apache.usergrid.persistence.query.ir.SearchVisitor#secondaryIndexScan(org.apache.usergrid.persistence
-     * .query.ir
-     * .QueryNode, org.apache.usergrid.persistence.query.ir.QuerySlice)
-     */
-        @Override
-        protected IndexScanner secondaryIndexScan( QueryNode node, QuerySlice slice ) throws Exception {
-
-            UUID id = ConnectionRefImpl.getIndexId(
-                    ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE,
-                    headEntity,
-                    connection.getConnectionType(),
-                    connection.getConnectedEntityType(),
-                    new ConnectedEntityRef[0] );
-
-            Object key = key( id, INDEX_CONNECTIONS );
-
-            // update the cursor and order before we perform the slice
-            // operation
-            queryProcessor.applyCursorAndSort( slice );
-
-            IndexScanner columns = null;
-
-            if ( slice.isComplete() ) {
-                columns = new NoOpIndexScanner();
-            }
-            else {
-                columns = searchIndex( key, slice, queryProcessor.getPageSizeHint( node ) );
-            }
-
-            return columns;
-        }
-
-
-        /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.usergrid.persistence.query.ir.NodeVisitor#visit(org.apache.usergrid.
-     * persistence.query.ir.WithinNode)
-     */
-        @Override
-        public void visit( WithinNode node ) throws Exception {
-
-            QuerySlice slice = node.getSlice();
-
-            queryProcessor.applyCursorAndSort( slice );
-
-            GeoIterator itr = new GeoIterator(
-                new ConnectionGeoSearch( em, indexBucketLocator, cass, connection.getIndexId() ),
-                query.getLimit(),
-                slice,
-                node.getPropertyName(),
-                new Point( node.getLattitude(), node.getLongitude() ),
-                node.getDistance() );
-
-            results.push( itr );
-        }
-
-
-        @Override
-        public void visit( AllNode node ) throws Exception {
-            QuerySlice slice = node.getSlice();
-
-            queryProcessor.applyCursorAndSort( slice );
-
-            int size = queryProcessor.getPageSizeHint( node );
-
-            ByteBuffer start = null;
-
-            if ( slice.hasCursor() ) {
-                start = slice.getCursor();
-            }
-
-
-            boolean skipFirst = !node.isForceKeepFirst() && slice.hasCursor();
-
-            UUID entityIdToUse;
-
-            //change our type depending on which direction we're loading
-            String dictionaryType;
-
-            //the target type
-            String targetType;
-
-            //this is on the "source" side of the edge
-            if ( outgoing ) {
-                entityIdToUse = connection.getConnectingEntityId();
-                dictionaryType = DICTIONARY_CONNECTED_ENTITIES;
-                targetType = connection.getConnectedEntityType();
-            }
-
-            //we're on the target side of the edge
-            else {
-                entityIdToUse = connection.getConnectedEntityId();
-                dictionaryType = DICTIONARY_CONNECTING_ENTITIES;
-                targetType = connection.getConnectingEntityType();
-            }
-
-            final String connectionType = connection.getConnectionType();
-
-            final ConnectionIndexSliceParser connectionParser = new ConnectionIndexSliceParser( targetType );
-
-            final Iterator<String> connectionTypes;
-
-            //use the provided connection type
-            if ( connectionType != null ) {
-                connectionTypes = Collections.singleton( connectionType ).iterator();
-            }
-
-            //we need to iterate all connection types
-            else {
-                connectionTypes = new ConnectionTypesIterator(
-                        cass, applicationId, entityIdToUse, outgoing, size );
-            }
-
-            IndexScanner connectionScanner = new ConnectedIndexScanner(
-                    cass,
-                    dictionaryType,
-                    applicationId,
-                    entityIdToUse,
-                    connectionTypes,
-                    start,
-                    slice.isReversed(),
-                    size,
-                    skipFirst );
-
-            this.results.push( new SliceIterator( slice, connectionScanner, connectionParser ) );
-        }
-
-
-        @Override
-        public void visit( NameIdentifierNode nameIdentifierNode ) throws Exception {
-
-            //TODO T.N. USERGRID-1919 actually validate this is connected
-            EntityRef ref = em.getAlias( connection.getConnectedEntityType(), nameIdentifierNode.getName() );
-
-            if ( ref == null ) {
-                this.results.push( new EmptyIterator() );
-                return;
-            }
-
-            this.results.push( new StaticIdIterator( ref.getUuid() ) );
-        }
-    }
-
-
-    private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
-
-        DynamicComposite[] range = slice.getRange();
-
-        Object keyPrefix = key( indexKey, slice.getPropertyName() );
-
-        IndexScanner scanner = new IndexBucketScanner(
-                cass,
-                indexBucketLocator,
-                ENTITY_INDEX,
-                applicationId,
-                IndexBucketLocator.IndexType.CONNECTION,
-                keyPrefix,
-                range[0],
-                range[1],
-                slice.isReversed(),
-                pageSize,
-                slice.hasCursor(),
-                slice.getPropertyName() );
-
-        return scanner;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3d036e1d/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 47362a9..0376780 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -22,21 +22,23 @@ package org.apache.usergrid.corepersistence.util;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.impl.IndexEdgeImpl;
+import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.clearspring.analytics.util.Preconditions;
+import org.apache.usergrid.utils.UUIDUtils;
 
 
 /**
@@ -51,12 +53,10 @@ public class CpNamingUtils {
     public static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
 
     /** App where we store management info */
-    public static final  UUID MANAGEMENT_APPLICATION_ID =
-            UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c8");
+    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
 
     /** Old and deprecated SYSTEM_APP */
-    public static final UUID SYSTEM_APP_ID =
-        UUID.fromString("b6768a08-b5d5-11e3-a495-10ddb1de66c3");
+    public static final UUID SYSTEM_APP_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-10ddb1de66c3" );
 
     /**
      * Information about applications is stored in the management app using these types
@@ -73,148 +73,140 @@ public class CpNamingUtils {
     public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
 
 
-
-
     /**
-     * Get the collection name from the entity/id type
-     * @param type
-     * @return
+     * Create an Id object from the entity ref
      */
-    private static String getCollectionScopeNameFromEntityType( String type ) {
-        String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
-        return csn.toLowerCase();
+    public static Id createId( final EntityRef entityRef ) {
+        return new SimpleId( entityRef.getUuid(), entityRef.getType() );
     }
 
 
-//    private static String getCollectionScopeNameFromCollectionName( String name ) {
-//        String csn = EDGE_COLL_SUFFIX + name;
-//        return csn.toLowerCase();
-//    }
-//
-//
-//    private static String getConnectionScopeName( String connectionType ) {
-//        String csn = EDGE_CONN_SUFFIX + connectionType ;
-//        return csn.toLowerCase();
-//    }
-
-
     /**
-     * Get the index scope for the edge from the source
-     * @param edge
-     * @return
+     * Generate a standard edge name for our graph using the connection name. To be used only for searching.  DO NOT use
+     * for creation.  Use the createConnectionEdge instead.
+     *
+     * @param connectionType The type of connection made
      */
-    public static IndexEdge generateScopeFromSource(final Edge edge ){
-        return new IndexEdgeImpl( edge.getSourceNode(), edge.getType(), SearchEdge.NodeType.SOURCE, edge.getTimestamp() );
+    public static String getEdgeTypeFromConnectionType( String connectionType ) {
+        return ( EDGE_CONN_SUFFIX + "|" + connectionType ).toLowerCase();
     }
 
 
-
-
-
     /**
-     * Get the index scope for the edge from the source
-     * @param edge
-     * @return
+     * Generate a standard edge type for a collection
+     *
+     * To be used only for searching. DO NOT use for creation. Use the createCollectionEdge instead.
      */
-    public static IndexEdge generateScopeToTarget(final Edge edge ){
-                return new IndexEdgeImpl( edge.getTargetNode(), edge.getType(), SearchEdge.NodeType.TARGET, edge.getTimestamp() );
-
+    public static String getEdgeTypeFromCollectionName( String collectionName ) {
+        return ( EDGE_COLL_SUFFIX + "|" + collectionName ).toLowerCase();
     }
 
 
     /**
-     * Generate either the collection name or connection name from the edgeName
-     * @param edgeName
-     * @return
+     * Get the index scope for the edge from the source
      */
-    public static String getNameFromEdgeType(final String edgeName){
-
-
-        if(isCollectionEdgeType( edgeName )){
-           return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) );
-        }
-
-        return getConnectionScopeName(getConnectionType( edgeName )  );
-
+    public static IndexEdge generateScopeFromSource( final Edge edge ) {
+        return new IndexEdgeImpl( edge.getSourceNode(), edge.getType(), SearchEdge.NodeType.SOURCE,
+            edge.getTimestamp() );
     }
 
 
     /**
-     * Get the index scope from the colleciton name
-     * @param nodeId The source or target node id
-     * @param collectionName The name of the collection.  Ex "users"
-     * @return
+     * Get the index scope for the edge to the target
      */
-    public static SearchEdge generateScopeFromCollection( final Id nodeId, final String collectionName ){
-        return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) );
+    public static IndexEdge generateScopeToTarget( final Edge edge ) {
+        return new IndexEdgeImpl( edge.getTargetNode(), edge.getType(), SearchEdge.NodeType.TARGET,
+            edge.getTimestamp() );
     }
 
 
     /**
-     * Get the scope from the connection
-     * @param nodeId
-     * @param connectionName
-     * @return
+     * Create the search edge from the source
      */
-    public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){
-        return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) );
+    public static SearchEdge createSearchEdgeFromSource( final Edge edge ) {
+        return new SearchEdgeImpl( edge.getSourceNode(), edge.getType(), SearchEdge.NodeType.SOURCE );
     }
 
 
     /**
-     * Create an Id object from the entity ref
-     * @param entityRef
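+     * Create a new collection edge from the source node to the given entity
+     * @param sourceId The id of the source (head) node of the edge
+     * @param collectionName The collection name used to generate the edge type
+     * @param entityId The id of the target (member) entity of the edge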
      * @return
      */
-    public static Id createId(final EntityRef entityRef){
-      return new SimpleId( entityRef.getUuid(), entityRef.getType() );
-    }
+    public static Edge createCollectionEdge( final Id sourceId, final String collectionName, final Id entityId ) {
+        final String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
 
-    private static boolean isCollectionEdgeType( String type ) {
-        return type.startsWith( EDGE_COLL_SUFFIX );
-    }
+        final UUID entityIdUUID = entityId.getUuid();
 
+        // If the entity does not use a time based uuid (such as in devices), we need to create a
+        // timestamp from "now", since this is when the entity will be added to
+        // the collection.
+        final UUID timeStampUuid = UUIDUtils.isTimeBased( entityIdUUID ) ? entityIdUUID : UUIDUtils.newTimeUUID();
 
-    private static boolean isConnectionEdgeType( String type ) {
-        return type.startsWith( EDGE_CONN_SUFFIX );
-    }
+        long uuidTimestamp = UUIDUtils.getUUIDLong( timeStampUuid );
 
+        // create graph edge connection from head entity to member entity
+        return new SimpleEdge( sourceId, edgeType, entityId, uuidTimestamp );
+    }
 
 
-    private static  String  getConnectionType( String edgeType ) {
-        String[] parts = edgeType.split( "\\|" );
-        return parts[1];
+    /**
+     * Create a collection searchEdge
+     */
+    public static SearchEdge createCollectionSearchEdge( final Id sourceId, final String connectionType ) {
+        return new SearchEdgeImpl( sourceId, getEdgeTypeFromCollectionName( connectionType ),
+            SearchEdge.NodeType.SOURCE );
     }
 
 
-    private static String getCollectionName( String edgeType ) {
-        String[] parts = edgeType.split( "\\|" );
-        return parts[1];
+    /**
+     * Create a new connection edge from the source node with the given connection type and target id
+     */
+    public static Edge createConnectionEdge( final Id sourceEntityId, final String connectionType,
+                                             final Id targetEntityId ) {
+        final String edgeType = getEdgeTypeFromConnectionType( connectionType );
+
+        // create graph edge connection from head entity to member entity
+        return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, System.currentTimeMillis() );
     }
 
 
     /**
-     * Generate a standard edge name for our graph using the connection name
-     * @param connectionType The type of connection made
-     * @return
+     * Create a connection searchEdge
+     *
+     * @param sourceId The source id in the connection
+     * @param connectionType The type of the connection to create a search for
      */
-    public static String getEdgeTypeFromConnectionType( String connectionType ) {
-        return  (EDGE_CONN_SUFFIX  + "|" + connectionType).toLowerCase();
+    public static SearchEdge createConnectionSearchEdge( final Id sourceId, final String connectionType ) {
+        return new SearchEdgeImpl( sourceId, getEdgeTypeFromConnectionType( connectionType ),
+            SearchEdge.NodeType.SOURCE );
     }
 
 
     /**
-     * Generate a standard edges from for a collection
-     * @param collectionName
-     * @return
+     * Search for all versions of a connection between 2 entities in the graph
+     *
+     * @param sourceId The source id of the edge
+     * @param connectionType The connection type used in the edge
+     * @param targetId The target id
+     *
+     * @return A search by edge command to search the graph
      */
-    public static String getEdgeTypeFromCollectionName( String collectionName ) {
-        return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase();
+    public static SearchByEdge createConnectionSearchByEdge( final Id sourceId, final String connectionType,
+                                                             final Id targetId ) {
+
+        final String edgeType = getEdgeTypeFromConnectionType( connectionType );
+
+        return new SimpleSearchByEdge( sourceId, edgeType, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+            null );
     }
 
 
     /**
      * Get the application scope from the given uuid
+     *
      * @param applicationId The applicationId
      */
     public static ApplicationScope getApplicationScope( UUID applicationId ) {
@@ -228,8 +220,8 @@ public class CpNamingUtils {
 
     /**
      * Generate an applicationId from the given UUID
-     * @param applicationId  the applicationId
      *
+     * @param applicationId the applicationId
      */
     public static Id generateApplicationId( UUID applicationId ) {
         return new SimpleId( applicationId, Application.ENTITY_TYPE );
@@ -238,11 +230,51 @@ public class CpNamingUtils {
 
     /**
      * Get the map scope for the applicationId to store entity uuid to type mapping
-     *
-     * @param applicationId
-     * @return
      */
-    public static MapScope getEntityTypeMapScope( final Id applicationId ){
-        return new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP );
+    public static MapScope getEntityTypeMapScope( final Id applicationId ) {
+        return new MapScopeImpl( applicationId, CpNamingUtils.TYPES_BY_UUID_MAP );
+    }
+
+
+    /**
+     * Generate either the collection name or connection name from the edgeName
+     */
+    public static String getNameFromEdgeType( final String edgeName ) {
+
+
+        if ( isCollectionEdgeType( edgeName ) ) {
+            return getCollectionScopeNameFromCollectionName( getCollectionName( edgeName ) );
+        }
+
+        return getConnectionScopeName( getConnectionType( edgeName ) );
+    }
+
+
+    private static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( EDGE_COLL_SUFFIX );
+    }
+
+
+    private static String getConnectionType( String edgeType ) {
+        String[] parts = edgeType.split( "\\|" );
+        return parts[1];
+    }
+
+
+    private static String getCollectionName( String edgeType ) {
+        String[] parts = edgeType.split( "\\|" );
+        return parts[1];
+    }
+
+
+    private static String getCollectionScopeNameFromCollectionName( String name ) {
+        String csn = EDGE_COLL_SUFFIX + name;
+        return csn.toLowerCase();
+    }
+
+
+    private static String getConnectionScopeName( String connectionType ) {
+        String csn = EDGE_CONN_SUFFIX + connectionType;
+        return csn.toLowerCase();
     }
 }


Mime
View raw message