Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 4f85a26ff -> 47199d507
[HELIX-468] Make TaskDriver list output correct state
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/47199d50
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/47199d50
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/47199d50
Branch: refs/heads/helix-0.6.x
Commit: 47199d507ab4a7584dbcd9c1aa47722e69e87612
Parents: 4f85a26
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Jul 31 13:00:51 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Thu Jul 31 13:00:51 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobContext.java | 19 +++++++-
.../java/org/apache/helix/task/TaskDriver.java | 47 +++++++++++++-------
.../org/apache/helix/task/TaskRebalancer.java | 20 +++++++--
.../java/org/apache/helix/task/TaskState.java | 5 +++
4 files changed, 71 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 7742c67..4e1d31e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -43,7 +43,8 @@ public class JobContext extends HelixProperty {
NUM_ATTEMPTS,
FINISH_TIME,
TARGET,
- TASK_ID
+ TASK_ID,
+ ASSIGNED_PARTICIPANT
}
public JobContext(ZNRecord record) {
@@ -224,4 +225,20 @@ public class JobContext extends HelixProperty {
}
return partitionMap;
}
+
+ public void setAssignedParticipant(int p, String participantName) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ if (map == null) {
+ map = new TreeMap<String, String>();
+ _record.setMapField(pStr, map);
+ }
+ map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+ }
+
+ public String getAssignedParticipant(int p) {
+ String pStr = String.valueOf(p);
+ Map<String, String> map = _record.getMapField(pStr);
+ return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index d7e8e25..e4871b5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import org.I0Itec.zkclient.DataUpdater;
import org.apache.commons.cli.CommandLine;
@@ -261,37 +260,53 @@ public class TaskDriver {
public void list(String resource) {
WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+ if (wCfg == null) {
+ LOG.error("Workflow " + resource + " does not exist!");
+ return;
+ }
WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
LOG.info("Workflow " + resource + " consists of the following tasks: "
+ wCfg.getJobDag().getAllNodes());
- LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
+ String workflowState =
+ (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+ LOG.info("Current state of workflow is " + workflowState);
LOG.info("Job states are: ");
LOG.info("-------");
for (String job : wCfg.getJobDag().getAllNodes()) {
- LOG.info("Task " + job + " is " + wCtx.getJobState(job));
+ TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
+ LOG.info("Job " + job + " is " + jobState);
- // fetch task information
+ // fetch job information
+ JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
JobContext jCtx = TaskUtil.getJobContext(_manager, job);
+ if (jCfg == null || jCtx == null) {
+ LOG.info("-------");
+ continue;
+ }
// calculate taskPartitions
List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
Collections.sort(partitions);
- // group partitions by status
- Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
- for (Integer i : partitions) {
- TaskPartitionState s = jCtx.getPartitionState(i);
- if (!statusCount.containsKey(s)) {
- statusCount.put(s, 0);
+ // report status
+ for (Integer partition : partitions) {
+ String taskId = jCtx.getTaskIdForPartition(partition);
+ taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
+ LOG.info("Task: " + taskId);
+ TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
+ if (taskConfig != null) {
+ LOG.info("Configuration: " + taskConfig.getConfigMap());
}
- statusCount.put(s, statusCount.get(s) + 1);
- }
-
- for (TaskPartitionState s : statusCount.keySet()) {
- LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+ TaskPartitionState state = jCtx.getPartitionState(partition);
+ state = (state != null) ? state : TaskPartitionState.INIT;
+ LOG.info("State: " + state);
+ String assignedParticipant = jCtx.getAssignedParticipant(partition);
+ if (assignedParticipant != null) {
+ LOG.info("Assigned participant: " + assignedParticipant);
+ }
+ LOG.info("-------");
}
-
LOG.info("-------");
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2e4e300..7c50479 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -268,6 +268,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
TaskPartitionState currState =
TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
pName), instance));
+ jobCtx.setPartitionState(pId, currState);
// Process any requested state transitions.
String requestedStateStr =
@@ -322,7 +323,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
LOG.debug(String.format(
"Task partition %s has error state %s. Marking as such in rebalancer context.",
pName, currState));
- markPartitionError(jobCtx, pId, currState);
+ markPartitionError(jobCtx, pId, currState, true);
// The error policy is to fail the task as soon a single partition fails for a specified
// maximum number of attempts.
if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
@@ -347,6 +348,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
workflowCtx.setJobState(jobResource, TaskState.FAILED);
workflowCtx.setWorkflowState(TaskState.FAILED);
workflowCtx.setFinishTime(System.currentTimeMillis());
+ markAllPartitionsError(jobCtx, currState, false);
addAllPartitions(allPartitions, partitionsToDropFromIs);
return emptyAssignment(jobResource, currStateOutput);
} else {
@@ -409,6 +411,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
String pName = pName(jobResource, pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
+ jobCtx.setAssignedParticipant(pId, instance);
+ jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
TaskPartitionState.RUNNING, instance));
}
@@ -721,10 +725,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
ctx.incrementNumAttempts(pId);
}
- private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
+ private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+ boolean incrementAttempts) {
ctx.setPartitionState(pId, state);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
- ctx.incrementNumAttempts(pId);
+ if (incrementAttempts) {
+ ctx.incrementNumAttempts(pId);
+ }
+ }
+
+ private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+ boolean incrementAttempts) {
+ for (int pId : ctx.getPartitionSet()) {
+ markPartitionError(ctx, pId, state, incrementAttempts);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 2cc6d6c..dac5df9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -24,6 +24,11 @@ package org.apache.helix.task;
*/
public enum TaskState {
/**
+ * The task has yet to start
+ */
+ NOT_STARTED,
+
+ /**
* The task is in progress.
*/
IN_PROGRESS,
|