helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-468] Make TaskDriver list output correct state
Date Fri, 01 Aug 2014 17:23:12 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 4f85a26ff -> 47199d507


[HELIX-468] Make TaskDriver list output correct state


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/47199d50
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/47199d50
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/47199d50

Branch: refs/heads/helix-0.6.x
Commit: 47199d507ab4a7584dbcd9c1aa47722e69e87612
Parents: 4f85a26
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Jul 31 13:00:51 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Thu Jul 31 13:00:51 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobContext.java  | 19 +++++++-
 .../java/org/apache/helix/task/TaskDriver.java  | 47 +++++++++++++-------
 .../org/apache/helix/task/TaskRebalancer.java   | 20 +++++++--
 .../java/org/apache/helix/task/TaskState.java   |  5 +++
 4 files changed, 71 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 7742c67..4e1d31e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -43,7 +43,8 @@ public class JobContext extends HelixProperty {
     NUM_ATTEMPTS,
     FINISH_TIME,
     TARGET,
-    TASK_ID
+    TASK_ID,
+    ASSIGNED_PARTICIPANT
   }
 
   public JobContext(ZNRecord record) {
@@ -224,4 +225,20 @@ public class JobContext extends HelixProperty {
     }
     return partitionMap;
   }
+
+  public void setAssignedParticipant(int p, String participantName) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    if (map == null) {
+      map = new TreeMap<String, String>();
+      _record.setMapField(pStr, map);
+    }
+    map.put(ContextProperties.ASSIGNED_PARTICIPANT.toString(), participantName);
+  }
+
+  public String getAssignedParticipant(int p) {
+    String pStr = String.valueOf(p);
+    Map<String, String> map = _record.getMapField(pStr);
+    return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index d7e8e25..e4871b5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.commons.cli.CommandLine;
@@ -261,37 +260,53 @@ public class TaskDriver {
 
   public void list(String resource) {
     WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+    if (wCfg == null) {
+      LOG.error("Workflow " + resource + " does not exist!");
+      return;
+    }
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
 
     LOG.info("Workflow " + resource + " consists of the following tasks: "
         + wCfg.getJobDag().getAllNodes());
-    LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
+    String workflowState =
+        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+    LOG.info("Current state of workflow is " + workflowState);
     LOG.info("Job states are: ");
     LOG.info("-------");
     for (String job : wCfg.getJobDag().getAllNodes()) {
-      LOG.info("Task " + job + " is " + wCtx.getJobState(job));
+      TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
+      LOG.info("Job " + job + " is " + jobState);
 
-      // fetch task information
+      // fetch job information
+      JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
       JobContext jCtx = TaskUtil.getJobContext(_manager, job);
+      if (jCfg == null || jCtx == null) {
+        LOG.info("-------");
+        continue;
+      }
 
       // calculate taskPartitions
       List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
       Collections.sort(partitions);
 
-      // group partitions by status
-      Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
-      for (Integer i : partitions) {
-        TaskPartitionState s = jCtx.getPartitionState(i);
-        if (!statusCount.containsKey(s)) {
-          statusCount.put(s, 0);
+      // report status
+      for (Integer partition : partitions) {
+        String taskId = jCtx.getTaskIdForPartition(partition);
+        taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
+        LOG.info("Task: " + taskId);
+        TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
+        if (taskConfig != null) {
+          LOG.info("Configuration: " + taskConfig.getConfigMap());
         }
-        statusCount.put(s, statusCount.get(s) + 1);
-      }
-
-      for (TaskPartitionState s : statusCount.keySet()) {
-        LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
+        TaskPartitionState state = jCtx.getPartitionState(partition);
+        state = (state != null) ? state : TaskPartitionState.INIT;
+        LOG.info("State: " + state);
+        String assignedParticipant = jCtx.getAssignedParticipant(partition);
+        if (assignedParticipant != null) {
+          LOG.info("Assigned participant: " + assignedParticipant);
+        }
+        LOG.info("-------");
       }
-
       LOG.info("-------");
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2e4e300..7c50479 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -268,6 +268,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         TaskPartitionState currState =
             TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition(
                 pName), instance));
+        jobCtx.setPartitionState(pId, currState);
 
         // Process any requested state transitions.
         String requestedStateStr =
@@ -322,7 +323,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
           LOG.debug(String.format(
               "Task partition %s has error state %s. Marking as such in rebalancer context.",
               pName, currState));
-          markPartitionError(jobCtx, pId, currState);
+          markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts.
           if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
@@ -347,6 +348,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
               workflowCtx.setJobState(jobResource, TaskState.FAILED);
               workflowCtx.setWorkflowState(TaskState.FAILED);
               workflowCtx.setFinishTime(System.currentTimeMillis());
+              markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);
               return emptyAssignment(jobResource, currStateOutput);
             } else {
@@ -409,6 +411,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
             String pName = pName(jobResource, pId);
             paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
             excludeSet.add(pId);
+            jobCtx.setAssignedParticipant(pId, instance);
+            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
                 TaskPartitionState.RUNNING, instance));
           }
@@ -721,10 +725,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     ctx.incrementNumAttempts(pId);
   }
 
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
+  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+      boolean incrementAttempts) {
     ctx.setPartitionState(pId, state);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
+    if (incrementAttempts) {
+      ctx.incrementNumAttempts(pId);
+    }
+  }
+
+  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+      boolean incrementAttempts) {
+    for (int pId : ctx.getPartitionSet()) {
+      markPartitionError(ctx, pId, state, incrementAttempts);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/47199d50/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 2cc6d6c..dac5df9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -24,6 +24,11 @@ package org.apache.helix.task;
  */
 public enum TaskState {
   /**
+   * The task has yet to start
+   */
+  NOT_STARTED,
+
+  /**
    * The task is in progress.
    */
   IN_PROGRESS,


Mime
View raw message