usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [26/50] [abbrv] incubator-usergrid git commit: QueueListener uses observable
Date Mon, 01 Jun 2015 21:48:35 GMT
QueueListener uses observable


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6b3a877d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6b3a877d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6b3a877d

Branch: refs/heads/USERGRID-628
Commit: 6b3a877d4c33cb655e0e806f8db543409b7fd6e1
Parents: e70f047
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Thu May 28 13:47:17 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Thu May 28 13:47:17 2015 -0600

----------------------------------------------------------------------
 .../services/notifications/QueueListener.java   | 159 ++++++++++---------
 1 file changed, 83 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6b3a877d/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 91b3e00..1adf88f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -40,6 +40,7 @@ import javax.annotation.PostConstruct;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -148,91 +149,97 @@ public class QueueListener  {
         QueueScope queueScope = new QueueScopeImpl( queueName ) {};
         QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
         // run until there are no more active jobs
-        long runCount = 0;
+        final AtomicLong runCount = new AtomicLong(0);
         //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
         LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager);
 
         while ( true ) {
-            try {
 
                 Timer.Context timerContext = timer.time();
-                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class).toList().toBlocking().last();
-                LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
-
-                if (messages.size() > 0) {
-                    HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
-                    //group messages into hash map by app id
-                    for (QueueMessage message : messages) {
-                        //TODO: stop copying around this area as it gets notification specific.
-                        ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
-                        UUID applicationId = queueMessage.getApplicationId();
-                        //Groups queue messages by application Id, ( they are all probably going to the same place )
-                        if (!messageMap.containsKey(applicationId)) {
-                            //For each app id it sends the set.
-                            List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
-                            applicationQueueMessages.add(message);
-                            messageMap.put(applicationId, applicationQueueMessages);
-                        } else {
-                            messageMap.get(applicationId).add(message);
-                        }
-                    }
-                    long now = System.currentTimeMillis();
-                    Observable merge = null;
-                    //send each set of app ids together
-                    for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
-                        UUID applicationId = entry.getKey();
-                        ApplicationQueueManager manager = queueManagerMap.get(applicationId);
-                        LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
-                        Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
-                        if(merge == null)
-                            merge = current;
-                        else {
-                            merge = Observable.merge(merge,current);
-                        }
-                    }
-                    if(merge!=null) {
-                        merge.toBlocking().lastOrDefault(null);
-                    }
-                    queueManager.commitMessages(messages);
-
-                    meter.mark(messages.size());
-                    LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
-
-                    if(sleepBetweenRuns > 0) {
-                        LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
-                        Thread.sleep(sleepBetweenRuns);
-                    }
-                    if(++runCount % consecutiveCallsToRemoveDevices == 0){
-                        for(ApplicationQueueManager applicationQueueManager : queueManagerMap.asMap().values()){
+                queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class)
+                    .buffer(getBatchSize())
+                    .doOnNext(messages -> {
+                        try {
+                            LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
+
+                            if (messages.size() > 0) {
+                                HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
+                                //group messages into hash map by app id
+                                for (QueueMessage message : messages) {
+                                    //TODO: stop copying around this area as it gets notification specific.
+                                    ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
+                                    UUID applicationId = queueMessage.getApplicationId();
+                                    //Groups queue messages by application Id, ( they are all probably going to the same place )
+                                    if (!messageMap.containsKey(applicationId)) {
+                                        //For each app id it sends the set.
+                                        List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+                                        applicationQueueMessages.add(message);
+                                        messageMap.put(applicationId, applicationQueueMessages);
+                                    } else {
+                                        messageMap.get(applicationId).add(message);
+                                    }
+                                }
+                                long now = System.currentTimeMillis();
+                                Observable merge = null;
+                                //send each set of app ids together
+                                for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
+                                    UUID applicationId = entry.getKey();
+                                    ApplicationQueueManager manager = queueManagerMap.get(applicationId);
+                                    LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
+                                    Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
+                                    if(merge == null)
+                                        merge = current;
+                                    else {
+                                        merge = Observable.merge(merge,current);
+                                    }
+                                }
+                                if(merge!=null) {
+                                    merge.toBlocking().lastOrDefault(null);
+                                }
+                                queueManager.commitMessages(messages);
+
+                                meter.mark(messages.size());
+                                LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
+
+                                if(sleepBetweenRuns > 0) {
+                                    LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
+                                    Thread.sleep(sleepBetweenRuns);
+                                }
+                                if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){
+                                    for(ApplicationQueueManager applicationQueueManager : queueManagerMap.asMap().values()){
+                                        try {
+                                            applicationQueueManager.asyncCheckForInactiveDevices();
+                                        }catch (Exception inactiveDeviceException){
+                                            LOG.error("Inactive Device Get failed",inactiveDeviceException);
+                                        }
+                                    }
+                                    //clear everything
+                                    runCount.set(0);
+                                }
+                            }
+                            else{
+                                LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
+                                Thread.sleep(sleepWhenNoneFound);
+                            }
+                            timerContext.stop();
+                            //send to the providers
+                            consecutiveExceptions.set(0);
+                        }catch (Exception ex){
+                            LOG.error("failed to dequeue",ex);
                             try {
-                                applicationQueueManager.asyncCheckForInactiveDevices();
-                            }catch (Exception inactiveDeviceException){
-                                LOG.error("Inactive Device Get failed",inactiveDeviceException);
+                                long sleeptime = sleepWhenNoneFound*consecutiveExceptions.incrementAndGet();
+                                long maxSleep = 15000;
+                                sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime ;
+                                LOG.info("sleeping due to failures {} ms", sleeptime);
+                                Thread.sleep(sleeptime);
+                            }catch (InterruptedException ie){
+                                LOG.info("sleep interrupted");
                             }
                         }
-                        //clear everything
-                        runCount=0;
-                    }
-                }
-                else{
-                    LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
-                    Thread.sleep(sleepWhenNoneFound);
-                }
-                timerContext.stop();
-                //send to the providers
-                consecutiveExceptions.set(0);
-            }catch (Exception ex){
-                LOG.error("failed to dequeue",ex);
-                try {
-                    long sleeptime = sleepWhenNoneFound*consecutiveExceptions.incrementAndGet();
-                    long maxSleep = 15000;
-                    sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime ;
-                    LOG.info("sleeping due to failures {} ms", sleeptime);
-                    Thread.sleep(sleeptime);
-                }catch (InterruptedException ie){
-                    LOG.info("sleep interrupted");
-                }
-            }
+                    })
+                    .toBlocking().last();
+
+
         }
     }
 


Mime
View raw message