usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [1/4] git commit: not compiling but refactored to single queue message
Date Thu, 21 Aug 2014 23:38:40 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-notifications-queue 06db9e798 -> 5586b33f9


not compiling but refactored to single queue message


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/610bdf35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/610bdf35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/610bdf35

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 610bdf35fb341fbecfd3be0ba4c9208809706b29
Parents: 06db9e7
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Thu Aug 21 16:13:32 2014 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Thu Aug 21 16:13:32 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/entities/Receipt.java  |  12 +-
 .../notifications/NotificationServiceProxy.java |   1 -
 .../NotificationsQueueManager.java              | 276 ++++++++-----------
 .../services/notifications/QueueListener.java   | 147 ++++++++++
 .../services/notifications/QueueMessage.java    |  34 ++-
 .../services/notifications/TaskManager.java     |  91 +++---
 .../resources/usergrid-services-context.xml     |   2 +
 7 files changed, 347 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
index 1ffc351..5b85e7e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
@@ -22,12 +22,15 @@ import org.apache.usergrid.persistence.TypedEntity;
 import javax.xml.bind.annotation.XmlRootElement;
 import java.util.UUID;
 import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.apache.usergrid.mq.Message;
 
 @XmlRootElement
 public class Receipt extends TypedEntity {
 
     public static final String ENTITY_TYPE = "receipt";
-    public static final String NOTIFICATION_CONNECTION = "notification";
+
+    @EntityProperty
+    private Message message = null;
 
     /** device id **/
     @EntityProperty
@@ -63,11 +66,12 @@ public class Receipt extends TypedEntity {
     public Receipt() {
     }
 
-    public Receipt(UUID notificationUUID, String notifierId, Object payload,UUID deviceId)
{
+    public Receipt(UUID notificationUUID, String notifierId, Object payload,UUID deviceId,
Message message) {
         this.notificationUUID = notificationUUID;
         this.notifierId = notifierId;
         this.payload = payload;
         this.setDeviceId(deviceId);
+        this.message = message;
     }
 
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@@ -130,4 +134,8 @@ public class Receipt extends TypedEntity {
     public void setDeviceId(UUID deviceId) {
         this.deviceId = deviceId;
     }
+
+    public Message getMessage(){
+        return message;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
index 7712163..d6de93e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationServiceProxy.java
@@ -28,5 +28,4 @@ public interface NotificationServiceProxy {
 
     public void finishedBatch(Notification notification, long successes, long failures) throws
Exception;
 
-    void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 23e3e62..fab78d8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -20,10 +20,11 @@ import com.clearspring.analytics.stream.frequency.CountMinSketch;
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
-import com.google.common.cache.*;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.mq.QueueManager;
 import org.apache.usergrid.mq.QueueQuery;
 import org.apache.usergrid.mq.QueueResults;
@@ -43,19 +44,16 @@ import rx.schedulers.Schedulers;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * manages queues for notifications
  */
 public class NotificationsQueueManager implements NotificationServiceProxy {
     private static final String NOTIFICATION_CONCURRENT_BATCHES = "notification.concurrent.batches";
-    private static final long CONSECUTIVE_EMPTY_QUEUES = 10;
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationsQueueManager.class);
 
     //this is for tests, will not mark initial post complete, set to false for tests
-    public static boolean IS_TEST = false;
     private final Meter sendMeter;
     private final Histogram queueSize;
     private final Counter outstandingQueue;
@@ -72,7 +70,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
     private final JobScheduler jobScheduler;
     private final Properties props;
     private final InflectionUtils utils;
-    private AtomicLong consecutiveEmptyQueues = new AtomicLong();
     private Long pushAutoExpireAfter = null;
 
     public final Map<String, ProviderAdapter> providerAdapters =   new HashMap<String,
ProviderAdapter>(3);
@@ -162,7 +159,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
                             try {
                                 List<EntityRef> devicesRef = getDevices(entity); //
resolve group
                                 String queueName = getJobQueueName(notification);
-                                boolean maySchedule = false;
                                 for (EntityRef deviceRef : devicesRef) {
                                     long hash = MurmurHash.hash(deviceRef.getUuid());
                                     if(sketch.estimateCount(hash)>0){
@@ -171,8 +167,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
                                     }else {
                                         sketch.add(hash,1);
                                     }
-                                    maySchedule |= deviceCount.incrementAndGet() % BATCH_SIZE
== 0;
-                                    QueueMessage message = new QueueMessage(deviceRef);
+                                    QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
                                     qm.postToQueue(queueName, message);
                                     if(notification.getQueued() == null){
                                         // update queued time
@@ -181,11 +176,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
                                     }
                                 }
 
-                                //start working if you are on a large batch,
-                                if (maySchedule && numCurrentBatchesConfig >=
batchCount.incrementAndGet()) {
-                                    processBatchAndReschedule(notification, null);
-                                }
-
                                 if(devicesRef.size() <= 0){
                                     errorMessages.add("Could not find devices for entity:
"+entity.getUuid());
                                 }
@@ -208,19 +198,13 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
         properties.put("state", notification.getState());
 
         //do i have devices, and have i already started batching.
-        if (deviceCount.get()>0 ) {
-            if(batchCount.get() <= 0) {
-                processBatchAndReschedule(notification, jobExecution);
-            }
-        } else {
+        if (deviceCount.get() <= 0) {
             //if i'm in a test value will be false, do not mark finished for test orchestration,
not ideal need real tests
-            if(!IS_TEST) {
-                finishedBatch(notification, 0, 0);
-                errorMessages.add("No devices for notification " + notification.getUuid());
-            }
+            finishedBatch(notification, 0, 0);
+            errorMessages.add("No devices for notification " + notification.getUuid());
         }
 
-        if(!IS_TEST && errorMessages.size()>0){
+        if(errorMessages.size()>0){
             properties.put("deliveryErrors", errorMessages.toArray());
             if(notification.getErrorMessage()==null){
                 notification.setErrorMessage("There was a problem delivering all of your
notifications. See deliveryErrors in properties");
@@ -265,152 +249,141 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
         LOG.debug("got batch of {} devices for notification {}", results.size(), notification.getUuid());
         return results;
     }
-    /**
-     * executes a Notification batch and schedules the next one - called by
-     * NotificationBatchJob
-     */
-    public void processBatchAndReschedule(Notification notification, JobExecution jobExecution)
throws Exception {
-        if (!isOkToSend(notification,jobExecution)) {
-            LOG.info("notification " + notification.getUuid() + " canceled");
-            if (jobExecution != null) {
-                jobExecution.killed();
-            }
-            return;
-        }
-
-        LOG.debug("processing batch {}", notification.getUuid());
-
-        QueueResults queueResults = getDeliveryBatch(notification, jobExecution == null ?
BATCH_SIZE/2 : BATCH_SIZE ); //if first run then throttle the batch down by factor of 2 if
its a job then try to grab alot of notifications and run them all
-
-        long reschedule_delay = jobScheduler.SCHEDULER_GRACE_PERIOD;
-        final TaskManager taskManager =  new TaskManager( em, this, qm, notification, queueResults);
-        if (queueResults.size() > 0) {
-            consecutiveEmptyQueues.set(0);
-            sendBatchToProviders(taskManager, notification, queueResults.getMessages());
-        }else{
-            consecutiveEmptyQueues.incrementAndGet();
-        }
-        if (qm.hasPendingReads(getJobQueueName(notification), null) && consecutiveEmptyQueues.get()
<= CONSECUTIVE_EMPTY_QUEUES) {
-            if(jobExecution==null) {
-                jobScheduler.scheduleBatchJob(notification, reschedule_delay);
-            }else{
-                processBatchAndReschedule(notification, jobExecution);
-            }
-        } else {
-            consecutiveEmptyQueues.set(0);
-            finishedBatch(notification, 0, 0,true);
-        }
-
-        LOG.debug("finished processing batch");
-    }
 
     /**
      * send batches of notifications to provider
-     * @param taskManager
-     * @param notification
-     * @param queueResults
+     * @param messages
      * @throws Exception
      */
-    private void sendBatchToProviders(final TaskManager taskManager, final Notification notification,
List<Message> queueResults) throws Exception {
-
-        LOG.info("sending batch of {} devices for Notification: {}", queueResults.size(),
notification.getUuid());
+    public Observable sendBatchToProviders( final List<QueueMessage> messages,final
String queuePath) throws Exception {
         final Map<Object, Notifier> notifierMap = getNotifierMap();
-        queueSize.update(queueResults.size());
-        final Map<String, Object> payloads = notification.getPayloads();
-        final Map<String, Object> translatedPayloads = translatePayloads(payloads,
notifierMap);
-        try {
-            rx.Observable
-                    .from(queueResults)
-                    .parallel(new Func1<rx.Observable<Message>, rx.Observable<Message>>()
{
+        final NotificationServiceProxy proxy = this;
+        final LoadingCache<UUID,HashMap<String,Object>> notificationCache = CacheBuilder
+                .newBuilder()
+                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .build(new CacheLoader<UUID, HashMap<String, Object>>() {
+                    @Override
+                    public HashMap<String, Object> load(UUID key) throws Exception
{
+                        HashMap<String,Object> map = new HashMap<String, Object>();
+                        final Notification notification = em.get( key, Notification.class
);
+                        final TaskManager taskManager =  new TaskManager( em, proxy, qm,
notification,queuePath);
+                        map.put("taskManager",taskManager);
+                        map.put("notification",notification);
+                        final Map<String, Object> payloads = notification.getPayloads();
+                        final Map<String, Object> translatedPayloads = translatePayloads(payloads,
notifierMap);
+                        map.put("payloads",payloads);
+                        map.put("translatedPayloads",translatedPayloads);
+                        LOG.info("sending batch of {} devices for Notification: {}", messages.size(),
notification.getUuid());
+                        return map;
+                    }
+                });
+
+        queueSize.update(messages.size());
+
+        return rx.Observable
+                    .from(messages)
+                    .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<QueueMessage>>()
{
                         @Override
-                        public rx.Observable<Message> call(rx.Observable<Message>
messageObservable) {
-                            return messageObservable.map(new Func1<Message, Message>()
{
+                        public rx.Observable<QueueMessage> call(rx.Observable<QueueMessage>
messageObservable) {
+                            return messageObservable.map(new Func1<QueueMessage, QueueMessage>()
{
                                 @Override
-                                public Message call(Message message) {
-                                    UUID deviceUUID =QueueMessage.generate(message).getUuid();
-                                    boolean foundNotifier = false;
-                                    for (Map.Entry<String, Object> entry : payloads.entrySet())
{
-                                        try {
-                                            String payloadKey = entry.getKey();
-                                            Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
-                                            EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE,
deviceUUID);
-
-                                            String providerId;
+                                public QueueMessage call(QueueMessage message) {
+                                    try {
+                                        UUID deviceUUID = message.getUuid();
+                                        HashMap<String, Object> notificationMap = notificationCache.get(message.getNotificationId());
+                                        Map<String, Object> payloads = (Map<String,
Object>) notificationMap.get("payloads");
+                                        Map<String, Object> translatedPayloads = (Map<String,
Object>) notificationMap.get("translatedPayloads");
+                                        TaskManager taskManager = (TaskManager) notificationMap.get("taskManager");
+                                        Notification notification = (Notification) notificationMap.get("notification");
+                                        if(!isOkToSend(notification)){
+                                            return message;
+                                        }
+                                        boolean foundNotifier = false;
+                                        for (Map.Entry<String, Object> entry : payloads.entrySet())
{
                                             try {
-                                                providerId = getProviderId(deviceRef, notifier);
-                                                if (providerId == null) {
-                                                    LOG.debug("Provider not found.{} {}",
deviceRef,notifier.getName());
-                                                    continue;
-                                                }
-                                            } catch (Exception providerException) {
-                                                LOG.error("Exception getting provider.",
providerException);
-                                                continue;
-                                            }
-                                            Object payload = translatedPayloads.get(payloadKey);
+                                                String payloadKey = entry.getKey();
+                                                Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
+                                                EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE,
deviceUUID);
 
-                                            Receipt receipt = new Receipt(notification.getUuid(),
providerId, payload,deviceUUID);
-                                            TaskTracker tracker = new TaskTracker(notifier,
taskManager, receipt, deviceUUID);
-                                            if (payload == null) {
-                                                LOG.debug("selected device {} for notification
{} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                                                String providerId;
                                                 try {
-                                                    tracker.failed(0, "failed to match payload
to " + payloadKey + " notifier");
-                                                } catch (Exception e) {
-                                                    LOG.debug("failed to mark device failed"
+ e);
+                                                    providerId = getProviderId(deviceRef,
notifier);
+                                                    if (providerId == null) {
+                                                        LOG.debug("Provider not found.{}
{}", deviceRef, notifier.getName());
+                                                        continue;
+                                                    }
+                                                } catch (Exception providerException) {
+                                                    LOG.error("Exception getting provider.",
providerException);
+                                                    continue;
+                                                }
+                                                Object payload = translatedPayloads.get(payloadKey);
+
+                                                Receipt receipt = new Receipt(notification.getUuid(),
providerId, payload, deviceUUID);
+                                                TaskTracker tracker = new TaskTracker(notifier,
taskManager, receipt, deviceUUID);
+                                                if (payload == null) {
+                                                    LOG.debug("selected device {} for notification
{} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                                                    try {
+                                                        tracker.failed(0, "failed to match
payload to " + payloadKey + " notifier");
+                                                    } catch (Exception e) {
+                                                        LOG.debug("failed to mark device
failed" + e);
+                                                    }
+                                                    continue;
                                                 }
-                                                continue;
-                                            }
-
-                                            if (LOG.isDebugEnabled()) {
-                                                StringBuilder sb = new StringBuilder();
-                                                sb.append("sending notification ").append(notification.getUuid());
-                                                sb.append(" to device ").append(deviceUUID);
-                                                LOG.debug(sb.toString());
-                                            }
 
-                                            try {
-                                                ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
-                                                providerAdapter.sendNotification(providerId,
notifier, payload, notification, tracker);
+                                                if (LOG.isDebugEnabled()) {
+                                                    StringBuilder sb = new StringBuilder();
+                                                    sb.append("sending notification ").append(notification.getUuid());
+                                                    sb.append(" to device ").append(deviceUUID);
+                                                    LOG.debug(sb.toString());
+                                                }
 
-                                            } catch (Exception e) {
                                                 try {
-                                                    tracker.failed(0, e.getMessage());
-                                                } catch (Exception trackerException) {
-                                                    LOG.error("tracker failed", trackerException);
+                                                    ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+                                                    providerAdapter.sendNotification(providerId,
notifier, payload, notification, tracker);
+
+                                                } catch (Exception e) {
+                                                    try {
+                                                        tracker.failed(0, e.getMessage());
+                                                    } catch (Exception trackerException)
{
+                                                        LOG.error("tracker failed", trackerException);
+                                                    }
                                                 }
+                                                foundNotifier = true;
+                                            } finally {
+                                                sendMeter.mark();
                                             }
-                                            foundNotifier = true;
-                                        } finally {
-                                            sendMeter.mark();
                                         }
-                                    }
-                                    if (!foundNotifier) {
-                                        try {
-                                            taskManager.skip(deviceUUID);
-                                        } catch (Exception trackerException) {
-                                            LOG.error("failed on skip", trackerException);
+                                        if (!foundNotifier) {
+                                            try {
+                                                taskManager.skip(deviceUUID);
+                                            } catch (Exception trackerException) {
+                                                LOG.error("failed on skip", trackerException);
+                                            }
                                         }
+                                    } catch (Exception x) {
+
                                     }
                                     return message;
                                 }
                             });
                         }
                     }, Schedulers.io())
-                    .toBlocking()
-                    .lastOrDefault(null);
-
-            //for gcm this will actually send notification
-            for (ProviderAdapter providerAdapter : providerAdapters.values()) {
-                try {
-                    providerAdapter.doneSendingNotifications();
-                } catch (Exception e) {
-                    LOG.error("providerAdapter.doneSendingNotifications: ", e);
-                }
-            }
-
-        } finally {
-            outstandingQueue.dec();
-            LOG.info("finished sending batch for notification {}", notification.getUuid());
-        }
+                    .buffer(1000)
+                    .map(new Func1<List<QueueMessage>, Object>() {
+                        @Override
+                        public Object call(List<QueueMessage> queueMessages) {
+                            //for gcm this will actually send notification
+                            for (ProviderAdapter providerAdapter : providerAdapters.values())
{
+                                try {
+                                    providerAdapter.doneSendingNotifications();
+                                } catch (Exception e) {
+                                    LOG.error("providerAdapter.doneSendingNotifications:
", e);
+                                }
+                            }
+                            notificationCache.cleanUp();
+                            return null;
+                        }
+                    });
 
     }
 
@@ -433,6 +406,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
         properties.put("statistics", notification.getStatistics());
         properties.put("modified", notification.getModified());
 
+        //none of this is known and should you ever do this
         if (isNotificationDeliveryComplete(notification) || overrideComplete) {
             notification.setFinished(notification.getModified());
             properties.put("finished", notification.getModified());
@@ -584,8 +558,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
         }
     }
 
-    private boolean isOkToSend(Notification notification,
-                               JobExecution jobExecution) {
+    private boolean isOkToSend(Notification notification) {
         String autoExpireAfterString = props.getProperty(PUSH_AUTO_EXPIRE_AFTER_PROPNAME);
 
         if (autoExpireAfterString != null) {
@@ -612,15 +585,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy
{
                     notification.getUuid());
             return false;
         }
-        if (jobExecution != null
-                && notification.getDeliver() != null
-                && !notification.getDeliver().equals(jobExecution.getJobData().getProperty("deliver"))
-                ) {
-            LOG.info("notification {} was rescheduled. not sending.",
-                    notification.getUuid());
-            return false;
-        }
-
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
new file mode 100644
index 0000000..383dfb0
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueManager;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import rx.*;
+import rx.Observable;
+
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Component( "notificationsQueueListener" )
+public class QueueListener  {
+
+    public static final String queuePath = "notifications/queuelistener";
+    private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
+
+    @Autowired
+    private MetricsFactory metricsService;
+
+    @Autowired
+    private ServiceManagerFactory smf;
+
+    @Autowired
+    private EntityManagerFactory emf;
+    private QueueManager queueManager;
+    private ServiceManager svcMgr;
+    private Properties properties;
+
+    public QueueListener() {
+    }
+
+    @PostConstruct
+    void init() {
+        svcMgr =  smf.getServiceManager(smf.getManagementAppId());
+        queueManager = svcMgr.getQueueManager();
+        properties = new Properties();
+        try {
+            properties.load(Thread.currentThread()
+                    .getContextClassLoader()
+                    .getResourceAsStream("usergrid.properties"));
+        } catch (Exception e) {
+            LOG.error("Could not load props","");
+        }
+
+        run();
+    }
+
+    public void run(){
+
+        AtomicInteger consecutiveExceptions = new AtomicInteger();
+        // run until there are no more active jobs
+        while ( true ) {
+            try {
+                QueueResults results = getDeliveryBatch(1000);
+                List<Message> messages = results.getMessages();
+                HashMap<UUID,List<QueueMessage>> queueMap = new HashMap<>();
+                for(Message message : messages){
+                    QueueMessage queueMessage = QueueMessage.generate(message);
+                    if(queueMap.containsKey(queueMessage.getNotificationId())){
+                        List<QueueMessage> queueMessages = queueMap.get(queueMessage);
+                        queueMessages.add(queueMessage);
+                    }else{
+                        List<QueueMessage> queueMessages = new ArrayList<>();
+                        queueMessages.add(queueMessage);
+                        queueMap.put(queueMessage.getApplicationId(),queueMessages);
+                    }
+                }
+
+                List<Observable> observables = new ArrayList<>();
+                for(UUID applicationId : queueMap.keySet()){
+                    EntityManager entityManager = emf.getEntityManager(applicationId);
+                    ServiceManager serviceManager = smf.getServiceManager(applicationId);
+
+                    NotificationsQueueManager manager = new NotificationsQueueManager(
+                            new JobScheduler(serviceManager,entityManager),
+                            entityManager,
+                            properties,
+                            queueManager,
+                            metricsService
+                    );
+
+                   observables.add(manager.sendBatchToProviders(queueMap.get(applicationId),
results.getPath()));
+                }
+                rx.Observable first = null;
+                for(rx.Observable o : observables){
+                    if (first == null) {
+                        first = o;
+                    } else {
+                        first = Observable.merge(first, o);
+                    }
+                }
+                first.toBlocking().lastOrDefault(null);
+                consecutiveExceptions.set(0);
+            }catch (Exception ex){
+                LOG.error("failed to dequeue",ex);
+                if(consecutiveExceptions.getAndIncrement() > 10){
+                    LOG.error("killing message listener; too many failures");
+                    break;
+                }
+            }
+        }
+    }
+
+    public void queueMessage(QueueMessage message){
+        queueManager.postToQueue(queuePath, message);
+
+    }
+
+    private QueueResults getDeliveryBatch(int batchSize) throws Exception {
+
+        QueueQuery qq = new QueueQuery();
+        qq.setLimit(batchSize);
+        qq.setTimeout(TaskManager.MESSAGE_TRANSACTION_TIMEOUT);
+        QueueResults results = queueManager.getFromQueue(queuePath, qq);
+        LOG.debug("got batch of {} devices", results.size());
+        return results;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
index 065d6a3..e6301ed 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
@@ -24,20 +24,42 @@ import java.util.UUID;
 public class QueueMessage extends Message {
 
     static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
+    static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
+
 
     public QueueMessage() {
     }
 
-    public QueueMessage(UUID deviceId){
-        this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID,deviceId);
+    public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId){
+        setApplicationId(applicationId);
+        setDeviceId(deviceId);
     }
 
-    public QueueMessage(EntityRef deviceRef){
-        this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID,deviceRef.getUuid());
-    }
+
 
     public static QueueMessage generate(Message message){
-        return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+        return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID)
message.getObjectProperty("notificationId"),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+    }
+
+    public UUID getApplicationId() {
+        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
+    }
+    public void setApplicationId(UUID applicationId){
+        this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID,applicationId);
     }
 
+    public UUID getDeviceId() {
+        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+    }
+    public void setDeviceId(UUID deviceId){
+        this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID,deviceId);
+    }
+
+    public UUID getNotificationId(){
+        return (UUID) this.getObjectProperty("notificationId");
+    }
+
+    public void setNotificationId(UUID notificationId){
+        this.setProperty("notificationdId",notificationId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 85c4a16..caef145 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -61,7 +61,6 @@ public class TaskManager {
     private final String path;
 
     private Notification notification;
-    private  ConcurrentHashMap<UUID,Message> remaining;
     private AtomicLong successes = new AtomicLong();
     private AtomicLong failures = new AtomicLong();
     private AtomicLong skips = new AtomicLong();
@@ -69,17 +68,12 @@ public class TaskManager {
     private EntityManager em;
     private NotificationServiceProxy ns;
 
-    public TaskManager(  EntityManager em, NotificationServiceProxy ns,QueueManager qm, Notification
notification, QueueResults queueResults) {
+    public TaskManager(  EntityManager em, NotificationServiceProxy ns,QueueManager qm, Notification
notification,String queuePath) {
         this.em = em;
         this.qm = qm;
         this.ns = ns;
-        this.path = queueResults.getPath();
+        this.path =queuePath;
         this.notification = notification;
-        this.remaining = new ConcurrentHashMap<UUID,Message>();
-        for (Message m : queueResults.getMessages()) {
-            remaining
-                    .put((UUID) m.getObjectProperty("deviceUUID"), m);
-        }
     }
 
     public void skip(UUID deviceUUID) throws Exception {
@@ -92,56 +86,51 @@ public class TaskManager {
     }
 
     public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID,
-            String newProviderId) throws Exception {
-
-            LOG.debug("REMOVED {}", deviceUUID);
-            try {
-                EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE,
-                        deviceUUID);
-
-                if (receipt != null) {
-                    LOG.debug("notification {} sent to device {}. saving receipt.",
-                            notification.getUuid(), deviceUUID);
-                    successes.incrementAndGet();
-                    receipt.setSent(System.currentTimeMillis());
-                    this.saveReceipt(notification, deviceRef, receipt);
-                    LOG.debug("notification {} receipt saved for device {}",
-                            notification.getUuid(), deviceUUID);
-                }
-
-                if (remaining.containsKey(deviceUUID)) {
-                    LOG.debug("notification {} removing device {} from remaining", notification.getUuid(),
deviceUUID);
-                    qm.commitTransaction(path, remaining.get(deviceUUID).getTransaction(),
null);
-                }
-
-                if (newProviderId != null) {
-                    LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(),
deviceUUID);
-                    replaceProviderId(deviceRef, notifier, newProviderId);
-                }
-
-                LOG.debug("notification {} completed device {}", notification.getUuid(),
deviceUUID);
-
-            } finally {
-                LOG.debug("COUNT is: {}", successes.get());
-                remaining.remove(deviceUUID);
-                // note: stats are transient for the duration of the batch
-                if (remaining.size() == 0) {
-                    long successesCopy = successes.get();
-                    long failuresCopy = failures.get();
-                    if (successesCopy > 0 || failuresCopy > 0 || skips.get()>0)
{
-                        ns.finishedBatch(notification, successesCopy, failuresCopy);
-                    }
-                }
+                          String newProviderId) throws Exception {
+
+        LOG.debug("REMOVED {}", deviceUUID);
+        try {
+            EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE,
+                    deviceUUID);
+
+            if (receipt != null) {
+                LOG.debug("notification {} sent to device {}. saving receipt.",
+                        notification.getUuid(), deviceUUID);
+                successes.incrementAndGet();
+                receipt.setSent(System.currentTimeMillis());
+                this.saveReceipt(notification, deviceRef, receipt);
+                LOG.debug("notification {} receipt saved for device {}",
+                        notification.getUuid(), deviceUUID);
+            }
+
+            LOG.debug("notification {} removing device {} from remaining", notification.getUuid(),
deviceUUID);
+            qm.commitTransaction(path, receipt.getMessage().getTransaction(), null);
 
+            if (newProviderId != null) {
+                LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(),
deviceUUID);
+                replaceProviderId(deviceRef, notifier, newProviderId);
             }
+
+            LOG.debug("notification {} completed device {}", notification.getUuid(), deviceUUID);
+
+        } finally {
+            LOG.debug("COUNT is: {}", successes.get());
+            // note: stats are transient for the duration of the batch
+//                if (remaining.size() == 0) {
+//                    long successesCopy = successes.get();
+//                    long failuresCopy = failures.get();
+//                    if (successesCopy > 0 || failuresCopy > 0 || skips.get()>0)
{
+//                        ns.finishedBatch(notification, successesCopy, failuresCopy);
+//                    }
+//                }
+
+        }
     }
 
     public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code,
-            String message) throws Exception {
+                       String message) throws Exception {
 
         try {
-
-
             if (LOG.isDebugEnabled()) {
                 StringBuilder sb = new StringBuilder();
                 sb.append("notification ").append(notification.getUuid());

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/610bdf35/stack/services/src/main/resources/usergrid-services-context.xml
----------------------------------------------------------------------
diff --git a/stack/services/src/main/resources/usergrid-services-context.xml b/stack/services/src/main/resources/usergrid-services-context.xml
index 39748e1..a3e662a 100644
--- a/stack/services/src/main/resources/usergrid-services-context.xml
+++ b/stack/services/src/main/resources/usergrid-services-context.xml
@@ -89,4 +89,6 @@
 
   <bean id="exportJob" class="org.apache.usergrid.management.export.ExportJob" />
 
+  <bean id="notificationsQueueListener" class="org.apache.usergrid.services.notifications.QueueListener"
scope="singleton"/>
+
 </beans>


Mime
View raw message