helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch task-improvement updated: Make the task scheduling decision independent of the PreviousAssignment (#994)
Date Tue, 26 May 2020 23:49:51 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch task-improvement
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/task-improvement by this push:
     new 4bbead3  Make the task scheduling decision independent of the PreviousAssignment (#994)
4bbead3 is described below

commit 4bbead3f68669ba92985e4bf4b4e71ed7a1c1efa
Author: Ali Reza Zamani Zadeh Najari <anajari@linkedin.com>
AuthorDate: Tue May 26 16:49:42 2020 -0700

    Make the task scheduling decision independent of the PreviousAssignment (#994)
    
    In this commit, the previous scheduling logic, which was based on PreviousAssignment,
    has been changed and no longer depends on prevAssignment. Instead, task scheduling
    is based solely on the CurrentState.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 100 ++++++++++++++-------
 .../java/org/apache/helix/task/JobDispatcher.java  |  74 +++++----------
 .../helix/task/TestDropTerminalTasksUponReset.java |   3 +-
 3 files changed, 90 insertions(+), 87 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index bc06636..38aec34 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -66,7 +66,7 @@ public abstract class AbstractTaskDispatcher {
   // Job Update related methods
 
   public void updatePreviousAssignedTasksStatus(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String>
excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String>
excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig
jobCfg,
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
       Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
@@ -78,11 +78,11 @@ public abstract class AbstractTaskDispatcher {
     AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
 
     // Iterate through all instances
-    for (String instance : prevInstanceToTaskAssignments.keySet()) {
+    for (String instance : currentInstanceToTaskAssignments.keySet()) {
       assignedPartitions.put(instance, new HashSet<>());
 
       // Set all dropping transitions first. These are tasks coming from Participant disconnects
-      // that have some active current state (INIT or RUNNING) and the requestedState of
DROPPED.
+      // and have the requestedState of DROPPED.
       // These need to be prioritized over any other state transitions because of the race
condition
       // with the same pId (task) running on other instances. This is because in paMap, we
can only
       // define one transition per pId
@@ -99,7 +99,7 @@ public abstract class AbstractTaskDispatcher {
       }
 
       // If not an excluded instance, we must instantiate its entry in assignedPartitions
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
 
       // We need to remove all task pId's to be dropped because we already made an assignment
in
       // paMap above for them to be dropped. The following does this.
@@ -107,8 +107,7 @@ public abstract class AbstractTaskDispatcher {
         pSet.removeAll(tasksToDrop.get(instance));
       }
 
-      // Used to keep track of partitions that are in one of the final states: COMPLETED,
TIMED_OUT,
-      // TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in either INIT or DROPPED states
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
@@ -121,17 +120,6 @@ public abstract class AbstractTaskDispatcher {
               instance, pId);
           continue;
         }
-        // This avoids a race condition in the case that although currentState is in the
following
-        // error condition, the pending message (INIT->RUNNNING) might still be present.
-        // This is undesirable because this prevents JobContext from getting the proper update
of
-        // fields including task state and task's NUM_ATTEMPTS
-        if (currState == TaskPartitionState.ERROR || currState == TaskPartitionState.TASK_ERROR
-            || currState == TaskPartitionState.TIMED_OUT
-            || currState == TaskPartitionState.TASK_ABORTED) {
-          // Do not increment the task attempt count here - it will be incremented at scheduling
-          // time
-          markPartitionError(jobCtx, pId, currState);
-        }
 
         // Check for pending state transitions on this (partition, instance). If there is
a pending
         // state transition, we prioritize this pending state transition and set the assignment
from
@@ -242,16 +230,16 @@ public abstract class AbstractTaskDispatcher {
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the instance and add
it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has completed with state %s. Marking as such in rebalancer
context.",
                 pName, currState));
           }
           partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-
           // This task is COMPLETED, so release this task
           assignableInstanceManager.release(instance, taskConfig, quotaType);
         }
@@ -263,7 +251,11 @@ public abstract class AbstractTaskDispatcher {
         case TASK_ABORTED:
 
         case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          // First make this task which is in terminal state to be dropped.
+          // Later on, in next pipeline in handleAdditionalAssignments, the task will be
retried if possible.
+          // (meaning it is not ABORTED and max number of attempts has not been reached yet)
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has error state %s with msg %s. Marking as such in rebalancer
context.",
@@ -389,13 +381,59 @@ public abstract class AbstractTaskDispatcher {
       return stateFromContext == null ? TaskPartitionState.INIT : stateFromContext;
     }
     TaskPartitionState currentState = TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, currentState,
jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task information
in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the current state
and context
+    // information because it will prevent controller to assign the task to the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the assignment if instance
is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we do not want
to update
+    // context as long as current state existed. We just want to update context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }
+      if (currentState == TaskPartitionState.COMPLETED) {
+        markPartitionCompleted(jobCtx, pId);
+      }
+      // This avoids a race condition in the case that although currentState is in the following
+      // error condition, the pending message (INIT->RUNNNING) might still be present.
+      // This is undesirable because this prevents JobContext from getting the proper update
of
+      // fields including task state and task's NUM_ATTEMPTS
+      if (currentState == TaskPartitionState.ERROR || currentState == TaskPartitionState.TASK_ERROR
+          || currentState == TaskPartitionState.TIMED_OUT
+          || currentState == TaskPartitionState.TASK_ABORTED) {
+        // Do not increment the task attempt count here - it will be incremented at scheduling
+        // time
+        markPartitionError(jobCtx, pId, currentState);
+      }
     }
-    return currentState;
   }
 
   /**
@@ -511,7 +549,7 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String>
excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, Set<String>
excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx,
       final JobConfig jobCfg, final WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
       final WorkflowControllerDataProvider cache,
@@ -580,7 +618,7 @@ public abstract class AbstractTaskDispatcher {
       // TODO: isRebalanceRunningTask() was originally put in place to allow users to move
       // ("rebalance") long-running tasks, but there hasn't been a clear use case for this
       // Previously, there was a bug in the condition above (it was || where it should have
been &&)
-      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments,
paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments,
paMap,
           jobCtx);
     }
 
@@ -588,11 +626,11 @@ public abstract class AbstractTaskDispatcher {
     if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange)
{
       // Drop current jobs only if they are assigned to a different instance, regardless
of
       // the jobCfg.isRebalanceRunningTask() setting
-      dropRebalancedRunningTasks(tgtPartitionAssignments, prevInstanceToTaskAssignments,
paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments,
paMap,
           jobCtx);
     }
     // Go through ALL instances and assign/throttle tasks accordingly
-    for (Map.Entry<String, SortedSet<Integer>> entry : prevInstanceToTaskAssignments.entrySet())
{
+    for (Map.Entry<String, SortedSet<Integer>> entry : currentInstanceToTaskAssignments.entrySet())
{
       String instance = entry.getKey();
       if (!tgtPartitionAssignments.containsKey(instance)) {
         // There is no assignment made for this instance, so it is safe to skip
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 479b47c..65d8a69 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -238,21 +238,20 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // These dropping transitions will be prioritized above all task state transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource,
tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
jobResource,
         currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState,
         assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions,
cache,
         tasksToDrop);
@@ -318,7 +317,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, jobResource,
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances,
jobResource,
           currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
           prevTaskToInstanceStateAssignment, assignedPartitions, paMap, skippedPartitions,
           taskAssignmentCal, allPartitions, currentTime, liveInstances);
@@ -380,45 +379,24 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied over expired
sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped
    * @return instance -> partitionIds from previous assignment, if the instance is still
live
    */
-  protected static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName,
+  protected static Map<String, SortedSet<Integer>> getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task currentStates copied
over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be dropped (after
a
-    // copy-over, the Controller will send a message to drop the state currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds to result and update their states in JobContext in
+    // updatePreviousAssignedTasksStatus method.
     Map<Partition, Map<String, String>> partitions = currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : partitions.entrySet())
{
       // Get all (instance -> currentState) mappings
@@ -426,26 +404,14 @@ public class JobDispatcher extends AbstractTaskDispatcher {
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), instance);
-        TaskPartitionState currState = TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping transition could
overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here because old
-          // Participants, upon connection reset, set tasks' currentStates to INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          result.get(instance).add(pId);
           // Check if this task needs to be dropped. If so, we need to add to tasksToDrop
no matter
           // what its current state is so that it will be dropped
+          // This is trying to drop tasks on a reconnected instance with a new sessionId
that have
+          // all of their requestedState == DROPPED
           if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name()))
{
             if (!tasksToDrop.containsKey(instance)) {
               tasksToDrop.put(instance, new HashSet<>());
@@ -462,10 +428,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
    * If partition is missing from prevInstanceToTaskAssignments (e.g. previous assignment
is
    * deleted) it is added from context. Otherwise, the context won't be updated.
    * @param jobCtx Job Context
-   * @param prevInstanceToTaskAssignments instance -> partitionIds from previous assignment
+   * @param currentInstanceToTaskAssignments instance -> partitionIds from CurrentStateOutput
    */
   protected void updateInstanceToTaskAssignmentsFromContext(JobContext jobCtx,
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments) {
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments) {
     for (Integer partition : jobCtx.getPartitionSet()) {
       // We must add all active task pIds back here
       // The states other than Running and Init do not need to be added.
@@ -474,9 +440,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
           || jobCtx.getPartitionState(partition) == TaskPartitionState.INIT) {
         String instance = jobCtx.getAssignedParticipant(partition);
         if (instance != null) {
-          if (prevInstanceToTaskAssignments.containsKey(instance)
-              && !prevInstanceToTaskAssignments.get(instance).contains(partition))
{
-            prevInstanceToTaskAssignments.get(instance).add(partition);
+          if (currentInstanceToTaskAssignments.containsKey(instance)
+              && !currentInstanceToTaskAssignments.get(instance).contains(partition))
{
+            currentInstanceToTaskAssignments.get(instance).add(partition);
           }
         }
       }
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
index 14ec6e7..fd1437e 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
@@ -90,8 +90,7 @@ public class TestDropTerminalTasksUponReset {
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
     // Call the static method we are testing
-    JobDispatcher.getPrevInstanceToTaskAssignments(liveInstances, prevAssignment, allTaskPartitions,
-        currentStateOutput, jobName, tasksToDrop);
+    JobDispatcher.getCurrentInstanceToTaskAssignments(liveInstances, currentStateOutput,
jobName, tasksToDrop);
 
     // Check that tasksToDrop has (numTasks / 2) partitions as we intended regardless of
what the
     // current states of the tasks were


Mime
View raw message