usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject usergrid git commit: Changes observable pattern so that the original hot Observable (the SQS consumer) will receive errors in the catch
Date Thu, 01 Oct 2015 23:15:46 GMT
Repository: usergrid
Updated Branches:
  refs/heads/2.1-release 44ae71d72 -> e0d3cd57c


Changes the observable pattern so that the original hot Observable (the SQS consumer) will receive errors in the catch


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e0d3cd57
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e0d3cd57
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e0d3cd57

Branch: refs/heads/2.1-release
Commit: e0d3cd57c81770020f7242ee79e74ca11dc84081
Parents: 44ae71d
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Oct 1 17:15:44 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Oct 1 17:15:44 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 30 +++++++-------------
 1 file changed, 11 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/e0d3cd57/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 2ba177a..92faed4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -227,7 +227,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    private void handleMessages( final List<QueueMessage> messages ) {
+    private Observable<IndexEventResult> handleMessages( final List<QueueMessage>
messages ) {
         if (logger.isDebugEnabled()) {
             logger.debug("handleMessages with {} message", messages.size());
         }
@@ -268,7 +268,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
             }
         });
 
-        masterObservable
+        return masterObservable
             //remove unsuccessful
             .filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage()
                                                                                        .isPresent()
)
@@ -281,22 +281,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
                     indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get()
) );
 
                 //ack after successful completion of the operation.
-                return indexProducer.put( combined ).flatMap(
-                    operationResult -> Observable.from( indexEventResults ) )
+                return indexProducer.put( combined ).flatMap( operationResult -> Observable.from(
indexEventResults ) )
                     //ack each message, but only if we didn't error.  If we did, we'll want
to log it and
                                     .map( indexEventResult -> {
                                         ack( indexEventResult.queueMessage );
                                         return indexEventResult;
-                                    } ).doOnError( error ->
-                    {
-                        logger.error( "Unable to write messages to elasticsearch.  Messages
not acked", error );
-                    });
-            } )
-                //flat map the ops so they are back to individual
-
-            //overwhelms ES
-            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
-            .subscribe();
+                                    } );
+            } );
+
     }
 
     //transform index operation to
@@ -367,7 +359,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message)
{
 
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex"
);
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
 
@@ -397,7 +389,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message)
{
 
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete"
);
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
 
@@ -500,7 +492,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private void startWorker() {
         synchronized (mutex) {
 
-            Observable<List<QueueMessage>> consumer =
+            Observable<IndexEventResult> consumer =
                     Observable.create(new Observable.OnSubscribe<List<QueueMessage>>()
{
                         @Override
                         public void call(final Subscriber<? super List<QueueMessage>>
subscriber) {
@@ -541,7 +533,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         }
                     })
                             //this won't block our read loop, just reads and proceeds
-                            .doOnNext(this::handleMessages).subscribeOn(Schedulers.newThread());
+                            .flatMap( messages -> handleMessages( messages ) ).subscribeOn(
Schedulers.newThread() );
 
             //start in the background
 
@@ -553,7 +545,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince)
{
         //change to id scope to avoid serialization issues
-        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id), updatedSince));
+        offer( new EntityIndexEvent( new EntityIdScope( applicationScope, id ), updatedSince
) );
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {


Mime
View raw message