fix subscribe for messages Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1a1d42e1 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1a1d42e1 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1a1d42e1 Branch: refs/heads/master Commit: 1a1d42e1f53cabf433442c17f614f9fcae418a22 Parents: b437f61 Author: Shawn Feldman Authored: Mon Oct 5 16:28:04 2015 -0600 Committer: Shawn Feldman Committed: Mon Oct 5 16:28:04 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/1a1d42e1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index e215d48..bf29c5a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -285,13 +285,9 @@ public class AmazonAsyncEventService implements AsyncEventService { //ack after successful completion of the operation. - return indexProducer.put( combined ) - .flatMap( operationResult -> Observable.from( indexEventResults ) ) - //ack each message, but only if we didn't error. If we did, we'll want to log it and - .map( indexEventResult -> { - ack( indexEventResult.queueMessage ); - return indexEventResult; - } ); + return indexProducer.put(combined) + .flatMap(operationResult -> Observable.from(indexEventResults)); + } ); } @@ -538,7 +534,15 @@ public class AmazonAsyncEventService implements AsyncEventService { } }) //this won't block our read loop, just reads and proceeds - .flatMap( messages -> handleMessages( messages ) ).subscribeOn( Schedulers.newThread() ); + .map(messages -> + handleMessages(messages) + .map(indexEventResult -> { + ack( indexEventResult.getQueueMessage() ); + return indexEventResult; + }) + .toBlocking().lastOrDefault(null) + )//ack each message, but only if we didn't error. If we did, we'll want to log it and + .subscribeOn( Schedulers.newThread() ); //start in the background