usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [2/4] git commit: Example using google’s event bus. Seems events are getting dispatched multiple times.
Date Thu, 05 Dec 2013 17:35:33 GMT
Example using google’s event bus. Seems events are getting dispatched multiple times.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56c415f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56c415f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56c415f2

Branch: refs/heads/two-dot-o
Commit: 56c415f2bd6ac784b5e5d71a6f92bbbc11f82b86
Parents: 44072d5
Author: Todd Nine <toddnine@apache.org>
Authored: Wed Dec 4 21:14:42 2013 -0700
Committer: Todd Nine <toddnine@apache.org>
Committed: Wed Dec 4 21:14:42 2013 -0700

----------------------------------------------------------------------
 .../collection/impl/CollectionManagerImpl.java  |  61 +++---
 .../mvcc/entity/CollectionEventBus.java         |  38 ++++
 .../collection/mvcc/entity/MvccEntity.java      |   1 -
 .../collection/mvcc/entity/MvccLogEntry.java    |   2 -
 .../entity/impl/CollectionEventBusImpl.java     |  20 ++
 .../mvcc/entity/impl/MvccEntityImpl.java        |   1 -
 .../mvcc/entity/impl/MvccLogEntryImpl.java      |   1 -
 .../mvcc/event/PostProcessListener.java         |   3 -
 .../collection/mvcc/stage/CollectionEvent.java  |  43 ++++
 .../collection/mvcc/stage/EventStage.java       |  18 ++
 .../collection/mvcc/stage/ExecutionContext.java |   5 -
 .../collection/mvcc/stage/ExecutionStage.java   |  15 --
 .../collection/mvcc/stage/Result.java           |  48 ++++
 .../collection/mvcc/stage/StagePipeline.java    |   8 +-
 .../stage/impl/CollectionPipelineModule.java    |  72 ++----
 .../mvcc/stage/impl/ExecutionContextImpl.java   |  92 --------
 .../mvcc/stage/impl/StagePipelineImpl.java      |  61 ------
 .../mvcc/stage/impl/delete/Delete.java          |  27 ++-
 .../mvcc/stage/impl/delete/DeletePipeline.java  |  23 --
 .../mvcc/stage/impl/delete/DeleteStart.java     |  17 ++
 .../mvcc/stage/impl/delete/Deletecommit.java    |  16 ++
 .../mvcc/stage/impl/delete/StartDelete.java     |  36 +--
 .../mvcc/stage/impl/read/EventLoad.java         |  17 ++
 .../collection/mvcc/stage/impl/read/Load.java   |  35 ++-
 .../mvcc/stage/impl/read/PipelineLoad.java      |  23 --
 .../mvcc/stage/impl/write/Commit.java           |  34 +--
 .../mvcc/stage/impl/write/Create.java           |  29 +--
 .../mvcc/stage/impl/write/EventCommit.java      |  16 ++
 .../mvcc/stage/impl/write/EventCreate.java      |  17 ++
 .../mvcc/stage/impl/write/EventStart.java       |  17 ++
 .../mvcc/stage/impl/write/EventUpdate.java      |  16 ++
 .../mvcc/stage/impl/write/EventVerify.java      |  17 ++
 .../mvcc/stage/impl/write/PipelineCreate.java   |  23 --
 .../mvcc/stage/impl/write/PipelineUpdate.java   |  23 --
 .../mvcc/stage/impl/write/StartWrite.java       |  40 ++--
 .../mvcc/stage/impl/write/Update.java           |  29 +--
 .../mvcc/stage/impl/write/Verify.java           |  20 +-
 .../MvccEntitySerializationStrategy.java        |   1 -
 .../MvccLogEntrySerializationStrategyImpl.java  |   2 +-
 .../serialization/impl/SerializationModule.java |   2 -
 .../collection/service/impl/ServiceModule.java  |  10 -
 .../collection/CollectionManagerIT.java         |   8 +
 .../collection/CollectionManagerTest.java       |  83 ++++---
 .../mvcc/stage/ExecutionContextTest.java        | 217 -------------------
 .../mvcc/stage/StagePipelineTest.java           |  89 --------
 .../mvcc/stage/impl/write/CreateTest.java       |  77 ++++---
 .../mvcc/stage/impl/write/StartWriteTest.java   |  84 +++----
 .../mvcc/stage/impl/write/UpdateTest.java       |  98 +++++----
 .../persistence/index/stage/Complete.java       |  21 --
 .../usergrid/persistence/index/stage/Start.java |  16 --
 .../usergrid/persistence/index/stage/Write.java |  18 --
 51 files changed, 659 insertions(+), 1031 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
index e20ba62..7de8f50 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/CollectionManagerImpl.java
@@ -8,16 +8,17 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.CollectionManager;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.ExecutionContextImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeleteStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.EventLoad;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventCreate;
+import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.EventUpdate;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
@@ -32,23 +33,16 @@ public class CollectionManagerImpl implements CollectionManager {
     private static final Logger logger = LoggerFactory.getLogger( CollectionManagerImpl.class );
 
     private final CollectionContext context;
-    private final StagePipeline createPipeline;
-    private final StagePipeline updatePipeline;
-    private final StagePipeline deletePipeline;
-    private final StagePipeline loadPipeline;
+    private final CollectionEventBus eventBus;
 
 
     @Inject
-    public CollectionManagerImpl( @PipelineCreate final StagePipeline createPipeline,
-                                  @PipelineUpdate final StagePipeline updatePipeline,
-                                  @DeletePipeline final StagePipeline deletePipeline,
-                                  @PipelineLoad final StagePipeline loadPipeline,
+    public CollectionManagerImpl(  final CollectionEventBus eventBus,
                                   @Assisted final CollectionContext context ) {
 
-        this.createPipeline = createPipeline;
-        this.updatePipeline = updatePipeline;
-        this.deletePipeline = deletePipeline;
-        this.loadPipeline = loadPipeline;
+        Preconditions.checkNotNull( eventBus, "eventBus must be defined" );
+        Preconditions.checkNotNull( context, "context must be defined" );
+        this.eventBus = eventBus;
         this.context = context;
     }
 
@@ -56,46 +50,43 @@ public class CollectionManagerImpl implements CollectionManager {
     @Override
     public Entity create( final Entity entity ) {
         // Create a new context for the write
-        ExecutionContext executionContext = new ExecutionContextImpl( createPipeline, context );
+        Result result = new Result();
 
-        //perform the write
-        executionContext.execute( entity );
+        eventBus.post( new EventCreate( context, entity, result ) );
 
-        MvccEntity result = executionContext.getMessage( MvccEntity.class );
+        MvccEntity completed = result.getLast( MvccEntity.class );
 
-        return result.getEntity().get();
+        return completed.getEntity().get();
     }
 
 
     @Override
     public Entity update( final Entity entity ) {
         // Create a new context for the write
-        ExecutionContext executionContext = new ExecutionContextImpl( updatePipeline, context );
+        Result result = new Result();
 
-        //perform the write
-        executionContext.execute( entity );
+        eventBus.post( new EventUpdate( context, entity, result ) );
 
-        MvccEntity result = executionContext.getMessage( MvccEntity.class );
+        MvccEntity completed = result.getLast( MvccEntity.class );
 
-         return result.getEntity().get();
+        return completed.getEntity().get();
     }
 
 
     @Override
     public void delete( final UUID entityId ) {
-        ExecutionContext deleteContext = new ExecutionContextImpl( deletePipeline, context );
-
-        deleteContext.execute( entityId );
+        eventBus.post( new DeleteStart( context, entityId, null ) );
     }
 
 
     @Override
     public Entity load( final UUID entityId ) {
-        ExecutionContext loadContext = new ExecutionContextImpl( loadPipeline, context );
+        Result result = new Result();
 
-        loadContext.execute( entityId );
+        eventBus.post( new EventLoad( context, entityId, result ) );
 
-        return loadContext.getMessage( Entity.class );
+        MvccEntity completed = result.getLast( MvccEntity.class );
 
+        return completed.getEntity().get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
new file mode 100644
index 0000000..c53f5ce
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/CollectionEventBus.java
@@ -0,0 +1,38 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity;
+
+
+/** Interface mirroring the register/unregister/post methods of the Guava EventBus so the bus can be mocked in tests
+ *  @author tnine */
+public interface CollectionEventBus {
+
+    /**
+     * Registers all handler methods on {@code object} to receive events.
+     * Handler methods are selected and classified using this EventBus's
+     * {@link com.google.common.eventbus.HandlerFindingStrategy}; the default strategy is the
+     * {@link com.google.common.eventbus.AnnotatedHandlerFinder}.
+     *
+     * @param object  object whose handler methods should be registered.
+     */
+    void register(Object object);
+
+    /**
+     * Unregisters all handler methods on a registered {@code object}.
+     *
+     * @param object  object whose handler methods should be unregistered.
+     * @throws IllegalArgumentException if the object was not previously registered.
+     */
+    void unregister(Object object);
+
+    /**
+      * Posts an event to all registered handlers.  This method will return
+      * successfully after the event has been posted to all handlers, and
+      * regardless of any exceptions thrown by handlers.
+      *
+      * <p>If no handlers have been subscribed for {@code event}'s class, and
+      * {@code event} is not already a {@link com.google.common.eventbus.DeadEvent}, it will be wrapped in a
+      * DeadEvent and reposted.
+      *
+      * @param event  event to post.
+      */
+     void post(Object event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
index cc212c8..59197e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccEntity.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity;
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
index c6886d1..d14b35c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/MvccLogEntry.java
@@ -3,8 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity;
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.collection.CollectionContext;
-
 
 /**
  * A Marker interface for an in flight update to allow context information to be passed between states

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
new file mode 100644
index 0000000..3278218
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/CollectionEventBusImpl.java
@@ -0,0 +1,20 @@
+package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
+
+
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+
+import com.google.common.eventbus.EventBus;
+
+
+/** Guava-backed CollectionEventBus: declares only constructors, all bus methods come from EventBus. @author tnine */
+public class CollectionEventBusImpl extends EventBus implements CollectionEventBus{
+    /** Create a bus using the no-arg EventBus constructor */
+    public CollectionEventBusImpl() {
+        super();
+    }
+
+    /** Create a bus, forwarding the identifier to the EventBus(String) constructor */
+    public CollectionEventBusImpl( final String identifier ) {
+        super( identifier );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
index 1179dad..2d9f2f9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccEntityImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.model.entity.Entity;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
index 2f0b4d7..dee56ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/entity/impl/MvccLogEntryImpl.java
@@ -3,7 +3,6 @@ package org.apache.usergrid.persistence.collection.mvcc.entity.impl;
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
index 29e6515..68479e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/event/PostProcessListener.java
@@ -1,9 +1,6 @@
 package org.apache.usergrid.persistence.collection.mvcc.event;
 
 
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-
-
 /**
  *
  * @author: tnine

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
new file mode 100644
index 0000000..84df2d0
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/CollectionEvent.java
@@ -0,0 +1,43 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+
+import com.google.common.base.Preconditions;
+
+
+/** Base class for the events consumed by EventStage: bundles a context, a payload and a Result. @author tnine */
+public abstract class CollectionEvent<T> {
+
+    private final CollectionContext context;
+    private final T data;
+    private final Result result;
+
+    /** Validates context and data (both required) and stores all three values. */
+    protected CollectionEvent( final CollectionContext context, final T data, final Result result ) {
+        Preconditions.checkNotNull( context, "context is required" );
+        Preconditions.checkNotNull( data, "data is required" );
+        // result is deliberately not null-checked: CollectionManagerImpl.delete posts its event with a null Result
+        this.context = context;
+        this.data = data;
+        this.result = result;
+    }
+
+
+
+
+    /** Get the collection context for this event; never null */
+    public CollectionContext getCollectionContext() {
+        return this.context;
+    }
+
+    /** Get the payload carried by this event; never null */
+    public T getData() {
+        return data;
+    }
+
+    /** Get the result holder supplied at construction; may be null */
+    public Result getResult() {
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
new file mode 100644
index 0000000..c62eb36
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/EventStage.java
@@ -0,0 +1,18 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import com.google.common.eventbus.Subscribe;
+
+
+/** A stage in a collection flow; implementations handle events of type T. */
+public interface EventStage<T extends CollectionEvent> {
+
+    /**
+     * Run this stage for the received event.  Nothing is returned; the event exposes a Result via getResult().
+     * NOTE(review): @Subscribe sits on a generic method whose erased parameter is CollectionEvent - confirm how
+     * the bus's handler finder treats it; possibly related to the duplicate dispatch mentioned in this commit.
+     * @param event The event to receive
+     */
+    @Subscribe
+    public void performStage(T event );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
index f124dcf..0fb9ced 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionContext.java
@@ -1,12 +1,7 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage;
 
 
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessListener;
 
 
 /** @author tnine */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
deleted file mode 100644
index a98c813..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/ExecutionStage.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage;
-
-
-/** The possible stages in our write flow. */
-public interface ExecutionStage {
-
-    /**
-     * Run this stage.  This will return the MvccEntity that should be returned or passed to the next stage
-     *
-     * @param context The context of the current write operation
-     *
-     * @return The asynchronous listener to signal success
-     */
-    public void performStage( ExecutionContext context );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
new file mode 100644
index 0000000..1887c71
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/Result.java
@@ -0,0 +1,48 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/** Holder for values collected in insertion order; callers read the newest value of a type via getLast. @author tnine */
+public class Result {
+
+    private final List<Object> results;
+
+    /** Create an empty result holder. */
+    public Result() {
+        results = new ArrayList<Object>();
+    }
+
+    /** Get the backing list itself (not a copy), in the order values were added. */
+    public List<Object> getResults() {
+        return results;
+    }
+
+    /** Append a value; it becomes the newest entry seen by getLast. */
+    public void addResult( final Object result ) {
+        this.results.add( result );
+    }
+
+
+    /**
+     * Get the last occurrence of an instance that implements the type provided.
+     * @param clazz  The class that the value should be an instance of
+     * @param <T> The type of class
+     * @return The value if one is found, null otherwise
+     */
+    public <T> T getLast( Class<T> clazz ) {
+
+        // walk backwards so the most recently added match wins
+        for ( int i = results.size() - 1; i >= 0; i-- ) {
+            final Object value = results.get( i );
+
+            if ( clazz.isInstance( value ) ) {
+                // checked cast; avoids the unchecked ( T ) warning
+                return clazz.cast( value );
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
index 9d68e10..33a2264 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/StagePipeline.java
@@ -13,14 +13,14 @@ public interface StagePipeline {
     /**
      * Get the first stage in this pipeline.
      */
-    ExecutionStage first();
+    EventStage first();
 
 
     /**
-     * get the next executionStage after the executionStage specified
-     * @param executionStage The executionStage to seek in our pipeline
+     * get the next eventStage after the eventStage specified
+     * @param eventStage The eventStage to seek in our pipeline
      */
-    ExecutionStage nextStage(ExecutionStage executionStage );
+    EventStage nextStage(EventStage eventStage );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
index 68dab72..7ef260b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/CollectionPipelineModule.java
@@ -1,23 +1,19 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
 
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.CollectionEventBusImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.Delete;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.DeletePipeline;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete.StartDelete;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.Load;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.read.PipelineLoad;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Commit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Create;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineCreate;
-import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.PipelineUpdate;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.StartWrite;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Update;
 import org.apache.usergrid.persistence.collection.mvcc.stage.impl.write.Verify;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
@@ -31,45 +27,12 @@ import com.google.inject.multibindings.Multibinder;
 public class CollectionPipelineModule extends AbstractModule {
 
 
-    /**
-     * Wire the pipeline of operations for create.  This should create a new instance every time, since StagePipeline
-     * objects are mutable
-     */
     @Provides
-    @PipelineCreate
-    @Inject
     @Singleton
-    public StagePipeline createPipeline( final Create create, final StartWrite startWrite, final Verify write,
-                                         final Commit commit ) {
-        return StagePipelineImpl.fromStages( create, startWrite, write, commit );
-    }
-
+    public CollectionEventBus eventBus(){
+        CollectionEventBus bus =  new CollectionEventBusImpl( "collection" );
 
-    @Provides
-    @PipelineUpdate
-    @Inject
-    @Singleton
-    public StagePipeline updatePipeline( final Update update, final StartWrite startWrite, final Verify write,
-                                         final Commit commit ) {
-        return StagePipelineImpl.fromStages( update, startWrite, write, commit );
-    }
-
-
-    @Provides
-    @DeletePipeline
-    @Inject
-    @Singleton
-    public StagePipeline deletePipeline(final StartDelete startDelete,  final Delete delete ) {
-        return StagePipelineImpl.fromStages(startDelete, delete );
-    }
-
-
-    @Provides
-    @PipelineLoad
-    @Inject
-    @Singleton
-    public StagePipeline deletePipeline( final Load load ) {
-        return StagePipelineImpl.fromStages( load );
+        return bus;
     }
 
 
@@ -79,22 +42,27 @@ public class CollectionPipelineModule extends AbstractModule {
         /**
          * Configure all stages here
          */
-        Multibinder<ExecutionStage> stageBinder = Multibinder.newSetBinder( binder(), ExecutionStage.class );
+        Multibinder<EventStage> stageBinder = Multibinder.newSetBinder( binder(), EventStage.class );
 
 
+        /**
+         * Note: every binding below must be marked .asEagerSingleton(); otherwise Guice never loads these
+         * implementations, because nothing references them directly.
+         */
 
         //creation stages
-        stageBinder.addBinding().to( Commit.class );
-        stageBinder.addBinding().to( Create.class );
-        stageBinder.addBinding().to( StartWrite.class );
-        stageBinder.addBinding().to( Update.class );
-        stageBinder.addBinding().to( Verify.class );
+        stageBinder.addBinding().to( Commit.class ).asEagerSingleton();
+        stageBinder.addBinding().to( Create.class ).asEagerSingleton();;
+        stageBinder.addBinding().to( StartWrite.class ).asEagerSingleton();;
+        stageBinder.addBinding().to( Update.class ).asEagerSingleton();;
+        stageBinder.addBinding().to( Verify.class ).asEagerSingleton();;
 
         //delete stages
-        stageBinder.addBinding().to( Delete.class );
-        stageBinder.addBinding().to( StartDelete.class );
+        stageBinder.addBinding().to( Delete.class ).asEagerSingleton();;
+        stageBinder.addBinding().to( StartDelete.class ).asEagerSingleton();;
 
         //loading stages
-        stageBinder.addBinding().to(Load.class);
+        stageBinder.addBinding().to(Load.class).asEagerSingleton();;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
deleted file mode 100644
index 805d1e3..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/ExecutionContextImpl.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import org.apache.usergrid.persistence.collection.CollectionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-
-/** @author tnine */
-public class ExecutionContextImpl implements ExecutionContext {
-
-    private final StagePipeline pipeline;
-    private final CollectionContext context;
-
-    private Object message;
-    private ExecutionStage current;
-
-
-    @Inject
-    public ExecutionContextImpl( final StagePipeline pipeline, final CollectionContext context ) {
-        Preconditions.checkNotNull( pipeline, "pipeline cannot be null" );
-        Preconditions.checkNotNull( context, "context cannot be null" );
-
-        this.pipeline = pipeline;
-        this.context = context;
-    }
-
-
-    @Override
-    public void execute( Object input ) {
-
-        current = this.pipeline.first();
-
-        setMessage( input );
-
-        current.performStage( this );
-    }
-
-
-    @Override
-    public <T> T getMessage( final Class<T> clazz ) {
-        Preconditions.checkNotNull( clazz, "Class must be specified" );
-
-        if ( message == null ) {
-            return null;
-        }
-
-        if ( !clazz.isInstance( message ) ) {
-            throw new ClassCastException(
-                    "Message must be an instance of class " + clazz + ".  However it was of type '" + message.getClass()
-                            + "'" );
-        }
-
-
-        return ( T ) message;
-    }
-
-
-    @Override
-    public Object setMessage( final Object object ) {
-        Object original = message;
-
-        this.message = object;
-
-        return original;
-    }
-
-
-    @Override
-    public void proceed() {
-        ExecutionStage next = this.pipeline.nextStage( current );
-
-        //Nothing to do
-        if ( next == null ) {
-            return;
-        }
-
-        current = next;
-        current.performStage( this );
-    }
-
-
-
-    @Override
-    public CollectionContext getCollectionContext() {
-        return this.context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
deleted file mode 100644
index 1f3a0fe..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/StagePipelineImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl;
-
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.StagePipeline;
-
-import com.google.common.base.Preconditions;
-
-
-/** @author tnine */
-public class StagePipelineImpl implements StagePipeline {
-
-    private final List<ExecutionStage> executionStages;
-
-
-    protected StagePipelineImpl( List<ExecutionStage> executionStages ) {
-        Preconditions.checkNotNull( executionStages, "executionStages is required");
-        Preconditions.checkArgument(  executionStages.size() > 0, "executionStages must have more than 1 element" );
-
-        this.executionStages = executionStages;
-    }
-
-
-    @Override
-    public ExecutionStage first() {
-
-        if ( executionStages.size() == 0 ) {
-            return null;
-        }
-
-        return executionStages.get( 0 );
-    }
-
-
-
-
-
-    @Override
-    public ExecutionStage nextStage( final ExecutionStage executionStage ) {
-
-        Preconditions.checkNotNull( executionStage, "ExecutionStage cannot be null" );
-
-        int index = executionStages.indexOf( executionStage );
-
-        //we're done, do nothing
-        if ( index == executionStages.size() -1  ) {
-            return null;
-        }
-
-        return  executionStages.get( index + 1 );
-    }
-
-
-    /** Factory to create a new instance. */
-    public static StagePipelineImpl fromStages( ExecutionStage... executionStages ) {
-        return new StagePipelineImpl(Arrays.asList( executionStages ));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
index 7810beb..feed7fa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Delete.java
@@ -8,22 +8,23 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Delete implements ExecutionStage {
+public class Delete implements EventStage<DeleteCommit> {
 
 
     private static final Logger LOG = LoggerFactory.getLogger( Delete.class );
@@ -34,20 +35,24 @@ public class Delete implements ExecutionStage {
 
     @Inject
     public Delete( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
-                   final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+                   final MvccEntitySerializationStrategy entitySerializationStrategy,
+                   final CollectionEventBus eventBus ) {
 
         Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
-              Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+        Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
 
 
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        eventBus.register( this );
     }
 
 
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+    @Subscribe
+    public void performStage( final DeleteCommit event ) {
+
+        final MvccEntity entity = event.getData();
 
         Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
@@ -58,11 +63,11 @@ public class Delete implements ExecutionStage {
         Preconditions.checkNotNull( version, "Entity version is required in this stage" );
 
 
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
+        final CollectionContext collectionContext = event.getCollectionContext();
 
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.COMMITTED );
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+                org.apache.usergrid.persistence.collection.mvcc.entity.Stage.COMMITTED );
 
         MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
 
@@ -84,8 +89,6 @@ public class Delete implements ExecutionStage {
         /**
          * We're done executing.
          */
-        executionContext.proceed();
 
-        //TODO connect to post processors via listener
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
deleted file mode 100644
index 52fe4b9..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeletePipeline.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface DeletePipeline {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
new file mode 100644
index 0000000..5f8840b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/DeleteStart.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class DeleteStart extends CollectionEvent<UUID> {
+
+    public DeleteStart( final CollectionContext context, final UUID data, final Result result ) {
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
new file mode 100644
index 0000000..dfdebba
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/Deletecommit.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.delete;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class DeleteCommit extends CollectionEvent<MvccEntity> {
+    public DeleteCommit( final CollectionContext context, final MvccEntity data, final Result result ) {
+
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
index 4208662..ca1865b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/delete/StartDelete.java
@@ -8,17 +8,18 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
@@ -30,36 +31,37 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
  * new write in the data store for a checkpoint and recovery
  */
 @Singleton
-public class StartDelete implements ExecutionStage {
+public class StartDelete implements EventStage<DeleteStart> {
 
     private static final Logger LOG = LoggerFactory.getLogger( StartDelete.class );
 
+    private final CollectionEventBus eventBus;
     private final MvccLogEntrySerializationStrategy logStrategy;
     private final UUIDService uuidService;
 
 
     /** Create a new stage with the current context */
     @Inject
-    public StartDelete( final MvccLogEntrySerializationStrategy logStrategy, final UUIDService uuidService ) {
-
+    public StartDelete( final CollectionEventBus eventBus, final MvccLogEntrySerializationStrategy logStrategy,
+                        final UUIDService uuidService ) {
+        Preconditions.checkNotNull( eventBus, "eventBus is required" );
         Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
         Preconditions.checkNotNull( uuidService, "uuidService is required" );
 
 
+        this.eventBus = eventBus;
         this.logStrategy = logStrategy;
         this.uuidService = uuidService;
+
+        this.eventBus.register( this );
     }
 
 
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
+    @Subscribe
+    public void performStage( final DeleteStart event ) {
 
-        final UUID entityId = executionContext.getMessage( UUID.class );
+        final UUID entityId = event.getData();
 
 
         final UUID version = uuidService.newTimeUUID();
@@ -68,12 +70,11 @@ public class StartDelete implements ExecutionStage {
         Preconditions.checkNotNull( version, "Entity version is required in this stage" );
 
 
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
+        final CollectionContext collectionContext = event.getCollectionContext();
 
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.ACTIVE );
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+                org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
 
         MutationBatch write = logStrategy.write( collectionContext, startEntry );
 
@@ -90,7 +91,6 @@ public class StartDelete implements ExecutionStage {
         //create the mvcc entity for the next stage
         final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, Optional.<Entity>absent() );
 
-        executionContext.setMessage( nextStage );
-        executionContext.proceed();
+        eventBus.post( new DeleteCommit( collectionContext, nextStage, event.getResult() ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
new file mode 100644
index 0000000..6f3fab9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/EventLoad.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class EventLoad extends CollectionEvent<UUID> {
+    public EventLoad( final CollectionContext context, final UUID data, final Result result ) {
+
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
index 00a2d43..a2acf3e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/Load.java
@@ -8,22 +8,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 
 
-/**
- * This stage is a load stage to load a single entity
- */
-public class Load implements ExecutionStage {
+/** This stage is a load stage to load a single entity */
+public class Load implements EventStage<EventLoad> {
 
 
     private static final Logger LOG = LoggerFactory.getLogger( Load.class );
@@ -33,24 +32,26 @@ public class Load implements ExecutionStage {
 
 
     @Inject
-    public Load( final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
+    public Load(final CollectionEventBus eventBus, final UUIDService uuidService, final MvccEntitySerializationStrategy entitySerializationStrategy ) {
         Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
         Preconditions.checkNotNull( uuidService, "uuidService is required" );
 
 
         this.uuidService = uuidService;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        eventBus.register( this );
     }
 
 
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final UUID entityId = executionContext.getMessage( UUID.class );
+    @Subscribe
+    public void performStage( final EventLoad event ) {
+        final UUID entityId = event.getData();
 
         Preconditions.checkNotNull( entityId, "Entity id required in the read stage" );
 
 
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
+        final CollectionContext collectionContext = event.getCollectionContext();
 
         //generate  a version that represents now
         final UUID versionMax = uuidService.newTimeUUID();
@@ -58,25 +59,21 @@ public class Load implements ExecutionStage {
         List<MvccEntity> results = entitySerializationStrategy.load( collectionContext, entityId, versionMax, 1 );
 
         //nothing to do, we didn't get a result back
-        if(results.size() != 1){
-            executionContext.setMessage( null );
-            executionContext.proceed();
+        if ( results.size() != 1 ) {
             return;
         }
 
-        final Optional<Entity> targetVersion = results.get(0).getEntity();
+        final Optional<Entity> targetVersion = results.get( 0 ).getEntity();
 
         //this entity has been marked as cleared.  The version exists, but does not have entity data
-        if(!targetVersion.isPresent()){
+        if ( !targetVersion.isPresent() ) {
 
             //TODO, a lazy async repair/cleanup here?
 
-            executionContext.setMessage( null );
-            executionContext.proceed();
             return;
         }
 
-        executionContext.setMessage( targetVersion.get() );
-        executionContext.proceed();
+        //this feels like a hack: the loaded entity is handed back by mutating the event's Result.  Not sure I like this
+        event.getResult().addResult( targetVersion.get() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
deleted file mode 100644
index 0d24b27..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/read/PipelineLoad.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.read;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the delete pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface PipelineLoad {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
index 4780ff1..65af8cf 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Commit.java
@@ -8,22 +8,23 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /** This phase should invoke any finalization, and mark the entity as committed in the data store before returning */
-public class Commit implements ExecutionStage {
+public class Commit implements EventStage<EventCommit> {
 
 
     private static final Logger LOG = LoggerFactory.getLogger( Commit.class );
@@ -33,20 +34,22 @@ public class Commit implements ExecutionStage {
 
 
     @Inject
-    public Commit( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+    public Commit( final CollectionEventBus eventBus, final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
                    final MvccEntitySerializationStrategy entitySerializationStrategy ) {
         Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
-                      Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
+        Preconditions.checkNotNull( entitySerializationStrategy, "entitySerializationStrategy is required" );
 
 
         this.logEntrySerializationStrategy = logEntrySerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
+        eventBus.register( this );
     }
 
 
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        final MvccEntity entity = executionContext.getMessage( MvccEntity.class );
+    @Subscribe
+    public void performStage( final EventCommit event ) {
+        final MvccEntity entity = event.getData();
 
         Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
@@ -57,11 +60,11 @@ public class Commit implements ExecutionStage {
         Preconditions.checkNotNull( version, "Entity version is required in this stage" );
 
 
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
+        final CollectionContext collectionContext = event.getCollectionContext();
 
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.COMMITTED );
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+                org.apache.usergrid.persistence.collection.mvcc.entity.Stage.COMMITTED );
 
         MutationBatch logMutation = logEntrySerializationStrategy.write( collectionContext, startEntry );
 
@@ -80,11 +83,10 @@ public class Commit implements ExecutionStage {
             throw new CollectionRuntimeException( "Failed to execute write asynchronously ", e );
         }
 
-        /**
-         * We're done executing.
-         */
-        executionContext.proceed();
-
-        //TODO connect to post processors via listener
+        //add the mvccEntity to the result
+        event.getResult().addResult( entity );
     }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
index cbd7d9b..971e9f4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Create.java
@@ -9,14 +9,15 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.lang3.reflect.FieldUtils;
 
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.service.TimeService;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.collection.util.Verify;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -26,35 +27,37 @@ import com.google.inject.Singleton;
  * present, and this should set the entityId, version, created, and updated dates
  */
 @Singleton
-public class Create implements ExecutionStage {
+public class Create implements EventStage<EventCreate> {
 
     private static final Logger LOG = LoggerFactory.getLogger( Create.class );
 
 
+    private final CollectionEventBus eventBus;
     private final TimeService timeService;
     private final UUIDService uuidService;
 
 
     @Inject
-    public Create( final TimeService timeService, final UUIDService uuidService ) {
+    public Create( final CollectionEventBus eventBus, final TimeService timeService, final UUIDService uuidService ) {
+
+        Preconditions.checkNotNull( eventBus, "eventBus is required" );
         Preconditions.checkNotNull( timeService, "timeService is required" );
         Preconditions.checkNotNull( uuidService, "uuidService is required" );
 
 
+        this.eventBus = eventBus;
         this.timeService = timeService;
         this.uuidService = uuidService;
+        this.eventBus.register( this );
     }
 
 
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
+    @Subscribe
+    public void performStage( final EventCreate event ) {
+        Preconditions.checkNotNull(event,  "event is required" );
 
-        final Entity entity = executionContext.getMessage( Entity.class );
+        final Entity entity = event.getData();
 
         Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
@@ -78,8 +81,6 @@ public class Create implements ExecutionStage {
         entity.setCreated( created );
         entity.setUpdated( created );
 
-        //set the updated entity for the next stage
-        executionContext.setMessage( entity );
-        executionContext.proceed();
+        eventBus.post( new EventStart( event.getCollectionContext(), entity, event.getResult() ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
new file mode 100644
index 0000000..0091770
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCommit.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+public class EventCommit extends CollectionEvent<MvccEntity> {
+
+
+    protected EventCommit( final CollectionContext context, final MvccEntity data, final Result result ) {
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
new file mode 100644
index 0000000..d5a609f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventCreate.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventCreate extends CollectionEvent<Entity> {
+
+
+    public EventCreate( final CollectionContext context, final Entity data, final Result result ) {
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
new file mode 100644
index 0000000..87e2488
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventStart.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventStart extends CollectionEvent<Entity> {
+
+    public EventStart( final CollectionContext context, final Entity data, final Result result ) {
+
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
new file mode 100644
index 0000000..68fb6ab
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventUpdate.java
@@ -0,0 +1,16 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/** @author tnine */
+public class EventUpdate extends CollectionEvent<Entity> {
+
+    public EventUpdate( final CollectionContext context, final Entity data, final Result result ) {
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
new file mode 100644
index 0000000..f078aa5
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/EventVerify.java
@@ -0,0 +1,17 @@
+package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
+
+
+import org.apache.usergrid.persistence.collection.CollectionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionEvent;
+import org.apache.usergrid.persistence.collection.mvcc.stage.Result;
+
+
+/** @author tnine */
+public class EventVerify extends CollectionEvent<MvccEntity> {
+
+
+    public EventVerify( final CollectionContext context, final MvccEntity data, final Result result ) {
+        super( context, data, result );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
deleted file mode 100644
index f3af972..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineCreate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD })
-@Retention(RUNTIME)
-public @interface PipelineCreate {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
deleted file mode 100644
index 85bc56d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/PipelineUpdate.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Marks the create pipeline
- *
- * @author tnine
- */
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface PipelineUpdate {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
index d6e7d49..ac153f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/StartWrite.java
@@ -8,15 +8,16 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.CollectionContext;
 import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
@@ -28,32 +29,32 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
  * new write in the data store for a checkpoint and recovery
  */
 @Singleton
-public class StartWrite implements ExecutionStage {
+public class StartWrite implements EventStage<EventStart> {
 
     private static final Logger LOG = LoggerFactory.getLogger( StartWrite.class );
 
+    private final CollectionEventBus eventBus;
     private final MvccLogEntrySerializationStrategy logStrategy;
 
 
     /** Create a new stage with the current context */
     @Inject
-    public StartWrite( final MvccLogEntrySerializationStrategy logStrategy ) {
+    public StartWrite( final CollectionEventBus eventBus,
+                       final MvccLogEntrySerializationStrategy logStrategy ) {
+        Preconditions.checkNotNull( eventBus, "eventBus is required" );
         Preconditions.checkNotNull( logStrategy, "logStrategy is required" );
 
-
+        this.eventBus = eventBus;
         this.logStrategy = logStrategy;
+
+        this.eventBus.register( this );
     }
 
 
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
-
-        final Entity entity = executionContext.getMessage( Entity.class );
+    @Subscribe
+    public void performStage( final EventStart event ) {
+        final Entity entity = event.getData();
 
         Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
@@ -64,12 +65,11 @@ public class StartWrite implements ExecutionStage {
         Preconditions.checkNotNull( version, "Entity version is required in this stage" );
 
 
-
-        final CollectionContext collectionContext = executionContext.getCollectionContext();
+        final CollectionContext collectionContext = event.getCollectionContext();
 
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, org.apache.usergrid.persistence
-                .collection.mvcc.entity.Stage.ACTIVE );
+        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version,
+                org.apache.usergrid.persistence.collection.mvcc.entity.Stage.ACTIVE );
 
         MutationBatch write = logStrategy.write( collectionContext, startEntry );
 
@@ -86,7 +86,9 @@ public class StartWrite implements ExecutionStage {
         //create the mvcc entity for the next stage
         final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, version, entity );
 
-        executionContext.setMessage( nextStage );
-        executionContext.proceed();
+        eventBus.post( new EventVerify( collectionContext, nextStage, event.getResult() ) );
     }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
index 0175b42..a095fb3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Update.java
@@ -6,13 +6,14 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 import org.apache.usergrid.persistence.collection.service.TimeService;
 import org.apache.usergrid.persistence.collection.service.UUIDService;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Preconditions;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -22,33 +23,35 @@ import com.google.inject.Singleton;
  * been set correctly
  */
 @Singleton
-public class Update implements ExecutionStage {
+public class Update implements EventStage<EventUpdate> {
 
     private static final Logger LOG = LoggerFactory.getLogger( Update.class );
 
     private final TimeService timeService;
     private final UUIDService uuidService;
+    private final CollectionEventBus eventBus;
 
 
     @Inject
-    public Update( final TimeService timeService, final UUIDService uuidService ) {
+    public Update( final CollectionEventBus eventBus, final TimeService timeService, final UUIDService uuidService ) {
+        Preconditions.checkNotNull( eventBus, "eventBus is required" );
         Preconditions.checkNotNull( timeService, "timeService is required" );
         Preconditions.checkNotNull( uuidService, "uuidService is required" );
 
+        this.eventBus = eventBus;
         this.timeService = timeService;
         this.uuidService = uuidService;
+
+        this.eventBus.register( this );
+
     }
 
 
-    /**
-     * Create the entity Id  and inject it, as well as set the timestamp versions
-     *
-     * @param executionContext The context of the current write operation
-     */
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
+    @Subscribe
+    public void performStage( final EventUpdate event ) {
 
-        final Entity entity = executionContext.getMessage( Entity.class );
+        final Entity entity = event.getData();
 
         Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 
@@ -64,7 +67,7 @@ public class Update implements ExecutionStage {
         entity.setVersion( version );
         entity.setUpdated( updated );
 
-        executionContext.setMessage( entity );
-        executionContext.proceed();
+        //fire the start event
+        eventBus.post(new EventStart( event.getCollectionContext(), entity, event.getResult()) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56c415f2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
index 884c59b..5a05869 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/impl/write/Verify.java
@@ -1,25 +1,31 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.impl.write;
 
 
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionStage;
-import org.apache.usergrid.persistence.collection.mvcc.stage.ExecutionContext;
+import org.apache.usergrid.persistence.collection.mvcc.entity.CollectionEventBus;
+import org.apache.usergrid.persistence.collection.mvcc.stage.EventStage;
 
+import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 
 /** This phase should execute any verification on the MvccEntity */
 @Singleton
-public class Verify implements ExecutionStage {
+public class Verify implements EventStage<EventVerify> {
 
+    private final CollectionEventBus eventBus;
 
-    public Verify() {
+    @Inject
+    public Verify( final CollectionEventBus eventBus ) {
+        this.eventBus = eventBus;
+        this.eventBus.register( this );
     }
 
 
+
     @Override
-    public void performStage( final ExecutionContext executionContext ) {
-        //TODO no op for now, just continue to the next stage.  Verification logic goes in here
+    public void performStage( final EventVerify event ) {
+        //TODO: no-op for now; verification logic still needs to be implemented here
 
-        executionContext.proceed();
+        eventBus.post( new EventCommit(event.getCollectionContext(), event.getData(), event.getResult()) );
     }
 }


Mime
View raw message