usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject incubator-usergrid git commit: add scheduler to search observables
Date Wed, 08 Jul 2015 23:29:17 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 28bc4ca8e -> 7a72c5b19


add scheduler to search observables


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a72c5b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a72c5b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a72c5b1

Branch: refs/heads/two-dot-o-dev
Commit: 7a72c5b19ded2ba19f27b8370971a78f8b229fc0
Parents: 28bc4ca
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Wed Jul 8 17:29:01 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Wed Jul 8 17:29:01 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/pipeline/Pipeline.java |  4 +++-
 .../pipeline/read/collect/ResultsPageCollector.java | 16 +++++++++++++---
 .../results/ObservableQueryExecutor.java            |  6 ++++--
 3 files changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index dc95178..81f857f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -30,6 +30,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import rx.Observable;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -75,7 +76,8 @@ public class Pipeline<InputType> {
 
         //set our observable to start at the application
         final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(),
Optional.absent() );
-        this.currentObservable = Observable.just( filter );
+
+        this.currentObservable = Observable.just( filter ).subscribeOn(Schedulers.io());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
index 91773c4..29a7134 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -61,9 +61,19 @@ public class ResultsPageCollector<T> extends AbstractFilter<FilterResult<T>,
Res
 
         final int limit = pipelineContext.getLimit();
 
-        return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from(
buffer ).collect(
-            () -> new ResultsPageWithCursorCollector( limit ), ( collector, element )
-> collector.add( element ) ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
-            new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
) );
+        return filterResultObservable
+            .buffer( limit )
+            .flatMap( buffer
+                -> Observable
+                    .from( buffer )
+                    .collect(() -> new ResultsPageWithCursorCollector( limit ), ( collector,
element ) -> collector.add( element ) )
+            )
+            .map( resultsPageCollector ->
+                new ResultsPage(
+                    resultsPageCollector.results,
+                    new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit()
+                )
+            );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a72c5b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index fce1fb2..a20b84f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 
 import rx.Observable;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -97,11 +98,12 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor
{
     public boolean hasNext() {
 
         if ( iterator == null ) {
-            iterator = resultsObservable.toBlocking().getIterator();
+            iterator =  resultsObservable.toBlocking().getIterator();
         }
 
+        boolean hasNext = iterator.hasNext();
 
-        return iterator.hasNext();
+        return hasNext;
     }
 
 


Mime
View raw message