usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [2/2] usergrid git commit: 1. Remove messages from dead letter queue if the message map entry was never received after a configurable timeout (15 minutes currently). 2. Log messages as they are moved from the dead letter queue back to the originating que
Date Mon, 19 Jun 2017 19:23:17 GMT
1. Remove messages from dead letter queue if the message map entry was never received after
a configurable timeout (15 minutes currently).
2. Log messages as they are moved from the dead letter queue back to the originating queue.
Also log messages that are in dead letter queue and can't be moved to originating queue for
some reason.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c748242f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c748242f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c748242f

Branch: refs/heads/master
Commit: c748242fa6a9bc79ebb60b4071cc532ab798fb3c
Parents: 5935460
Author: Mike Dunker <mdunker@google.com>
Authored: Mon Jun 19 12:18:34 2017 -0700
Committer: Mike Dunker <mdunker@google.com>
Committed: Mon Jun 19 12:18:34 2017 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 37 +++++++++++++++++---
 .../persistence/queue/LegacyQueueFig.java       |  3 ++
 2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c748242f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 7c33969..530cf7d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -724,10 +724,13 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                indexOperationMessage =
                    ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
 
+           } else if (System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime()
+ queueFig.getMapMessageTimeout()) {
+                // if esMapPersistence message hasn't been received yet, log and return (will
be acked)
+                logger.error("ES map message never received, removing message from queue.
indexBatchId={}", messageId);
+                return;
            } else {
-
-               throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
-
+                logger.warn("ES map message not received yet. indexBatchId={} elapsedTimeMsec={}",
messageId, System.currentTimeMillis() - elasticsearchIndexEvent.getCreationTime());
+                throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
            }
 
         } else {
@@ -1105,14 +1108,38 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                                  try {
                                                      // put the dead letter messages back
in the appropriate queue
                                                      LegacyQueueManager returnQueue = null;
+                                                     String queueType;
                                                      if (isUtilityDeadQueue) {
-                                                         logger.warn("Utility dead queue
message count: {}", messages.size());
                                                          returnQueue = utilityQueue;
+                                                         queueType = "utility";
                                                      } else {
-                                                         logger.warn("Index dead queue message
count: {}", messages.size());
                                                          returnQueue = indexQueue;
+                                                         queueType = "index";
                                                      }
                                                      List<LegacyQueueMessage> successMessages
= returnQueue.sendQueueMessages(messages);
+                                                     for (LegacyQueueMessage msg : successMessages)
{
+                                                         logger.warn("Returning message to
{} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
+                                                     }
+                                                     int unsuccessfulMessagesSize = messages.size()
- successMessages.size();
+                                                     if (unsuccessfulMessagesSize > 0)
{
+                                                         // some messages couldn't be sent
to originating queue, log
+                                                         Set<String> successMessageIds
= new HashSet<>();
+                                                         for (LegacyQueueMessage msg : successMessages)
{
+                                                             String messageId = msg.getMessageId();
+                                                             if (successMessageIds.contains(messageId))
{
+                                                                 logger.warn("Found duplicate
messageId in returned messages: {}", messageId);
+                                                             } else {
+                                                                 successMessageIds.add(messageId);
+                                                             }
+                                                         }
+                                                         for (LegacyQueueMessage msg : messages)
{
+                                                             String messageId = msg.getMessageId();
+                                                             if (!successMessageIds.contains(messageId))
{
+                                                                 logger.warn("Failed to return
message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId,
msg.getStringBody());
+                                                             }
+                                                         }
+                                                     }
+
                                                      if (isUtilityDeadQueue) {
                                                          ackUtilityDeadQueue(successMessages);
                                                      } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c748242f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 6fe96dd..0ebcc7b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -106,4 +106,7 @@ public interface LegacyQueueFig extends GuicyFig {
     @Default("false") // 30 seconds
     boolean getQuorumFallback();
 
+    @Key("usergrid.queue.map.message.timeout")
+    @Default("900000") // 15 minutes
+    int getMapMessageTimeout();
 }


Mime
View raw message