usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [12/12] usergrid git commit: Converts de-indexing of edges and entities to identify documents in the index to delete based on data in Cassandra vs. resource intensive queries to the index. Fixes issue where nodes were not actually getting deleted from gr
Date Sat, 19 Aug 2017 00:01:58 GMT
Converts de-indexing of edges and entities to identify the documents to delete from the index based on data in Cassandra, rather than by running resource-intensive queries against the index. Fixes an issue where nodes were not actually getting deleted from the graph because marked edges were being filtered out during the delete process itself. Updates to a newer version of jamm (used for jvm memory management in the test framework).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c201f1f6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c201f1f6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c201f1f6

Branch: refs/heads/collectionClearJob
Commit: c201f1f67aa0f373958c3886c893979215cba88f
Parents: 570e1ab
Author: Michael Russo <russomichael@google.com>
Authored: Sat Jul 8 17:10:25 2017 -0700
Committer: Mike Dunker <mdunker@google.com>
Committed: Fri Aug 18 17:00:43 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/EntityManagerFig.java       |   2 +-
 .../asyncevents/AsyncEventServiceImpl.java      |  23 +--
 .../asyncevents/EventBuilder.java               |  15 +-
 .../asyncevents/EventBuilderImpl.java           | 172 ++++++++++++++-----
 .../corepersistence/index/IndexService.java     |  16 +-
 .../corepersistence/index/IndexServiceImpl.java |  56 ++----
 .../read/traverse/AbstractReadGraphFilter.java  |  37 +---
 .../AbstractReadReverseGraphFilter.java         |  36 +---
 .../org/apache/usergrid/persistence/Query.java  |  17 ++
 .../rx/EdgesFromSourceObservableIT.java         |   2 +-
 .../rx/EdgesToTargetObservableIT.java           |   4 +-
 .../core/src/test/resources/project.properties  |   2 +-
 .../persistence/graph/GraphManager.java         |   2 +-
 .../graph/impl/GraphManagerImpl.java            |  21 +--
 .../graph/impl/SimpleSearchByEdgeType.java      |   9 +
 .../graph/impl/stage/NodeDeleteListener.java    |   6 +-
 .../impl/stage/NodeDeleteListenerImpl.java      |  64 +++----
 .../graph/serialization/EdgesObservable.java    |   6 +-
 .../impl/EdgeMetadataSerializationV2Impl.java   |   1 -
 .../serialization/impl/EdgesObservableImpl.java |  22 +--
 .../impl/TargetIdObservableImpl.java            |   4 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |   2 +-
 .../graph/impl/NodeDeleteListenerTest.java      |   8 +-
 .../usergrid/persistence/index/EntityIndex.java |  12 +-
 .../index/impl/EsEntityIndexImpl.java           |  63 -------
 .../persistence/index/query/Identifier.java     |   3 +-
 stack/pom.xml                                   |   8 +-
 stack/rest/pom.xml                              |   4 +-
 .../rest/src/test/resources/project.properties  |   2 +-
 stack/services/pom.xml                          |   4 +-
 .../src/test/resources/project.properties       |   2 +-
 stack/test-utils/pom.xml                        |   4 +-
 .../src/test/resources/project.properties       |   2 +-
 stack/tools/pom.xml                             |   2 +-
 stack/websocket/pom.xml                         |   2 +-
 35 files changed, 286 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 872ffbb..46c7a1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -39,7 +39,7 @@ public interface EntityManagerFig extends GuicyFig {
     int sleep();
 
     @Key( "usergrid.entityManager.enable_deindex_on_update" )
-    @Default( "true" )
+    @Default( "false" )
     boolean getDeindexOnUpdate();
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0e55e9b..79a80c0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -640,8 +640,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         // default this observable's return to empty index operation message if nothing is emitted
-        return eventBuilder.buildDeleteEdge(applicationScope, edge)
-            .toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildDeleteEdge(applicationScope, edge);
 
     }
 
@@ -832,20 +831,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
         final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
 
-        if (logger.isDebugEnabled())
+        if (logger.isDebugEnabled()) {
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+        }
 
-        final EventBuilderImpl.EntityDeleteResults
-            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-
-        // Delete the entities and remove from graph separately
-        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
-        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
-        // default this observable's return to empty index operation message if nothing is emitted
-        return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildEntityDelete( applicationScope, entityId );
 
     }
 
@@ -885,9 +875,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             startWorker(AsyncEventQueueType.UTILITY);
         }
 
-        for (int i = 0; i < deleteCount; i++) {
-            startWorker(AsyncEventQueueType.DELETE);
-        }
         if( indexQueue instanceof SNSQueueManagerImpl) {
             logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
             for (int i = 0; i < indexDeadCount; i++) {
@@ -902,7 +889,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                 startDeadQueueWorker(AsyncEventQueueType.DELETE);
             }
         }else{
-            logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue worker.");
+            logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue workers.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 4db9f4b..ebb9190 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -54,7 +55,7 @@ public interface EventBuilder {
      * @param edge
      * @return
      */
-    Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
+    IndexOperationMessage buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
 
     /**
      * Return a bin with 2 observable streams for entity delete.
@@ -62,7 +63,7 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId );
 
 
 
@@ -94,17 +95,17 @@ public interface EventBuilder {
 
 
 
-        private final Observable<Id> compactedNode;
+        private final Observable<MarkedEdge> deletedEdges;
 
 
 
 
         public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
                                     final Observable<List<MvccLogEntry>> entitiesDeleted,
-                                    final Observable<Id> compactedNode) {
+                                    final Observable<MarkedEdge> deletedEdges) {
             this.indexOperationMessageObservable = indexOperationMessageObservable;
             this.entitiesDeleted = entitiesDeleted;
-            this.compactedNode = compactedNode;
+            this.deletedEdges = deletedEdges;
         }
 
 
@@ -116,8 +117,8 @@ public interface EventBuilder {
             return entitiesDeleted;
         }
 
-        public Observable<Id> getCompactedNode() {
-            return compactedNode;
+        public Observable<MarkedEdge> getEdgesDeleted() {
+            return deletedEdges;
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index bbdce5a..5051598 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -24,12 +24,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -82,7 +81,7 @@ public class EventBuilderImpl implements EventBuilder {
 
         if (logger.isDebugEnabled()) {
             logger.debug("Indexing  in app scope {} with entity {} and new edge {}",
-                    applicationScope, entity, newEdge);
+                applicationScope, entity, newEdge);
         }
 
         return indexService.indexEdge( applicationScope, entity, newEdge );
@@ -90,15 +89,38 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+    public IndexOperationMessage buildDeleteEdge( final ApplicationScope applicationScope, final Edge
         edge ) {
         if (logger.isDebugEnabled()) {
             logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
         }
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-        return gm.deleteEdge( edge )
-            .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final IndexOperationMessage combined = new IndexOperationMessage();
+
+        gm.deleteEdge( edge )
+            .doOnNext( deletedEdge -> {
+
+                logger.debug("Processing deleted edge for de-indexing {}", deletedEdge);
+
+                // get ALL versions of the target node as any connection from this source node needs to be removed
+                ecm.getVersionsFromMaxToMin(deletedEdge.getTargetNode(), UUIDUtils.newTimeUUID())
+                    .doOnNext(mvccLogEntry -> {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", deletedEdge.getTargetNode(), mvccLogEntry);
+                        }
+                        combined.ingest(
+                            indexService
+                                .deIndexEdge(applicationScope, deletedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+                                .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+                    }).toBlocking().lastOrDefault(null);
+
+            }).toBlocking().lastOrDefault(null);
+
+        return combined;
     }
 
 
@@ -106,43 +128,102 @@ public class EventBuilderImpl implements EventBuilder {
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+
         if (logger.isDebugEnabled()) {
-            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+            logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}",
+                applicationScope, entityId);
         }
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
-        //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
-
-        MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
-            .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
-
-        // De-indexing and entity deletes don't check log entries.  We must do that first. If no DELETED logs, then
-        // return an empty observable as our no-op.
-        Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
-        Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
-
-        if(mostRecentlyMarked != null){
+        MvccLogEntry mostRecentToDelete =
+            ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
+                .toBlocking()
+                .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
 
-            // fetch entity versions to be de-index by looking in cassandra
-            deIndexObservable =
-                indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(),
-                    getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion()));
+//        logger.info("mostRecent stage={} entityId={} version={} state={}",
+//            mostRecentToDelete.getStage().name(), mostRecentToDelete.getEntityId(),
+//            mostRecentToDelete.getVersion().toString(), mostRecentToDelete.getState().name());
 
-            ecmDeleteObservable =
-                ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() )
-                    .filter( mvccLogEntry->
-                        mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() )
-                    .buffer( serializationFig.getBufferSize() )
-                    .doOnNext( buffer -> ecm.delete( buffer ) );
+        if (mostRecentToDelete == null) {
+            logger.info("No entity versions to delete for id {}", entityId.toString());
+        }
+        // if nothing is marked, then abort
+        if(mostRecentToDelete == null){
+            return new IndexOperationMessage();
         }
 
-        // Graph compaction checks the versions inside compactNode, just build this up for the caller to subscribe to
-        final Observable<Id> graphCompactObservable = gm.compactNode(entityId);
-
-        return new EntityDeleteResults( deIndexObservable, ecmDeleteObservable, graphCompactObservable );
+        final List<MvccLogEntry> logEntries = new ArrayList<>();
+        Observable<MvccLogEntry> mvccLogEntryListObservable =
+            ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() );
+        mvccLogEntryListObservable
+            .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
+            .buffer( serializationFig.getBufferSize() )
+            .doOnNext( buffer -> ecm.delete( buffer ) )
+            .doOnNext(mvccLogEntries -> {
+                logEntries.addAll(mvccLogEntries);
+            }).toBlocking().lastOrDefault(null);
+
+        //logger.info("logEntries size={}", logEntries.size());
+
+        IndexOperationMessage combined = new IndexOperationMessage();
+
+        // do the edge deletes and build up de-index messages for each edge deleted
+        // assume we have "server1" and "region1" nodes in the graph with the following relationships (edges/connections):
+        //
+        // region1  -- zzzconnzzz|has -->  server1
+        // server1  -- zzzconnzzz|in  -->  region1
+        //
+        // there will always be a relationship from the appId to each entity based on the entity type (collection):
+        //
+        // application -- zzzcollzzz|servers --> server1
+        // application -- zzzcollzzz|regions --> region1
+        //
+        // When deleting either "server1" or "region1" entity, the connections should get deleted and de-indexed along
+        // with the entry for the entity itself in the collection. The above example should have at minimum 3 things to
+        // be de-indexed. There may be more as either "server1" or "region1" could have multiple versions.
+        //
+        // Further comments using the example of deleting "server1" from the above example.
+        gm.compactNode(entityId).doOnNext(markedEdge -> {
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
+            }
+
+            // if the edge was for a connection where the entity to be deleted is the source node, we need to load
+            // the target node's versions so that all versions of connections to that entity can be de-indexed
+            // server1  -- zzzconnzzz|in  -->  region1
+            if(!markedEdge.getTargetNode().getType().equals(entityId.getType())){
+
+                // get ALL versions of the target node as any connection from this source node needs to be removed
+                ecm.getVersionsFromMaxToMin( markedEdge.getTargetNode(), UUIDUtils.newTimeUUID() )
+                    .doOnNext(mvccLogEntry -> {
+                        logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, mvccLogEntry);
+                        combined.ingest(
+                            indexService
+                                .deIndexEdge(applicationScope, markedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+                                .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+                    }).toBlocking().lastOrDefault(null);
+
+            }else {
+
+                // for each version of the entity being deleted, de-index the connections where the entity is the target
+                // node ( application -- zzzcollzzz|servers --> server1 ) or (region1  -- zzzconnzzz|has -->  server1)
+                logEntries.forEach(logEntry -> {
+                    logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, logEntry);
+                    combined.ingest(
+                        indexService
+                            .deIndexEdge(applicationScope, markedEdge, logEntry.getEntityId(), logEntry.getVersion())
+                            .toBlocking().lastOrDefault(new IndexOperationMessage()));
+                });
+            }
+
+        }).toBlocking().lastOrDefault(null);
+
+        return combined;
     }
 
     @Override
@@ -185,22 +266,22 @@ public class EventBuilderImpl implements EventBuilder {
 
 
         return indexService.deIndexOldVersions( applicationScope, entityId,
-            getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion);
+            getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
 
     }
 
 
-    private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm,
-                                                   final Id entityId, final UUID markedVersion ){
+    private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
+                                                           final Id entityId, final UUID markedVersion ){
 
         final List<UUID> versions = new ArrayList<>();
 
-        // only take last 5 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
+        // only take last 100 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
         // clusters that do not have this in-line cleanup
         ecm.getVersionsFromMaxToMin( entityId, markedVersion)
-            .take(5)
+            .take(100)
             .forEach( mvccLogEntry -> {
-                if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
+                if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
                     versions.add(mvccLogEntry.getVersion());
                 }
 
@@ -210,4 +291,17 @@ public class EventBuilderImpl implements EventBuilder {
         return versions;
     }
 
+    private List<UUID> getAllVersions( final EntityCollectionManager ecm,
+                                       final Id entityId ) {
+
+        final List<UUID> versions = new ArrayList<>();
+
+        ecm.getVersionsFromMaxToMin(entityId, UUIDUtils.newTimeUUID())
+            .forEach( mvccLogEntry -> {
+                versions.add(mvccLogEntry.getVersion());
+            });
+
+        return versions;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index b989a9c..58d470a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -69,18 +69,17 @@ public interface IndexService {
      */
     Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge);
 
-
     /**
-     * De-index all documents with the specified entityId and versions provided.  This will also remove any documents
-     * where the entity is a source/target node ( index docs where this entityId is a part of connections).
-     *
+     * Delete an index edge from the specified scope for a specific entity version
      * @param applicationScope
+     * @param edge
      * @param entityId
-     * @param markedVersion
+     * @param entityVersion
      * @return
      */
-    Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId,
-                                                    final UUID markedVersion, final List<UUID> allVersionsBeforeMarked);
+    Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+                                                  final Id entityId, final UUID entityVersion);
+
 
 
     /**
@@ -88,10 +87,9 @@ public interface IndexService {
      *
      * @param applicationScope
      * @param entityId
-     * @param markedVersion
      * @return
      */
     Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId,
-                                                         final List<UUID> versions, UUID markedVersion);
+                                                         final List<UUID> versions);
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 1b8614f..32470f6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -29,9 +29,7 @@ import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.*;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.*;
@@ -103,7 +101,7 @@ public class IndexServiceImpl implements IndexService {
 
 
         //we always index in the target scope
-        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
 
         //we may have to index  we're indexing from source->target here
         final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
@@ -216,10 +214,8 @@ public class IndexServiceImpl implements IndexService {
         return Optional.of(defaultProperties);
     }
 
-    //Steps to delete an IndexEdge.
-    //1.Take the search edge given and search for all the edges in elasticsearch matching that search edge
-    //2. Batch Delete all of those edges returned in the previous search.
-    //TODO: optimize loops further.
+    // DO NOT USE THIS AS THE QUERY TO ES CAN CAUSE EXTREME LOAD
+    // TODO REMOVE THIS AND UPDATE THE TESTS TO NOT USE THIS METHOD
     @Override
     public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope,
                                                               final Edge edge ) {
@@ -256,49 +252,31 @@ public class IndexServiceImpl implements IndexService {
         return ObservableTimer.time( batches, addTimer );
     }
 
-
     @Override
-    public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId,
-                                                            final UUID markedVersion,
-                                                            final List<UUID> allVersionsBeforeMarked ) {
-
-        final EntityIndex ei = entityIndexFactory.
-            createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
-
-        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
-        // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
-        final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
-            CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
-            Long.MAX_VALUE ) );
-
-
+    public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+                                                         final Id entityId, final UUID entityVersion){
 
-        final EntityIndexBatch batch = ei.createBatch();
-
-        // de-index each version of the entity before the marked version
-        allVersionsBeforeMarked.forEach(version -> batch.deindex(searchEdgeFromSource, entityId, version));
-
-
-        // for now, query the index to remove docs where the entity is source/target node and older than markedVersion
-        // TODO: investigate getting this information from graph
-        CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked(entityId, markedVersion );
-        candidateResults.forEach(candidateResult -> batch.deindex(candidateResult));
-
-        return Observable.just(batch.build());
+        if (logger.isTraceEnabled()) {
+            logger.trace("deIndexEdge edge={} entityId={} entityVersion={}", edge.toString(), entityId.toString(), entityVersion.toString());
+        }
+        final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
+        final EntityIndexBatch entityBatch = ei.createBatch();
+        entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion);
+        return Observable.just(entityBatch.build());
 
     }
 
+
     @Override
     public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
                                                                 final Id entityId,
-                                                                final List<UUID> versions,
-                                                                UUID markedVersion) {
+                                                                final List<UUID> versions) {
 
         final EntityIndex ei = entityIndexFactory.
             createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
 
-        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
-        // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
+        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code for de-indexing
+        // previously .timestamp() was used on entityId, but some entities do not have type-1 UUIDs (legacy data)
         final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
             CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
             Long.MAX_VALUE ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 4886b08..b1b7f75 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -122,12 +122,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                 if (isDeleted) {
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
-                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
-                    indexMessageObservable
-                        .compose(applyCollector(AsyncEventQueueType.DELETE))
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -136,18 +132,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector(AsyncEventQueueType.DELETE))
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -156,19 +142,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     final Id targetNodeId = markedEdge.getTargetNode();
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector(AsyncEventQueueType.DELETE))
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
-
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
                 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index e8a20dd..c75545e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -122,12 +122,8 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                 if (isDeleted) {
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
-                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
-                    indexMessageObservable
-                        .compose(applyCollector(AsyncEventQueueType.DELETE))
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -136,18 +132,8 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector(AsyncEventQueueType.DELETE))
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 
@@ -156,18 +142,8 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     final Id targetNodeId = markedEdge.getTargetNode();
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector(AsyncEventQueueType.DELETE))
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
 
                 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 900bda5..4c9d16c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.index.query.tree.Operand;
 import org.apache.usergrid.persistence.index.utils.ClassUtils;
 import org.apache.usergrid.persistence.index.utils.ListUtils;
 import org.apache.usergrid.persistence.index.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -43,6 +45,7 @@ import java.util.Map.Entry;
 public class Query {
 
 
+    private static final Logger logger = LoggerFactory.getLogger(Query.class);
 
     public enum Level {
         IDS, REFS, CORE_PROPERTIES, ALL_PROPERTIES, LINKED_PROPERTIES
@@ -319,6 +322,13 @@ public class Query {
 
 
     public static Query fromIdentifier( Object id ) {
+        if (id == null) {
+            throw new IllegalArgumentException("null identifier passed in");
+        }
+        Identifier objectIdentifier = Identifier.from(id);
+        if (objectIdentifier == null) {
+            throw new IllegalArgumentException("Supplied id results in null Identifier");
+        }
         Query q = new Query();
         q.addIdentifier( Identifier.from(id) );
         return q;
@@ -409,6 +419,10 @@ public class Query {
         }
 
         for ( Identifier identifier : identifiers ) {
+            if (identifier == null) {
+                logger.error("containsUuidIdentifiersOnly(): identifier in identifiers list is null");
+                return false;
+            }
             if ( !identifier.isUUID() ) {
                 return false;
             }
@@ -635,6 +649,9 @@ public class Query {
         if ( identifiers == null ) {
             identifiers = new ArrayList<Identifier>();
         }
+        if (identifier == null) {
+            throw new IllegalArgumentException("adding null identifier is not allowed");
+        }
         identifiers.add( identifier );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 3bfe460..68834b3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -92,7 +92,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesToTargetObservable.edgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+        edgesToTargetObservable.edgesToTarget( gm, target, true).doOnNext(new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9e84219..55b77c0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId, true).doOnNext(edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 
@@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, source, true).doOnNext(edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/core/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/project.properties b/stack/core/src/test/resources/project.properties
index 1a848bc..77a785a 100644
--- a/stack/core/src/test/resources/project.properties
+++ b/stack/core/src/test/resources/project.properties
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 000c633..b746c61 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -93,7 +93,7 @@ public interface GraphManager extends CPManager {
      * @param node The node to remove.  This will apply a timestamp to apply the delete + compact operation.  Any edges connected to this node with a timestamp
      * <= the specified time on the mark will be removed from the graph
      */
-    Observable<Id> compactNode( final Id node );
+    Observable<MarkedEdge> compactNode( final Id node );
 
     /**
      * Get all versions of this edge where versions <= max version

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 5fcdcb4..2fe40b1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -262,24 +262,17 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Id> compactNode( final Id inputNode ) {
-
+    public Observable<MarkedEdge> compactNode( final Id inputNode ) {
 
         final UUID startTime = UUIDGenerator.newTimeUUID();
 
-
-        final Observable<Id> nodeObservable =
-            Observable.just( inputNode ).map( node -> nodeSerialization.getMaxVersion( scope, node ) ).takeWhile(
-                maxTimestamp -> maxTimestamp.isPresent() )
-
+        final Observable<MarkedEdge> nodeObservable =
+            Observable.just( inputNode )
+                .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
+                //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
+                .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
                 //map our delete listener
-                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) )
-                    //set to 0 if nothing is emitted
-                .lastOrDefault( 0 )
-                    //log for posterity
-                .doOnNext( count -> logger.trace( "Removed {} edges from node {}", count, inputNode ) )
-                    //return our id
-                .map( count -> inputNode );
+                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
 
         return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 9392dbc..71d2f1d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -169,6 +169,15 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
         return true;
     }
 
+    @Override
+    public String toString(){
+        return "SimpleSearchByEdgeType{node="+node
+            +", type="+type
+            +", maxTimestamp="+maxTimestamp
+            +", order="+order
+            +", filterMarked="+filterMarked
+            +", last="+last+"}";
+    }
 
     @Override
     public int hashCode() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
index 68569e5..3bcdc55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.graph.impl.stage;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -39,8 +41,8 @@ public interface NodeDeleteListener {
        * @param node The node that was deleted
        * @param timestamp The timestamp of the event
        *
-       * @return An observable that emits the total number of edges that have been removed with this node both as the
+       * @return An observable that emits the marked edges that have been removed with this node both as the
        *         target and source
        */
-    Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp );
+    Observable<MarkedEdge> receive(final ApplicationScope scope, final Id node, final UUID timestamp );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index cd5b1a8..df4e5d5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -95,53 +95,37 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
      * @param node The node that was deleted
      * @param timestamp The timestamp of the event
      *
-     * @return An observable that emits the total number of edges that have been removed with this node both as the
+     * @return An observable that emits the marked edges that have been removed with this node both as the
      *         target and source
      */
-    public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
+    public Observable<MarkedEdge> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
 
 
         return Observable.just( node )
 
                 //delete source and targets in parallel and merge them into a single observable
-                .flatMap( new Func1<Id, Observable<Integer>>() {
-                    @Override
-                    public Observable<Integer> call( final Id node ) {
-
-                        final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
-
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
-                        }
-
-
-                        if ( !maxVersion.isPresent() ) {
-                            return Observable.empty();
-                        }
-
-
-                        //do all the delete, then when done, delete the node
-                        return doDeletes( node, scope, maxVersion.get(), timestamp ).count()
-                                //if nothing is ever emitted, emit 0 so that we know no operations took place.
-                                // Finally remove
-                                // the
-                                // target node in the mark
-                                .doOnCompleted( new Action0() {
-                                    @Override
-                                    public void call() {
-                                        try {
-                                            nodeSerialization.delete( scope, node, maxVersion.get()).execute();
-                                        }
-                                        catch ( ConnectionException e ) {
-                                            throw new RuntimeException( "Unable to connect to casandra", e );
-                                        }
-                                    }
-                                } );
-                    }
-                } ).defaultIfEmpty( 0 );
-    }
+                .flatMap( id -> {
 
+                    final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
+                    }
+                    if ( !maxVersion.isPresent() ) {
+                        return Observable.empty();
+                    }
 
+                    // do all the edge deletes and then remove the marked node, return all edges just deleted
+                    return
+                        doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
+                            try {
+                                nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+                            } catch ( ConnectionException e ) {
+                                throw new RuntimeException( "Unable to connect to cassandra", e );
+                            }
+                        });
+                });
+    }
     /**
      * Do the deletes
      */
@@ -162,7 +146,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesToTarget(scope,
-                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
                             }
                         }));
 
@@ -174,7 +158,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesFromSource(scope,
-                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
                             }
                         }));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 78a1d4b..5577bd0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -36,9 +36,10 @@ public interface EdgesObservable {
      * Return an observable of all edges from a source
      * @param gm
      * @param sourceNode
+     * @param filterMarked
      * @return
      */
-    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
+    Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked);
 
 
     /**
@@ -54,9 +55,10 @@ public interface EdgesObservable {
      * Return an observable of all edges to a target
      * @param gm
      * @param targetNode
+     * @param filterMarked
      * @return
      */
-    Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
+    Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked);
 
     /**
      * Return an observable of all edges from a source node.  Ordered ascending, from the startTimestamp if specified

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 1f81864..e9e2b28 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -400,7 +400,6 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchEdgeType( search );
 
-
         final Id applicationId = scope.getApplication();
         final Id searchNode = search.getNode();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 20efe42..9e0998d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 
 import rx.Observable;
-import rx.functions.Func1;
 
 
 /**
@@ -55,7 +54,7 @@ public class EdgesObservableImpl implements EdgesObservable {
      * Get all edges from the source
      */
     @Override
-    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
+    public Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked) {
         final Observable<String> edgeTypes =
             gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
 
@@ -67,7 +66,7 @@ public class EdgesObservableImpl implements EdgesObservable {
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        Optional.<Edge>absent() ) );
+                        Optional.<Edge>absent(), filterMarked ) );
         } );
     }
 
@@ -119,19 +118,16 @@ public class EdgesObservableImpl implements EdgesObservable {
      * Get all edges to the target
      */
     @Override
-    public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode ) {
-        final Observable<String> edgeTypes =
-            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
-
-        return edgeTypes.flatMap( edgeType -> {
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
-            }
+    public Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked) {
 
+        return gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) )
+            .flatMap( edgeType -> {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
+                }
             return gm.loadEdgesToTarget(
                 new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    Optional.<Edge>absent() ) );
+                    Optional.<Edge>absent(), filterMarked ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 6a08d46..69dd43b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl;
 
 
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
@@ -29,7 +28,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
-import rx.functions.Func1;
 
 /**
  * Emits the id of all nodes that are target nodes from the given source node
@@ -55,7 +53,7 @@ public class TargetIdObservableImpl implements TargetIdObservable {
     public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
 
         //only search edge types that start with collections
-        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( edge -> {
+        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode, true).map(edge -> {
             final Id targetNode = edge.getTargetNode();
 
             if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 8eccdbd..1d4331c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -89,7 +89,7 @@ public class EdgeDataMigrationImpl implements DataMigration {
             final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
 
             //get edges from the source
-            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
+            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode, true).buffer( 1000 )
                                             .doOnNext( edges -> {
                                                     final MutationBatch batch = keyspace.prepareMutationBatch();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 438a978..80198de 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -135,7 +135,7 @@ public class NodeDeleteListenerTest {
         UUID eventTime = UUIDGenerator.newTimeUUID();
 
 
-        int count = deleteListener.receive( scope, sourceNode, eventTime ).toBlocking().last();
+        int count = deleteListener.receive( scope, sourceNode, eventTime ).count().toBlocking().last();
 
         assertEquals( "Mark was not set, no delete should be executed", 0, count );
 
@@ -171,7 +171,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, sourceNode, timestamp ).execute();
 
-        int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).toBlocking().last();
+        int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).count().toBlocking().last();
 
         assertEquals( 1, count );
 
@@ -256,7 +256,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, targetNode, deleteBefore ).execute();
 
-        int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+        int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
 
         assertEquals( 1, count );
 
@@ -366,7 +366,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
 
-        int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+        int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
 
         assertEquals( edgeCount, count );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 14020a9..b444199 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -20,14 +20,12 @@
 package org.apache.usergrid.persistence.index;
 
 
-import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 import java.util.Map;
-import java.util.UUID;
 
 
 /**
@@ -36,7 +34,7 @@ import java.util.UUID;
 public interface EntityIndex extends CPManager {
 
 
-    public static final int MAX_LIMIT = 1000;
+    int MAX_LIMIT = 1000;
 
     /**
      * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
@@ -134,14 +132,6 @@ public interface EntityIndex extends CPManager {
     CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
 
     /**
-     * Returns all entity docs that match the entityId being the nodeId ( aka connections where entityId = sourceNode)
-     *
-     * @param entityId      The entityId to match when searching
-     * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
-     * @return
-     */
-    CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion);
-    /**
      * delete all application records
      *
      * @return

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index cb695d5..f4fae2b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -34,7 +34,6 @@ import org.apache.usergrid.persistence.core.migration.data.VersionedData;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.StringUtils;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
@@ -584,68 +583,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     }
 
 
-    @Override
-    public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) {
-
-        // TODO: investigate if functionality via iterator so a caller can page the deletion until all is gone
-
-        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-        Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
-        ValidationUtils.verifyVersion(markedVersion);
-
-        SearchResponse searchResponse;
-        List<CandidateResult> candidates = new ArrayList<>();
-
-        final long markedTimestamp = markedVersion.timestamp();
-
-        // never let this fetch more than 100 to save memory
-        final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
-
-        // this query will find all the documents where this entity is a source/target node
-        final QueryBuilder nodeQuery = QueryBuilders
-            .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId));
-
-        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder()
-            .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC);
-
-        try {
-
-            long queryTimestamp = 0L;
-
-            QueryBuilder timestampQuery =  QueryBuilders
-                .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
-                .gte(queryTimestamp)
-                .lt(markedTimestamp);
-
-            QueryBuilder finalQuery = QueryBuilders.constantScoreQuery(
-                QueryBuilders
-                    .boolQuery()
-                    .must(timestampQuery)
-                    .must(nodeQuery)
-            );
-
-
-            searchResponse = srb
-                .setQuery(finalQuery)
-                .setSize(searchLimit)
-                .execute()
-                .actionGet();
-
-
-            candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
-
-        }
-        catch ( Throwable t ) {
-            logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }
-        failureMonitor.success();
-
-        return new CandidateResults( candidates, Collections.EMPTY_SET);
-    }
-
-
     /**
      * Completely delete an index.
      */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
index 84a28f0..70d2284 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
@@ -34,6 +34,7 @@ public class Identifier implements Serializable {
     public static final String UUID_REX =
             "[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}";
     public static final String EMAIL_REX =  "[a-zA-Z0-9._%'+\\-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}";
+    public static final String NAME_REX = "[a-zA-Z0-9_\\-./'+ ]*";
 
     public enum Type {
         UUID, NAME, EMAIL
@@ -46,7 +47,7 @@ public class Identifier implements Serializable {
     static Pattern emailRegEx = Pattern.compile( EMAIL_REX );
     // "Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to a
     // ddress https://issues.apache.org/jira/browse/USERGRID-94
-    static Pattern nameRegEx = Pattern.compile( "[a-zA-Z0-9_\\-./'+ ]*" );
+    static Pattern nameRegEx = Pattern.compile( NAME_REX );
 
 
     private Identifier( Type type, Object value ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 513c894..c98c72d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -661,9 +661,9 @@
             </dependency>
 
             <dependency>
-                <groupId>com.github.stephenc</groupId>
+                <groupId>com.github.jbellis</groupId>
                 <artifactId>jamm</artifactId>
-                <version>0.2.5</version>
+                <version>0.3.1</version>
             </dependency>
 
             <!-- Third Party Non-Commercial Dependencies -->
@@ -1311,7 +1311,7 @@
                     <useSystemClassLoader>false</useSystemClassLoader>
                     <testFailureIgnore>false</testFailureIgnore>
                     <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin}
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         ${ug.argline}
                     </argLine>
                     <systemPropertyVariables>
@@ -1548,7 +1548,7 @@
                         <version>${surefire.plugin.version}</version>
                         <configuration>
                             <argLine>
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 ${ug.argline}
                                 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
                             </argLine>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/rest/pom.xml
----------------------------------------------------------------------
diff --git a/stack/rest/pom.xml b/stack/rest/pom.xml
index 9bb83a6..e7aa68a 100644
--- a/stack/rest/pom.xml
+++ b/stack/rest/pom.xml
@@ -93,7 +93,7 @@
                     <argLine>-Dwebapp.directory=${basedir}/src/main/webapp
                         -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax}
                         -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         -Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties ${ug.argline}
                     </argLine>
                     <includes>
@@ -460,7 +460,7 @@
                                 -Dwebapp.directory=${basedir}/src/main/webapp
                                 -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                                 -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 -Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties
                                 ${ug.argline}
                             </argLine>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/rest/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/project.properties b/stack/rest/src/test/resources/project.properties
index 94ef3bd..20a38f6 100644
--- a/stack/rest/src/test/resources/project.properties
+++ b/stack/rest/src/test/resources/project.properties
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index b1df1b4..29fa311 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -102,7 +102,7 @@
                     <useSystemClassLoader>false</useSystemClassLoader>
                     <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                         -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         ${ug.argline} -Dlog4j.configuration=file:${basedir}/src/test/resources/log4j.properties
                     </argLine>
                     <includes>
@@ -499,7 +499,7 @@
                         <configuration>
                             <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                                 -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 ${ug.argline}
                                 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
                             </argLine>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/services/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/services/src/test/resources/project.properties b/stack/services/src/test/resources/project.properties
index d38e878..03736c0 100644
--- a/stack/services/src/test/resources/project.properties
+++ b/stack/services/src/test/resources/project.properties
@@ -16,4 +16,4 @@
 
 target.directory=${project.build.directory}
 resources.dir=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/stack/test-utils/pom.xml b/stack/test-utils/pom.xml
index bbcf1ff..f99d43a 100644
--- a/stack/test-utils/pom.xml
+++ b/stack/test-utils/pom.xml
@@ -59,7 +59,7 @@
                        <threadCount>${usergrid.it.threads}</threadCount>
                        <threadCountClasses></threadCountClasses>
                        <reuseForks>true</reuseForks>
-                       <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+                       <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
                         <includes>
                            <include>**/CassandraResourceITSuite.java</include>
                         </includes>
@@ -290,7 +290,7 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <version>${surefire.plugin.version}</version>
                         <configuration>
-                            <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+                            <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
                         </configuration>
                     </plugin>
                 </plugins>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/test-utils/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/resources/project.properties b/stack/test-utils/src/test/resources/project.properties
index cd5b819..0bc9bb7 100644
--- a/stack/test-utils/src/test/resources/project.properties
+++ b/stack/test-utils/src/test/resources/project.properties
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c201f1f6/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index cbd2c1e..b34b068 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -61,7 +61,7 @@
             <storage-config>${basedir}/src/test/conf</storage-config>
           </systemPropertyVariables>
           <forkMode>always</forkMode>
-          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
         </configuration>
 
       </plugin>


Mime
View raw message