usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [1/6] incubator-usergrid git commit: Add batching to re-index requests.
Date Thu, 02 Jul 2015 19:28:56 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 136135d0c -> 6f2d0bf54


Add batching to re-index requests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/446a8e8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/446a8e8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/446a8e8a

Branch: refs/heads/two-dot-o-dev
Commit: 446a8e8af99aca59934400caa64c3b021bcb460e
Parents: 03d0534
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Wed Jul 1 17:23:34 2015 -0700
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Wed Jul 1 17:23:34 2015 -0700

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  6 ++++-
 .../asyncevents/AmazonAsyncEventService.java    | 25 +++++++++++++++++++-
 .../asyncevents/InMemoryAsyncEventService.java  | 13 ++++++++--
 .../index/IndexProcessorFig.java                |  8 ++++++-
 .../corepersistence/index/ReIndexAction.java    | 11 +++++++--
 .../index/ReIndexServiceImpl.java               | 24 +++++++++----------
 6 files changed, 67 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d6ce318..b5b0e8c 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -259,6 +259,10 @@ elasticsearch.index_prefix=elasticsearch
 #
 #elasticsearch.buffer_size=1000
 
+# Set the maximum buffer size to use when performing re-index requests.
+#
+#elasticsearch.reindex.buffer_size=1000
+
 # Set the batch size to use when sending batched index write requests to Elasticsearch.
 #
 #elasticsearch.batch_size=1000
@@ -322,7 +326,7 @@ usergrid.twodoto.appinfo.migration=true
 
 # The number of worker threads used to read index write requests from the queue.
 #
-#elasticsearch.worker_count=1
+#elasticsearch.worker_count=8
 
 # Set the number of worker threads used for processing index write requests to
 # Elasticsearch from the buffer.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b32d594..e652405 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.Histogram;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.corepersistence.CpEntityManager;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -143,6 +144,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
+    private void offerBatch(final List operations){
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessages(operations);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        } finally {
+            timer.stop();
+        }
+    }
+
 
     /**
      * Take message from SQS
@@ -427,9 +441,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
-    @Override
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
         //change to id scope to avoid serialization issues
         offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
     }
+
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+
+        List batch = new ArrayList<EdgeScope>();
+        for ( EdgeScope e : edges){
+            batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+        }
+        //change to id scope to avoid serialization issues
+        offerBatch(batch);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index f035b43..b8e544d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -24,7 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -35,6 +35,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
+import java.util.List;
 
 
 /**
@@ -95,13 +96,21 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     }
 
 
-    @Override
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
 
         run(eventBuilder.index( entityIndexOperation ));
     }
 
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+        for ( EdgeScope e : edges){
+            final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
+                e.getEdge().getTargetNode(), updatedSince);
+
+            run(eventBuilder.index (entityIndexOperation));
+        }
+
+    }
 
     public void run( Observable<?> observable ) {
         //start it in the background on an i/o thread

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c3942cc..34f8cb5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,6 +40,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String INDEX_QUEUE_VISIBILITY_TIMEOUT = "elasticsearch.queue_visibility_timeout";
 
+    String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
+
 
     /**
      * Set the amount of time to wait when Elasticsearch rejects a requests before
@@ -66,7 +68,7 @@ public interface IndexProcessorFig extends GuicyFig {
     /**
      * The number of worker threads used to read index write requests from the queue.
      */
-    @Default( "1" )
+    @Default( "8" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 
@@ -83,6 +85,10 @@ public interface IndexProcessorFig extends GuicyFig {
     @Key("elasticsearch.reindex.flush.interval")
     int getUpdateInterval();
 
+    @Default("1000")
+    @Key( REINDEX_BUFFER_SIZE )
+    int getReindexBufferSize();
+
     /**
      * Flag to resolve the LOCAL queue implementation service synchronously.
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 8a74540..5e201fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,15 +20,15 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
+import java.util.List;
 
 
 /**
  * Callback to perform an index operation based on an scope during bulk re-index operations
  */
-@FunctionalInterface
 public interface ReIndexAction {
 
     /**
@@ -37,4 +37,11 @@ public interface ReIndexAction {
      * @param id
      */
     void index( final ApplicationScope applicationScope, final Id id, final long updatedSince );
+
+    /**
+     * Index a batch list of entities.
+     * @param edges
+     * @param updatedSince
+     */
+    void indexBatch ( final List<EdgeScope> edges, final long updatedSince);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/446a8e8a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 5ee545e..1353982 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -120,28 +120,26 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
 
-        //create an observable that loads each entity and indexes it, start it running with publish
-        final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+        // create an observable that loads a batch to be indexed
 
-            //for each edge, create our scope and index on it
-            .doOnNext( edge -> {
+        final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+            .buffer( indexProcessorFig.getReindexBufferSize())
+            .doOnNext(edges -> {
 
                 if(logger.isInfoEnabled()) {
-                    logger.info("Queueing {} {}", edge.getApplicationScope(), edge.getEdge().getTargetNode());
+                    logger.info("Sending batch of {} to be indexed.", edges.size());
                 }
+                indexService.indexBatch(edges, modifiedSince);
 
-                indexService.index(edge.getApplicationScope(),edge.getEdge().getTargetNode(), modifiedSince);
-
-            } );
+            });
 
 
         //start our sampler and state persistence
         //take a sample every sample interval to allow us to resume state with minimal loss
-        runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
-            //create our flushing collector and flush the edge scopes to it
-            .collect( () -> new FlushingCollector( jobId ),
-                ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+        //create our flushing collector and flush the edge scopes to it
+        runningReIndex.collect(() -> new FlushingCollector(jobId),
+            ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes))).doOnNext( flushingCollector-> flushingCollector.complete() )
                 //subscribe on our I/O scheduler and run the task
             .subscribeOn( Schedulers.io() ).subscribe();
 


Mime
View raw message