usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [4/7] incubator-usergrid git commit: refresh to observable
Date Fri, 01 May 2015 16:50:11 GMT
refresh to observable


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5b8a4dea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5b8a4dea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5b8a4dea

Branch: refs/heads/USERGRID-609
Commit: 5b8a4deac24b23a08635150a708c9a52ea3cbb96
Parents: a05b46d
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Thu Apr 30 10:15:53 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Thu Apr 30 10:15:53 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |  5 ++
 .../index/impl/IndexRefreshCommandImpl.java     | 88 +++++++++++---------
 2 files changed, 52 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5b8a4dea/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index e0cacb8..961719d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -190,6 +190,11 @@ public interface IndexFig extends GuicyFig {
     @Default("25")
     int maxRefreshSearches();
 
+    @Key("elasticsearch.refresh_wait_ms")
+    @Default("5000")
+    long refreshWaitTime();
+
+
     @Default( "5000" )
     @Key( ELASTICSEARCH_WRITE_TIMEOUT )
     long getWriteTimeout();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5b8a4dea/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 06e6219..888a805 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,9 +20,12 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.elastictranscoder.model.TimeSpan;
 import org.apache.usergrid.persistence.core.util.StringUtils;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -89,7 +92,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
 
     @Override
-    public Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
+    public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes)
{
         final long start = System.currentTimeMillis();
 
         Timer.Context refreshTimer = timer.time();
@@ -130,51 +133,54 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand
{
 
 
         //start our processing immediately
-        final Observable<IndexRefreshCommandInfo> future = Async.toAsync( () ->
{
-            final Observable combined =  Observable.concat(addRecord, refresh);
-            combined.toBlocking().lastOrDefault(null);
-            try {
-                boolean found = false;
-                for ( int i = 0; i < indexFig.maxRefreshSearches(); i++ ) {
-                    final SearchResponse response = builder.execute().get();
-                    if (response.getHits().totalHits() > 0) {
-                        found = true;
-                        break;
+        final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
+            final Observable<IndexRefreshCommandInfo> infoObservable = Observable
+                .range(0, indexFig.maxRefreshSearches())
+                .map(i ->
+                {
+                    try {
+                        return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits()
> 0, System.currentTimeMillis() - start);
+                    } catch (Exception ee) {
+                        logger.error("Failed during refresh search for " + uuid, ee);
+                        throw new RuntimeException("Failed during refresh search for " +
uuid, ee);
                     }
-                }
+                })
+                .takeWhile(info -> info.hasFinished())
+                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS );
 
-                return new IndexRefreshCommandInfo(found,System.currentTimeMillis() - start);
-            }
-            catch ( Exception ee ) {
-                logger.error( "Failed during refresh search for " + uuid, ee );
-                throw new RuntimeException( "Failed during refresh search for " + uuid, ee
);
-            }
-        }, rxTaskScheduler.getAsyncIOScheduler() ).call();
+            final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
+            combined.toBlocking().last();
 
+            final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
 
-        return future.doOnNext( found -> {
-            if ( !found.hasFinished() ) {
-                logger.error("Couldn't find record during refresh uuid: {} took ms:{} ",
uuid, found.getExecutionTime());
-            }else{
-                logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-            }
-        } ).doOnCompleted(() -> {
-            //clean up our data
-            String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
-            DeIndexOperation deIndexRequest =
-                new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
-
-            //delete the item
-            IndexOperationMessage indexOperationMessage =
-                new IndexOperationMessage();
-            indexOperationMessage.addDeIndexRequest( deIndexRequest );
-            producer.put( indexOperationMessage );
-
-            refreshTimer.stop();
-        });
-    }
+            return info;
+
+        },rxTaskScheduler.getAsyncIOScheduler()).call();
 
-    private Observable<Boolean> refresh(final String[] indexes) {
+
+            return future.doOnNext(found -> {
+                if (!found.hasFinished()) {
+                    logger.error("Couldn't find record during refresh uuid: {} took ms:{}
", uuid, found.getExecutionTime());
+                } else {
+                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid,
found.getExecutionTime());
+                }
+            }).doOnCompleted(() -> {
+                //clean up our data
+                String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
+                DeIndexOperation deIndexRequest =
+                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
+
+                //delete the item
+                IndexOperationMessage indexOperationMessage =
+                    new IndexOperationMessage();
+                indexOperationMessage.addDeIndexRequest(deIndexRequest);
+                producer.put(indexOperationMessage);
+
+                refreshTimer.stop();
+            });
+        }
+
+        private Observable<Boolean> refresh(final String[] indexes) {
 
         return Observable.create(subscriber -> {
             try {


Mime
View raw message