helix-commits mailing list archives

From: ka...@apache.org
Subject: [33/50] [abbrv] Port recent task framework changes
Date: Thu, 10 Jul 2014 17:05:16 GMT
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 0287657..547ba48 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -19,315 +19,108 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
 
-import org.apache.helix.task.Workflow.WorkflowEnum;
+import org.apache.helix.task.beans.TaskBean;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
 /**
- * Provides a typed interface to task configurations.
+ * Configuration for an individual task to be run as part of a job.
  */
 public class TaskConfig {
-  // // Property names ////
-
-  /** The name of the workflow to which the task belongs. */
-  public static final String WORKFLOW_ID = "WorkflowID";
-  /** The name of the target resource. */
-  public static final String TARGET_RESOURCE = "TargetResource";
-  /**
-   * The set of the target partition states. The value must be a comma-separated list of partition
-   * states.
-   */
-  public static final String TARGET_PARTITION_STATES = "TargetPartitionStates";
-  /**
-   * The set of the target partition ids. The value must be a comma-separated list of partition ids.
-   */
-  public static final String TARGET_PARTITIONS = "TargetPartitions";
-  /** The command that is to be run by participants. */
-  public static final String COMMAND = "Command";
-  /** The command configuration to be used by the task partitions. */
-  public static final String COMMAND_CONFIG = "CommandConfig";
-  /** The timeout for a task partition. */
-  public static final String TIMEOUT_PER_PARTITION = "TimeoutPerPartition";
-  /** The maximum number of times the task rebalancer may attempt to execute a task partitions. */
-  public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
-  /** The number of concurrent tasks that are allowed to run on an instance. */
-  public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
-  /** Support overarching tasks that hang around for a while */
-  public static final String LONG_LIVED = "LongLived";
-  /** Support giving mapping partition IDs to specific task names **/
-  public static final String TASK_NAME_MAP = "TaskNameMap";
-
-  // // Default property values ////
-
-  public static final long DEFAULT_TIMEOUT_PER_PARTITION = 60 * 60 * 1000; // 1 hr.
-  public static final int DEFAULT_MAX_ATTEMPTS_PER_PARTITION = 10;
-  public static final int DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE = 1;
-
-  private final String _workflow;
-  private final String _targetResource;
-  private final List<Integer> _targetPartitions;
-  private final Set<String> _targetPartitionStates;
-  private final String _command;
-  private final String _commandConfig;
-  private final long _timeoutPerPartition;
-  private final int _numConcurrentTasksPerInstance;
-  private final int _maxAttemptsPerPartition;
-  private final boolean _longLived;
-  private final Map<String, String> _taskNameMap;
-
-  private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
-      Set<String> targetPartitionStates, String command, String commandConfig,
-      long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition,
-      boolean longLived, Map<String, String> taskNameMap) {
-    _workflow = workflow;
-    _targetResource = targetResource;
-    _targetPartitions = targetPartitions;
-    _targetPartitionStates = targetPartitionStates;
-    _command = command;
-    _commandConfig = commandConfig;
-    _timeoutPerPartition = timeoutPerPartition;
-    _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
-    _maxAttemptsPerPartition = maxAttemptsPerPartition;
-    _longLived = longLived;
-    _taskNameMap = taskNameMap;
-  }
-
-  public String getWorkflow() {
-    return _workflow == null ? WorkflowEnum.UNSPECIFIED.name() : _workflow;
-  }
-
-  public String getTargetResource() {
-    return _targetResource;
-  }
-
-  public List<Integer> getTargetPartitions() {
-    return _targetPartitions;
-  }
-
-  public Set<String> getTargetPartitionStates() {
-    return _targetPartitionStates;
+  private enum TaskConfigFields {
+    TASK_ID,
+    TASK_COMMAND
   }
 
-  public String getCommand() {
-    return _command;
-  }
+  private static final Logger LOG = Logger.getLogger(TaskConfig.class);
 
-  public String getCommandConfig() {
-    return _commandConfig;
-  }
+  private final Map<String, String> _configMap;
 
-  public long getTimeoutPerPartition() {
-    return _timeoutPerPartition;
+  /**
+   * Instantiate the task config
+   * @param command the command to invoke for the task
+   * @param configMap configuration to be passed as part of the invocation
+   * @param id existing task ID
+   */
+  public TaskConfig(String command, Map<String, String> configMap, String id) {
+    if (configMap == null) {
+      configMap = Maps.newHashMap();
+    }
+    if (id == null) {
+      id = UUID.randomUUID().toString();
+    }
+    configMap.put(TaskConfigFields.TASK_COMMAND.toString(), command);
+    configMap.put(TaskConfigFields.TASK_ID.toString(), id);
+    _configMap = configMap;
   }
 
-  public int getNumConcurrentTasksPerInstance() {
-    return _numConcurrentTasksPerInstance;
+  /**
+   * Instantiate the task config
+   * @param command the command to invoke for the task
+   * @param configMap configuration to be passed as part of the invocation
+   */
+  public TaskConfig(String command, Map<String, String> configMap) {
+    this(command, configMap, null);
   }
 
-  public int getMaxAttemptsPerPartition() {
-    return _maxAttemptsPerPartition;
+  /**
+   * Unique identifier for this task
+   * @return UUID as a string
+   */
+  public String getId() {
+    return _configMap.get(TaskConfigFields.TASK_ID.toString());
   }
 
-  public boolean isLongLived() {
-    return _longLived;
+  /**
+   * Get the command to invoke for this task
+   * @return string command
+   */
+  public String getCommand() {
+    return _configMap.get(TaskConfigFields.TASK_COMMAND.toString());
   }
 
-  public Map<String, String> getTaskNameMap() {
-    return _taskNameMap;
+  /**
+   * Get the configuration map for this task's command
+   * @return map of configuration key to value
+   */
+  public Map<String, String> getConfigMap() {
+    return _configMap;
   }
 
-  public Map<String, String> getResourceConfigMap() {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
-    cfgMap.put(TaskConfig.COMMAND, _command);
-    cfgMap.put(TaskConfig.COMMAND_CONFIG, _commandConfig);
-    cfgMap.put(TaskConfig.TARGET_RESOURCE, _targetResource);
-    if (_targetPartitionStates != null) {
-      cfgMap.put(TaskConfig.TARGET_PARTITION_STATES, Joiner.on(",").join(_targetPartitionStates));
-    }
-    if (_targetPartitions != null) {
-      cfgMap.put(TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(_targetPartitions));
+  @Override
+  public String toString() {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.writeValueAsString(this);
+    } catch (IOException e) {
+      LOG.error("Could not serialize TaskConfig", e);
     }
-    cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
-    cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
-    cfgMap.put(TaskConfig.LONG_LIVED + "", String.valueOf(_longLived));
-    cfgMap.put(TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE + "",
-        String.valueOf(_numConcurrentTasksPerInstance));
-    return cfgMap;
+    return super.toString();
   }
 
   /**
-   * A builder for {@link TaskConfig}. Validates the configurations.
+   * Instantiate a typed configuration from a bean
+   * @param bean plain bean describing the task
+   * @return instantiated TaskConfig
    */
-  public static class Builder {
-    private String _workflow;
-    private String _targetResource;
-    private List<Integer> _targetPartitions;
-    private Set<String> _targetPartitionStates;
-    private String _command;
-    private String _commandConfig;
-    private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
-    private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
-    private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
-    private boolean _longLived = false;
-    private Map<String, String> _taskNameMap = Collections.emptyMap();
-
-    public TaskConfig build() {
-      validate();
-
-      return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
-          _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerPartition, _longLived, _taskNameMap);
-    }
-
-    /**
-     * Convenience method to build a {@link TaskConfig} from a {@code Map&lt;String, String&gt;}.
-     * @param cfg A map of property names to their string representations.
-     * @return A {@link Builder}.
-     */
-    public static Builder fromMap(Map<String, String> cfg) {
-      Builder b = new Builder();
-      if (cfg.containsKey(WORKFLOW_ID)) {
-        b.setWorkflow(cfg.get(WORKFLOW_ID));
-      }
-      if (cfg.containsKey(TARGET_RESOURCE)) {
-        b.setTargetResource(cfg.get(TARGET_RESOURCE));
-      }
-      if (cfg.containsKey(TARGET_PARTITIONS)) {
-        b.setTargetPartitions(csvToIntList(cfg.get(TARGET_PARTITIONS)));
-      }
-      if (cfg.containsKey(TARGET_PARTITION_STATES)) {
-        b.setTargetPartitionStates(new HashSet<String>(Arrays.asList(cfg.get(
-            TARGET_PARTITION_STATES).split(","))));
-      }
-      if (cfg.containsKey(COMMAND)) {
-        b.setCommand(cfg.get(COMMAND));
-      }
-      if (cfg.containsKey(COMMAND_CONFIG)) {
-        b.setCommandConfig(cfg.get(COMMAND_CONFIG));
-      }
-      if (cfg.containsKey(TIMEOUT_PER_PARTITION)) {
-        b.setTimeoutPerPartition(Long.parseLong(cfg.get(TIMEOUT_PER_PARTITION)));
-      }
-      if (cfg.containsKey(NUM_CONCURRENT_TASKS_PER_INSTANCE)) {
-        b.setNumConcurrentTasksPerInstance(Integer.parseInt(cfg
-            .get(NUM_CONCURRENT_TASKS_PER_INSTANCE)));
-      }
-      if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
-        b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
-      }
-      if (cfg.containsKey(LONG_LIVED)) {
-        b.setLongLived(Boolean.parseBoolean(cfg.get(LONG_LIVED)));
-      }
-      return b;
-    }
-
-    public Builder setWorkflow(String v) {
-      _workflow = v;
-      return this;
-    }
-
-    public Builder setTargetResource(String v) {
-      _targetResource = v;
-      return this;
-    }
-
-    public Builder setTargetPartitions(List<Integer> v) {
-      _targetPartitions = ImmutableList.copyOf(v);
-      return this;
-    }
-
-    public Builder setTargetPartitionStates(Set<String> v) {
-      _targetPartitionStates = ImmutableSet.copyOf(v);
-      return this;
-    }
-
-    public Builder setCommand(String v) {
-      _command = v;
-      return this;
-    }
-
-    public Builder setCommandConfig(String v) {
-      _commandConfig = v;
-      return this;
-    }
-
-    public Builder setTimeoutPerPartition(long v) {
-      _timeoutPerPartition = v;
-      return this;
-    }
-
-    public Builder setNumConcurrentTasksPerInstance(int v) {
-      _numConcurrentTasksPerInstance = v;
-      return this;
-    }
-
-    public Builder setMaxAttemptsPerPartition(int v) {
-      _maxAttemptsPerPartition = v;
-      return this;
-    }
-
-    public Builder setLongLived(boolean isLongLived) {
-      _longLived = isLongLived;
-      return this;
-    }
-
-    public Builder setTaskNameMap(Map<String, String> taskNameMap) {
-      _taskNameMap = taskNameMap;
-      return this;
-    }
-
-    private void validate() {
-      if (_targetResource == null && _targetPartitions == null) {
-        throw new IllegalArgumentException(String.format(
-            "%s cannot be null without specified partitions", TARGET_RESOURCE));
-      }
-      if (_targetResource != null && _targetPartitionStates != null
-          && _targetPartitionStates.isEmpty()) {
-        throw new IllegalArgumentException(String.format("%s cannot be empty",
-            TARGET_PARTITION_STATES));
-      }
-      if (_command == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", COMMAND));
-      }
-      if (_timeoutPerPartition < 0) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            TIMEOUT_PER_PARTITION, _timeoutPerPartition));
-      }
-      if (_numConcurrentTasksPerInstance < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            NUM_CONCURRENT_TASKS_PER_INSTANCE, _numConcurrentTasksPerInstance));
-      }
-      if (_maxAttemptsPerPartition < 1) {
-        throw new IllegalArgumentException(String.format("%s has invalid value %s",
-            MAX_ATTEMPTS_PER_PARTITION, _maxAttemptsPerPartition));
-      }
-      if (_workflow == null) {
-        throw new IllegalArgumentException(String.format("%s cannot be null", WORKFLOW_ID));
-      }
-    }
-
-    private static List<Integer> csvToIntList(String csv) {
-      String[] vals = csv.split(",");
-      List<Integer> l = new ArrayList<Integer>();
-      for (String v : vals) {
-        if (v != null && !v.isEmpty()) {
-          l.add(Integer.parseInt(v));
-        }
-      }
+  public static TaskConfig from(TaskBean bean) {
+    return new TaskConfig(bean.command, bean.taskConfigMap);
+  }
 
-      return l;
-    }
+  /**
+   * Instantiate a typed configuration from a raw string map
+   * @param rawConfigMap mixed map of configuration and task metadata
+   * @return instantiated TaskConfig
+   */
+  public static TaskConfig from(Map<String, String> rawConfigMap) {
+    String taskId = rawConfigMap.get(TaskConfigFields.TASK_ID.toString());
+    String command = rawConfigMap.get(TaskConfigFields.TASK_COMMAND.toString());
+    return new TaskConfig(command, rawConfigMap, taskId);
   }
 }
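
A minimal sketch of the reworked TaskConfig in use: the constructor generates a
task ID when none is supplied, and from(Map) rebuilds a config from the raw
string map persisted alongside the job. The command name "ReindexTask" and the
surrounding main method are illustrative assumptions, not part of this change.

    import java.util.Map;

    import com.google.common.collect.Maps;

    public class TaskConfigExample {
      public static void main(String[] args) {
        // Passing null for the map and omitting the ID lets TaskConfig create
        // an empty config map and generate a random UUID for the task.
        TaskConfig config = new TaskConfig("ReindexTask", null);
        System.out.println(config.getId());      // generated UUID
        System.out.println(config.getCommand()); // "ReindexTask"

        // The raw map now carries TASK_ID and TASK_COMMAND alongside any
        // user-supplied keys, so it round-trips through from(Map).
        Map<String, String> raw = Maps.newHashMap(config.getConfigMap());
        TaskConfig restored = TaskConfig.from(raw);
        assert restored.getId().equals(config.getId());
      }
    }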

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java b/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
deleted file mode 100644
index d416a86..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskContext.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-
-/**
- * Provides a typed interface to the context information stored by {@link TaskRebalancer} in the
- * Helix property store.
- */
-public class TaskContext extends HelixProperty {
-  public static final String START_TIME = "START_TIME";
-  public static final String PARTITION_STATE = "STATE";
-  public static final String NUM_ATTEMPTS = "NUM_ATTEMPTS";
-  public static final String FINISH_TIME = "FINISH_TIME";
-
-  public TaskContext(ZNRecord record) {
-    super(record);
-  }
-
-  public void setStartTime(long t) {
-    _record.setSimpleField(START_TIME, String.valueOf(t));
-  }
-
-  public long getStartTime() {
-    String tStr = _record.getSimpleField(START_TIME);
-    if (tStr == null) {
-      return -1;
-    }
-
-    return Long.parseLong(tStr);
-  }
-
-  public void setPartitionState(int p, TaskPartitionState s) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(PARTITION_STATE, s.name());
-  }
-
-  public TaskPartitionState getPartitionState(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return null;
-    }
-
-    String str = map.get(PARTITION_STATE);
-    if (str != null) {
-      return TaskPartitionState.valueOf(str);
-    } else {
-      return null;
-    }
-  }
-
-  public void setPartitionNumAttempts(int p, int n) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(NUM_ATTEMPTS, String.valueOf(n));
-  }
-
-  public int incrementNumAttempts(int pId) {
-    int n = this.getPartitionNumAttempts(pId);
-    if (n < 0) {
-      n = 0;
-    }
-    n += 1;
-    this.setPartitionNumAttempts(pId, n);
-    return n;
-  }
-
-  public int getPartitionNumAttempts(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return -1;
-    }
-
-    String nStr = map.get(NUM_ATTEMPTS);
-    if (nStr == null) {
-      return -1;
-    }
-
-    return Integer.parseInt(nStr);
-  }
-
-  public void setPartitionFinishTime(int p, long t) {
-    String pStr = String.valueOf(p);
-    Map<String, String> map = _record.getMapField(pStr);
-    if (map == null) {
-      map = new TreeMap<String, String>();
-      _record.setMapField(pStr, map);
-    }
-    map.put(FINISH_TIME, String.valueOf(t));
-  }
-
-  public long getPartitionFinishTime(int p) {
-    Map<String, String> map = _record.getMapField(String.valueOf(p));
-    if (map == null) {
-      return -1;
-    }
-
-    String tStr = map.get(FINISH_TIME);
-    if (tStr == null) {
-      return -1;
-    }
-
-    return Long.parseLong(tStr);
-  }
-}
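
The per-partition bookkeeping removed here moves to JobContext, which later
hunks in this commit read via TaskUtil.getJobContext. A hedged sketch of the
equivalent reads, assuming JobContext keeps accessors mirroring the ones
deleted above; the manager variable and job resource name are placeholders:

    JobContext jobCtx = TaskUtil.getJobContext(manager, "myJobResource");
    for (Integer pId : jobCtx.getPartitionSet()) {
      TaskPartitionState state = jobCtx.getPartitionState(pId); // as in the removed accessor
      int attempts = jobCtx.getPartitionNumAttempts(pId);       // -1 when never attempted
      System.out.println(pId + " -> " + state + " (" + attempts + " attempts)");
    }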

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java b/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
deleted file mode 100644
index ab5bc62..0000000
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDag.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package org.apache.helix.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Provides a convenient way to construct, traverse,
- * and validate a task dependency graph
- */
-public class TaskDag {
-  @JsonProperty("parentsToChildren")
-  private final Map<String, Set<String>> _parentsToChildren;
-
-  @JsonProperty("childrenToParents")
-  private final Map<String, Set<String>> _childrenToParents;
-
-  @JsonProperty("allNodes")
-  private final Set<String> _allNodes;
-
-  public static final TaskDag EMPTY_DAG = new TaskDag();
-
-  public TaskDag() {
-    _parentsToChildren = new TreeMap<String, Set<String>>();
-    _childrenToParents = new TreeMap<String, Set<String>>();
-    _allNodes = new TreeSet<String>();
-  }
-
-  public void addParentToChild(String parent, String child) {
-    if (!_parentsToChildren.containsKey(parent)) {
-      _parentsToChildren.put(parent, new TreeSet<String>());
-    }
-    _parentsToChildren.get(parent).add(child);
-
-    if (!_childrenToParents.containsKey(child)) {
-      _childrenToParents.put(child, new TreeSet<String>());
-    }
-    _childrenToParents.get(child).add(parent);
-
-    _allNodes.add(parent);
-    _allNodes.add(child);
-  }
-
-  public void addNode(String node) {
-    _allNodes.add(node);
-  }
-
-  public Map<String, Set<String>> getParentsToChildren() {
-    return _parentsToChildren;
-  }
-
-  public Map<String, Set<String>> getChildrenToParents() {
-    return _childrenToParents;
-  }
-
-  public Set<String> getAllNodes() {
-    return _allNodes;
-  }
-
-  public Set<String> getDirectChildren(String node) {
-    if (!_parentsToChildren.containsKey(node)) {
-      return Collections.emptySet();
-    }
-    return _parentsToChildren.get(node);
-  }
-
-  public Set<String> getDirectParents(String node) {
-    if (!_childrenToParents.containsKey(node)) {
-      return Collections.emptySet();
-    }
-    return _childrenToParents.get(node);
-  }
-
-  public String toJson() throws Exception {
-    return new ObjectMapper().writeValueAsString(this);
-  }
-
-  public static TaskDag fromJson(String json) {
-    try {
-      return new ObjectMapper().readValue(json, TaskDag.class);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Unable to parse json " + json + " into task dag");
-    }
-  }
-
-  /**
-   * Checks that dag contains no cycles and all nodes are reachable.
-   */
-  public void validate() {
-    Set<String> prevIteration = new TreeSet<String>();
-
-    // get all unparented nodes
-    for (String node : _allNodes) {
-      if (getDirectParents(node).isEmpty()) {
-        prevIteration.add(node);
-      }
-    }
-
-    // visit children nodes up to max iteration count, by which point we should have exited
-    // naturally
-    Set<String> allNodesReached = new TreeSet<String>();
-    int iterationCount = 0;
-    int maxIterations = _allNodes.size() + 1;
-
-    while (!prevIteration.isEmpty() && iterationCount < maxIterations) {
-      // construct set of all children reachable from prev iteration
-      Set<String> thisIteration = new TreeSet<String>();
-      for (String node : prevIteration) {
-        thisIteration.addAll(getDirectChildren(node));
-      }
-
-      allNodesReached.addAll(prevIteration);
-      prevIteration = thisIteration;
-      iterationCount++;
-    }
-
-    allNodesReached.addAll(prevIteration);
-
-    if (iterationCount >= maxIterations) {
-      throw new IllegalArgumentException("DAG invalid: cycles detected");
-    }
-
-    if (!allNodesReached.containsAll(_allNodes)) {
-      throw new IllegalArgumentException("DAG invalid: unreachable nodes found. Reachable set is "
-          + allNodesReached);
-    }
-  }
-}
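
TaskDag is superseded by JobDag, which later hunks traverse via getAllNodes()
and getDirectParents(). A hedged sketch, assuming JobDag keeps the construction
and validation API removed above; the job names are hypothetical:

    JobDag dag = new JobDag(); // assumed to mirror the removed TaskDag API
    dag.addParentToChild("ingest", "transform");
    dag.addParentToChild("transform", "publish");
    dag.validate(); // throws IllegalArgumentException on cycles or unreachable nodes

    for (String job : dag.getAllNodes()) {
      System.out.println(job + " <- parents: " + dag.getDirectParents(job));
    }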

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index dd47625..ada2f99 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -20,8 +20,8 @@ package org.apache.helix.task;
  */
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -41,11 +41,13 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
-import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.log4j.Logger;
 
+import com.beust.jcommander.internal.Lists;
+
 /**
  * CLI for scheduling/canceling workflows
  */
@@ -152,67 +154,77 @@ public class TaskDriver {
         flow.getResourceConfigMap());
 
     // then schedule tasks
-    for (String task : flow.getTaskConfigs().keySet()) {
-      scheduleTask(task, TaskConfig.Builder.fromMap(flow.getTaskConfigs().get(task)).build());
+    for (String job : flow.getJobConfigs().keySet()) {
+      JobConfig.Builder builder = JobConfig.Builder.fromMap(flow.getJobConfigs().get(job));
+      if (flow.getTaskConfigs() != null && flow.getTaskConfigs().containsKey(job)) {
+        builder.addTaskConfigs(flow.getTaskConfigs().get(job));
+      }
+      scheduleJob(job, builder.build());
     }
   }
 
-  /** Posts new task to cluster */
-  private void scheduleTask(String taskResource, TaskConfig taskConfig) throws Exception {
-    // Set up task resource based on partitions provided, or from target resource
-    int numPartitions;
-    List<Integer> partitions = taskConfig.getTargetPartitions();
-    String targetResource = taskConfig.getTargetResource();
-    if (partitions != null && !partitions.isEmpty()) {
-      numPartitions = partitions.size();
-    } else if (targetResource != null) {
-      numPartitions =
-          _admin.getResourceIdealState(_clusterName, taskConfig.getTargetResource())
-              .getPartitionSet().size();
-    } else {
-      numPartitions = 0;
+  /** Posts new job to cluster */
+  private void scheduleJob(String jobResource, JobConfig jobConfig) throws Exception {
+    // Set up job resource based on partitions from target resource
+    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
+    int numPartitions =
+        (numIndependentTasks > 0) ? numIndependentTasks : _admin
+            .getResourceIdealState(_clusterName, jobConfig.getTargetResource()).getPartitionSet()
+            .size();
+    _admin.addResource(_clusterName, jobResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
+
+    // Set the job configuration
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    HelixProperty resourceConfig = new HelixProperty(jobResource);
+    resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    if (taskConfigMap != null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
+      }
     }
-    _admin.addResource(_clusterName, taskResource, numPartitions, TaskConstants.STATE_MODEL_NAME);
-    _admin.setConfig(TaskUtil.getResourceConfigScope(_clusterName, taskResource),
-        taskConfig.getResourceConfigMap());
+    accessor.setProperty(keyBuilder.resourceConfig(jobResource), resourceConfig);
 
     // Push out new ideal state based on number of target partitions
-    CustomModeISBuilder builder = new CustomModeISBuilder(taskResource);
+    CustomModeISBuilder builder = new CustomModeISBuilder(jobResource);
     builder.setRebalancerMode(IdealState.RebalanceMode.USER_DEFINED);
     builder.setNumReplica(1);
     builder.setNumPartitions(numPartitions);
     builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
     for (int i = 0; i < numPartitions; i++) {
-      builder.add(taskResource + "_" + i);
+      builder.add(jobResource + "_" + i);
     }
     IdealState is = builder.build();
-    Class<? extends HelixRebalancer> rebalancerClass =
-        (targetResource != null) ? TaskRebalancer.class : IndependentTaskRebalancer.class;
-    is.setRebalancerClassName(rebalancerClass.getName());
-    _admin.setResourceIdealState(_clusterName, taskResource, is);
+    if (taskConfigMap != null && !taskConfigMap.isEmpty()) {
+      is.setRebalancerClassName(GenericTaskRebalancer.class.getName());
+    } else {
+      is.setRebalancerClassName(FixedTargetTaskRebalancer.class.getName());
+    }
+    _admin.setResourceIdealState(_clusterName, jobResource, is);
   }
 
-  /** Public method to resume a task/workflow */
+  /** Public method to resume a job/workflow */
   public void resume(String resource) {
     setTaskTargetState(resource, TargetState.START);
   }
 
-  /** Public method to stop a task/workflow */
+  /** Public method to stop a job/workflow */
   public void stop(String resource) {
     setTaskTargetState(resource, TargetState.STOP);
   }
 
-  /** Public method to delete a task/workflow */
+  /** Public method to delete a job/workflow */
   public void delete(String resource) {
     setTaskTargetState(resource, TargetState.DELETE);
   }
 
   /** Helper function to change target state for a given task */
-  private void setTaskTargetState(String taskResource, TargetState state) {
+  private void setTaskTargetState(String jobResource, TargetState state) {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-    HelixProperty p = new HelixProperty(taskResource);
+    HelixProperty p = new HelixProperty(jobResource);
     p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name());
-    accessor.updateProperty(accessor.keyBuilder().resourceConfig(taskResource), p);
+    accessor.updateProperty(accessor.keyBuilder().resourceConfig(jobResource), p);
 
     invokeRebalance();
   }
@@ -222,34 +234,24 @@ public class TaskDriver {
     WorkflowContext wCtx = TaskUtil.getWorkflowContext(_manager, resource);
 
     LOG.info("Workflow " + resource + " consists of the following tasks: "
-        + wCfg.getTaskDag().getAllNodes());
+        + wCfg.getJobDag().getAllNodes());
     LOG.info("Current state of workflow is " + wCtx.getWorkflowState().name());
-    LOG.info("Task states are: ");
+    LOG.info("Job states are: ");
     LOG.info("-------");
-    for (String task : wCfg.getTaskDag().getAllNodes()) {
-      LOG.info("Task " + task + " is " + wCtx.getTaskState(task));
+    for (String job : wCfg.getJobDag().getAllNodes()) {
+      LOG.info("Task " + job + " is " + wCtx.getJobState(job));
 
       // fetch task information
-      TaskContext tCtx = TaskUtil.getTaskContext(_manager, task);
-      TaskConfig tCfg = TaskUtil.getTaskCfg(_manager, task);
+      JobContext jCtx = TaskUtil.getJobContext(_manager, job);
 
       // calculate taskPartitions
-      List<Integer> partitions;
-      if (tCfg.getTargetPartitions() != null) {
-        partitions = tCfg.getTargetPartitions();
-      } else {
-        partitions = new ArrayList<Integer>();
-        for (String pStr : _admin.getResourceIdealState(_clusterName, tCfg.getTargetResource())
-            .getPartitionSet()) {
-          partitions
-              .add(Integer.parseInt(pStr.substring(pStr.lastIndexOf("_") + 1, pStr.length())));
-        }
-      }
+      List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
+      Collections.sort(partitions);
 
       // group partitions by status
       Map<TaskPartitionState, Integer> statusCount = new TreeMap<TaskPartitionState, Integer>();
       for (Integer i : partitions) {
-        TaskPartitionState s = tCtx.getPartitionState(i);
+        TaskPartitionState s = jCtx.getPartitionState(i);
         if (!statusCount.containsKey(s)) {
           statusCount.put(s, 0);
         }
@@ -288,20 +290,26 @@ public class TaskDriver {
     return options;
   }
 
-  /** Constructs option group containing options required by all drivable tasks */
+  /** Constructs option group containing options required by all drivable jobs */
   @SuppressWarnings("static-access")
   private static OptionGroup contructGenericRequiredOptionGroup() {
     Option zkAddressOption =
-        OptionBuilder.isRequired().hasArgs(1).withArgName("zkAddress").withLongOpt(ZK_ADDRESS)
-            .withDescription("ZK address managing target cluster").create();
+        OptionBuilder.isRequired().withLongOpt(ZK_ADDRESS)
+            .withDescription("ZK address managing cluster").create();
+    zkAddressOption.setArgs(1);
+    zkAddressOption.setArgName("zkAddress");
 
     Option clusterNameOption =
-        OptionBuilder.isRequired().hasArgs(1).withArgName("clusterName")
-            .withLongOpt(CLUSTER_NAME_OPTION).withDescription("Target cluster name").create();
+        OptionBuilder.isRequired().withLongOpt(CLUSTER_NAME_OPTION).withDescription("Cluster name")
+            .create();
+    clusterNameOption.setArgs(1);
+    clusterNameOption.setArgName("clusterName");
 
     Option taskResourceOption =
-        OptionBuilder.isRequired().hasArgs(1).withArgName("resourceName")
-            .withLongOpt(RESOURCE_OPTION).withDescription("Target workflow or task").create();
+        OptionBuilder.isRequired().withLongOpt(RESOURCE_OPTION)
+            .withDescription("Workflow or job name").create();
+    taskResourceOption.setArgs(1);
+    taskResourceOption.setArgName("resourceName");
 
     OptionGroup group = new OptionGroup();
     group.addOption(zkAddressOption);
@@ -310,12 +318,14 @@ public class TaskDriver {
     return group;
   }
 
-  /** Constructs option group containing options required by all drivable tasks */
-  @SuppressWarnings("static-access")
+  /** Constructs option group containing options required by all drivable jobs */
   private static OptionGroup constructStartOptionGroup() {
+    @SuppressWarnings("static-access")
     Option workflowFileOption =
-        OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION).hasArgs(1).withArgName("workflowFile")
+        OptionBuilder.withLongOpt(WORKFLOW_FILE_OPTION)
             .withDescription("Local file describing workflow").create();
+    workflowFileOption.setArgs(1);
+    workflowFileOption.setArgName("workflowFile");
 
     OptionGroup group = new OptionGroup();
     group.addOption(workflowFileOption);
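
Once a workflow is scheduled, the public resume/stop/delete methods above flip
its TargetState in the resource config and trigger a rebalance. A hedged
sketch; the TaskDriver constructor shown is an assumption, as it falls outside
this hunk:

    // Hedged: assumes TaskDriver exposes a constructor taking a HelixManager,
    // which is not shown in this diff.
    TaskDriver driver = new TaskDriver(manager);
    driver.stop("myWorkflow");   // TargetState.STOP: running jobs move toward STOPPED
    driver.resume("myWorkflow"); // TargetState.START: stopped jobs resume
    driver.delete("myWorkflow"); // TargetState.DELETE: cleaned up on the next rebalance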

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
index 0cbf24c..31fddc7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskFactory.java
@@ -19,14 +19,15 @@ package org.apache.helix.task;
  * under the License.
  */
 
+
 /**
  * A factory for {@link Task} objects.
  */
 public interface TaskFactory {
   /**
    * Returns a {@link Task} instance.
-   * @param config Configuration information for the task.
+   * @param context Contextual information for the task, including task and job configurations
    * @return A {@link Task} instance.
    */
-  Task createNewTask(String config);
+  Task createNewTask(TaskCallbackContext context);
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 7b93b82..829f0c4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,112 +19,585 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.context.ControllerContextProvider;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 
 /**
- * Custom rebalancer implementation for the {@code Task} state model. Tasks are assigned to
- * instances hosting target resource partitions in target states
+ * Custom rebalancer implementation for the {@code Task} state model.
  */
-public class TaskRebalancer extends AbstractTaskRebalancer {
+public abstract class TaskRebalancer implements HelixRebalancer {
+  private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
+  private HelixManager _manager;
+
+  /**
+   * Get all the partitions that should be created by this job
+   * @param jobCfg the job configuration
+   * @param jobCtx the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param cache cluster snapshot
+   * @return set of partition numbers
+   */
+  public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Cluster cache);
+
+  /**
+   * Compute an assignment of tasks to instances
+   * @param currStateOutput the current state of the instances
+   * @param prevAssignment the previous task partition assignment
+   * @param instanceList the instances
+   * @param jobCfg the job configuration
+   * @param jobContext the job context
+   * @param workflowCfg the workflow configuration
+   * @param workflowCtx the workflow context
+   * @param partitionSet the partitions to assign
+   * @param cache cluster snapshot
+   * @return map of instances to set of partition numbers
+   */
+  public abstract Map<ParticipantId, SortedSet<Integer>> getTaskAssignment(
+      ResourceCurrentState currStateOutput, ResourceAssignment prevAssignment,
+      Iterable<ParticipantId> instanceList, JobConfig jobCfg, JobContext jobContext,
+      WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
+      Cluster cache);
+
   @Override
-  public Set<Integer> getAllTaskPartitions(TaskConfig taskCfg, WorkflowConfig workflowCfg,
-      WorkflowContext workflowCtx, Cluster cluster) {
-    return getAllTaskPartitions(getTgtIdealState(taskCfg, cluster), taskCfg);
+  public void init(HelixManager manager, ControllerContextProvider contextProvider) {
+    _manager = manager;
   }
 
   @Override
-  public Map<String, SortedSet<Integer>> getTaskAssignment(ResourceCurrentState currStateOutput,
-      ResourceAssignment prevAssignment, Iterable<ParticipantId> instanceList, TaskConfig taskCfg,
-      TaskContext taskCtx, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
-      Set<Integer> partitionSet, Cluster cluster) {
-    IdealState tgtIs = getTgtIdealState(taskCfg, cluster);
-    if (tgtIs == null) {
-      return Collections.emptyMap();
+  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+      ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) {
+    IdealState taskIs = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState();
+    return computeBestPossiblePartitionState(cluster, taskIs,
+        cluster.getResource(rebalancerConfig.getResourceId()), currentState);
+  }
+
+  public ResourceAssignment computeBestPossiblePartitionState(Cluster clusterData,
+      IdealState taskIs, Resource resource, ResourceCurrentState currStateOutput) {
+    final String resourceName = resource.getId().toString();
+
+    // Fetch job configuration
+    JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
+    String workflowResource = jobCfg.getWorkflow();
+
+    // Fetch workflow configuration and context
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
+    WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
+
+    // Initialize workflow context if needed
+    if (workflowCtx == null) {
+      workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
+      workflowCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // Check parent dependencies
+    for (String parent : workflowCfg.getJobDag().getDirectParents(resourceName)) {
+      if (workflowCtx.getJobState(parent) == null
+          || !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) {
+        return emptyAssignment(resourceName);
+      }
+    }
+
+    // Clean up if workflow marked for deletion
+    TargetState targetState = workflowCfg.getTargetState();
+    if (targetState == TargetState.DELETE) {
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName);
+    }
+
+    // Check if this workflow has been finished past its expiry.
+    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
+        && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      markForDeletion(_manager, workflowResource);
+      cleanup(_manager, resourceName, workflowCfg, workflowResource);
+      return emptyAssignment(resourceName);
+    }
+
+    // Fetch any existing context information from the property store.
+    JobContext jobCtx = TaskUtil.getJobContext(_manager, resourceName);
+    if (jobCtx == null) {
+      jobCtx = new JobContext(new ZNRecord("TaskContext"));
+      jobCtx.setStartTime(System.currentTimeMillis());
+    }
+
+    // The job is already in a final state (completed/failed).
+    if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
+        || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
+      return emptyAssignment(resourceName);
+    }
+
+    ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
+    if (prevAssignment == null) {
+      prevAssignment = new ResourceAssignment(ResourceId.from(resourceName));
+    }
+
+    // Will contain the list of partitions that must be explicitly dropped from the ideal state
+    // that is stored in ZK. The previous resource assignment above comes from the property
+    // store rather than the ideal state. This is required because of
+    // HELIX-230.
+    Set<Integer> partitionsToDrop = new TreeSet<Integer>();
+
+    ResourceAssignment newAssignment =
+        computeResourceMapping(resourceName, workflowCfg, jobCfg, prevAssignment, clusterData
+            .getLiveParticipantMap().keySet(), currStateOutput, workflowCtx, jobCtx,
+            partitionsToDrop, clusterData);
+
+    if (!partitionsToDrop.isEmpty()) {
+      for (Integer pId : partitionsToDrop) {
+        taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
+      }
+      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+      PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+      accessor.setProperty(propertyKey, taskIs);
+    }
+
+    // Update rebalancer context, previous ideal state.
+    TaskUtil.setJobContext(_manager, resourceName, jobCtx);
+    TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
+    TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
+
+    return newAssignment;
+  }
+
+  private ResourceAssignment computeResourceMapping(String jobResource,
+      WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment,
+      Iterable<ParticipantId> liveInstances, ResourceCurrentState currStateOutput,
+      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
+      Cluster cache) {
+    TargetState jobTgtState = workflowConfig.getTargetState();
+
+    // Update running status in workflow context
+    if (jobTgtState == TargetState.STOP) {
+      workflowCtx.setJobState(jobResource, TaskState.STOPPED);
+      // Workflow has been stopped if all jobs are stopped
+      if (isWorkflowStopped(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.STOPPED);
+      }
+    } else {
+      workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
+      // Workflow is in progress if any task is in progress
+      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+    }
+
+    // Used to keep track of tasks that have already been assigned to instances.
+    Set<Integer> assignedPartitions = new HashSet<Integer>();
+
+    // Keeps a mapping of (partition) -> (instance, state)
+    Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
+
+    // Process all the current assignments of tasks.
+    Set<Integer> allPartitions =
+        getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+    Map<ParticipantId, SortedSet<Integer>> taskAssignments =
+        getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+    for (ParticipantId instance : taskAssignments.keySet()) {
+      Set<Integer> pSet = taskAssignments.get(instance);
+      // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+      // TASK_ERROR, ERROR.
+      Set<Integer> donePartitions = new TreeSet<Integer>();
+      for (int pId : pSet) {
+        final String pName = pName(jobResource, pId);
+
+        // Check for pending state transitions on this (partition, instance).
+        State pendingState =
+            currStateOutput.getPendingState(ResourceId.from(jobResource), PartitionId.from(pName),
+                instance);
+        if (pendingState != null) {
+          // There is a pending state transition for this (partition, instance). Just copy forward
+          // the state
+          // assignment from the previous ideal state.
+          Map<ParticipantId, State> stateMap =
+              prevAssignment.getReplicaMap(PartitionId.from(pName));
+          if (stateMap != null) {
+            State prevState = stateMap.get(instance);
+            paMap.put(pId, new PartitionAssignment(instance.toString(), prevState.toString()));
+            assignedPartitions.add(pId);
+            LOG.debug(String
+                .format(
+                    "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+                    pName, instance, prevState));
+          }
+
+          continue;
+        }
+
+        State currHelixState =
+            currStateOutput.getCurrentState(ResourceId.from(jobResource), PartitionId.from(pName),
+                instance);
+        TaskPartitionState currState =
+            (currHelixState != null) ? TaskPartitionState.valueOf(currHelixState.toString()) : null;
+
+        // Process any requested state transitions.
+        State requestedStateStr =
+            currStateOutput.getRequestedState(ResourceId.from(jobResource),
+                PartitionId.from(pName), instance);
+        if (requestedStateStr != null && !requestedStateStr.toString().isEmpty()) {
+          TaskPartitionState requestedState =
+              TaskPartitionState.valueOf(requestedStateStr.toString());
+          if (requestedState.equals(currState)) {
+            LOG.warn(String.format(
+                "Requested state %s is the same as the current state for instance %s.",
+                requestedState, instance));
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance.toString(), requestedState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format(
+              "Instance %s requested a state transition to %s for partition %s.", instance,
+              requestedState, pName));
+          continue;
+        }
+
+        switch (currState) {
+        case RUNNING:
+        case STOPPED: {
+          TaskPartitionState nextState;
+          if (jobTgtState == TargetState.START) {
+            nextState = TaskPartitionState.RUNNING;
+          } else {
+            nextState = TaskPartitionState.STOPPED;
+          }
+
+          paMap.put(pId, new PartitionAssignment(instance.toString(), nextState.name()));
+          assignedPartitions.add(pId);
+          LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+              nextState, instance));
+        }
+          break;
+        case COMPLETED: {
+          // The task has completed on this partition. Mark as such in the context object.
+          donePartitions.add(pId);
+          LOG.debug(String
+              .format(
+                  "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+                  pName, currState));
+          partitionsToDropFromIs.add(pId);
+          markPartitionCompleted(jobCtx, pId);
+        }
+          break;
+        case TIMED_OUT:
+        case TASK_ERROR:
+        case ERROR: {
+          donePartitions.add(pId); // The task may be rescheduled on a different instance.
+          LOG.debug(String.format(
+              "Task partition %s has error state %s. Marking as such in rebalancer context.",
+              pName, currState));
+          markPartitionError(jobCtx, pId, currState);
+          // The error policy is to fail the task as soon as a single partition
+          // fails for a specified
+          // maximum number of attempts.
+          if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()) {
+            workflowCtx.setJobState(jobResource, TaskState.FAILED);
+            workflowCtx.setWorkflowState(TaskState.FAILED);
+            addAllPartitions(allPartitions, partitionsToDropFromIs);
+            return emptyAssignment(jobResource);
+          }
+        }
+          break;
+        case INIT:
+        case DROPPED: {
+          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+          donePartitions.add(pId);
+          LOG.debug(String.format(
+              "Task partition %s has state %s. It will be dropped from the current ideal state.",
+              pName, currState));
+        }
+          break;
+        default:
+          throw new AssertionError("Unknown enum symbol: " + currState);
+        }
+      }
+
+      // Remove the set of task partitions that are completed or in one of the error states.
+      pSet.removeAll(donePartitions);
+    }
+
+    if (isJobComplete(jobCtx, allPartitions)) {
+      workflowCtx.setJobState(jobResource, TaskState.COMPLETED);
+      if (isWorkflowComplete(workflowCtx, workflowConfig)) {
+        workflowCtx.setWorkflowState(TaskState.COMPLETED);
+        workflowCtx.setFinishTime(System.currentTimeMillis());
+      }
+    }
+
+    // Make additional task assignments if needed.
+    if (jobTgtState == TargetState.START) {
+      // Contains the set of task partitions that must be excluded from consideration when making
+      // any new assignments.
+      // This includes all completed, failed, already assigned partitions.
+      Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+      addCompletedPartitions(excludeSet, jobCtx, allPartitions);
+      // Get instance->[partition, ...] mappings for the target resource.
+      Map<ParticipantId, SortedSet<Integer>> tgtPartitionAssignments =
+          getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
+              workflowConfig, workflowCtx, allPartitions, cache);
+      for (Map.Entry<ParticipantId, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
+        ParticipantId instance = entry.getKey();
+        if (!tgtPartitionAssignments.containsKey(instance)) {
+          continue;
+        }
+        // Contains the set of task partitions currently assigned to the instance.
+        Set<Integer> pSet = entry.getValue();
+        int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+        if (numToAssign > 0) {
+          List<Integer> nextPartitions =
+              getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+          for (Integer pId : nextPartitions) {
+            String pName = pName(jobResource, pId);
+            paMap.put(pId,
+                new PartitionAssignment(instance.toString(), TaskPartitionState.RUNNING.name()));
+            excludeSet.add(pId);
+            LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+                TaskPartitionState.RUNNING, instance));
+          }
+        }
+      }
     }
-    Set<String> tgtStates = taskCfg.getTargetPartitionStates();
-    return getTgtPartitionAssignment(currStateOutput, instanceList, tgtIs, tgtStates, partitionSet);
+
+    // Construct a ResourceAssignment object from the map of partition assignments.
+    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(jobResource));
+    for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
+      PartitionAssignment pa = e.getValue();
+      ra.addReplicaMap(PartitionId.from(pName(jobResource, e.getKey())),
+          ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
+    }
+
+    return ra;
   }
 
   /**
-   * Gets the ideal state of the target resource of this task
-   * @param taskCfg task config containing target resource id
-   * @param cluster snapshot of the cluster containing the task and target resource
-   * @return target resource ideal state, or null
+   * Checks if the job has completed.
+   * @param ctx The rebalancer context.
+   * @param allPartitions The set of partitions to check.
+   * @return true if all task partitions have been marked with status
+   *         {@link TaskPartitionState#COMPLETED} in the rebalancer
+   *         context, false otherwise.
    */
-  private static IdealState getTgtIdealState(TaskConfig taskCfg, Cluster cluster) {
-    ResourceId tgtResourceId = ResourceId.from(taskCfg.getTargetResource());
-    Resource resource = cluster.getResource(tgtResourceId);
-    return resource != null ? resource.getIdealState() : null;
+  private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions) {
+    for (Integer pId : allPartitions) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state != TaskPartitionState.COMPLETED) {
+        return false;
+      }
+    }
+    return true;
   }
 
   /**
-   * Returns the set of all partition ids for a task.
-   * <p/>
-   * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
-   * use the list of all partition ids from the target resource.
+   * Checks if the workflow has completed.
+   * @param ctx Workflow context containing job states
+   * @param cfg Workflow config containing set of jobs
+   * @return true if all jobs are {@link TaskState#COMPLETED}, false otherwise.
    */
-  private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
-    if (tgtResourceIs == null) {
-      return null;
+  private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      if (ctx.getJobState(job) != TaskState.COMPLETED) {
+        return false;
+      }
     }
-    Set<Integer> taskPartitions = new HashSet<Integer>();
-    if (taskCfg.getTargetPartitions() != null) {
-      for (Integer pId : taskCfg.getTargetPartitions()) {
-        taskPartitions.add(pId);
+    return true;
+  }
+
+  /**
+   * Checks if the workflow has been stopped.
+   * @param ctx Workflow context containing job states
+   * @param cfg Workflow config containing the set of jobs
+   * @return true if every job is {@link TaskState#STOPPED} or has not yet
+   *         started, false otherwise.
+   */
+  private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      if (ctx.getJobState(job) != TaskState.STOPPED && ctx.getJobState(job) != null) {
+        return false;
       }
-    } else {
-      for (String pName : tgtResourceIs.getPartitionSet()) {
-        taskPartitions.add(pId(pName));
+    }
+    return true;
+  }
+
+  private static void markForDeletion(HelixManager mgr, String resourceName) {
+    mgr.getConfigAccessor().set(
+        TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
+        WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
+  }
+
+  /**
+   * Cleans up all Helix state associated with this job, wiping workflow-level information if this
+   * is the last remaining job in its workflow.
+   */
+  private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
+      String workflowResource) {
+    HelixDataAccessor accessor = mgr.getHelixDataAccessor();
+    // Delete resource configs.
+    PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(cfgKey)) {
+      throw new RuntimeException(
+          String
+              .format(
+                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  resourceName, cfgKey));
+    }
+    // Delete property store information for this resource.
+    String propStoreKey = getRebalancerPropStoreKey(resourceName);
+    if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+      throw new RuntimeException(
+          String
+              .format(
+                  "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                  resourceName, propStoreKey));
+    }
+    // Finally, delete the ideal state itself.
+    PropertyKey isKey = getISPropertyKey(accessor, resourceName);
+    if (!accessor.removeProperty(isKey)) {
+      throw new RuntimeException(String.format(
+          "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
+          resourceName, isKey));
+    }
+    LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
+
+    boolean lastInWorkflow = true;
+    for (String job : cfg.getJobDag().getAllNodes()) {
+      // check if property store information or resource configs exist for this job
+      if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(job),
+          AccessOption.PERSISTENT)
+          || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null
+          || accessor.getProperty(getISPropertyKey(accessor, job)) != null) {
+        lastInWorkflow = false;
+      }
+    }
+
+    // Clean up workflow-level info if this was the last job in the workflow.
+    if (lastInWorkflow) {
+      // delete workflow config
+      PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
+      if (!accessor.removeProperty(workflowCfgKey)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowCfgKey));
+      }
+      // Delete property store information for this workflow
+      String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
+      if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+        throw new RuntimeException(
+            String
+                .format(
+                    "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+                    workflowResource, workflowPropStoreKey));
+      }
+    }
+
+  }
+
+  private static String getRebalancerPropStoreKey(String resource) {
+    return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
+  }
+
+  private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().idealStates(resource);
+  }
+
+  private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
+    return accessor.keyBuilder().resourceConfig(resource);
+  }
+
+  private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) {
+    for (Integer pId : toAdd) {
+      destination.add(pId);
+    }
+  }
+
+  private static ResourceAssignment emptyAssignment(String name) {
+    return new ResourceAssignment(ResourceId.from(name));
+  }
+
+  private static void addCompletedPartitions(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds) {
+    for (Integer pId : pIds) {
+      TaskPartitionState state = ctx.getPartitionState(pId);
+      if (state == TaskPartitionState.COMPLETED) {
+        set.add(pId);
       }
     }
-    return taskPartitions;
+  }
+
+  private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+      Set<Integer> excluded, int n) {
+    List<Integer> result = new ArrayList<Integer>();
+    for (Integer pId : candidatePartitions) {
+      if (result.size() >= n) {
+        break;
+      }
+
+      if (!excluded.contains(pId)) {
+        result.add(pId);
+      }
+    }
+
+    return result;
+  }
+
+  private static void markPartitionCompleted(JobContext ctx, int pId) {
+    ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
+  }
+
+  private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state) {
+    ctx.setPartitionState(pId, state);
+    ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
+    ctx.incrementNumAttempts(pId);
   }
 
   /**
-   * Get partition assignments for the target resource, but only for the partitions of interest.
-   * @param currStateOutput The current state of the instances in the cluster.
-   * @param instanceList The set of instances.
-   * @param tgtIs The ideal state of the target resource.
-   * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
-   *          do not need to
-   *          be in any specific state to be considered.
-   * @param includeSet The set of partitions to consider.
-   * @return A map of instance vs set of partition ids assigned to that instance.
+   * Returns the assignment of task partitions per instance.
    */
-  private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
-      ResourceCurrentState currStateOutput, Iterable<ParticipantId> instanceList, IdealState tgtIs,
-      Set<String> tgtStates, Set<Integer> includeSet) {
-    Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
+  private static Map<ParticipantId, SortedSet<Integer>> getTaskPartitionAssignments(
+      Iterable<ParticipantId> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
+    Map<ParticipantId, SortedSet<Integer>> result =
+        new HashMap<ParticipantId, SortedSet<Integer>>();
     for (ParticipantId instance : instanceList) {
-      result.put(instance.stringify(), new TreeSet<Integer>());
+      result.put(instance, new TreeSet<Integer>());
     }
 
-    for (String pName : tgtIs.getPartitionSet()) {
-      int pId = pId(pName);
+    for (PartitionId partition : assignment.getMappedPartitionIds()) {
+      int pId = pId(partition.toString());
       if (includeSet.contains(pId)) {
-        for (ParticipantId instance : instanceList) {
-          State s =
-              currStateOutput.getCurrentState(ResourceId.from(tgtIs.getResourceName()),
-                  PartitionId.from(pName), instance);
-          String state = (s == null ? null : s.toString());
-          if (tgtStates == null || tgtStates.contains(state)) {
-            result.get(instance).add(pId);
+        Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partition);
+        for (ParticipantId instance : replicaMap.keySet()) {
+          SortedSet<Integer> pList = result.get(instance);
+          if (pList != null) {
+            pList.add(pId);
           }
         }
       }
@@ -132,4 +605,32 @@ public class TaskRebalancer extends AbstractTaskRebalancer {
 
     return result;
   }
+
+  /**
+   * Computes the partition name given the resource name and partition id.
+   */
+  protected static String pName(String resource, int pId) {
+    return resource + "_" + pId;
+  }
+
+  /**
+   * Extracts the partition id from the given partition name.
+   */
+  protected static int pId(String pName) {
+    String[] tokens = pName.split("_");
+    return Integer.valueOf(tokens[tokens.length - 1]);
+  }
+
+  /**
+   * An (instance, state) pair.
+   */
+  private static class PartitionAssignment {
+    private final String _instance;
+    private final String _state;
+
+    private PartitionAssignment(String instance, String state) {
+      _instance = instance;
+      _state = state;
+    }
+  }
 }

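To make the new scheduling loop in TaskRebalancer above easier to follow: an instance may take on at most ConcurrentTasksPerInstance minus the tasks it already holds, and getNextPartitions() fills that budget from the instance's candidate partitions while skipping the exclude set (completed or just-assigned partitions). Below is a minimal standalone sketch of that throttle together with the pName()/pId() naming convention; the job name "myJob" and the partition numbers are made up, and the sketch is not part of this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class ThrottleSketch {
  // Same naming convention as TaskRebalancer.pName()/pId() above.
  static String pName(String resource, int pId) {
    return resource + "_" + pId;
  }

  // Mirrors getNextPartitions(): take up to n candidates that are not excluded.
  static List<Integer> nextPartitions(SortedSet<Integer> candidates, Set<Integer> excluded, int n) {
    List<Integer> result = new ArrayList<Integer>();
    for (Integer pId : candidates) {
      if (result.size() >= n) {
        break;
      }
      if (!excluded.contains(pId)) {
        result.add(pId);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    SortedSet<Integer> candidates = new TreeSet<Integer>(Arrays.asList(0, 1, 2, 3));
    Set<Integer> completed = new HashSet<Integer>(Arrays.asList(1)); // the excludeSet
    int concurrencyCap = 2; // ConcurrentTasksPerInstance
    int alreadyRunning = 1; // tasks currently assigned to this instance
    for (Integer pId : nextPartitions(candidates, completed, concurrencyCap - alreadyRunning)) {
      System.out.println("Would start " + pName("myJob", pId)); // prints: Would start myJob_0
    }
  }
}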
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
index 92976b0..cd909ed 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRunner.java
@@ -29,7 +29,6 @@ import org.apache.log4j.Logger;
  */
 public class TaskRunner implements Runnable {
   private static final Logger LOG = Logger.getLogger(TaskRunner.class);
-
   private final StateModel _taskStateModel;
   private final HelixManager _manager;
   private final String _taskName;

http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
index c399930..a44a8cb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java
@@ -34,12 +34,7 @@ import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.log4j.Logger;
 
-/**
- * task state model
- */
-@StateModelInfo(states = {
-    "INIT", "RUNNING", "STOPPED", "COMPLETED", "TIMED_OUT", "TASK_ERROR", "DROPPED"
-}, initialState = "INIT")
+@StateModelInfo(states = "{'NOT USED BY HELIX'}", initialState = "INIT")
 public class TaskStateModel extends StateModel {
   private static final Logger LOG = Logger.getLogger(TaskStateModel.class);
   private final HelixManager _manager;
@@ -217,19 +212,37 @@ public class TaskStateModel extends StateModel {
   }
 
   private void startTask(Message msg, String taskPartition) {
-    TaskConfig cfg = TaskUtil.getTaskCfg(_manager, msg.getResourceName());
+    JobConfig cfg = TaskUtil.getJobCfg(_manager, msg.getResourceName());
+    TaskConfig taskConfig = null;
     String command = cfg.getCommand();
-    Map<String, String> taskNameMap = cfg.getTaskNameMap();
-    if (taskNameMap != null && taskNameMap.containsKey(taskPartition)) {
-      // Support a partition-specifc override of tasks to run
-      String taskName = taskNameMap.get(taskPartition);
-      if (_taskFactoryRegistry.containsKey(taskName)) {
-        command = taskName;
+
+    // Get a task-specific command if specified
+    JobContext ctx = TaskUtil.getJobContext(_manager, msg.getResourceName());
+    int pId = Integer.parseInt(taskPartition.substring(taskPartition.lastIndexOf('_') + 1));
+    if (ctx.getTaskIdForPartition(pId) != null) {
+      taskConfig = cfg.getTaskConfig(ctx.getTaskIdForPartition(pId));
+      if (taskConfig != null) {
+        if (taskConfig.getCommand() != null) {
+          command = taskConfig.getCommand();
+        }
       }
     }
+
+    // Populate a task callback context
+    TaskCallbackContext callbackContext = new TaskCallbackContext();
+    callbackContext.setManager(_manager);
+    callbackContext.setJobConfig(cfg);
+    callbackContext.setTaskConfig(taskConfig);
+
+    // Create a task instance with this command
+    if (command == null || _taskFactoryRegistry == null
+        || !_taskFactoryRegistry.containsKey(command)) {
+      throw new IllegalStateException("No callback implemented for task " + command);
+    }
     TaskFactory taskFactory = _taskFactoryRegistry.get(command);
-    Task task = taskFactory.createNewTask(cfg.getCommandConfig());
+    Task task = taskFactory.createNewTask(callbackContext);
 
+    // Submit the task for execution
     _taskRunner =
         new TaskRunner(this, task, msg.getResourceName(), taskPartition, msg.getTgtName(),
             _manager, msg.getTgtSessionId());
@@ -244,6 +257,6 @@ public class TaskStateModel extends StateModel {
           _taskRunner.timeout();
         }
       }
-    }, cfg.getTimeoutPerPartition());
+    }, cfg.getTimeoutPerTask());
   }
 }

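Note that startTask() above resolves the task implementation by looking up the (possibly task-specific) command in _taskFactoryRegistry, so a participant must have registered a TaskFactory under that command name beforehand. A hedged sketch of such a registration follows, assuming the Task and TaskResult types from the same package; the command name "Reindex" is hypothetical and not part of this patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;

public class FactoryRegistrationSketch {
  static Map<String, TaskFactory> buildRegistry() {
    Map<String, TaskFactory> registry = new HashMap<String, TaskFactory>();
    // The key must match the job's Command (or a per-task command override).
    registry.put("Reindex", new TaskFactory() {
      @Override
      public Task createNewTask(TaskCallbackContext context) {
        // context carries the HelixManager, the JobConfig, and, when set,
        // the per-task TaskConfig populated by startTask() above.
        return new Task() {
          @Override
          public TaskResult run() {
            // ... do the actual work here ...
            return new TaskResult(TaskResult.Status.COMPLETED, null);
          }

          @Override
          public void cancel() {
            // best-effort cancellation hook
          }
        };
      }
    });
    return registry;
  }
}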
http://git-wip-us.apache.org/repos/asf/helix/blob/97ca4de4/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 0f980b8..96b7e55 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -19,11 +19,17 @@ package org.apache.helix.task;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.State;
@@ -31,54 +37,48 @@ import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.ResourceAssignment;
-import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
 
 /**
  * Static utility methods.
  */
 public class TaskUtil {
   private static final Logger LOG = Logger.getLogger(TaskUtil.class);
-
-  enum TaskUtilEnum {
-    CONTEXT_NODE("Context"),
-    PREV_RA_NODE("PreviousResourceAssignment");
-
-    final String _value;
-
-    private TaskUtilEnum(String value) {
-      _value = value;
-    }
-
-    public String value() {
-      return _value;
-    }
-  }
+  private static final String CONTEXT_NODE = "Context";
+  private static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   /**
-   * Parses task resource configurations in Helix into a {@link TaskConfig} object.
+   * Parses job resource configurations in Helix into a {@link JobConfig} object.
    * @param manager HelixManager object used to connect to Helix.
-   * @param taskResource The name of the task resource.
-   * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null
+   * @param jobResource The name of the job resource.
+   * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
    *         otherwise.
    */
-  public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
-    ResourceConfiguration config = getResourceConfig(manager, taskResource);
-    Map<String, String> taskCfg = config.getRecord().getSimpleFields();
-    TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
-    if (config.getRecord().getMapFields().containsKey(TaskConfig.TASK_NAME_MAP)) {
-      b.setTaskNameMap(config.getRecord().getMapField(TaskConfig.TASK_NAME_MAP));
+  public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
+    HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
+    JobConfig.Builder b =
+        JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
+    Map<String, Map<String, String>> rawTaskConfigMap =
+        jobResourceConfig.getRecord().getMapFields();
+    Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
+    for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
+      TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
+      taskConfigMap.put(taskConfig.getId(), taskConfig);
     }
+    b.addTaskConfigMap(taskConfigMap);
     return b.build();
   }
 
   public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
-    ResourceConfiguration config = getResourceConfig(manager, workflowResource);
-    Map<String, String> workflowCfg = config.getRecord().getSimpleFields();
+    Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
     WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
+
     return b.build();
   }
 
@@ -109,56 +109,97 @@ public class TaskUtil {
       String resourceName) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
-                TaskUtilEnum.PREV_RA_NODE.value()), null, AccessOption.PERSISTENT);
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+            null, AccessOption.PERSISTENT);
     return r != null ? new ResourceAssignment(r) : null;
   }
 
   public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
       ResourceAssignment ra) {
     manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName,
-            TaskUtilEnum.PREV_RA_NODE.value()), ra.getRecord(), AccessOption.PERSISTENT);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+        ra.getRecord(), AccessOption.PERSISTENT);
   }
 
-  public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+  public static JobContext getJobContext(HelixManager manager, String jobResource) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
-            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
-                TaskUtilEnum.CONTEXT_NODE.value()), null, AccessOption.PERSISTENT);
-    return r != null ? new TaskContext(r) : null;
+            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
+            null, AccessOption.PERSISTENT);
+    return r != null ? new JobContext(r) : null;
   }
 
-  public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+  public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
     manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource,
-            TaskUtilEnum.CONTEXT_NODE.value()), ctx.getRecord(), AccessOption.PERSISTENT);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
+        ctx.getRecord(), AccessOption.PERSISTENT);
   }
 
   public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
     ZNRecord r =
         manager.getHelixPropertyStore().get(
             Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
-                TaskUtilEnum.CONTEXT_NODE.value()), null, AccessOption.PERSISTENT);
+                CONTEXT_NODE), null, AccessOption.PERSISTENT);
     return r != null ? new WorkflowContext(r) : null;
   }
 
   public static void setWorkflowContext(HelixManager manager, String workflowResource,
       WorkflowContext ctx) {
     manager.getHelixPropertyStore().set(
-        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
-            TaskUtilEnum.CONTEXT_NODE.value()), ctx.getRecord(), AccessOption.PERSISTENT);
+        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
+        ctx.getRecord(), AccessOption.PERSISTENT);
+  }
+
+  public static String getNamespacedJobName(String singleJobWorkflow) {
+    return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow);
   }
 
-  public static String getNamespacedTaskName(String singleTaskWorkflow) {
-    return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
+  public static String getNamespacedJobName(String workflowResource, String jobName) {
+    return workflowResource + "_" + jobName;
   }
 
-  public static String getNamespacedTaskName(String workflowResource, String taskName) {
-    return workflowResource + "_" + taskName;
+  public static String serializeJobConfigMap(Map<String, String> commandConfig) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      String serializedMap = mapper.writeValueAsString(commandConfig);
+      return serializedMap;
+    } catch (IOException e) {
+      LOG.error("Error serializing " + commandConfig, e);
+    }
+    return null;
+  }
+
+  public static Map<String, String> deserializeJobConfigMap(String commandConfig) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      Map<String, String> commandConfigMap =
+          mapper.readValue(commandConfig, new TypeReference<HashMap<String, String>>() {
+          });
+      return commandConfigMap;
+    } catch (IOException e) {
+      LOG.error("Error deserializing " + commandConfig, e);
+    }
+    return Collections.emptyMap();
+  }
+
+  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
+    HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+
+    Map<String, String> taskCfg = new HashMap<String, String>();
+    List<String> cfgKeys = configAccessor.getKeys(scope);
+    if (cfgKeys == null || cfgKeys.isEmpty()) {
+      return null;
+    }
+
+    for (String cfgKey : cfgKeys) {
+      taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
+    }
+
+    return taskCfg;
   }
 
-  private static ResourceConfiguration getResourceConfig(HelixManager manager, String resource) {
+  private static HelixProperty getResourceConfig(HelixManager manager, String resource) {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.resourceConfig(resource));


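The new serializeJobConfigMap()/deserializeJobConfigMap() helpers in TaskUtil are plain Jackson round trips over a String-to-String map. A quick usage sketch, with made-up keys and values:

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.task.TaskUtil;

public class ConfigRoundTripSketch {
  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<String, String>();
    cfg.put("inputPath", "/data/in"); // hypothetical job-level settings
    cfg.put("batchSize", "100");

    // Serialize to a JSON string, e.g. {"inputPath":"/data/in","batchSize":"100"}
    String json = TaskUtil.serializeJobConfigMap(cfg);

    // Deserialize back; a parse failure would yield an empty map per the code above.
    Map<String, String> restored = TaskUtil.deserializeJobConfigMap(json);
    System.out.println(restored.equals(cfg)); // true
  }
}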