usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [17/50] [abbrv] incubator-usergrid git commit: Renamed SQSAsyncEventService to AmazonAsyncEventService, implemented functions for Async indexing
Date Thu, 28 May 2015 12:53:22 GMT
Renamed SQSAsyncEventService to AmazonAsyncEventService, implemented functions for Async indexing


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/be0e43c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/be0e43c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/be0e43c1

Branch: refs/heads/USERGRID-669
Commit: be0e43c1975045441947821f98eab8ce88382f93
Parents: 6d0ebd0
Author: Jeff West <jwest@apigee.com>
Authored: Tue May 26 09:01:58 2015 -0700
Committer: Jeff West <jwest@apigee.com>
Committed: Tue May 26 09:01:58 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 425 +++++++++++++++++++
 .../asyncevents/SQSAsyncEventService.java       | 356 ----------------
 2 files changed, 425 insertions(+), 356 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/be0e43c1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
new file mode 100644
index 0000000..2cb6b3b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.CpEntityManager;
+import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+@Singleton
+public class AmazonAsyncEventService implements AsyncEventService {
+
+
+    private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
+    private static final int MAX_TAKE = 10;
+    private static final String QUEUE_NAME = "es_queue";
+
+    private final QueueManager queue;
+    private final IndexProcessorFig indexProcessorFig;
+    private final IndexService indexService;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
+
+    private final Timer readTimer;
+    private final Timer writeTimer;
+    private final Timer messageProcessingTimer;
+
+    private final Object mutex = new Object();
+
+    private final Counter indexErrorCounter;
+    private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong inFlight = new AtomicLong();
+
+    //the actively running subscription
+    private List<Subscription> subscriptions = new ArrayList<>();
+
+
+    @Inject
+    public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
+                                   final IndexProcessorFig indexProcessorFig,
+                                   final MetricsFactory metricsFactory,
+                                   final IndexService indexService,
+                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                   final RxTaskScheduler rxTaskScheduler) {
+
+        this.indexService = indexService;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
+
+        final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+        this.queue = queueManagerFactory.getQueueManager(queueScope);
+        this.indexProcessorFig = indexProcessorFig;
+
+        this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "write");
+        this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "read");
+        this.messageProcessingTimer = metricsFactory.getTimer(AmazonAsyncEventService.class,
"message.processing");
+        this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class,
"error");
+
+
+        //wire up the gauge of inflight message
+        metricsFactory.addGauge(AmazonAsyncEventService.class, "inflight.meter", new Gauge<Long>()
{
+            @Override
+            public Long getValue() {
+                return inFlight.longValue();
+            }
+        });
+
+        start();
+    }
+
+
+    /**
+     * Offer the EntityIdScope to SQS
+     */
+    private void offer(final Object operation) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessage(operation);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        } finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Take message from SQS
+     */
+    public List<QueueMessage> take() {
+
+        //SQS doesn't support more than 10
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return queue.getMessages(MAX_TAKE,
+                indexProcessorFig.getIndexQueueVisibilityTimeout(),
+                indexProcessorFig.getIndexQueueTimeout(),
+                AsyncEvent.class);
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    /**
+     * Ack message in SQS
+     */
+    public void ack(final List<QueueMessage> messages) {
+
+        /**
+         * No op
+         */
+        if (messages.size() == 0) {
+            return;
+        }
+
+        queue.commitMessages(messages);
+    }
+
+    /**
+     * Ack message in SQS
+     */
+    public void ack(final QueueMessage message) {
+
+        queue.commitMessage(message);
+    }
+
+    private void handleMessages(final List<QueueMessage> messages) {
+
+        if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size());
+
+        for (QueueMessage message : messages) {
+            final AsyncEvent event = (AsyncEvent) message.getBody();
+
+            if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType());
+
+            switch (event.getEventType()) {
+
+                case EDGE_DELETE:
+                    handleEdgeDelete(message);
+                    break;
+
+                case EDGE_INDEX:
+                    handleEdgeIndex(message);
+                    break;
+
+                case ENTITY_DELETE:
+                    handleEntityDelete(message);
+                    break;
+
+                case ENTITY_INDEX:
+                    handleEntityIndexUpdate(message);
+                    break;
+
+                default:
+                    logger.error("Unknown EventType: {}", event.getEventType());
+
+            }
+        }
+    }
+
+
+    @Override
+    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
+                                       final Entity entity) {
+
+        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId())));
+    }
+
+
+    public void handleEntityIndexUpdate(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX,
String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
+
+        //process the entity immediately
+        //only process the same version, otherwise ignore
+        final EntityIdScope entityIdScope = event.getEntityIdScope();
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+
+        ecm.load(entityIdScope.getId())
+            .first()
+            .flatMap(entity -> indexService.indexEntity(applicationScope, entity))
+            .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+
+    @Override
+    public void queueNewEdge(final ApplicationScope applicationScope,
+                             final Entity entity,
+                             final Edge newEdge) {
+
+        EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge);
+
+        offer(operation);
+    }
+
+    public void handleEdgeIndex(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeIndex");
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX,
String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType()));
+
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final Edge edge = event.getEdge();
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+
+        ecm.load(event.getEntityId())
+            .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge))
+            .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+    @Override
+    public void queueDeleteEdge(final ApplicationScope applicationScope,
+                                final Edge edge) {
+
+        offer(new EdgeDeleteEvent(applicationScope, edge));
+    }
+
+    public void handleEdgeDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeDelete");
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE,
String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType()));
+
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final Edge edge = event.getEdge();
+
+        if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}",
applicationScope, edge);
+
+        indexService.deleteIndexEdge(applicationScope, edge)
+            .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+
+    @Override
+    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId)
{
+
+        offer(new EntityDeleteEvent(new EntityIdScope(applicationScope, entityId)));
+    }
+
+    public void handleEntityDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
String.format("Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType()));
+
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final Id entityId = event.getEntityId();
+
+        if (logger.isDebugEnabled())
+            logger.debug("Deleting entity id from index in app scope {} with entityId {}",
applicationScope, entityId);
+
+        ack(message);
+
+        indexService.deleteEntityIndexes(applicationScope, entityId)
+            .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+
+    /**
+     * Loop through and start the workers
+     */
+    public void start() {
+        final int count = indexProcessorFig.getWorkerCount();
+
+        for (int i = 0; i < count; i++) {
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stop the workers
+     */
+    public void stop() {
+        synchronized (mutex) {
+            //stop consuming
+
+            for (final Subscription subscription : subscriptions) {
+                subscription.unsubscribe();
+            }
+        }
+    }
+
+
+    private void startWorker() {
+        synchronized (mutex) {
+
+            Observable<List<QueueMessage>> consumer =
+                Observable.create(new Observable.OnSubscribe<List<QueueMessage>>()
{
+                    @Override
+                    public void call(final Subscriber<? super List<QueueMessage>>
subscriber) {
+
+                        //name our thread so it's easy to see
+                        Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());
+
+                        List<QueueMessage> drainList = null;
+
+                        do {
+                            Timer.Context timer = readTimer.time();
+
+                            try {
+                                drainList = take();
+
+                                //emit our list in it's entity to hand off to a worker pool
+                                subscriber.onNext(drainList);
+
+                                //take since  we're in flight
+                                inFlight.addAndGet(drainList.size());
+                            } catch (Throwable t) {
+                                final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+                                logger.error("Failed to dequeue.  Sleeping for {} milliseconds",
sleepTime, t);
+
+                                if (drainList != null) {
+                                    inFlight.addAndGet(-1 * drainList.size());
+                                }
+
+
+                                try {
+                                    Thread.sleep(sleepTime);
+                                } catch (InterruptedException ie) {
+                                    //swallow
+                                }
+
+                                indexErrorCounter.inc();
+                            } finally {
+                                timer.stop();
+                            }
+                        }
+                        while (true);
+                    }
+                })
+                    //this won't block our read loop, just reads and proceeds
+                    .doOnNext(this::handleMessages).subscribeOn(Schedulers.newThread());
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribe();
+
+            subscriptions.add(subscription);
+        }
+    }
+
+    @Override
+    public void index(final EntityIndexOperation entityIdScope) {
+
+        //todo need this?
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/be0e43c1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
deleted file mode 100644
index 1dbfd4e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.IndexService;
-import org.apache.usergrid.exception.NotImplementedException;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.schedulers.Schedulers;
-
-
-@Singleton
-public class SQSAsyncEventService implements AsyncEventService {
-
-
-    private static final Logger log = LoggerFactory.getLogger( SQSAsyncEventService.class
);
-
-    /**
-     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries
to get removed
-     */
-    public static final int TTL = 60 * 60 * 24 * 30;
-
-
-    private static final int MAX_TAKE = 10;
-
-    private static final String QUEUE_NAME = "es_queue";
-
-    private final QueueManager queue;
-    private final IndexProcessorFig indexProcessorFig;
-    private final IndexService indexService;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final RxTaskScheduler rxTaskScheduler;
-
-    private final Timer readTimer;
-    private final Timer writeTimer;
-    private final Timer messageProcessingTimer;
-
-    private final Object mutex = new Object();
-
-
-    private final Counter indexErrorCounter;
-    private final AtomicLong counter = new AtomicLong();
-    private final AtomicLong inFlight = new AtomicLong();
-
-    //the actively running subscription
-    private List<Subscription> subscriptions = new ArrayList<>();
-
-
-    @Inject
-    public SQSAsyncEventService( final QueueManagerFactory queueManagerFactory,
-                                 final IndexProcessorFig indexProcessorFig, final MetricsFactory
metricsFactory,
-                                 final IndexService indexService,
-                                 final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                 final RxTaskScheduler rxTaskScheduler ) {
-
-        this.indexService = indexService;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.rxTaskScheduler = rxTaskScheduler;
-
-        final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
-        this.queue = queueManagerFactory.getQueueManager( queueScope );
-        this.indexProcessorFig = indexProcessorFig;
-
-        this.writeTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "write" );
-        this.readTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "read" );
-        this.messageProcessingTimer = metricsFactory.getTimer( SQSAsyncEventService.class,
"message.processing" );
-        this.indexErrorCounter = metricsFactory.getCounter( SQSAsyncEventService.class, "error"
);
-
-
-        //wire up the gauge of inflight messages
-        metricsFactory.addGauge( SQSAsyncEventService.class, "inflight.meter", new Gauge<Long>()
{
-            @Override
-            public Long getValue() {
-                return inFlight.longValue();
-            }
-        } );
-
-        start();
-    }
-
-
-    /**
-     * Offer the EntityIdScope to SQS
-     */
-    private void offer( final EntityIdScope operation ) {
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessage( operation );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    /**
-     * Take messages from SQS
-     */
-    public List<QueueMessage> take() {
-
-        //SQS doesn't support more than 10
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return queue.getMessages( MAX_TAKE, indexProcessorFig.getIndexQueueTimeout(),
indexProcessorFig.getIndexQueueTimeout(),
-                EntityIdScope.class );
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    /**
-     * Ack messages in SQS
-     */
-    public void ack( final List<QueueMessage> messages ) {
-
-        /**
-         * No op
-         */
-        if ( messages.size() == 0 ) {
-            return;
-        }
-
-        queue.commitMessages( messages );
-    }
-
-
-//    @Override
-    public void index( final EntityIdScope entityIdScope ) {
-        //queue the re-inex operation
-        offer( entityIdScope );
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity
entity ) {
-
-        //create our scope
-        final EntityIdScope entityIdScope = new EntityIdScope( applicationScope, entity.getId()
);
-
-        //send it to SQS  for indexing
-        index( entityIdScope );
-    }
-
-
-    @Override
-    public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
final Edge newEdge ) {
-       throw new NotImplementedException( "Implement me" );
-    }
-
-
-    @Override
-    public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge
) {
-        throw new NotImplementedException( "Implement me" );
-    }
-
-
-    @Override
-    public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId
) {
-        throw new NotImplementedException( "Implement me" );
-    }
-
-
-    /**
-     * Index an entity and return an observable of the queue message on success
-     */
-    private Observable<IndexOperationMessage> indexEntity( final QueueMessage queueMessage
) {
-        final EntityIdScope entityIdScope = ( EntityIdScope ) queueMessage.getBody();
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        //run the index operation from the entity
-        return entityCollectionManager.load( entityIdScope.getId() )
-            //invoke the indexing and take the last value
-            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ).last()
);
-    }
-
-
-    /**
-     * Do the indexing for a list of queue messages
-     */
-    private void doIndex( final List<QueueMessage> queueMessages ) {
-        //create parallel observables to process all 10 messages
-        final Observable<Long> observable = Observable.from( queueMessages ).flatMap(
( QueueMessage queueMessage ) -> {
-            return indexEntity( queueMessage ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler()
);
-        }, MAX_TAKE ).countLong()
-
-            //remove our in flight
-            .doOnNext( count -> inFlight.addAndGet( -1 * count ) )
-
-                //do on completed ack messages.  Assumes no expections were thrown
-            .doOnCompleted( () -> ack( queueMessages ) );
-
-        //wrap with our timer and fire
-        ObservableTimer.time( observable, messageProcessingTimer ).subscribe();
-    }
-
-
-    /**
-     * Loop throught and start the workers
-     */
-    public void start() {
-        final int count = indexProcessorFig.getWorkerCount();
-
-        for ( int i = 0; i < count; i++ ) {
-            startWorker();
-        }
-    }
-
-
-    /**
-     * Stop the workers
-     */
-    public void stop() {
-        synchronized ( mutex ) {
-            //stop consuming
-
-            for ( final Subscription subscription : subscriptions ) {
-                subscription.unsubscribe();
-            }
-        }
-    }
-
-
-    private void startWorker() {
-        synchronized ( mutex ) {
-
-            Observable<List<QueueMessage>> consumer =
-                Observable.create( new Observable.OnSubscribe<List<QueueMessage>>()
{
-                        @Override
-                        public void call( final Subscriber<? super List<QueueMessage>>
subscriber ) {
-
-                            //name our thread so it's easy to see
-                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet()
);
-
-                            List<QueueMessage> drainList = null;
-
-                            do {
-                                Timer.Context timer = readTimer.time();
-
-                                try {
-                                    drainList = take();
-
-                                    //emit our list in it's entirity to hand off to a worker
pool
-                                    subscriber.onNext( drainList );
-
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
-                                }
-
-                                catch ( Throwable t ) {
-                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
-
-                                    log.error( "Failed to dequeue.  Sleeping for {} milliseconds",
sleepTime, t );
-
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * drainList.size() );
-                                    }
-
-
-                                    try {
-                                        Thread.sleep( sleepTime );
-                                    }
-                                    catch ( InterruptedException ie ) {
-                                        //swallow
-                                    }
-
-                                    indexErrorCounter.inc();
-                                }
-
-                                finally{
-                                    timer.stop();
-                                }
-                            }
-                            while ( true );
-                        }
-                    } )
-                    //this won't block our read loop, just reads and proceeds
-                    .doOnNext( messages -> doIndex( messages ) ).subscribeOn( Schedulers.newThread()
);
-
-            //start in the background
-
-            final Subscription subscription = consumer.subscribe();
-
-            subscriptions.add( subscription );
-        }
-    }
-
-
-    @Override
-    public void index( final EntityIndexOperation entityIdScope ) {
-        throw new NotImplementedException( "Implement me" );
-    }
-}


Mime
View raw message