helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: Fix a few of unstable integration tests.
Date Tue, 24 Apr 2018 19:46:09 GMT
Repository: helix
Updated Branches:
  refs/heads/master 3d2d57b05 -> d2fb22d1f


Fix a few of unstable integration tests.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d2fb22d1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d2fb22d1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d2fb22d1

Branch: refs/heads/master
Commit: d2fb22d1f3a3db602b27cba7ed8d814cb931622e
Parents: 3d2d57b
Author: Lei Xia <lxia@linkedin.com>
Authored: Fri Apr 20 16:11:27 2018 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Tue Apr 24 12:45:43 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/integration/TestDrop.java  |  5 +++-
 .../helix/integration/TestZkConnectionLost.java |  2 +-
 .../messaging/TestP2PMessageSemiAuto.java       | 16 +++++++++---
 .../apache/helix/integration/task/MockTask.java | 10 ++++----
 .../helix/integration/task/TaskTestUtil.java    |  3 ++-
 .../integration/task/TestDeleteWorkflow.java    |  8 +++---
 .../helix/integration/task/TestJobFailure.java  |  2 +-
 .../integration/task/TestJobQueueCleanUp.java   |  2 +-
 .../helix/integration/task/TestJobTimeout.java  |  2 +-
 .../task/TestRebalanceRunningTask.java          | 12 ++++-----
 .../integration/task/TestRecurringJobQueue.java |  4 +--
 .../integration/task/TestTaskRebalancer.java    | 11 ++++-----
 .../task/TestTaskRebalancerStopResume.java      | 23 +++++++++--------
 .../integration/task/TestTaskThreadLeak.java    | 26 +++++++++++++-------
 .../integration/task/TestTaskThrottling.java    |  4 +--
 .../task/TestWorkflowTermination.java           |  2 +-
 .../integration/task/TestWorkflowTimeout.java   |  4 +--
 .../task/TestGetLastScheduledTaskTimestamp.java |  2 +-
 18 files changed, 78 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
index 805f55d..3142b80 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDrop.java
@@ -179,6 +179,8 @@ public class TestDrop extends ZkIntegrationTestBase {
         new BestPossibleExternalViewVerifier.Builder(clusterName).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(verifier.verify());
 
+    Thread.sleep(400);
+
     assertEmptyCSandEV(className, "TestDB0", participants);
 
     // clean up
@@ -372,6 +374,7 @@ public class TestDrop extends ZkIntegrationTestBase {
     verifier =
         new BestPossibleExternalViewVerifier.Builder(clusterName).setZkAddr(ZK_ADDR).build();
     Assert.assertTrue(verifier.verify(), "Should be empty exeternal-view");
+    Thread.sleep(400);
 
     assertEmptyCSandEV(clusterName, "TestDB0", participants);
 
@@ -438,7 +441,7 @@ public class TestDrop extends ZkIntegrationTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
     Assert.assertTrue(verifier.verify());
 
-    Thread.sleep(2000);
+    Thread.sleep(400);
     assertEmptyCSandEV(clusterName, "schemata", participants);
 
     // clean up

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
index 8327697..6fb966e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
@@ -142,7 +142,7 @@ public class TestZkConnectionLost extends TaskTestBase {
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
               .setTargetPartitionStates(Sets.newHashSet(targetPartition))
-              .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "100"));
+              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100"));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
       queueBuild.enqueueJob(jobName, jobConfig);
       currentJobNames.add(jobName);

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
index 67a59c0..43fedad 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -205,20 +205,30 @@ public class TestP2PMessageSemiAuto extends ZkIntegrationTestBase {
     Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
     LiveInstance liveInstance = liveInstanceMap.get(instance);
 
-    Map<String, CurrentState> currentStateMap = dataCache.getCurrentState(instance,
liveInstance.getSessionId());
+    Map<String, CurrentState> currentStateMap = dataCache.getCurrentState(instance,
+        liveInstance.getSessionId());
     Assert.assertNotNull(currentStateMap);
     CurrentState currentState = currentStateMap.get(dbName);
     Assert.assertNotNull(currentState);
     Assert.assertEquals(currentState.getPartitionStateMap().size(), PARTITION_NUMBER);
 
+    int total = 0;
+    int expectedHost = 0;
     for (String partition : currentState.getPartitionStateMap().keySet()) {
       String state = currentState.getState(partition);
       Assert.assertEquals(state, expectedState,
           dbName + " Partition " + partition + "'s state is different as expected!");
       String triggerHost = currentState.getTriggerHost(partition);
-      Assert.assertEquals(triggerHost, expectedTriggerHost,
-          "Partition " + partition + "'s transition to Master was not triggered by expected
host!");
+      if (triggerHost.equals(expectedTriggerHost)) {
+        expectedHost ++;
+      }
+      total ++;
     }
+
+    double ratio = ((double) expectedHost) / ((double) total);
+    Assert.assertTrue(ratio >= 0.7, String
+        .format("Only %d out of %d percent transitions to Master were triggered by expected
host!",
+            expectedHost, total));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
index 3e03cac..778bd10 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
@@ -33,7 +33,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 
 public class MockTask extends UserContentStore implements Task {
   public static final String TASK_COMMAND = "Reindex";
-  public static final String TIMEOUT_CONFIG = "Timeout";
+  public static final String JOB_DELAY = "Delay";
   public static final String TASK_RESULT_STATUS = "TaskResultStatus";
   public static final String THROW_EXCEPTION = "ThrowException";
   public static final String ERROR_MESSAGE = "ErrorMessage";
@@ -55,7 +55,7 @@ public class MockTask extends UserContentStore implements Task {
   public MockTask(TaskCallbackContext context) {
     Map<String, String> cfg = context.getJobConfig().getJobCommandConfigMap();
     if (cfg == null) {
-      cfg = new HashMap<String, String>();
+      cfg = new HashMap<>();
     }
 
     TaskConfig taskConfig = context.getTaskConfig();
@@ -64,7 +64,7 @@ public class MockTask extends UserContentStore implements Task {
       cfg.putAll(taskConfigMap);
     }
 
-    _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) :
100L;
+    _delay = cfg.containsKey(JOB_DELAY) ? Long.parseLong(cfg.get(JOB_DELAY)) : 100L;
     _notAllowToCancel = cfg.containsKey(NOT_ALLOW_TO_CANCEL)
         ? Boolean.parseBoolean(cfg.get(NOT_ALLOW_TO_CANCEL))
         : false;
@@ -92,8 +92,8 @@ public class MockTask extends UserContentStore implements Task {
           deserializeTargetPartitionConfig(cfg.get(TARGET_PARTITION_CONFIG));
       if (targetPartitionConfigs.containsKey(targetPartition)) {
         Map<String, String> targetPartitionConfig = targetPartitionConfigs.get(targetPartition);
-        if (targetPartitionConfig.containsKey(TIMEOUT_CONFIG)) {
-          _delay = Long.parseLong(targetPartitionConfig.get(TIMEOUT_CONFIG));
+        if (targetPartitionConfig.containsKey(JOB_DELAY)) {
+          _delay = Long.parseLong(targetPartitionConfig.get(JOB_DELAY));
         }
         if (targetPartitionConfig.containsKey(TASK_RESULT_STATUS)) {
           _taskResultStatus = TaskResult.Status.valueOf(targetPartitionConfig.get(TASK_RESULT_STATUS));

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 76f0e32..f4afbed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -70,7 +70,8 @@ public class TaskTestUtil {
 
       @Override public boolean verify() throws Exception {
         WorkflowContext ctx = driver.getWorkflowContext(workflowName);
-        return ctx == null || ctx.getJobState(namespacedJobName) == null;
+        return ctx == null || ctx.getJobState(namespacedJobName) == null
+            || ctx.getJobState(namespacedJobName) == TaskState.NOT_STARTED;
       }
     }, _default_timeout);
     Assert.assertTrue(succeed);

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
index 381bdeb..0b7ba95 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -1,8 +1,6 @@
 package org.apache.helix.integration.task;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -35,7 +33,7 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
         .setMaxAttemptsPerTask(1)
         .setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "100000"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1", jobBuilder);
@@ -75,7 +73,7 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
         .setMaxAttemptsPerTask(1)
         .setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000000"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1", jobBuilder);
@@ -120,7 +118,7 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
         .setMaxAttemptsPerTask(1)
         .setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000000"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1", jobBuilder);

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
index 79c085e..5eb462b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobFailure.java
@@ -139,7 +139,7 @@ public final class TestJobFailure extends TaskSynchronizedTestBase {
       } else if (taskStates.get(i).equals(TaskPartitionState.TASK_ABORTED.name())) {
         config.put(MockTask.TASK_RESULT_STATUS, TaskResult.Status.FATAL_FAILED.name());
       } else if (taskStates.get(i).equals(TaskPartitionState.RUNNING.name())) {
-        config.put(MockTask.TIMEOUT_CONFIG, "99999999");
+        config.put(MockTask.JOB_DELAY, "99999999");
       } else {
         throw new IllegalArgumentException("Invalid taskStates input: " + taskStates.get(i));
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
index dceee8f..d2a495c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobQueueCleanUp.java
@@ -70,7 +70,7 @@ public class TestJobQueueCleanUp extends TaskTestBase {
       builder.enqueueJob("JOB" + i, jobBuilder);
     }
     builder.enqueueJob("JOB" + 3,
-        jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000")));
+        jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000000")));
     builder.enqueueJob("JOB" + 4, jobBuilder);
     _driver.start(builder.build());
     _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + 3),

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
index 56fc04a..8482f69 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobTimeout.java
@@ -78,7 +78,7 @@ public final class TestJobTimeout extends TaskSynchronizedTestBase {
         .setTargetResource(DB_NAME)
         .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
         .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")) //
task stuck
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")) // task
stuck
         .setTimeout(1000);
 
     JobConfig.Builder secondJobBuilder = new JobConfig.Builder()

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
index 7fb3042..f518d5c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java
@@ -135,7 +135,7 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase
{
         // different instances
         .setNumConcurrentTasksPerInstance(100)
         .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); //
task stuck
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); // task
stuck
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
         .addJob(JOB, jobBuilder);
@@ -163,7 +163,7 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase
{
         .setCommand(MockTask.TASK_COMMAND)
         .setFailureThreshold(10)
         .setMaxAttemptsPerTask(2)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); //
task stuck
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); // task
stuck
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
         .addJob(JOB, jobBuilder);
@@ -198,7 +198,7 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase
{
         .setNumConcurrentTasksPerInstance(100)
         .setCommand(MockTask.TASK_COMMAND)
         .setRebalanceRunningTask(true)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); //
task stuck
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); // task
stuck
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
         .addJob(JOB, jobBuilder);
@@ -227,7 +227,7 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase
{
         .setTargetResource(DATABASE)
         .setNumConcurrentTasksPerInstance(100)
         .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW)
         .addJob(JOB, jobBuilder);
@@ -258,7 +258,7 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase
{
         .setMaxAttemptsPerTask(2)
         .setCommand(MockTask.TASK_COMMAND)
         .setJobCommandConfigMap(
-            ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck
+            ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); // task stuck
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW).addJob(JOB, jobBuilder);
 
@@ -292,7 +292,7 @@ public final class TestRebalanceRunningTask extends TaskSynchronizedTestBase
{
         .setRebalanceRunningTask(true)
         .setCommand(MockTask.TASK_COMMAND)
         .setJobCommandConfigMap(
-            ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999")); // task stuck
+            ImmutableMap.of(MockTask.JOB_DELAY, "99999999")); // task stuck
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW).addJob(JOB, jobBuilder);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index fbac2ac..4569453 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -94,7 +94,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5);
 
     // Create and Enqueue jobs
-    Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(500));
     Thread.sleep(100);
     List<String> currentJobNames = createAndEnqueueJob(queueBuilder, 5);
     _driver.createQueue(queueBuilder.build());
@@ -174,7 +174,7 @@ public class TestRecurringJobQueue extends TaskTestBase {
     // create jobs
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
     List<String> jobNames = new ArrayList<String>();
-    Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(500));
 
     final int JOB_COUNTS = 3;
     for (int i = 0; i < JOB_COUNTS; i++) {

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 1aeefb9..17c0b1b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -47,8 +47,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
 public class TestTaskRebalancer extends TaskTestBase {
-  private static final String TIMEOUT_CONFIG = "Timeout";
-
   @Test
   public void basic() throws Exception {
     basic(100);
@@ -63,7 +61,7 @@ public class TestTaskRebalancer extends TaskTestBase {
   public void testExpiry() throws Exception {
     String jobName = "Expiry";
     long expiry = 1000;
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(100));
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(commandConfig);
 
@@ -104,7 +102,7 @@ public class TestTaskRebalancer extends TaskTestBase {
     // resource.
     final String jobResource = "basic" + jobCompletionTime;
     Map<String, String> commandConfig =
-        ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(jobCompletionTime));
+        ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(jobCompletionTime));
 
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(commandConfig);
@@ -130,7 +128,7 @@ public class TestTaskRebalancer extends TaskTestBase {
         ImmutableList.of("TestDB_1", "TestDB_2", "TestDB_3", "TestDB_5", "TestDB_8", "TestDB_13");
 
     // construct and submit our basic workflow
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(100));
 
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
     jobBuilder.setJobCommandConfigMap(commandConfig).setMaxAttemptsPerTask(1)
@@ -199,7 +197,8 @@ public class TestTaskRebalancer extends TaskTestBase {
           sawTimedoutTask = true;
         }
         // At least one task timed out, other might be aborted due to job failure.
-        Assert.assertTrue(state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ABORTED);
+        Assert.assertTrue(
+            state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ABORTED);
         maxAttempts = Math.max(maxAttempts, ctx.getPartitionNumAttempts(i));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 8540a84..2f5445b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.task;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -35,7 +36,6 @@ import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.ScheduleConfig;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskState;
@@ -56,11 +56,10 @@ import com.google.common.collect.Sets;
 
 public class TestTaskRebalancerStopResume extends TaskTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(TestTaskRebalancerStopResume.class);
-  private static final String TIMEOUT_CONFIG = "Timeout";
   private static final String JOB_RESOURCE = "SomeJob";
 
   @Test public void stopAndResume() throws Exception {
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(100));
 
     JobConfig.Builder jobBuilder =
         JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG);
@@ -110,9 +109,9 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     // Enqueue jobs
     Set<String> master = Sets.newHashSet("MASTER");
-    JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master)
+        .setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "200"));
     String job1Name = "masterJob";
     LOG.info("Enqueuing job: " + job1Name);
     _driver.enqueueJob(queueName, job1Name, job1);
@@ -135,7 +134,6 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     _driver.pollForWorkflowState(queueName, TaskState.STOPPED);
 
     // Ensure job2 is not started
-    TimeUnit.MILLISECONDS.sleep(200);
     String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
     TaskTestUtil.pollForEmptyJobState(_driver, queueName, job2Name);
 
@@ -180,7 +178,8 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
       JobConfig.Builder jobBuilder =
           new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND)
               .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition))
+              .setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "200"));
       String jobName = targetPartition.toLowerCase() + "Job" + i;
       LOG.info("Enqueuing job: " + jobName);
       queueBuilder.enqueueJob(jobName, jobBuilder);
@@ -279,7 +278,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(200));
     for (int i = 0; i <= 4; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
@@ -368,7 +367,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
     List<String> jobNames = new ArrayList<String>();
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    Map<String, String> commandConfig = ImmutableMap.of(MockTask.JOB_DELAY, String.valueOf(200));
 
 
     int JOB_COUNTS = 3;
@@ -492,7 +491,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
 
     // Add 2 jobs
     Map<String, String> jobCommandConfigMap = new HashMap<String, String>();
-    jobCommandConfigMap.put(MockTask.TIMEOUT_CONFIG, "1000000");
+    jobCommandConfigMap.put(MockTask.JOB_DELAY, "1000000");
     jobCommandConfigMap.put(MockTask.NOT_ALLOW_TO_CANCEL, String.valueOf(true));
     List<TaskConfig> taskConfigs = ImmutableList
         .of(new TaskConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTaskId("testTask")
@@ -510,7 +509,7 @@ public class TestTaskRebalancerStopResume extends TaskTestBase {
     builder.addJob(job2Name, job2);
 
     _driver.start(builder.build());
-    Thread.sleep(2000);
+    Thread.sleep(1000);
     _driver.stop(workflowName);
     _driver.pollForWorkflowState(workflowName, TaskState.STOPPING);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
index 188e61c..95ac55b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThreadLeak.java
@@ -19,26 +19,25 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.util.HashSet;
 import java.util.Set;
 import org.apache.helix.TestHelper;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.TaskUtil;
-import org.apache.helix.task.Workflow;
 import org.apache.helix.task.WorkflowConfig;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestTaskThreadLeak extends TaskTestBase {
+  private int _threadCountBefore = 0;
 
   @BeforeClass
   public void beforeClass() throws Exception {
+    _threadCountBefore = getThreadCount("TaskStateModelFactory");
     setSingleTestEnvironment();
     _numNodes = 1;
     super.beforeClass();
@@ -69,14 +68,23 @@ public class TestTaskThreadLeak extends TaskTestBase {
     String nameSpacedJob = TaskUtil.getNamespacedJobName(queueName, lastJob);
     _driver.pollForJobState(queueName, nameSpacedJob, TaskState.COMPLETED);
 
-    Set<Thread> threads = Thread.getAllStackTraces().keySet();
-    Set<Thread> taskThreads = new HashSet<>();
-    for (Thread t : threads) {
-      if (t.getName().contains("TaskStateModelFactory")) {
-        taskThreads.add(t);
+
+    int threadCountAfter = getThreadCount("TaskStateModelFactory");
+
+    Assert.assertTrue(
+        (threadCountAfter - _threadCountBefore) <= TaskStateModelFactory.TASK_THREADPOOL_SIZE
+ 1);
+  }
+
+
+  private int getThreadCount(String threadPrefix) {
+    int count = 0;
+    Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
+    for (Thread t : allThreads) {
+      if (t.getName().contains(threadPrefix)) {
+        count ++;
       }
     }
 
-    Assert.assertTrue(taskThreads.size() <= TaskStateModelFactory.TASK_THREADPOOL_SIZE
+ 1);
+    return count;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
index d22db85..bec9505 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskThrottling.java
@@ -94,7 +94,7 @@ public class TestTaskThrottling extends TaskTestBase {
     _driver.pollForWorkflowState(flow.getName(), TaskState.STOPPED);
 
     // 3. Ensure job can finish normally
-    jobConfig.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "10"));
+    jobConfig.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "10"));
     String jobName3 = "Job3";
     flow = WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName3, jobConfig).build();
     _driver.start(flow);
@@ -167,7 +167,7 @@ public class TestTaskThrottling extends TaskTestBase {
     }
     jobConfig.addTaskConfigs(taskConfigs)
         .setNumConcurrentTasksPerInstance(numTasks)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "120000"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "120000"));
     return jobConfig;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index 71d494b..9e3e880 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -46,7 +46,7 @@ public class TestWorkflowTermination extends TaskTestBase {
         .setCommand(MockTask.TASK_COMMAND)
         .setJobCommandConfigMap(
             ImmutableMap.of(
-                MockTask.TIMEOUT_CONFIG, Long.toString(timeoutMs),
+                MockTask.JOB_DELAY, Long.toString(timeoutMs),
                 MockTask.TASK_RESULT_STATUS, taskState
             )
         );

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
index 0229943..18cfa6c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTimeout.java
@@ -33,7 +33,7 @@ public class TestWorkflowTimeout extends TaskTestBase {
         .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
         .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
         .setCommand(MockTask.TASK_COMMAND)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
   }
 
   @Test
@@ -80,7 +80,7 @@ public class TestWorkflowTimeout extends TaskTestBase {
         TaskState.COMPLETED);
 
     // Add back the config
-    _jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "99999999"));
+    _jobBuilder.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "99999999"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/helix/blob/d2fb22d1/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
index 5b41584..614278b 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestGetLastScheduledTaskTimestamp.java
@@ -78,7 +78,7 @@ public class TestGetLastScheduledTaskTimestamp extends TaskTestBase {
       taskConfigs.add(new TaskConfig.Builder()
           .setTaskId("task_" + i)
           .setCommand(MockTask.TASK_COMMAND)
-          .addConfig(MockTask.TIMEOUT_CONFIG, String.valueOf(taskTimeout))
+          .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout))
           .build());
     }
     // Run up to 2 tasks at a time


Mime
View raw message