usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [1/2] usergrid git commit: Remove SNS Async client and use the synchronous client, putting the publish events on the system's rx async thread pool.
Date Mon, 31 Aug 2015 20:23:35 GMT
Repository: usergrid
Updated Branches:
  refs/heads/two-dot-o-dev a9e54d34b -> c905ebebd


Remove SNS Async client and use the synchronous client, putting the publish events on the
system's rx async thread pool.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/405559a5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/405559a5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/405559a5

Branch: refs/heads/two-dot-o-dev
Commit: 405559a5baa9b5cf06c40d426b4e40917a65f254
Parents: a9e54d3
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Mon Aug 31 12:59:02 2015 -0700
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Mon Aug 31 12:59:02 2015 -0700

----------------------------------------------------------------------
 .../queue/impl/SNSQueueManagerImpl.java         | 74 ++++++++------------
 1 file changed, 30 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/405559a5/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 7e1e99c..8bfab8e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -19,10 +19,8 @@ package org.apache.usergrid.persistence.queue.impl;
 
 
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sns.AmazonSNSAsyncClient;
 import com.amazonaws.services.sns.AmazonSNSClient;
 import com.amazonaws.services.sns.model.*;
 import com.amazonaws.services.sqs.AmazonSQSClient;
@@ -41,12 +39,15 @@ import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.Queue;
 import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import rx.Observable;
 
 public class SNSQueueManagerImpl implements QueueManager {
 
@@ -58,7 +59,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final CassandraFig cassandraFig;
     private final AmazonSQSClient sqs;
     private final AmazonSNSClient sns;
-    private final AmazonSNSAsyncClient snsAsync;
+    private final RxTaskScheduler rxTaskScheduler;
 
 
     private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -107,16 +108,16 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Inject
-    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig) {
+    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig, final RxTaskScheduler rxTaskScheduler) {
         this.scope = scope;
         this.fig = fig;
         this.clusterFig = clusterFig;
         this.cassandraFig = cassandraFig;
+        this.rxTaskScheduler = rxTaskScheduler;
 
         try {
             sqs = createSQSClient(getRegion());
             sns = createSNSClient(getRegion());
-            snsAsync = createAsyncSNSClient(getRegion());
 
         } catch (Exception e) {
             throw new RuntimeException("Error setting up mapper", e);
@@ -266,27 +267,6 @@ public class SNSQueueManagerImpl implements QueueManager {
     }
 
     /**
-     * The Asynchronous SNS client is used for publishing events to SNS.
-     *
-     */
-
-    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-
-        /**
-         * The Async client will use default client configurations (default max conn: 50)
-         * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html
-         * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/constant-values.html#com.amazonaws.ClientConfiguration.DEFAULT_MAX_CONNECTIONS
-         */
-
-        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials());
-
-        sns.setRegion(region);
-
-        return sns;
-    }
-
-    /**
      * The Synchronous SNS client is used for creating topics and subscribing queues.
      *
      */
@@ -400,7 +380,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public void sendMessages(final List bodies) throws IOException {
 
-        if (snsAsync == null) {
+        if (sns == null) {
             logger.error("SNS client is null, perhaps it failed to initialize successfully");
             return;
         }
@@ -413,32 +393,38 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     @Override
     public void sendMessage(final Object body) throws IOException {
+        Observable.just(body).doOnNext(message->{
 
-        if (snsAsync == null) {
-            logger.error("SNS client is null, perhaps it failed to initialize successfully");
-            return;
-        }
+            if (sns == null) {
+                logger.error("SNS client is null, perhaps it failed to initialize successfully");
+                return;
+            }
+
+            final String stringBody;
+            try {
 
-        final String stringBody = toString(body);
+                stringBody = toString(body);
+                String topicArn = getWriteTopicArn();
 
-        String topicArn = getWriteTopicArn();
+                if (logger.isDebugEnabled()){
+                    logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+                }
 
-        if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+                PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
 
-        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+                // publish message to SNS
+                PublishResult publishResult = sns.publish(publishRequest);
 
-        snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
-                @Override
-                public void onError(Exception e) {
-                    logger.error("Error publishing message... {}", e);
+                if(logger.isDebugEnabled()){
+                    logger.debug("Successfully published... messageID=[{}],  arn=[{}]",
+                        publishResult.getMessageId(), publishRequest.getTopicArn());
                 }
 
-                @Override
-                public void onSuccess(PublishRequest request, PublishResult result) {
-                    if (logger.isDebugEnabled()) logger.debug("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(), request.getTopicArn());
+            } catch (IOException e) {
+                logger.error("Unable to convert queue object to a string message body", e);
+            }
 
-                }
-            });
+        }).subscribeOn(rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
 
     }
 


Mime
View raw message