helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [7/8] helix git commit: Improve helix message timeout task
Date Thu, 01 Nov 2018 23:02:21 GMT
Improve helix message timeout task

From the logs and code, this appears to be a very rare race condition: the message was actually
processed and completed, but it was never removed. Once processing completes, the handler should
cancel the timeout task, which runs in a separate thread. But just before it tried to cancel
that task, the message timed out and the timeout task thread interrupted the message-handling
thread, as shown in the log.

As a result, the message-handling thread did not catch the InterruptedException at that point and
failed to remove the message, which was left in ZK in the READ state. After I manually removed the
message, we got an error log showing that the partition was already LEADER. This confirms that the
message had in fact been processed successfully.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1507f016
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1507f016
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1507f016

Branch: refs/heads/master
Commit: 1507f0161df24f4bec3ba3632b2d23a7a9bed5d4
Parents: c783ae7
Author: Junkai Xue <jxue@linkedin.com>
Authored: Wed Oct 10 16:59:28 2018 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Thu Nov 1 14:38:58 2018 -0700

----------------------------------------------------------------------
 .../helix/messaging/handling/HelixTask.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1507f016/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2f3d805..fb55e76 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -22,7 +22,6 @@ package org.apache.helix.messaging.handling;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -96,6 +95,9 @@ public class HelixTask implements MessageTask {
       handlerStart = System.currentTimeMillis();
       taskResult = _handler.handleMessage();
       handlerEnd = System.currentTimeMillis();
+
+      // cancel timeout task
+      _executor.cancelTimeoutTask(this);
     } catch (InterruptedException e) {
       taskResult = new HelixTaskResult();
       taskResult.setException(e);
@@ -116,9 +118,6 @@ public class HelixTask implements MessageTask {
       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
     }
 
-    // cancel timeout task
-    _executor.cancelTimeoutTask(this);
-
     Exception exception = null;
     try {
       if (taskResult.isSuccess()) {
@@ -182,13 +181,9 @@ public class HelixTask implements MessageTask {
         }
       }
 
-      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
-        removeMessageFromZk(accessor, _message);
-        reportMessageStat(_manager, _message, taskResult);
-        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
-        _executor.finishTask(this);
-      }
+      finalCleanup(taskResult);
     } catch (Exception e) {
+      finalCleanup(taskResult);
       exception = e;
       type = ErrorType.FRAMEWORK;
       code = ErrorCode.ERROR;
@@ -377,4 +372,17 @@ public class HelixTask implements MessageTask {
     }
     _isStarted = true;
   }
+
+  private void finalCleanup(HelixTaskResult taskResult) { // remove msg, report, reply, finish
+    try {
+      if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) { // skipped if parent id set
+        removeMessageFromZk(_manager.getHelixDataAccessor(), _message);
+        reportMessageStat(_manager, _message, taskResult);
+        sendReply(getSrcClusterDataAccessor(_message), _message, taskResult);
+        _executor.finishTask(this);
+      }
+    } catch (Exception e) { // best-effort: log with stack trace rather than propagate
+      logger.error(String.format("Error to final clean up for message : %s", _message.getId()), e);
+    }
+  }
 }


Mime
View raw message