usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [3/3] git commit: single queue
Date Thu, 04 Sep 2014 16:26:56 GMT
single queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0a13340f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0a13340f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0a13340f

Branch: refs/heads/two-dot-o-single-queue
Commit: 0a13340f4130cab187d059d968560327d170b4a2
Parents: c4b1ae6
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Thu Sep 4 10:26:17 2014 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Thu Sep 4 10:26:17 2014 -0600

----------------------------------------------------------------------
 .../persistence/entities/Notification.java      |   9 +
 .../notifications/ApplicationQueueManager.java  | 558 +++++++++++++++++++
 .../notifications/ApplicationQueueMessage.java  |  93 ++++
 .../notifications/NotificationBatchJob.java     |  13 +-
 .../services/notifications/NotificationJob.java |  58 +-
 .../notifications/NotificationsService.java     |  30 +-
 .../notifications/NotificationsTaskManager.java |  33 ++
 .../services/notifications/QueueListener.java   | 195 +++++++
 .../services/notifications/QueueManager.java    |  33 ++
 .../notifications/SingleQueueTaskManager.java   | 224 ++++++++
 .../services/notifications/TaskManager.java     |   2 +-
 .../services/notifications/TaskTracker.java     |  15 +-
 .../resources/usergrid-services-context.xml     |   3 +
 .../AbstractServiceNotificationIT.java          |  12 +-
 .../apns/NotificationsServiceIT.java            | 231 ++++----
 .../gcm/NotificationsServiceIT.java             |  82 ++-
 16 files changed, 1383 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 7a643ac..d8e7242 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,6 +41,10 @@ public class Notification extends TypedEntity {
 
     public static final String RECEIPTS_COLLECTION = "receipts";
 
+    /** Total count */
+    @EntityProperty
+    protected int expectedCount;
+
     public static enum State {
         CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
     }
@@ -237,4 +241,9 @@ public class Notification extends TypedEntity {
     public void setQueued(Long queued) {
         this.queued = queued;
     }
+
+    public void setExpectedCount(int expectedCount) {  this.expectedCount = expectedCount;  }
+
+    @org.codehaus.jackson.map.annotate.JsonSerialize(include = org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion.NON_NULL)
+    public int getExpectedCount() {  return expectedCount;  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
new file mode 100644
index 0000000..c88702e
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.frequency.CountMinSketch;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.Device;
+import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.apns.APNsAdapter;
+import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Created by ApigeeCorporation on 8/27/14.
+ */
+public class ApplicationQueueManager implements QueueManager {
+    public static String QUEUE_NAME = "notifications/queuelistenerv1";
+    public static int BATCH_SIZE = 1000;
+    // 5 minutes if the unit is milliseconds (TODO confirm); used as the QueueQuery timeout in getDeliveryBatch
+    public static final long MESSAGE_TRANSACTION_TIMEOUT =  5 * 60 * 1000;
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
+
+    //need a mocking framework, this is to substitute for no mocking
+    public static PathQuery<Device> TEST_PATH_QUERY = null;
+
+    // metrics, obtained from the MetricsFactory in the constructor
+    private final Meter sendMeter;
+    private final Histogram queueSize;
+    private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
+    public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
+
+    private final EntityManager em;
+    private final org.apache.usergrid.mq.QueueManager qm;
+    private final JobScheduler jobScheduler;
+
+    public final Map<String, ProviderAdapter> providerAdapters =   new HashMap<String, ProviderAdapter>(3);
+    {
+        providerAdapters.put("apple", APNS_ADAPTER);
+        providerAdapters.put("google", new GCMAdapter());
+        providerAdapters.put("noop", TEST_ADAPTER);
+    };
+    // these 2 can be static, but GCM can't. future: would be nice to get gcm
+    // static as well...
+    public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
+    public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
+
+    //cache of notifier lookup maps, one per EntityManager (keys: lower-cased name, uuid, uuid string); entries expire 90 seconds after write
+    private static LoadingCache<EntityManager, HashMap<Object,Notifier>> notifierCacheMap = CacheBuilder
+            .newBuilder()
+            .expireAfterWrite(90, TimeUnit.SECONDS)
+            .build(new CacheLoader<EntityManager, HashMap<Object, Notifier>>() {
+                @Override
+                public HashMap<Object, Notifier> load(EntityManager em) {
+                    HashMap<Object, Notifier> notifierHashMap = new HashMap<Object, Notifier>();
+                    Query query = new Query();
+                    query.setCollection("notifiers");
+                    query.setLimit(100);
+                    PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(new SimpleEntityRef( em.getApplicationRef() ), query);
+                    Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
+                    while (notifierIterator.hasNext()) {
+                        Notifier notifier = notifierIterator.next();
+                        String name = notifier.getName() != null ? notifier.getName() : "" ;
+                        UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
+                        notifierHashMap.put(name.toLowerCase(), notifier);
+                        notifierHashMap.put(uuid, notifier);
+                        notifierHashMap.put(uuid.toString(), notifier);
+                    }
+                    return notifierHashMap;
+                }
+            });;
+
+    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory){
+        this.em = entityManager;
+        this.qm = queueManager;
+        this.jobScheduler = jobScheduler;
+        this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
+        this.queueSize = metricsFactory.getHistogram(NotificationsService.class, "queue_size");
+    }
+
+    public static QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
+        QueueQuery qq = new QueueQuery();
+        qq.setLimit(BATCH_SIZE);
+        qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
+        QueueResults results = queueManager.getFromQueue(QUEUE_NAME, qq);
+        LOG.debug("got batch of {} devices", results.size());
+        return results;
+    }
+
+    public boolean scheduleQueueJob(Notification notification) throws Exception{
+        return jobScheduler.scheduleQueueJob(notification);
+    }
+
+    public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+        if (notification.getCanceled() == Boolean.TRUE) {
+            LOG.info("notification " + notification.getUuid() + " canceled");
+            if (jobExecution != null) {
+                jobExecution.killed();
+            }
+            return;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LOG.info("notification {} start queuing", notification.getUuid());
+        final PathQuery<Device> pathQuery = TEST_PATH_QUERY == null ? notification.getPathQuery() : TEST_PATH_QUERY; //devices query
+        final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
+        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+        final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
+        final Map<String,Object> payloads = notification.getPayloads();
+
+        //get devices in querystring, and make sure you have access
+        if (pathQuery != null) {
+            final Iterator<Device> iterator = pathQuery.iterator(em);
+            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+                jobScheduler.scheduleQueueJob(notification, true);
+                return;
+            }
+            final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
+
+            rx.Observable.create(new IteratorObservable<Entity>(iterator)).parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
+                @Override
+                public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
+                    return deviceObservable.map(new Func1<Entity, Entity>() {
+                        @Override
+                        public Entity call(Entity entity) {
+                            try {
+                                List<EntityRef> devicesRef = getDevices(entity); // resolve group
+
+                                for (EntityRef deviceRef : devicesRef) {
+                                    long hash = MurmurHash.hash(deviceRef.getUuid());
+                                    if (sketch.estimateCount(hash) > 0) { //look for duplicates
+                                        LOG.debug("Maybe Found duplicate device: {}", deviceRef.getUuid());
+                                        continue;
+                                    } else {
+                                        sketch.add(hash, 1);
+                                    }
+                                    String notifierId = null;
+                                    String notifierKey = null;
+
+                                    //find the device notifier info, match it to the payload
+                                    for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                                        Notifier notifier = notifierMap.get(entry.getKey().toLowerCase());
+                                        String providerId = getProviderId(deviceRef, notifier);
+                                        if (providerId != null) {
+                                            notifierId = providerId;
+                                            notifierKey = entry.getKey().toLowerCase();
+                                            break;
+                                        }
+                                    }
+
+                                    if (notifierId == null) {
+                                        LOG.debug("Notifier did not match for device {} ", deviceRef);
+                                        continue;
+                                    }
+
+                                    ApplicationQueueMessage message = new ApplicationQueueMessage(em.getApplication().getUuid(), notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                                    qm.postToQueue(QUEUE_NAME, message);
+                                    if (notification.getQueued() == null) {
+                                        // update queued time
+                                        notification.setQueued(System.currentTimeMillis());
+                                        em.update(notification);
+                                    }
+                                    deviceCount.incrementAndGet();
+                                }
+
+                            } catch (Exception deviceLoopException) {
+                                LOG.error("Failed to add devices", deviceLoopException);
+                                errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+                            }
+                            return entity;
+                        }
+                    });
+                }
+            }, Schedulers.io())
+                    .doOnError(new Action1<Throwable>() {
+                        @Override
+                        public void call(Throwable throwable) {
+                            LOG.error("Failed while writing", throwable);
+                        }
+                    })
+                    .toBlocking()
+                    .lastOrDefault(null);
+        }
+
+        // update queued time
+        Map<String, Object> properties = new HashMap<String, Object>(2);
+        properties.put("queued", notification.getQueued());
+        properties.put("state", notification.getState());
+
+
+        if(errorMessages.size()>0){
+            if (notification.getErrorMessage() == null) {
+                notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
+            }
+        }
+
+        notification.setExpectedCount(deviceCount.get());
+        notification.addProperties(properties);
+        em.update(notification);
+
+        //do i have devices, and have i already started batching.
+        if (deviceCount.get() <= 0) {
+            SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification);
+            //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
+            taskManager.finishedBatch();
+        }
+
+        if (LOG.isInfoEnabled()) {
+            long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
+            StringBuilder sb = new StringBuilder();
+            sb.append("notification ").append(notification.getUuid());
+            sb.append(" done queuing to ").append(deviceCount);
+            sb.append(" devices in ").append(elapsed).append(" ms");
+            LOG.info(sb.toString());
+        }
+
+    }
+
+
+    /** Returns the cached notifier lookup map for this entity manager, or a new empty map if the cache load fails. */
+    public HashMap<Object,Notifier> getNotifierMap(){
+        try{
+            return notifierCacheMap.get(em);
+        }catch (ExecutionException cacheFailure){
+            LOG.error("failed to get from cache",cacheFailure);
+            return new HashMap<Object, Notifier>();
+        }
+    }
+    private void clearNotifierMap(){
+        notifierCacheMap.invalidate(em);
+    }
+
+
+    /**
+     * send batches of notifications to provider
+     * @param messages
+     * @throws Exception
+     */
+    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages) {
+        final Map<Object, Notifier> notifierMap = getNotifierMap();
+        final QueueManager proxy = this;
+        final ConcurrentHashMap<UUID,SingleQueueTaskManager> taskMap = new ConcurrentHashMap<UUID, SingleQueueTaskManager>(messages.size());
+        final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
+
+        return rx.Observable
+                .from(messages)
+                .parallel(new Func1<rx.Observable<ApplicationQueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                    @Override
+                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<ApplicationQueueMessage> messageObservable) {
+                        return messageObservable.map(new Func1<ApplicationQueueMessage, ApplicationQueueMessage>() {
+                            @Override
+                            public ApplicationQueueMessage call(ApplicationQueueMessage message) {
+                                try {
+                                    UUID deviceUUID = message.getUuid();
+                                    Notification notification = notificationMap.get(message.getNotificationId());
+                                    if (notification == null) {
+                                        notification = em.get(message.getNotificationId(), Notification.class);
+                                        notificationMap.put(message.getNotificationId(), notification);
+                                    }
+                                    SingleQueueTaskManager taskManager;
+                                    synchronized (taskMap) {
+                                        taskManager = taskMap.get(message.getNotificationId());
+                                        if (taskManager == null) {
+                                            taskManager = new SingleQueueTaskManager(em, qm, proxy, notification);
+                                            taskMap.put(message.getNotificationId(), taskManager);
+                                        }
+                                    }
+
+                                    final Map<String, Object> payloads = notification.getPayloads();
+                                    final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
+                                    LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
+                                    if(!isOkToSend(notification)){
+                                        return message;
+                                    }
+                                    taskManager.addMessage(deviceUUID,message);
+                                    try {
+                                        String notifierName = message.getNotifierKey().toLowerCase();
+                                        Notifier notifier = notifierMap.get(notifierName.toLowerCase());
+                                        Object payload = translatedPayloads.get(notifierName);
+                                        Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
+                                        TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+
+                                        if (payload == null) {
+                                            LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                                            tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+
+                                        }else{
+                                            try {
+                                                ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+                                                providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
+                                            } catch (Exception e) {
+                                                tracker.failed(0, e.getMessage());
+                                            }
+                                        }
+
+                                    } finally {
+                                        sendMeter.mark();
+                                    }
+
+                                } catch (Exception e) {
+                                    LOG.error("Failure unknown",e);
+                                }
+                                return message;
+                            }
+                        });
+                    }
+                }, Schedulers.io())
+                .buffer(BATCH_SIZE)
+                .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
+                    @Override
+                    public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
+                        //for gcm this will actually send notification
+                        for (ProviderAdapter providerAdapter : providerAdapters.values()) {
+                            try {
+                                providerAdapter.doneSendingNotifications();
+                            } catch (Exception e) {
+                                LOG.error("providerAdapter.doneSendingNotifications: ", e);
+                            }
+                        }
+                        //TODO: check if a notification is done and mark it
+                        HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
+                        for (ApplicationQueueMessage message : queueMessages) {
+                            if (notifications.get(message.getNotificationId()) == null) {
+                                try {
+                                    SingleQueueTaskManager taskManager = taskMap.get(message.getNotificationId());
+                                    notifications.put(message.getNotificationId(), message);
+                                    taskManager.finishedBatch();
+                                } catch (Exception e) {
+                                    LOG.error("Failed to finish batch", e);
+                                }
+                            }
+
+                        }
+                        return notifications;
+                    }
+                })
+                .doOnError(new Action1<Throwable>() {
+                    @Override
+                    public void call(Throwable throwable) {
+                        LOG.error("Failed while sending",throwable);
+                    }
+                });
+
+
+    }
+
+
+    /**
+     * Call the adapter with the notifier
+     */
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, Notifier> notifierMap) throws Exception {
+        Map<String, Object> translatedPayloads = new HashMap<String, Object>(
+                payloads.size());
+        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+            String payloadKey = entry.getKey().toLowerCase();
+            Object payloadValue = entry.getValue();
+            Notifier notifier = notifierMap.get(payloadKey);
+            if(notifier==null){
+                clearNotifierMap();
+                notifierMap = getNotifierMap();
+                notifier = notifierMap.get(payloadKey);
+            }
+            if (notifier != null) {
+                ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+                if (providerAdapter != null) {
+                    Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
+                    if (translatedPayload != null) {
+                        translatedPayloads.put(payloadKey, translatedPayload);
+                    }
+                }
+            }
+        }
+        return translatedPayloads;
+    }
+
+
+    /** Adapts a plain Iterator to an rx OnSubscribe: each element is pushed to the subscriber in iteration order. */
+    private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
+        private final Iterator<? extends T> input;
+        // bounded wildcard instead of a raw Iterator: callers may pass an iterator over any subtype of T
+        private IteratorObservable( final Iterator<? extends T> input ) {this.input = input;}
+
+        /**
+         * Emits elements until the iterator is exhausted or the subscriber unsubscribes, then signals onCompleted.
+         * Any Throwable raised while iterating or emitting is logged and forwarded through onError.
+         */
+        @Override
+        public void call( final Subscriber<? super T> subscriber ) {
+            try {
+                while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
+                    //send our input to the next
+                    subscriber.onNext( input.next() );
+                }
+
+                //tell the subscriber we don't have any more data
+                subscriber.onCompleted();
+            }
+            catch ( Throwable t ) {
+                LOG.error("failed on subscriber",t);
+                subscriber.onError( t );
+            }
+        }
+    }
+
+    /**
+     * Submits one inactive-device check per notifier to INACTIVE_DEVICE_CHECK_POOL; a failing check is only logged.
+     */
+    public void asyncCheckForInactiveDevices(Set<Notifier> notifiers)  throws Exception {
+        for (final Notifier notifier : notifiers) {
+            INACTIVE_DEVICE_CHECK_POOL.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        checkForInactiveDevices(notifier);
+                    } catch (Exception e) {
+                        LOG.error("checkForInactiveDevices", e); // not essential, so don't fail, but log
+                    }
+                }
+            });
+        }
+    }
+
+    /** gets the list of inactive devices from the Provider and updates them */
+    private void checkForInactiveDevices(Notifier notifier) throws Exception {
+        ProviderAdapter providerAdapter = providerAdapters.get(notifier
+                .getProvider());
+        if (providerAdapter != null) {
+            LOG.debug("checking notifier {} for inactive devices", notifier);
+            Map<String, Date> inactiveDeviceMap = providerAdapter
+                    .getInactiveDevices(notifier, em);
+
+            if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
+                LOG.debug("processing {} inactive devices",
+                        inactiveDeviceMap.size());
+                Map<String, Object> clearPushtokenMap = new HashMap<String, Object>(
+                        2);
+                clearPushtokenMap.put(notifier.getName() + NOTIFIER_ID_POSTFIX,
+                        "");
+                clearPushtokenMap.put(notifier.getUuid() + NOTIFIER_ID_POSTFIX,
+                        "");
+
+                // todo: this could be done in a single query
+                for (Map.Entry<String, Date> entry : inactiveDeviceMap
+                        .entrySet()) {
+                    // name
+                    Query query = new Query();
+                    query.addEqualityFilter(notifier.getName()
+                            + NOTIFIER_ID_POSTFIX, entry.getKey());
+                    Results results = em.searchCollection(em.getApplication(),
+                            "devices", query);
+                    for (Entity e : results.getEntities()) {
+                        em.updateProperties(e, clearPushtokenMap);
+                    }
+                    // uuid
+                    query = new Query();
+                    query.addEqualityFilter(notifier.getUuid()
+                            + NOTIFIER_ID_POSTFIX, entry.getKey());
+                    results = em.searchCollection(em.getApplication(),
+                            "devices", query);
+                    for (Entity e : results.getEntities()) {
+                        em.updateProperties(e, clearPushtokenMap);
+                    }
+                }
+            }
+            LOG.debug("finished checking notifier {} for inactive devices",
+                    notifier);
+        }
+    }
+
+    /**
+     * Returns false (logging the reason) when the notification is already finished, canceled or expired.
+     */
+    private boolean isOkToSend(Notification notification) {
+        if (notification.getFinished() != null) {
+            LOG.info("notification {} already processed. not sending.", notification.getUuid());
+            return false;
+        }
+        if (Boolean.TRUE.equals(notification.getCanceled())) { // equals(), not ==: any true Boolean counts, not only the Boolean.TRUE instance
+            LOG.info("notification {} canceled. not sending.", notification.getUuid());
+            return false;
+        }
+        if (notification.isExpired()) {
+            LOG.info("notification {} expired. not sending.", notification.getUuid());
+            return false;
+        }
+        return true;
+    }
+
+    /** Resolves a ref to device refs: a device maps to itself, a user to its "devices" collection, a group to the
+     *  devices of each user in its "users" collection; any other type yields an empty list. */
+    private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+        List<EntityRef> devices = Collections.emptyList(); // typed factory instead of the raw EMPTY_LIST constant
+        if ("device".equals(ref.getType())) {
+            devices = Collections.singletonList(ref);
+        } else if ("user".equals(ref.getType())) {
+            devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT, Query.Level.REFS, false).getRefs();
+        } else if ("group".equals(ref.getType())) {
+            devices = new ArrayList<EntityRef>();
+            for (EntityRef r : em.getCollection(ref, "users", null, Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
+                devices.addAll(getDevices(r));
+            }
+        }
+        return devices;
+    }
+
+
+    /** Reads the device's provider id for the notifier (by notifier name, then by uuid); null if absent or on error. */
+    private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
+        if (notifier == null) { return null; } // payload key without a configured notifier: nothing to look up
+        try {
+            Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
+            if (value == null) { value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX); }
+            return value != null ? value.toString() : null;
+        } catch (Exception e) {
+            LOG.error("Errer getting provider ID, proceding with rest of batch", e);
+            return null;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
new file mode 100644
index 0000000..f66fbf7
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.mq.Message;
+
+import java.util.UUID;
+
+/**
+ * A {@link Message} with typed accessors for the five properties this package
+ * stores on a queued notification: application UUID, notification id, device
+ * UUID, notifier name and notifier id.
+ */
+public class ApplicationQueueMessage extends Message {
+
+    // Property keys under which the values are stored on the underlying Message.
+    static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
+    static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
+    static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
+    static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
+    static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
+
+    /** Creates a message with none of the five properties set. */
+    public ApplicationQueueMessage() {
+    }
+
+    /**
+     * Creates a message and stores all five properties on it.
+     *
+     * @param applicationId  stored under "applicationUUID"
+     * @param notificationId stored under "notificationId"
+     * @param deviceId       stored under "deviceUUID"
+     * @param notifierKey    stored under "notifierName"
+     * @param notifierId     stored under "notifierId"
+     */
+    public ApplicationQueueMessage(UUID applicationId, UUID notificationId, UUID deviceId,
+                                   String notifierKey, String notifierId) {
+        setApplicationId(applicationId);
+        setDeviceId(deviceId);
+        setNotificationId(notificationId);
+        setNotifierKey(notifierKey);
+        setNotifierId(notifierId);
+    }
+
+    /**
+     * Builds a typed message from a generic one by reading the five properties as
+     * strings and parsing the three ids with {@link UUID#fromString(String)}.
+     *
+     * NOTE(review): only these five properties are carried over; anything else on
+     * the source message is not copied by this method - confirm that callers do not
+     * rely on other state of the original message.
+     *
+     * @param message the generic message to read from
+     * @return a new message holding the parsed values
+     */
+    public static ApplicationQueueMessage generate(Message message) {
+        final UUID applicationId =
+                UUID.fromString(message.getStringProperty(MESSAGE_PROPERTY_APPLICATION_UUID));
+        final UUID notificationId =
+                UUID.fromString(message.getStringProperty(MESSAGE_PROPERTY_NOTIFICATION_ID));
+        final UUID deviceId =
+                UUID.fromString(message.getStringProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+        final String notifierKey = message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+        final String notifierId = message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
+        return new ApplicationQueueMessage(applicationId, notificationId, deviceId, notifierKey, notifierId);
+    }
+
+    /** @return the "applicationUUID" property, cast to UUID */
+    public UUID getApplicationId() {
+        return (UUID) getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
+    }
+
+    public void setApplicationId(UUID applicationId) {
+        setProperty(MESSAGE_PROPERTY_APPLICATION_UUID, applicationId);
+    }
+
+    /** @return the "deviceUUID" property, cast to UUID */
+    public UUID getDeviceId() {
+        return (UUID) getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+    }
+
+    public void setDeviceId(UUID deviceId) {
+        setProperty(MESSAGE_PROPERTY_DEVICE_UUID, deviceId);
+    }
+
+    /** @return the "notificationId" property, cast to UUID */
+    public UUID getNotificationId() {
+        return (UUID) getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
+    }
+
+    public void setNotificationId(UUID notificationId) {
+        setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID, notificationId);
+    }
+
+    /** @return the "notifierId" property as a string */
+    public String getNotifierId() {
+        return getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
+    }
+
+    public void setNotifierId(String notifierId) {
+        setProperty(MESSAGE_PROPERTY_NOTIFIER_ID, notifierId);
+    }
+
+    /** @return the "notifierName" property as a string */
+    public String getNotifierKey() {
+        return getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+    }
+
+    public void setNotifierKey(String name) {
+        setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME, name);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
index b9ea73e..5e7e568 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationBatchJob.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.services.notifications;
 
 
+import java.util.Properties;
 import java.util.UUID;
 
 import javax.annotation.PostConstruct;
@@ -59,6 +60,9 @@ public class NotificationBatchJob implements Job {
     @Autowired
     private EntityManagerFactory emf;
 
+    @Autowired
+    private Properties properties;
+
 
     public NotificationBatchJob() {
 
@@ -102,7 +106,14 @@ public class NotificationBatchJob implements Job {
 
 
             try {
-                notificationsService.getQueueManager().processBatchAndReschedule( notification, jobExecution );
+                NotificationsQueueManager queueManager = new NotificationsQueueManager(
+                        new JobScheduler(sm,em),
+                        em,
+                        properties,
+                        sm.getQueueManager(),
+                        metricsService
+                );
+                queueManager.processBatchAndReschedule(notification, jobExecution);
             }
             catch ( Exception e ) {
                 logger.error( "execute NotificationBatchJob failed", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
index 35051bc..372ad4e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationJob.java
@@ -46,19 +46,6 @@ public class NotificationJob extends OnlyOnceJob {
 
     private static final Logger logger = LoggerFactory.getLogger( NotificationJob.class );
 
-    @Autowired
-    private MetricsFactory metricsService;
-
-    @Autowired
-    private ServiceManagerFactory smf;
-
-    @Autowired
-    private EntityManagerFactory emf;
-    private Meter requests;
-    private Timer execution;
-    private Histogram end;
-
-
     public NotificationJob() {
 
     }
@@ -66,56 +53,13 @@ public class NotificationJob extends OnlyOnceJob {
 
     @PostConstruct
     void init() {
-        requests = metricsService.getMeter( NotificationJob.class, "requests" );
-        execution = metricsService.getTimer( NotificationJob.class, "execution" );
-        end = metricsService.getHistogram( QueueJob.class, "end" );
     }
 
 
     @Override
     public void doJob( JobExecution jobExecution ) throws Exception {
 
-        Timer.Context timer = execution.time();
-        requests.mark();
-
-        logger.info( "execute NotificationJob {}", jobExecution );
-
-        JobData jobData = jobExecution.getJobData();
-        UUID applicationId = ( UUID ) jobData.getProperty( "applicationId" );
-        ServiceManager sm = smf.getServiceManager( applicationId );
-        NotificationsService notificationsService = ( NotificationsService ) sm.getService( "notifications" );
-
-        EntityManager em = emf.getEntityManager( applicationId );
-        try {
-            if ( em == null ) {
-                logger.info( "no EntityManager for applicationId  {}", applicationId );
-                return;
-            }
-            UUID notificationId = ( UUID ) jobData.getProperty( "notificationId" );
-            Notification notification = em.get( notificationId, Notification.class );
-            if ( notification == null ) {
-                logger.info( "notificationId {} no longer exists", notificationId );
-                return;
-            }
-
-            try {
-                notificationsService.getQueueManager().processBatchAndReschedule( notification, jobExecution );
-            }
-            catch ( Exception e ) {
-                logger.error( "execute NotificationJob failed", e );
-                em.setProperty( notification, "errorMessage", e.getMessage() );
-                throw e;
-            }
-            finally {
-                long diff = System.currentTimeMillis() - notification.getCreated();
-                end.update( diff );
-            }
-        }
-        finally {
-            timer.stop();
-        }
-
-        logger.info( "execute NotificationJob completed normally" );
+        logger.info( "execute NotificationJob is deprecated " );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index e347f98..5a64cbc 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -39,6 +39,7 @@ import static org.apache.usergrid.utils.InflectionUtils.pluralize;
 
 import org.apache.usergrid.services.notifications.apns.APNsAdapter;
 import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
+import org.springframework.beans.factory.annotation.Autowired;
 import rx.Observable;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -79,8 +80,12 @@ public class NotificationsService extends AbstractCollectionService {
         providerAdapters.put("noop", TEST_ADAPTER);
     }
 
-    private NotificationsQueueManager notificationQueueManager;
+    private ApplicationQueueManager notificationQueueManager;
     private long gracePeriod;
+    @Autowired
+    private ServiceManagerFactory smf;
+    @Autowired
+    private EntityManagerFactory emf;
 
     public NotificationsService() {
         LOG.info("/notifications");
@@ -89,6 +94,9 @@ public class NotificationsService extends AbstractCollectionService {
     @Override
     public void init( ServiceInfo info ) {
         super.init(info);
+        smf = getApplicationContext().getBean(ServiceManagerFactory.class);
+        emf = getApplicationContext().getBean(EntityManagerFactory.class);
+
         metricsService = getApplicationContext().getBean(MetricsFactory.class);
         sendMeter = metricsService.getMeter(NotificationsService.class, "send");
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
@@ -96,14 +104,15 @@ public class NotificationsService extends AbstractCollectionService {
         queueSize = metricsService.getHistogram(NotificationsService.class, "queue_size");
         outstandingQueue = metricsService.getCounter(NotificationsService.class,"current_queue");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
-        notificationQueueManager = new NotificationsQueueManager(jobScheduler,em,sm.getProperties(),sm.getQueueManager(),metricsService);
+        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,smf.getServiceManager(smf.getManagementAppId()).getQueueManager(),metricsService);
         gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
     }
 
-    public NotificationsQueueManager getQueueManager(){
+    public ApplicationQueueManager getQueueManager(){
         return notificationQueueManager;
     }
 
+
     @Override
     public ServiceContext getContext(ServiceAction action,
             ServiceRequest request, ServiceResults previousResults,
@@ -206,7 +215,7 @@ public class NotificationsService extends AbstractCollectionService {
         Long deliver = (Long) payload.getProperty("deliver");
         if (deliver != null) {
             if (!deliver.equals(notification.getDeliver())) {
-                notificationQueueManager.processBatchAndReschedule((Notification) response, null);
+                notificationQueueManager.queueNotification((Notification) response, null);
             }
         }
         return response;
@@ -327,4 +336,17 @@ public class NotificationsService extends AbstractCollectionService {
             providerAdapter.testConnection(notifier);
         }
     }
+
+
+    public ServiceManagerFactory getServiceManagerFactory(){
+        return this.smf;
+    }
+    public EntityManagerFactory getEntityManagerFactory(){
+        return this.emf;
+    }
+
+
+    public MetricsFactory getMetricsFactory() {
+        return metricsService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsTaskManager.java
new file mode 100644
index 0000000..0869f73
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsTaskManager.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.entities.Receipt;
+
+import java.util.UUID;
+
+public interface NotificationsTaskManager {
+
+    void completed(Notifier notifier, Receipt receipt, UUID deviceUUID,
+                   String newProviderId) throws Exception;
+
+    void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code,
+                String message) throws Exception;
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
new file mode 100644
index 0000000..0ff8482
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.mq.*;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.observables.GroupedObservable;
+import rx.schedulers.Schedulers;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Background listener that repeatedly fetches a delivery batch through
+ * ApplicationQueueManager.getDeliveryBatch, groups the messages by application
+ * id and hands them, in buffers of ApplicationQueueManager.BATCH_SIZE, to
+ * ApplicationQueueManager.sendBatchToProviders.
+ *
+ * Workers are started by {@link #run()} (also invoked after construction through
+ * the PostConstruct hook) and are asked to end by {@link #stop()}.
+ */
+public class QueueListener  {
+    /** Number of consecutive failed iterations tolerated before a worker gives up. */
+    public static int MAX_CONSECUTIVE_FAILS = 10;
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
+
+    @Autowired
+    private MetricsFactory metricsService;
+
+    @Autowired
+    private ServiceManagerFactory smf;
+
+    @Autowired
+    private EntityManagerFactory emf;
+
+    @Autowired
+    private Properties properties;
+
+    private org.apache.usergrid.mq.QueueManager queueManager;
+
+    private ServiceManager svcMgr;
+
+    /** Milliseconds to sleep when a poll returns no messages. */
+    private long sleepPeriod = 5000;
+
+    // NOTE(review): the pool is fixed at one thread, so when
+    // usergrid.notifications.listener.maxThreads is greater than 1 the extra workers
+    // queue behind the first one, whose loop only ends on interrupt or repeated
+    // failure - confirm whether the pool should be sized from the property.
+    ExecutorService pool;
+    List<Future> futures;
+
+    /** Default for the usergrid.notifications.listener.maxThreads property. */
+    public static final String MAX_THREADS = "1";
+
+    public QueueListener() {
+        pool = Executors.newFixedThreadPool(1);
+    }
+    public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+        this();
+        this.smf = smf;
+        this.emf = emf;
+        this.metricsService = metricsService;
+        this.properties = props;
+    }
+
+    @PostConstruct
+    void init() {
+        run();
+    }
+
+    /**
+     * Reads the sleep period and worker count from the properties and submits that
+     * many workers to the pool. Any failure while starting is logged, not thrown.
+     */
+    public void run(){
+        int threadCount = 0;
+        try {
+            // parseLong / parseInt instead of the boxing constructors; a malformed
+            // value still raises NumberFormatException, which is logged below.
+            sleepPeriod = Long.parseLong(properties.getProperty("usergrid.notifications.listener.sleep", "5000"));
+            int maxThreads = Integer.parseInt(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
+            futures = new ArrayList<Future>(maxThreads);
+            while (threadCount++ < maxThreads) {
+                futures.add(
+                        pool.submit(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    execute();
+                                } catch (Exception e) {
+                                    LOG.error("failed to start push", e);
+                                }
+                            }
+                        })
+                );
+            }
+        }catch (Exception e){
+            LOG.error("QueueListener failed to start:", e);
+        }
+    }
+
+    /**
+     * Worker loop: poll, dispatch, repeat. The loop ends when the worker thread is
+     * interrupted (which is what {@link #stop()} requests) or after more than
+     * MAX_CONSECUTIVE_FAILS iterations in a row have failed.
+     */
+    private void execute(){
+        svcMgr = smf.getServiceManager(new UUID( 0, 1 ));
+        queueManager = svcMgr.getQueueManager();
+        final AtomicInteger consecutiveExceptions = new AtomicInteger();
+        // Checking the interrupt flag here also covers interrupts that surface as a
+        // runtime exception from the blocking call with the flag re-set.
+        while ( !Thread.currentThread().isInterrupted() ) {
+            try {
+                QueueResults results = ApplicationQueueManager.getDeliveryBatch(queueManager);
+                List<Message> messages = results.getMessages();
+                if(messages.size()>0) {
+                    Observable.from(messages) //observe all messages
+                            .subscribeOn(Schedulers.io())
+                            .map(new Func1<Message, ApplicationQueueMessage>() { //map a message to a typed message
+                                @Override
+                                public ApplicationQueueMessage call(Message message) {
+                                    return ApplicationQueueMessage.generate(message);
+                                }
+                            })
+                            .groupBy(new Func1<ApplicationQueueMessage, UUID>() { //group all of the messages together by app id
+                                @Override
+                                public UUID call(ApplicationQueueMessage message) {
+                                    return message.getApplicationId();
+                                }
+                            })
+                            .flatMap(new Func1<GroupedObservable<UUID, ApplicationQueueMessage>, Observable<?>>() { //one queue manager per application group
+                                @Override
+                                public Observable<?> call(GroupedObservable<UUID, ApplicationQueueMessage> groupedObservable) {
+                                    UUID appId = groupedObservable.getKey();
+                                    EntityManager entityManager = emf.getEntityManager(appId);
+                                    ServiceManager serviceManager = smf.getServiceManager(appId);
+                                    final ApplicationQueueManager manager = new ApplicationQueueManager(
+                                            new JobScheduler(serviceManager, entityManager),
+                                            entityManager,
+                                            queueManager,
+                                            metricsService
+                                    );
+
+                                    return groupedObservable //buffer the group's messages and send each buffer
+                                            .buffer(ApplicationQueueManager.BATCH_SIZE)
+                                            .flatMap(new Func1<List<ApplicationQueueMessage>, Observable<?>>() {
+                                                @Override
+                                                public Observable<?> call(List<ApplicationQueueMessage> queueMessages) {
+                                                    return manager.sendBatchToProviders(queueMessages);
+                                                }
+                                            });
+                                }
+                            })
+                            .doOnError(new Action1<Throwable>() {
+                                @Override
+                                public void call(Throwable throwable) {
+                                    LOG.error("Failed while listening",throwable);
+                                }
+                            })
+                            .toBlocking()
+                            .last();
+                    LOG.info("Messages sent in batch");
+
+                }
+                else{
+                    Thread.sleep(sleepPeriod);
+                }
+                // a full iteration succeeded, so the failure streak is over
+                consecutiveExceptions.set(0);
+            }catch (InterruptedException ie){
+                // stop() cancels the worker with an interrupt; previously this was
+                // swallowed by the generic handler and the loop kept running.
+                Thread.currentThread().interrupt();
+                LOG.info("message listener interrupted; shutting down");
+                break;
+            }catch (Exception ex){
+                LOG.error("failed to dequeue",ex);
+                // incrementAndGet so the worker stops once the streak exceeds the
+                // limit (getAndIncrement compared the count before this failure).
+                if(consecutiveExceptions.incrementAndGet() > MAX_CONSECUTIVE_FAILS){
+                    LOG.error("killing message listener; too many failures");
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Cancels every submitted worker, interrupting it if it is running. Does
+     * nothing when no workers were ever submitted.
+     */
+    public void stop(){
+        if (futures == null) {
+            return; // run() never got as far as creating the list
+        }
+        for(Future future : futures){
+            future.cancel(true);
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
new file mode 100644
index 0000000..3abe6b2
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.entities.Notifier;
+
+import java.util.HashMap;
+import java.util.Set;
+
+/**
+ * Operations of the notification queue manager that are exposed to
+ * collaborators in this package (SingleQueueTaskManager holds one as its proxy).
+ */
+public interface QueueManager {
+
+    /**
+     * @return the notifiers known to this manager, as a map
+     *         (NOTE(review): the key type is Object; what is used as key is not
+     *         visible here - confirm against the implementation)
+     */
+    HashMap<Object,Notifier> getNotifierMap();
+
+    /**
+     * Starts an asynchronous check for inactive devices for the given notifiers.
+     *
+     * @param notifiers the notifiers to check
+     * @throws Exception if the check cannot be started
+     */
+    void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
new file mode 100644
index 0000000..9f0a0e6
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.entities.Device;
+import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.entities.Receipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SingleQueueTaskManager implements NotificationsTaskManager {
+
+    // Logger is created for this class; it was previously created for
+    // TaskManager, which attributed this class's output to the wrong logger.
+    private static final Logger LOG = LoggerFactory
+            .getLogger(SingleQueueTaskManager.class);
+    private final String path;
+    private final QueueManager proxy;
+
+    private Notification notification;
+    private AtomicLong successes = new AtomicLong();
+    private AtomicLong failures = new AtomicLong();
+    private org.apache.usergrid.mq.QueueManager qm;
+    private EntityManager em;
+    private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
+    // volatile: set inside finishedBatch()'s synchronized block but read without
+    // a lock in completed(), so the write must be visible to other threads.
+    private volatile boolean hasFinished;
+
+    /**
+     * Creates a task manager for one notification.
+     *
+     * @param em           entity manager used for receipts and device properties
+     * @param qm           queue manager on which message transactions are committed
+     * @param proxy        the notifications queue manager this task manager belongs to
+     * @param notification the notification whose sends are tracked
+     */
+    public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification) {
+        this.path = ApplicationQueueManager.QUEUE_NAME; // transactions are committed against this queue
+        this.proxy = proxy;
+        this.notification = notification;
+        this.em = em;
+        this.qm = qm;
+        this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
+        this.hasFinished = false;
+    }
+
+    /**
+     * Registers the queue message that belongs to a device, replacing any message
+     * registered earlier for the same device id. completed() looks the message up
+     * by device id to commit its transaction.
+     *
+     * @param deviceId the device the message is addressed to
+     * @param message  the queue message for that device
+     */
+    public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
+        this.messageMap.put(deviceId, message);
+    }
+
+    /**
+     * Records that handling of a device is done: saves the receipt (when one is
+     * given) and counts it as a success, commits the transaction of the queue
+     * message registered for the device, and replaces the device's provider id
+     * when a new one is given. When finishedBatch() has already run, it is run
+     * again afterwards so late results are still written out.
+     *
+     * @param notifier      the notifier used for the device
+     * @param receipt       the receipt to save, or null to skip saving and counting
+     * @param deviceUUID    the device that was handled
+     * @param newProviderId replacement provider id, or null to leave it unchanged
+     * @throws Exception if saving, committing or updating fails
+     */
+    @Override
+    public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID, String newProviderId) throws Exception {
+        LOG.debug("REMOVED {}", deviceUUID);
+        try {
+            EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
+            if (receipt != null) {
+                LOG.debug("notification {} sent to device {}. saving receipt.", notification.getUuid(), deviceUUID);
+                receipt.setSent(System.currentTimeMillis());
+                this.saveReceipt(notification, deviceRef, receipt);
+                LOG.debug("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
+                successes.incrementAndGet();
+            }
+
+            LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
+            // A device that was never registered through addMessage() has no queue
+            // message; skip the commit instead of failing with a NullPointerException.
+            ApplicationQueueMessage queued = messageMap.get(deviceUUID);
+            if (queued != null) {
+                qm.commitTransaction(path, queued.getTransaction(), null);
+            } else {
+                LOG.warn("notification {} has no queued message for device {}; nothing to commit",
+                        notification.getUuid(), deviceUUID);
+            }
+            if (newProviderId != null) {
+                LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
+                replaceProviderId(deviceRef, notifier, newProviderId);
+            }
+
+            LOG.debug("notification {} completed device {}", notification.getUuid(), deviceUUID);
+
+        } finally {
+            LOG.debug("COUNT is: {}", successes.get());
+            if (hasFinished) { //process has finished but notifications are still coming in
+                finishedBatch();
+
+            }
+        }
+    }
+
+    public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code, String message) throws Exception {
+
+        try {
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("notification ").append(notification.getUuid());
+                sb.append(" for device ").append(deviceUUID);
+                sb.append(" got error ").append(code);
+                LOG.debug(sb.toString());
+            }
+
+            failures.incrementAndGet();
+            if (receipt.getUuid() != null) {
+                successes.decrementAndGet();
+            }
+            receipt.setErrorCode(code);
+            receipt.setErrorMessage(message);
+            this.saveReceipt(notification, new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID), receipt);
+            LOG.debug("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
+        } finally {
+            completed(notifier, null, deviceUUID, null);
+        }
+    }
+
+    /*
+    * Persists a receipt. A receipt that already has a UUID is updated in place;
+    * one without a UUID is created, receives the UUID of the created entity, and
+    * is added to the receipts collection of both the notification and the device.
+    */
+    private void saveReceipt(EntityRef notification, EntityRef device, Receipt receipt) throws Exception {
+        if (receipt.getUuid() != null) {
+            em.update(receipt);
+            return;
+        }
+
+        Receipt persisted = em.create(receipt);
+        receipt.setUuid(persisted.getUuid());
+
+        List<EntityRef> owners = Arrays.asList(notification, device);
+        em.addToCollections(owners, Notification.RECEIPTS_COLLECTION, persisted);
+    }
+
+    /**
+     * Overwrites the provider id stored on a device for the given notifier.
+     *
+     * The property keyed by the notifier's name is tried first; if the device
+     * has no value under that key, the property keyed by the notifier's UUID is
+     * tried. Nothing is written when neither property is present.
+     *
+     * @param device        device whose property is updated
+     * @param notifier      notifier whose name / UUID forms the property key
+     * @param newProviderId value to store
+     * @throws Exception if the entity manager call fails
+     */
+    protected void replaceProviderId(EntityRef device, Notifier notifier,
+                                     String newProviderId) throws Exception {
+        String nameKey = notifier.getName() + NotificationsService.NOTIFIER_ID_POSTFIX;
+        if (em.getProperty(device, nameKey) != null) {
+            em.setProperty(device, nameKey, newProviderId);
+            return;
+        }
+
+        String uuidKey = notifier.getUuid() + NotificationsService.NOTIFIER_ID_POSTFIX;
+        if (em.getProperty(device, uuidKey) != null) {
+            em.setProperty(device, uuidKey, newProviderId);
+        }
+    }
+
+    /**
+     * Flushes the in-memory success/failure counters into the notification
+     * entity and recomputes its aggregate statistics.
+     *
+     * Under this object's monitor the method:
+     * resets both counters and sets the hasFinished flag;
+     * reloads the notification and, if anything was counted, writes the counts
+     * under a fresh "statistics_batch_&lt;millis&gt;" property;
+     * re-reads all properties and sums every "statistics_batch" entry into the
+     * notification's statistics;
+     * marks the notification finished once sent + errors reaches the expected
+     * count.
+     * After releasing the monitor it asks the proxy to check the known
+     * notifiers for inactive devices.
+     *
+     * @throws Exception if an entity manager or proxy call fails
+     */
+    public void finishedBatch() throws Exception {
+        synchronized (this) { //avoid issues with counting
+            long successes = this.successes.getAndSet(0); //reset counters
+            long failures = this.failures.getAndSet(0); //reset counters
+            this.hasFinished = true;
+
+            // refresh notification
+            Notification notification = em.get(this.notification.getUuid(), Notification.class);
+            notification.setModified(System.currentTimeMillis());
+
+            Map<String, Object> properties;
+            Map<String, Long> stats;
+            String statsKey = "statistics_batch";
+
+            //write out current results to a set so no overlap in multiple writes will occur
+            if (successes + failures > 0) {
+                properties = new HashMap<String, Object>(4);
+                stats = new HashMap<String, Long>(2);
+                stats.put("sent", successes);
+                stats.put("errors", failures);
+                properties.put(statsKey + "_" + System.currentTimeMillis(), stats);
+                properties.put("modified", notification.getModified());
+                em.updateProperties(notification, properties);
+            }
+
+            //resum the stats
+            properties = em.getProperties(notification); // re-read
+            long sent = 0;
+            long errors = 0;
+            for (Map.Entry<String, Object> entry : properties.entrySet()) {
+                if (!entry.getKey().contains(statsKey) || !(entry.getValue() instanceof Map)) {
+                    continue;
+                }
+                // Values re-read from storage are not guaranteed to be Long
+                // (they may come back as another Number subtype) or to be
+                // present at all, so read them as Number instead of unboxing
+                // through an unchecked Map<String, Long> cast.
+                Map<?, ?> batch = (Map<?, ?>) entry.getValue();
+                Object batchSent = batch.get("sent");
+                if (batchSent instanceof Number) {
+                    sent += ((Number) batchSent).longValue();
+                }
+                Object batchErrors = batch.get("errors");
+                if (batchErrors instanceof Number) {
+                    errors += ((Number) batchErrors).longValue();
+                }
+            }
+
+            //and write them out again, this will produce the most accurate count
+            stats = new HashMap<String, Long>(2);
+            stats.put("sent", sent);
+            stats.put("errors", errors);
+            notification.setStatistics(stats);
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("notification " + notification.getUuid() + " sending to " + (sent + errors));
+            }
+
+            //mark the notification finished once every expected delivery has been tallied
+            if (notification.getExpectedCount() <= (errors + sent)) {
+                notification.setFinished(notification.getModified());
+                properties.put("finished", notification.getModified());
+                properties.put("state", notification.getState());
+
+                if (LOG.isInfoEnabled()) {
+                    long elapsed = notification.getFinished() - notification.getStarted();
+                    LOG.info("done sending to devices in " + elapsed + " ms");
+                }
+            }
+
+            LOG.info("notification finished batch: {}", notification.getUuid());
+            em.updateProperties(notification, properties);
+            em.update(notification);
+        }
+
+        // done outside the monitor
+        Set<Notifier> notifiers = new HashSet<Notifier>(proxy.getNotifierMap().values()); // remove dups
+        proxy.asyncCheckForInactiveDevices(notifiers);
+    }
+
+
+    /**
+     * Sets the flag that marks batch processing as finished.
+     *
+     * While the flag is set, completions that arrive late trigger
+     * finishedBatch() again so their counts are still written out.
+     *
+     * @param hasFinished new value of the flag
+     */
+    protected void hasFinished(boolean hasFinished) {
+        this.hasFinished = hasFinished;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 85c4a16..9a5b632 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -37,7 +37,7 @@ import org.apache.usergrid.persistence.entities.Device;
  * mean that all work is done, however, as delivery errors may come in after a
  * notification is "sent."
  */
-public class TaskManager {
+public class TaskManager implements NotificationsTaskManager {
 
     // period to poll for batch completion (keep well under Job scheduler
     // heartbeat value!)

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
index 1ae0591..322c4bf 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
@@ -22,29 +22,30 @@ import org.apache.usergrid.persistence.entities.Receipt;
 
 import java.util.UUID;
 
+
 public class TaskTracker {
 
     private Notifier notifier;
-    private TaskManager taskManager;
+    private NotificationsTaskManager taskManager;
     private Receipt receipt;
-    private UUID id;
+    private UUID deviceId;
 
-    public TaskTracker(Notifier notifier, TaskManager taskManager, Receipt receipt, UUID id) {
+    public TaskTracker(Notifier notifier, NotificationsTaskManager taskManager, Receipt receipt, UUID deviceId) {
         this.notifier = notifier;
         this.taskManager = taskManager;
         this.receipt = receipt;
-        this.id = id;
+        this.deviceId = deviceId;
     }
 
     public void completed() throws Exception {
-        taskManager.completed(notifier, receipt, id, null);
+        taskManager.completed(notifier, receipt, deviceId, null);
     }
 
     public void failed(Object code, String message) throws Exception {
-        taskManager.failed(notifier, receipt, id, code, message);
+        taskManager.failed(notifier, receipt, deviceId, code, message);
     }
 
     public void completed(String newToken) throws Exception {
-        taskManager.completed(notifier, receipt, id, newToken);
+        taskManager.completed(notifier, receipt, deviceId, newToken);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/main/resources/usergrid-services-context.xml
----------------------------------------------------------------------
diff --git a/stack/services/src/main/resources/usergrid-services-context.xml b/stack/services/src/main/resources/usergrid-services-context.xml
index 39748e1..be47f08 100644
--- a/stack/services/src/main/resources/usergrid-services-context.xml
+++ b/stack/services/src/main/resources/usergrid-services-context.xml
@@ -89,4 +89,7 @@
 
   <bean id="exportJob" class="org.apache.usergrid.management.export.ExportJob" />
 
+  <bean id="notificationsQueueListener" class="org.apache.usergrid.services.notifications.QueueListener" scope="singleton"/>
+
+
 </beans>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0a13340f/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index bfa52e7..7c68b7f 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -20,6 +20,7 @@ import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.ServiceManagerFactory;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -30,14 +31,20 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.usergrid.services.AbstractServiceIT;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
+
 public class AbstractServiceNotificationIT extends AbstractServiceIT {
     private NotificationsService ns;
+    @Autowired
+    private ServiceManagerFactory smf;
 
+    @Autowired
+    private EntityManagerFactory emf;
     @Rule
     public TestName name = new TestName();
 
@@ -57,7 +64,6 @@ public class AbstractServiceNotificationIT extends AbstractServiceIT {
 
     protected Notification scheduleNotificationAndWait(Notification notification)
             throws Exception {
-        getNotificationService().getQueueManager().processBatchAndReschedule(notification,null);
         long timeout = System.currentTimeMillis() + 60000;
         while (System.currentTimeMillis() < timeout) {
             Thread.sleep(200);
@@ -79,7 +85,7 @@ public class AbstractServiceNotificationIT extends AbstractServiceIT {
         List<EntityRef> list =new ArrayList<EntityRef>();//get all
         PagingResultsIterator it = new PagingResultsIterator(r);
         while(it.hasNext()){
-           list.add((EntityRef)it.next());
+            list.add((EntityRef)it.next());
         }
         return list;
     }
@@ -92,7 +98,7 @@ public class AbstractServiceNotificationIT extends AbstractServiceIT {
             Thread.sleep(200);
             receipts =getNotificationReceipts(notification);
             if (receipts.size()==expected) {
-                 break;
+                break;
             }
         }
         assertEquals(expected, receipts.size());


Mime
View raw message