Repository: helix
Updated Branches:
refs/heads/master ee8ef6a70 -> 8a3705714
[HELIX-468] Make TaskDriver list output correct state
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8a370571
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8a370571
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8a370571
Branch: refs/heads/master
Commit: 8a37057142816b96640d22ee0dfac4b4dae704bc
Parents: ee8ef6a
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Jul 31 13:00:51 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Aug 1 10:30:48 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobContext.java | 6 +-
.../java/org/apache/helix/task/TaskDriver.java | 61 ++++++++------------
.../org/apache/helix/task/TaskRebalancer.java | 21 ++++++-
.../java/org/apache/helix/task/TaskState.java | 5 ++
4 files changed, 48 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index c10173d..4e1d31e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -239,10 +239,6 @@ public class JobContext extends HelixProperty {
public String getAssignedParticipant(int p) {
String pStr = String.valueOf(p);
Map<String, String> map = _record.getMapField(pStr);
- if (map == null) {
- return null;
- } else {
- return map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString());
- }
+ return (map == null) ? null : map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0610c01..e4871b5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -261,24 +261,32 @@ public class TaskDriver {
public void list(String resource) {
WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+ if (wCfg == null) {
+ LOG.error("Workflow " + resource + " does not exist!");
+ return;
+ }
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
- System.out.println("Workflow " + resource + " consists of the following tasks: "
+ LOG.info("Workflow " + resource + " consists of the following tasks: "
+ wCfg.getJobDag().getAllNodes());
- TaskState workflowState = wCtx.getWorkflowState();
- if (workflowState == null) {
- workflowState = TaskState.IN_PROGRESS;
- }
- System.out.println("Current state of workflow is " + wCtx.getWorkflowState().name());
- System.out.println("Job states are: ");
- System.out.println("-------");
+ // The context may exist before any workflow state is recorded; report that as IN_PROGRESS.
+ TaskState workflowState = (wCtx != null) ? wCtx.getWorkflowState() : TaskState.NOT_STARTED;
+ workflowState = (workflowState != null) ? workflowState : TaskState.IN_PROGRESS;
+ LOG.info("Current state of workflow is " + workflowState.name());
+ LOG.info("Job states are: ");
+ LOG.info("-------");
for (String job : wCfg.getJobDag().getAllNodes()) {
- System.out.println("Job " + job + " is " + wCtx.getJobState(job));
- System.out.println("-------");
+ // A job with no recorded state is reported as NOT_STARTED.
+ TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : null;
+ LOG.info("Job " + job + " is " + ((jobState != null) ? jobState : TaskState.NOT_STARTED));
- // fetch task information
+ // fetch job information
JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
JobContext jCtx = TaskUtil.getJobContext(_manager, job);
+ if (jCfg == null || jCtx == null) {
+ LOG.info("-------");
+ continue;
+ }
// calculate taskPartitions
List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
@@ -288,39 +296,21 @@ public class TaskDriver {
for (Integer partition : partitions) {
String taskId = jCtx.getTaskIdForPartition(partition);
taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
- System.out.println("Task: " + taskId);
+ LOG.info("Task: " + taskId);
TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
if (taskConfig != null) {
- System.out.println("Configuration: " + taskConfig.getConfigMap());
+ LOG.info("Configuration: " + taskConfig.getConfigMap());
}
TaskPartitionState state = jCtx.getPartitionState(partition);
- if (state == null) {
- state = TaskPartitionState.INIT;
- }
- System.out.println("State: " + state);
+ state = (state != null) ? state : TaskPartitionState.INIT;
+ LOG.info("State: " + state);
String assignedParticipant = jCtx.getAssignedParticipant(partition);
if (assignedParticipant != null) {
- System.out.println("Assigned participant: " + assignedParticipant);
+ LOG.info("Assigned participant: " + assignedParticipant);
}
- System.out.println("-------");
+ LOG.info("-------");
}
-
- // group partitions by status
- /*
- * Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
- * for (Integer i : partitions) {
- * TaskPartitionState s = jCtx.getPartitionState(i);
- * if (!statusCount.containsKey(s)) {
- * statusCount.put(s, 0);
- * }
- * statusCount.put(s, statusCount.get(s) + 1);
- * }
- * for (TaskPartitionState s : statusCount.keySet()) {
- * LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
- * }
- */
-
- System.out.println("-------");
+ LOG.info("-------");
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 7fdf734..8f861cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -283,6 +283,9 @@ public abstract class TaskRebalancer implements HelixRebalancer {
instance);
TaskPartitionState currState =
(currHelixState != null) ? TaskPartitionState.valueOf(currHelixState.toString())
: null;
+ if (currState != null) {
+ jobCtx.setPartitionState(pId, currState);
+ }
// Process any requested state transitions.
State requestedStateStr =
@@ -339,7 +342,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
LOG.debug(String.format(
"Task partition %s has error state %s. Marking as such in rebalancer context.",
pName, currState));
- markPartitionError(jobCtx, pId, currState);
+ markPartitionError(jobCtx, pId, currState, true);
// The error policy is to fail the task as soon a single partition fails for a specified
// maximum number of attempts.
if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
@@ -364,6 +367,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
workflowCtx.setJobState(jobResource, TaskState.FAILED);
workflowCtx.setWorkflowState(TaskState.FAILED);
workflowCtx.setFinishTime(System.currentTimeMillis());
+ markAllPartitionsError(jobCtx, currState, false);
addAllPartitions(allPartitions, partitionsToDropFromIs);
return emptyAssignment(jobResource, currStateOutput);
} else {
@@ -428,6 +432,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
new PartitionAssignment(instance.toString(), TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
jobCtx.setAssignedParticipant(pId, instance.toString());
+ jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
TaskPartitionState.RUNNING, instance));
}
@@ -742,10 +747,21 @@ public abstract class TaskRebalancer implements HelixRebalancer {
ctx.incrementNumAttempts(pId);
}
- private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
+ private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+ boolean incrementAttempts) {
ctx.setPartitionState(pId, state);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
- ctx.incrementNumAttempts(pId);
+ if (incrementAttempts) {
+ ctx.incrementNumAttempts(pId);
+ }
+ }
+
+ // Applies markPartitionError to every partition in the job context.
+ private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+ boolean incrementAttempts) {
+ for (int pId : ctx.getPartitionSet()) {
+ markPartitionError(ctx, pId, state, incrementAttempts);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 2cc6d6c..dac5df9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -24,6 +24,11 @@ package org.apache.helix.task;
*/
public enum TaskState {
/**
+ * The task has yet to start.
+ */
+ NOT_STARTED,
+
+ /**
* The task is in progress.
*/
IN_PROGRESS,
|