helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-519] Add integration tests to ensure that the kill-switch for Helix tasks works as expected, rb=26212
Date Wed, 01 Oct 2014 19:55:06 GMT
Repository: helix
Updated Branches:
  refs/heads/master 00dc16db3 -> cfacbbac3


[HELIX-519] Add integration tests to ensure that the kill-switch for Helix tasks works as expected, rb=26212


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cfacbbac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cfacbbac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cfacbbac

Branch: refs/heads/master
Commit: cfacbbac33d302d80968e7f11fe3b69468a17606
Parents: 00dc16d
Author: zzhang <zzhang@apache.org>
Authored: Wed Oct 1 11:08:39 2014 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Wed Oct 1 12:54:58 2014 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 20 +----
 .../java/org/apache/helix/task/TaskDriver.java  | 28 +++---
 .../task/TestTaskRebalancerStopResume.java      | 90 ++++++++++++++++++++
 .../apache/helix/integration/task/TestUtil.java | 21 ++++-
 4 files changed, 126 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index d36b6f5..6fa3d05 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
@@ -66,7 +65,6 @@ import org.apache.helix.controller.stages.ResourceValidationStage;
 import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -94,7 +92,7 @@ import com.google.common.collect.Lists;
  */
 public class GenericHelixController implements IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener,
+    ControllerChangeListener, InstanceConfigChangeListener,
     ScopedConfigChangeListener {
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
   volatile boolean init = false;
@@ -180,7 +178,7 @@ public class GenericHelixController implements IdealStateChangeListener,
   }
 
   /**
-   * Starts the rebalancing timer
+   * Stops the rebalancing timer
    */
   void stopRebalancingTimer() {
     if (_rebalanceTimer != null) {
@@ -278,7 +276,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     if (context != null) {
       if (context.getType() == Type.FINALIZE) {
         stopRebalancingTimer();
-        logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
+        logger.info("Get FINALIZE notification, skip the pipeline. Event: " + event.getName());
         return;
       } else {
         if (_clusterStatusMonitor == null) {
@@ -319,18 +317,6 @@ public class GenericHelixController implements IdealStateChangeListener,
   // callback
 
   @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-      NotificationContext changeContext) {
-    // logger.info("START: GenericClusterController.onExternalViewChange()");
-    // ClusterEvent event = new ClusterEvent("externalViewChange");
-    // event.addAttribute("helixmanager", changeContext.getManager());
-    // event.addAttribute("changeContext", changeContext);
-    // event.addAttribute("eventData", externalViewList);
-    // _eventQueue.put(event);
-    // logger.info("END: GenericClusterController.onExternalViewChange()");
-  }
-
-  @Override
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onStateChange()");

http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a341a3b..bcbe76a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -125,13 +125,13 @@ public class TaskDriver {
         }
         break;
       case stop:
-        driver.setTaskTargetState(resource, TargetState.STOP);
+        driver.setWorkflowTargetState(resource, TargetState.STOP);
         break;
       case resume:
-        driver.setTaskTargetState(resource, TargetState.START);
+        driver.setWorkflowTargetState(resource, TargetState.START);
         break;
       case delete:
-        driver.setTaskTargetState(resource, TargetState.DELETE);
+        driver.setWorkflowTargetState(resource, TargetState.DELETE);
         break;
       case list:
         driver.list(resource);
@@ -357,36 +357,36 @@ public class TaskDriver {
 
   /** Public method to resume a workflow/queue */
   public void resume(String workflow) {
-    setTaskTargetState(workflow, TargetState.START);
+    setWorkflowTargetState(workflow, TargetState.START);
   }
 
   /** Public method to stop a workflow/queue */
   public void stop(String workflow) {
-    setTaskTargetState(workflow, TargetState.STOP);
+    setWorkflowTargetState(workflow, TargetState.STOP);
   }
 
   /** Public method to delete a workflow/queue */
   public void delete(String workflow) {
-    setTaskTargetState(workflow, TargetState.DELETE);
+    setWorkflowTargetState(workflow, TargetState.DELETE);
   }
 
-  /** Helper function to change target state for a given task */
-  private void setTaskTargetState(String jobResource, TargetState state) {
-    setSingleTaskTargetState(jobResource, state);
+  /** Helper function to change target state for a given workflow */
+  private void setWorkflowTargetState(String workflowName, TargetState state) {
+    setSingleWorkflowTargetState(workflowName, state);
 
     // For recurring schedules, child workflows must also be handled
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs());
+    String prefix = workflowName + "_" + TaskConstants.SCHEDULED;
     for (String resource : resources) {
-      String prefix = resource + "_" + TaskConstants.SCHEDULED;
       if (resource.startsWith(prefix)) {
-        setSingleTaskTargetState(resource, state);
+        setSingleWorkflowTargetState(resource, state);
       }
     }
   }
 
-  /** Helper function to change target state for a given task */
-  private void setSingleTaskTargetState(String jobResource, final TargetState state) {
+  /** Helper function to change target state for a given workflow */
+  private void setSingleWorkflowTargetState(String workflowName, final TargetState state)
{
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
@@ -402,7 +402,7 @@ public class TaskDriver {
     List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
     updaters.add(updater);
     List<String> paths = Lists.newArrayList();
-    paths.add(accessor.keyBuilder().resourceConfig(jobResource).getPath());
+    paths.add(accessor.keyBuilder().resourceConfig(workflowName).getPath());
     accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
     invokeRebalance();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 18d2df1..b9e9811 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -20,17 +20,26 @@ package org.apache.helix.integration.task;
  */
 
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.MockParticipant;
 import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskDriver;
@@ -38,8 +47,10 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskResult;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
@@ -48,6 +59,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 
 public class TestTaskRebalancerStopResume extends ZkTestBase {
   private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
@@ -68,6 +80,8 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
+    System.out.println("START " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis()));
+
     String namespace = "/" + CLUSTER_NAME;
     if (_zkclient.exists(namespace)) {
       _zkclient.deleteRecursive(namespace);
@@ -127,15 +141,18 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
             .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr,
                 CLUSTER_NAME));
     Assert.assertTrue(result);
+    System.out.println("END BEFORECLASS " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 
   @AfterClass
   public void afterClass() throws Exception {
+    System.out.println("START AFTERCLASS " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis()));
     _controller.syncStop();
     for (int i = 0; i < n; i++) {
       _participants[i].syncStop();
     }
     _manager.disconnect();
+    System.out.println("END " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 
   @Test
@@ -176,6 +193,79 @@ public class TestTaskRebalancerStopResume extends ZkTestBase {
     TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
   }
 
+  @Test
+  public void stopAndResumeNamedQueue() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = new JobQueue.Builder(queueName).build();
+    _driver.createQueue(queue);
+
+    // Enqueue jobs
+    Set<String> master = Sets.newHashSet("MASTER");
+    JobConfig.Builder job1 =
+        new JobConfig.Builder().setCommand("Reindex")
+            .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+    String job1Name = "masterJob";
+    LOG.info("Enqueuing job: " + job1Name);
+    _driver.enqueueJob(queueName, job1Name, job1);
+
+    Set<String> slave = Sets.newHashSet("SLAVE");
+    JobConfig.Builder job2 =
+    new JobConfig.Builder().setCommand("Reindex")
+        .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+    String job2Name = "slaveJob";
+    LOG.info("Enqueuing job: " + job2Name);
+    _driver.enqueueJob(queueName, job2Name, job2);
+
+    String namespacedJob1 = String.format("%s_%s", queueName,  job1Name);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS);
+
+    // stop job1
+    LOG.info("Pausing job-queue: " + queueName);
+    _driver.stop(queueName);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED);
+    TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
+
+    // Ensure job2 is not started
+    TimeUnit.MILLISECONDS.sleep(200);
+    String namespacedJob2 = String.format("%s_%s", queueName, job2Name);
+    TestUtil.pollForEmptyJobState(_manager, queueName, job2Name);
+
+    LOG.info("Resuming job-queue: " + queueName);
+    _driver.resume(queueName);
+
+    // Ensure successful completion
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED);
+    TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED);
+    JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1);
+    JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2);
+
+    // Ensure correct ordering
+    long job1Finish = masterJobContext.getFinishTime();
+    long job2Start = slaveJobContext.getStartTime();
+    Assert.assertTrue(job2Start >= job1Finish);
+
+    // Flush queue and check cleanup
+    LOG.info("Flusing job-queue: " + queueName);
+    _driver.flushQueue(queueName);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2)));
+    Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2)));
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName);
+    JobDag dag = workflowCfg.getJobDag();
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1));
+    Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1));
+    Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2));
+  }
+
   public static class ReindexTask implements Task {
     private final long _delay;
     private volatile boolean _canceled;

http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index f599920..413b98a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.task;
  */
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowContext;
@@ -29,6 +30,8 @@ import org.testng.Assert;
  * Static test utility methods.
  */
 public class TestUtil {
+  private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */
+
   /**
    * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout
is
    * reached.
@@ -45,7 +48,7 @@ public class TestUtil {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() !=
state)
-        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+        && System.currentTimeMillis() < st + _default_timeout);
 
     Assert.assertNotNull(ctx);
     Assert.assertEquals(ctx.getWorkflowState(), state);
@@ -60,8 +63,22 @@ public class TestUtil {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
     } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName)
!= state)
-        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+        && System.currentTimeMillis() < st + _default_timeout);
     Assert.assertNotNull(ctx);
+    Assert.assertEquals(ctx.getJobState(jobName), state);
   }
 
+  public static void pollForEmptyJobState(final HelixManager manager, final String workflowName,
+      final String jobName) throws Exception {
+    final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
+    boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
+
+      @Override
+      public boolean verify() throws Exception {
+        WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName);
+        return ctx == null || ctx.getJobState(namespacedJobName) == null;
+      }
+    }, _default_timeout);
+    Assert.assertTrue(succeed);
+  }
 }


Mime
View raw message