helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [1/3] helix git commit: [HELIX-617] Job IdealState is generated even the job is not running and not removed when it is completed.
Date Thu, 10 Dec 2015 07:04:53 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 7bbb20be6 -> 1798e7935


http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 38c9113..79adcd5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -24,13 +24,17 @@ import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -195,7 +199,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     return new JobQueue.Builder(jobQueueName).fromMap(cfgMap);
   }
 
-
   private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
     return buildRecurrentJobQueue(jobQueueName, 0);
   }
@@ -299,9 +302,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     // ensure job 1 is started before deleting it
     String deletedJob1 = currentJobNames.get(0);
     String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1);
-    TestUtil
-        .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
-            TaskState.COMPLETED);
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS,
+        TaskState.COMPLETED);
 
     // stop the queue
     LOG.info("Pausing job-queue: " + scheduledQueue);
@@ -417,54 +419,68 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
   public void testJobsDisableExternalView() throws Exception {
     String queueName = TestHelper.getTestMethodName();
 
+    ExternviewChecker externviewChecker = new ExternviewChecker();
+    _manager.addExternalViewChangeListener(externviewChecker);
+
     // Create a queue
     LOG.info("Starting job-queue: " + queueName);
     JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName);
 
-    // create jobs
-    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500));
+    JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("SLAVE"));
 
-    JobConfig.Builder job1 =
-        new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-            .setTargetPartitionStates(Sets.newHashSet("SLAVE"))
-            .setDisableExternalView(true);
+    JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true);
 
-    JobConfig.Builder job2 =
-        new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
-            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
-            .setTargetPartitionStates(Sets.newHashSet("MASTER"));
+    JobConfig.Builder job3 = new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+        .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false);
 
     // enqueue both jobs
     queueBuilder.enqueueJob("job1", job1);
     queueBuilder.enqueueJob("job2", job2);
+    queueBuilder.enqueueJob("job3", job3);
 
     _driver.createQueue(queueBuilder.build());
 
-
     WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName);
     String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
 
-    // ensure job1 is started
-    String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS,
-        TaskState.COMPLETED);
-
-    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
-    // verify external view for job does not exists
-    ExternalView externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob1));
-    Assert.assertNull(externalView, "External View for " + namedSpaceJob1 + " shoudld not
exist!");
+    // ensure all jobs are completed
+    String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3");
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED);
 
-    // ensure job2 is completed
+    Set<String> seenExternalViews = externviewChecker.getSeenExternalViews();
+    String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1");
     String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, "job2");
-    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.IN_PROGRESS,
-        TaskState.COMPLETED);
 
-    // verify external view for job does not exists
-    externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob2));
-    Assert.assertNotNull(externalView, "Can not find external View for " + namedSpaceJob2
+ "!");
+    Assert.assertTrue(seenExternalViews.contains(namedSpaceJob1),
+        "Can not find external View for " + namedSpaceJob1 + "!");
+    Assert.assertTrue(!seenExternalViews.contains(namedSpaceJob2),
+        "External View for " + namedSpaceJob2 + " shoudld not exist!");
+    Assert.assertTrue(seenExternalViews.contains(namedSpaceJob3),
+        "Can not find external View for " + namedSpaceJob3 + "!");
+
+    _manager
+        .removeListener(new PropertyKey.Builder(CLUSTER_NAME).externalViews(), externviewChecker);
   }
 
+  private static class ExternviewChecker implements ExternalViewChangeListener {
+    private Set<String> _seenExternalViews = new HashSet<String>();
+
+    @Override public void onExternalViewChange(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
+      for (ExternalView view : externalViewList) {
+        _seenExternalViews.add(view.getResourceName());
+      }
+    }
+
+    public Set<String> getSeenExternalViews() {
+      return _seenExternalViews;
+    }
+  }
 
   private void verifyJobDeleted(String queueName, String jobName) throws Exception {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();

http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index f49f941..f402b82 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -185,7 +185,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
     // Wait for job to finish and expire
     TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED);
     Thread.sleep(expiry);
-    _driver.invokeRebalance();
+    TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName());
     Thread.sleep(expiry);
 
     // Ensure workflow config and context were cleaned up by now


Mime
View raw message