helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [21/33] helix git commit: Add a new task state (TASK_ABORTED) and new TaskResult statuses (FAILED, FATAL_FAILED). Returning FATAL_FAILED lets a client abort a task so that Helix does not retry it, even if the retry count (MaxAttemptsPerTask) is greater than 1.
Date Wed, 17 Aug 2016 04:27:17 GMT
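Add a new task state (TASK_ABORTED) and new TaskResult statuses (FAILED, FATAL_FAILED). A task that returns FATAL_FAILED moves to
TASK_ABORTED, which lets a client abort the task so that Helix does not retry it, even if the retry count (MaxAttemptsPerTask) is greater than 1.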


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7f184839
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7f184839
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7f184839

Branch: refs/heads/helix-0.6.x
Commit: 7f184839aebcb817e3c9b0626bcc2a9b44c6926c
Parents: ddc11f9
Author: Lei Xia <lxia@linkedin.com>
Authored: Tue Jul 5 15:58:51 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Tue Jul 5 16:14:30 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/model/TaskSMD.java    |  38 ++++---
 .../java/org/apache/helix/task/JobConfig.java   |   1 -
 .../org/apache/helix/task/JobRebalancer.java    |  21 ++--
 .../apache/helix/task/TaskPartitionState.java   |   4 +-
 .../java/org/apache/helix/task/TaskResult.java  |  10 +-
 .../java/org/apache/helix/task/TaskRunner.java  |  12 ++-
 .../org/apache/helix/task/TaskStateModel.java   |  32 +++++-
 .../org/apache/helix/task/WorkflowConfig.java   |   2 +-
 .../apache/helix/integration/task/MockTask.java |  35 +++++--
 .../integration/task/TestRecurringJobQueue.java |   5 +-
 .../task/TestTaskConditionalRetry.java          | 102 +++++++++++++++++++
 11 files changed, 226 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java b/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
index d826358..fd8b585 100644
--- a/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
+++ b/helix-core/src/main/java/org/apache/helix/model/TaskSMD.java
@@ -54,6 +54,7 @@ public final class TaskSMD extends StateModelDefinition {
     builder.addState(TaskPartitionState.COMPLETED.name(), 3);
     builder.addState(TaskPartitionState.TIMED_OUT.name(), 4);
     builder.addState(TaskPartitionState.TASK_ERROR.name(), 5);
+    builder.addState(TaskPartitionState.TASK_ABORTED.name(), 6);
     builder.addState(TaskPartitionState.DROPPED.name());
 
     // add transitions
@@ -62,22 +63,26 @@ public final class TaskSMD extends StateModelDefinition {
     builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.COMPLETED.name(),
2);
     builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TIMED_OUT.name(),
3);
     builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ERROR.name(),
4);
-    builder.addTransition(TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING.name(),
5);
+    builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ABORTED.name(),
5);
+    builder.addTransition(TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING.name(),
6);
 
     // All states have a transition to DROPPED.
-    builder.addTransition(TaskPartitionState.INIT.name(), TaskPartitionState.DROPPED.name(),
6);
-    builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.DROPPED.name(),
7);
-    builder.addTransition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.DROPPED.name(),
8);
-    builder.addTransition(TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED.name(),
9);
-    builder.addTransition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED.name(),
10);
-    builder.addTransition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED.name(),
11);
+    builder.addTransition(TaskPartitionState.INIT.name(), TaskPartitionState.DROPPED.name(),
7);
+    builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.DROPPED.name(),
8);
+    builder.addTransition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.DROPPED.name(),
9);
+    builder.addTransition(TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED.name(),
10);
+    builder.addTransition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED.name(),
11);
+    builder.addTransition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED.name(),
12);
+    builder.addTransition(TaskPartitionState.TASK_ABORTED.name(), TaskPartitionState.DROPPED.name(),
13);
+
 
     // All states, except DROPPED, have a transition to INIT.
-    builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT.name(),
12);
-    builder.addTransition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.INIT.name(),
13);
-    builder.addTransition(TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT.name(),
14);
-    builder.addTransition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT.name(),
15);
-    builder.addTransition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT.name(),
16);
+    builder.addTransition(TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT.name(),
14);
+    builder.addTransition(TaskPartitionState.COMPLETED.name(), TaskPartitionState.INIT.name(),
15);
+    builder.addTransition(TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT.name(),
16);
+    builder.addTransition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT.name(),
17);
+    builder.addTransition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT.name(),
18);
+    builder.addTransition(TaskPartitionState.TASK_ABORTED.name(), TaskPartitionState.INIT.name(),
19);
 
     return builder.build();
   }
@@ -99,6 +104,7 @@ public final class TaskSMD extends StateModelDefinition {
     statePriorityList.add(TaskPartitionState.COMPLETED.name());
     statePriorityList.add(TaskPartitionState.TIMED_OUT.name());
     statePriorityList.add(TaskPartitionState.TASK_ERROR.name());
+    statePriorityList.add(TaskPartitionState.TASK_ABORTED.name());
     statePriorityList.add(TaskPartitionState.DROPPED.name());
     record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), statePriorityList);
     for (String state : statePriorityList) {
@@ -115,6 +121,7 @@ public final class TaskSMD extends StateModelDefinition {
     states.add(TaskPartitionState.COMPLETED.name());
     states.add(TaskPartitionState.TIMED_OUT.name());
     states.add(TaskPartitionState.TASK_ERROR.name());
+    states.add(TaskPartitionState.TASK_ABORTED.name());
     states.add(TaskPartitionState.DROPPED.name());
 
     List<Transition> transitions = new ArrayList<Transition>();
@@ -123,6 +130,7 @@ public final class TaskSMD extends StateModelDefinition {
     transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.COMPLETED.name()));
     transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TIMED_OUT.name()));
     transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ERROR.name()));
+    transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.TASK_ABORTED.name()));
     transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.RUNNING.name()));
 
     // All states have a transition to DROPPED.
@@ -132,6 +140,8 @@ public final class TaskSMD extends StateModelDefinition {
     transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.DROPPED.name()));
     transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.DROPPED.name()));
     transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.DROPPED.name()));
+    transitions.add(new Transition(TaskPartitionState.TASK_ABORTED.name(), TaskPartitionState.DROPPED.name()));
+
 
     // All states, except DROPPED, have a transition to INIT.
     transitions.add(new Transition(TaskPartitionState.RUNNING.name(), TaskPartitionState.INIT.name()));
@@ -139,6 +149,7 @@ public final class TaskSMD extends StateModelDefinition {
     transitions.add(new Transition(TaskPartitionState.STOPPED.name(), TaskPartitionState.INIT.name()));
     transitions.add(new Transition(TaskPartitionState.TIMED_OUT.name(), TaskPartitionState.INIT.name()));
     transitions.add(new Transition(TaskPartitionState.TASK_ERROR.name(), TaskPartitionState.INIT.name()));
+    transitions.add(new Transition(TaskPartitionState.TASK_ABORTED.name(), TaskPartitionState.INIT.name()));
 
     StateTransitionTableBuilder builder = new StateTransitionTableBuilder();
     Map<String, Map<String, String>> next = builder.buildTransitionTable(states,
transitions);
@@ -154,6 +165,7 @@ public final class TaskSMD extends StateModelDefinition {
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
TaskPartitionState.COMPLETED.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
TaskPartitionState.TIMED_OUT.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
TaskPartitionState.TASK_ERROR.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
TaskPartitionState.TASK_ABORTED.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
TaskPartitionState.RUNNING.name()));
 
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.INIT.name(),
TaskPartitionState.DROPPED.name()));
@@ -162,12 +174,14 @@ public final class TaskSMD extends StateModelDefinition {
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
TaskPartitionState.DROPPED.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(),
TaskPartitionState.DROPPED.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(),
TaskPartitionState.DROPPED.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ABORTED.name(),
TaskPartitionState.DROPPED.name()));
 
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.RUNNING.name(),
TaskPartitionState.INIT.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.COMPLETED.name(),
TaskPartitionState.INIT.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.STOPPED.name(),
TaskPartitionState.INIT.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TIMED_OUT.name(),
TaskPartitionState.INIT.name()));
     stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ERROR.name(),
TaskPartitionState.INIT.name()));
+    stateTransitionPriorityList.add(String.format("%s-%s", TaskPartitionState.TASK_ABORTED.name(),
TaskPartitionState.INIT.name()));
 
     record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
                         stateTransitionPriorityList);

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 1eeca60..d26c83b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -99,7 +99,6 @@ public class JobConfig {
      * The amount of time in ms to wait before retrying a task
      */
     TaskRetryDelay,
-
     /**
      * Whether failure of directly dependent jobs should fail this job.
      */

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 5b41773..b02089f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -314,15 +314,17 @@ public class JobRebalancer extends TaskRebalancer {
           break;
         case TIMED_OUT:
         case TASK_ERROR:
+        case TASK_ABORTED:
         case ERROR: {
           donePartitions.add(pId); // The task may be rescheduled on a different instance.
           LOG.debug(String.format(
-              "Task partition %s has error state %s. Marking as such in rebalancer context.",
-              pName, currState));
+              "Task partition %s has error state %s. Marking as such in rebalancer context.",
pName,
+              currState));
           markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition fails for a
specified
-          // maximum number of attempts.
-          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+          // maximum number of attempts or task is in ABORTED state.
+          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() ||
+              currState.equals(TaskPartitionState.TASK_ABORTED)) {
             // If the user does not require this task to succeed in order for the job to
succeed,
             // then we don't have to fail the job right now
             boolean successOptional = false;
@@ -352,6 +354,8 @@ public class JobRebalancer extends TaskRebalancer {
               skippedPartitions.add(pId);
               partitionsToDropFromIs.add(pId);
             }
+
+            LOG.debug("skippedPartitions:" + skippedPartitions);
           } else {
             // Mark the task to be started at some later time (if enabled)
             markPartitionDelayed(jobCfg, jobCtx, pId);
@@ -391,7 +395,7 @@ public class JobRebalancer extends TaskRebalancer {
       // any new assignments.
       // This includes all completed, failed, delayed, and already assigned partitions.
       Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
-      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+      addCompletedTasks(excludeSet, jobCtx, allPartitions);
       addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
       excludeSet.addAll(skippedPartitions);
       excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime));
@@ -529,7 +533,7 @@ public class JobRebalancer extends TaskRebalancer {
     }
   }
 
-  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
+  private static void addCompletedTasks(Set<Integer> set, JobContext ctx,
       Iterable<Integer> pIds) {
     for (Integer pId : pIds) {
       TaskPartitionState state = ctx.getPartitionState(pId);
@@ -540,6 +544,11 @@ public class JobRebalancer extends TaskRebalancer {
   }
 
   private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) {
+    TaskPartitionState state = ctx.getPartitionState(pId);
+    if (state != null && (state.equals(TaskPartitionState.TASK_ABORTED) | state
+        .equals(TaskPartitionState.ERROR))) {
+      return true;
+    }
     return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
index d41668d..5b4a5a4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskPartitionState.java
@@ -33,8 +33,10 @@ public enum TaskPartitionState {
   COMPLETED,
   /** Indicates that the task timed out. */
   TIMED_OUT,
-  /** Indicates an error occurred during task execution. */
+  /** Indicates an error occurred during task execution, but the task can be retried. */
   TASK_ERROR,
+  /** Indicates an error occurred during task execution, and the task should not be retried.
*/
+  TASK_ABORTED,
   /** Helix's own internal error state. */
   ERROR,
   /** A Helix internal state. */

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
index 95b8d72..02cd162 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskResult.java
@@ -34,8 +34,14 @@ public class TaskResult {
      * called.
      */
     CANCELED,
-    /** The task encountered an error from which it could not recover. */
-    ERROR
+    /** The task encountered an error from which it cannot recover.
+     * @deprecated Use {@link org.apache.helix.task.TaskResult.Status#FAILED}, which is equivalent. */
+    @Deprecated
+    ERROR,
+    /** The task encountered an error that cannot be recovered in this run, but the task may
still succeed if it is retried. */
+    FAILED,
+    /** The task encountered an error that cannot be recovered, even by retrying the
task. */
+    FATAL_FAILED
   }
 
   private final Status _status;

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 1bf88ec..c43d0ce 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -71,8 +71,8 @@ public class TaskRunner implements Runnable {
       } catch (ThreadDeath death) {
         throw death;
       } catch (Throwable t) {
-        LOG.error("Problem running the task", t);
-        _result = new TaskResult(Status.ERROR, null);
+        LOG.error("Problem running the task, report task as FAILED.", t);
+        _result = new TaskResult(Status.FAILED, null);
       }
 
       switch (_result.getStatus()) {
@@ -88,8 +88,14 @@ public class TaskRunner implements Runnable {
       case ERROR:
         requestStateTransition(TaskPartitionState.TASK_ERROR);
         break;
+      case FAILED:
+        requestStateTransition(TaskPartitionState.TASK_ERROR);
+        break;
+      case FATAL_FAILED:
+        requestStateTransition(TaskPartitionState.TASK_ABORTED);
+        break;
       default:
-        throw new AssertionError("Unknown result type.");
+        throw new AssertionError("Unknown task result type: " + _result.getStatus().name());
       }
     } catch (Exception e) {
       requestStateTransition(TaskPartitionState.TASK_ERROR);

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index d3ee003..ba68a78 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -133,7 +133,27 @@ public class TaskStateModel extends StateModel {
     }
 
     TaskResult r = _taskRunner.waitTillDone();
-    if (r.getStatus() != TaskResult.Status.ERROR) {
+    if (r.getStatus() != TaskResult.Status.ERROR && r.getStatus() != TaskResult.Status.FAILED)
{
+      throw new IllegalStateException(String.format(
+          "Partition %s received a state transition to %s but the result status code is %s.",
+          msg.getPartitionName(), msg.getToState(), r.getStatus()));
+    }
+
+    timeout_task.cancel(false);
+
+    return r.getInfo();
+  }
+
+  @Transition(to = "TASK_ABORTED", from = "RUNNING")
+  public String onBecomeTaskAbortedFromRunning(Message msg, NotificationContext context)
{
+    String taskPartition = msg.getPartitionName();
+    if (_taskRunner == null) {
+      throw new IllegalStateException(String.format(
+          "Invalid state transition. There is no running task for partition %s.", taskPartition));
+    }
+
+    TaskResult r = _taskRunner.waitTillDone();
+    if (r.getStatus() != TaskResult.Status.FATAL_FAILED) {
       throw new IllegalStateException(String.format(
           "Partition %s received a state transition to %s but the result status code is %s.",
           msg.getPartitionName(), msg.getToState(), r.getStatus()));
@@ -189,6 +209,11 @@ public class TaskStateModel extends StateModel {
     reset();
   }
 
+  @Transition(to = "DROPPED", from = "TASK_ABORTED")
+  public void onBecomeDroppedFromTaskAborted(Message msg, NotificationContext context) {
+    reset();
+  }
+
   @Transition(to = "INIT", from = "RUNNING")
   public void onBecomeInitFromRunning(Message msg, NotificationContext context) {
     String taskPartition = msg.getPartitionName();
@@ -223,6 +248,11 @@ public class TaskStateModel extends StateModel {
     reset();
   }
 
+  @Transition(to = "INIT", from = "TASK_ABORTED")
+  public void onBecomeInitFromTaskAborted(Message msg, NotificationContext context) {
+    reset();
+  }
+
   @Override
   public void reset() {
     if (_taskRunner != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index ddd37d5..2881b61 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
  * Provides a typed interface to workflow level configurations. Validates the configurations.
  */
 // TODO: extends WorkflowConfig from ResourceConfig
-public class WorkflowConfig {
+public class  WorkflowConfig {
   private static final Logger LOG = Logger.getLogger(WorkflowConfig.class);
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index dad9949..f415b8e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -19,27 +19,45 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskResult;
 
 public class MockTask implements Task {
   public static final String TASK_COMMAND = "Reindex";
-  private static final String TIMEOUT_CONFIG = "Timeout";
+  public static final String TIMEOUT_CONFIG = "Timeout";
+  public static final String TASK_RESULT_STATUS = "TaskResultStatus";
+  public static final String THROW_EXCEPTION = "ThrowException";
   private final long _delay;
   private volatile boolean _canceled;
+  private TaskResult.Status _taskResultStatus;
+  private boolean _throwException;
 
   public MockTask(TaskCallbackContext context) {
-    JobConfig jobCfg = context.getJobConfig();
-    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+    Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
     if (cfg == null) {
-      cfg = Collections.emptyMap();
+      cfg = new HashMap<String, String>();
     }
+
+    TaskConfig taskConfig = context.getTaskConfig();
+    Map<String, String> taskCfg = taskConfig.getConfigMap();
+    if (taskCfg != null) {
+      cfg.putAll(taskCfg);
+    }
+
     _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) :
100L;
+    _taskResultStatus = cfg.containsKey(TASK_RESULT_STATUS) ?
+        TaskResult.Status.valueOf(cfg.get(TASK_RESULT_STATUS)) :
+        TaskResult.Status.COMPLETED;
+    String throwException = cfg.get(THROW_EXCEPTION);
+    // Honor the configured value (absent or "false" -> false), not merely the key's presence.
+    _throwException = Boolean.parseBoolean(throwException);
   }
 
   @Override
@@ -55,7 +73,12 @@ public class MockTask implements Task {
       sleep(50);
     }
     timeLeft = expiry - System.currentTimeMillis();
-    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0
: timeLeft));
+
+    if (_throwException) {
+      throw new RuntimeException("Test failed");
+    }
+
+    return new TaskResult(_taskResultStatus, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 8262b9b..65ec458 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -42,7 +42,6 @@ import com.google.common.collect.Sets;
 
 public class TestRecurringJobQueue extends TaskTestBase {
   private static final Logger LOG = Logger.getLogger(TestRecurringJobQueue.class);
-  private static final String TIMEOUT_CONFIG = "Timeout";
 
   @Test
   public void deleteRecreateRecurrentQueue() throws Exception {
@@ -120,7 +119,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500));
     Thread.sleep(100);
     for (int i = 0; i <= 4; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
@@ -211,7 +210,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // create jobs
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
     List<String> jobNames = new ArrayList<String>();
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500));
 
     final int JOB_COUNTS = 3;
     for (int i = 0; i < JOB_COUNTS; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7f184839/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
new file mode 100644
index 0000000..5fa370d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskConditionalRetry.java
@@ -0,0 +1,102 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Test Conditional Task Retry
+ */
+public class TestTaskConditionalRetry extends TaskTestBase {
+
+  @Test public void test() throws Exception {
+    int taskRetryCount = 5;
+    int num_tasks = 5;
+
+    String jobResource = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = new JobConfig.Builder();
+    jobBuilder.setCommand(MockTask.TASK_COMMAND).setTimeoutPerTask(10000)
+        .setMaxAttemptsPerTask(taskRetryCount).setFailureThreshold(Integer.MAX_VALUE);
+
+    // create each task configs.
+    final int abortedTask = 1;
+    final int failedTask = 2;
+    final int exceptionTask = 3;
+
+    List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+    for (int j = 0; j < num_tasks; j++) {
+      TaskConfig.Builder configBuilder = new TaskConfig.Builder().setTaskId("task_" + j);
+      switch (j) {
+      case abortedTask:
+        configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name());
+        break;
+      case failedTask:
+        configBuilder.addConfig(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FAILED.name());
+        break;
+      case exceptionTask:
+        configBuilder.addConfig(MockTask.THROW_EXCEPTION, Boolean.TRUE.toString());
+        break;
+      default:
+        break;
+      }
+      configBuilder.setTargetPartition(String.valueOf(j));
+      taskConfigs.add(configBuilder.build());
+    }
+    jobBuilder.addTaskConfigs(taskConfigs);
+
+    Workflow flow =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobResource, jobBuilder).build();
+
+    _driver.start(flow);
+
+    // Wait until the job completes.
+    TaskTestUtil.pollForWorkflowState(_driver, jobResource, TaskState.COMPLETED);
+
+    JobContext ctx = _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource));
+    for (int i = 0; i < num_tasks; i++) {
+      TaskPartitionState state = ctx.getPartitionState(i);
+      int retriedCount = ctx.getPartitionNumAttempts(i);
+      String taskId = ctx.getTaskIdForPartition(i);
+
+      if (taskId.equals("task_" + abortedTask)) {
+        Assert.assertEquals(state, TaskPartitionState.TASK_ABORTED);
+        Assert.assertEquals(retriedCount, 1);
+      } else if (taskId.equals("task_" + failedTask) || taskId.equals("task_" + exceptionTask))
{
+        Assert.assertEquals(state, TaskPartitionState.TASK_ERROR);
+        Assert.assertEquals(retriedCount, taskRetryCount);
+      } else {
+        Assert.assertEquals(state, TaskPartitionState.COMPLETED);
+        Assert.assertEquals(retriedCount, 1);
+      }
+    }
+  }
+}


Mime
View raw message