usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject usergrid git commit: Added a dumb retry method that get the entities indexed locally, but not safely.
Date Fri, 02 Oct 2015 23:47:07 GMT
Repository: usergrid
Updated Branches:
  refs/heads/index-fix [created] dad27e139


Added a dumb retry method that get the entities indexed locally, but not safely.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/dad27e13
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/dad27e13
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/dad27e13

Branch: refs/heads/index-fix
Commit: dad27e139b549889d1d41a252a732896677b5b2a
Parents: e0d3cd5
Author: George Reyes <grey@apache.org>
Authored: Fri Oct 2 16:47:02 2015 -0700
Committer: George Reyes <grey@apache.org>
Committed: Fri Oct 2 16:47:02 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  8 ++-
 .../index/impl/EsIndexProducerImpl.java         | 55 ++++++++++++++++++--
 2 files changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/dad27e13/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 92faed4..3c0e09e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -226,19 +226,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     }
 
-
+///TODO: Document what this does. And document what the tmobile build does. Then compare
the two.
     private Observable<IndexEventResult> handleMessages( final List<QueueMessage>
messages ) {
         if (logger.isDebugEnabled()) {
             logger.debug("handleMessages with {} message", messages.size());
         }
 
         Observable<IndexEventResult> masterObservable = Observable.from(messages).flatMap(message
-> {
+            //Get the meat of the message.
             final AsyncEvent event = (AsyncEvent) message.getBody();
 
             logger.debug("Processing {} event", event);
 
+            //if the event is null then return nothing.
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
+                //Emit a single index event result that contains the message that is empty.
                 return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),
false));
             }
             try {
@@ -269,13 +272,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
         });
 
         return masterObservable
-            //remove unsuccessful
+            //remove unsuccessful events //grab only successful events that are present.
             .filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage()
                                                                                        .isPresent()
)
             //take the max
             .buffer( MAX_TAKE )
             //map them to index results and return them
             .flatMap( indexEventResults -> {
+
                 IndexOperationMessage combined = new IndexOperationMessage();
                 indexEventResults.stream().forEach(
                     indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get()
) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/dad27e13/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 869b75a..f178263 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.index.impl;
 
 
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.codahale.metrics.Histogram;
@@ -40,6 +41,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
+import rx.functions.Func1;
 
 
 /**
@@ -128,6 +130,7 @@ public class EsIndexProducerImpl implements IndexProducer {
                 }))
                 //write them
             .doOnNext(bulkRequestBuilder -> sendRequest(bulkRequestBuilder));
+          //  .retryWhen( new RetryWithDelay( 250,1000 ) );
 
 
         //now that we've processed them all, ack the futures after our last batch comes through
@@ -147,6 +150,38 @@ public class EsIndexProducerImpl implements IndexProducer {
         });
     }
 
+    //stolen shamelessly from http://stackoverflow.com/questions/22066481/rxjava-can-i-use-retry-but-with-delay?rq=1
+    public class RetryWithDelay implements Func1<Observable<? extends Throwable>,
Observable<?>> {
+
+        private final int maxRetries;
+        private final int retryDelayMillis;
+        private int retryCount;
+
+        public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
+            this.maxRetries = maxRetries;
+            this.retryDelayMillis = retryDelayMillis;
+            this.retryCount = 0;
+        }
+
+        @Override
+        public Observable<?> call(Observable<? extends Throwable> attempts) {
+            return attempts
+                .flatMap(new Func1<Throwable, Observable<?>>() {
+                    @Override
+                    public Observable<?> call(Throwable throwable) {
+                        if (++retryCount < maxRetries) {
+                            // When this Observable calls onNext, the original
+                            // Observable will be retried (i.e. re-subscribed).
+                            return Observable.timer(retryDelayMillis,
+                                TimeUnit.MILLISECONDS);
+                        }
+
+                        // Max retries hit. Just pass the error along.
+                        return Observable.error(throwable);
+                    }
+                });
+        }
+    }
 
     /*
 
@@ -198,9 +233,23 @@ public class EsIndexProducerImpl implements IndexProducer {
                 log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ",
response.getId(),
                     response.getType(), response.getIndex(), response.getFailureMessage()
);
 
-                error = true;
-
-                errorString.append( response.getFailureMessage() ).append( "\n" );
+                //too many requests error and we're overloading es.
+                if(response.getFailure().getStatus().getStatus() == 429){
+                    log.error("Overloading ES, going to retry.");
+                                       // System.out.println("retrying");
+                         try {
+                             Thread.sleep( 800 );
+                         }
+                         catch ( InterruptedException e ) {
+                             e.printStackTrace();
+                         }
+                         sendRequest( bulkRequest );
+                                }
+               // else {
+                    error = true;
+
+                    errorString.append( response.getFailureMessage() ).append( "\n" );
+               // }
             }
         }
 


Mime
View raw message