usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [1/2] usergrid git commit: Parallel device fetching from users, need to update to support all PN use cases with this parallelism.
Date Sun, 17 Apr 2016 17:04:33 GMT
Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 8cf782527 -> 06caa2509


Parallel device fetching from users, need to update to support all PN use cases with this
parallelism.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cc3cbfee
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cc3cbfee
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cc3cbfee

Branch: refs/heads/release-2.1.1
Commit: cc3cbfee60f83e107269fe4fb558cb094b5c2032
Parents: 8cf7825
Author: Michael Russo <mrusso@apigee.com>
Authored: Sun Apr 17 14:34:45 2016 +0100
Committer: Michael Russo <mrusso@apigee.com>
Committed: Sun Apr 17 14:34:45 2016 +0100

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  21 ++++
 .../pipeline/builder/IdBuilder.java             |   6 ++
 .../pipeline/read/FilterFactory.java            |   8 +-
 .../pipeline/read/traverse/IdFilter.java        |  52 ++++++++++
 .../results/IdQueryExecutor.java                |  66 ++++++++++++
 .../service/CollectionSearch.java               |   9 ++
 .../service/CollectionService.java              |   5 +
 .../service/CollectionServiceImpl.java          |  23 ++++
 .../persistence/NotificationGraphIterator.java  |  79 +++++++++++++-
 .../apache/usergrid/persistence/Results.java    |  16 +++
 .../persistence/entities/Notification.java      |  16 ++-
 .../impl/ApplicationQueueManagerImpl.java       | 104 +++++++++++++++----
 12 files changed, 369 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 9ecf466..5596ab4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.corepersistence;
 
 import java.util.*;
 
+import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -614,6 +615,24 @@ public class CpRelationManager implements RelationManager {
         final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent():
query.getQl();
         final Id ownerId = headEntity.asId();
 
+
+        if(query.getLevel() == Level.IDS ){
+
+            return new IdQueryExecutor( toExecute.getCursor() ) {
+                @Override
+                protected Observable<ResultsPage<Id>> buildNewResultsPage(
+                    final Optional<String> cursor ) {
+
+                    final CollectionSearch search =
+                        new CollectionSearch( applicationScope, ownerId, collectionName,
collection.getType(), toExecute.getLimit(),
+                            queryString, cursor );
+
+                    return collectionService.searchCollectionIds( search );
+                }
+            }.next();
+
+        }
+
         //wire the callback so we can get each page
         return new EntityQueryExecutor( toExecute.getCursor() ) {
             @Override
@@ -989,6 +1008,8 @@ public class CpRelationManager implements RelationManager {
         //            }
         //        }
 
+
+
         return query;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 0f784a6..65cf7c1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe
 import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -148,4 +149,9 @@ public class IdBuilder {
         return new ConnectionRefBuilder(connectionRefFilter);
     }
 
+    public Observable<ResultsPage<Id>> build(){
+        //we must add our resume filter so we drop our previous page first element if it's
present
+        return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index ca5695c..883fdc8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -27,13 +27,7 @@ import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityF
 import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.*;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
new file mode 100644
index 0000000..9b3a05a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/IdFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+import java.util.List;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has
been implemented, this should
+ * be removed
+ */
+public class IdFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Id>>{
+
+    @Inject
+    public IdFilter() {};
+
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>>
filterValueObservable ) {
+        //ignore what our input was, and simply emit the id specified
+        return filterValueObservable.map( idFilterResult ->  new FilterResult( idFilterResult.getValue(),
idFilterResult.getPath() ));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
new file mode 100644
index 0000000..5a0bb3f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Processes our results of connection refs
+ */
+@Deprecated//Required for 1.0 compatibility
+public abstract class IdQueryExecutor extends ObservableQueryExecutor<Id> {
+
+
+    protected IdQueryExecutor( final Optional<String> startCursor ) {
+        super( startCursor );
+    }
+
+
+    @Override
+    protected Results createResults( final ResultsPage resultsPage ) {
+        final List<Id> ids = resultsPage.getEntityList();
+
+        List<UUID> uuids = ids.stream().map(id -> id.getUuid()).collect(Collectors.toList());
+
+        final Results results = Results.fromIdList(uuids);
+
+        return results;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
index ab8a8bc..602a5b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -29,6 +29,10 @@ import com.google.common.base.Optional;
  */
 public class CollectionSearch {
 
+    public enum Level {
+        IDS, ALL
+    }
+
     private final ApplicationScope applicationScope;
     private final Id collectionOwnerId;
     private final String collectionName;
@@ -36,6 +40,7 @@ public class CollectionSearch {
     private final int limit;
     private final Optional<String> query;
     private final Optional<String> cursor;
+    private Level level = Level.ALL;
 
 
     public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId,
final String
@@ -84,4 +89,8 @@ public class CollectionSearch {
     public Id getCollectionOwnerId() {
         return collectionOwnerId;
     }
+
+    public void setResultsLevel(Level level){ this.level = level; }
+
+    public Level getResultsLevel(){ return level; }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
index eef741a..6a46022 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.service;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 
@@ -35,4 +36,8 @@ public interface CollectionService {
      * @return An observable with results page entries for the stream
      */
     Observable<ResultsPage<Entity>> searchCollection(final CollectionSearch search);
+
+
+
+    Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch search);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
index fa79d09..9244315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -29,6 +29,7 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 
@@ -73,4 +74,26 @@ public class CollectionServiceImpl implements CollectionService {
 
         return results.build();
     }
+
+    @Override
+    public Observable<ResultsPage<Id>> searchCollectionIds(final CollectionSearch
search ) {
+
+
+        final ApplicationScope applicationScope = search.getApplicationScope();
+        final String collectionName = search.getCollectionName();
+        final Optional<String> query = search.getQuery();
+
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor()
)
+                .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+
+
+        final IdBuilder results;
+
+
+        results = pipelineBuilder.traverseCollection( collectionName );
+
+
+        return results.build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
index b83f555..a1f3246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -17,10 +17,16 @@
 package org.apache.usergrid.persistence;
 
 
+import org.apache.usergrid.persistence.entities.Group;
+import org.apache.usergrid.persistence.entities.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class NotificationGraphIterator implements ResultsIterator, Iterable {
 
@@ -58,8 +64,19 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
             return true;
         }
         while (source.hasNext()) {
-            EntityRef ref = source.next();
-            Results r = getResultsFor(ref);
+            Object next = source.next();
+            Results r;
+
+//            if(next instanceof UUID){
+//
+//                UUID id = (UUID) next;
+//                r = getResultsForId(id, "user");
+//
+//            }else {
+                EntityRef ref = (EntityRef) next;
+                r = getResultsFor(ref);
+           // }
+
             if (r.size() > 0) {
                 currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
                 return currentIterator.hasNext();
@@ -90,13 +107,57 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
 
         try {
 
+
+            query.setLimit(Query.MAX_LIMIT); // always fetch our MAX limit to reduce # of
IO hops
             if (query.getCollection() != null) {
 
+                // make sure this results in graph traversal
+                query.setQl("select *");
+
                 if(logger.isTraceEnabled()) {
                     logger.trace("Fetching with refType: {}, collection: {} with no query",
                         ref.getType(), query.getCollection());
                 }
-                return entityManager.searchCollection(ref, query.getCollection(), null);
+
+                // if we're fetching devices through groups->users->devices, get only
the IDs and don't load the entities
+                if( ref.getType().equals(Group.ENTITY_TYPE)){
+
+                    // query users using IDs as we don't need to load the full entities just
to find their devices
+                    Query usersQuery = new Query();
+                    usersQuery.setCollection("users");
+                    usersQuery.setResultsLevel(Query.Level.IDS);
+                    usersQuery.setLimit(1000);
+
+
+                    // set the query level for the iterator temporarily to IDS
+                    query.setResultsLevel(Query.Level.IDS);
+
+                 return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery);
+
+
+//                    List<EntityRef> refs =
+//                        results.getIds().stream()
+//                            .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList());
+//
+//                    // set the query level for the iterator back to REFS after mapping
our IDS
+//                    query.setResultsLevel(Query.Level.REFS);
+//                    return Results.fromRefList(refs);
+
+                }
+
+                if( ref.getType().equals(User.ENTITY_TYPE)){
+
+                    Query devicesQuery = new Query();
+                    devicesQuery.setCollection("devices");
+                    devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+
+                    //query.setCollection("devices");
+                    //query.setResultsLevel(Query.Level.CORE_PROPERTIES);
+                    return entityManager.searchCollection(ref, devicesQuery.getCollection(),
devicesQuery);
+                }
+
+                return entityManager.searchCollection(ref, query.getCollection(), query);
+
 
             } else {
 
@@ -105,7 +166,7 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
                         ref.getType(), query.getCollection());
                 }
 
-                query.setQl("select *");
+                query.setQl("select *"); // make sure this results in graph traversal
                 return entityManager.searchTargetEntities(ref, query);
 
             }
@@ -116,4 +177,14 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable
{
         }
     }
 
+
+    private Results getResultsForId(UUID uuid, String type) {
+
+        EntityRef ref = new SimpleEntityRef(type, uuid);
+        return getResultsFor(ref);
+
+
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index e9a3251..2a84622 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -708,6 +708,22 @@ public class Results implements Iterable<Entity> {
         }
     }
 
+    public void addEntities( Results results){
+
+        if(entities == null){
+            //init();
+            entities = new ArrayList<>();
+            level = Level.CORE_PROPERTIES;
+        }
+
+        if( results.getEntities().size() > 0){
+
+            entities.addAll(results.getEntities());
+
+        }
+
+    }
+
 
     /** Remove the passed in results from the current results */
     public void subtract( Results results ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index aca10cf..d4f3529 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -409,20 +409,28 @@ public class Notification extends TypedEntity {
 
                     if ( pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(Group.ENTITY_TYPE)
)){
 
-                        Query usersQuery = new Query();
+                        final Query usersQuery = new Query();
                         usersQuery.setQl("select *");
                         usersQuery.setCollection("users");
                         usersQuery.setLimit(100);
 
-                        Query devicesQuery = new Query();
+                        final Query devicesQuery = new Query();
                         devicesQuery.setQl("select *");
                         devicesQuery.setCollection("devices");
-                        usersQuery.setLimit(100);
+                        devicesQuery.setLimit(100);
 
 
                         // build up the chain so the proper iterators can be used later
-                        pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery );
+                        pathQuery = pathQuery.chain( usersQuery );//.chain( devicesQuery
);
+
+                    }else if(pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(User.ENTITY_TYPE))){
+
+                        final Query devicesQuery = new Query();
+                        devicesQuery.setQl("select *");
+                        devicesQuery.setCollection("devices");
+                        devicesQuery.setLimit(100);
 
+                        pathQuery = pathQuery.chain( devicesQuery );
                     }
 
                 } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cc3cbfee/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5ce1b93..1cbb2c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,10 +19,7 @@ package org.apache.usergrid.services.notifications.impl;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -108,6 +105,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
+            logger.info("notification {} start query", notification.getUuid());
+
 
 
             // the main iterator can use graph traversal or index querying
@@ -185,30 +184,80 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
             final Map<String, Object> filters = notification.getFilters();
 
-            Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
-                .flatMap(entity -> {
 
-                    if(entity.getType().equals(Device.ENTITY_TYPE)){
-                        return Observable.from(Collections.singletonList(entity));
-                    }
 
-                    // if it's not a device, drill down and get them
-                    return Observable.from(getDevices(entity));
+            Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator))
+//                .flatMap(entity -> {
+//
+//                    if(entity.getType().equals(Device.ENTITY_TYPE)){
+//                        return Observable.from(Collections.singletonList(entity));
+//                    }
+//
+//                    // if it's not a device, drill down and get them
+//                    return Observable.from(getDevices(entity));
+//
+//                })
+                .distinct()
+                .flatMap( entityRef -> {
+
+                    return Observable.just(entityRef).flatMap(ref->{
+
+                        List<Entity> entities = new ArrayList<>();
+
+                                Query devicesQuery = new Query();
+                                devicesQuery.setCollection("devices");
+                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+
+                                try {
+
+                                   entities = em.searchCollection(new SimpleEntityRef("user",
ref), devicesQuery.getCollection(), devicesQuery).getEntities();
+
+                                }catch (Exception e){
+
+                                    logger.error("Unable to load devices for user: {}", ref);
+                                    return Observable.empty();
+                                }
 
-                })
-                .distinct(ref -> ref.getUuid() )
-                .map( entityRef -> entityRef.getUuid() )
-                .buffer(10)
-                .flatMap( uuids -> {
 
-                    if(logger.isTraceEnabled()) {
-                        logger.trace("Processing batch of {} device(s)", uuids.size());
-                    }
 
 
-                    return Observable.from(em.getEntities(uuids, "device"))
+//                            if( ref.getType().equals(User.ENTITY_TYPE)){
+//
+//                                Query devicesQuery = new Query();
+//                                devicesQuery.setCollection("devices");
+//                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
+//
+//                                try {
+//
+//                                   entities = em.searchCollection(new SimpleEntityRef("user",
ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
+//
+//                                }catch (Exception e){
+//
+//                                    logger.error("Unable to load devices for user: {}",
ref.getUuid());
+//                                    return Observable.empty();
+//                                }
+//
+//
+//                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
+//
+//                                try{
+//                                    entities.add(em.get(ref));
+//
+//                                }catch(Exception e){
+//
+//                                    logger.error("Unable to load device: {}", ref.getUuid());
+//                                    return Observable.empty();
+//
+//                                }
+//
+//                            }
+                        return Observable.from(entities);
+
+                        })
                         .filter( device -> {
 
+                            logger.info("Filtering device: {}", device.getUuid());
+
                             if(logger.isTraceEnabled()) {
                                 logger.trace("Filtering device: {}", device.getUuid());
                             }
@@ -233,7 +282,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                                 }
 
                                 if(logger.isTraceEnabled()) {
-                                    logger.trace("Push notification filter matched for notification
{}, so removing from notification",
+                                    logger.trace("Push notification filter did not match
for notification {}, so removing from notification",
                                         device.getUuid(), notification.getUuid());
                                 }
                                 return false;
@@ -271,7 +320,20 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
 
 
                         }).subscribeOn(Schedulers.io());
-                }, 10)
+
+                }, 100)
+                //.map( entityRef -> entityRef.getUuid() )
+                //.buffer(10)
+//                .flatMap( uuids -> {
+//
+//                    if(logger.isTraceEnabled()) {
+//                        logger.trace("Processing batch of {} device(s)", uuids.size());
+//                    }
+//
+//
+//                    return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io());
+//
+//                }, 10)
 
                 .doOnError(throwable -> {
 


Mime
View raw message