usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [28/50] [abbrv] usergrid git commit: Fix scheduler.
Date Mon, 02 May 2016 17:54:59 GMT
Fix scheduler.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b0593335
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b0593335
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b0593335

Branch: refs/heads/USERGRID-1246-MASTER
Commit: b05933352d9068108e65f882b4e5b1a13410f4c0
Parents: 2f47ac0
Author: Michael Russo <mrusso@apigee.com>
Authored: Wed Apr 20 18:24:59 2016 -0700
Committer: George Reyes <grey@apache.org>
Committed: Mon May 2 10:49:34 2016 -0700

----------------------------------------------------------------------
 .../impl/ApplicationQueueManagerImpl.java          | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b0593335/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 778307c..4b2612f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -29,6 +29,7 @@ import org.apache.usergrid.services.notifications.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
+import rx.Scheduler;
 import rx.Subscriber;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -62,7 +63,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
 
 
-    //private final ExecutorService asyncExecutor;
+    private final Scheduler scheduler;
 
 
 
@@ -78,7 +79,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
         this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
         this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
 
-        /**
+
         int maxAsyncThreads;
         int workerQueueSize;
 
@@ -99,11 +100,11 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
 
         // create our own executor which has a bounded queue w/ caller runs policy for rejected
tasks
-        this.asyncExecutor = TaskExecutorFactory
+        this.scheduler = Schedulers.from(TaskExecutorFactory
             .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
-                TaskExecutorFactory.RejectionAction.CALLERRUNS );
+                TaskExecutorFactory.RejectionAction.CALLERRUNS ));
+
 
-        **/
     }
 
     private boolean scheduleQueueJob(Notification notification) throws Exception {
@@ -306,7 +307,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
                         })
                         .map(sendMessageFunction)
-                        .subscribeOn(Schedulers.io());
+                        .subscribeOn(scheduler);
 
                 }, concurrencyFactor)
                 .distinct( queueMessage -> {
@@ -314,7 +315,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                     if(queueMessage.isPresent()) {
                         return queueMessage.get().getNotificationId();
                     }
-                    
+
                     return queueMessage; // this will always be distinct, default handling
for the Optional.empty() case
 
                 } )
@@ -372,7 +373,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
                 });
 
-            processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the
queuing into the background
+            processMessagesObservable.subscribeOn(scheduler).subscribe(); // fire the queuing
into the background
 
         }
 


Mime
View raw message