usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject usergrid git commit: Add ability to walk through a collection and delete all the entities, optionally up to a certain timestamp. Modeled after reindex services.
Date Mon, 28 Aug 2017 21:47:41 GMT
Repository: usergrid
Updated Branches:
  refs/heads/collectionClearJob 3f7afcd88 -> 11823f294


Add ability to walk through a collection and delete all the entities, optionally up to a certain timestamp. Modeled after reindex services.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11823f29
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11823f29
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11823f29

Branch: refs/heads/collectionClearJob
Commit: 11823f294dfae762754ef1c4da8a5ee573107968
Parents: 3f7afcd
Author: Mike Dunker <mdunker@google.com>
Authored: Mon Aug 28 14:46:17 2017 -0700
Committer: Mike Dunker <mdunker@google.com>
Committed: Mon Aug 28 14:46:17 2017 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   1 +
 .../asyncevents/AsyncEventService.java          |   3 +-
 .../asyncevents/AsyncEventServiceImpl.java      |  67 +++--
 .../asyncevents/EventBuilder.java               |  13 +-
 .../asyncevents/EventBuilderImpl.java           |  45 ++-
 .../asyncevents/model/EntityDeleteEvent.java    |  27 +-
 .../index/CollectionDeleteAction.java           |  43 +++
 .../index/CollectionDeleteRequestBuilder.java   |  92 ++++++
 .../CollectionDeleteRequestBuilderImpl.java     | 146 +++++++++
 .../index/CollectionDeleteService.java          | 108 +++++++
 .../index/CollectionDeleteServiceImpl.java      | 299 +++++++++++++++++++
 .../index/IndexProcessorFig.java                |   9 +
 .../index/ReIndexServiceImpl.java               |   2 +-
 .../persistence/CollectionDeleteTest.java       | 266 +++++++++++++++++
 .../resources/usergrid-custom-test.properties   |   2 +
 .../rest/applications/CollectionResource.java   | 130 +++++++-
 16 files changed, 1217 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ec6b775..a0748e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -140,6 +140,7 @@ public class CoreModule extends AbstractModule {
 
 
         bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
+        bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
 
         bind( ExportService.class ).to( ExportServiceImpl.class );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9e346cf..04eaf4c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -33,7 +34,7 @@ import java.util.UUID;
 /**
  * Low level queue service for events in the entity.  These events are fire and forget, and will always be asynchronous
  */
-public interface AsyncEventService extends ReIndexAction {
+public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 428772f..3d06cae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -75,9 +75,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.commons.lang.StringUtils.indexOf;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
 
 /**
  * TODO, this whole class is becoming a nightmare.
@@ -106,7 +103,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
     public static final String QUEUE_NAME_DELETE = "delete";
-    public static final String DEAD_LETTER_SUFFIX = "_dead";
 
 
     private final LegacyQueueManager indexQueue;
@@ -522,8 +518,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             applicationScope);
 
 
-        logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
-            applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
+                applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+        }
 
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
             new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
@@ -535,8 +533,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                        final Entity entity, long updatedAfter) {
 
 
-        logger.trace("Offering EntityIndexEvent for {}:{}",
-            entity.getId().getUuid(), entity.getId().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EntityIndexEvent for {}:{}",
+                entity.getId().getUuid(), entity.getId().getType());
+        }
 
         offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
             new EntityIdScope(applicationScope, entity.getId()), updatedAfter));
@@ -577,8 +577,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                              final Entity entity,
                              final Edge newEdge) {
 
-        logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
-            newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
+                newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+        }
 
         offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
 
@@ -612,8 +614,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
-        logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
-            edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
+                edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+        }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
         offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
@@ -675,7 +679,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         //send to the topic so all regions index the batch
 
-        logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId);
+        }
 
         offerTopic( elasticsearchIndexEvent, queueType );
     }
@@ -749,8 +755,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         // queue the de-index of old versions to the topic so cleanup happens in all regions
 
-        logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
-            applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
+                applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+        }
 
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
             new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE );
@@ -810,7 +818,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
 
-        logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+        }
 
         // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ),
@@ -830,12 +840,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
         final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
         final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+        final boolean isCollectionDelete = entityDeleteEvent.isCollectionDelete();
+        final long updatedBefore = entityDeleteEvent.getUpdatedBefore();
 
         if (logger.isDebugEnabled()) {
-            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+            logger.debug("Deleting entity id from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore {}",
+                applicationScope, entityId, isCollectionDelete, updatedBefore);
         }
 
-        return eventBuilder.buildEntityDelete( applicationScope, entityId );
+        return eventBuilder.buildEntityDelete( applicationScope, entityId, isCollectionDelete, updatedBefore );
 
     }
 
@@ -1192,11 +1205,27 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         });
 
-        logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+        if (logger.isTraceEnabled()) {
+            logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+        }
 
         offerBatch( batch, queueType );
     }
 
+    public void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType) {
+
+        final List<EntityDeleteEvent> batch = new ArrayList<>();
+        edges.forEach(e -> {
+
+            //change to id scope to avoid serialization issues
+            batch.add(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
+                new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), true, updatedBefore));
+
+        });
+
+        offerBatch(batch, queueType);
+    }
+
 
     public class IndexEventResult{
         private final Optional<IndexOperationMessage> indexOperationMessage;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index ebb9190..4bb6312 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -63,7 +63,18 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId);
+
+    /**
+     * Return a bin with 2 observable streams for entity delete.
+     * @param applicationScope
+     * @param entityId
+     * @param isCollectionDelete
+     * @param updatedBefore
+     * @return
+     */
+    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId,
+                                            boolean isCollectionDelete, long updatedBefore);
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 5051598..7c72b72 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -24,7 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.antlr.misc.Graph;
 import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,15 +130,48 @@ public class EventBuilderImpl implements EventBuilder {
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+        return buildEntityDelete(applicationScope, entityId, false, Long.MAX_VALUE);
+    }
+
+    @Override
+    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId,
+                                                   final boolean isCollectionDelete, final long updatedBefore) {
 
         if (logger.isDebugEnabled()) {
-            logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}",
-                applicationScope, entityId);
+            logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore={}",
+                applicationScope, entityId, isCollectionDelete, updatedBefore);
+        }
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+        final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+
+        boolean deleteEntity = ecm.load(entityId).
+            map(entity -> {
+                final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+
+                boolean willDelete = false;
+                if ( modified == null ) {
+                    // We don't have a modified field, so we can't check, so delete it
+                    willDelete = true;
+                } else if (modified.getValue() <= updatedBefore) {
+                    willDelete = true;
+                }
+
+                if (isCollectionDelete && willDelete) {
+                    // need to mark for deletion
+                    ecm.mark(entityId, null)
+                        .mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
+                        .toBlocking().last();
+                }
+
+                return willDelete;
+            }).toBlocking().firstOrDefault(true);
+
+        if (!deleteEntity) {
+            return new IndexOperationMessage();
         }
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
         MvccLogEntry mostRecentToDelete =
             ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 01d2ba8..1589632 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
 
 /**
  * Event that will signal to finish the actual delete (post-mark delete) for an Entity
+ * It will mark if this is for a collection delete
  */
 public final class EntityDeleteEvent extends AsyncEvent {
 
@@ -31,17 +32,41 @@ public final class EntityDeleteEvent extends AsyncEvent {
     @JsonProperty
     protected EntityIdScope entityIdScope;
 
+    @JsonProperty
+    private long updatedBefore;
+
+    @JsonProperty
+    private boolean isCollectionDelete;
+
     public EntityDeleteEvent() {
         super();
     }
 
     public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) {
         super(sourceRegion);
-        this.entityIdScope =  entityIdScope;
+        this.entityIdScope = entityIdScope;
+        this.updatedBefore = Long.MAX_VALUE;
+        this.isCollectionDelete = false;
+    }
+
+    public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope,
+                             boolean isCollectionDelete, long updatedBefore) {
+        super(sourceRegion);
+        this.entityIdScope = entityIdScope;
+        this.updatedBefore = updatedBefore;
+        this.isCollectionDelete = isCollectionDelete;
     }
 
 
     public EntityIdScope getEntityIdScope() {
         return entityIdScope;
     }
+
+    public long getUpdatedBefore() {
+        return updatedBefore;
+    }
+
+    public boolean isCollectionDelete() {
+        return isCollectionDelete;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
new file mode 100644
index 0000000..7bad06b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+
+/**
+ * Callback to perform a collection delete operation based on an scope during bulk collection delete operations
+ */
+public interface CollectionDeleteAction {
+
+    /**
+     * Delete a batch list of entities.
+     * @param edges
+     * @param updatedBefore
+     * @param queueType
+     */
+    void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType);
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
new file mode 100644
index 0000000..4abdfea
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A builder interface to build our collection delete request
+ */
+public interface CollectionDeleteRequestBuilder {
+
+    /**
+     * Set the application id
+     */
+    CollectionDeleteRequestBuilder withApplicationId(final UUID applicationId);
+
+    /**
+     * Set the collection name.
+     * @param collectionName
+     * @return
+     */
+    CollectionDeleteRequestBuilder withCollection(final String collectionName);
+
+    /**
+     * Set our cursor to resume processing
+     * @param cursor
+     * @return
+     */
+    CollectionDeleteRequestBuilder withCursor(final String cursor);
+
+
+    CollectionDeleteRequestBuilder withDelay(int delayTimer, TimeUnit timeUnit);
+
+    /**
+     * Set the timestamp to delete entities updated <= this timestamp
+     * @param timestamp
+     * @return
+     */
+    CollectionDeleteRequestBuilder withEndTimestamp(final Long timestamp);
+
+
+    Optional<Integer> getDelayTimer();
+
+    Optional<TimeUnit> getTimeUnitOptional();
+
+    /**
+     * Get the application scope
+     * @return
+     */
+    Optional<ApplicationScope> getApplicationScope();
+
+    /**
+     * Get the collection name
+     * @return
+     */
+    Optional<String> getCollectionName();
+
+    /**
+     * Get the cursor
+     * @return
+     */
+    Optional<String> getCursor();
+
+    /**
+     * Get the latest timestamp to delete
+     * @return
+     */
+    Optional<Long> getEndTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
new file mode 100644
index 0000000..890b770
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * collection delete service request builder
+ */
+public class CollectionDeleteRequestBuilderImpl implements CollectionDeleteRequestBuilder {
+
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> endTimestamp = Optional.absent();
+    private Optional<Integer> delayTimer = Optional.absent();
+    private Optional<TimeUnit> timeUnitOptional = Optional.absent();
+
+
+    /***
+     *
+     * @param applicationId The application id
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withApplicationId( final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable( applicationId );
+        return this;
+    }
+
+
+    /**
+     * the collection name
+     * @param collectionName
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withCollection( final String collectionName ) {
+        this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() ) );
+        return this;
+    }
+
+
+    /**
+     * The cursor
+     * @param cursor
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        return this;
+    }
+
+
+    /**
+     * Determines whether we should tack on a delay for collection delete and for how long if we do. Also
+     * allowed to specify how throttled back it should be.
+     * @param delayTimer
+     * @param timeUnit
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){
+        this.delayTimer = Optional.fromNullable( delayTimer );
+        this.timeUnitOptional = Optional.fromNullable( timeUnit );
+
+        return this;
+    }
+
+
+    /**
+     * Set end timestamp in epoch time.  Only entities created before this time will be processed for deletion
+     * @param timestamp
+     * @return
+     */
+    @Override
+    public CollectionDeleteRequestBuilder withEndTimestamp( final Long timestamp ) {
+        this.endTimestamp = Optional.fromNullable( timestamp );
+        return this;
+    }
+
+
+    @Override
+    public Optional<Integer> getDelayTimer() {
+        return delayTimer;
+    }
+
+    @Override
+    public Optional<TimeUnit> getTimeUnitOptional() {
+        return timeUnitOptional;
+    }
+
+
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if ( this.withApplicationId.isPresent() ) {
+            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+        }
+
+        return Optional.absent();
+    }
+
+
+    @Override
+    public Optional<String> getCollectionName() {
+        return withCollectionName;
+    }
+
+
+    @Override
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    @Override
+    public Optional<Long> getEndTimestamp() {
+        return endTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
new file mode 100644
index 0000000..c939dd3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+/**
+ * An interface for re-indexing all entities in an application
+ */
+public interface CollectionDeleteService {
+
+
+    /**
+     * Perform a collection delete via service
+     *
+     * @param collectionDeleteRequestBuilder The builder to build the request
+     */
+    CollectionDeleteStatus deleteCollection(final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder);
+
+
+    /**
+     * Generate a build for the collection delete
+     */
+    CollectionDeleteRequestBuilder getBuilder();
+
+
+    /**
+     * Get the status of a job
+     * @param jobId The jobId returned during the collection delete
+     * @return
+     */
+    CollectionDeleteStatus getStatus(final String jobId);
+
+
+    /**
+     * The response when requesting a collection delete operation
+     */
+    public class CollectionDeleteStatus {
+        final String jobId;
+        final Status status;
+        final long numberProcessed;
+        final long lastUpdated;
+
+
+        public CollectionDeleteStatus(final String jobId, final Status status, final long numberProcessed,
+                                      final long lastUpdated ) {
+            this.jobId = jobId;
+            this.status = status;
+            this.numberProcessed = numberProcessed;
+            this.lastUpdated = lastUpdated;
+        }
+
+
+        /**
+         * Get the jobId used to resume this operation
+         */
+        public String getJobId() {
+            return jobId;
+        }
+
+
+        /**
+         * Get the last updated time, as a long
+         * @return
+         */
+        public long getLastUpdated() {
+            return lastUpdated;
+        }
+
+
+        /**
+         * Get the number of records processed
+         * @return
+         */
+        public long getNumberProcessed() {
+            return numberProcessed;
+        }
+
+
+        /**
+         * Get the status
+         * @return
+         */
+        public Status getStatus() {
+            return status;
+        }
+    }
+
+    enum Status{
+        STARTED, INPROGRESS, COMPLETE, UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
new file mode 100644
index 0000000..7b3e324
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static com.google.common.base.Optional.fromNullable;
+
+
+@Singleton
+public class CollectionDeleteServiceImpl implements CollectionDeleteService {
+
+    private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteServiceImpl.class );
+
+    private static final MapScope RESUME_MAP_SCOPE =
+        new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "collectiondeleteresume" );
+
+    //Keep cursors to resume collection delete for 10 days.
+    private static final int CURSOR_TTL = 60 * 60 * 24 * 10;
+
+    private static final String MAP_CURSOR_KEY = "cursor";
+    private static final String MAP_COUNT_KEY = "count";
+    private static final String MAP_STATUS_KEY = "status";
+    private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+    private final AllApplicationsObservable allApplicationsObservable;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final AllEntityIdsObservable allEntityIdsObservable;
+    private final IndexProcessorFig indexProcessorFig;
+    private final MapManager mapManager;
+    private final MapManagerFactory mapManagerFactory;
+    private final AsyncEventService indexService;
+    private final EntityIndexFactory entityIndexFactory;
+    private final CollectionSettingsFactory collectionSettingsFactory;
+
+
+    @Inject
+    public CollectionDeleteServiceImpl(final EntityIndexFactory entityIndexFactory,
+                                       final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                       final AllEntityIdsObservable allEntityIdsObservable,
+                                       final MapManagerFactory mapManagerFactory,
+                                       final AllApplicationsObservable allApplicationsObservable,
+                                       final IndexProcessorFig indexProcessorFig,
+                                       final CollectionSettingsFactory collectionSettingsFactory,
+                                       final AsyncEventService indexService ) {
+        this.entityIndexFactory = entityIndexFactory;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.allEntityIdsObservable = allEntityIdsObservable;
+        this.allApplicationsObservable = allApplicationsObservable;
+        this.indexProcessorFig = indexProcessorFig;
+        this.indexService = indexService;
+        this.collectionSettingsFactory = collectionSettingsFactory;
+        this.mapManagerFactory = mapManagerFactory;
+        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+    }
+
+
+    //TODO: optional delay, param.
+    @Override
+    public CollectionDeleteStatus deleteCollection( final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder) {
+
+        final AtomicInteger count = new AtomicInteger();
+
+        final Optional<EdgeScope> cursor = parseCursor( collectionDeleteRequestBuilder.getCursor() );
+
+        final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
+
+        final Optional<Integer> delayTimer = collectionDeleteRequestBuilder.getDelayTimer();
+
+        final Optional<TimeUnit> timeUnitOptional = collectionDeleteRequestBuilder.getTimeUnitOptional();
+
+        Optional<ApplicationScope> appId = collectionDeleteRequestBuilder.getApplicationScope();
+
+        Preconditions.checkArgument(collectionDeleteRequestBuilder.getCollectionName().isPresent(),
+            "You must specify a collection name");
+        String collectionName = collectionDeleteRequestBuilder.getCollectionName().get();
+
+        Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
+            "You cannot specify an app id and a cursor.  When resuming with cursor you must omit the appid." );
+        Preconditions.checkArgument( cursor.isPresent() || appId.isPresent(),
+            "Either application ID or cursor is required.");
+
+        ApplicationScope applicationScope;
+        if (appId.isPresent()) {
+            applicationScope = appId.get();
+        } else { // cursor is present
+            applicationScope = cursor.get().getApplicationScope();
+        }
+
+
+        final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+        // default to current time
+        final long endTimestamp = collectionDeleteRequestBuilder.getEndTimestamp().or( System.currentTimeMillis() );
+
+        String pluralizedCollectionName = InflectionUtils.pluralize(CpNamingUtils.getNameFromEdgeType(collectionName));
+
+        CollectionSettings collectionSettings =
+            collectionSettingsFactory.getInstance(new CollectionSettingsScopeImpl(applicationScope.getApplication(), pluralizedCollectionName));
+
+        Optional<Map<String, Object>> existingSettings =
+            collectionSettings.getCollectionSettings( pluralizedCollectionName );
+
+        if ( existingSettings.isPresent() ) {
+
+            Map jsonMapData = existingSettings.get();
+
+            jsonMapData.put( "lastCollectionClear", Instant.now().toEpochMilli() );
+
+            collectionSettings.putCollectionSettings(
+                pluralizedCollectionName, JsonUtils.mapToJsonString(jsonMapData ) );
+        }
+
+        allEntityIdsObservable.getEdgesToEntities( Observable.just(applicationScope),
+            fromNullable(collectionName), cursorSeek.getSeekValue() )
+            .buffer( indexProcessorFig.getCollectionDeleteBufferSize())
+            .doOnNext( edgeScopes -> {
+                logger.info("Sending batch of {} to be deleted.", edgeScopes.size());
+                indexService.deleteBatch(edgeScopes, endTimestamp, AsyncEventQueueType.DELETE);
+                count.addAndGet(edgeScopes.size() );
+                if( edgeScopes.size() > 0 ) {
+                    writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+                }
+                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+            .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+            .subscribeOn( Schedulers.io() ).subscribe();
+
+
+        return new CollectionDeleteStatus( jobId, Status.STARTED, 0, 0 );
+    }
+
+
+    @Override
+    public CollectionDeleteRequestBuilder getBuilder() {
+        return new CollectionDeleteRequestBuilderImpl();
+    }
+
+
+    @Override
+    public CollectionDeleteStatus getStatus( final String jobId ) {
+        Preconditions.checkNotNull( jobId, "jobId must not be null" );
+        return getCollectionDeleteResponse( jobId );
+    }
+
+
+    /**
+     * Get the resume edge scope
+     *
+     * @param edgeScope The optional edge scope from the cursor
+     */
+    private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+        if ( edgeScope.isPresent() ) {
+            return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+        }
+
+        return new CursorSeek<>( Optional.absent() );
+    }
+
+
+    /**
+     * Swap our cursor for an optional edgescope
+     */
+    private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+        if ( !cursor.isPresent() ) {
+            return Optional.absent();
+        }
+
+        //get our cursor
+        final String persistedCursor = mapManager.getString( cursor.get() );
+
+        if ( persistedCursor == null ) {
+            return Optional.absent();
+        }
+
+        final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+        final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+        return Optional.of( edgeScope );
+    }
+
+
+    /**
+     * Write the cursor state to the map in cassandra
+     */
+    private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+        final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+
+        final String serializedState = CursorSerializerUtil.asString( node );
+
+        mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, CURSOR_TTL);
+    }
+
+
+    /**
+     * Write our state meta data into cassandra so everyone can see it
+     * @param jobId
+     * @param status
+     * @param processedCount
+     * @param lastUpdated
+     */
+    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+                                 final long lastUpdated ) {
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+                    jobId, status, processedCount, lastUpdated);
+        }
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    /**
+     * Get the index response from the jobId
+     * @param jobId
+     * @return
+     */
+    private CollectionDeleteStatus getCollectionDeleteResponse( final String jobId ) {
+
+        final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+        if(stringStatus == null){
+           return new CollectionDeleteStatus( jobId, Status.UNKNOWN, 0, 0 );
+        }
+
+        final Status status = Status.valueOf( stringStatus );
+
+        final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+        return new CollectionDeleteStatus( jobId, status, processedCount, lastUpdated );
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index eb63056..948e106 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -56,6 +56,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
 
+    String COLLECTION_DELETE_BUFFER_SIZE = "elasticsearch.collection_delete.buffer_size";
+
     String REINDEX_CONCURRENCY_FACTOR = "elasticsearch.reindex.concurrency.factor";
 
 
@@ -157,6 +159,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getReindexConcurrencyFactor();
 
     /**
+     * Number of parallel buffers during collection delete
+     */
+    @Default("500")
+    @Key(COLLECTION_DELETE_BUFFER_SIZE)
+    int getCollectionDeleteBufferSize();
+
+    /**
      * Flag to resolve the LOCAL queue implementation service synchronously.
      */
     @Default("false")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index c7371b3..05602fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -352,7 +352,7 @@ public class ReIndexServiceImpl implements ReIndexService {
         final Status status = Status.valueOf( stringStatus );
 
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
-        final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
 
         return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
new file mode 100644
index 0000000..ddf2c68
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Injector;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+
+@NotThreadSafe
+public class CollectionDeleteTest extends AbstractCoreIT {
+    private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
+
+    private static final MetricRegistry registry = new MetricRegistry();
+
+
+    private static final int ENTITIES_TO_DELETE = 1000;
+    private static final int ENTITIES_TO_ADD_AFTER_TIME = 3;
+
+
+    @Before
+    public void startReporting() {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Starting metrics reporting");
+        }
+    }
+
+
+    @After
+    public void printReport() {
+        logger.debug( "Printing metrics report" );
+    }
+
+
+    @Test( timeout = 240000 )
+    public void clearOneCollection() throws Exception {
+
+        logger.info( "Started clearOneCollection()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+        final CollectionDeleteService collectionDeleteService = setup.getInjector().getInstance( CollectionDeleteService.class );
+
+        // ----------------- create a bunch of entities
+
+        Map<String, Object> entityMap = new HashMap<String, Object>() {{
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
+        }};
+
+        String collectionName = "items";
+        String itemType = "item";
+
+
+        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+        for ( int i = 0; i < ENTITIES_TO_DELETE; i++ ) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", i );
+                entity = em.create(itemType, entityMap);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
+        }
+
+        logger.info("Created {} entities", ENTITIES_TO_DELETE);
+        long timeFirstPutDone = System.currentTimeMillis();
+        logger.info("timeFirstPutDone={}", timeFirstPutDone);
+
+        for (int i = 0; i < ENTITIES_TO_ADD_AFTER_TIME; i++) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", ENTITIES_TO_DELETE + i );
+                entity = em.create(itemType, entityMap);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities after delete time", i );
+            }
+
+        }
+        logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME);
+
+
+        app.waitForQueueDrainAndRefreshIndex(5000);
+
+        final CollectionDeleteRequestBuilder builder =
+            collectionDeleteService.getBuilder()
+                .withApplicationId( em.getApplicationId() )
+                .withCollection(collectionName)
+                .withEndTimestamp(timeFirstPutDone);
+
+        CollectionDeleteService.CollectionDeleteStatus status = collectionDeleteService.deleteCollection(builder);
+
+        assertNotNull( status.getJobId(), "JobId is present" );
+
+        logger.info( "Delete collection" );
+
+
+        waitForDelete( status, collectionDeleteService );
+
+        app.waitForQueueDrainAndRefreshIndex(15000);
+
+        // ----------------- test that we can read the entries after the timestamp
+
+        readData( em, collectionName,ENTITIES_TO_ADD_AFTER_TIME);
+    }
+
+    /**
+     * Wait for the delete to occur
+     */
+    private void waitForDelete( final CollectionDeleteService.CollectionDeleteStatus status, final CollectionDeleteService collectionDeleteService )
+        throws InterruptedException, IllegalArgumentException {
+        if (status != null) {
+            logger.info("waitForDelete: jobID={}", status.getJobId());
+        } else {
+            logger.info("waitForDelete: error, status = null");
+            throw new IllegalArgumentException("collectionDeleteStatus = null");
+        }
+        while ( true ) {
+
+            try {
+                final CollectionDeleteService.CollectionDeleteStatus updatedStatus =
+                    collectionDeleteService.getStatus( status.getJobId() );
+
+                if (updatedStatus == null) {
+                    logger.info("waitForDelete: updated status is null");
+                } else {
+                    logger.info("waitForDelete: status={} numberProcessed={}",
+                        updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+                    if ( updatedStatus.getStatus() == CollectionDeleteService.Status.COMPLETE ) {
+                        break;
+                    }
+                }
+            }
+            catch ( IllegalArgumentException iae ) {
+                //swallow.  Thrown if our job can't be found.  I.E hasn't updated yet
+            }
+
+
+            Thread.sleep( 1000 );
+        }
+    }
+
+
+    private int readData(EntityManager em, String collectionName, int expectedEntities)
+        throws Exception {
+
+        app.waitForQueueDrainAndRefreshIndex();
+
+        Results results = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
+            Query.Level.ALL_PROPERTIES, false);
+
+        int count = 0;
+        while ( true ) {
+
+            if (results.getEntities().size() == 0) {
+                break;
+            }
+
+            UUID lastEntityUUID = null;
+            for ( Entity e : results.getEntities() ) {
+
+                assertEquals(2000, e.getProperty("key2"));
+
+                if (count % 100 == 0) {
+                    logger.info("read {} entities", count);
+                }
+                lastEntityUUID = e.getUuid();
+                count++;
+            }
+
+            results = em.getCollection(em.getApplicationRef(), collectionName, lastEntityUUID, expectedEntities,
+                Query.Level.ALL_PROPERTIES, false);
+
+        }
+        logger.info("read {} total entities", count);
+
+        assertEquals( "Did not get expected entities", expectedEntities, count );
+        return count;
+    }
+
+    private int countEntities( EntityManager em, String collectionName, int expectedEntities)
+           throws Exception {
+
+           app.waitForQueueDrainAndRefreshIndex();
+
+           Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+           Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+           int count = 0;
+           while ( true ) {
+
+               count += results.size();
+
+
+               if ( results.hasCursor() ) {
+                   logger.info( "Counted {} : query again with cursor", count );
+                   q.setCursor( results.getCursor() );
+                   results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+               }
+               else {
+                   break;
+               }
+           }
+
+           assertEquals( "Did not get expected entities", expectedEntities, count );
+           return count;
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/query-validator/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/query-validator/src/test/resources/usergrid-custom-test.properties b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
index bc1ba56..c8e3eee 100644
--- a/stack/query-validator/src/test/resources/usergrid-custom-test.properties
+++ b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
@@ -30,3 +30,5 @@ usergrid.sysadmin.login.allowed=true
 
 # This property is required to be set and cannot be defaulted anywhere
 usergrid.cluster_name=usergrid
+
+elasticsearch.queue_impl=LOCAL

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..c9174c1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -18,19 +18,19 @@
 package org.apache.usergrid.rest.applications;
 
 
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.PathSegment;
 import javax.ws.rs.core.UriInfo;
 
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
+import org.apache.usergrid.persistence.index.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
@@ -48,6 +48,9 @@ import org.apache.usergrid.services.ServicePayload;
 
 import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * A collection resource that stands before the Service Resource. If it cannot find
@@ -61,6 +64,9 @@ import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 })
 public class CollectionResource extends ServiceResource {
 
+    private static final Logger logger = LoggerFactory.getLogger( CollectionResource.class );
+    private static final String UPDATED_BEFORE_FIELD = "updatedBefore";
+
     public CollectionResource() {
     }
 
@@ -190,6 +196,61 @@ public class CollectionResource extends ServiceResource {
     }
 
 
+    @PUT
+    @Path("{itemName}/_clear")
+    @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse clearCollectionPut(
+        final Map<String, Object> payload,
+        @PathParam("itemName") final String collectionName,
+        @QueryParam("callback") @DefaultValue("callback") String callback
+    ) throws Exception {
+
+        logger.info("Clearing collection {} for application {}", collectionName, getApplicationId().toString());
+
+        final CollectionDeleteRequestBuilder request = createRequest()
+            .withApplicationId(getApplicationId())
+            .withCollection(collectionName);
+
+        return executeResumeAndCreateResponse(payload, request, callback);
+
+    }
+
+
+    @GET
+    @Path( "{itemName}/_clear/{jobId}")
+    @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse clearCollectionJobGet(
+        @Context UriInfo ui,
+        @PathParam("itemName") PathSegment itemName,
+        @PathParam("jobId") String jobId,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+        if(logger.isTraceEnabled()){
+            logger.trace( "CollectionResource.clearCollectionJobGet" );
+        }
+
+        Preconditions
+            .checkNotNull(jobId, "path param jobId must not be null" );
+
+        CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().getStatus(jobId);
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "clear collection" );
+        response.setProperty( "jobId", status.getJobId() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberCheckedForDeletion", status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
+
     // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
     // So system access only.
     // TODO: use scheduler here to get around people sending a reindex call 30 times.
@@ -210,4 +271,57 @@ public class CollectionResource extends ServiceResource {
             services.getApplicationId().toString(),itemName.getPath(),false,callback );
     }
 
+
+    private CollectionDeleteService getCollectionDeleteService() {
+        return injector.getInstance( CollectionDeleteService.class );
+    }
+
+
+    private CollectionDeleteRequestBuilder createRequest() {
+        return new CollectionDeleteRequestBuilderImpl();
+    }
+
+
+    private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> payload,
+                                                        final CollectionDeleteRequestBuilder request,
+                                                        final String callback ) {
+
+        Map<String,Object> newPayload = payload;
+        if(newPayload == null ||  !payload.containsKey( UPDATED_BEFORE_FIELD )){
+            newPayload = new HashMap<>(1);
+            newPayload.put(UPDATED_BEFORE_FIELD,Long.MAX_VALUE);
+        }
+
+        Preconditions.checkArgument(newPayload.get(UPDATED_BEFORE_FIELD) instanceof Number,
+            "The field \"updatedBefore\" in the payload must be a timestamp" );
+
+        //add our updated timestamp to the request
+        if ( newPayload.containsKey( UPDATED_BEFORE_FIELD ) ) {
+            final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_BEFORE_FIELD));
+            request.withEndTimestamp( timestamp );
+        }
+
+        return executeAndCreateResponse( request, callback );
+    }
+
+    /**
+     * Execute the request and return the response.
+     */
+    private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
+
+
+        final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "clear collection" );
+        response.setProperty( "jobId", status.getJobId() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberQueued", status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
 }


Mime
View raw message