helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-468] Make TaskDriver list output correct state
Date Fri, 01 Aug 2014 17:31:06 GMT
Repository: helix
Updated Branches:
  refs/heads/master ee8ef6a70 -> 8a3705714


[HELIX-468] Make TaskDriver list output correct state


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8a370571
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8a370571
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8a370571

Branch: refs/heads/master
Commit: 8a37057142816b96640d22ee0dfac4b4dae704bc
Parents: ee8ef6a
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Jul 31 13:00:51 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Aug 1 10:30:48 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobContext.java  |  6 +-
 .../java/org/apache/helix/task/TaskDriver.java  | 61 ++++++++------------
 .../org/apache/helix/task/TaskRebalancer.java   | 21 ++++++-
 .../java/org/apache/helix/task/TaskState.java   |  5 ++
 4 files changed, 48 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index c10173d..4e1d31e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -239,10 +239,6 @@ public class JobContext extends HelixProperty {
   public String getAssignedParticipant(int p) {
     String pStr = String.valueOf(p);
     Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      return null;
-    } else {
-      return map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString());
-    }
+    return (map != null) ? map.get(ContextProperties.ASSIGNED_PARTICIPANT.toString()) : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0610c01..e4871b5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -132,7 +132,6 @@ public class TaskDriver {
         break;
       case list:
         driver.list(resource);
-        break;
       default:
         throw new IllegalArgumentException("Unknown command " + args[0]);
       }
@@ -261,24 +260,30 @@ public class TaskDriver {
 
   public void list(String resource) {
     WorkflowConfig wCfg = TaskUtil.getWorkflowCfg(_manager, resource);
+    if (wCfg == null) {
+      LOG.error("Workflow " + resource + " does not exist!");
+      return;
+    }
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
 
-    System.out.println("Workflow " + resource + " consists of the following tasks: "
+    LOG.info("Workflow " + resource + " consists of the following tasks: "
         + wCfg.getJobDag().getAllNodes());
-    TaskState workflowState = wCtx.getWorkflowState();
-    if (workflowState == null) {
-      workflowState = TaskState.IN_PROGRESS;
-    }
-    System.out.println("Current state of workflow is " + wCtx.getWorkflowState().name());
-    System.out.println("Job states are: ");
-    System.out.println("-------");
+    String workflowState =
+        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
+    LOG.info("Current state of workflow is " + workflowState);
+    LOG.info("Job states are: ");
+    LOG.info("-------");
     for (String job : wCfg.getJobDag().getAllNodes()) {
-      System.out.println("Job " + job + " is " + wCtx.getJobState(job));
-      System.out.println("-------");
+      TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
+      LOG.info("Job " + job + " is " + jobState);
 
-      // fetch task information
+      // fetch job information
       JobConfig jCfg = TaskUtil.getJobCfg(_manager, job);
       JobContext jCtx = TaskUtil.getJobContext(_manager, job);
+      if (jCfg == null || jCtx == null) {
+        LOG.info("-------");
+        continue;
+      }
 
       // calculate taskPartitions
       List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
@@ -288,39 +293,21 @@ public class TaskDriver {
       for (Integer partition : partitions) {
         String taskId = jCtx.getTaskIdForPartition(partition);
         taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
-        System.out.println("Task: " + taskId);
+        LOG.info("Task: " + taskId);
         TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
         if (taskConfig != null) {
-          System.out.println("Configuration: " + taskConfig.getConfigMap());
+          LOG.info("Configuration: " + taskConfig.getConfigMap());
         }
         TaskPartitionState state = jCtx.getPartitionState(partition);
-        if (state == null) {
-          state = TaskPartitionState.INIT;
-        }
-        System.out.println("State: " + state);
+        state = (state != null) ? state : TaskPartitionState.INIT;
+        LOG.info("State: " + state);
         String assignedParticipant = jCtx.getAssignedParticipant(partition);
         if (assignedParticipant != null) {
-          System.out.println("Assigned participant: " + assignedParticipant);
+          LOG.info("Assigned participant: " + assignedParticipant);
         }
-        System.out.println("-------");
+        LOG.info("-------");
       }
-
-      // group partitions by status
-      /*
-       * Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
-       * for (Integer i : partitions) {
-       * TaskPartitionState s = jCtx.getPartitionState(i);
-       * if (!statusCount.containsKey(s)) {
-       * statusCount.put(s, 0);
-       * }
-       * statusCount.put(s, statusCount.get(s) + 1);
-       * }
-       * for (TaskPartitionState s : statusCount.keySet()) {
-       * LOG.info(statusCount.get(s) + "/" + partitions.size() + " in state " + s.name());
-       * }
-       */
-
-      System.out.println("-------");
+      LOG.info("-------");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 7fdf734..8f861cd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -283,6 +283,9 @@ public abstract class TaskRebalancer implements HelixRebalancer {
                 instance);
         TaskPartitionState currState =
             (currHelixState != null) ? TaskPartitionState.valueOf(currHelixState.toString()) : null;
+        if (currState != null) {
+          jobCtx.setPartitionState(pId, currState);
+        }
 
         // Process any requested state transitions.
         State requestedStateStr =
@@ -339,7 +342,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
           LOG.debug(String.format(
               "Task partition %s has error state %s. Marking as such in rebalancer context.",
               pName, currState));
-          markPartitionError(jobCtx, pId, currState);
+          markPartitionError(jobCtx, pId, currState, true);
           // The error policy is to fail the task as soon a single partition fails for a specified
           // maximum number of attempts.
           if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
@@ -364,6 +367,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
               workflowCtx.setJobState(jobResource, TaskState.FAILED);
               workflowCtx.setWorkflowState(TaskState.FAILED);
               workflowCtx.setFinishTime(System.currentTimeMillis());
+              markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);
               return emptyAssignment(jobResource, currStateOutput);
             } else {
@@ -428,6 +432,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
                 new PartitionAssignment(instance.toString(), TaskPartitionState.RUNNING.name()));
             excludeSet.add(pId);
             jobCtx.setAssignedParticipant(pId, instance.toString());
+            jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
                 TaskPartitionState.RUNNING, instance));
           }
@@ -742,10 +747,20 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     ctx.incrementNumAttempts(pId);
   }
 
-  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
+  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state,
+      boolean incrementAttempts) {
     ctx.setPartitionState(pId, state);
     ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
-    ctx.incrementNumAttempts(pId);
+    if (incrementAttempts) {
+      ctx.incrementNumAttempts(pId);
+    }
+  }
+
+  private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state,
+      boolean incrementAttempts) {
+    for (int pId : ctx.getPartitionSet()) {
+      markPartitionError(ctx, pId, state, incrementAttempts);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/8a370571/helix-core/src/main/java/org/apache/helix/task/TaskState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
index 2cc6d6c..dac5df9 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java
@@ -24,6 +24,11 @@ package org.apache.helix.task;
  */
 public enum TaskState {
   /**
+   * The task has yet to start
+   */
+  NOT_STARTED,
+
+  /**
    * The task is in progress.
    */
   IN_PROGRESS,


Mime
View raw message