helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/3] helix git commit: [HELIX-778] TASK: Fix a race condition in updatePreviousAssignedTasksStatus
Date Thu, 01 Nov 2018 00:24:32 GMT
[HELIX-778] TASK: Fix a race condition in updatePreviousAssignedTasksStatus

TestUnregisteredCommand was observed to be very unstable. The cause was identified as a race
condition: when a task fails, a pending message for that task (from INIT to RUNNING) is sometimes
not cleaned up in time, so AbstractTaskDispatcher's updatePreviousAssignedTasksStatus would try to
process that pending message and skip the status update of that task (such as updating its status
and NUM_ATTEMPTS field in JobContext).

A short-term, temporary fix is to call markPartitionError() before checking for the pending message,
but in the long run we need to revisit the design of the task status update to avoid this type of
race condition.

Changelist:
1. Move markPartitionError() up before checking for a pending message on the task
2. Fix TestUnregisteredCommand's instability


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ceba1a55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ceba1a55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ceba1a55

Branch: refs/heads/master
Commit: ceba1a55ae351090144c001324f908f2364212a4
Parents: 5d24ed5
Author: Hunter Lee <hulee@linkedin.com>
Authored: Wed Oct 31 17:20:37 2018 -0700
Committer: Hunter Lee <hulee@linkedin.com>
Committed: Wed Oct 31 17:20:37 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/task/AbstractTaskDispatcher.java    | 15 ++++++++++++---
 .../integration/task/TestUnregisteredCommand.java    |  3 ++-
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index aa72f2d..cbf9fb8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -67,6 +67,16 @@ public abstract class AbstractTaskDispatcher {
         TaskPartitionState currState = updateJobContextAndGetTaskCurrentState(currStateOutput,
             jobResource, pId, pName, instance, jobCtx);
 
+        // This avoids a race condition in the case that although currentState is in the following
+        // error condition, the pending message (INIT->RUNNNING) might still be present.
+        // This is undesirable because this prevents JobContext from getting the proper update of
+        // fields including task state and task's NUM_ATTEMPTS
+        if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR
+            || currState == TaskPartitionState.TIMED_OUT
+            || currState == TaskPartitionState.TASK_ABORTED) {
+          markPartitionError(jobCtx, pId, currState, true);
+        }
+
         // Check for pending state transitions on this (partition, instance). If there is a pending
         // state transition, we prioritize this pending state transition and set the assignment from
         // this pending state transition, essentially "waiting" until this pending message clears
@@ -197,7 +207,6 @@ public abstract class AbstractTaskDispatcher {
                 "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.",
                 pName, currState, jobCtx.getPartitionInfo(pId)));
           }
-          markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts or task is in ABORTED state.
           // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and won't
@@ -239,7 +248,6 @@ public abstract class AbstractTaskDispatcher {
 
             // Also release resources for these tasks
             assignableInstance.release(taskConfig, quotaType);
-
           } else if (jobState == TaskState.IN_PROGRESS
               && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
             // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
@@ -940,7 +948,8 @@ public abstract class AbstractTaskDispatcher {
 
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
+        || timeoutPeriod > Long.MAX_VALUE - startTime)
+            // check long overflow
             ? TaskConstants.DEFAULT_NEVER_TIMEOUT
             : startTime + timeoutPeriod;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/ceba1a55/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
index 95a9be4..6f78cc0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
@@ -37,7 +37,8 @@ public class TestUnregisteredCommand extends TaskTestBase {
     super.beforeClass();
   }
 
-  @Test public void testUnregisteredCommand() throws InterruptedException {
+  @Test
+  public void testUnregisteredCommand() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
     Workflow.Builder builder = new Workflow.Builder(workflowName);
 


Mime
View raw message