usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [15/33] usergrid git commit: Fixes serialization tests and verifies full end to end functionality.
Date Tue, 20 Oct 2015 21:07:16 GMT
Fixes serialization tests and verifies full end to end functionality.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/04a3f47b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/04a3f47b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/04a3f47b

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 04a3f47bd86ec0674ff487f479327e0f174f0425
Parents: 94a9078
Author: Todd Nine <tnine@apigee.com>
Authored: Mon Oct 19 12:00:56 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon Oct 19 12:00:56 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  70 ++-
 .../asyncevents/model/AsyncEvent.java           |   5 +-
 .../index/IndexProcessorFig.java                |   7 +-
 .../util/ObjectJsonSerializer.java              |  28 +-
 .../persistence/queue/QueueManager.java         |   2 +-
 .../persistence/queue/guice/QueueModule.java    |   1 -
 .../queue/impl/SNSQueueManagerImpl.java         | 515 ++++++++++---------
 .../queue/impl/SQSQueueManagerImpl.java         | 362 -------------
 8 files changed, 336 insertions(+), 654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c9f0953..d319ac8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,19 +29,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.google.common.base.Optional;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.exception.NotImplementedException;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
@@ -50,6 +44,8 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -61,6 +57,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -78,6 +75,7 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -94,8 +92,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
 
-    private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer(  );
-
     // SQS maximum receive messages is 10
     private static final int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
@@ -192,6 +188,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
+
+    private void offerTopic( final Serializable operation ) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //publish the message to the topic
+            this.queue.sendMessageToTopic( operation );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
     private void offerBatch(final List operations){
         final Timer.Context timer = this.writeTimer.time();
 
@@ -226,27 +238,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    /**
-     * Ack message in SQS
-     */
-    public void ack(final QueueMessage message) {
-
-        final Timer.Context timer = this.ackTimer.time();
-
-        try{
-            queue.commitMessage( message );
-
-            //decrement our in-flight counter
-            inFlight.decrementAndGet();
-
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }finally {
-            timer.stop();
-        }
-
-
-    }
 
     /**
      * Ack message in SQS
@@ -355,7 +346,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope );
-        offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+        offerTopic(
+            new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
     }
 
 
@@ -468,7 +460,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
      */
     public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
 
-        final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
 
         final UUID newMessageId = UUIDGenerator.newTimeUUID();
 
@@ -482,12 +474,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
 
         //send to the topic so all regions index the batch
-        try {
-            queue.sendMessageToTopic( elasticsearchIndexEvent );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to pulish to topic", e );
-        }
+
+        offerTopic( elasticsearchIndexEvent );
     }
 
     public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
@@ -505,7 +493,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         String highConsistency = null;
 
         if(message == null){
-            logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
 
             highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 
@@ -517,11 +505,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         //our original local read has it, parse it.
         if(message != null){
-             indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
         //we tried to read it at a higher consistency level and it works
         else if (highConsistency != null){
-            indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
         }
 
         //we couldn't find it, bail

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 1af54e3..7c51003 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -30,8 +30,11 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 /**
  * Marker class for serialization
+ *
+ * Note that when you add a subtype, you will need to add its serialization value below in the JsonSubTypes annotation.
+ *
+ * Each name must be unique, and must map to a subclass that is serialized
  */
-
 @JsonIgnoreProperties( ignoreUnknown = true )
 @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
 @JsonSubTypes( {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 6fd73b4..ec9b315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -105,10 +105,13 @@ public interface IndexProcessorFig extends GuicyFig {
     boolean resolveSynchronously();
 
     /**
-     * Get the message TTL in milliseconds
+     * Get the message TTL in milliseconds.  Defaults to 24 hours
+     *
+     * 24 * 60 * 60 * 1000
+     *
      * @return
      */
-    @Default("604800000")
+    @Default("86400000")
     @Key( "elasticsearch.message.ttl" )
     int getIndexMessageTtl();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
index dbd5ca3..4e5873a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -33,16 +34,33 @@ import com.google.common.base.Preconditions;
 public final class ObjectJsonSerializer {
 
 
-    private final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
 
-    private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+    private static final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+    static{
+
+           /**
+            * Because of the way SNS escapes all our json, we have to tell jackson to accept it.  See the documentation
+            * here for how SNS borks the message body
+            *
+            *  http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+            */
+            MAPPER.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+       }
+
+    /**
+     * Singleton instance of our serializer, instantiating it and configuring the mapper is expensive.
+     */
+    public static final ObjectJsonSerializer INSTANCE = new ObjectJsonSerializer();
+
+
+    private ObjectJsonSerializer( ) {
 
-    public ObjectJsonSerializer( ) {
-        MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
     }
 
 
-    public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+    public <T extends Serializable> String toString( final T toSerialize ) {
 
         Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
         final String stringValue;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index dc3d1b5..34a3654 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,7 @@ public interface QueueManager {
      * @param body
      * @throws IOException
      */
-    <T extends Serializable> void  sendMessage(T body)throws IOException;
+    <T extends Serializable> void sendMessage(T body)throws IOException;
 
     /**
      * Send a messae to the topic to be sent to other queues

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index dd1fe16..caf61bf 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -26,7 +26,6 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.queue.QueueFig;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 59ecd24..a3fa05e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -68,6 +68,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -77,9 +78,10 @@ import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
+
 public class SNSQueueManagerImpl implements QueueManager {
 
-    private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );
 
     private final QueueScope scope;
     private final QueueFig fig;
@@ -91,55 +93,64 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final AmazonSQSAsyncClient sqsAsync;
 
 
-    private final JsonFactory JSON_FACTORY = new JsonFactory();
-    private final ObjectMapper mapper = new ObjectMapper(JSON_FACTORY);
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
+
+    static {
+
+        /**
+         * Because of the way SNS escapes all our json, we have to tell jackson to accept it.  See the documentation
+         * here for how SNS borks the message body
+         *
+         *  http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+         */
+        mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+    }
 
 
-    private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .build(new CacheLoader<String, String>() {
+    private final LoadingCache<String, String> writeTopicArnMap =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() {
             @Override
-            public String load(String queueName)
-                throws Exception {
+            public String load( String queueName ) throws Exception {
 
-                return setupTopics(queueName);
+                return setupTopics( queueName );
             }
-        });
+        } );
 
-    private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .build(new CacheLoader<String, Queue>() {
+    private final LoadingCache<String, Queue> readQueueUrlMap =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() {
             @Override
-            public Queue load(String queueName) throws Exception {
+            public Queue load( String queueName ) throws Exception {
 
                 Queue queue = null;
 
                 try {
-                    GetQueueUrlResult result = sqs.getQueueUrl(queueName);
-                    queue = new Queue(result.getQueueUrl());
-                } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                    logger.error("Queue {} does not exist, will create", queueName);
-                } catch (Exception e) {
-                    logger.error("failed to get queue from service", e);
+                    GetQueueUrlResult result = sqs.getQueueUrl( queueName );
+                    queue = new Queue( result.getQueueUrl() );
+                }
+                catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+                    logger.error( "Queue {} does not exist, will create", queueName );
+                }
+                catch ( Exception e ) {
+                    logger.error( "failed to get queue from service", e );
                     throw e;
                 }
 
-                if (queue == null) {
-                    String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
-                    queue = new Queue(url);
+                if ( queue == null ) {
+                    String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+                    queue = new Queue( url );
                 }
 
-                setupTopics(queueName);
+                setupTopics( queueName );
 
                 return queue;
             }
-        });
-
+        } );
 
 
     @Inject
-    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
-                               CassandraFig cassandraFig, QueueFig queueFig) {
+    public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
+                                CassandraFig cassandraFig, QueueFig queueFig ) {
         this.scope = scope;
         this.fig = fig;
         this.clusterFig = clusterFig;
@@ -148,177 +159,179 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
         final ExecutorService executor = TaskExecutorFactory
-            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
-                TaskExecutorFactory.RejectionAction.CALLERRUNS);
+            .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+                TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
 
         final Region region = getRegion();
 
         try {
-            sqs = createSQSClient(region);
-            sns = createSNSClient(region);
-            snsAsync = createAsyncSNSClient(region, executor);
+            sqs = createSQSClient( region );
+            sns = createSNSClient( region );
+            snsAsync = createAsyncSNSClient( region, executor );
             sqsAsync = createAsyncSQSClient( region, executor );
-
-        } catch (Exception e) {
-            throw new RuntimeException("Error setting up mapper", e);
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Error setting up mapper", e );
         }
     }
 
-    private String setupTopics(final String queueName)
-        throws Exception {
 
-        logger.info("Setting up setupTopics SNS/SQS...");
+    private String setupTopics( final String queueName ) throws Exception {
 
-        String primaryTopicArn = AmazonNotificationUtils.getTopicArn(sns, queueName, true);
+        logger.info( "Setting up setupTopics SNS/SQS..." );
 
-        if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn);
+        String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );
 
-        String queueUrl = AmazonNotificationUtils.getQueueUrlByName(sqs, queueName);
-        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(sqs, queueName);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn );
+        }
+
+        String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
+        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );
 
-        if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn );
+        }
 
-        if (primaryQueueArn == null) {
-            if (logger.isDebugEnabled())
-                logger.debug("SNS/SQS Setup: primaryQueueArn is null, creating queue...");
+        if ( primaryQueueArn == null ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
+            }
 
-            queueUrl = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
-            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(sqs, queueUrl);
+            queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );
 
-            if (logger.isDebugEnabled())
-                logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
+            }
         }
 
         try {
 
-            SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
-             sns.subscribe(primarySubscribeRequest);
+            SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
+            sns.subscribe( primarySubscribeRequest );
 
             // ensure the SNS primary topic has permission to send to the primary SQS queue
             List<String> primaryTopicArnList = new ArrayList<>();
-            primaryTopicArnList.add(primaryTopicArn);
-            AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList);
-        } catch (AmazonServiceException e) {
-            logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
+            primaryTopicArnList.add( primaryTopicArn );
+            AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
+        }
+        catch ( AmazonServiceException e ) {
+            logger.error(
+                String.format( "Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn ), e );
         }
 
-        if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL) {
+        if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) {
 
             String multiRegion = fig.getRegionList();
 
-            if (logger.isDebugEnabled())
-                logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
+            }
 
-            String[] regionNames = multiRegion.split(",");
+            String[] regionNames = multiRegion.split( "," );
 
-            final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
-            final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
+            final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
+            final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
 
-            arrQueueArns.put(primaryQueueArn, fig.getRegion());
-            topicArns.put(primaryTopicArn, fig.getRegion());
+            arrQueueArns.put( primaryQueueArn, fig.getRegion() );
+            topicArns.put( primaryTopicArn, fig.getRegion() );
 
-            for (String regionName : regionNames) {
+            for ( String regionName : regionNames ) {
 
                 regionName = regionName.trim();
-                Regions regions = Regions.fromName(regionName);
-                Region region = Region.getRegion(regions);
+                Regions regions = Regions.fromName( regionName );
+                Region region = Region.getRegion( regions );
 
-                AmazonSQSClient sqsClient = createSQSClient(region);
-                AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+                AmazonSQSClient sqsClient = createSQSClient( region );
+                AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
 
                 // getTopicArn will create the SNS topic if it doesn't exist
-                String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
-                topicArns.put(topicArn, regionName);
+                String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
+                topicArns.put( topicArn, regionName );
 
                 // create the SQS queue if it doesn't exist
-                String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
-                if (queueArn == null) {
-                    queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
-                    queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
+                String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
+                if ( queueArn == null ) {
+                    queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
+                    queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
                 }
 
-                arrQueueArns.put(queueArn, regionName);
+                arrQueueArns.put( queueArn, regionName );
             }
 
-            logger.debug("Creating Subscriptions...");
+            logger.debug( "Creating Subscriptions..." );
 
-            for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
+            for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
                 String queueARN = queueArnEntry.getKey();
                 String strSqsRegion = queueArnEntry.getValue();
 
-                Regions sqsRegions = Regions.fromName(strSqsRegion);
-                Region sqsRegion = Region.getRegion(sqsRegions);
+                Regions sqsRegions = Regions.fromName( strSqsRegion );
+                Region sqsRegion = Region.getRegion( sqsRegions );
 
-                AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion);
+                AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion );
 
                 // ensure the URL used to subscribe is for the correct name/region
-                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName);
+                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );
 
                 // this list used later for adding permissions to queues
                 List<String> topicArnList = new ArrayList<>();
 
-                for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
+                for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {
 
                     String topicARN = topicArnEntry.getKey();
-                    topicArnList.add(topicARN);
+                    topicArnList.add( topicARN );
 
                     String strSnsRegion = topicArnEntry.getValue();
-                    Regions snsRegions = Regions.fromName(strSnsRegion);
-                    Region snsRegion = Region.getRegion(snsRegions);
+                    Regions snsRegions = Regions.fromName( strSnsRegion );
+                    Region snsRegion = Region.getRegion( snsRegions );
 
-                    AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously
-                    SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN);
+                    AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously
+                    SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );
 
                     try {
 
-                        logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
-                            queueARN,
-                            strSqsRegion,
-                            topicARN,
-                            strSnsRegion
-                        );
+                        logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
+                            strSqsRegion, topicARN, strSnsRegion );
 
-                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest);
+                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
                         String subscriptionARN = subscribeResult.getSubscriptionArn();
-                        if(logger.isDebugEnabled()){
-                            logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug(
+                                "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
+                                queueARN, topicARN, subscriptionARN );
                         }
-
-
-                    } catch (Exception e) {
-                        logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
-                            queueARN,
-                            strSqsRegion,
-                            topicARN,
-                            strSnsRegion), e);
-
-
+                    }
+                    catch ( Exception e ) {
+                        logger.error( String
+                            .format( "ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
+                                queueARN, strSqsRegion, topicARN, strSnsRegion ), e );
                     }
                 }
 
-                logger.info("Adding permission to receive messages...");
+                logger.info( "Adding permission to receive messages..." );
                 // add permission to each queue, providing a list of topics that it's subscribed to
-                AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList);
-
+                AmazonNotificationUtils
+                    .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
             }
         }
 
         return primaryTopicArn;
     }
 
+
     /**
      * The Asynchronous SNS client is used for publishing events to SNS.
-     *
      */
 
-    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
+    private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
 
-        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
+        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor );
 
-        sns.setRegion(region);
+        sns.setRegion( region );
 
         return sns;
     }
@@ -326,11 +339,8 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     /**
      * Create the async sqs client
-     * @param region
-     * @param executor
-     * @return
      */
-    private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+    private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
         final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
@@ -338,173 +348,209 @@ public class SNSQueueManagerImpl implements QueueManager {
         sqs.setRegion( region );
 
         return sqs;
-
     }
 
+
     /**
      * The Synchronous SNS client is used for creating topics and subscribing queues.
-     *
      */
-    private AmazonSNSClient createSNSClient(final Region region) {
+    private AmazonSNSClient createSNSClient( final Region region ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
-        final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+        final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() );
 
-        sns.setRegion(region);
+        sns.setRegion( region );
 
         return sns;
     }
 
 
     private String getName() {
-        String name = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
+        String name =
+            clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+                + scope.getRegionImplementation();
         name = name.toLowerCase(); //user lower case values
-        Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
+        Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
 
         return name;
     }
 
+
     public Queue getReadQueue() {
         String queueName = getName();
 
         try {
-            return readQueueUrlMap.get(queueName);
-        } catch (ExecutionException ee) {
-            throw new RuntimeException(ee);
+            return readQueueUrlMap.get( queueName );
+        }
+        catch ( ExecutionException ee ) {
+            throw new RuntimeException( ee );
         }
     }
 
+
     public String getWriteTopicArn() {
         try {
-            return writeTopicArnMap.get(getName());
-
-        } catch (ExecutionException ee) {
-            throw new RuntimeException(ee);
+            return writeTopicArnMap.get( getName() );
+        }
+        catch ( ExecutionException ee ) {
+            throw new RuntimeException( ee );
         }
     }
 
+
     @Override
-    public rx.Observable<QueueMessage> getMessages(final int limit,
-                                                   final int transactionTimeout,
-                                                   final int waitTime,
-                                                   final Class klass) {
+    public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+                                                    final Class klass ) {
 
-        if (sqs == null) {
-            logger.error("SQS is null - was not initialized properly");
+        if ( sqs == null ) {
+            logger.error( "SQS is null - was not initialized properly" );
             return rx.Observable.empty();
         }
 
         String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Getting up to {} messages from {}", limit, url );
+        }
 
-        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
-        receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(Math.max(1, transactionTimeout / 1000));
-        receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
+        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+        receiveMessageRequest.setMaxNumberOfMessages( limit );
+        receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) );
+        receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 );
 
         try {
-            ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+            ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );
             List<Message> messages = result.getMessages();
 
-            if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Received {} messages from {}", messages.size(), url );
+            }
+
+            List<QueueMessage> queueMessages = new ArrayList<>( messages.size() );
 
-            List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+            for ( Message message : messages ) {
 
-            for (Message message : messages) {
-                Object body;
+                Object payload;
                 final String originalBody = message.getBody();
 
                 try {
-                    final JsonNode bodyNode =  mapper.readTree(message.getBody());
-                    JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
+                    final JsonNode bodyNode = mapper.readTree( message.getBody() );
 
+                    /**
+                     * When a message originates from SNS it has a "Message" field; we have to extract
+                     * it and then process it separately.
+                     */
 
 
-                    final String bodyText = mapper.writeValueAsString( bodyObj );;
+                    if ( bodyNode.has( "Message" ) ) {
+                        final String snsNode = bodyNode.get( "Message" ).asText();
 
-                    body = fromString(bodyText, klass);
-                } catch (Exception e) {
-                    logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
-                    throw new RuntimeException(e);
+                        payload = deSerializeSQSMessage( snsNode, klass );
+                    }
+                    else {
+                        payload = deSerializeSQSMessage( originalBody, klass );
+                    }
+                }
+                catch ( Exception e ) {
+                    logger.error( String.format( "failed to deserialize message: %s", message.getBody() ), e );
+                    throw new RuntimeException( e );
                 }
 
-                QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
-                queueMessage.setStringBody(originalBody);
-                queueMessages.add(queueMessage);
+                QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
+                    message.getAttributes().get( "type" ) );
+                queueMessage.setStringBody( originalBody );
+                queueMessages.add( queueMessage );
             }
 
-            return rx.Observable.from(queueMessages);
-
-        } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) {
-            logger.error(String.format("Queue does not exist! [%s]", url), dne);
-        } catch (Exception e) {
-            logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e);
+            return rx.Observable.from( queueMessages );
+        }
+        catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
+            logger.error( String.format( "Queue does not exist! [%s]", url ), dne );
+        }
+        catch ( Exception e ) {
+            logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e );
         }
 
-        return rx.Observable.from(new ArrayList<>(0));
+        return rx.Observable.from( new ArrayList<>( 0 ) );
     }
 
+
+    /**
+     * Take a string, possibly escaped via SNS, and run it through our mapper to create an object.
+     */
+    private Object deSerializeSQSMessage( final String message, final Class type ) {
+        try {
+            final Object o = mapper.readValue( message, type );
+            return o;
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e );
+        }
+    }
+
+
     @Override
     public long getQueueDepth() {
         String key = "ApproximateNumberOfMessages";
         try {
-            GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key));
-            String depthString = result.getAttributes().get(key);
-            return depthString != null ? Long.parseLong(depthString) : 0;
-        }catch (Exception e){
-            logger.error("Exception getting queue depth",e);
+            GetQueueAttributesResult result =
+                sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) );
+            String depthString = result.getAttributes().get( key );
+            return depthString != null ? Long.parseLong( depthString ) : 0;
+        }
+        catch ( Exception e ) {
+            logger.error( "Exception getting queue depth", e );
             return -1;
-
         }
     }
 
 
     @Override
     public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
-        if (snsAsync == null) {
-                   logger.error("SNS client is null, perhaps it failed to initialize successfully");
-                   return;
-               }
-
-               final String stringBody = toString(body);
+        if ( snsAsync == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+            return;
+        }
 
-               String topicArn = getWriteTopicArn();
+        final String stringBody = toString( body );
 
-               if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+        String topicArn = getWriteTopicArn();
 
-               PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Publishing Message...{} to arn: {}", stringBody, topicArn );
+        }
 
-               snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
-                   @Override
-                   public void onError( Exception e ) {
-                       logger.error( "Error publishing message... {}", e );
-                   }
+        PublishRequest publishRequest = new PublishRequest( topicArn, stringBody );
 
+        snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+            @Override
+            public void onError( Exception e ) {
+                logger.error( "Error publishing message... {}", e );
+            }
 
-                   @Override
-                   public void onSuccess( PublishRequest request, PublishResult result ) {
-                       if ( logger.isDebugEnabled() ) logger
-                           .debug( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
-                               request.getTopicArn() );
-                   }
-               } );
 
+            @Override
+            public void onSuccess( PublishRequest request, PublishResult result ) {
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
+                        request.getTopicArn() );
+                }
+            }
+        } );
     }
 
 
     @Override
-    public void sendMessages(final List bodies) throws IOException {
+    public void sendMessages( final List bodies ) throws IOException {
 
-        if (snsAsync == null) {
-            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+        if ( snsAsync == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
             return;
         }
 
-        for (Object body : bodies) {
-            sendMessage((Serializable)body);
+        for ( Object body : bodies ) {
+            sendMessage( ( Serializable ) body );
         }
-
     }
 
 
@@ -545,94 +591,81 @@ public class SNSQueueManagerImpl implements QueueManager {
         } );
     }
 
+
     @Override
     public void deleteQueue() {
-        logger.warn("Deleting queue: "+getReadQueue().getUrl());
-        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
-        logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
-        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
-
+        logger.warn( "Deleting queue: " + getReadQueue().getUrl() );
+        sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) );
+        logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" );
+        sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) );
     }
 
 
     @Override
-    public void commitMessage(final QueueMessage queueMessage) {
+    public void commitMessage( final QueueMessage queueMessage ) {
         String url = getReadQueue().getUrl();
-        if (logger.isDebugEnabled())
-            logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
+        }
 
-        sqs.deleteMessage(new DeleteMessageRequest()
-            .withQueueUrl(url)
-            .withReceiptHandle(queueMessage.getHandle()));
+        sqs.deleteMessage(
+            new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) );
     }
 
 
     @Override
-    public void commitMessages(final List<QueueMessage> queueMessages) {
+    public void commitMessages( final List<QueueMessage> queueMessages ) {
         String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Commit messages {} to queue {}", queueMessages.size(), url );
+        }
 
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
 
-        for (QueueMessage message : queueMessages) {
-            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+        for ( QueueMessage message : queueMessages ) {
+            entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
         }
 
-        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
-        DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries );
+        DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
 
         boolean successful = result.getFailed().size() <= 0;
 
-        if (!successful) {
-            for (BatchResultErrorEntry failed : result.getFailed()) {
-                logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
+        if ( !successful ) {
+            for ( BatchResultErrorEntry failed : result.getFailed() ) {
+                logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() );
             }
         }
     }
 
 
     /**
-     * Read the object from Base64 string.
-     */
-
-    private Object fromString(final String s, final Class klass)
-        throws IOException, ClassNotFoundException {
-
-        Object o = mapper.readValue(s, klass);
-        return o;
-    }
-
-    /**
      * Write the object to a Base64 string.
      */
-    private String toString(final Object o) throws IOException {
-        return mapper.writeValueAsString(o);
+    private String toString( final Object o ) throws IOException {
+        return mapper.writeValueAsString( o );
     }
 
 
     /**
      * Get the region
-     *
-     * @return
      */
     private Region getRegion() {
-        Regions regions = Regions.fromName(fig.getRegion());
-        return Region.getRegion(regions);
+        Regions regions = Regions.fromName( fig.getRegion() );
+        return Region.getRegion( regions );
     }
 
 
     /**
      * Create the SQS client for the specified settings
      */
-    private AmazonSQSClient createSQSClient(final Region region) {
+    private AmazonSQSClient createSQSClient( final Region region ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
+        final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
 
-        sqs.setRegion(region);
+        sqs.setRegion( region );
 
         return sqs;
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
deleted file mode 100644
index 0c56c05..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-public class SQSQueueManagerImpl implements QueueManager {
-    private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
-
-
-    private final QueueScope scope;
-    private ObjectMapper mapper;
-    protected final QueueFig fig;
-    private final ClusterFig clusterFig;
-    protected final AmazonSQSClient sqs;
-
-    private static SmileFactory smileFactory = new SmileFactory();
-
-    private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .build(new CacheLoader<String, Queue>() {
-            @Override
-            public Queue load(String queueName) throws Exception {
-
-                //the amazon client is not thread safe, we need to create one per queue
-                Queue queue = null;
-
-                try {
-
-                    GetQueueUrlResult result = sqs.getQueueUrl(queueName);
-                    queue = new Queue(result.getQueueUrl());
-
-                } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                    //no op, swallow
-                    logger.error("Queue {} does not exist, creating", queueName);
-
-                } catch (Exception e) {
-                    logger.error("failed to get queue from service", e);
-                    throw e;
-                }
-
-                if (queue == null) {
-
-                    final String deadletterQueueName = String.format("%s_dead", queueName);
-                    final Map<String, String> deadLetterAttributes = new HashMap<>(2);
-
-                    deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
-                    CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
-                        .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
-
-                    final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
-                    logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
-
-                    final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
-
-                    String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
-                        " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
-
-                    final Map<String, String> queueAttributes = new HashMap<>(2);
-                    deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
-                    deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
-
-                    CreateQueueRequest createQueueRequest = new CreateQueueRequest().
-                        withQueueName(queueName)
-                        .withAttributes(queueAttributes);
-
-                    CreateQueueResult result = sqs.createQueue(createQueueRequest);
-
-                    String url = result.getQueueUrl();
-                    queue = new Queue(url);
-
-                    logger.info("Created queue with url {}", url);
-                }
-
-                return queue;
-            }
-        });
-
-
-    @Inject
-    public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig, final ClusterFig clusterFig) {
-
-        this.scope = scope;
-        this.fig = fig;
-        this.clusterFig = clusterFig;
-        try {
-
-            smileFactory.delegateToTextual(true);
-            mapper = new ObjectMapper(smileFactory);
-            //pretty print, disabling for speed
-//            mapper.enable(SerializationFeature.INDENT_OUTPUT);
-            mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
-            sqs = createClient();
-
-        } catch (Exception e) {
-            throw new RuntimeException("Error setting up mapper", e);
-        }
-    }
-
-
-    protected String getName() {
-
-        String name = clusterFig.getClusterName() + "_" + scope.getName();
-
-        Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
-
-        return name;
-    }
-
-    public Queue getQueue() {
-
-        try {
-            Queue queue = urlMap.get(getName());
-            return queue;
-        } catch (ExecutionException ee) {
-            throw new RuntimeException(ee);
-        }
-    }
-
-    @Override
-    public rx.Observable<QueueMessage> getMessages(final int limit,
-                                          final int transactionTimeout,
-                                          final int waitTime,
-                                          final Class klass) {
-
-        if (sqs == null) {
-            logger.error("Sqs is null");
-            return rx.Observable.empty();
-        }
-
-        String url = getQueue().getUrl();
-
-        if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url);
-
-        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
-        receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
-        receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
-        ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
-        List<Message> messages = result.getMessages();
-
-        if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
-
-        List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
-
-        for (Message message : messages) {
-            Object body;
-
-            try {
-                body = fromString(message.getBody(), klass);
-            } catch (Exception e) {
-                logger.error("failed to deserialize message", e);
-                throw new RuntimeException(e);
-            }
-
-            QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
-            queueMessage.setStringBody(message.getBody());
-            queueMessages.add(queueMessage);
-        }
-
-        return rx.Observable.from(queueMessages);
-    }
-
-    @Override
-    public long getQueueDepth() {
-        String key = "ApproximateNumberOfMessages";
-        try {
-            GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key));
-            String depthString = result.getAttributes().get(key);
-            return depthString != null ? Long.parseLong(depthString) : 0;
-        }catch (Exception e){
-            logger.error("Exception getting queue depth",e);
-            return -1;
-
-        }
-    }
-    @Override
-    public void sendMessages(final List bodies) throws IOException {
-
-        if (sqs == null) {
-            logger.error("Sqs is null");
-            return;
-        }
-        String url = getQueue().getUrl();
-
-        if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url);
-
-        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
-        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-
-        for (Object body : bodies) {
-            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
-            entry.setId(UUID.randomUUID().toString());
-            entry.setMessageBody(toString(body));
-            entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype"));
-            entries.add(entry);
-        }
-
-        request.setEntries(entries);
-        sqs.sendMessageBatch(request);
-
-    }
-
-
-    @Override
-    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
-
-        if (sqs == null) {
-              logger.error("Sqs is null");
-              return;
-          }
-
-          String url = getQueue().getUrl();
-
-          if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
-
-          final String stringBody = toString(body);
-
-          SendMessageRequest request = new SendMessageRequest(url, stringBody);
-          sqs.sendMessage(request);
-      }
-
-
-
-    @Override
-    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
-        sendMessage( body );
-    }
-
-
-
-    @Override
-    public void commitMessage(final QueueMessage queueMessage) {
-
-        String url = getQueue().getUrl();
-        if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
-
-        sqs.deleteMessage(new DeleteMessageRequest()
-            .withQueueUrl(url)
-            .withReceiptHandle(queueMessage.getHandle()));
-    }
-
-
-    @Override
-    public void commitMessages(final List<QueueMessage> queueMessages) {
-
-        String url = getQueue().getUrl();
-        if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
-
-        List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
-
-        for (QueueMessage message : queueMessages) {
-            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
-        }
-
-        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
-        DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
-
-        boolean successful = result.getFailed().size() <= 0;
-
-        if (!successful) {
-
-            for (BatchResultErrorEntry failed : result.getFailed()) {
-                logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
-            }
-        }
-    }
-
-
-    /**
-     * Read an object of the given class from its string form via the mapper (no Base64 decoding is performed).
-     */
-    private Object fromString(final String s,
-                              final Class klass) throws IOException, ClassNotFoundException {
-        Object o = mapper.readValue(s, klass);
-        return o;
-    }
-
-    /**
-     * Write the object to a string via the mapper (no Base64 encoding is performed).
-     */
-    protected String toString(final Object o) throws IOException {
-        return mapper.writeValueAsString(o);
-    }
-
-
-    /**
-     * Get the region
-     *
-     * @return the Region resolved from the region name supplied by fig.getRegion()
-     */
-    protected Region getRegion() {
-        Regions regions = Regions.fromName(fig.getRegion());
-        Region region = Region.getRegion(regions);
-        return region;
-    }
-
-    @Override
-    public void deleteQueue() {
-        logger.warn("Deleting queue: "+getQueue().getUrl());
-        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
-    }
-
-
-
-    /**
-     * Create the SQS client for the specified settings
-     */
-    private AmazonSQSClient createClient() {
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
-        final Region region = getRegion();
-        sqs.setRegion(region);
-
-        return sqs;
-    }
-
-
-}


Mime
View raw message