usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [2/2] usergrid git commit: Final changes to enhance parallel loading of devices for push notifications.
Date Sun, 17 Apr 2016 17:04:34 GMT
Final changes to enhance parallel loading of devices for push notifications.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06caa250
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06caa250
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06caa250

Branch: refs/heads/release-2.1.1
Commit: 06caa2509407322498c025b1b3d39135d82777cc
Parents: cc3cbfe
Author: Michael Russo <mrusso@apigee.com>
Authored: Sun Apr 17 18:02:32 2016 +0100
Committer: Michael Russo <mrusso@apigee.com>
Committed: Sun Apr 17 18:02:32 2016 +0100

----------------------------------------------------------------------
 .../pipeline/builder/IdBuilder.java             |   2 +-
 .../persistence/MultiQueryIterator.java         |   2 +-
 .../persistence/NotificationGraphIterator.java  |  59 ++----
 .../persistence/PagingResultsIterator.java      |  25 ++-
 .../apache/usergrid/persistence/PathQuery.java  |  14 +-
 .../apache/usergrid/persistence/Results.java    |   4 +
 .../impl/ApplicationQueueManagerImpl.java       | 211 +++++--------------
 7 files changed, 102 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 65cf7c1..781d7d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -151,7 +151,7 @@ public class IdBuilder {
 
     public Observable<ResultsPage<Id>> build(){
         //we must add our resume filter so we drop our previous page first element if it's present
-        return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute();
+        return pipeline.withFilter( new IdResumeFilter() ).withFilter(new ResultsPageCollector<>()).execute();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
index c5de5c1..9e28204 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
@@ -62,7 +62,7 @@ public class MultiQueryIterator implements ResultsIterator {
             EntityRef ref = source.next();
             Results r = getResultsFor( ref );
             if ( r.size() > 0 ) {
-                currentIterator = new PagingResultsIterator( r, query.getResultsLevel() );
+                currentIterator = new PagingResultsIterator( r, query.getResultsLevel(),
null);
                 return currentIterator.hasNext();
             }
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
index a1f3246..a1b162d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -23,10 +23,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
 
 public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
@@ -67,18 +63,20 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
             Object next = source.next();
             Results r;
 
-//            if(next instanceof UUID){
-//
-//                UUID id = (UUID) next;
-//                r = getResultsForId(id, "user");
-//
-//            }else {
-                EntityRef ref = (EntityRef) next;
-                r = getResultsFor(ref);
-           // }
+            EntityRef ref = (EntityRef) next;
+            r = getResultsFor(ref);
 
             if (r.size() > 0) {
-                currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
+
+
+                if(ref.getType().equals(Group.ENTITY_TYPE)) {
+
+                    currentIterator = new PagingResultsIterator(r, query.getResultsLevel(),
Query.Level.REFS);
+                }else{
+                    currentIterator = new PagingResultsIterator(r, query.getResultsLevel(),
null);
+
+                }
+
                 return currentIterator.hasNext();
             }
         }
@@ -122,26 +120,13 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
                 // if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities
                 if( ref.getType().equals(Group.ENTITY_TYPE)){
 
-                    // query users using IDs as we don't need to load the full entities just
to find their devices
-                    Query usersQuery = new Query();
-                    usersQuery.setCollection("users");
-                    usersQuery.setResultsLevel(Query.Level.IDS);
-                    usersQuery.setLimit(1000);
+                    // groups->users is a passthrough to devices, load our max limit
+                    query.setLimit(Query.MAX_LIMIT);
 
-
-                    // set the query level for the iterator temporarily to IDS
+                    // set the query level to IDS when fetching users; we don't need the full entity
                     query.setResultsLevel(Query.Level.IDS);
 
-                 return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery);
-
-
-//                    List<EntityRef> refs =
-//                        results.getIds().stream()
-//                            .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList());
-//
-//                    // set the query level for the iterator back to REFS after mapping
our IDS
-//                    query.setResultsLevel(Query.Level.REFS);
-//                    return Results.fromRefList(refs);
+                 return entityManager.searchCollection(ref, "users", query);
 
                 }
 
@@ -151,8 +136,6 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
                     devicesQuery.setCollection("devices");
                     devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
 
-                    //query.setCollection("devices");
-                    //query.setResultsLevel(Query.Level.CORE_PROPERTIES);
                     return entityManager.searchCollection(ref, devicesQuery.getCollection(),
devicesQuery);
                 }
 
@@ -177,14 +160,4 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
         }
     }
 
-
-    private Results getResultsForId(UUID uuid, String type) {
-
-        EntityRef ref = new SimpleEntityRef(type, uuid);
-        return getResultsFor(ref);
-
-
-    }
-
-
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
index a883e1b..640ee06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
@@ -19,6 +19,8 @@ package org.apache.usergrid.persistence;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.usergrid.persistence.Query.Level;
 
 
@@ -28,20 +30,23 @@ public class PagingResultsIterator implements ResultsIterator, Iterable
{
     private Results results;
     private Iterator currentPageIterator;
     private Level level;
+    private Level overrideLevel;
 
 
     public PagingResultsIterator( Results results ) {
-        this( results, results.level );
+        this( results, results.level, null);
     }
 
 
     /**
      * @param level overrides the default level from the Results - in case you want to return,
say, UUIDs where the
      * Query was set for Entities
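+     * @param overrideLevel if non-null, used in place of {@code level} when selecting each page's contents (result IDs are first mapped to "user" refs); pass null for no override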
      */
-    public PagingResultsIterator( Results results, Level level ) {
+    public PagingResultsIterator(Results results, Level level, Level overrideLevel) {
         this.results = results;
         this.level = level;
+        this.overrideLevel = overrideLevel;
         initCurrentPageIterator();
     }
 
@@ -86,16 +91,32 @@ public class PagingResultsIterator implements ResultsIterator, Iterable
{
      */
     private boolean initCurrentPageIterator() {
         List currentPage;
+        Level origLevel = level;
+        if(overrideLevel != null){
+            level=overrideLevel;
+            if(results.getIds()!=null){
+
+                List<EntityRef> userRefs = results.getIds().stream()
+                    .map( uuid -> new SimpleEntityRef("user", uuid)).collect(Collectors.toList());
+
+                results.setRefs(userRefs);
+
+            }
+        }
+
         if ( results != null ) {
             switch ( level ) {
                 case IDS:
                     currentPage = results.getIds();
+                    level = origLevel;
                     break;
                 case REFS:
                     currentPage = results.getRefs();
+                    level = origLevel;
                     break;
                 default:
                     currentPage = results.getEntities();
+                    level = origLevel;
             }
             if ( currentPage.size() > 0 ) {
                 currentPageIterator = currentPage.iterator();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index 215f6ac..30636ab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -88,7 +88,7 @@ public class PathQuery<E> {
         try {
 
             if ( uuid != null && type != null ) {
-                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel()
);
+                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(),
null);
             }
             else {
                 return new MultiQueryIterator( em, source.refIterator( em, false), query
);
@@ -103,7 +103,7 @@ public class PathQuery<E> {
         try {
 
             if ( uuid != null && type != null ) {
-                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel()
);
+                return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(),
null);
             }else {
 
                 return new NotificationGraphIterator(em, source.refIterator(em, true), query);
@@ -130,6 +130,12 @@ public class PathQuery<E> {
 
             UUID entityId = em.getUniqueIdFromAlias( entityType, name );
 
+            if( entityId == null){
+                throw new
+                    IllegalArgumentException("Entity with name "+name+" not found. Unable
to send push notification");
+            }
+
+
             return em.getEntities(Collections.singletonList(entityId), entityType);
         }
 
@@ -143,12 +149,12 @@ public class PathQuery<E> {
 
         if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){
 
-            return new PagingResultsIterator( getHeadResults( em ), Level.REFS );
+            return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null);
 
         }
 
         if ( type != null  && uuid != null) {
-            return new PagingResultsIterator( getHeadResults( em ), Level.REFS );
+            return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null);
         }
         else {
             Query q = query;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 2a84622..3502581 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -436,6 +436,10 @@ public class Results implements Iterable<Entity> {
         level = Level.REFS;
     }
 
+    public void setRefsOnly( List<EntityRef> resultsRefs ) {
+        refs = resultsRefs;
+    }
+
 
     public Results withRefs( List<EntityRef> resultsRefs ) {
         setRefs( resultsRefs );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 1cbb2c6..2f39ae4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -52,6 +52,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
     private final Meter queueMeter;
     private final Meter sendMeter;
 
+    private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency";
+
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
 
 
@@ -91,25 +93,22 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
             return;
         }
 
-        if (logger.isTraceEnabled()) {
-            logger.trace("notification {} start queuing", notification.getUuid());
-        }
-
         final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery();
//devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can
make a judgement on batching
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>();
//build up list of issues
 
-        //get devices in querystring, and make sure you have access
+        // Get devices in querystring, and make sure you have access
         if (pathQuery != null) {
             final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
-            logger.info("notification {} start query", notification.getUuid());
+
+            logger.info("Notification {} started processing", notification.getUuid());
 
 
 
-            // the main iterator can use graph traversal or index querying
+            // The main iterator can use graph traversal or index querying based on payload property. Default is Index.
             final Iterator<Device> iterator;
             if( notification.getUseGraph()){
                 iterator = pathQuery.graphIterator(em);
@@ -117,15 +116,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                 iterator = pathQuery.iterator(em);
             }
 
-//            //if there are more pages (defined by PAGE_SIZE) you probably want this to
be async, also if this is already a job then don't reschedule
-//            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages()
&& jobExecution == null) {
-//                if(logger.isTraceEnabled()){
-//                    logger.trace("Scheduling notification job as it has multiple pages
of devices.");
-//                }
-//                jobScheduler.scheduleQueueJob(notification, true);
-//                em.update(notification);
-//                return;
-//            }
+            /**** Old code to schedule large sets of data, but now the processing is fired off async in the background.
+                Leaving this only as a reference of how to do it, if needed in the future.
+
+                    //if there are more pages (defined by PAGE_SIZE) you probably want this
to be async,
+                    //also if this is already a job then don't reschedule
+
+                    if (iterator instanceof ResultsIterator
+                                && ((ResultsIterator) iterator).hasPages() &&
jobExecution == null) {
+
+                        if(logger.isTraceEnabled()){
+                            logger.trace("Scheduling notification job as it has multiple
pages of devices.");
+                        }
+                        jobScheduler.scheduleQueueJob(notification, true);
+                        em.update(notification);
+                        return;
+                     }
+             ****/
+
             final UUID appId = em.getApplication().getUuid();
             final Map<String, Object> payloads = notification.getPayloads();
 
@@ -182,87 +190,57 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
             };
 
-            final Map<String, Object> filters = notification.getFilters();
 
+            final Map<String, Object> filters = notification.getFilters();
 
+            Observable processMessagesObservable = Observable.create(new IteratorObservable<EntityRef>(iterator))
 
-            Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator))
-//                .flatMap(entity -> {
-//
-//                    if(entity.getType().equals(Device.ENTITY_TYPE)){
-//                        return Observable.from(Collections.singletonList(entity));
-//                    }
-//
-//                    // if it's not a device, drill down and get them
-//                    return Observable.from(getDevices(entity));
-//
-//                })
-                .distinct()
                 .flatMap( entityRef -> {
 
                     return Observable.just(entityRef).flatMap(ref->{
 
                         List<Entity> entities = new ArrayList<>();
 
+                            if( ref.getType().equals(User.ENTITY_TYPE)){
+
                                 Query devicesQuery = new Query();
                                 devicesQuery.setCollection("devices");
                                 devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
 
                                 try {
 
-                                   entities = em.searchCollection(new SimpleEntityRef("user",
ref), devicesQuery.getCollection(), devicesQuery).getEntities();
+                                   entities = em.searchCollection(new SimpleEntityRef("user",
ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
 
                                 }catch (Exception e){
 
-                                    logger.error("Unable to load devices for user: {}", ref);
+                                    logger.error("Unable to load devices for user: {}", ref.getUuid());
                                     return Observable.empty();
                                 }
 
 
+                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
 
+                                try{
+                                    entities.add(em.get(ref));
 
-//                            if( ref.getType().equals(User.ENTITY_TYPE)){
-//
-//                                Query devicesQuery = new Query();
-//                                devicesQuery.setCollection("devices");
-//                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
-//
-//                                try {
-//
-//                                   entities = em.searchCollection(new SimpleEntityRef("user",
ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
-//
-//                                }catch (Exception e){
-//
-//                                    logger.error("Unable to load devices for user: {}",
ref.getUuid());
-//                                    return Observable.empty();
-//                                }
-//
-//
-//                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
-//
-//                                try{
-//                                    entities.add(em.get(ref));
-//
-//                                }catch(Exception e){
-//
-//                                    logger.error("Unable to load device: {}", ref.getUuid());
-//                                    return Observable.empty();
-//
-//                                }
-//
-//                            }
+                                }catch(Exception e){
+
+                                    logger.error("Unable to load device: {}", ref.getUuid());
+                                    return Observable.empty();
+
+                                }
+
+                            }
                         return Observable.from(entities);
 
                         })
+                        .distinct( deviceRef -> deviceRef.getUuid())
                         .filter( device -> {
 
-                            logger.info("Filtering device: {}", device.getUuid());
-
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Filtering device: {}", device.getUuid());
                             }
 
-
                             if(notification.getUseGraph() && filters.size() >
0 ) {
 
                                 for (Map.Entry<String, Object> entry : filters.entrySet())
{
@@ -280,7 +258,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                                     }
 
                                 }
-
                                 if(logger.isTraceEnabled()) {
                                     logger.trace("Push notification filter did not match
for notification {}, so removing from notification",
                                         device.getUuid(), notification.getUuid());
@@ -321,20 +298,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
                         }).subscribeOn(Schedulers.io());
 
-                }, 100)
-                //.map( entityRef -> entityRef.getUuid() )
-                //.buffer(10)
-//                .flatMap( uuids -> {
-//
-//                    if(logger.isTraceEnabled()) {
-//                        logger.trace("Processing batch of {} device(s)", uuids.size());
-//                    }
-//
-//
-//                    return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io());
-//
-//                }, 10)
-
+                }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")))
                 .doOnError(throwable -> {
 
                     logger.error("Error while processing devices for notification : {}",
notification.getUuid());
@@ -355,7 +319,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                         notification.setProcessingFinished(System.currentTimeMillis());
                         notification.setDeviceProcessedCount(deviceCount.get());
                         em.update(notification);
-                        logger.info("{} device(s) processed for notification {}", deviceCount.get(),
notification.getUuid());
+                        logger.info("Notification {} finished processing {} device(s)", notification.getUuid(),
deviceCount.get());
 
                     } catch (Exception e) {
                         logger.error("Unable to set processing finished timestamp for notification");
@@ -622,10 +586,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
             try {
                 while (!subscriber.isUnsubscribed() && input.hasNext()) {
                     //send our input to the next
+                    //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName());
                     subscriber.onNext((T) input.next());
                 }
 
                 //tell the subscriber we don't have any more data
+                //logger.debug("finished iterator: {}", input.getClass().getSimpleName());
+
                 subscriber.onCompleted();
             } catch (Throwable t) {
                 logger.error("failed on subscriber", t);
@@ -678,90 +645,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
         return true;
     }
 
-    private List<EntityRef> getDevices(EntityRef ref) {
-
-        List<EntityRef> devices = new ArrayList<>();
-
-        final int LIMIT = Query.MID_LIMIT;
-
-        try {
-
-           if (User.ENTITY_TYPE.equals(ref.getType())) {
-
-                UUID start = null;
-                boolean initial = true;
-                int resultSize = 0;
-                while( initial || resultSize >= Query.DEFAULT_LIMIT) {
-
-                    initial = false;
-
-                    final List<EntityRef> mydevices = em.getCollection(ref, "devices",
start, LIMIT,
-                        Query.Level.REFS, true).getRefs();
-
-                    resultSize = mydevices.size();
-
-                    if(mydevices.size() > 0){
-                        start = mydevices.get(mydevices.size() - 1 ).getUuid();
-                    }
-
-                    devices.addAll( mydevices  );
-
-                }
-
-            } else if (Group.ENTITY_TYPE.equals(ref.getType())) {
-
-                UUID start = null;
-                boolean initial = true;
-                int resultSize = 0;
-
-                while( initial || resultSize >= LIMIT){
-
-                    initial = false;
-
-                    final List<EntityRef> myusers =  em.getCollection(ref, "users",
start,
-                        LIMIT, Query.Level.REFS, true).getRefs();
-                    resultSize = myusers.size();
-
-                    if(myusers.size() > 0){
-                        start = myusers.get(myusers.size() - 1 ).getUuid();
-                    }
-
-
-                    Observable.from(myusers).flatMap( user -> {
-
-                        try {
-                            devices.addAll(em.getCollection(user, "devices", null, 100,
-                                Query.Level.REFS, true).getRefs());
-                        }catch (Exception e){
-                            logger.error ("Unable to fetch devices for user: {}", user.getUuid());
-                        }
-                        return Observable.from(Collections.singletonList(user));
-
-                    }, 50).toBlocking().lastOrDefault(null);
-
-
-
-
-
-                }
-
-            }
-        } catch (Exception e) {
-
-            if (ref != null){
-                logger.error("Error while retrieving devices for entity type {} and uuid
{}. Error: {}",
-                    ref.getType(), ref.getUuid(), e);
-            }else{
-                logger.error("Error while retrieving devices. Entity ref was null.");
-            }
-
-            throw new RuntimeException("Unable to retrieve devices for EntityRef", e);
-
-        }
-
-        return devices;
-    }
-
 
     private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
         try {


Mime
View raw message