usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [6/7] incubator-usergrid git commit: moving back to refresh its faster
Date Fri, 01 May 2015 16:50:13 GMT
moving back to refresh its faster


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/faafa378
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/faafa378
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/faafa378

Branch: refs/heads/USERGRID-609
Commit: faafa378524ced0a85fd4cb3e989c79e02dda0dd
Parents: ac6b394
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Thu Apr 30 13:15:28 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Thu Apr 30 13:15:28 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |   2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 174 ++++++++-----------
 2 files changed, 72 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/faafa378/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 961719d..39714f2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -187,7 +187,7 @@ public interface IndexFig extends GuicyFig {
     String getClientType();
 
     @Key("elasticsearch.refresh_search_max")
-    @Default("25")
+    @Default("10")
     int maxRefreshSearches();
 
     @Key("elasticsearch.refresh_wait_ms")

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/faafa378/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index c8e96db..71a05a0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -92,115 +92,83 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
 
     @Override
-    public  Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
-        Timer.Context refreshTimer = timer.time();
+    public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
+
+
         final long start = System.currentTimeMillis();
 
+        Timer.Context refreshTimer = timer.time();
+        //id to hunt for
+        final UUID uuid = UUIDUtils.newTimeUUID();
+        final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
+        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+        final Id appId = new SimpleId( "ug_refresh_index" );
+        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
+        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
+        final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
+        final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
+        final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
+        //add a tracer record
+        IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
+        //save the item
+        final IndexOperationMessage message = new IndexOperationMessage();
+        message.addIndexRequest( indexRequest );
+        final Observable addRecord = producer.put(message);
+        final Observable refresh = refresh(indexes);
+
+        /**
+         * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
+         */
+        //set our filter for entityId fieldname
+        final SearchRequestBuilder builder =
+            esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
+                .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
+
+
+        //start our processing immediately
         final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
-            Boolean worked = refresh(indexes).toBlocking().last();
-            final IndexRefreshCommandInfo info = new IndexRefreshCommandInfo(worked, System.currentTimeMillis() - start);
+            final Observable<IndexRefreshCommandInfo> infoObservable = Observable
+                .range(0, indexFig.maxRefreshSearches())
+                .map(i ->
+                {
+                    try {
+                        return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
+                    } catch (Exception ee) {
+                        logger.error("Failed during refresh search for " + uuid, ee);
+                        throw new RuntimeException("Failed during refresh search for " + uuid, ee);
+                    }
+                })
+                .takeWhile(info -> info.hasFinished())
+                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS);
+
+            final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
+            combined.toBlocking().last();
+            final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
             return info;
+        },rxTaskScheduler.getAsyncIOScheduler()).call();
 
-        }, rxTaskScheduler.getAsyncIOScheduler()).call();
-        return future.doOnNext(found -> {
-            if (!found.hasFinished()) {
-                logger.error("Couldn't find record during refresh  took ms:{} ", found.getExecutionTime());
-            } else {
-                logger.info("found record during refresh  took ms:{} ", found.getExecutionTime());
-            }
-        }).doOnCompleted(() -> {
-            refreshTimer.stop();
-        });
-    }
 
-    private void insertRecord(){
-
-//        final long start = System.currentTimeMillis();
-//
-//        Timer.Context refreshTimer = timer.time();
-//        //id to hunt for
-//        final UUID uuid = UUIDUtils.newTimeUUID();
-//
-//        final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
-//        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
-//        final Id appId = new SimpleId( "ug_refresh_index" );
-//        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
-//        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
-//
-//
-//        final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
-//
-//        final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
-//
-//        final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
-//
-//        //add a tracer record
-//        IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
-//
-//        //save the item
-//        final IndexOperationMessage message = new IndexOperationMessage();
-//        message.addIndexRequest( indexRequest );
-//        final Observable addRecord = producer.put(message);
-//        final Observable refresh = refresh(indexes);
-//
-//        /**
-//         * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
-//         */
-//
-//        final SearchRequestBuilder builder =
-//            esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
-//
-//                //set our filter for entityId fieldname
-//        .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
-//
-//
-//        //start our processing immediately
-//        final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
-//            final Observable<IndexRefreshCommandInfo> infoObservable = Observable
-//                .range(0, indexFig.maxRefreshSearches())
-//                .map(i ->
-//                {
-//                    try {
-//                        return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
-//                    } catch (Exception ee) {
-//                        logger.error("Failed during refresh search for " + uuid, ee);
-//                        throw new RuntimeException("Failed during refresh search for " + uuid, ee);
-//                    }
-//                })
-//                .takeWhile(info -> info.hasFinished())
-//                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS);
-//
-//            final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
-//            combined.toBlocking().last();
-//
-//            final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
-//
-//            return info;
-//
-//        },rxTaskScheduler.getAsyncIOScheduler()).call();
-//
-//
-//            return future.doOnNext(found -> {
-//                if (!found.hasFinished()) {
-//                    logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-//                } else {
-//                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-//                }
-//            }).doOnCompleted(() -> {
-//                //clean up our data
-//                String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
-//                DeIndexOperation deIndexRequest =
-//                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
-//
-//                //delete the item
-//                IndexOperationMessage indexOperationMessage =
-//                    new IndexOperationMessage();
-//                indexOperationMessage.addDeIndexRequest(deIndexRequest);
-//                producer.put(indexOperationMessage);
-//
-//                refreshTimer.stop();
-//            });
-    }
+            return future.doOnNext(found -> {
+                if (!found.hasFinished()) {
+                    logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+                } else {
+                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+                }
+            }).doOnCompleted(() -> {
+                //clean up our data
+                String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
+                DeIndexOperation deIndexRequest =
+                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
+
+                //delete the item
+                IndexOperationMessage indexOperationMessage =
+                    new IndexOperationMessage();
+                indexOperationMessage.addDeIndexRequest(deIndexRequest);
+                producer.put(indexOperationMessage);
+
+                refreshTimer.stop();
+            });
+        }
 
         private Observable<Boolean> refresh(final String[] indexes) {
 


Mime
View raw message