helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [3/4] helix git commit: Clean up unit tests for task framework.
Date Thu, 16 Jun 2016 23:36:12 GMT
Clean up unit tests for task framework.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d381a3a1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d381a3a1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d381a3a1

Branch: refs/heads/helix-0.6.x
Commit: d381a3a1cc69d129388896907b9cc696811650c7
Parents: 79c490f
Author: Lei Xia <lxia@linkedin.com>
Authored: Wed Feb 10 16:33:08 2016 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Wed Apr 13 10:43:23 2016 -0700

----------------------------------------------------------------------
 .../webapp/resources/TestJobQueuesResource.java |   7 +-
 .../helix/integration/task/DummyTask.java       |  72 ------
 .../apache/helix/integration/task/MockTask.java |  73 ++++++
 .../helix/integration/task/TaskTestUtil.java    | 253 +++++++++++++++++++
 .../task/TestIndependentTaskRebalancer.java     |  23 +-
 .../integration/task/TestRecurringJobQueue.java | 157 +++---------
 .../integration/task/TestTaskRebalancer.java    |  72 +-----
 .../task/TestTaskRebalancerFailover.java        |  12 +-
 .../task/TestTaskRebalancerParallel.java        |  60 +----
 .../task/TestTaskRebalancerRetryLimit.java      |   2 +-
 .../task/TestTaskRebalancerStopResume.java      | 154 ++++-------
 .../apache/helix/integration/task/TestUtil.java | 207 ---------------
 .../integration/task/WorkflowGenerator.java     |   4 +-
 13 files changed, 451 insertions(+), 645 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index 9c2306a..5d8a93b 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -29,7 +29,7 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.DummyTask;
+import org.apache.helix.integration.task.MockTask;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.Task;
@@ -42,9 +42,6 @@ import org.apache.helix.task.beans.WorkflowBean;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.webapp.AdminTestBase;
 import org.apache.helix.webapp.AdminTestHelper;
-import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
-import org.apache.helix.webapp.resources.JsonParameters;
-import org.apache.helix.webapp.resources.ResourceUtil;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -81,7 +78,7 @@ public class TestJobQueuesResource extends AdminTestBase {
     taskFactoryReg.put("DummyTask", new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new DummyTask(context);
+        return new MockTask(context);
       }
     });
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
deleted file mode 100644
index b6054d0..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-
-public class DummyTask implements Task {
-  private static final String TIMEOUT_CONFIG = "Timeout";
-  private final long _delay;
-  private volatile boolean _canceled;
-
-  public DummyTask(TaskCallbackContext context) {
-    JobConfig jobCfg = context.getJobConfig();
-    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-    if (cfg == null) {
-      cfg = Collections.emptyMap();
-    }
-    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-  }
-
-  @Override
-  public TaskResult run() {
-    long expiry = System.currentTimeMillis() + _delay;
-    long timeLeft;
-    while (System.currentTimeMillis() < expiry) {
-      if (_canceled) {
-        timeLeft = expiry - System.currentTimeMillis();
-        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-            : timeLeft));
-      }
-      sleep(50);
-    }
-    timeLeft = expiry - System.currentTimeMillis();
-    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-  }
-
-  @Override
-  public void cancel() {
-    _canceled = true;
-  }
-
-  private static void sleep(long d) {
-    try {
-      Thread.sleep(d);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
new file mode 100644
index 0000000..71fa12d
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -0,0 +1,73 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskResult;
+
+public class MockTask implements Task {
+  public static final String TASK_COMMAND = "Reindex";
+  private static final String TIMEOUT_CONFIG = "Timeout";
+  private final long _delay;
+  private volatile boolean _canceled;
+
+  public MockTask(TaskCallbackContext context) {
+    JobConfig jobCfg = context.getJobConfig();
+    Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
+    if (cfg == null) {
+      cfg = Collections.emptyMap();
+    }
+    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
+  }
+
+  @Override
+  public TaskResult run() {
+    long expiry = System.currentTimeMillis() + _delay;
+    long timeLeft;
+    while (System.currentTimeMillis() < expiry) {
+      if (_canceled) {
+        timeLeft = expiry - System.currentTimeMillis();
+        return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+            : timeLeft));
+      }
+      sleep(50);
+    }
+    timeLeft = expiry - System.currentTimeMillis();
+    return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+  }
+
+  @Override
+  public void cancel() {
+    _canceled = true;
+  }
+
+  private static void sleep(long d) {
+    try {
+      Thread.sleep(d);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
new file mode 100644
index 0000000..c5dd099
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -0,0 +1,253 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+
+/**
+ * Static test utility methods.
+ */
+public class TaskTestUtil {
+  private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
+
+  /**
+   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
+   * reached.
+   * If the task has not reached target state by then, an error is thrown
+   * @param workflowResource Resource to poll for completeness
+   * @throws InterruptedException
+   */
+  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+      TaskState state) throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
+        && System.currentTimeMillis() < st + _default_timeout);
+
+    Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getWorkflowState(), state);
+  }
+
+  /**
+   * poll for job until it is at either state in targetStates.
+   * @param manager
+   * @param workflowResource
+   * @param jobName
+   * @param targetStates
+   * @throws InterruptedException
+   */
+  public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
+      TaskState... targetStates) throws InterruptedException {
+    // Get workflow config
+    WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
+    Assert.assertNotNull(wfCfg);
+    WorkflowContext ctx;
+    if (wfCfg.isRecurring()) {
+      // if it's recurring, need to reconstruct workflow and job name
+      do {
+        Thread.sleep(100);
+        ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
+      Assert.assertNotNull(ctx);
+      Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
+      jobName = jobName.substring(workflowResource.length() + 1);
+      workflowResource = ctx.getLastScheduledSingleWorkflow();
+      jobName = String.format("%s_%s", workflowResource, jobName);
+    }
+
+    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
+    // Wait for state
+    long st = System.currentTimeMillis();
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    }
+    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
+        && System.currentTimeMillis() < st + _default_timeout);
+    Assert.assertNotNull(ctx);
+    Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
+  }
+
+  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
+      final String jobName) throws Exception {
+    final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
+    boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
+        return ctx == null || ctx.getJobState(namespacedJobName) == null;
+      }
+    }, _default_timeout);
+    Assert.assertTrue(succeed);
+  }
+
+  public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
+      throws InterruptedException {
+    // Wait for completion.
+    long st = System.currentTimeMillis();
+    WorkflowContext ctx;
+    do {
+      Thread.sleep(100);
+      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
+    } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
+    Assert.assertNotNull(ctx);
+    return ctx;
+  }
+
+  // 1. Different jobs in the same workflow may be RUNNING at the same time
+  // 2. No two jobs in the same workflow are RUNNING on the same instance
+  public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
+      throws InterruptedException {
+
+    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
+    Assert.assertNotNull(workflowConfig);
+
+    WorkflowContext workflowContext = null;
+    while (workflowContext == null) {
+      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      Thread.sleep(100);
+    }
+
+    int maxRunningCount = 0;
+    boolean finished = false;
+
+    while (!finished) {
+      finished = true;
+      int runningCount = 0;
+
+      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
+      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+        TaskState jobState = workflowContext.getJobState(jobName);
+        if (jobState == TaskState.IN_PROGRESS) {
+          ++runningCount;
+          finished = false;
+        }
+      }
+
+      if (runningCount > maxRunningCount ) {
+        maxRunningCount = runningCount;
+      }
+
+      List<JobContext> jobContextList = new ArrayList<JobContext>();
+      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
+        JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
+        if (jobContext != null) {
+          jobContextList.add(TaskUtil.getJobContext(manager, jobName));
+        }
+      }
+
+      Set<String> instances = new HashSet<String>();
+      for (JobContext jobContext : jobContextList) {
+        for (int partition : jobContext.getPartitionSet()) {
+          String instance = jobContext.getAssignedParticipant(partition);
+          TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
+
+          if (instance == null) {
+            continue;
+          }
+          if (taskPartitionState != TaskPartitionState.INIT &&
+              taskPartitionState != TaskPartitionState.RUNNING) {
+            continue;
+          }
+          if (instances.contains(instance)) {
+            return false;
+          }
+
+          TaskPartitionState state = jobContext.getPartitionState(partition);
+          if (state != TaskPartitionState.COMPLETED) {
+            instances.add(instance);
+          }
+        }
+      }
+
+      Thread.sleep(100);
+    }
+
+    return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
+  }
+
+  public static Date getDateFromStartTime(String startTime)
+  {
+    int splitIndex = startTime.indexOf(':');
+    int hourOfDay = 0, minutes = 0;
+    try
+    {
+      hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
+      minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
+    }
+    catch (NumberFormatException e)
+    {
+
+    }
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
+    cal.set(Calendar.MINUTE, minutes);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTime();
+  }
+
+  public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
+    Map<String, String> cfgMap = new HashMap<String, String>();
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
+    cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
+    cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
+    cal.set(Calendar.MILLISECOND, 0);
+    cfgMap.put(WorkflowConfig.START_TIME,
+        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    //cfgMap.put(WorkflowConfig.START_TIME,
+    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
+    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
+  }
+
+  public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
+    return buildRecurrentJobQueue(jobQueueName, 0);
+  }
+
+  public static boolean pollForParticipantParallelState() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 40c2485..ba8367e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -32,7 +32,6 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -158,8 +157,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -185,8 +184,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -214,8 +213,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that each class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -242,8 +241,8 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -277,7 +276,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
@@ -309,7 +308,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(workflowBuilder.build());
 
     // Ensure completion
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
 
     // Ensure a single retry happened
     JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName + "_" + jobName);
@@ -317,7 +316,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertTrue(jobCtx.getFinishTime() - jobCtx.getStartTime() >= delay);
   }
 
-  private class TaskOne extends ReindexTask {
+  private class TaskOne extends MockTask {
     private final boolean _shouldFail;
     private final String _instanceName;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index da13ada..4e21ef7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -20,9 +20,6 @@ package org.apache.helix.integration.task;
  */
 
 import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -52,11 +49,9 @@ import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
@@ -108,10 +103,10 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory() {
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new ReindexTask(context);
+        return new MockTask(context);
       }
     });
 
@@ -162,46 +157,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
-  private Date getDateFromStartTime(String startTime)
-  {
-    int splitIndex = startTime.indexOf(':');
-    int hourOfDay = 0, minutes = 0;
-    try
-    {
-      hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
-      minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
-    }
-    catch (NumberFormatException e)
-    {
-
-    }
-    Calendar cal = Calendar.getInstance();
-    cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
-    cal.set(Calendar.MINUTE, minutes);
-    cal.set(Calendar.SECOND, 0);
-    cal.set(Calendar.MILLISECOND, 0);
-    return cal.getTime();
-  }
 
-  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
-    Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
-    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
-    cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
-    Calendar cal = Calendar.getInstance();
-    cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
-    cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
-    cal.set(Calendar.MILLISECOND, 0);
-    cfgMap.put(WorkflowConfig.START_TIME,
-        WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
-    //cfgMap.put(WorkflowConfig.START_TIME,
-    //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
-    return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
-  }
-
-  private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
-    return buildRecurrentJobQueue(jobQueueName, 0);
-  }
 
   @Test
   public void deleteRecreateRecurrentQueue() throws Exception {
@@ -209,14 +165,14 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuild = buildRecurrentJobQueue(queueName);
+    JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName);
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder jobConfig =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -226,25 +182,25 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     _driver.start(queueBuild.build());
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure job 1 is started before stop it
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
-    TestUtil
+    TaskTestUtil
         .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS);
 
     _driver.stop(queueName);
     _driver.delete(queueName);
     Thread.sleep(500);
 
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
     currentJobNames.clear();
     for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -255,17 +211,17 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.createQueue(queueBuilder.build());
 
 
-    wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
 
     // ensure jobs are started and completed
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0));
-    TestUtil
+    TaskTestUtil
         .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED);
 
     scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
     String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1));
-    TestUtil
+    TaskTestUtil
         .pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED);
   }
 
@@ -275,7 +231,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName, 5);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
@@ -285,7 +241,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setJobCommandConfigMap(commandConfig)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
@@ -296,20 +252,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     }
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
-        TaskState.COMPLETED);
+    TaskTestUtil
+        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+            TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -320,21 +277,21 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS,
         TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
-    TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -350,7 +307,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -366,7 +323,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
 
     // create jobs
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
@@ -378,7 +335,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setJobCommandConfigMap(commandConfig)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       jobs.add(job);
@@ -395,12 +352,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -424,17 +381,17 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
-    JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName);
 
-    JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex")
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
 
-    JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex")
+    JobConfig.Builder job2 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
 
-    JobConfig.Builder job3 = new JobConfig.Builder().setCommand("Reindex")
+    JobConfig.Builder job3 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
 
@@ -445,12 +402,12 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     _driver.createQueue(queueBuilder.build());
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are completed
     String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3");
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
 
     Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
     String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
@@ -488,51 +445,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
-    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
-  }
-
-  public static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(TaskCallbackContext context) {
-      JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-      if (cfg == null) {
-        cfg = Collections.emptyMap();
-      }
-      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(10L);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 3352d1c..787ebcc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -45,7 +44,6 @@ import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
@@ -98,10 +96,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory() {
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new ReindexTask(context);
+        return new MockTask(context);
       }
     });
 
@@ -171,7 +169,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
             .setExpiry(expiry).build();
 
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS);
 
     // Running workflow should have config and context viewable through accessor
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -185,7 +183,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
 
     // Wait for job to finish and expire
-    TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
     TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
     Thread.sleep(expiry);
@@ -215,7 +213,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait for job completion
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // Ensure all partitions are completed individually
     JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
@@ -242,7 +240,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // wait for job completeness/timeout
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     // see if resulting context completed successfully for our partition set
     String namespacedName = TaskUtil.getNamespacedJobName(jobResource);
@@ -267,11 +265,11 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     new TaskDriver(_manager).start(flow);
 
     // Wait until the workflow completes
-    TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
 
     // Assert completion for all tasks within two minutes
     for (String task : flow.getJobConfigs().keySet()) {
-      TestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, workflowName, task, TaskState.COMPLETED);
     }
   }
 
@@ -287,7 +285,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait until the job reports failure.
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.FAILED);
 
     // Check that all partitions timed out up to maxAttempts
     JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
@@ -314,10 +312,10 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Set<String> master = Sets.newHashSet("MASTER");
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     _driver.enqueueJob(queueName, "masterJob", job1);
     _driver.enqueueJob(queueName, "slaveJob", job2);
@@ -325,8 +323,8 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Ensure successful completion
     String namespacedJob1 = queueName + "_masterJob";
     String namespacedJob2 = queueName + "_slaveJob";
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
     JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
     JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
 
@@ -352,48 +350,4 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
   }
-
-  private static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(TaskCallbackContext context) {
-      JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-      if (cfg == null) {
-        cfg = Collections.emptyMap();
-      }
-      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(50);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
index b8e1c09..6f1c48e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java
@@ -82,10 +82,10 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("DummyTask", new TaskFactory() {
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
       @Override
       public Task createNewTask(TaskCallbackContext context) {
-        return new DummyTask(context);
+        return new MockTask(context);
       }
     });
 
@@ -143,7 +143,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job =
-        new JobConfig.Builder().setCommand("DummyTask")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job: " + job1Name);
@@ -151,7 +151,7 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
 
     // check all tasks completed on MASTER
     String namespacedJob1 = String.format("%s_%s", queueName, job1Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
 
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -178,9 +178,9 @@ public class TestTaskRebalancerFailover extends ZkUnitTestBase {
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job);
 
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS);
     _participants[0].syncStop();
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
 
     // tasks previously assigned to localhost_12918 should be re-scheduled on new master
     ctx = TaskUtil.getJobContext(_manager, namespacedJob2);

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 2ff8c56..5180a04 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -26,31 +26,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.AccessOption;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
@@ -58,9 +48,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-
 public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
   private static final int n = 5;
   private static final int START_PORT = 12918;
@@ -105,10 +92,10 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
 
       final long delay = (i + 1) * 1000L;
       Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-      taskFactoryReg.put("Reindex", new TaskFactory() {
+      taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
         @Override
         public Task createNewTask(TaskCallbackContext context) {
-          return new ReindexTask(delay);
+          return new MockTask(context);
         }
       });
 
@@ -164,7 +151,7 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
     List<JobConfig.Builder> jobConfigBuilders = new ArrayList<JobConfig.Builder>();
     for (String testDbName : testDbNames) {
       jobConfigBuilders.add(
-          new JobConfig.Builder().setCommand("Reindex").setTargetResource(testDbName)
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(testDbName)
               .setTargetPartitionStates(Collections.singleton("SLAVE")));
     }
 
@@ -172,45 +159,6 @@ public class TestTaskRebalancerParallel extends ZkIntegrationTestBase {
       _driver.enqueueJob(queueName, "job_" + (i + 1), jobConfigBuilders.get(i));
     }
 
-    Assert.assertTrue(TestUtil.pollForWorkflowParallelState(_manager, queueName));
-  }
-
-  public static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(long delay) {
-      _delay = delay;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(50);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
+    Assert.assertTrue(TaskTestUtil.pollForWorkflowParallelState(_manager, queueName));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
index efe90b0..d25ffc5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java
@@ -138,7 +138,7 @@ public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase {
     _driver.start(flow);
 
     // Wait until the job completes.
-    TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED);
 
     JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource));
     for (int i = 0; i < _p; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 7437b72..b67fa90 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -21,7 +21,6 @@ package org.apache.helix.integration.task;
 
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -41,7 +40,6 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -53,7 +51,6 @@ import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
@@ -62,7 +59,6 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.util.PathUtils;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -108,12 +104,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
 
     Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put("Reindex", new TaskFactory() {
-      @Override
-      public Task createNewTask(TaskCallbackContext context) {
-        return new ReindexTask(context);
-      }
-    });
+    taskFactoryReg
+        .put(MockTask.TASK_COMMAND, new TaskFactory() {
+          @Override public Task createNewTask(TaskCallbackContext context) {
+            return new MockTask(context);
+          }
+        });
 
     // start dummy participants
     for (int i = 0; i < n; i++) {
@@ -173,15 +169,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     LOG.info("Starting flow " + flow.getName());
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing job");
     _driver.stop(JOB_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.STOPPED);
 
     LOG.info("Resuming job");
     _driver.resume(JOB_RESOURCE);
-    TestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, JOB_RESOURCE, TaskState.COMPLETED);
   }
 
   @Test
@@ -191,15 +187,15 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     LOG.info("Starting flow " + workflow);
     _driver.start(flow);
-    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS);
 
     LOG.info("Pausing workflow");
     _driver.stop(workflow);
-    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED);
 
     LOG.info("Resuming workflow");
     _driver.resume(workflow);
-    TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
+    TaskTestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
   }
 
   @Test
@@ -214,7 +210,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job: " + job1Name);
@@ -222,32 +218,32 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     String job2Name = "slaveJob";
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
 
     // stop job1
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // Ensure job2 is not started
     TimeUnit.MILLISECONDS.sleep(200);
     String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
-    TestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
 
     LOG.info("Resuming job-queue: " + queueName);
     _driver.resume(queueName);
 
     // Ensure successful completion
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
     JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
     JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
 
@@ -282,7 +278,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
@@ -294,13 +290,13 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", queueName, deletedJob1);
-    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -310,22 +306,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TestUtil.pollForJobState(_manager, queueName,
+    TaskTestUtil.pollForJobState(_manager, queueName,
         String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager,
-        queueName,
-        String.format("%s_%s", queueName, currentJobNames.get(1)),
-        TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", queueName, deletedJob2);
-    TestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -339,7 +333,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     // add job 3 back
     JobConfig.Builder job =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
             .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
     LOG.info("Enqueuing job: " + deletedJob2);
@@ -350,7 +344,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", queueName, currentJobNames.get(i));
-      TestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, queueName, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -398,7 +392,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex")
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setJobCommandConfigMap(commandConfig)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition));
@@ -408,19 +402,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
       currentJobNames.add(i, jobName);
     }
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
+    TaskTestUtil
+        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // delete the in-progress job (job 1) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob1);
@@ -431,20 +426,20 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     _driver.resume(queueName);
 
     // ensure job 2 is started
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
-    TestUtil.pollForJobState(_manager, scheduledQueue,
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue,
         String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED);
-    TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
+    TaskTestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
     String deletedJob2 = currentJobNames.get(2);
     String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2);
-    TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
+    TaskTestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2);
 
     // delete not-started job (job 3) and verify it being deleted
     _driver.deleteJob(queueName, deletedJob2);
@@ -460,7 +455,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     long preJobFinish = 0;
     for (int i = 0; i < currentJobNames.size(); i++) {
       String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i));
-      TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
+      TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED);
 
       JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName);
       long jobStart = jobContext.getStartTime();
@@ -489,10 +484,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     for (int i = 0; i < JOB_COUNTS; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
-      JobConfig.Builder job =
-          new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
-              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+          .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+          .setTargetPartitionStates(Sets.newHashSet(targetPartition));
       jobs.add(job);
       jobNames.add(targetPartition.toLowerCase() + "Job" + i);
     }
@@ -504,12 +498,12 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     }
     String currentLastJob = jobNames.get(JOB_COUNTS - 2);
 
-    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
+    WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
     // ensure all jobs are finished
     String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob);
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED);
 
     // enqueue the last job
     LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
@@ -537,7 +531,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     // Enqueue 2 jobs
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job1: " + job1Name);
@@ -545,17 +539,17 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex")
+        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
             .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     String job2Name = "slaveJob";
     LOG.info("Enqueuing job2: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job2);
 
     String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
 
     String namespacedJob2 = String.format("%s_%s", queueName,  job2Name);
-    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    TaskTestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
 
     // Stop queue
     _driver.stop(queueName);
@@ -605,7 +599,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
 
     Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName)));
     Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName)));
-    TestUtil.pollForEmptyJobState(_manager, queueName, jobName);
+    TaskTestUtil.pollForEmptyJobState(_manager, queueName, jobName);
   }
 
   private void verifyJobNotInQueue(String queueName, String namedSpacedJobName) {
@@ -615,48 +609,4 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
     Assert.assertFalse(dag.getChildrenToParents().containsKey(namedSpacedJobName));
     Assert.assertFalse(dag.getParentsToChildren().containsKey(namedSpacedJobName));
   }
-
-  public static class ReindexTask implements Task {
-    private final long _delay;
-    private volatile boolean _canceled;
-
-    public ReindexTask(TaskCallbackContext context) {
-      JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
-      if (cfg == null) {
-        cfg = Collections.emptyMap();
-      }
-      _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L;
-    }
-
-    @Override
-    public TaskResult run() {
-      long expiry = System.currentTimeMillis() + _delay;
-      long timeLeft;
-      while (System.currentTimeMillis() < expiry) {
-        if (_canceled) {
-          timeLeft = expiry - System.currentTimeMillis();
-          return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
-              : timeLeft));
-        }
-        sleep(50);
-      }
-      timeLeft = expiry - System.currentTimeMillis();
-      return new TaskResult(TaskResult.Status.COMPLETED,
-          String.valueOf(timeLeft < 0 ? 0 : timeLeft));
-    }
-
-    @Override
-    public void cancel() {
-      _canceled = true;
-    }
-
-    private static void sleep(long d) {
-      try {
-        Thread.sleep(d);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
deleted file mode 100644
index d40ac89..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package org.apache.helix.integration.task;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.TestHelper;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-
-/**
- * Static test utility methods.
- */
-public class TestUtil {
-  private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
-
-  /**
-   * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is
-   * reached.
-   * If the task has not reached target state by then, an error is thrown
-   * @param workflowResource Resource to poll for completeness
-   * @throws InterruptedException
-   */
-  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
-      TaskState state) throws InterruptedException {
-    // Wait for completion.
-    long st = System.currentTimeMillis();
-    WorkflowContext ctx;
-    do {
-      Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
-        && System.currentTimeMillis() < st + _default_timeout);
-
-    Assert.assertNotNull(ctx);
-    Assert.assertEquals(ctx.getWorkflowState(), state);
-  }
-
-  /**
-   * poll for job until it is at either state in targetStates.
-   * @param manager
-   * @param workflowResource
-   * @param jobName
-   * @param targetStates
-   * @throws InterruptedException
-   */
-  public static void pollForJobState(HelixManager manager, String workflowResource, String jobName,
-      TaskState... targetStates) throws InterruptedException {
-    // Get workflow config
-    WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource);
-    Assert.assertNotNull(wfCfg);
-    WorkflowContext ctx;
-    if (wfCfg.isRecurring()) {
-      // if it's recurring, need to reconstruct workflow and job name
-      do {
-        Thread.sleep(100);
-        ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-      } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null));
-      Assert.assertNotNull(ctx);
-      Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow());
-      jobName = jobName.substring(workflowResource.length() + 1);
-      workflowResource = ctx.getLastScheduledSingleWorkflow();
-      jobName = String.format("%s_%s", workflowResource, jobName);
-    }
-
-    Set<TaskState> allowedStates = new HashSet<TaskState>(Arrays.asList(targetStates));
-    // Wait for state
-    long st = System.currentTimeMillis();
-    do {
-      Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    }
-    while ((ctx == null || ctx.getJobState(jobName) == null || !allowedStates.contains(ctx.getJobState(jobName)))
-        && System.currentTimeMillis() < st + _default_timeout);
-    Assert.assertNotNull(ctx);
-    Assert.assertTrue(allowedStates.contains(ctx.getJobState(jobName)));
-  }
-
-  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
-      final String jobName) throws Exception {
-    final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
-    boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
-
-      @Override
-      public boolean verify() throws Exception {
-        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
-        return ctx == null || ctx.getJobState(namespacedJobName) == null;
-      }
-    }, _default_timeout);
-    Assert.assertTrue(succeed);
-  }
-
-  public static WorkflowContext pollForWorkflowContext(HelixManager manager, String workflowResource)
-      throws InterruptedException {
-    // Wait for completion.
-    long st = System.currentTimeMillis();
-    WorkflowContext ctx;
-    do {
-      Thread.sleep(100);
-      ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    } while (ctx == null && System.currentTimeMillis() < st + _default_timeout);
-    Assert.assertNotNull(ctx);
-    return ctx;
-  }
-
-  // 1. Different jobs in a same work flow is in RUNNING at the same time
-  // 2. No two jobs in the same work flow is in RUNNING at the same instance
-  public static boolean pollForWorkflowParallelState(HelixManager manager, String workflowName)
-      throws InterruptedException {
-
-    WorkflowConfig workflowConfig = TaskUtil.getWorkflowCfg(manager, workflowName);
-    Assert.assertNotNull(workflowConfig);
-
-    WorkflowContext workflowContext = null;
-    while (workflowContext == null) {
-      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
-      Thread.sleep(100);
-    }
-
-    int maxRunningCount = 0;
-    boolean finished = false;
-
-    while (!finished) {
-      finished = true;
-      int runningCount = 0;
-
-      workflowContext = TaskUtil.getWorkflowContext(manager, workflowName);
-      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
-        TaskState jobState = workflowContext.getJobState(jobName);
-        if (jobState == TaskState.IN_PROGRESS) {
-          ++runningCount;
-          finished = false;
-        }
-      }
-
-      if (runningCount > maxRunningCount ) {
-        maxRunningCount = runningCount;
-      }
-
-      List<JobContext> jobContextList = new ArrayList<JobContext>();
-      for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
-        JobContext jobContext = TaskUtil.getJobContext(manager, jobName);
-        if (jobContext != null) {
-          jobContextList.add(TaskUtil.getJobContext(manager, jobName));
-        }
-      }
-
-      Set<String> instances = new HashSet<String>();
-      for (JobContext jobContext : jobContextList) {
-        for (int partition : jobContext.getPartitionSet()) {
-          String instance = jobContext.getAssignedParticipant(partition);
-          TaskPartitionState taskPartitionState = jobContext.getPartitionState(partition);
-
-          if (instance == null) {
-            continue;
-          }
-          if (taskPartitionState != TaskPartitionState.INIT &&
-              taskPartitionState != TaskPartitionState.RUNNING) {
-            continue;
-          }
-          if (instances.contains(instance)) {
-            return false;
-          }
-
-          TaskPartitionState state = jobContext.getPartitionState(partition);
-          if (state != TaskPartitionState.COMPLETED) {
-            instances.add(instance);
-          }
-        }
-      }
-
-      Thread.sleep(100);
-    }
-
-    return maxRunningCount > 1 && maxRunningCount <= workflowConfig.getParallelJobs();
-  }
-
-  public static boolean pollForParticipantParallelState() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/d381a3a1/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 23c35af..ce3a36a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -19,7 +19,6 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
@@ -27,7 +26,6 @@ import java.util.TreeMap;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.Workflow;
 import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Convenience class for generating various test workflows
@@ -44,7 +42,7 @@ public class WorkflowGenerator {
     Map<String, String> tmpMap = new TreeMap<String, String>();
     tmpMap.put("TargetResource", DEFAULT_TGT_DB);
     tmpMap.put("TargetPartitionStates", "MASTER");
-    tmpMap.put("Command", "Reindex");
+    tmpMap.put("Command", MockTask.TASK_COMMAND);
     tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000));
     DEFAULT_JOB_CONFIG = Collections.unmodifiableMap(tmpMap);
   }


Mime
View raw message