usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [1/2] incubator-usergrid git commit: add queue depth
Date Fri, 14 Aug 2015 20:49:03 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-543 [created] ee6e087f8


add queue depth


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bb6ca8ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bb6ca8ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bb6ca8ed

Branch: refs/heads/USERGRID-543
Commit: bb6ca8edfed851cd03f23b96bb91bec0ee3b990a
Parents: 35430a5
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Fri Aug 14 14:45:58 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Fri Aug 14 14:45:58 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  2 +-
 .../asyncevents/AsyncEventService.java          |  2 ++
 .../persistence/queue/DefaultQueueManager.java  |  5 ++++
 .../persistence/queue/QueueManager.java         |  6 ++++
 .../queue/impl/SNSQueueManagerImpl.java         | 14 +++++++++
 .../queue/impl/SQSQueueManagerImpl.java         | 31 +++++++++-----------
 .../persistence/queue/QueueManagerTest.java     | 18 ++++++++++++
 .../org/apache/usergrid/rest/RootResource.java  | 17 ++++++++++-
 .../services/queues/ImportQueueManager.java     |  5 ++++
 9 files changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b71a549..ed106e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -80,7 +80,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     // SQS maximum receive messages is 10
     private static final int MAX_TAKE = 10;
-    private static final String QUEUE_NAME = "es_queue";
+    public static final String QUEUE_NAME = "es_queue";
 
     private final QueueManager queue;
     private final QueueScope queueScope;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1a5e865..7cce8b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -76,4 +76,6 @@ public interface AsyncEventService extends ReIndexAction {
 
 
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index dc5878c..c72e109 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -47,6 +47,11 @@ public class DefaultQueueManager implements QueueManager {
     }
 
     @Override
+    public long getQueueDepth() {
+        return queue.size();
+    }
+
+    @Override
     public void commitMessage(QueueMessage queueMessage) {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 09ae95a..0ec2337 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -39,6 +39,12 @@ public interface QueueManager {
     Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime,
Class klass);
 
     /**
+     * get the queue depth
+     * @return
+     */
+    long getQueueDepth();
+
+    /**
      * Commit the transaction
      * @param queueMessage
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index f41d238..6d0e18b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -381,6 +381,20 @@ public class SNSQueueManagerImpl implements QueueManager {
     }
 
     @Override
+    public long getQueueDepth() {
+        String key = "ApproximateNumberOfMessages";
+        try {
+            GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(),
Collections.singletonList(key));
+            String depthString = result.getAttributes().get(key);
+            return depthString != null ? Long.parseLong(depthString) : 0;
+        }catch (Exception e){
+            logger.error("Exception getting queue depth",e);
+            return -1;
+
+        }
+    }
+
+    @Override
     public void sendMessages(final List bodies) throws IOException {
 
         if (snsAsync == null) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index effa373..075e90c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import com.amazonaws.services.sqs.model.*;
 import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
 import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
 import org.slf4j.Logger;
@@ -36,22 +37,6 @@ import org.apache.usergrid.persistence.queue.QueueScope;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Preconditions;
@@ -147,7 +132,6 @@ public class SQSQueueManagerImpl implements QueueManager {
             //pretty print, disabling for speed
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT,
"@class");
-
             sqs = createClient();
 
         } catch (Exception e) {
@@ -219,6 +203,19 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
+    public long getQueueDepth() {
+        String key = "ApproximateNumberOfMessages";
+        try {
+            GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key));
+            String depthString = result.getAttributes().get(key);
+            return depthString != null ? Long.parseLong(depthString) : 0;
+        }catch (Exception e){
+            logger.error("Exception getting queue depth",e);
+            return -1;
+
+        }
+    }
+    @Override
     public void sendMessages(final List bodies) throws IOException {
 
         if (sqs == null) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 3be02e1..e948015 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -105,5 +105,23 @@ public class QueueManagerTest {
 
     }
 
+    @Test
+    public void queueSize() throws IOException,ClassNotFoundException{
+        HashMap<String,String> values = new HashMap<>();
+        values.put("test", "Test");
+
+        List<Map<String,String>> bodies = new ArrayList<>();
+        bodies.add(values);
+        qm.sendMessages(bodies);
+        long depth = qm.getQueueDepth();
+        assertTrue(depth>0);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+        assertTrue(messageList.size() >= 1);
+        for(QueueMessage message : messageList){
+            assertTrue(message.getBody().equals(values));
+        }
+        qm.commitMessages(messageList);
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index 5b5e711..989df26 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -37,8 +37,15 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
 
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexRefreshCommand;
 import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -102,6 +109,9 @@ public class RootResource extends AbstractContextResource implements MetricProce
     @Autowired
     private UsergridSystemMonitor usergridSystemMonitor;
 
+    @Autowired
+    private Injector injector;
+
 
     public RootResource() {
     }
@@ -181,6 +191,10 @@ public class RootResource extends AbstractContextResource implements
MetricProce
 
         ApiResponse response = createApiResponse();
 
+        QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class);
+        QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS);
+        QueueManager queue = queueManagerFactory.getQueueManager(queueScope);
+
         if ( !ignoreError ) {
 
             if ( !emf.getEntityStoreHealth().equals( Health.GREEN )) {
@@ -205,8 +219,9 @@ public class RootResource extends AbstractContextResource implements MetricProce
         node.put( "cassandraStatus", emf.getEntityStoreHealth().toString() );
 
         // Core Persistence Query Index module status for Management App Index
-        EntityManager em = emf.getEntityManager(emf.getManagementAppId());
         node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() );
+        node.put( "queueDepth", queue.getQueueDepth() );
+
 
         dumpMetrics(node);
         response.setProperty( "status", node );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index 5f42484..d74f688 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -40,6 +40,11 @@ public class ImportQueueManager implements QueueManager {
         return Observable.empty();
     }
 
+    @Override
+    public long getQueueDepth() {
+        return 0;
+    }
+
 
     @Override
     public void commitMessage( final QueueMessage queueMessage ) {


Mime
View raw message