helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/3] helix git commit: [HELIX-777] TASK: Handle null currentState for unscheduled tasks
Date Thu, 01 Nov 2018 00:24:31 GMT
[HELIX-777] TASK: Handle null currentState for unscheduled tasks

It was observed that when a workflow is submitted and the Controller attempts to schedule
its tasks, the ZK read may fail to retrieve the appropriate job's context, causing the job to be
stuck in an unscheduled state. The job remained unscheduled because it had no currentStates, and
its job context did not contain any assignment/state information. This RB fixes such stuck
states by detecting null currentStates.
Changelist:
1. Check if currentState is null and if it is, manually assign an INIT state


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5d24ed54
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5d24ed54
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5d24ed54

Branch: refs/heads/master
Commit: 5d24ed544898ff69f289f54be71a04413735d118
Parents: 6090732
Author: Hunter Lee <hulee@linkedin.com>
Authored: Wed Oct 31 14:21:49 2018 -0700
Committer: Hunter Lee <hulee@linkedin.com>
Committed: Wed Oct 31 17:17:16 2018 -0700

----------------------------------------------------------------------
 .../helix/task/AbstractTaskDispatcher.java      | 213 +++++++++----------
 1 file changed, 106 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5d24ed54/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 617263b..aa72f2d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -23,7 +23,6 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.assigner.AssignableInstance;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +72,6 @@ public abstract class AbstractTaskDispatcher {
         // this pending state transition, essentially "waiting" until this pending message
clears
         Message pendingMessage =
             currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
-
         if (pendingMessage != null && !pendingMessage.getToState().equals(currState.name()))
{
           // If there is a pending message whose destination state is different from the
current
           // state, just make the same assignment as the pending message. This is essentially
@@ -125,26 +123,26 @@ public abstract class AbstractTaskDispatcher {
         }
 
         switch (currState) {
-          case RUNNING: {
-            TaskPartitionState nextState = TaskPartitionState.RUNNING;
-            if (jobState == TaskState.TIMING_OUT) {
-              nextState = TaskPartitionState.TASK_ABORTED;
-            } else if (jobTgtState == TargetState.STOP) {
-              nextState = TaskPartitionState.STOPPED;
-            } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
-                || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
-              // Drop tasks if parent job is not in progress
-              paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-              break;
-            }
+        case RUNNING: {
+          TaskPartitionState nextState = TaskPartitionState.RUNNING;
+          if (jobState == TaskState.TIMING_OUT) {
+            nextState = TaskPartitionState.TASK_ABORTED;
+          } else if (jobTgtState == TargetState.STOP) {
+            nextState = TaskPartitionState.STOPPED;
+          } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
+              || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
+            // Drop tasks if parent job is not in progress
+            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
+            break;
+          }
 
-            paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-            assignedPartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format("Setting task partition %s state to %s on instance
%s.", pName,
-                  nextState, instance));
-            }
+          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+          assignedPartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
pName,
+                nextState, instance));
           }
+        }
           break;
         case STOPPED: {
           // TODO: This case statement might be unreachable code - Hunter
@@ -166,106 +164,107 @@ public abstract class AbstractTaskDispatcher {
           assignedPartitions.add(pId);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
pName, nextState, instance));
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
pName,
+                nextState, instance));
           }
         }
           break;
-          case COMPLETED: {
-            // The task has completed on this partition. Mark as such in the context object.
-            donePartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has completed with state %s. Marking as such in rebalancer
context.",
-                  pName, currState));
-            }
-            partitionsToDropFromIs.add(pId);
-            markPartitionCompleted(jobCtx, pId);
-
-            // This task is COMPLETED, so release this task
-            assignableInstance.release(taskConfig, quotaType);
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has completed with state %s. Marking as such in rebalancer
context.",
+                pName, currState));
           }
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+
+          // This task is COMPLETED, so release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
           break;
-          case TIMED_OUT:
+        case TIMED_OUT:
 
-          case TASK_ERROR:
+        case TASK_ERROR:
 
-          case TASK_ABORTED:
+        case TASK_ABORTED:
 
-          case ERROR: {
-            donePartitions.add(pId); // The task may be rescheduled on a different instance.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has error state %s with msg %s. Marking as such in rebalancer
context.",
-                  pName, currState, jobCtx.getPartitionInfo(pId)));
-            }
-            markPartitionError(jobCtx, pId, currState, true);
-            // The error policy is to fail the task as soon a single partition fails for
a specified
-            // maximum number of attempts or task is in ABORTED state.
-            // But notice that if job is TIMED_OUT, aborted task won't be treated as fail
and won't
-            // cause job fail.
-            // After all tasks are aborted, they will be dropped, because of job timeout.
-            if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT)
{
-              if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
-                  || currState.equals(TaskPartitionState.TASK_ABORTED)
-                  || currState.equals(TaskPartitionState.ERROR)) {
-                skippedPartitions.add(pId);
-                partitionsToDropFromIs.add(pId);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("skippedPartitions:" + skippedPartitions);
-                }
-              } else {
-                // Mark the task to be started at some later time (if enabled)
-                markPartitionDelayed(jobCfg, jobCtx, pId);
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has error state %s with msg %s. Marking as such in rebalancer
context.",
+                pName, currState, jobCtx.getPartitionInfo(pId)));
+          }
+          markPartitionError(jobCtx, pId, currState, true);
+          // The error policy is to fail the task as soon a single partition fails for a
specified
+          // maximum number of attempts or task is in ABORTED state.
+          // But notice that if job is TIMED_OUT, aborted task won't be treated as fail and
won't
+          // cause job fail.
+          // After all tasks are aborted, they will be dropped, because of job timeout.
+          if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT)
{
+            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
+                || currState.equals(TaskPartitionState.TASK_ABORTED)
+                || currState.equals(TaskPartitionState.ERROR)) {
+              skippedPartitions.add(pId);
+              partitionsToDropFromIs.add(pId);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("skippedPartitions:" + skippedPartitions);
               }
+            } else {
+              // Mark the task to be started at some later time (if enabled)
+              markPartitionDelayed(jobCfg, jobCtx, pId);
             }
-            // Release this task
-            assignableInstance.release(taskConfig, quotaType);
           }
+          // Release this task
+          assignableInstance.release(taskConfig, quotaType);
+        }
           break;
-          case INIT: {
-            // INIT is a temporary state for tasks
-            // Two possible scenarios for INIT:
-            // 1. Task is getting scheduled for the first time. In this case, Task's state
will go
-            // from null->INIT->RUNNING, and this INIT state will be transient and
very short-lived
-            // 2. Task is getting scheduled for the first time, but in this case, job is
timed out or
-            // timing out. In this case, it will be sent back to INIT state to be removed.
Here we
-            // ensure that this task then goes from INIT to DROPPED so that it will be released
from
-            // AssignableInstance to prevent resource leak
-            if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
-                || jobTgtState == TargetState.DELETE) {
-              // Job is timed out or timing out or targetState is to be deleted, so its tasks
will be
-              // sent back to INIT
-              // In this case, tasks' IdealState will be removed, and they will be sent to
DROPPED
-              partitionsToDropFromIs.add(pId);
+        case INIT: {
+          // INIT is a temporary state for tasks
+          // Two possible scenarios for INIT:
+          // 1. Task is getting scheduled for the first time. In this case, Task's state
will go
+          // from null->INIT->RUNNING, and this INIT state will be transient and very
short-lived
+          // 2. Task is getting scheduled for the first time, but in this case, job is timed
out or
+          // timing out. In this case, it will be sent back to INIT state to be removed.
Here we
+          // ensure that this task then goes from INIT to DROPPED so that it will be released
from
+          // AssignableInstance to prevent resource leak
+          if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
+              || jobTgtState == TargetState.DELETE) {
+            // Job is timed out or timing out or targetState is to be deleted, so its tasks
will be
+            // sent back to INIT
+            // In this case, tasks' IdealState will be removed, and they will be sent to
DROPPED
+            partitionsToDropFromIs.add(pId);
 
-              // Also release resources for these tasks
-              assignableInstance.release(taskConfig, quotaType);
+            // Also release resources for these tasks
+            assignableInstance.release(taskConfig, quotaType);
 
-            } else if (jobState == TaskState.IN_PROGRESS
-                && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE))
{
-              // Job is in progress, implying that tasks are being re-tried, so set it to
RUNNING
-              paMap.put(pId,
-                  new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-              assignedPartitions.add(pId);
-            }
+          } else if (jobState == TaskState.IN_PROGRESS
+              && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE))
{
+            // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
+            paMap.put(pId,
+                new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
+            assignedPartitions.add(pId);
           }
+        }
 
-          case DROPPED: {
-            // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be
reassigned.
-            donePartitions.add(pId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(String.format(
-                  "Task partition %s has state %s. It will be dropped from the current ideal
state.",
-                  pName, currState));
-            }
-            // If it's DROPPED, release this task. If INIT, do not release
-            if (currState == TaskPartitionState.DROPPED) {
-              assignableInstance.release(taskConfig, quotaType);
-            }
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                "Task partition %s has state %s. It will be dropped from the current ideal
state.",
+                pName, currState));
+          }
+          // If it's DROPPED, release this task. If INIT, do not release
+          if (currState == TaskPartitionState.DROPPED) {
+            assignableInstance.release(taskConfig, quotaType);
           }
+        }
           break;
-          default:
-            throw new AssertionError("Unknown enum symbol: " + currState);
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
         }
       }
 
@@ -301,7 +300,8 @@ public abstract class AbstractTaskDispatcher {
         currentStateOutput.getCurrentState(jobResource, new Partition(pName), instance);
     if (currentStateString == null) {
       // Task state is either DROPPED or INIT
-      return jobCtx.getPartitionState(pId);
+      TaskPartitionState stateFromContext = jobCtx.getPartitionState(pId);
+      return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
     jobCtx.setPartitionState(pId, currentState);
@@ -859,7 +859,6 @@ public abstract class AbstractTaskDispatcher {
         incomplete = true;
       }
     }
-
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
       return true;
@@ -922,7 +921,7 @@ public abstract class AbstractTaskDispatcher {
     long nextRebalanceTime = _rebalanceScheduler.getRebalanceTime(resourceId);
     if (nextTimeout >= System.currentTimeMillis()
         && (nextRebalanceTime == TaskConstants.DEFAULT_NEVER_TIMEOUT
-        || nextTimeout < nextRebalanceTime)) {
+            || nextTimeout < nextRebalanceTime)) {
       _rebalanceScheduler.scheduleRebalance(_manager, resourceId, nextTimeout);
     }
   }
@@ -942,8 +941,8 @@ public abstract class AbstractTaskDispatcher {
   private long getTimeoutTime(long startTime, long timeoutPeriod) {
     return (timeoutPeriod == TaskConstants.DEFAULT_NEVER_TIMEOUT
         || timeoutPeriod > Long.MAX_VALUE - startTime) // check long overflow
-        ? TaskConstants.DEFAULT_NEVER_TIMEOUT
-        : startTime + timeoutPeriod;
+            ? TaskConstants.DEFAULT_NEVER_TIMEOUT
+            : startTime + timeoutPeriod;
   }
 
   /**


Mime
View raw message