usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [25/50] [abbrv] usergrid git commit: Add a separate executor pool for async processing instead of unbounded Schedulers.io()
Date Mon, 02 May 2016 17:54:56 GMT
Add a separate executor pool for async processing instead of unbounded Schedulers.io()


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e639ff8e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e639ff8e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e639ff8e

Branch: refs/heads/USERGRID-1246-MASTER
Commit: e639ff8e73d90df94dcdb55f0fbbb4c399d8fd47
Parents: 5beb53b
Author: Michael Russo <mrusso@apigee.com>
Authored: Wed Apr 20 19:50:41 2016 +0100
Committer: George Reyes <grey@apache.org>
Committed: Mon May 2 10:49:34 2016 -0700

----------------------------------------------------------------------
 .../services/notifications/TaskManager.java     | 96 +++++++++-----------
 .../impl/ApplicationQueueManagerImpl.java       | 86 +++++++++++++-----
 2 files changed, 105 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/e639ff8e/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index ce2b82c..531ca7c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -37,12 +37,10 @@ public class TaskManager {
     private AtomicLong successes = new AtomicLong();
     private AtomicLong failures = new AtomicLong();
     private EntityManager em;
-    private boolean hasFinished;
 
     public TaskManager(EntityManager em, Notification notification) {
         this.em = em;
         this.notification = notification;
-        hasFinished = false;
     }
 
     public long getSuccesses(){return successes.get();}
@@ -53,77 +51,69 @@ public class TaskManager {
         completed(notifier,null,deviceUUID,null);
     }
     public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID, String newProviderId) throws Exception {
-        if (logger.isTraceEnabled()) {
-            logger.trace("REMOVED {}", deviceUUID);
-        }
+
+        successes.incrementAndGet();
+
         try {
-            if (logger.isTraceEnabled()) {
-                logger.trace("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            }
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
+
             if (receipt != null) {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("notification {} sent to device {}. saving receipt.", notification.getUuid(),
deviceUUID);
-                }
+
                 receipt.setSent(System.currentTimeMillis());
                 this.saveReceipt(notification, deviceRef, receipt,false);
                 if (logger.isTraceEnabled()) {
-                    logger.trace("notification {} receipt saved for device {}", notification.getUuid(),
deviceUUID);
+                    logger.trace("Notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
                 }
-                successes.incrementAndGet();
+
             }
 
             if (newProviderId != null) {
                 if (logger.isTraceEnabled()) {
-                    logger.trace("notification {} replacing device {} notifierId", notification.getUuid(),
deviceUUID);
+                    logger.trace("Notification {} replacing notifier id for device {} ", notification.getUuid(), deviceUUID);
                 }
                 replaceProviderId(deviceRef, notifier, newProviderId);
             }
 
             if (logger.isTraceEnabled()) {
-                logger.trace("notification {} completed device {}", notification.getUuid(),
deviceUUID);
+                logger.trace("Notification {} sending completed for device {}", notification.getUuid(), deviceUUID);
             }
 
-        } finally {
-            if (logger.isTraceEnabled()) {
-                logger.trace("COUNT is: {}", successes.get());
-            }
-//            if (hasFinished) { //process has finished but notifications are still coming in
-//                finishedBatch();
-//
-//            }
+        } catch(Exception e) {
+
+            logger.error("Unable to mark notification {} as completed due to: {}", notification.getUuid(), e);
+
         }
     }
 
     public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code, String message) throws Exception {
 
+        failures.incrementAndGet();
+
         try {
             if (logger.isDebugEnabled()) {
-                logger.debug("notification {} for device {} got error {}", notification.getUuid(),
deviceUUID, code);
+                logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
             }
 
-            failures.incrementAndGet();
-            if(receipt!=null) {
-                if ( receipt.getUuid() != null ) {
-                    successes.decrementAndGet();
-                }
+            if(receipt != null) {
                 receipt.setErrorCode( code );
                 receipt.setErrorMessage( message );
                 this.saveReceipt( notification, new SimpleEntityRef( Device.ENTITY_TYPE,
deviceUUID ), receipt, true );
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug( "notification {} receipt saved for device {}", notification.getUuid(),
deviceUUID );
-                }
             }
-        } finally {
+
             completed(notifier, deviceUUID);
             finishedBatch();
+
+        } catch (Exception e){
+
+            logger.error("Unable to finish marking notification {} as failed due to error: ", notification.getUuid(), e);
+
         }
     }
 
-    /*
-    * called from TaskManager - creates a persistent receipt and updates the
-    * passed one w/ the UUID
+    /**
+    * Called from TaskManager - Creates a persistent receipt
+    *
     */
     private void saveReceipt(EntityRef notification, EntityRef device, Receipt receipt, boolean hasError) throws Exception {
 
@@ -142,11 +132,16 @@ public class TaskManager {
             } else {
                 em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt);
             }
+
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Notification {} receipt saved for device {}", notification.getUuid(), device.getUuid() );
+            }
+
         }
 
     }
 
-    protected void replaceProviderId(EntityRef device, Notifier notifier,
+    private void replaceProviderId(EntityRef device, Notifier notifier,
                                      String newProviderId) throws Exception {
         Object value = em.getProperty(device, notifier.getName()
                 + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
@@ -161,33 +156,24 @@ public class TaskManager {
         }
     }
 
-    public void finishedBatch() throws Exception {
-        finishedBatch(true);
-    }
 
-    public void finishedBatch(boolean refreshNotification) throws Exception {
-
-        long successes = this.successes.get(); //reset counters
-        long failures = this.failures.get(); //reset counters
+    public void finishedBatch() throws Exception {
 
-        for (int i = 0; i < successes; i++) {
-            this.successes.decrementAndGet();
-        }
-        for (int i = 0; i < failures; i++) {
-            this.failures.decrementAndGet();
-        }
+        long successes = this.successes.get();
+        long failures = this.failures.get();
 
-        this.hasFinished = true;
+        // reset the counters
+        this.successes.set(0);
+        this.failures.set(0);
 
-        // force refresh notification by fetching it
-        if (refreshNotification) {
-            notification = em.get(this.notification.getUuid(), Notification.class);
-        }
+        // get the latest notification info
+        notification = em.get(this.notification.getUuid(), Notification.class);
 
         notification.updateStatistics(successes, failures);
         notification.setModified(System.currentTimeMillis());
         notification.setFinished(notification.getModified());
 
         em.update(notification);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e639ff8e/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 2f39ae4..1bb92b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.impl;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
@@ -52,11 +53,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
     private final Meter queueMeter;
     private final Meter sendMeter;
 
+    private final static String PUSH_PROCESSING_MAXTHREADS_PROP = "usergrid.push.async.processing.threads";
+    private final static String PUSH_PROCESSING_QUEUESIZE_PROP = "usergrid.push.async.processing.queue.size";
     private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency";
 
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
 
 
+
+    private final ExecutorService asyncExecutor;
+
+
+
+
     public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
                                         QueueManager queueManager, MetricsFactory metricsFactory,
                                         Properties properties) {
@@ -65,8 +74,31 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
         this.queueName = getQueueNames(properties);
-        queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
-        sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+        this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
+        this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+
+        int maxAsyncThreads;
+        int workerQueueSize;
+
+        try {
+
+            maxAsyncThreads = Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200"));
+            workerQueueSize = Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000"));
+
+        } catch (Exception e){
+
+            // if junk is passed into the property, just default the values
+            maxAsyncThreads = 200;
+            workerQueueSize = 2000;
+
+        }
+
+
+        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+        this.asyncExecutor = TaskExecutorFactory
+            .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
+                TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
 
     }
 
@@ -296,7 +328,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                             }
 
 
-                        }).subscribeOn(Schedulers.io());
+                        }).subscribeOn(Schedulers.from(asyncExecutor));
 
                 }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")))
                 .doOnError(throwable -> {
@@ -327,7 +359,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
                 });
 
-            processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
+            processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background
 
         }
 
@@ -348,7 +380,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
         // if no devices, go ahead and mark the batch finished
         if (deviceCount.get() <= 0 ) {
             TaskManager taskManager = new TaskManager(em, notification);
-            taskManager.finishedBatch(true);
+            taskManager.finishedBatch();
         }
 
 
@@ -540,32 +572,43 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
 
     /**
-     * Validates that a notifier and adapter exists to send notifications to;
-     * {"winphone":"mymessage","apple":"mymessage"}
-     * TODO: document this method better
+     *  Validates that a notifier and adapter exists to send notifications to. For the example payload
+     *
+     *  { "payloads" : {"winphone":"mymessage","apple":"mymessage"} }
+     *
+     *  Notifiers with name "winphone" and "apple" must exist.
      */
-    private Map<String, Object> translatePayloads(Map<String, Object> payloads,
Map<Object, ProviderAdapter>
-        notifierMap) throws Exception {
-        Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads,
+                                                  Map<Object, ProviderAdapter> notifierMap) throws Exception {
+
+        final Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
+
         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+
             String payloadKey = entry.getKey().toLowerCase();
             Object payloadValue = entry.getValue();
+
             //look for adapter from payload map
             ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
             if (providerAdapter != null) {
+
                 //translate payload to usable information
                 Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
                 if (translatedPayload != null) {
                     translatedPayloads.put(payloadKey, translatedPayload);
                 }
+
             }
         }
         return translatedPayloads;
     }
 
     public static String getQueueNames(Properties properties) {
-        String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY,
ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
+
+        String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY,
+            ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
         return name;
+
     }
 
     private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
@@ -585,15 +628,15 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
             try {
                 while (!subscriber.isUnsubscribed() && input.hasNext()) {
+
                     //send our input to the next
-                    //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName());
                     subscriber.onNext((T) input.next());
+
                 }
 
                 //tell the subscriber we don't have any more data
-                //logger.debug("finished iterator: {}", input.getClass().getSimpleName());
-
                 subscriber.onCompleted();
+
             } catch (Throwable t) {
                 logger.error("failed on subscriber", t);
                 subscriber.onError(t);
@@ -617,10 +660,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                     }
                 }
             } catch (Exception e) {
-                logger.error("checkForInactiveDevices", e); // not
-                // essential so
-                // don't fail,
-                // but log
+                // not essential so don't fail, but log
+                logger.error("checkForInactiveDevices", e);
+
             }
         }
     }
@@ -630,14 +672,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
         if (notification.getCanceled() == Boolean.TRUE) {
             if (logger.isDebugEnabled()) {
-                logger.debug("notification {} canceled. not sending.",
+                logger.debug("Notification {} canceled. Not sending.",
                     notification.getUuid());
             }
             return false;
         }
         if (notification.isExpired()) {
             if (logger.isDebugEnabled()) {
-                logger.debug("notification {} expired. not sending.",
+                logger.debug("Notification {} expired. Not sending.",
                     notification.getUuid());
             }
             return false;
@@ -654,7 +696,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
             }
             return value != null ? value.toString() : null;
         } catch (Exception e) {
-            logger.error("Error getting provider ID, proceeding with rest of batch", e);
+            logger.error("Error getting notifier for device {}, proceeding with rest of batch", device, e);
             return null;
         }
     }


Mime
View raw message