usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [01/12] usergrid git commit: Adds strong consistency read to maps. Persists ES batches into Cassandra for multi region execution.
Date Tue, 20 Oct 2015 18:02:04 GMT
Repository: usergrid
Updated Branches:
  refs/heads/2.1-release a09485a3a -> 1fe1d1a34


Adds strong consistency read to maps.  Persists ES batches into Cassandra for multi region execution.

A bug in wiring JSON to SQS still exists, it's incorrectly escaping some message subtypes.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/94a90781
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/94a90781
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/94a90781

Branch: refs/heads/2.1-release
Commit: 94a9078125fc32d755e33e562f8e8fd8624641c1
Parents: 2b22c61
Author: Todd Nine <tnine@apigee.com>
Authored: Fri Oct 16 18:02:44 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Fri Oct 16 18:02:44 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 186 +++++++++++---
 .../asyncevents/AsyncEventService.java          |   1 +
 .../asyncevents/AsyncIndexProvider.java         |  22 +-
 .../asyncevents/model/AsyncEvent.java           |   3 +-
 .../model/ElasticsearchIndexEvent.java          |  50 ++++
 .../index/IndexProcessorFig.java                |   8 +
 .../util/ObjectJsonSerializer.java              |  74 ++++++
 .../index/AmazonAsyncEventServiceTest.java      |   6 +-
 .../index/AsyncIndexServiceTest.java            |   2 +-
 .../usergrid/persistence/map/MapManager.java    |  25 +-
 .../persistence/map/impl/MapManagerImpl.java    |   6 +
 .../persistence/map/impl/MapSerialization.java  |  27 +-
 .../map/impl/MapSerializationImpl.java          | 248 ++++++++++---------
 .../index/impl/DeIndexOperation.java            |   4 +
 .../persistence/index/impl/IndexOperation.java  |   4 +
 .../index/impl/IndexOperationMessage.java       |   5 +
 .../persistence/queue/DefaultQueueManager.java  |  12 +-
 .../persistence/queue/QueueManager.java         |   8 +-
 .../queue/impl/SNSQueueManagerImpl.java         | 188 ++++++++++----
 .../queue/impl/SQSQueueManagerImpl.java         |  28 ++-
 .../services/queues/ImportQueueManager.java     |   9 +-
 21 files changed, 666 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 95126c6..c9f0953 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -21,13 +21,20 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.base.Optional;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.exception.NotImplementedException;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +61,13 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;
@@ -82,12 +94,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
 
+    private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer(  );
+
     // SQS maximum receive messages is 10
     private static final int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
 
     private final QueueManager queue;
-    private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
     private final IndexProducer indexProducer;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -109,6 +122,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
+    private final MapManager esMapPersistence;
 
     //the actively running subscription
     private List<Subscription> subscriptions = new ArrayList<>();
@@ -123,6 +137,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     final IndexLocationStrategyFactory indexLocationStrategyFactory,
                                     final EntityIndexFactory entityIndexFactory,
                                     final EventBuilder eventBuilder,
+                                    final MapManagerFactory mapManagerFactory,
                                     final RxTaskScheduler rxTaskScheduler ) {
         this.indexProducer = indexProducer;
 
@@ -130,10 +145,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.eventBuilder = eventBuilder;
+
+        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
+
+        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
         this.rxTaskScheduler = rxTaskScheduler;
 
-        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
+
         this.indexProcessorFig = indexProcessorFig;
 
         this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
@@ -158,7 +179,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     /**
      * Offer the EntityIdScope to SQS
      */
-    private void offer(final Object operation) {
+    private void offer(final Serializable operation) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
@@ -213,7 +234,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final Timer.Context timer = this.ackTimer.time();
 
         try{
-            queue.commitMessage(message);
+            queue.commitMessage( message );
 
             //decrement our in-flight counter
             inFlight.decrementAndGet();
@@ -235,7 +256,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final Timer.Context timer = this.ackTimer.time();
 
         try{
-            queue.commitMessages(messages);
+            queue.commitMessages( messages );
 
             //decrement our in-flight counter
             inFlight.decrementAndGet();
@@ -296,7 +317,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         handleInitializeApplicationIndex(event, message);
                         indexoperationObservable = Observable.just(new IndexOperationMessage());
                         validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                    } else {
+                    } else if (event instanceof ElasticsearchIndexEvent){
+                        handleIndexOperation( (ElasticsearchIndexEvent)event );
+                        indexoperationObservable = Observable.just( new IndexOperationMessage() );
+                        validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
+                    }
+
+                    else {
                         throw new Exception("Unknown EventType");//TODO: print json instead
                     }
 
@@ -434,6 +461,85 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
     }
 
+
+    /**
+     * Queue up an indexOperationMessage for multi region execution
+     * @param indexOperationMessage
+     */
+    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+        final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+
+        final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+        //write to the map in ES
+        esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+
+
+
+        //now queue up the index message
+
+        final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+
+        //send to the topic so all regions index the batch
+        try {
+            queue.sendMessageToTopic( elasticsearchIndexEvent );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to pulish to topic", e );
+        }
+    }
+
+    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+        Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+        //load the entity
+
+        final String message = esMapPersistence.getString( messageId.toString() );
+
+        String highConsistency = null;
+
+        if(message == null){
+            logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+
+            highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
+
+        }
+
+        //read the value from the string
+
+        final IndexOperationMessage indexOperationMessage;
+
+        //our original local read has it, parse it.
+        if(message != null){
+             indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+        }
+        //we tried to read it at a higher consistency level and it works
+        else if (highConsistency != null){
+            indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+        }
+
+        //we couldn't find it, bail
+        else{
+            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+
+            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+        }
+
+
+
+        //now execute it
+        indexProducer.put(indexOperationMessage).toBlocking().last();
+
+    }
+
+
+
     @Override
     public long getQueueDepth() {
         return queue.getQueueDepth();
@@ -510,71 +616,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
         synchronized (mutex) {
 
             Observable<List<QueueMessage>> consumer =
-                    Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
+                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
                         @Override
-                        public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
+                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());
+                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
                             List<QueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    drainList = take().toList().toBlocking().lastOrDefault(null);
+                                    drainList = take().toList().toBlocking().lastOrDefault( null );
                                     //emit our list in it's entity to hand off to a worker pool
-                                    subscriber.onNext(drainList);
+                                    subscriber.onNext( drainList );
 
                                     //take since  we're in flight
-                                    inFlight.addAndGet(drainList.size());
-                                } catch (Throwable t) {
+                                    inFlight.addAndGet( drainList.size() );
+                                }
+                                catch ( Throwable t ) {
                                     final long sleepTime = indexProcessorFig.getFailureRetryTime();
 
-                                    logger.error("Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t);
+                                    logger.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
 
-                                    if (drainList != null) {
-                                        inFlight.addAndGet(-1 * drainList.size());
+                                    if ( drainList != null ) {
+                                        inFlight.addAndGet( -1 * drainList.size() );
                                     }
 
 
                                     try {
-                                        Thread.sleep(sleepTime);
-                                    } catch (InterruptedException ie) {
+                                        Thread.sleep( sleepTime );
+                                    }
+                                    catch ( InterruptedException ie ) {
                                         //swallow
                                     }
 
                                     indexErrorCounter.inc();
                                 }
                             }
-                            while (true);
+                            while ( true );
                         }
-                    })
+                    } )
                             //this won't block our read loop, just reads and proceeds
-                            .map(messages ->
-                            {
-                                if (messages == null || messages.size() == 0) {
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 ) {
                                     return null;
                                 }
 
                                 try {
-                                    List<IndexEventResult> indexEventResults = callEventHandlers(messages);
-                                    List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
-                                    if (messagesToAck == null || messagesToAck.size() == 0) {
-                                        logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages);
+                                    List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+                                    List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
+                                    if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+                                        logger.error( "No messages came back from the queue operation should have seen "
+                                            + messages.size(), messages );
                                         return messagesToAck;
                                     }
-                                    if(messagesToAck.size()<messages.size()){
-                                        logger.error("Missing messages from queue post operation",messages,messagesToAck);
+                                    if ( messagesToAck.size() < messages.size() ) {
+                                        logger.error( "Missing messages from queue post operation", messages,
+                                            messagesToAck );
                                     }
                                     //ack each message, but only if we didn't error.
-                                    ack(messagesToAck);
+                                    ack( messagesToAck );
                                     return messagesToAck;
-                                } catch (Exception e) {
-                                    logger.error("failed to ack messages to sqs", e);
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "failed to ack messages to sqs", e );
                                     return null;
                                     //do not rethrow so we can process all of them
                                 }
-                            });
+                            } );
 
             //start in the background
 
@@ -619,12 +729,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         //send the batch
         //TODO: should retry?
-        try {
-            indexProducer.put(combined).toBlocking().lastOrDefault(null);
-        }catch (Exception e){
-            logger.error("Failed to submit to index producer",e);
-            throw e;
-        }
+        queueIndexOperationMessage( combined );
+
         return messagesToAck;
     }
 
@@ -671,4 +777,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
             return creationTime;
         }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index ae5688c..dcfffcb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index e9e36f0..3865ecb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -51,20 +52,18 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final IndexProducer indexProducer;
+    private final MapManagerFactory mapManagerFactory;
 
     private AsyncEventService asyncEventService;
 
 
     @Inject
-    public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
-                              final QueueManagerFactory queueManagerFactory,
-                              final MetricsFactory metricsFactory,
-                              final RxTaskScheduler rxTaskScheduler,
-                              final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                              final EventBuilder eventBuilder,
-                              final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                              final EntityIndexFactory entityIndexFactory,
-                              final IndexProducer indexProducer) {
+    public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
+                               final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
+                                   EntityCollectionManagerFactory entityCollectionManagerFactory,
+                               final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                               final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
+                               final MapManagerFactory mapManagerFactory ) {
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
@@ -75,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexProducer = indexProducer;
+        this.mapManagerFactory = mapManagerFactory;
     }
 
 
@@ -99,10 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
             case SNS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 6b45297..1af54e3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -39,7 +39,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
     @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
     @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
-    @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+    @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
+    @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
 } )
 
 public abstract class AsyncEvent implements Serializable {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
new file mode 100644
index 0000000..207b15e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * An index event for publishing to elastic search
+ */
+public final class ElasticsearchIndexEvent extends AsyncEvent {
+
+
+    @JsonProperty
+    protected UUID indexBatchId;
+
+    public ElasticsearchIndexEvent() {
+    }
+
+    public ElasticsearchIndexEvent(  UUID indexBatchId ) {
+        this.indexBatchId = indexBatchId;
+    }
+
+
+    /**
+     * Get the unique message id of the
+     * @return
+     */
+    public UUID getIndexBatchId() {
+        return indexBatchId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7d022e5..6fd73b4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -103,4 +103,12 @@ public interface IndexProcessorFig extends GuicyFig {
     @Default("false")
     @Key("elasticsearch.queue_impl.resolution")
     boolean resolveSynchronously();
+
+    /**
+     * Get the message TTL in milliseconds
+     * @return
+     */
+    @Default("604800000")
+    @Key( "elasticsearch.message.ttl" )
+    int getIndexMessageTtl();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
new file mode 100644
index 0000000..dbd5ca3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.util;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * An utility class to serialize and de-serialized objects as json strings
+ */
+public final class ObjectJsonSerializer {
+
+
+    private final JsonFactory JSON_FACTORY = new JsonFactory();
+
+    private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+    public ObjectJsonSerializer( ) {
+        MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+    }
+
+
+    public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+
+        Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
+        final String stringValue;
+        //mark this version as empty
+
+        //Convert to internal entity map
+        try {
+            stringValue = MAPPER.writeValueAsString( toSerialize );
+        }
+        catch ( JsonProcessingException jpe ) {
+            throw new RuntimeException( "Unable to serialize entity", jpe );
+        }
+
+        return stringValue;
+    }
+
+
+    public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) {
+
+        Preconditions.checkNotNull( value, "value must not be null" );
+
+        try {
+            return MAPPER.readValue( value, toSerialize );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to deserialize", e );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index a14437c..e83d6f8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -79,13 +80,16 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     @Inject
     public IndexLocationStrategyFactory indexLocationStrategyFactory;
 
+    @Inject
+    public MapManagerFactory mapManagerFactory;
+
 
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index d34a1a9..2863cbf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -189,7 +189,7 @@ public abstract class AsyncIndexServiceTest {
             }
 
             try {
-                Thread.sleep( 100 );
+                Thread.sleep( 10000 );
             }
             catch ( InterruptedException e ) {
                 //swallow

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index c280fee..80e2d17 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -33,7 +33,14 @@ public interface MapManager {
     /**
      * Return the string, null if not found
      */
-    public String getString( final String key );
+    String getString( final String key );
+
+    /**
+     * Read the string at a high consistency level.  This should ensure data replication has happened
+     * @param key
+     * @return
+     */
+    String getStringHighConsistency(final String key);
 
 
     /**
@@ -41,12 +48,12 @@ public interface MapManager {
      * @param keys
      * @return
      */
-    public Map<String, String> getStrings(final Collection<String> keys);
+    Map<String, String> getStrings( final Collection<String> keys );
 
     /**
      * Return the string, null if not found
      */
-    public void putString( final String key, final String value );
+    void putString( final String key, final String value );
 
     /**
      * The time to live (in seconds) of the string
@@ -54,33 +61,33 @@ public interface MapManager {
      * @param value
      * @param ttl
      */
-    public void putString( final String key, final String value, final int ttl );
+    void putString( final String key, final String value, final int ttl );
 
 
     /**
      * Return the uuid, null if not found
      */
-    public UUID getUuid( final String key );
+    UUID getUuid( final String key );
 
     /**
      * Return the uuid, null if not found
      */
-    public void putUuid( final String key, final UUID putUuid );
+    void putUuid( final String key, final UUID putUuid );
 
     /**
      * Return the long, null if not found
      */
-    public Long getLong( final String key );
+    Long getLong( final String key );
 
     /**
      * Return the long, null if not found
      */
-    public void putLong( final String key, final Long value );
+    void putLong( final String key, final Long value );
 
     /**
      * Delete the key
      *
      * @param key The key used to delete the entry
      */
-    public void delete( final String key );
+    void delete( final String key );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index fb2e7ff..501ade7 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -53,6 +53,12 @@ public class MapManagerImpl implements MapManager {
 
 
     @Override
+    public String getStringHighConsistency( final String key ) {
+        return mapSerialization.getStringHighConsistency(scope, key);
+    }
+
+
+    @Override
     public Map<String, String> getStrings( final Collection<String> keys ) {
         return mapSerialization.getStrings( scope, keys );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 2e958c2..e9c21d2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -32,50 +32,59 @@ public interface MapSerialization extends Migration {
     /**
      * Return the string, null if not found
      */
-    public String getString( final MapScope scope, final String key );
+    String getString( final MapScope scope, final String key );
+
+
+    /**
+     * Get the key from all regions with a high consistency
+     * @param scope
+     * @param key
+     * @return
+     */
+    String getStringHighConsistency( final MapScope scope, final String key );
 
     /**
      * Get strings from the map
      * @param keys
      * @return
      */
-    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+    Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
 
     /**
      * Return the string, null if not found
      */
-    public void putString( final MapScope scope, final String key, final String value );
+    void putString( final MapScope scope, final String key, final String value );
 
     /**
      * Write the string
      */
-    public void putString( final MapScope scope, final String key, final String value, final int ttl );
+    void putString( final MapScope scope, final String key, final String value, final int ttl );
 
 
     /**
      * Return the uuid, null if not found
      */
-    public UUID getUuid( final MapScope scope, final String key );
+    UUID getUuid( final MapScope scope, final String key );
 
     /**
      * Return the uuid, null if not found
      */
-    public void putUuid( final MapScope scope, final String key, final UUID putUuid );
+    void putUuid( final MapScope scope, final String key, final UUID putUuid );
 
     /**
      * Return the long, null if not found
      */
-    public Long getLong( final MapScope scope, final String key );
+    Long getLong( final MapScope scope, final String key );
 
     /**
      * Return the long, null if not found
      */
-    public void putLong( final MapScope scope, final String key, final Long value );
+    void putLong( final MapScope scope, final String key, final Long value );
 
     /**
      * Delete the key
      *
      * @param key The key used to delete the entry
      */
-    public void delete( final MapScope scope, final String key );
+    void delete( final MapScope scope, final String key );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 825d636..1aa3229 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,6 +18,8 @@
  */
 
 package org.apache.usergrid.persistence.map.impl;
+
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,21 +28,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.google.common.base.Preconditions;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
 import org.apache.usergrid.persistence.core.shard.StringHashUtils;
 import org.apache.usergrid.persistence.map.MapScope;
 
+import com.google.common.base.Preconditions;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
 import com.google.inject.Inject;
@@ -53,9 +55,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.ConsistencyLevel;
 import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.model.Rows;
-import com.netflix.astyanax.query.ColumnFamilyQuery;
 import com.netflix.astyanax.serializers.BooleanSerializer;
 import com.netflix.astyanax.serializers.StringSerializer;
 
@@ -65,41 +67,40 @@ public class MapSerializationImpl implements MapSerialization {
 
     private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
 
-        private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
-                new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
+    private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
+        new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
 
 
-        private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
-        private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
-                new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
+    private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
+    private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
+        new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
 
 
-        private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
+    private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
 
-        private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
+    private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
 
 
     private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
 
 
-        /**
-         * CFs where the row key contains the source node id
-         */
-        public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
-            MAP_ENTRIES = new MultiTennantColumnFamily<>(
-                "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
+    /**
+     * CFs where the row key contains the source node id
+     */
+    public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES =
+        new MultiTennantColumnFamily<>( "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
 
 
-        /**
-         * CFs where the row key contains the source node id
-         */
-        public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
-                new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
+    /**
+     * CFs where the row key contains the source node id
+     */
+    public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
+        new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
 
     /**
      * Number of buckets to hash across.
      */
-    private static final int[] NUM_BUCKETS = {20};
+    private static final int[] NUM_BUCKETS = { 20 };
 
     /**
      * How to funnel keys for buckets
@@ -107,7 +108,6 @@ public class MapSerializationImpl implements MapSerialization {
     private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
 
 
-
         @Override
         public void funnel( final String key, final PrimitiveSink into ) {
             into.putString( key, StringHashUtils.UTF8 );
@@ -117,8 +117,8 @@ public class MapSerializationImpl implements MapSerialization {
     /**
      * Locator to get us all buckets
      */
-    private static final ExpandingShardLocator<String>
-            BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
+    private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
+        new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
 
     private final Keyspace keyspace;
 
@@ -129,13 +129,20 @@ public class MapSerializationImpl implements MapSerialization {
 
     @Override
     public String getString( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue(scope, key); // TODO: why boolean?
-        return (col !=null) ?  col.getStringValue(): null;
+        Column<Boolean> col = getValue( scope, key );
+        return ( col != null ) ? col.getStringValue() : null;
     }
 
 
     @Override
-    public Map<String, String> getStrings(final MapScope scope,  final Collection<String> keys ) {
+    public String getStringHighConsistency( final MapScope scope, final String key ) {
+        Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+        return ( col != null ) ? col.getStringValue() : null;
+    }
+
+
+    @Override
+    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
         return getValues( scope, keys, STRING_RESULTS_BUILDER );
     }
 
@@ -144,13 +151,13 @@ public class MapSerializationImpl implements MapSerialization {
     public void putString( final MapScope scope, final String key, final String value ) {
         final RowOp op = new RowOp() {
             @Override
-            public void putValue(final ColumnListMutation<Boolean> columnListMutation ) {
+            public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
                 columnListMutation.putColumn( true, value );
             }
 
 
             @Override
-            public void putKey(final ColumnListMutation<String> keysMutation ) {
+            public void putKey( final ColumnListMutation<String> keysMutation ) {
                 keysMutation.putColumn( key, true );
             }
         };
@@ -184,10 +191,6 @@ public class MapSerializationImpl implements MapSerialization {
 
     /**
      * Write our string index with the specified row op
-     * @param scope
-     * @param key
-     * @param value
-     * @param rowOp
      */
     private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
 
@@ -225,10 +228,11 @@ public class MapSerializationImpl implements MapSerialization {
     /**
      * Callbacks for performing row operations
      */
-    private static interface RowOp{
+    private static interface RowOp {
 
         /**
          * Callback to do the row
+         *
          * @param columnListMutation The column mutation
          */
         void putValue( final ColumnListMutation<Boolean> columnListMutation );
@@ -236,104 +240,97 @@ public class MapSerializationImpl implements MapSerialization {
 
         /**
          * Write the key
-         * @param keysMutation
          */
         void putKey( final ColumnListMutation<String> keysMutation );
-
-
     }
 
+
     @Override
     public UUID getUuid( final MapScope scope, final String key ) {
 
-        Column<Boolean> col = getValue(scope, key);
-        return (col !=null) ?  col.getUUIDValue(): null;
+        Column<Boolean> col = getValue( scope, key );
+        return ( col != null ) ? col.getUUIDValue() : null;
     }
 
 
     @Override
     public void putUuid( final MapScope scope, final String key, final UUID putUuid ) {
 
-        Preconditions.checkNotNull(scope, "mapscope is required");
+        Preconditions.checkNotNull( scope, "mapscope is required" );
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( putUuid, "value is required" );
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
         //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //serialize to the entry
-        batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, putUuid);
+        batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
 
         //add it to the keys
 
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
 
-        final BucketScopedRowKey< String> keyRowKey =
-                BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
 
         //serialize to the entry
-        batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
-
-        executeBatch(batch);
+        batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
 
+        executeBatch( batch );
     }
 
 
     @Override
     public Long getLong( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue(scope, key);
-        return (col !=null) ?  col.getLongValue(): null;
+        Column<Boolean> col = getValue( scope, key );
+        return ( col != null ) ? col.getLongValue() : null;
     }
 
 
-
-
     @Override
     public void putLong( final MapScope scope, final String key, final Long value ) {
 
-        Preconditions.checkNotNull(scope, "mapscope is required");
+        Preconditions.checkNotNull( scope, "mapscope is required" );
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required" );
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
         //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //serialize to the entry
-        batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value);
+        batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
 
         //add it to the keys
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
 
-               final BucketScopedRowKey< String> keyRowKey =
-                       BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
 
         //serialize to the entry
-        batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
+        batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
 
-        executeBatch(batch);
+        executeBatch( batch );
     }
 
 
     @Override
     public void delete( final MapScope scope, final String key ) {
         final MutationBatch batch = keyspace.prepareMutationBatch();
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //serialize to the entry
-        batch.withRow(MAP_ENTRIES, entryRowKey).delete();
+        batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
 
         //add it to the keys, we're not sure which one it may have come from
-       final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
 
 
-        final List<BucketScopedRowKey<String>>
-                rowKeys = BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
+        final List<BucketScopedRowKey<String>> rowKeys =
+            BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
 
-        for(BucketScopedRowKey<String> rowKey: rowKeys) {
+        for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
             batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
         }
 
@@ -345,34 +342,53 @@ public class MapSerializationImpl implements MapSerialization {
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
 
         final MultiTennantColumnFamilyDefinition mapEntries =
-                new MultiTennantColumnFamilyDefinition( MAP_ENTRIES,
-                       BytesType.class.getSimpleName(),
-                       BytesType.class.getSimpleName(),
-                       BytesType.class.getSimpleName(),
-                       MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+            new MultiTennantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(),
+                BytesType.class.getSimpleName(), BytesType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
 
         final MultiTennantColumnFamilyDefinition mapKeys =
-                new MultiTennantColumnFamilyDefinition( MAP_KEYS,
-                        BytesType.class.getSimpleName(),
-                        UTF8Type.class.getSimpleName(),
-                        BytesType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+            new MultiTennantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(),
+                UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
 
         return Arrays.asList( mapEntries, mapKeys );
     }
 
 
-    private  Column<Boolean> getValue(MapScope scope, String key) {
+    private Column<Boolean> getValue( MapScope scope, String key ) {
+
+
+        //add it to the entry
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+
+        //now get all columns, including the "old row key value"
+        try {
+            final Column<Boolean> result =
+                keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
+
+            return result;
+        }
+        catch ( NotFoundException nfe ) {
+            //nothing to return
+            return null;
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+    }
+
 
+    private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
 
 
         //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //now get all columns, including the "old row key value"
         try {
-            final Column<Boolean> result = keyspace.prepareQuery( MAP_ENTRIES )
-                    .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+            final Column<Boolean> result =
+                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
+                        .getKey( entryRowKey ).getColumn( true ).execute().getResult();
 
             return result;
         }
@@ -388,52 +404,45 @@ public class MapSerializationImpl implements MapSerialization {
 
     /**
      * Get multiple values, using the string builder
-     * @param scope
-     * @param keys
-     * @param builder
-     * @param <T>
-     * @return
      */
-    private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+    private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
 
 
         final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
 
-        for(final String key: keys){
-             //add it to the entry
-            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        for ( final String key : keys ) {
+            //add it to the entry
+            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
             rowKeys.add( entryRowKey );
-
         }
 
 
+        //now get all columns, including the "old row key value"
+        try {
+            final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
+                keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute()
+                        .getResult();
 
-          //now get all columns, including the "old row key value"
-          try {
-              final Rows<ScopedRowKey<MapEntryKey>, Boolean>
-                  rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
-                                                     .execute().getResult();
-
-
-             return builder.buildResults( rows );
-          }
-          catch ( NotFoundException nfe ) {
-              //nothing to return
-              return null;
-          }
-          catch ( ConnectionException e ) {
-              throw new RuntimeException( "Unable to connect to cassandra", e );
-          }
-      }
 
+            return builder.buildResults( rows );
+        }
+        catch ( NotFoundException nfe ) {
+            //nothing to return
+            return null;
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+    }
 
 
-    private void executeBatch(MutationBatch batch) {
+    private void executeBatch( MutationBatch batch ) {
         try {
             batch.execute();
-        } catch (ConnectionException e) {
-            throw new RuntimeException("Unable to connect to cassandra", e);
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
         }
     }
 
@@ -501,8 +510,7 @@ public class MapSerializationImpl implements MapSerialization {
         /**
          * Create a scoped row key from the key
          */
-        public static ScopedRowKey<MapEntryKey> fromKey(
-                final MapScope mapScope, final String key ) {
+        public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) {
 
             return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
         }
@@ -511,32 +519,32 @@ public class MapSerializationImpl implements MapSerialization {
 
     /**
      * Build the results from the row keys
-     * @param <T>
      */
     private static interface ResultsBuilder<T> {
 
-        public T buildResults(final  Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+        public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
     }
 
-    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
 
         @Override
         public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
             final int size = rows.size();
 
-            final Map<String, String> results = new HashMap<>(size);
+            final Map<String, String> results = new HashMap<>( size );
 
-            for(int i = 0; i < size; i ++){
+            for ( int i = 0; i < size; i++ ) {
 
                 final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
 
                 final String value = row.getColumns().getStringValue( true, null );
 
-                if(value == null){
+                if ( value == null ) {
                     continue;
                 }
 
-               results.put( row.getKey().getKey().key,  value );
+                results.put( row.getKey().getKey().key, value );
             }
 
             return results;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index 4f47749..4060dac 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
@@ -42,7 +43,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd
 @JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
 public class DeIndexOperation implements BatchOperation {
 
+    @JsonProperty
     private String[] indexes;
+
+    @JsonProperty
     private String documentId;
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
index fae809f..28f2e0d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
@@ -30,6 +30,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 
 /**
@@ -37,9 +38,12 @@ import org.elasticsearch.client.Client;
  */
 public class IndexOperation implements BatchOperation {
 
+    @JsonProperty
     public String writeAlias;
+    @JsonProperty
     public String documentId;
 
+    @JsonProperty
     public Map<String, Object> data;
 
     public IndexOperation( final String writeAlias, final ApplicationScope applicationScope, IndexEdge indexEdge,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 12df390..bcee308 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 
 
@@ -33,9 +34,13 @@ import com.google.common.base.Optional;
  * Container for index operations.
  */
 public class IndexOperationMessage implements Serializable {
+    @JsonProperty
     private final Set<IndexOperation> indexRequests;
+
+    @JsonProperty
     private final Set<DeIndexOperation> deIndexRequests;
 
+    @JsonProperty
     private long creationTime;
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index d974529..5201279 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.queue;
 import rx.Observable;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -67,12 +68,21 @@ public class DefaultQueueManager implements QueueManager {
         }
     }
 
+
     @Override
-    public synchronized void sendMessage(Object body) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
         String uuid = UUID.randomUUID().toString();
         queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
+
     }
 
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+       sendMessage( body );
+    }
+
+
     @Override
     public void deleteQueue() {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 027abb2..dc3d1b5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,13 @@ public interface QueueManager {
      * @param body
      * @throws IOException
      */
-    void sendMessage(Object body)throws IOException;
+    <T extends Serializable> void  sendMessage(T body)throws IOException;
+
+    /**
+     * Send a messae to the topic to be sent to other queues
+     * @param body
+     */
+    <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
 
     /**
      * purge messages

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index d476f76..59ecd24 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -18,15 +18,55 @@
 package org.apache.usergrid.persistence.queue.impl;
 
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNSAsyncClient;
 import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.SubscribeRequest;
+import com.amazonaws.services.sns.model.SubscribeResult;
+import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.DeleteQueueRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
+import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,20 +76,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 
 public class SNSQueueManagerImpl implements QueueManager {
 
@@ -59,10 +85,10 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final QueueFig fig;
     private final ClusterFig clusterFig;
     private final CassandraFig cassandraFig;
-    private final QueueFig queueFig;
     private final AmazonSQSClient sqs;
     private final AmazonSNSClient sns;
     private final AmazonSNSAsyncClient snsAsync;
+    private final AmazonSQSAsyncClient sqsAsync;
 
 
     private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -110,6 +136,7 @@ public class SNSQueueManagerImpl implements QueueManager {
         });
 
 
+
     @Inject
     public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
                                CassandraFig cassandraFig, QueueFig queueFig) {
@@ -117,12 +144,21 @@ public class SNSQueueManagerImpl implements QueueManager {
         this.fig = fig;
         this.clusterFig = clusterFig;
         this.cassandraFig = cassandraFig;
-        this.queueFig = queueFig;
+
+
+        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+        final ExecutorService executor = TaskExecutorFactory
+            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+                TaskExecutorFactory.RejectionAction.CALLERRUNS);
+
+
+        final Region region = getRegion();
 
         try {
-            sqs = createSQSClient(getRegion());
-            sns = createSNSClient(getRegion());
-            snsAsync = createAsyncSNSClient(getRegion());
+            sqs = createSQSClient(region);
+            sns = createSNSClient(region);
+            snsAsync = createAsyncSNSClient(region, executor);
+            sqsAsync = createAsyncSQSClient( region, executor );
 
         } catch (Exception e) {
             throw new RuntimeException("Error setting up mapper", e);
@@ -157,7 +193,7 @@ public class SNSQueueManagerImpl implements QueueManager {
         try {
 
             SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
-            sns.subscribe(primarySubscribeRequest);
+             sns.subscribe(primarySubscribeRequest);
 
             // ensure the SNS primary topic has permission to send to the primary SQS queue
             List<String> primaryTopicArnList = new ArrayList<>();
@@ -276,22 +312,35 @@ public class SNSQueueManagerImpl implements QueueManager {
      *
      */
 
-    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
+    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
 
-        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
-        final Executor executor = TaskExecutorFactory
-            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
-                TaskExecutorFactory.RejectionAction.CALLERRUNS);
-
-        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor);
+        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
 
         sns.setRegion(region);
 
         return sns;
     }
 
+
+    /**
+     * Create the async sqs client
+     * @param region
+     * @param executor
+     * @return
+     */
+    private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+        final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
+
+        sqs.setRegion( region );
+
+        return sqs;
+
+    }
+
     /**
      * The Synchronous SNS client is used for creating topics and subscribing queues.
      *
@@ -369,7 +418,12 @@ public class SNSQueueManagerImpl implements QueueManager {
                 try {
                     final JsonNode bodyNode =  mapper.readTree(message.getBody());
                     JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
-                    body = fromString(bodyObj.textValue(), klass);
+
+
+
+                    final String bodyText = mapper.writeValueAsString( bodyObj );;
+
+                    body = fromString(bodyText, klass);
                 } catch (Exception e) {
                     logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
                     throw new RuntimeException(e);
@@ -405,6 +459,40 @@ public class SNSQueueManagerImpl implements QueueManager {
         }
     }
 
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+        if (snsAsync == null) {
+                   logger.error("SNS client is null, perhaps it failed to initialize successfully");
+                   return;
+               }
+
+               final String stringBody = toString(body);
+
+               String topicArn = getWriteTopicArn();
+
+               if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+               PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+
+               snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+                   @Override
+                   public void onError( Exception e ) {
+                       logger.error( "Error publishing message... {}", e );
+                   }
+
+
+                   @Override
+                   public void onSuccess( PublishRequest request, PublishResult result ) {
+                       if ( logger.isDebugEnabled() ) logger
+                           .debug( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
+                               request.getTopicArn() );
+                   }
+               } );
+
+    }
+
+
     @Override
     public void sendMessages(final List bodies) throws IOException {
 
@@ -414,41 +502,47 @@ public class SNSQueueManagerImpl implements QueueManager {
         }
 
         for (Object body : bodies) {
-            sendMessage(body);
+            sendMessage((Serializable)body);
         }
 
     }
 
+
     @Override
-    public void sendMessage(final Object body) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
 
-        if (snsAsync == null) {
-            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+        if ( snsAsync == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
             return;
         }
 
-        final String stringBody = toString(body);
+        final String stringBody = toString( body );
 
-        String topicArn = getWriteTopicArn();
+        String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Publishing Message...{} to url: {}", stringBody, url );
+        }
 
-        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+        SendMessageRequest request = new SendMessageRequest( url, stringBody );
 
-        snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
-            @Override
-            public void onError(Exception e) {
-                logger.error("Error publishing message... {}", e);
-            }
+        sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
 
             @Override
-            public void onSuccess(PublishRequest request, PublishResult result) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(), request.getTopicArn());
+            public void onError( final Exception e ) {
 
+                logger.error( "Error sending message... {}", e );
             }
-        });
 
+
+            @Override
+            public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) {
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug( "Successfully send... messageBody=[{}],  url=[{}]", request.getMessageBody(),
+                        request.getQueueUrl() );
+                }
+            }
+        } );
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index fa9a7ac..0c56c05 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl;
 
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
@@ -243,25 +244,34 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     }
 
+
     @Override
-    public void sendMessage(final Object body) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
 
         if (sqs == null) {
-            logger.error("Sqs is null");
-            return;
-        }
+              logger.error("Sqs is null");
+              return;
+          }
 
-        String url = getQueue().getUrl();
+          String url = getQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
+          if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
 
-        final String stringBody = toString(body);
+          final String stringBody = toString(body);
 
-        SendMessageRequest request = new SendMessageRequest(url, stringBody);
-        sqs.sendMessage(request);
+          SendMessageRequest request = new SendMessageRequest(url, stringBody);
+          sqs.sendMessage(request);
+      }
+
+
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+        sendMessage( body );
     }
 
 
+
     @Override
     public void commitMessage(final QueueMessage queueMessage) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index bca9a49..bc55ff4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.services.queues;
 
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -65,7 +66,13 @@ public class ImportQueueManager implements QueueManager {
 
 
     @Override
-    public void sendMessage( final Object body ) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
+
+    }
+
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
 
     }
 


Mime
View raw message