Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 1a4bec71f -> 24b99e877
[HELIX-422] Simplify single job creation
This both augments the Java API to specify fully-built job configs, as
well as reduces the confusion between the job configs and the command
config maps they contain.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/24b99e87
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/24b99e87
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/24b99e87
Branch: refs/heads/helix-0.6.x
Commit: 24b99e877037d56c5a8515b606b366c8a74f1d8d
Parents: 1a4bec7
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Aug 5 13:16:07 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Aug 5 13:16:07 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/JobConfig.java | 26 ++++++++++----------
.../java/org/apache/helix/task/TaskUtil.java | 4 +--
.../java/org/apache/helix/task/Workflow.java | 11 +++++----
.../task/TestIndependentTaskRebalancer.java | 10 ++++----
.../integration/task/TestTaskRebalancer.java | 2 +-
.../task/TestTaskRebalancerStopResume.java | 2 +-
.../integration/task/WorkflowGenerator.java | 6 ++---
7 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 3f9ab41..1dad5e4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -56,7 +56,7 @@ public class JobConfig {
/** The command that is to be run by participants in the case of identical tasks. */
public static final String COMMAND = "Command";
/** The command configuration to be used by the tasks. */
- public static final String JOB_CONFIG_MAP = "JobConfig";
+ public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig";
/** The timeout for a task. */
public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
/** The maximum number of times the task rebalancer may attempt to execute a task. */
@@ -84,7 +84,7 @@ public class JobConfig {
private final List<String> _targetPartitions;
private final Set<String> _targetPartitionStates;
private final String _command;
- private final Map<String, String> _jobConfigMap;
+ private final Map<String, String> _jobCommandConfigMap;
private final long _timeoutPerTask;
private final int _numConcurrentTasksPerInstance;
private final int _maxAttemptsPerTask;
@@ -93,7 +93,7 @@ public class JobConfig {
private final Map<String, TaskConfig> _taskConfigMap;
private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
- Set<String> targetPartitionStates, String command, Map<String, String>
jobConfigMap,
+ Set<String> targetPartitionStates, String command, Map<String, String>
jobCommandConfigMap,
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig>
taskConfigMap) {
_workflow = workflow;
@@ -101,7 +101,7 @@ public class JobConfig {
_targetPartitions = targetPartitions;
_targetPartitionStates = targetPartitionStates;
_command = command;
- _jobConfigMap = jobConfigMap;
+ _jobCommandConfigMap = jobCommandConfigMap;
_timeoutPerTask = timeoutPerTask;
_numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
_maxAttemptsPerTask = maxAttemptsPerTask;
@@ -134,8 +134,8 @@ public class JobConfig {
return _command;
}
- public Map<String, String> getJobConfigMap() {
- return _jobConfigMap;
+ public Map<String, String> getJobCommandConfigMap() {
+ return _jobCommandConfigMap;
}
public long getTimeoutPerTask() {
@@ -172,10 +172,10 @@ public class JobConfig {
if (_command != null) {
cfgMap.put(JobConfig.COMMAND, _command);
}
- if (_jobConfigMap != null) {
- String serializedConfig = TaskUtil.serializeJobConfigMap(_jobConfigMap);
+ if (_jobCommandConfigMap != null) {
+ String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
if (serializedConfig != null) {
- cfgMap.put(JobConfig.JOB_CONFIG_MAP, serializedConfig);
+ cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig);
}
}
if (_targetResource != null) {
@@ -242,10 +242,10 @@ public class JobConfig {
if (cfg.containsKey(COMMAND)) {
b.setCommand(cfg.get(COMMAND));
}
- if (cfg.containsKey(JOB_CONFIG_MAP)) {
+ if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) {
Map<String, String> commandConfigMap =
- TaskUtil.deserializeJobConfigMap(cfg.get(JOB_CONFIG_MAP));
- b.setJobConfigMap(commandConfigMap);
+ TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP));
+ b.setJobCommandConfigMap(commandConfigMap);
}
if (cfg.containsKey(TIMEOUT_PER_TASK)) {
b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
@@ -292,7 +292,7 @@ public class JobConfig {
return this;
}
- public Builder setJobConfigMap(Map<String, String> v) {
+ public Builder setJobCommandConfigMap(Map<String, String> v) {
_commandConfig = v;
return this;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 1aa75d6..e1a96a8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -258,7 +258,7 @@ public class TaskUtil {
* @param commandConfig map of job config key to config value
* @return serialized string
*/
- public static String serializeJobConfigMap(Map<String, String> commandConfig) {
+ public static String serializeJobCommandConfigMap(Map<String, String> commandConfig)
{
ObjectMapper mapper = new ObjectMapper();
try {
String serializedMap = mapper.writeValueAsString(commandConfig);
@@ -274,7 +274,7 @@ public class TaskUtil {
* @param commandConfig the serialized job config map
* @return a map of job config key to config value
*/
- public static Map<String, String> deserializeJobConfigMap(String commandConfig) {
+ public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig)
{
ObjectMapper mapper = new ObjectMapper();
try {
Map<String, String> commandConfigMap =
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 84680d3..77a5ba7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -172,7 +172,7 @@ public class Workflow {
builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
builder.addConfig(job.name, JobConfig.COMMAND, job.command);
if (job.jobConfigMap != null) {
- builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
+ builder.addConfig(job.name, JobConfig.JOB_COMMAND_CONFIG_MAP, job.jobConfigMap.toString());
}
builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
if (job.targetPartitionStates != null) {
@@ -265,17 +265,18 @@ public class Workflow {
return this;
}
- public Builder addJobConfigMap(String job, Map<String, String> jobConfigMap) {
- return addConfig(job, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+ public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap)
{
+ return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP,
+ TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
}
- public Builder addJobConfig(String job, JobConfig jobConfig) {
+ public Builder addJobConfig(String job, JobConfig.Builder jobConfigBuilder) {
+ JobConfig jobConfig = jobConfigBuilder.setWorkflow(_name).build();
for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet())
{
String key = e.getKey();
String val = e.getValue();
addConfig(job, key, val);
}
- jobConfig.getJobConfigMap().put(JobConfig.WORKFLOW_ID, _name);
addTaskConfigs(job, jobConfig.getTaskConfigMap().values());
return this;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index b5856b1..b7f20d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -147,7 +147,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -175,7 +175,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -202,7 +202,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -231,7 +231,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
+ (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
_driver.start(workflowBuilder.build());
// Ensure the job completes
@@ -261,7 +261,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
Map<String, String> jobConfigMap = Maps.newHashMap();
jobConfigMap.put("Timeout", "1000");
- workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+ workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
_driver.start(workflowBuilder.build());
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 208480c..2ce76c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -293,7 +293,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
public ReindexTask(TaskCallbackContext context) {
JobConfig jobCfg = context.getJobConfig();
- Map<String, String> cfg = jobCfg.getJobConfigMap();
+ Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
if (cfg == null) {
cfg = Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 97b8c7e..4e0d92a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -183,7 +183,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
{
public ReindexTask(TaskCallbackContext context) {
JobConfig jobCfg = context.getJobConfig();
- Map<String, String> cfg = jobCfg.getJobConfigMap();
+ Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
if (cfg == null) {
cfg = Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 921a5f9..318673f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -84,7 +84,7 @@ public class WorkflowGenerator {
ObjectMapper mapper = new ObjectMapper();
try {
String serializedMap = mapper.writeValueAsString(commandConfig);
- builder.addConfig(jobName, JobConfig.JOB_CONFIG_MAP, serializedMap);
+ builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
} catch (IOException e) {
LOG.error("Error serializing " + commandConfig, e);
}
@@ -103,8 +103,8 @@ public class WorkflowGenerator {
ObjectMapper mapper = new ObjectMapper();
try {
String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
- builder.addConfig(JOB_NAME_1, JobConfig.JOB_CONFIG_MAP, serializedMap);
- builder.addConfig(JOB_NAME_2, JobConfig.JOB_CONFIG_MAP, serializedMap);
+ builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
+ builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
} catch (IOException e) {
LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
}
|