helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-497] Support named queues of jobs
Date Tue, 19 Aug 2014 18:10:32 GMT
Repository: helix
Updated Branches:
  refs/heads/master a298f23de -> 05a982a44


[HELIX-497] Support named queues of jobs


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/05a982a4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/05a982a4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/05a982a4

Branch: refs/heads/master
Commit: 05a982a44d5f1533bdfde11c089a0ca87bd3b385
Parents: a298f23
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Mon Aug 18 10:42:06 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Aug 19 11:10:19 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobContext.java  |  11 ++
 .../java/org/apache/helix/task/JobQueue.java    |  99 +++++++++++
 .../java/org/apache/helix/task/TaskConfig.java  |  37 +++-
 .../java/org/apache/helix/task/TaskDriver.java  | 173 +++++++++++++++++--
 .../org/apache/helix/task/TaskRebalancer.java   |  65 +++++--
 .../org/apache/helix/task/TaskStateModel.java   |   6 +
 .../java/org/apache/helix/task/TaskUtil.java    |   4 +-
 .../java/org/apache/helix/task/Workflow.java    |  28 +--
 .../org/apache/helix/task/WorkflowConfig.java   |  50 +++++-
 .../org/apache/helix/task/WorkflowContext.java  |   8 +-
 .../integration/task/TestTaskRebalancer.java    |  59 ++++++-
 .../apache/helix/integration/task/TestUtil.java |   6 +-
 12 files changed, 476 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/JobContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
index 4e1d31e..f843834 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java
@@ -60,7 +60,18 @@ public class JobContext extends HelixProperty {
     if (tStr == null) {
       return -1;
     }
+    return Long.parseLong(tStr);
+  }
+
+  public void setFinishTime(long t) {
+    _record.setSimpleField(ContextProperties.FINISH_TIME.toString(), String.valueOf(t));
+  }
 
+  public long getFinishTime() {
+    String tStr = _record.getSimpleField(ContextProperties.FINISH_TIME.toString());
+    if (tStr == null) {
+      return WorkflowContext.UNFINISHED;
+    }
     return Long.parseLong(tStr);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobQueue.java b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
new file mode 100644
index 0000000..39dc84c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/task/JobQueue.java
@@ -0,0 +1,99 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+/**
+ * A named queue to which jobs can be added
+ */
+public class JobQueue extends WorkflowConfig {
+  /* Config fields */
+  public static final String CAPACITY = "CAPACITY";
+
+  private final String _name;
+  private final int _capacity;
+
+  private JobQueue(String name, int capacity, WorkflowConfig config) {
+    super(config.getJobDag(), config.getTargetState(), config.getExpiry(), config.isTerminable(),
+        config.getScheduleConfig());
+    _name = name;
+    _capacity = capacity;
+  }
+
+  /**
+   * Get the name of this queue
+   * @return queue name
+   */
+  public String getName() {
+    return _name;
+  }
+
+  /**
+   * Determine the number of jobs that this queue can accept before rejecting further jobs
+   * @return queue capacity
+   */
+  public int getCapacity() {
+    return _capacity;
+  }
+
+  @Override
+  public Map<String, String> getResourceConfigMap() throws Exception {
+    Map<String, String> cfgMap = super.getResourceConfigMap();
+    cfgMap.put(CAPACITY, String.valueOf(_capacity));
+    return cfgMap;
+  }
+
+  /** Supports creation of a single empty queue */
+  public static class Builder {
+    private WorkflowConfig.Builder _builder;
+    private final String _name;
+    private int _capacity = Integer.MAX_VALUE;
+
+    public Builder(String name) {
+      _builder = new WorkflowConfig.Builder();
+      _name = name;
+    }
+
+    public Builder expiry(long expiry) {
+      _builder.setExpiry(expiry);
+      return this;
+    }
+
+    public Builder capacity(int capacity) {
+      _capacity = capacity;
+      return this;
+    }
+
+    public Builder fromMap(Map<String, String> cfg) {
+      _builder = WorkflowConfig.Builder.fromMap(cfg);
+      if (cfg.containsKey(CAPACITY)) {
+        _capacity = Integer.parseInt(cfg.get(CAPACITY));
+      }
+      return this;
+    }
+
+    public JobQueue build() {
+      _builder.setTerminable(false);
+      WorkflowConfig workflowConfig = _builder.build();
+      return new JobQueue(_name, _capacity, workflowConfig);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 4ddab1a..3e24725 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -36,7 +36,8 @@ public class TaskConfig {
   private enum TaskConfigFields {
     TASK_ID,
     TASK_COMMAND,
-    TASK_SUCCESS_OPTIONAL
+    TASK_SUCCESS_OPTIONAL,
+    TASK_TARGET_PARTITION
   }
 
   private static final Logger LOG = Logger.getLogger(TaskConfig.class);
@@ -50,19 +51,25 @@ public class TaskConfig {
    * @param successOptional true if this task need not pass for the job to succeed, false
    *          otherwise
    * @param id existing task ID
+   * @param target target partition for a task
    */
   public TaskConfig(String command, Map<String, String> configMap, boolean successOptional,
-      String id) {
+      String id, String target) {
     if (configMap == null) {
       configMap = Maps.newHashMap();
     }
     if (id == null) {
       id = UUID.randomUUID().toString();
     }
-    configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+    if (command != null) {
+      configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+    }
     configMap.put(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString(),
         Boolean.toString(successOptional));
     configMap.put(TaskConfigFields.TASK_ID.toString(), id);
+    if (target != null) {
+      configMap.put(TaskConfigFields.TASK_TARGET_PARTITION.toString(), target);
+    }
     _configMap = configMap;
   }
 
@@ -74,7 +81,7 @@ public class TaskConfig {
    *          otherwise
    */
   public TaskConfig(String command, Map<String, String> configMap, boolean successOptional) {
-    this(command, configMap, successOptional, null);
+    this(command, configMap, successOptional, null, null);
   }
 
   /**
@@ -87,13 +94,21 @@ public class TaskConfig {
 
   /**
    * Get the command to invoke for this task
-   * @return string command
+   * @return string command, or null if not overridden
    */
   public String getCommand() {
     return _configMap.get(TaskConfigFields.TASK_COMMAND.toString());
   }
 
   /**
+   * Get the target partition of this task, if any
+   * @return the target partition, or null
+   */
+  public String getTargetPartition() {
+    return _configMap.get(TaskConfigFields.TASK_TARGET_PARTITION.toString());
+  }
+
+  /**
    * Check if this task must succeed for a job to succeed
    * @return true if success is optional, false otherwise
    */
@@ -126,6 +141,15 @@ public class TaskConfig {
   }
 
   /**
+   * Instantiate a typed configuration from just a target
+   * @param target the target partition
+   * @return instantiated TaskConfig
+   */
+  public static TaskConfig from(String target) {
+    return new TaskConfig(null, null, false, null, target);
+  }
+
+  /**
    * Instantiate a typed configuration from a bean
    * @param bean plain bean describing the task
    * @return instantiated TaskConfig
@@ -142,9 +166,10 @@ public class TaskConfig {
   public static TaskConfig from(Map<String, String> rawConfigMap) {
     String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
     String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
+    String targetPartition = rawConfigMap.get(TaskConfigFields.TASK_TARGET_PARTITION.toString());
     String successOptionalStr = rawConfigMap.get(TaskConfigFields.TASK_SUCCESS_OPTIONAL.toString());
     boolean successOptional =
         (successOptionalStr != null) ? Boolean.valueOf(successOptionalStr) : null;
-    return new TaskConfig(command, rawConfigMap, successOptional, taskId);
+    return new TaskConfig(command, rawConfigMap, successOptional, taskId, targetPartition);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 6f4cc24..a341a3b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.commons.cli.CommandLine;
@@ -48,7 +49,9 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * CLI for scheduling/canceling workflows
@@ -152,10 +155,10 @@ public class TaskDriver {
     String flowName = flow.getName();
 
     // first, add workflow config to ZK
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName),
-        flow.getResourceConfigMap());
+    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, flowName), flow
+        .getWorkflowConfig().getResourceConfigMap());
 
-    // then schedule tasks
+    // then schedule jobs
     for (String job : flow.getJobConfigs().keySet()) {
       JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
      if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
@@ -165,6 +168,152 @@ public class TaskDriver {
     }
   }
 
+  /** Creates a new named job queue (workflow) */
+  public void createQueue(JobQueue queue) throws Exception {
+    String queueName = queue.getName();
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    HelixProperty property = new HelixProperty(queueName);
+    property.getRecord().getSimpleFields().putAll(queue.getResourceConfigMap());
+    boolean created =
+        accessor.createProperty(accessor.keyBuilder().resourceConfig(queueName), property);
+    if (!created) {
+      throw new IllegalArgumentException("Queue " + queueName + " already exists!");
+    }
+  }
+
+  /** Flushes a named job queue */
+  public void flushQueue(String queueName) throws Exception {
+    WorkflowConfig config = TaskUtil.getWorkflowCfg(_manager, queueName);
+    if (config == null) {
+      throw new IllegalArgumentException("Queue does not exist!");
+    }
+
+    // Remove all ideal states and resource configs to trigger a drop event
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final Set<String> toRemove = Sets.newHashSet(config.getJobDag().getAllNodes());
+    for (String resourceName : toRemove) {
+      accessor.removeProperty(keyBuilder.idealStates(resourceName));
+      accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
+      // Delete context
+      String contextKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
+      _manager.getHelixPropertyStore().remove(contextKey, AccessOption.PERSISTENT);
+    }
+
+    // Now atomically clear the DAG
+    String path = keyBuilder.resourceConfig(queueName).getPath();
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        for (String resourceName : toRemove) {
+          for (String child : jobDag.getDirectChildren(resourceName)) {
+            jobDag.getChildrenToParents().get(child).remove(resourceName);
+          }
+          for (String parent : jobDag.getDirectParents(resourceName)) {
+            jobDag.getParentsToChildren().get(parent).remove(resourceName);
+          }
+          jobDag.getChildrenToParents().remove(resourceName);
+          jobDag.getParentsToChildren().remove(resourceName);
+          jobDag.getAllNodes().remove(resourceName);
+        }
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          throw new IllegalArgumentException(e);
+        }
+        return currentData;
+      }
+    };
+    accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+
+    // Now atomically clear the results
+    path =
+        Joiner.on("/")
+            .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE);
+    updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES);
+        if (states != null) {
+          states.keySet().removeAll(toRemove);
+        }
+        return currentData;
+      }
+    };
+    _manager.getHelixPropertyStore().update(path, updater, AccessOption.PERSISTENT);
+  }
+
+  /** Adds a new job to the end an existing named queue */
+  public void enqueueJob(final String queueName, final String jobName, JobConfig.Builder
jobBuilder)
+      throws Exception {
+    // Get the job queue config and capacity
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    HelixProperty workflowConfig =
+        accessor.getProperty(accessor.keyBuilder().resourceConfig(queueName));
+    if (workflowConfig == null) {
+      throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!");
+    }
+    boolean isTerminable =
+        workflowConfig.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+    if (isTerminable) {
+      throw new IllegalArgumentException(queueName + " is not a queue!");
+    }
+    final int capacity =
+        workflowConfig.getRecord().getIntField(JobQueue.CAPACITY, Integer.MAX_VALUE);
+
+    // Create the job to ensure that it validates
+    JobConfig jobConfig = jobBuilder.setWorkflow(queueName).build();
+
+    // Add the job to the end of the queue in the DAG
+    final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName);
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Add the node to the existing DAG
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        Set<String> allNodes = jobDag.getAllNodes();
+        if (allNodes.size() >= capacity) {
+          throw new IllegalStateException("Queue " + queueName + " is at capacity, will not add "
+              + jobName);
+        }
+        if (allNodes.contains(namespacedJobName)) {
+          throw new IllegalStateException("Could not add to queue " + queueName + ", job "
+              + jobName + " already exists");
+        }
+        jobDag.addNode(namespacedJobName);
+
+        // Add the node to the end of the queue
+        String candidate = null;
+        for (String node : allNodes) {
+          if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
+            candidate = node;
+            break;
+          }
+        }
+        if (candidate != null) {
+          jobDag.addParentToChild(candidate, namespacedJobName);
+        }
+
+        // Save the updated DAG
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          throw new IllegalStateException(
+              "Could not add job " + jobName + " to queue " + queueName, e);
+        }
+        return currentData;
+      }
+    };
+    String path = accessor.keyBuilder().resourceConfig(queueName).getPath();
+    boolean status = accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
+    if (!status) {
+      throw new IllegalArgumentException("Could not enqueue job");
+    }
+    // Schedule the job
+    scheduleJob(namespacedJobName, jobConfig);
+  }
+
   /** Posts new job to cluster */
   private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception {
     // Set up job resource based on partitions from target resource
@@ -206,19 +355,19 @@ public class TaskDriver {
     _admin.setResourceIdealState(_clusterName, jobResource, is);
   }
 
-  /** Public method to resume a job/workflow */
-  public void resume(String resource) {
-    setTaskTargetState(resource, TargetState.START);
+  /** Public method to resume a workflow/queue */
+  public void resume(String workflow) {
+    setTaskTargetState(workflow, TargetState.START);
   }
 
-  /** Public method to stop a job/workflow */
-  public void stop(String resource) {
-    setTaskTargetState(resource, TargetState.STOP);
+  /** Public method to stop a workflow/queue */
+  public void stop(String workflow) {
+    setTaskTargetState(workflow, TargetState.STOP);
   }
 
-  /** Public method to delete a job/workflow */
-  public void delete(String resource) {
-    setTaskTargetState(resource, TargetState.DELETE);
+  /** Public method to delete a workflow/queue */
+  public void delete(String workflow) {
+    setTaskTargetState(workflow, TargetState.DELETE);
   }
 
   /** Helper function to change target state for a given task */

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index af89944..b4b2937 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
@@ -176,6 +177,14 @@ public abstract class TaskRebalancer implements HelixRebalancer {
       jobCtx.setStartTime(System.currentTimeMillis());
     }
 
+    // Check for expired jobs for non-terminable workflows
+    long jobFinishTime = jobCtx.getFinishTime();
+    if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
+        && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName, currStateOutput);
+    }
+
     // The job is already in a final state (completed/failed).
     if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
         || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
@@ -373,9 +382,13 @@ public abstract class TaskRebalancer implements HelixRebalancer {
             }
 
             if (!successOptional) {
+              long finishTime = System.currentTimeMillis();
               workflowCtx.setJobState(jobResource, TaskState.FAILED);
-              workflowCtx.setWorkflowState(TaskState.FAILED);
-              workflowCtx.setFinishTime(System.currentTimeMillis());
+              if (workflowConfig.isTerminable()) {
+                workflowCtx.setWorkflowState(TaskState.FAILED);
+                workflowCtx.setFinishTime(finishTime);
+              }
+              jobCtx.setFinishTime(finishTime);
               markAllPartitionsError(jobCtx, currState, false);
               addAllPartitions(allPartitions, partitionsToDropFromIs);
               return emptyAssignment(jobResource, currStateOutput);
@@ -405,10 +418,12 @@ public abstract class TaskRebalancer implements HelixRebalancer {
     }
 
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) {
+      long currentTime = System.currentTimeMillis();
       workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
+      jobCtx.setFinishTime(currentTime);
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.COMPLETED);
-        workflowCtx.setFinishTime(System.currentTimeMillis());
+        workflowCtx.setFinishTime(currentTime);
       }
     }
 
@@ -591,6 +606,9 @@ public abstract class TaskRebalancer implements HelixRebalancer {
    * @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
    */
   private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+    if (!cfg.isTerminable()) {
+      return false;
+    }
     for (String job : cfg.getJobDag().getAllNodes()) {
       if (ctx.getJobState(job) != TaskState.COMPLETED) {
         return false;
@@ -622,18 +640,45 @@ public abstract class TaskRebalancer implements HelixRebalancer {
 
   /**
   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
-   * is the last remaining job in its workflow.
+   * is the last remaining job in its workflow, and the workflow is terminable.
    */
-  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
+  private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg,
       String workflowResource) {
     HelixDataAccessor accessor = mgr.getHelixDataAccessor();
+
+    // Remove any DAG references in workflow
+    PropertyKey workflowKey = getConfigPropertyKey(accessor, workflowResource);
+    DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
+        for (String child : jobDag.getDirectChildren(resourceName)) {
+          jobDag.getChildrenToParents().get(child).remove(resourceName);
+        }
+        for (String parent : jobDag.getDirectParents(resourceName)) {
+          jobDag.getParentsToChildren().get(parent).remove(resourceName);
+        }
+        jobDag.getChildrenToParents().remove(resourceName);
+        jobDag.getParentsToChildren().remove(resourceName);
+        jobDag.getAllNodes().remove(resourceName);
+        try {
+          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+        } catch (Exception e) {
+          LOG.equals("Could not update DAG for job " + resourceName);
+        }
+        return currentData;
+      }
+    };
+    accessor.getBaseDataAccessor().update(workflowKey.getPath(), dagRemover,
+        AccessOption.PERSISTENT);
+
     // Delete resource configs.
     PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
     if (!accessor.removeProperty(cfgKey)) {
       throw new RuntimeException(
           String
               .format(
-                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
                   resourceName, cfgKey));
     }
     // Delete property store information for this resource.
@@ -642,7 +687,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
       throw new RuntimeException(
           String
               .format(
-                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
                   resourceName, propStoreKey));
     }
     // Finally, delete the ideal state itself.
@@ -652,7 +697,7 @@ public abstract class TaskRebalancer implements HelixRebalancer {
           "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
           resourceName, isKey));
     }
-    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
+    LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName));
 
     boolean lastInWorkflow = true;
     for (String job : cfg.getJobDag().getAllNodes()) {
@@ -665,8 +710,8 @@ public abstract class TaskRebalancer implements HelixRebalancer {
       }
     }
 
-    // clean up job-level info if this was the last in workflow
-    if (lastInWorkflow) {
+    // clean up workflow-level info if this was the last in workflow
+    if (lastInWorkflow && cfg.isTerminable()) {
       // delete workflow config
       PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
       if (!accessor.removeProperty(workflowCfgKey)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index 69a1b88..c790608 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -228,6 +228,12 @@ public class TaskStateModel extends TransitionHandler {
       }
     }
 
+    // Report a target if that was used to assign the partition
+    String target = ctx.getTargetForPartition(pId);
+    if (taskConfig == null && target != null) {
+      taskConfig = TaskConfig.from(target);
+    }
+
     // Populate a task callback context
     TaskCallbackContext callbackContext = new TaskCallbackContext();
     callbackContext.setManager(_manager);

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index cca7d76..ad5770d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -56,8 +56,8 @@ import com.google.common.collect.Maps;
  */
 public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
-  private static final String CONTEXT_NODE = "Context";
-  private static final String PREV_RA_NODE = "PreviousResourceAssignment";
+  public static final String CONTEXT_NODE = "Context";
+  public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   /**
    * Parses job resource configurations in Helix into a {@link JobConfig} object.

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index a8a2e12..e7946b4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -27,8 +27,6 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -87,30 +85,6 @@ public class Workflow {
     return _workflowConfig;
   }
 
-  public Map<String, String> getResourceConfigMap() throws Exception {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson());
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry()));
-    cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name());
-
-    // Populate schedule if present
-    ScheduleConfig scheduleConfig = _workflowConfig.getScheduleConfig();
-    if (scheduleConfig != null) {
-      Date startTime = scheduleConfig.getStartTime();
-      if (startTime != null) {
-        String formattedTime = WorkflowConfig.DEFAULT_DATE_FORMAT.format(startTime);
-        cfgMap.put(WorkflowConfig.START_TIME, formattedTime);
-      }
-      if (scheduleConfig.isRecurring()) {
-        cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString());
-        cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval()
-            .toString());
-      }
-    }
-
-    return cfgMap;
-  }
-
   /**
    * Parses the YAML description from a file into a {@link Workflow} object.
    * @param file An abstract path name to the file containing the workflow description.
@@ -339,7 +313,7 @@ public class Workflow {
       }
 
       WorkflowConfig.Builder builder = new WorkflowConfig.Builder();
-      builder.setTaskDag(_dag);
+      builder.setJobDag(_dag);
       builder.setTargetState(TargetState.START);
       if (_scheduleConfig != null) {
         builder.setScheduleConfig(_scheduleConfig);

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index 782c375..a4d0545 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -20,6 +20,8 @@ package org.apache.helix.task;
  */
 
 import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
@@ -34,6 +36,7 @@ public class WorkflowConfig {
   public static final String START_TIME = "StartTime";
   public static final String RECURRENCE_UNIT = "RecurrenceUnit";
   public static final String RECURRENCE_INTERVAL = "RecurrenceInterval";
+  public static final String TERMINABLE = "Terminable";
 
   /* Default values */
   public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000;
@@ -47,13 +50,15 @@ public class WorkflowConfig {
   private final JobDag _jobDag;
   private final TargetState _targetState;
   private final long _expiry;
+  private final boolean _terminable;
   private final ScheduleConfig _scheduleConfig;
 
-  private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry,
+  protected WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry, boolean terminable,
       ScheduleConfig scheduleConfig) {
     _jobDag = jobDag;
     _targetState = targetState;
     _expiry = expiry;
+    _terminable = terminable;
     _scheduleConfig = scheduleConfig;
   }
 
@@ -69,23 +74,52 @@ public class WorkflowConfig {
     return _expiry;
   }
 
+  public boolean isTerminable() {
+    return _terminable;
+  }
+
   public ScheduleConfig getScheduleConfig() {
     return _scheduleConfig;
   }
 
+  public Map<String, String> getResourceConfigMap() throws Exception {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson());
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(getExpiry()));
+    cfgMap.put(WorkflowConfig.TARGET_STATE, getTargetState().name());
+    cfgMap.put(WorkflowConfig.TERMINABLE, String.valueOf(isTerminable()));
+
+    // Populate schedule if present
+    ScheduleConfig scheduleConfig = getScheduleConfig();
+    if (scheduleConfig != null) {
+      Date startTime = scheduleConfig.getStartTime();
+      if (startTime != null) {
+        String formattedTime = WorkflowConfig.DEFAULT_DATE_FORMAT.format(startTime);
+        cfgMap.put(WorkflowConfig.START_TIME, formattedTime);
+      }
+      if (scheduleConfig.isRecurring()) {
+        cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString());
+        cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval()
+            .toString());
+      }
+    }
+    return cfgMap;
+  }
+
   public static class Builder {
     private JobDag _taskDag = JobDag.EMPTY_DAG;
     private TargetState _targetState = TargetState.START;
     private long _expiry = DEFAULT_EXPIRY;
+    private boolean _isTerminable = true;
     private ScheduleConfig _scheduleConfig;
 
     public WorkflowConfig build() {
       validate();
 
-      return new WorkflowConfig(_taskDag, _targetState, _expiry, _scheduleConfig);
+      return new WorkflowConfig(_taskDag, _targetState, _expiry, _isTerminable, _scheduleConfig);
     }
 
-    public Builder setTaskDag(JobDag v) {
+    public Builder setJobDag(JobDag v) {
       _taskDag = v;
       return this;
     }
@@ -95,6 +129,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setTerminable(boolean isTerminable) {
+      _isTerminable = isTerminable;
+      return this;
+    }
+
     public Builder setTargetState(TargetState v) {
       _targetState = v;
       return this;
@@ -114,11 +153,14 @@ public class WorkflowConfig {
         b.setExpiry(Long.parseLong(cfg.get(EXPIRY)));
       }
       if (cfg.containsKey(DAG)) {
-        b.setTaskDag(JobDag.fromJson(cfg.get(DAG)));
+        b.setJobDag(JobDag.fromJson(cfg.get(DAG)));
       }
       if (cfg.containsKey(TARGET_STATE)) {
         b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE)));
       }
+      if (cfg.containsKey(TERMINABLE)) {
+        b.setTerminable(Boolean.parseBoolean(cfg.get(TERMINABLE)));
+      }
 
       // Parse schedule-specific configs, if they exist
       ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg);

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
index 6ad71a1..5fc7e66 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java
@@ -33,7 +33,7 @@ public class WorkflowContext extends HelixProperty {
   public static final String WORKFLOW_STATE = "STATE";
   public static final String START_TIME = "START_TIME";
   public static final String FINISH_TIME = "FINISH_TIME";
-  public static final String TASK_STATES = "TASK_STATES";
+  public static final String JOB_STATES = "JOB_STATES";
   public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW";
   public static final int UNFINISHED = -1;
 
@@ -60,16 +60,16 @@ public class WorkflowContext extends HelixProperty {
   }
 
   public void setJobState(String jobResource, TaskState s) {
-    Map<String, String> states = _record.getMapField(TASK_STATES);
+    Map<String, String> states = _record.getMapField(JOB_STATES);
     if (states == null) {
       states = new TreeMap<String, String>();
-      _record.setMapField(TASK_STATES, states);
+      _record.setMapField(JOB_STATES, states);
     }
     states.put(jobResource, s.name());
   }
 
   public TaskState getJobState(String jobResource) {
-    Map<String, String> states = _record.getMapField(TASK_STATES);
+    Map<String, String> states = _record.getMapField(JOB_STATES);
     if (states == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index aaf01a3..c5a6148 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.task;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
@@ -29,12 +30,15 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
@@ -46,6 +50,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.testutil.ZkTestBase;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -57,6 +62,7 @@ import org.testng.annotations.Test;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 
 public class TestTaskRebalancer extends ZkTestBase {
   private static final int n = 5;
@@ -287,6 +293,57 @@ public class TestTaskRebalancer extends ZkTestBase {
     Assert.assertEquals(maxAttempts, 2);
   }
 
+  @Test
+  public void testNamedQueue() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Enqueue jobs
+    Set<String> master = Sets.newHashSet("MASTER");
+    Set<String> slave = Sets.newHashSet("SLAVE");
+    JobConfig.Builder job1 =
+        new JobConfig.Builder().setCommand("Reindex")
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    JobConfig.Builder job2 =
+        new JobConfig.Builder().setCommand("Reindex")
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+    _driver.enqueueJob(queueName, "masterJob", job1);
+    _driver.enqueueJob(queueName, "slaveJob", job2);
+
+    // Ensure successful completion
+    String namespacedJob1 = queueName + "_masterJob";
+    String namespacedJob2 = queueName + "_slaveJob";
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
+    JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
+
+    // Ensure correct ordering
+    long job1Finish = masterJobContext.getFinishTime();
+    long job2Start = slaveJobContext.getStartTime();
+    Assert.assertTrue(job2Start >= job1Finish);
+
+    // Flush queue and check cleanup
+    _driver.flushQueue(queueName);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    JobDag dag = workflowCfg.getJobDag();
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+  }
+
   private static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;

http://git-wip-us.apache.org/repos/asf/helix/blob/05a982a4/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 520d7c0..f599920 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -51,8 +51,8 @@ public class TestUtil {
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
-  public static void pollForJobState(HelixManager manager, String workflowResource,
-      String jobName, TaskState state) throws InterruptedException {
+  public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
+      TaskState state) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
@@ -61,9 +61,7 @@ public class TestUtil {
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName)
!= state)
         && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
-
     Assert.assertNotNull(ctx);
-    Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
 }


Mime
View raw message