usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [6/6] git commit: Commit after refactor. Need to evaluate an event bus in guava instead of using coupled stages
Date Thu, 05 Dec 2013 00:55:32 GMT
Commit after refactor.  Need to evaluate an event bus in guava instead of using coupled stages


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/44072d59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/44072d59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/44072d59

Branch: refs/heads/two-dot-o
Commit: 44072d594f76f9f4fd0e6bc828e6b4d5ba7b8d81
Parents: 43aba65
Author: Todd Nine <toddnine@apache.org>
Authored: Wed Dec 4 17:42:24 2013 -0700
Committer: Todd Nine <toddnine@apache.org>
Committed: Wed Dec 4 17:52:04 2013 -0700

----------------------------------------------------------------------
 .../collection/impl/CollectionManagerImpl.java  |  14 +-
 .../collection/mvcc/stage/ExecutionStage.java   |  15 ++
 .../collection/mvcc/stage/Stage.java            |  15 --
 .../collection/mvcc/stage/StagePipeline.java    |   8 +-
 .../collection/mvcc/stage/impl/Clear.java       |  91 ---------
 .../stage/impl/CollectionPipelineModule.java    |  33 +++-
 .../collection/mvcc/stage/impl/Commit.java      |  90 ---------
 .../collection/mvcc/stage/impl/Create.java      |  85 ---------
 .../mvcc/stage/impl/CreatePipeline.java         |  23 ---
 .../mvcc/stage/impl/DeletePipeline.java         |  23 ---
 .../mvcc/stage/impl/ExecutionContextImpl.java   |   6 +-
 .../collection/mvcc/stage/impl/Load.java        |  82 --------
 .../mvcc/stage/impl/LoadPipeline.java           |  23 ---
 .../mvcc/stage/impl/StagePipelineImpl.java      |  32 ++--
 .../collection/mvcc/stage/impl/StartDelete.java |  97 ----------
 .../collection/mvcc/stage/impl/StartWrite.java  |  92 ---------
 .../collection/mvcc/stage/impl/Update.java      |  66 -------
 .../mvcc/stage/impl/UpdatePipeline.java         |  23 ---
 .../collection/mvcc/stage/impl/Verify.java      |  25 ---
 .../mvcc/stage/impl/WriteContextCallback.java   |  59 ------
 .../mvcc/stage/impl/delete/Delete.java          |  91 +++++++++
 .../mvcc/stage/impl/delete/DeletePipeline.java  |  23 +++
 .../mvcc/stage/impl/delete/StartDelete.java     |  96 ++++++++++
 .../collection/mvcc/stage/impl/read/Load.java   |  82 ++++++++
 .../mvcc/stage/impl/read/PipelineLoad.java      |  23 +++
 .../mvcc/stage/impl/write/Commit.java           |  90 +++++++++
 .../mvcc/stage/impl/write/Create.java           |  85 +++++++++
 .../mvcc/stage/impl/write/PipelineCreate.java   |  23 +++
 .../mvcc/stage/impl/write/PipelineUpdate.java   |  23 +++
 .../mvcc/stage/impl/write/StartWrite.java       |  92 +++++++++
 .../mvcc/stage/impl/write/Update.java           |  70 +++++++
 .../mvcc/stage/impl/write/Verify.java           |  25 +++
 .../stage/impl/write/WriteContextCallback.java  |  59 ++++++
 .../MvccEntitySerializationStrategy.java        |   2 +-
 .../collection/CollectionManagerTest.java       |  13 +-
 .../mvcc/stage/ExecutionContextTest.java        |  24 +--
 .../mvcc/stage/StagePipelineTest.java           |  22 +--
 .../collection/mvcc/stage/impl/CreateTest.java  | 152 ---------------
 .../mvcc/stage/impl/StartWriteTest.java         | 187 ------------------
 .../mvcc/stage/impl/write/CreateTest.java       | 153 +++++++++++++++
 .../mvcc/stage/impl/write/StartWriteTest.java   | 188 ++++++++++++++++++
 .../mvcc/stage/impl/write/UpdateTest.java       | 189 +++++++++++++++++++
 .../persistence/index/stage/Complete.java       |   4 +-
 .../usergrid/persistence/index/stage/Start.java |   4 +-
 .../usergrid/persistence/index/stage/Write.java |   4 +-
 45 files changed, 1420 insertions(+), 1206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index c20d2c3..e20ba62 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -11,11 +11,11 @@ import org.apache.usergrid.persistence.collection.CollectionManager;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.CreatePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.DeletePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.LoadPipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.UpdatePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.inject.Inject;
@@ -39,10 +39,10 @@ public class CollectionManagerImpl implements CollectionManager {
 
 
     @Inject
-    public CollectionManagerImpl( @CreatePipeline final StagePipeline createPipeline,
-                                  @UpdatePipeline final StagePipeline updatePipeline,
+    public CollectionManagerImpl( @PipelineCreate final StagePipeline createPipeline,
+                                  @PipelineUpdate final StagePipeline updatePipeline,
                                   @DeletePipeline final StagePipeline deletePipeline,
-                                  @LoadPipeline final StagePipeline loadPipeline,
+                                  @PipelineLoad final StagePipeline loadPipeline,
                                   @Assisted final CollectionContext context ) {
 
         this.createPipeline = createPipeline;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
new file mode 100644
index 0000000..a98c813
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
@@ -0,0 +1,15 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+/** A single stage in an execution pipeline (write, delete or load). */
+public interface ExecutionStage {
+
+    /**
+     * Run this stage.  Nothing is returned: the stage hands its result to the next stage
+     * via the context.
+     *
+     * @param context The context of the current execution; used to read and set the message
+     *                and to proceed to the next stage
+     */
+    public void performStage( ExecutionContext context );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
deleted file mode 100644
index 538a546..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Stage.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-/** The possible stages in our write flow. */
-public interface Stage {
-
-    /**
-     * Run this stage.  This will return the MvccEntity that should be returned or passed to the next stage
-     *
-     * @param context The context of the current write operation
-     *
-     * @return The asynchronous listener to signal success
-     */
-    public void performStage( ExecutionContext context );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 0b032d9..9d68e10 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -13,14 +13,14 @@ public interface StagePipeline {
     /**
      * Get the first stage in this pipeline.
      */
-    Stage first();
+    ExecutionStage first();
 
 
     /**
-     * get the next stage after the stage specified
-     * @param stage The stage to seek in our pipeline
+     * Get the next stage after the stage specified
+     * @param executionStage The stage to seek in our pipeline
      */
-    Stage nextStage(Stage stage);
+    ExecutionStage nextStage(ExecutionStage executionStage );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
deleted file mode 100644
index 854ec24..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Clear.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Clear implements Stage {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( Clear.class );
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-
-
-    @Inject
-    public Clear( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                  final MvccEntitySerializationStrategy entitySerializationStrategy ) {
-
-        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
-              Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
-
-
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final UUID entityId = entity.getUuid();
-        final UUID version = entity.getVersion();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.COMMITTED );
-
-        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
-
-        //insert a "cleared" value into the versions.  Post processing should actually delete
-        MutationBatch entityMutation = entitySerializationStrategy.clear( collectionContext, entityId, version );
-
-        //merge the 2 into 1 mutation
-        logMutation.mergeShallow( entityMutation );
-
-
-        try {
-            logMutation.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-        /**
-         * We're done executing.
-         */
-        executionContext.proceed();
-
-        //TODO connect to post processors via listener
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 8390e7b..68dab72 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,8 +1,20 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.Delete;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.StartDelete;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Commit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Create;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.StartWrite;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Update;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Verify;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
@@ -24,7 +36,7 @@ public class CollectionPipelineModule extends AbstractModule {
      * objects are mutable
      */
     @Provides
-    @CreatePipeline
+    @PipelineCreate
     @Inject
     @Singleton
     public StagePipeline createPipeline( final Create create, final StartWrite startWrite, final Verify write,
@@ -34,7 +46,7 @@ public class CollectionPipelineModule extends AbstractModule {
 
 
     @Provides
-    @UpdatePipeline
+    @PipelineUpdate
     @Inject
     @Singleton
     public StagePipeline updatePipeline( final Update update, final StartWrite startWrite, final Verify write,
@@ -47,13 +59,13 @@ public class CollectionPipelineModule extends AbstractModule {
     @DeletePipeline
     @Inject
     @Singleton
-    public StagePipeline deletePipeline(final StartDelete startDelete,  final Clear delete ) {
+    public StagePipeline deletePipeline(final StartDelete startDelete,  final Delete delete ) {
         return StagePipelineImpl.fromStages(startDelete, delete );
     }
 
 
     @Provides
-    @LoadPipeline
+    @PipelineLoad
     @Inject
     @Singleton
     public StagePipeline deletePipeline( final Load load ) {
@@ -67,17 +79,20 @@ public class CollectionPipelineModule extends AbstractModule {
         /**
          * Configure all stages here
          */
-        Multibinder<Stage> stageBinder = Multibinder.newSetBinder( binder(), Stage.class );
+        Multibinder<ExecutionStage> stageBinder = Multibinder.newSetBinder( binder(), ExecutionStage.class );
 
 
 
         //creation stages
+        stageBinder.addBinding().to( Commit.class );
         stageBinder.addBinding().to( Create.class );
-        stageBinder.addBinding().to( Update.class );
         stageBinder.addBinding().to( StartWrite.class );
+        stageBinder.addBinding().to( Update.class );
         stageBinder.addBinding().to( Verify.class );
-        stageBinder.addBinding().to( Commit.class );
-        stageBinder.addBinding().to( Clear.class );
+
+        //delete stages
+        stageBinder.addBinding().to( Delete.class );
+        stageBinder.addBinding().to( StartDelete.class );
 
         //loading stages
         stageBinder.addBinding().to(Load.class);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
deleted file mode 100644
index f29bf93..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Commit.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Commit implements Stage {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
-
-    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-
-
-    @Inject
-    public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
-        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
-                      Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
-
-
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final UUID entityId = entity.getUuid();
-        final UUID version = entity.getVersion();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.COMMITTED );
-
-        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
-
-        //now get our actual insert into the entity data
-        MutationBatch entityMutation = entitySerializationStrategy.write( collectionContext, entity );
-
-        //merge the 2 into 1 mutation
-        logMutation.mergeShallow( entityMutation );
-
-
-        try {
-            logMutation.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-        /**
-         * We're done executing.
-         */
-        executionContext.proceed();
-
-        //TODO connect to post processors via listener
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
deleted file mode 100644
index 2026274..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Create.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.reflect.FieldUtils;
-
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.service.TimeService;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.collection.util.Verify;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- * This is the first stage and should be invoked immediately when a new entity create is started. No UUIDs should be
- * present, and this should set the entityId, version, created, and updated dates
- */
-@Singleton
-public class Create implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( Create.class );
-
-
-    private final TimeService timeService;
-    private final UUIDService uuidService;
-
-
-    @Inject
-    public Create( final TimeService timeService, final UUIDService uuidService ) {
-        Preconditions.checkNotNull( timeService, "timeService is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-
-        this.timeService = timeService;
-        this.uuidService = uuidService;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        Verify.isNull( entity.getUuid(), "A new entity should not have an id set.  This is an update operation" );
-
-
-        final UUID entityId = uuidService.newTimeUUID();
-        final UUID version = entityId;
-        final long created = timeService.getTime();
-
-
-        try {
-            FieldUtils.writeDeclaredField( entity, "uuid", entityId, true );
-        }
-        catch ( Throwable t ) {
-            LOG.error( "Unable to set uuid.  See nested exception", t );
-            throw new CollectionRuntimeException( "Unable to set uuid.  See nested exception", t );
-        }
-
-        entity.setVersion( version );
-        entity.setCreated( created );
-        entity.setUpdated( created );
-
-        //set the updated entity for the next stage
-        executionContext.setMessage( entity );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
deleted file mode 100644
index efe50c8..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CreatePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface CreatePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
deleted file mode 100644
index 3d95ddb..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/DeletePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface DeletePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
index 8139f3d..805d1e3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
@@ -3,7 +3,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
 
 import com.google.common.base.Preconditions;
@@ -17,7 +17,7 @@ public class ExecutionContextImpl implements ExecutionContext {
     private final CollectionContext context;
 
     private Object message;
-    private Stage current;
+    private ExecutionStage current;
 
 
     @Inject
@@ -72,7 +72,7 @@ public class ExecutionContextImpl implements ExecutionContext {
 
     @Override
     public void proceed() {
-        Stage next = this.pipeline.nextStage( current );
+        ExecutionStage next = this.pipeline.nextStage( current );
 
         //Nothing to do
         if ( next == null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
deleted file mode 100644
index b0ac251..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Load.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-
-/**
- * This stage is a load stage to load a single entity
- */
-public class Load implements Stage {
-
-
-    private static final Logger LOG = LoggerFactory.getLogger( Load.class );
-
-    private final UUIDService uuidService;
-    private final MvccEntitySerializationStrategy entitySerializationStrategy;
-
-
-    @Inject
-    public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
-        Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-
-        this.uuidService = uuidService;
-        this.entitySerializationStrategy = entitySerializationStrategy;
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final UUID entityId = executionContext.getMessage( UUID.class );
-
-        Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-        //generate  a version that represents now
-        final UUID versionMax = uuidService.newTimeUUID();
-
-        List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
-
-        //nothing to do, we didn't get a result back
-        if(results.size() != 1){
-            executionContext.setMessage( null );
-            executionContext.proceed();
-            return;
-        }
-
-        final Optional<Entity> targetVersion = results.get(0).getEntity();
-
-        //this entity has been marked as cleared.  The version exists, but does not have entity data
-        if(!targetVersion.isPresent()){
-
-            //TODO, a lazy async repair/cleanup here?
-
-            executionContext.setMessage( null );
-            executionContext.proceed();
-            return;
-        }
-
-        executionContext.setMessage( targetVersion.get() );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
deleted file mode 100644
index f712b2c..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/LoadPipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface LoadPipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
index 74dc003..1f3a0fe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
@@ -4,7 +4,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
 
 import com.google.common.base.Preconditions;
@@ -13,25 +13,25 @@ import com.google.common.base.Preconditions;
 /** @author tnine */
 public class StagePipelineImpl implements StagePipeline {
 
-    private final List<Stage> stages;
+    private final List<ExecutionStage> executionStages;
 
 
-    protected StagePipelineImpl( List<Stage> stages ) {
-        Preconditions.checkNotNull(stages, "stages is required");
-        Preconditions.checkArgument(  stages.size() > 0, "stages must have more than 1 element" );
+    protected StagePipelineImpl( List<ExecutionStage> executionStages ) {
+        Preconditions.checkNotNull( executionStages, "executionStages is required");
+        Preconditions.checkArgument(  executionStages.size() > 0, "executionStages must have more than 1 element" );
 
-        this.stages = stages;
+        this.executionStages = executionStages;
     }
 
 
     @Override
-    public Stage first() {
+    public ExecutionStage first() {
 
-        if ( stages.size() == 0 ) {
+        if ( executionStages.size() == 0 ) {
             return null;
         }
 
-        return stages.get( 0 );
+        return executionStages.get( 0 );
     }
 
 
@@ -39,23 +39,23 @@ public class StagePipelineImpl implements StagePipeline {
 
 
     @Override
-    public Stage nextStage( final Stage stage ) {
+    public ExecutionStage nextStage( final ExecutionStage executionStage ) {
 
-        Preconditions.checkNotNull( stage, "Stage cannot be null" );
+        Preconditions.checkNotNull( executionStage, "ExecutionStage cannot be null" );
 
-        int index = stages.indexOf( stage );
+        int index = executionStages.indexOf( executionStage );
 
         //we're done, do nothing
-        if ( index == stages.size() -1  ) {
+        if ( index == executionStages.size() -1  ) {
             return null;
         }
 
-        return  stages.get( index + 1 );
+        return  executionStages.get( index + 1 );
     }
 
 
     /** Factory to create a new instance. */
-    public static StagePipelineImpl fromStages( Stage... stages ) {
-        return new StagePipelineImpl(Arrays.asList(  stages ));
+    public static StagePipelineImpl fromStages( ExecutionStage... executionStages ) {
+        return new StagePipelineImpl(Arrays.asList( executionStages ));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
deleted file mode 100644
index 383ee9e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartDelete.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class StartDelete implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
-
-    private final MvccLogEntrySerializationStrategy logStrategy;
-    private final UUIDService uuidService;
-
-
-    /** Create a new stage with the current context */
-    @Inject
-    public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
-
-        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-
-        this.logStrategy = logStrategy;
-        this.uuidService = uuidService;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final UUID entityId = executionContext.getMessage( UUID.class );
-
-
-        final UUID version = uuidService.newTimeUUID();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.ACTIVE );
-
-        MutationBatch write = logStrategy.write( collectionContext, startEntry );
-
-
-        try {
-            write.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-
-        //create the mvcc entity for the next stage
-        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
-
-        executionContext.setMessage( nextStage );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
deleted file mode 100644
index 4c55ae6..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StartWrite.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-
-/**
- * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
- * new write in the data store for a checkpoint and recovery
- */
-@Singleton
-public class StartWrite implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
-
-    private final MvccLogEntrySerializationStrategy logStrategy;
-
-
-    /** Create a new stage with the current context */
-    @Inject
-    public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
-        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
-
-
-        this.logStrategy = logStrategy;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-        final UUID entityId = entity.getUuid();
-        final UUID version = entity.getVersion();
-
-        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
-        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
-
-
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
-
-
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.ACTIVE );
-
-        MutationBatch write = logStrategy.write( collectionContext, startEntry );
-
-
-        try {
-            write.execute();
-        }
-        catch ( ConnectionException e ) {
-            LOG.error( "Failed to execute write asynchronously ", e );
-            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
-        }
-
-
-        //create the mvcc entity for the next stage
-        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
-
-        executionContext.setMessage( nextStage );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
deleted file mode 100644
index ce1be76..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Update.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.service.TimeService;
-import org.apache.usergrid.persistence.collection.service.UUIDService;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- * This stage performs the initial commit log and write of an entity.  It assumes the entity id and created has already
- * been set correctly
- */
-@Singleton
-public class Update implements Stage {
-
-    private static final Logger LOG = LoggerFactory.getLogger( Update.class );
-
-    private final TimeService timeService;
-    private final UUIDService uuidService;
-
-
-    @Inject
-    public Update( final TimeService timeService, final UUIDService uuidService ) {
-        Preconditions.checkNotNull( timeService, "timeService is required" );
-        Preconditions.checkNotNull( uuidService, "uuidService is required" );
-
-        this.timeService = timeService;
-        this.uuidService = uuidService;
-    }
-
-
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
-
-        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
-
-        final UUID version = uuidService.newTimeUUID();
-        final long updated = timeService.getTime();
-
-
-        entity.setVersion( version );
-        entity.setUpdated( updated );
-
-        executionContext.setMessage( entity );
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
deleted file mode 100644
index abc6e15..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/UpdatePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface UpdatePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
deleted file mode 100644
index cc53000..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/Verify.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.Stage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-
-import com.google.inject.Singleton;
-
-
-/** This phase should execute any verification on the MvccEntity */
-@Singleton
-public class Verify implements Stage {
-
-
-    public Verify() {
-    }
-
-
-    @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        //TODO no op for now, just continue to the next stage.  Verification logic goes in here
-
-        executionContext.proceed();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
deleted file mode 100644
index 2a34e95..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/WriteContextCallback.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.connectionpool.OperationResult;
-
-
-/**
- * Helper class to cause the async execution to continue
- * Not used ATM, just here for demonstration purposes with async astynax invocation on phase proceed
- *
- * @author tnine
- */
-public class WriteContextCallback implements FutureCallback<OperationResult<Void>> {
-
-    private final ExecutionContext context;
-
-
-    /** Create a new callback.  The data will be passed to the next stage */
-    private WriteContextCallback( final ExecutionContext context ) {
-        this.context = context;
-    }
-
-
-    public void onSuccess( final OperationResult<Void> result ) {
-
-        /**
-         * Proceed to the next stage
-         */
-        context.proceed();
-    }
-
-
-    @Override
-    public void onFailure( final Throwable t ) {
-//        context.stop();
-        throw new CollectionRuntimeException( "Failed to execute write", t );
-    }
-
-
-    /**
-     * This encapsulated type of Void in the listenable future is intentional.  If you're not returning void in your
-     * future, you shouldn't be using this callback, you should be using a callback that will set the Response value
-     * into the next stage and invoke it
-     *
-     * @param future The listenable future returned by the Astyanax async op
-     * @param context The context to signal to continue in the callback
-     */
-    public static void createCallback( final ListenableFuture<OperationResult<Void>> future,
-                                       final ExecutionContext context ) {
-
-        Futures.addCallback( future, new WriteContextCallback( context ) );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
new file mode 100644
index 0000000..7810beb
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
@@ -0,0 +1,91 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** This stage writes a COMMITTED log entry and a "cleared" marker for the entity version in one merged mutation, then proceeds */
+public class Delete implements ExecutionStage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Delete.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+    @Inject
+    public Delete( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+
+        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+              Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID entityId = entity.getUuid();
+        final UUID version = entity.getVersion();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.COMMITTED );
+
+        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
+
+        //insert a "cleared" value into the versions.  Post processing should actually delete
+        MutationBatch entityMutation = entitySerializationStrategy.clear( collectionContext, entityId, version );
+
+        //merge the 2 into 1 mutation
+        logMutation.mergeShallow( entityMutation );
+
+
+        try {
+            logMutation.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+        /**
+         * We're done executing.
+         */
+        executionContext.proceed();
+
+        //TODO connect to post processors via listener
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
new file mode 100644
index 0000000..52fe4b9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Marks the delete pipeline
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface DeletePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
new file mode 100644
index 0000000..4208662
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
@@ -0,0 +1,96 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a delete is started.  It should persist the start of
+ * the delete in the data store for a checkpoint and recovery, under a newly generated version
+ */
+@Singleton
+public class StartDelete implements ExecutionStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
+
+    private final MvccLogEntrySerializationStrategy logStrategy;
+    private final UUIDService uuidService;
+
+
+    /** Create a new stage.  Both arguments are required and are null checked */
+    @Inject
+    public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
+
+        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+        Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+        this.logStrategy = logStrategy;
+        this.uuidService = uuidService;
+    }
+
+
+    /**
+     * Read the entity id from the context, generate a new version and persist an ACTIVE log entry for the pair
+     *
+     * @param executionContext The context of the current delete operation.  Its message must be the entity UUID
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final UUID entityId = executionContext.getMessage( UUID.class );
+
+        //the delete is logged under a freshly generated time UUID version
+        final UUID version = uuidService.newTimeUUID();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+        //mark this entity id and version as ACTIVE in the log
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.ACTIVE );
+
+        MutationBatch write = logStrategy.write( collectionContext, startEntry );
+
+        //NOTE(review): the messages below say "asynchronously" but execute() is what is called here - confirm wording
+        try {
+            write.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+
+        //create the mvcc entity for the next stage.  It carries no entity data, hence the absent Optional
+        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
+
+        executionContext.setMessage( nextStage );
+        executionContext.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
new file mode 100644
index 0000000..00a2d43
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
@@ -0,0 +1,82 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+
+
+/**
+ * This stage is a load stage to load a single entity.  It sets the Entity, or null when none is found, as the message
+ */
+public class Load implements ExecutionStage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Load.class );
+
+    private final UUIDService uuidService;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+    //both arguments are required and are null checked
+    @Inject
+    public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+        Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+        Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+        this.uuidService = uuidService;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+    //reads the entity UUID from the context message and always proceeds, with the loaded Entity or null as the message
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final UUID entityId = executionContext.getMessage( UUID.class );
+
+        Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+        //generate  a version that represents now
+        final UUID versionMax = uuidService.newTimeUUID();
+        //request a single result; presumably the newest version not after versionMax - confirm against the strategy
+        List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
+
+        //nothing to do, we didn't get a result back.  The next stage sees a null message
+        if(results.size() != 1){
+            executionContext.setMessage( null );
+            executionContext.proceed();
+            return;
+        }
+
+        final Optional<Entity> targetVersion = results.get(0).getEntity();
+
+        //this entity has been marked as cleared.  The version exists, but does not have entity data
+        if(!targetVersion.isPresent()){
+
+            //TODO, a lazy async repair/cleanup here?
+
+            executionContext.setMessage( null );
+            executionContext.proceed();
+            return;
+        }
+
+        executionContext.setMessage( targetVersion.get() );
+        executionContext.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
new file mode 100644
index 0000000..0d24b27
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Guice binding annotation that marks the load pipeline on a field, parameter or method
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface PipelineLoad {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
new file mode 100644
index 0000000..4780ff1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
@@ -0,0 +1,90 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
+public class Commit implements ExecutionStage {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+    //both arguments are required and are null checked
+    @Inject
+    public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+                      Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+    }
+
+    //writes the COMMITTED log entry and the entity data as one merged mutation, then proceeds to the next stage
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID entityId = entity.getUuid();
+        final UUID version = entity.getVersion();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+        //despite the variable name, this log entry records the COMMITTED stage for the entity id and version
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.COMMITTED );
+
+        MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
+
+        //now get our actual insert into the entity data
+        MutationBatch entityMutation = entitySerializationStrategy.write( collectionContext, entity );
+
+        //merge the 2 into 1 mutation, so a single execute() covers both the log entry and the entity data
+        logMutation.mergeShallow( entityMutation );
+
+        //NOTE(review): the messages below say "asynchronously" but execute() is what is called here - confirm wording
+        try {
+            logMutation.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+        /**
+         * The merged mutation executed without error.  The message is left unchanged for the next stage.
+         */
+        executionContext.proceed();
+
+        //TODO connect to post processors via listener
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
new file mode 100644
index 0000000..cbd7d9b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
@@ -0,0 +1,85 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.service.TimeService;
+import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.collection.util.Verify;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a new entity create is started. No UUID should be
+ * present on the entity, and this stage sets the entityId, version, created, and updated dates
+ */
+@Singleton
+public class Create implements ExecutionStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( Create.class );
+
+
+    private final TimeService timeService;
+    private final UUIDService uuidService;
+
+    //both arguments are required and are null checked
+    @Inject
+    public Create( final TimeService timeService, final UUIDService uuidService ) {
+        Preconditions.checkNotNull( timeService, "timeService is required" );
+        Preconditions.checkNotNull( uuidService, "uuidService is required" );
+
+
+        this.timeService = timeService;
+        this.uuidService = uuidService;
+    }
+
+
+    /**
+     * Create the entity Id and inject it, then set the version and the created and updated timestamps
+     *
+     * @param executionContext The context of the current write operation.  Its message must be an Entity with no uuid
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final Entity entity = executionContext.getMessage( Entity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        Verify.isNull( entity.getUuid(), "A new entity should not have an id set.  This is an update operation" );
+
+        //the first version of a new entity is the same time UUID as its id
+        final UUID entityId = uuidService.newTimeUUID();
+        final UUID version = entityId;
+        final long created = timeService.getTime();
+
+        //the "uuid" field is written reflectively; presumably Entity exposes no setter for it - confirm
+        try {
+            FieldUtils.writeDeclaredField( entity, "uuid", entityId, true );
+        }
+        catch ( Throwable t ) {
+            LOG.error( "Unable to set uuid.  See nested exception", t );
+            throw new CollectionRuntimeException( "Unable to set uuid.  See nested exception", t );
+        }
+
+        entity.setVersion( version );
+        entity.setCreated( created );
+        entity.setUpdated( created );
+
+        //set the updated entity for the next stage.  Created and updated hold the same timestamp
+        executionContext.setMessage( entity );
+        executionContext.proceed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
new file mode 100644
index 0000000..f3af972
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Guice binding annotation that marks the create pipeline on a field, parameter or method
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD })
+@Retention(RUNTIME)
+public @interface PipelineCreate {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
new file mode 100644
index 0000000..85bc56d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
@@ -0,0 +1,23 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Guice binding annotation that marks the update pipeline on a field, parameter or method
+ *
+ * @author tnine
+ */
+@BindingAnnotation
+@Target( { FIELD, PARAMETER, METHOD } )
+@Retention( RUNTIME )
+public @interface PipelineUpdate {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44072d59/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
new file mode 100644
index 0000000..d6e7d49
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
@@ -0,0 +1,92 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * This is the first stage and should be invoked immediately when a write is started.  It should persist the start of a
+ * new write in the data store for a checkpoint and recovery
+ */
+@Singleton
+public class StartWrite implements ExecutionStage {
+
+    private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
+
+    private final MvccLogEntrySerializationStrategy logStrategy;
+
+
+    /** Create a new stage.  The log strategy is required and is null checked */
+    @Inject
+    public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
+        Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
+
+
+        this.logStrategy = logStrategy;
+    }
+
+
+    /**
+     * Persist an ACTIVE log entry for the entity's existing id and version, then wrap the entity for the next stage
+     *
+     * @param executionContext The context of the current write operation.  Its message must be an Entity
+     */
+    @Override
+    public void performStage( final ExecutionContext executionContext ) {
+
+        final Entity entity = executionContext.getMessage( Entity.class );
+
+        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
+
+        final UUID entityId = entity.getUuid();
+        final UUID version = entity.getVersion();
+
+        Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+        Preconditions.checkNotNull( version, "Entity version is required in this stage" );
+
+
+
+        final CollectionContext collectionContext = executionContext.getCollectionContext();
+
+        //mark this entity id and version as ACTIVE in the log
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
+                .collection.mvcc.entity.Stage.ACTIVE );
+
+        MutationBatch write = logStrategy.write( collectionContext, startEntry );
+
+        //NOTE(review): the messages below say "asynchronously" but execute() is what is called here - confirm wording
+        try {
+            write.execute();
+        }
+        catch ( ConnectionException e ) {
+            LOG.error( "Failed to execute write asynchronously ", e );
+            throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
+        }
+
+
+        //create the mvcc entity for the next stage.  It wraps the incoming entity with its id and version
+        final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
+
+        executionContext.setMessage( nextStage );
+        executionContext.proceed();
+    }
+}


Mime
View raw message