usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject usergrid git commit: Add caching to queue manager so we don't create many instances of local or SNS queue manager impls for the same exact queue. Enhance the node.js integration tests.
Date Fri, 05 Feb 2016 23:28:08 GMT
Repository: usergrid
Updated Branches:
  refs/heads/master 9b8d1dc39 -> 853d6486f


Add caching to queue manager so we don't create many instances of local or SNS queue manager impls for the same exact queue.  Enhance the node.js integration tests.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/853d6486
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/853d6486
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/853d6486

Branch: refs/heads/master
Commit: 853d6486f82c51610bec6dc52d6fbde2cfe2da1a
Parents: 9b8d1dc
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Fri Feb 5 15:28:06 2016 -0800
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Fri Feb 5 15:28:06 2016 -0800

----------------------------------------------------------------------
 .../persistence/entities/Notification.java      |   2 +-
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../queue/impl/QueueManagerFactoryImpl.java     |  51 ++-
 .../queue/impl/SNSQueueManagerImpl.java         |   4 +-
 .../notifications/ApplicationQueueManager.java  |   9 +-
 .../services/notifications/TaskManager.java     |  68 ++--
 .../impl/ApplicationQueueManagerImpl.java       | 389 +++++++++----------
 .../gcm/NotificationsServiceIT.java             | 114 +++---
 tests/integration/lib/entities.js               |   2 +-
 tests/integration/lib/notifications.js          |   1 +
 tests/integration/lib/notifiers.js              |  43 ++
 tests/integration/test/groups/groups.js         |  55 +--
 tests/integration/test/main.js                  |   5 +-
 .../test/notifications/notifications.js         | 313 +++++++++++++++
 tests/integration/test/teardown.js              |  18 +-
 15 files changed, 712 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 34c7758..5c3ee89 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -90,7 +90,7 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected String priority;
 
-    /** Error messages that may have been encounted by Usergrid when trying to process the notification */
+    /** Error messages that may have been encountered by Usergrid when trying to process the notification */
     @EntityProperty
     protected String errorMessage;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index ad38f6d..cdab3e0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -72,7 +72,7 @@ public interface QueueFig extends GuicyFig {
 
     // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
     @Key("usergrid.queue.publish.queuesize")
-    @Default("850000")
+    @Default("250000")
     int getAsyncQueueSize();
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index 0f78678..de9cac5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -19,12 +19,18 @@
  */
 package org.apache.usergrid.persistence.queue.impl;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.queue.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 /**
  * manages whether we take in an external in memory override for queues.
@@ -32,10 +38,36 @@ import java.util.Map;
 @Singleton
 public class QueueManagerFactoryImpl implements QueueManagerFactory {
 
+    private static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class );
 
     private final QueueFig queueFig;
     private final QueueManagerInternalFactory queuemanagerInternalFactory;
     private final Map<String,QueueManager> defaultManager;
+    private final LoadingCache<QueueScope, QueueManager> queueManager =
+        CacheBuilder
+            .newBuilder()
+            .initialCapacity(5)
+            .maximumSize(100)
+            .build(new CacheLoader<QueueScope, QueueManager>() {
+
+                @Override
+                public QueueManager load( QueueScope scope ) throws Exception {
+
+                    if ( queueFig.overrideQueueForDefault() ){
+
+                        QueueManager manager = defaultManager.get( scope.getName() );
+                        if ( manager == null ) {
+                            manager = new LocalQueueManager();
+                            defaultManager.put( scope.getName(), manager );
+                        }
+                        return manager;
+
+                    } else {
+                        return queuemanagerInternalFactory.getQueueManager(scope);
+                    }
+
+                }
+            });
 
     @Inject
     public QueueManagerFactoryImpl(final QueueFig queueFig, final QueueManagerInternalFactory queuemanagerInternalFactory){
@@ -43,17 +75,18 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory {
         this.queuemanagerInternalFactory = queuemanagerInternalFactory;
         this.defaultManager = new HashMap<>(10);
     }
+
     @Override
     public QueueManager getQueueManager(QueueScope scope) {
-        if(queueFig.overrideQueueForDefault()){
-            QueueManager manager = defaultManager.get(scope.getName());
-            if(manager==null){
-                manager = new LocalQueueManager();
-                defaultManager.put(scope.getName(),manager);
-            }
-            return manager;
-        }else{
-            return queuemanagerInternalFactory.getQueueManager(scope);
+
+        try {
+            return queueManager.get(scope);
+
+        } catch (ExecutionException e) {
+
+            logger.error("Unable to load or retrieve queue manager from cache for queue {}", scope.getName());
+            throw new RuntimeException(e);
         }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 3a1f045..8a503a5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -564,7 +564,8 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
             return;
         }
-
+        final long startSend = System.currentTimeMillis();
+        logger.info("starting send message");
         final String stringBody = toString( body );
 
         String url = getReadQueue().getUrl();
@@ -575,6 +576,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         SendMessageRequest request = new SendMessageRequest( url, stringBody );
 
+        logger.info("now sending.  time spent since starting to send in ms: {}", System.currentTimeMillis() - startSend);
         sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 6bbd117..3f0ca69 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -32,14 +32,15 @@ import java.util.List;
  */
 public interface ApplicationQueueManager {
 
-    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
+    String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
 
-    public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
+    String NOTIFIER_ID_POSTFIX = ".notifier.id";
 
-    public static final  String DEFAULT_QUEUE_NAME = "push"; //keep this short as AWS limits queue name size to 80 chars
+    String DEFAULT_QUEUE_NAME = "push"; //keep this short as AWS limits queue name size to 80 chars
 
     /**
      * send notification to queue
+     *
      * @param notification
      * @param jobExecution
      * @throws Exception
@@ -48,6 +49,7 @@ public interface ApplicationQueueManager {
 
     /**
      * send notifications to providers
+     *
      * @param messages
      * @param queuePath
      * @return
@@ -61,6 +63,7 @@ public interface ApplicationQueueManager {
 
     /**
      * check for inactive devices, apple and google require this
+     *
      * @throws Exception
      */
     void asyncCheckForInactiveDevices() throws Exception;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 4f051e6..950447a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -89,10 +89,10 @@ public class TaskManager {
             if (logger.isTraceEnabled()) {
                 logger.trace("COUNT is: {}", successes.get());
             }
-            if (hasFinished) { //process has finished but notifications are still coming in
-                finishedBatch();
-
-            }
+//            if (hasFinished) { //process has finished but notifications are still coming in
+//                finishedBatch();
+//
+//            }
         }
     }
 
@@ -115,6 +115,7 @@ public class TaskManager {
             }
         } finally {
             completed(notifier, deviceUUID);
+            finishedBatch();
         }
     }
 
@@ -128,7 +129,7 @@ public class TaskManager {
                 Receipt savedReceipt = em.create(receipt);
                 receipt.setUuid(savedReceipt.getUuid());
                 List<EntityRef> entities = Arrays.asList(notification, device);
-//              em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
+                em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
             } else {
                 em.update(receipt);
             }
@@ -150,51 +151,34 @@ public class TaskManager {
             }
         }
     }
+
     public void finishedBatch() throws Exception {
-        finishedBatch(true,false);
+        finishedBatch(true);
     }
-    public void finishedBatch(boolean fetch, boolean force) throws Exception {
-
-        if (notification.getDebug() || getFailures() > 0 || force) {
-            long successes = this.successes.get(); //reset counters
-            long failures = this.failures.get(); //reset counters
-
-            for (int i = 0; i < successes; i++) {
-                this.successes.decrementAndGet();
-            }
 
-            for (int i = 0; i < failures; i++) {
-                this.failures.decrementAndGet();
-            }
-
-            this.hasFinished = true;
+    public void finishedBatch(boolean refreshNotification) throws Exception {
 
-            // refresh notification
-            if (fetch)
-                notification = em.get(this.notification.getUuid(), Notification.class);
+        long successes = this.successes.get(); //reset counters
+        long failures = this.failures.get(); //reset counters
 
-            //and write them out again, this will produce the most accurate count
-            Map<String, Long> stats = new HashMap<>(2);
-            stats.put("sent", successes);
-            stats.put("errors", failures);
-            notification.updateStatistics(successes, failures);
+        for (int i = 0; i < successes; i++) {
+            this.successes.decrementAndGet();
+        }
+        for (int i = 0; i < failures; i++) {
+            this.failures.decrementAndGet();
+        }
 
-            long totals = (notification.getStatistics().get("sent") + notification.getStatistics().get("errors"));
-            //none of this is known and should you ever do this
-            notification.setModified(System.currentTimeMillis());
-            notification.setFinished(notification.getModified());
+        this.hasFinished = true;
 
-            Map<String, Object> properties = new HashMap<>();
-            properties.put("finished", notification.getModified());
-            properties.put("state", notification.getState());
-            notification.addProperties(properties);
+        // force refresh notification by fetching it
+        if (refreshNotification) {
+            notification = em.get(this.notification.getUuid(), Notification.class);
+        }
 
-            long latency = notification.getFinished() - notification.getStarted();
-            logger.info("notification finished batch: {} of {} devices in {} ms", notification.getUuid(), totals, latency);
+        notification.updateStatistics(successes, failures);
+        notification.setModified(System.currentTimeMillis());
+        notification.setFinished(notification.getModified());
 
-            em.update(notification);
-//        Set<Notifier> notifiers = new HashSet<>(proxy.getAdapterMap().values()); // remove dups
-//        proxy.asyncCheckForInactiveDevices(notifiers);
-        }
+        em.update(notification);
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index c956417..d0f8ca8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -56,7 +56,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
 
 
-    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties) {
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
@@ -67,13 +67,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
     }
 
-    private boolean scheduleQueueJob(Notification notification) throws Exception{
+    private boolean scheduleQueueJob(Notification notification) throws Exception {
         return jobScheduler.scheduleQueueJob(notification);
     }
 
     @Override
     public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
-        if(scheduleQueueJob(notification)){
+        if (scheduleQueueJob(notification)) {
             em.update(notification);
             return;
         }
@@ -94,110 +94,97 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             logger.trace("notification {} start queuing", notification.getUuid());
         }
 
-        final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery() ; //devices query
+        final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
-        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues
 
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
-            final HashMap<Object,ProviderAdapter> notifierMap =  getAdapterMap();
+            final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
             final Iterator<Device> iterator = pathQuery.iterator(em);
+
             //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
             if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+                if(logger.isTraceEnabled()){
+                    logger.trace("Scheduling notification job as it has multiple pages of devices.");
+                }
                 jobScheduler.scheduleQueueJob(notification, true);
                 em.update(notification);
                 return;
             }
             final UUID appId = em.getApplication().getUuid();
-            final Map<String,Object> payloads = notification.getPayloads();
-
-            final Func1<Entity,Entity> entityListFunct = entity -> {
+            final Map<String, Object> payloads = notification.getPayloads();
 
+            final Func1<EntityRef, EntityRef> sendMessageFunction = deviceRef -> {
                 try {
 
                     long now = System.currentTimeMillis();
-                    List<EntityRef> devicesRef = getDevices(entity); // resolve group
 
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("notification {} queue  {} devices, duration {} ms", notification.getUuid(), devicesRef.size(), (System.currentTimeMillis() - now));
-                    }
+                    String notifierId = null;
+                    String notifierKey = null;
 
-                    for (EntityRef deviceRef : devicesRef) {
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                    //find the device notifier info, match it to the payload
+                    for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                        ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+                        now = System.currentTimeMillis();
+                        String providerId = getProviderId(deviceRef, adapter.getNotifier());
+                        if (providerId != null) {
+                            notifierId = providerId;
+                            notifierKey = entry.getKey().toLowerCase();
+                            break;
                         }
-                        String notifierId = null;
-                        String notifierKey = null;
-
-                        //find the device notifier info, match it to the payload
-                        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
-                            ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
-                            now = System.currentTimeMillis();
-                            String providerId = getProviderId(deviceRef, adapter.getNotifier());
-                            if (providerId != null) {
-                                notifierId = providerId;
-                                notifierKey = entry.getKey().toLowerCase();
-                                break;
-                            }
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("Provider query for notification {} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now));
-                            }
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Provider query for notification {} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now));
                         }
+                    }
 
-                        if (notifierId == null) {
-                            logger.info("Notifier did not match for device {} ", deviceRef);
-                            continue;
-                        }
+                    if (notifierId == null) {
+                        return deviceRef;
+                    }
+
+                    ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                    if (notification.getQueued() == null) {
+
+                        // update queued time
+                        notification.setQueued(System.currentTimeMillis());
 
-                        ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
-                        if (notification.getQueued() == null) {
-                            // update queued time
-                            now = System.currentTimeMillis();
-                            notification.setQueued(System.currentTimeMillis());
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("notification {} device {} queue time set. duration {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now));
-                            }
-                        }
-                        now = System.currentTimeMillis();
-                        qm.sendMessage(message);
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("notification {} post-queue to device {} duration {} ms, {} queue", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now), queueName);
-                        }
-                        deviceCount.incrementAndGet();
-                        queueMeter.mark();
                     }
+                    qm.sendMessage(message);
+                    deviceCount.incrementAndGet();
+                    queueMeter.mark();
+
+
                 } catch (Exception deviceLoopException) {
-                    logger.error("Failed to add devices", deviceLoopException);
-                    errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+                    logger.error("Failed to add device", deviceLoopException);
+                    errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);
                 }
-                return entity;
+                return deviceRef;
             };
 
-            long now = System.currentTimeMillis();
 
 
             //process up to 10 concurrently
-            Observable o = rx.Observable.create( new IteratorObservable<Entity>( iterator ) )
-                .distinct( entity -> entity.getUuid() )
-                .flatMap(entity ->
-                    Observable.just(entity).map(entityListFunct)
-                        .doOnError(throwable -> logger.error("Failed while writing", throwable)) , 10);
+            Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
+                .flatMap(entity -> {
+                    return Observable.from(getDevices(entity));
+                }, 10)
+                .distinct(ref -> ref.getUuid())
+                .map(sendMessageFunction)
+                .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
+
+            processMessagesObservable.toBlocking().lastOrDefault(null);
 
-            o.toBlocking().lastOrDefault( null );
-            if (logger.isTraceEnabled()) {
-                logger.trace("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-            }
         }
 
         // update queued time
         Map<String, Object> properties = new HashMap<>(2);
         properties.put("queued", notification.getQueued());
         properties.put("state", notification.getState());
-        if(errorMessages.size()>0){
+        if (errorMessages.size() > 0) {
             if (notification.getErrorMessage() == null) {
                 notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
             }
@@ -205,40 +192,33 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
         notification.setExpectedCount(deviceCount.get());
         notification.addProperties(properties);
-        long now = System.currentTimeMillis();
-
+        em.update(notification);
 
-        logger.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
 
-        //do i have devices, and have i already started batching.
-        if (deviceCount.get() <= 0 || !notification.getDebug()) {
+        // if no devices, go ahead and mark the batch finished
+        if (deviceCount.get() <= 0 ) {
             TaskManager taskManager = new TaskManager(em, notification);
-            //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
-            taskManager.finishedBatch(false,true);
-        }else {
-            em.update(notification);
+            taskManager.finishedBatch(true);
         }
 
-        long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
-        if (logger.isTraceEnabled()) {
-            logger.trace("notification {} done queuing to {} devices in {} ms", notification.getUuid().toString(), deviceCount.get(), elapsed);
-        }
+
     }
 
     /**
      * only need to get notifiers once. will reset on next batch
+     *
      * @return
      */
-    private HashMap<Object,ProviderAdapter> getAdapterMap(){
-        if(notifierHashMap == null) {
+    private HashMap<Object, ProviderAdapter> getAdapterMap() {
+        if (notifierHashMap == null) {
             long now = System.currentTimeMillis();
-            notifierHashMap = new HashMap<Object, ProviderAdapter>();
+            notifierHashMap = new HashMap<>();
             Query query = new Query();
             query.setCollection("notifiers");
             query.setLimit(100);
-            PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
-                    new SimpleEntityRef(em.getApplicationRef()),
-                    query
+            PathQuery<Notifier> pathQuery = new PathQuery<>(
+                new SimpleEntityRef(em.getApplicationRef()),
+                query
             );
             Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
             int count = 0;
@@ -246,22 +226,22 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 Notifier notifier = notifierIterator.next();
                 String name = notifier.getName() != null ? notifier.getName() : "";
                 UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
-                ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+                ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em);
                 notifierHashMap.put(name.toLowerCase(), providerAdapter);
                 notifierHashMap.put(uuid, providerAdapter);
                 notifierHashMap.put(uuid.toString(), providerAdapter);
-                if(count++ >= 100){
+                if (count++ >= 100) {
                     logger.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size());
                     break;
                 }
             }
-            logger.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
         }
         return notifierHashMap;
     }
 
     /**
      * send batches of notifications to provider
+     *
      * @param messages
      * @throws Exception
      */
@@ -273,128 +253,123 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
         final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
         final ApplicationQueueManagerImpl proxy = this;
-        final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
-        final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
-
-        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
-            @Override
-            public ApplicationQueueMessage call(QueueMessage queueMessage) {
-                boolean messageCommitted = false;
-                ApplicationQueueMessage message = null;
-                try {
-                    message = (ApplicationQueueMessage) queueMessage.getBody();
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId());
-                    }
+        final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
+        final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
 
-                    UUID deviceUUID = message.getDeviceId();
+        final Func1<QueueMessage, ApplicationQueueMessage> func = queueMessage -> {
+            boolean messageCommitted = false;
+            ApplicationQueueMessage message = null;
+            try {
+                message = (ApplicationQueueMessage) queueMessage.getBody();
+                if (logger.isTraceEnabled()) {
+                    logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId());
+                }
 
-                    Notification notification = notificationMap.get(message.getNotificationId());
-                    if (notification == null) {
-                        notification = em.get(message.getNotificationId(), Notification.class);
-                        notificationMap.put(message.getNotificationId(), notification);
-                    }
-                    TaskManager taskManager = taskMap.get(message.getNotificationId());
-                    if (taskManager == null) {
-                        taskManager = new TaskManager(em, notification);
-                        taskMap.putIfAbsent(message.getNotificationId(), taskManager);
-                        taskManager = taskMap.get(message.getNotificationId());
-                    }
+                UUID deviceUUID = message.getDeviceId();
 
-                    final Map<String, Object> payloads = notification.getPayloads();
-                    final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
-                    }
+                Notification notification = notificationMap.get(message.getNotificationId());
+                if (notification == null) {
+                    notification = em.get(message.getNotificationId(), Notification.class);
+                    notificationMap.put(message.getNotificationId(), notification);
+                }
+                TaskManager taskManager = taskMap.get(message.getNotificationId());
+                if (taskManager == null) {
+                    taskManager = new TaskManager(em, notification);
+                    taskMap.putIfAbsent(message.getNotificationId(), taskManager);
+                    taskManager = taskMap.get(message.getNotificationId());
+                }
 
-                    try {
-                        String notifierName = message.getNotifierKey().toLowerCase();
-                        ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
-                        Object payload = translatedPayloads.get(notifierName);
-                        Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
-                        TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
-                        if(!isOkToSend(notification)){
-                             tracker.failed(0, "Notification is duplicate/expired/cancelled.");
-                        }else {
-                            if (payload == null) {
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
-                                }
-                                tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
-                            } else {
-                                long now = System.currentTimeMillis();
-                                try {
-                                    providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
-                                } catch (Exception e) {
-                                    tracker.failed(0, e.getMessage());
-                                } finally {
-                                    if (logger.isTraceEnabled()) {
-                                        logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now));
-                                    }
+                final Map<String, Object> payloads = notification.getPayloads();
+                final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
+                }
+
+                try {
+                    String notifierName = message.getNotifierKey().toLowerCase();
+                    ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
+                    Object payload = translatedPayloads.get(notifierName);
+                    Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
+                    TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
+                    if (!isOkToSend(notification)) {
+                        tracker.failed(0, "Notification is duplicate/expired/cancelled.");
+                    } else {
+                        if (payload == null) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                            }
+                            tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+                        } else {
+                            long now = System.currentTimeMillis();
+                            try {
+                                providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
+                            } catch (Exception e) {
+                                tracker.failed(0, e.getMessage());
+                            } finally {
+                                if (logger.isTraceEnabled()) {
+                                    logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now));
                                 }
                             }
                         }
-                        messageCommitted = true;
-                    } finally {
-                        sendMeter.mark();
                     }
+                    messageCommitted = true;
+                } finally {
+                    sendMeter.mark();
+                }
 
-                } catch (Exception e) {
-                    logger.error("Failure while sending",e);
-                    try {
-                        if(!messageCommitted && queuePath != null) {
-                            qm.commitMessage(queueMessage);
-                        }
-                    }catch (Exception queueException){
-                        logger.error("Failed to commit message.",queueException);
+            } catch (Exception e) {
+                logger.error("Failure while sending", e);
+                try {
+                    if (!messageCommitted && queuePath != null) {
+                        qm.commitMessage(queueMessage);
                     }
+                } catch (Exception queueException) {
+                    logger.error("Failed to commit message.", queueException);
                 }
-                return message;
             }
+            return message;
         };
 
         //from each queue message, process them in parallel up to 10 at a time
-        Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> {
+        Observable queueMessageObservable = Observable.from(messages).flatMap(queueMessage -> {
 
 
-            return Observable.just( queueMessage ).map( func ).buffer( messages.size() ).map( queueMessages -> {
+            return Observable.just(queueMessage).map(func).buffer(messages.size()).map(queueMessages -> {
                 //for gcm this will actually send notification
-                for ( ProviderAdapter providerAdapter : notifierMap.values() ) {
+                for (ProviderAdapter providerAdapter : notifierMap.values()) {
                     try {
                         providerAdapter.doneSendingNotifications();
-                    }
-                    catch ( Exception e ) {
-                        logger.error( "providerAdapter.doneSendingNotifications: ", e );
+                    } catch (Exception e) {
+                        logger.error("providerAdapter.doneSendingNotifications: ", e);
                     }
                 }
                 //TODO: check if a notification is done and mark it
                 HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>();
-                for ( ApplicationQueueMessage message : queueMessages ) {
-                    if ( notifications.get( message.getNotificationId() ) == null ) {
+                for (ApplicationQueueMessage message : queueMessages) {
+                    if (notifications.get(message.getNotificationId()) == null) {
                         try {
-                            TaskManager taskManager = taskMap.get( message.getNotificationId() );
-                            notifications.put( message.getNotificationId(), message );
+                            TaskManager taskManager = taskMap.get(message.getNotificationId());
+                            notifications.put(message.getNotificationId(), message);
                             taskManager.finishedBatch();
-                        }
-                        catch ( Exception e ) {
-                            logger.error( "Failed to finish batch", e );
+                        } catch (Exception e) {
+                            logger.error("Failed to finish batch", e);
                         }
                     }
                 }
                 return notifications;
-            } ).doOnError( throwable -> logger.error( "Failed while sending", throwable ) );
-        }, 10 );
+            }).doOnError(throwable -> logger.error("Failed while sending", throwable));
+        }, 10);
 
-        return o;
+        return queueMessageObservable;
     }
 
     @Override
-    public void stop(){
-        for(ProviderAdapter adapter : getAdapterMap().values()){
+    public void stop() {
+        for (ProviderAdapter adapter : getAdapterMap().values()) {
             try {
                 adapter.stop();
-            }catch (Exception e){
-                logger.error("failed to stop adapter",e);
+            } catch (Exception e) {
+                logger.error("failed to stop adapter", e);
             }
         }
     }
@@ -407,7 +382,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
      */
     private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter>
         notifierMap) throws Exception {
-        Map<String, Object> translatedPayloads = new HashMap<String, Object>(  payloads.size());
+        Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
             String payloadKey = entry.getKey().toLowerCase();
             Object payloadValue = entry.getValue();
@@ -431,10 +406,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
     private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
         private final Iterator<T> input;
-        private IteratorObservable( final Iterator input ) {this.input = input;}
+
+        private IteratorObservable(final Iterator input) {
+            this.input = input;
+        }
 
         @Override
-        public void call( final Subscriber<? super T> subscriber ) {
+        public void call(final Subscriber<? super T> subscriber) {
 
             /**
              * You would replace this code with your file reading.  Instead of emitting from an iterator,
@@ -442,17 +420,16 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
              */
 
             try {
-                while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
+                while (!subscriber.isUnsubscribed() && input.hasNext()) {
                     //send our input to the next
-                    subscriber.onNext( (T) input.next() );
+                    subscriber.onNext((T) input.next());
                 }
 
                 //tell the subscriber we don't have any more data
                 subscriber.onCompleted();
-            }
-            catch ( Throwable t ) {
-                logger.error("failed on subscriber",t);
-                subscriber.onError( t );
+            } catch (Throwable t) {
+                logger.error("failed on subscriber", t);
+                subscriber.onError(t);
             }
         }
     }
@@ -483,45 +460,61 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
     private boolean isOkToSend(Notification notification) {
-        Map<String,Long> stats = notification.getStatistics();
-        if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
+        Map<String, Long> stats = notification.getStatistics();
+        if (stats != null && notification.getExpectedCount() == (stats.get("sent") + stats.get("errors"))) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} already processed. not sending.",
-                        notification.getUuid());
+                    notification.getUuid());
             }
             return false;
         }
         if (notification.getCanceled() == Boolean.TRUE) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} canceled. not sending.",
-                        notification.getUuid());
+                    notification.getUuid());
             }
             return false;
         }
         if (notification.isExpired()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} expired. not sending.",
-                        notification.getUuid());
+                    notification.getUuid());
             }
             return false;
         }
         return true;
     }
 
-    private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+    private List<EntityRef> getDevices(EntityRef ref) {
+
         List<EntityRef> devices = Collections.EMPTY_LIST;
-        if ("device".equals(ref.getType())) {
-            devices = Collections.singletonList(ref);
-        } else if ("user".equals(ref.getType())) {
-            devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
+
+        try {
+            if ("device".equals(ref.getType())) {
+                devices = Collections.singletonList(ref);
+            } else if ("user".equals(ref.getType())) {
+                devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
                     Query.Level.REFS, false).getRefs();
-        } else if ("group".equals(ref.getType())) {
-            devices = new ArrayList<EntityRef>();
-            for (EntityRef r : em.getCollection(ref, "users", null,
+            } else if ("group".equals(ref.getType())) {
+                devices = new ArrayList<>();
+                for (EntityRef r : em.getCollection(ref, "users", null,
                     Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
-                devices.addAll(getDevices(r));
+                    devices.addAll(getDevices(r));
+                }
+            }
+        } catch (Exception e) {
+
+            if (ref != null){
+                logger.error("Error while retrieving devices for entity type {} and uuid {}. Error: {}",
+                    ref.getType(), ref.getUuid(), e);
+            }else{
+                logger.error("Error while retrieving devices. Entity ref was null.");
             }
+
+            throw new RuntimeException("Unable to retrieve devices for EntityRef", e);
+
         }
+
         return devices;
     }
 
@@ -534,7 +527,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             }
             return value != null ? value.toString() : null;
         } catch (Exception e) {
-            logger.error("Errer getting provider ID, proceding with rest of batch", e);
+            logger.error("Error getting provider ID, proceeding with rest of batch", e);
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 97513be..65cc54a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -39,16 +39,18 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
 
     private static final Logger logger = LoggerFactory
-            .getLogger(NotificationsServiceIT.class);
+        .getLogger(NotificationsServiceIT.class);
 
-    /** set to true to use actual connections to GCM servers */
+    /**
+     * set to true to use actual connections to GCM servers
+     */
     private static final boolean USE_REAL_CONNECTIONS = true;
     private static final String PROVIDER = USE_REAL_CONNECTIONS ? "google" : "noop";
 
     private static final String API_KEY = "AIzaSyCIH_7WC0mOqBGMOXyQnFgrBpOePgHvQJM";
     private static final String PUSH_TOKEN = "APA91bGxRGnMK8tKgVPzSlxtCFvwSVqx0xEPjA06sBmiK0k"
-            + "QsiwUt6ipSYF0iPRHyUgpXle0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmo"
-            + "sqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A";
+        + "QsiwUt6ipSYF0iPRHyUgpXle0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmo"
+        + "sqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A";
 
     private Notifier notifier;
     private Device device1, device2;
@@ -56,14 +58,12 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     private QueueListener listener;
 
 
-
-
-
     @BeforeClass
-    public static void setup(){
+    public static void setup() {
 
 
     }
+
     @Before
     public void before() throws Exception {
 
@@ -77,8 +77,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.put("apiKey", API_KEY);
 
         notifier = (Notifier) app
-                .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
-                .toTypedEntity();
+            .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
+            .toTypedEntity();
         String key = notifier.getName() + NOTIFIER_ID_POSTFIX;
 
         // create devices //
@@ -86,7 +86,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.clear();
         app.put(key, PUSH_TOKEN);
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices") .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices").getEntity();
         app.testRequest(ServiceAction.GET, 1, "devices", e.getUuid());
 
         device1 = app.getEntityManager().get(e.getUuid(), Device.class);
@@ -103,8 +103,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     }
 
     @After
-    public void after(){
-        if(listener!=null) {
+    public void after() {
+        if (listener != null) {
             listener.stop();
             listener = null;
         }
@@ -119,23 +119,23 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.put("environment", "development");
         app.put("apiKey", API_KEY);
         Notifier n = (Notifier) app
-                .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
-                .toTypedEntity();
+            .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
+            .toTypedEntity();
 
         app.clear();
         String payload = "Hello, World!";
         Map<String, String> payloads = new HashMap<String, String>(1);
         payloads.put("foo", payload);
         app.put("payloads", payloads);
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("queued", System.currentTimeMillis());
 
-        Entity e = app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications")
-                .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications")
+            .getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(),
-                Notification.class);
+            Notification.class);
 
         // perform push //
         notification = notificationWaitForComplete(notification);
@@ -151,16 +151,16 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class);
         assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+            notification.getPayloads().get(notifier.getUuid().toString()),
+            payload);
 
         // perform push //
         notification = notificationWaitForComplete(notification);
@@ -177,11 +177,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time
         app.put("priority", "high");
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class);
@@ -204,11 +204,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time
         app.put("priority", "not_a_priority");
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class);
@@ -232,10 +232,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class);
@@ -257,10 +257,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
         app.put("expire", System.currentTimeMillis() + 300000); // add 5 minutes to current time
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices","*","notifications").getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", "*", "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
         Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class);
@@ -285,7 +285,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         assertNotNull(user);
 
         // post an existing device to user's devices collection
-        Entity device = app.testRequest(ServiceAction.POST, 1, "users",  user.getUuid(), "devices", device1.getUuid()).getEntity();
+        Entity device = app.testRequest(ServiceAction.POST, 1, "users", user.getUuid(), "devices", device1.getUuid()).getEntity();
         assertEquals(device.getUuid(), device1.getUuid());
 
         // create and post notification
@@ -294,8 +294,8 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
-        Entity e = app.testRequest(ServiceAction.POST, 1,"users",user.getUuid(), "notifications").getEntity();
+        app.put("debug", true);
+        Entity e = app.testRequest(ServiceAction.POST, 1, "users", user.getUuid(), "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
 
@@ -315,15 +315,15 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
 
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices","notifications")   .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", "notifications").getEntity();
         app.testRequest(ServiceAction.GET, 1, "notifications", e.getUuid());
 
-        Notification notification = app.getEntityManager().get(e.getUuid(),  Notification.class);
+        Notification notification = app.getEntityManager().get(e.getUuid(), Notification.class);
         assertEquals(
-                notification.getPayloads().get(notifier.getUuid().toString()),
-                payload);
+            notification.getPayloads().get(notifier.getUuid().toString()),
+            payload);
 
         // reduce Batch size to 1
         Field field = GCMAdapter.class.getDeclaredField("BATCH_SIZE");
@@ -349,10 +349,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         app.clear();
         app.put("payloads", "{asdf}");
-        app.put("debug",true);
+        app.put("debug", true);
 
         try {
-            app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications");
+            app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications");
             fail("invalid payload should have been rejected");
         } catch (IllegalArgumentException ex) {
             // ok
@@ -364,7 +364,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.put("payloads", payloads);
         payloads.put("xxx", "");
         try {
-            app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications");
+            app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications");
             fail("invalid payload should have been rejected");
         } catch (IllegalArgumentException ex) {
             // ok
@@ -379,7 +379,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.put("environment", "development");
         app.put("apiKey", API_KEY);
         Entity e = app.testRequest(ServiceAction.POST, 1, "notifiers")
-                .getEntity();
+            .getEntity();
         Notifier notifier2 = app.getEntityManager().get(e.getUuid(), Notifier.class);
 
         payloads.clear();
@@ -393,14 +393,14 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         app.clear();
         app.put("payloads", payloads);
-        app.put("debug",true);
+        app.put("debug", true);
 
         try {
-            app.testRequest(ServiceAction.POST, 1, "devices",device1.getUuid(),"notifications");
+            app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications");
             fail("invalid payload should have been rejected");
         } catch (Exception ex) {
             assertEquals("java.lang.IllegalArgumentException: GCM payloads must be 4096 characters or less",
-                    ex.getMessage());
+                ex.getMessage());
             // ok
         }
     }
@@ -420,10 +420,10 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
 
         // create push notification
-        Entity e = app.testRequest(ServiceAction.POST, 1, "devices",badDevice.getUuid(),"notifications")
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", badDevice.getUuid(), "notifications")
             .getEntity();
 
         // validate notification  was created successfully
@@ -448,7 +448,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     @Test
     public void createGoogleNotifierWithBadAPIKey() throws Exception {
 
-        final String badKey = API_KEY+"bad";
+        final String badKey = API_KEY + "bad";
 
         // create notifier with bad API key
         app.clear();
@@ -457,25 +457,25 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         app.put("environment", "development");
         app.put("apiKey", badKey);
 
-        try{
+        try {
             notifier = (Notifier) app
                 .testRequest(ServiceAction.POST, 1, "notifiers").getEntity()
                 .toTypedEntity();
-        }catch(InvalidRequestException e){
+        } catch (InvalidRequestException e) {
             assertEquals(Constants.ERROR_INVALID_REGISTRATION, e.getDescription());
         }
 
     }
 
     @Test
-    public void sendNotificationWithBadAPIKey() throws Exception{
-        final String badKey = API_KEY+"bad";
+    public void sendNotificationWithBadAPIKey() throws Exception {
+        final String badKey = API_KEY + "bad";
 
         // update an existing notifier with a bad API key
         app.clear();
         app.put("apiKey", badKey);
         notifier = (Notifier) app
-            .testRequest(ServiceAction.PUT, 1, "notifiers",notifier.getUuid()).getEntity()
+            .testRequest(ServiceAction.PUT, 1, "notifiers", notifier.getUuid()).getEntity()
             .toTypedEntity();
 
         // create notification payload
@@ -485,11 +485,11 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         payloads.put(notifier.getUuid().toString(), payload);
         app.put("payloads", payloads);
         app.put("queued", System.currentTimeMillis());
-        app.put("debug",true);
+        app.put("debug", true);
 
         // create notification
-        Entity e = app.testRequest(ServiceAction.POST, 1,"devices",device1.getUuid(), "notifications")
-                .getEntity();
+        Entity e = app.testRequest(ServiceAction.POST, 1, "devices", device1.getUuid(), "notifications")
+            .getEntity();
 
 
         // validate notification  was created successfully

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/entities.js
----------------------------------------------------------------------
diff --git a/tests/integration/lib/entities.js b/tests/integration/lib/entities.js
index b17fc74..e941d6f 100644
--- a/tests/integration/lib/entities.js
+++ b/tests/integration/lib/entities.js
@@ -117,7 +117,7 @@ function deleteAllEntities(collection, cb) {
                     deleteAllEntities(collection, function(e) {
                         cb(e);
                     });
-                }, 600); // Mandatory, since it seems to not retrieve entities if you make a request in < 600ms
+                }, 100); // add some delay
             });
         } else {
             cb();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/notifications.js
----------------------------------------------------------------------
diff --git a/tests/integration/lib/notifications.js b/tests/integration/lib/notifications.js
index dd04864..f046bfd 100644
--- a/tests/integration/lib/notifications.js
+++ b/tests/integration/lib/notifications.js
@@ -28,6 +28,7 @@ module.exports.send = function(path, payload, cb) {
         url: urls.appendOrgCredentials(urls.getAppUrl() + path + "/notifications"),
         json: payload
     }, function(err, response, body) {
+        //console.log(JSON.stringify(body, null, 2));
         var error = responseLib.getError(err, response);
         cb(error, error ? null : body.entities.pop());
     });

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/lib/notifiers.js
----------------------------------------------------------------------
diff --git a/tests/integration/lib/notifiers.js b/tests/integration/lib/notifiers.js
new file mode 100644
index 0000000..33f46cb
--- /dev/null
+++ b/tests/integration/lib/notifiers.js
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var request = require("request");
+var urls = require("./urls");
+var responseLib = require("./response");
+module.exports = {};
+
+
+module.exports.add = function(notifier, cb) {
+    request.put({
+        url: urls.appendOrgCredentials(urls.getAppUrl() + "notifiers/" + notifier.name),
+        json: notifier
+    }, function(err, response, body) {
+        var error = responseLib.getError(err, response);
+        cb(error, error ? null : body.entities.pop());
+    });
+};
+
+
+module.exports.get = function(notifierUUID, cb) {
+    request.get(urls.appendOrgCredentials(urls.getAppUrl() + "notifiers/" + notifierUUID), function(err, response, body) {
+        var json = JSON.parse(body);
+        var error = response.statusCode === 404 ? null : responseLib.getError(err, response);
+        cb(error, error ? null : response.statusCode === 404 ? null : json.entities.pop());
+    })
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/groups/groups.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/groups/groups.js b/tests/integration/test/groups/groups.js
index b822661..b56a11c 100644
--- a/tests/integration/test/groups/groups.js
+++ b/tests/integration/test/groups/groups.js
@@ -1,3 +1,6 @@
+/**
+ * Created by russo on 2/4/16.
+ */
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -34,7 +37,7 @@ module.exports = {
         var username = "groupuser";
         var password = "password";
         var usersArray = [];
-        for (var i = 1; i <= 5; i++) {
+        for (var i = 0; i < 5; i++) {
             usersArray.push({
                 "username": username + "-" + i,
                 "password": password,
@@ -46,7 +49,7 @@ module.exports = {
         // build devices
         var name = "device";
         var devicesArray = [];
-        for (var j = 1; j <= 5; j++) {
+        for (var j = 0; j < 5; j++) {
             devicesArray.push({
                 "name": name + "-" + j,
                 "gcm.notifier.id": DEVICE_TOKEN
@@ -55,7 +58,7 @@ module.exports = {
 
 
         describe("users", function () {
-            it("should create some devices", function (done) {
+            it("should create some users", function (done) {
                 this.slow(2000);
                 async.each(usersArray, function (user, cb) {
                     users.add(user, function (err, user) {
@@ -74,51 +77,6 @@ module.exports = {
         });
 
 
-        describe("devices", function () {
-            it("should create some devices", function (done) {
-                this.slow(2000);
-                async.each(devicesArray, function (device, cb) {
-                    devices.add(device, function (err, device) {
-                        should(err).be.null;
-                        device.should.not.be.null;
-                        cb(err, device);
-                    });
-
-                }, function (err) {
-
-                    done()
-
-                });
-
-            })
-
-        });
-
-
-        describe("user<->devices", function () {
-            it("should connect devices to users", function (done) {
-                this.slow(2000);
-                async.eachSeries(usersArray, function (user, cb) {
-                    async.each(devicesArray, function (device, cb) {
-                        connections.connect("users", user.username, "devices", device.name, null, function (err) {
-                            cb(err, device);
-                        });
-                    });
-                    cb(null);
-
-                }, function (err) {
-
-                    if (err) {
-                        console.log("error adding users " + err);
-                    }
-                    done();
-                });
-
-            })
-
-        });
-
-
         describe("groups", function () {
             it("should create some groups", function (done) {
                 this.slow(2000);
@@ -130,7 +88,6 @@ module.exports = {
                     path: "group2"
                 };
 
-                console.log("        creating some groups");
                 groups.add(group1, function (err) {
                     if (err) {
                         console.log("failed to create " + "group1:" + err);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/main.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/main.js b/tests/integration/test/main.js
index 5833607..4d8341c 100644
--- a/tests/integration/test/main.js
+++ b/tests/integration/test/main.js
@@ -57,8 +57,11 @@ describe("** Usergrid REST Integration Tests **", function() {
     describe("groups", function() {
         require("./groups/groups.js").test();
     });
+    describe("notifications", function() {
+        require("./notifications/notifications.js").test();
+    });
     after(function(done) {
-        this.timeout(40000);
+        this.timeout(180000);
         console.log("    teardown");
         teardown.do(function(err) {
             should(err).be.null;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/notifications/notifications.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/notifications/notifications.js b/tests/integration/test/notifications/notifications.js
new file mode 100644
index 0000000..07e7642
--- /dev/null
+++ b/tests/integration/test/notifications/notifications.js
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+var should = require("should");
+var uuid = require("uuid");
+var users = require("../../lib/users");
+var devices = require("../../lib/devices");
+var groups = require("../../lib/groups");
+var notifiers = require("../../lib/notifiers");
+var notifications = require("../../lib/notifications");
+var connections = require("../../lib/connections");
+var async = require('async');
+
+var GOOGLE_API_KEY = "AIzaSyCIH_7WC0mOqBGMOXyQnFgrBpOePgHvQJM";
+var ANDROID_DEVICE_TOKEN = "APA91bGxRGnMK8tKgVPzSlxtCFvwSVqx0xEPjA06sBmiK0kQsiwUt6ipSYF0iPRHyUgpX" +
+    "le0P8OlRWJADkQrcN7yxG4pLMg1CVmrqDu8tfSe63mZ-MRU2IW0cOhmosqzC9trl33moS3OvT7qjDjkP4Qq8LYdwwYC5A";
+
+module.exports = {
+    test: function () {
+
+        var username = "notificationuser";
+        var password = "password";
+        var usersArray = [];
+        for (var i = 0; i < 5; i++) {
+            usersArray.push({
+                "username": username + "-" + i,
+                "password": password,
+                "name": username + "-" + i,
+                "email": username + "-" + i + "@uge2e.com",
+                "number": i
+            });
+        }
+
+        // build devices
+        var deviceName = "notificationdevice";
+        var devicesArray = [];
+        for (var j = 0; j < 5; j++) {
+            devicesArray.push({
+                "name": deviceName + "-" + j,
+                "gcm.notifier.id": ANDROID_DEVICE_TOKEN,
+                "number": i
+            });
+        }
+
+        var notifiersArray = [];
+        var notifier = {
+            name: "gcm",
+            provider: "google",
+            environment: "environment",
+            apiKey: GOOGLE_API_KEY
+        };
+        notifiersArray.push(notifier);
+
+
+        var gcmNotification = {
+
+            payloads: {
+                gcm: "Usergrid Integration Push Test - GCM"
+            }
+        };
+
+
+        describe("notifiers -> GCM", function () {
+            it("should create a GCM notifier", function (done) {
+                this.slow(5000);
+                async.each(notifiersArray, function (notifier, cb) {
+                    notifiers.add(notifier, function (err, notifier) {
+                        should(err).be.null;
+                        notifier.should.not.be.null;
+                        cb(err, notifier);
+                    });
+                }, function (err) {
+
+                    done();
+
+                });
+
+            })
+
+        });
+
+        describe("users", function () {
+            it("should create some users", function (done) {
+                this.slow(2000);
+                async.each(usersArray, function (user, cb) {
+                    users.add(user, function (err, user) {
+                        should(err).be.null;
+                        user.should.not.be.null;
+                        cb(err, user);
+                    });
+                }, function (err) {
+
+                    done();
+
+                });
+
+            })
+
+        });
+
+
+        describe("devices", function () {
+            it("should create some devices", function (done) {
+                this.slow(2000);
+                async.each(devicesArray, function (device, cb) {
+                    devices.add(device, function (err, device) {
+                        should(err).be.null;
+                        device.should.not.be.null;
+                        cb(err, device);
+                    });
+
+                }, function (err) {
+
+                    done()
+
+                });
+
+            })
+
+        });
+
+
+        describe("user<->devices", function () {
+            it("should connect devices to users", function (done) {
+                this.slow(5000);
+                async.eachSeries(usersArray, function (user, cb) {
+                    connections.connect("users", user.username, "devices", devicesArray[user.number].name,
+                        null, function (err) {
+                            cb(err);
+                    });
+                }, function (err) {
+
+                    if (err) {
+                        console.log("error adding users " + err);
+                    }
+                    done();
+                });
+
+            })
+
+        });
+
+
+        describe("groups", function () {
+            it("should create some groups", function (done) {
+                this.slow(2000);
+                var group1 = {
+                    path: "notificationgroup1"
+                };
+
+                var group2 = {
+                    path: "notificationgroup2"
+                };
+
+                async.series([
+                    function (cb) {
+
+                        groups.add(group1, function (err) {
+                            if (err) {
+                                console.log("failed to create " + "notificationgroup1:" + err);
+                            }
+                            cb(err);
+
+                        });
+                    }, function (cb) {
+
+                        groups.add(group2, function (err) {
+                            if (err) {
+                                console.log("failed to create " + "notificationgroup2:" + err);
+                            }
+                            cb(err);
+                        });
+
+
+                    }
+                ], function (err, results) {
+
+                    done();
+
+                });
+
+
+            })
+
+        });
+
+
+        describe("groups<->users", function () {
+            it("should connect users to groups", function (done) {
+                this.slow(2000);
+                async.each(usersArray, function (user, cb) {
+
+                    async.series([
+                        function (cb) {
+                            connections.connect("groups", "notificationgroup1", "users", user.username, null,
+                                function (err) {
+                                    cb(err, user);
+                            });
+
+                        },
+                        function (cb) {
+                            connections.connect("groups", "notificationgroup2", "users", user.username, null,
+                                function (err) {
+                                    cb(err, user);
+
+                            });
+                        }
+
+                    ], function (err, results) {
+
+                        cb(err);
+
+                    });
+
+                }, function (err) {
+                    done();
+                });
+
+            })
+
+        });
+
+
+        // SEND NOTIFICATIONS HERE AND VALIDATE THE NUMBER OF NOTIFICATIONS SENT ARE ACCURATE FOR QUERY
+
+        describe("notification -> user - direct path", function () {
+            it("should send a single notification to a user", function (done) {
+                this.timeout(5000)
+                this.slow(5000);
+                setTimeout(function () {
+
+                    notifications.send("users/" + usersArray[1].username, gcmNotification,
+                        function (err, notification) {
+                            should(err).be.null;
+                            notification.should.not.be.null;
+                            notification.expectedCount.should.be.equal(1);
+                            done();
+
+                    });
+
+                }, 1000)
+
+
+            })
+
+        });
+
+        describe("notification -> user - via matrix query", function () {
+            it("should send a single notification to a user", function (done) {
+                this.timeout(5000)
+                this.slow(5000);
+
+                setTimeout(function () {
+
+                    notifications.send("users;ql=select * where username = 'notificationuser-0'", gcmNotification,
+                        function (err, notification) {
+                            should(err).be.null;
+                            notification.should.not.be.null;
+                            notification.expectedCount.should.be.equal(1);
+                            done();
+
+                    });
+
+                }, 1000);
+
+
+            })
+
+        });
+
+        describe("notification -> groups - via matrix query", function () {
+            it("should send a single notification to groups with the same users", function (done) {
+                this.timeout(5000)
+                this.slow(5000);
+                setTimeout(function () {
+
+                    notifications.send("groups;ql=select * where path = 'notificationgroup1' " +
+                        "or path = 'notificationgroup2'", gcmNotification, function (err, notification) {
+
+                            should(err).be.null;
+                            notification.should.not.be.null;
+                            // we set up 2 groups of the same 5 users.  if duplicate filtering is working,
+                            // we'll only have 5 expected
+                            notification.expectedCount.should.be.equal(5);
+                            done();
+
+                    });
+
+                }, 1000);
+
+            })
+
+        });
+
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/usergrid/blob/853d6486/tests/integration/test/teardown.js
----------------------------------------------------------------------
diff --git a/tests/integration/test/teardown.js b/tests/integration/test/teardown.js
index d1031ff..24a9ddd 100644
--- a/tests/integration/test/teardown.js
+++ b/tests/integration/test/teardown.js
@@ -65,16 +65,32 @@ module.exports = {
                     })
                 },
                 function(cb) {
+                    entities.deleteAll('notifiers', function(err, body) {
+                        should(err).be.null;
+                        body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0);
+                        body.count.should.equal(0);
+                        cb(err);
+                    })
+                },
+                function(cb) {
                     entities.deleteAll('notifications', function(err, body) {
                         should(err).be.null;
                         body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0);
                         body.count.should.equal(0);
                         cb(err);
                     })
+                },
+                function(cb) {
+                    entities.deleteAll('receipts', function(err, body) {
+                        should(err).be.null;
+                        body.entities.should.be.an.instanceOf(Array).and.have.lengthOf(0);
+                        body.count.should.equal(0);
+                        cb(err);
+                    })
                 }
             ],
             function(err, data) {
                 cb(err);
             });
     }
-}
\ No newline at end of file
+};
\ No newline at end of file


Mime
View raw message