[HELIX-784] TASK: Fix a bug in getExpiredJobs
When a job's JobConfig was missing from ZK (null), getExpiredJobs() would simply continue
instead of adding the job to expiredJobs, so the job's clean-up/purge was never retried. A purge
failure could therefore leave many jobs un-purged in the DAG with only the JobConfig missing in
ZK. This RB fixes that.
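For context, a minimal, self-contained sketch of the fixed control flow is below. This is
illustrative only: ConfigStore and jobsInDag are hypothetical stand-ins, and the real
TaskUtil.getExpiredJobs works against Helix data accessors and also checks JobContext and
JobConfig.getExpiry().

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical stand-in for the JobConfig lookup against ZK; returns null
    // when the config ZNode is missing.
    interface ConfigStore {
      Object getJobConfig(String job);
    }

    class ExpiredJobsSketch {
      // Simplified sketch of the fixed getExpiredJobs logic.
      static Set<String> getExpiredJobs(List<String> jobsInDag, ConfigStore store) {
        Set<String> expiredJobs = new HashSet<>();
        for (String job : jobsInDag) {
          if (store.getJobConfig(job) == null) {
            // Before the fix: log and continue, leaving the job in the DAG forever.
            // After the fix: treat the job as expired so the next purge retries the clean-up.
            expiredJobs.add(job);
            continue;
          }
          // ... normal expiry check based on JobConfig.getExpiry() elided ...
        }
        return expiredJobs;
      }
    }

With this behavior, a job whose config ZNode was deleted out-of-band keeps showing up in
expiredJobs until a purge finally removes it from the DAG.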
Changelist:
1. Add the job name to expiredJobs if the job's JobConfig does not exist in ZK
2. Make the error log message more descriptive
3. Add an integration test, TestTaskStage.java, covering two task-related stages:
TaskPersistDataStage and TaskGarbageCollectionStage
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/befb1036
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/befb1036
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/befb1036
Branch: refs/heads/master
Commit: befb1036f8d8be2729a800d3dde88fc1362a6489
Parents: 3d9c030
Author: narendly <narendly@gmail.com>
Authored: Thu Nov 1 16:57:33 2018 -0700
Committer: narendly <narendly@gmail.com>
Committed: Thu Nov 1 16:57:33 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskUtil.java | 6 +-
.../stages/TestTaskPersistDataStage.java | 91 ----------
.../helix/controller/stages/TestTaskStage.java | 165 +++++++++++++++++++
3 files changed, 170 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index b587408..8392d89 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -719,7 +719,11 @@ public class TaskUtil {
JobConfig jobConfig = TaskUtil.getJobConfig(dataAccessor, job);
JobContext jobContext = TaskUtil.getJobContext(propertyStore, job);
if (jobConfig == null) {
- LOG.error(String.format("Job %s exists in JobDAG but JobConfig is missing!", job));
+      LOG.error(String.format(
+          "Job %s exists in JobDAG but JobConfig is missing! Job might have been deleted manually from the JobQueue: %s, or left in the DAG due to a failed clean-up attempt from last purge.",
+          job, workflowConfig.getWorkflowId()));
+      // Add the job name to expiredJobs so that the purge operation will be retried on this job
+      expiredJobs.add(job);
continue;
}
long expiry = jobConfig.getExpiry();
http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
deleted file mode 100644
index 8fcedfa..0000000
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskPersistDataStage.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.controller.stages;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.common.caches.TaskDataCache;
-import org.apache.helix.controller.stages.task.TaskPersistDataStage;
-import org.apache.helix.participant.MockZKHelixManager;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.TaskState;
-import org.apache.helix.task.Workflow;
-import org.apache.helix.task.WorkflowContext;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestTaskPersistDataStage extends ZkTestBase {
- private String CLUSTER_NAME = "TestCluster_" + TestHelper.getTestClassName();
- private HelixManager _helixManager;
- private TaskDriver _driver;
-
- @BeforeClass
- public void beforeClass() {
- _helixManager = new MockZKHelixManager(CLUSTER_NAME, "MockInstance", InstanceType.ADMINISTRATOR,
- _gZkClient);
- _driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
- }
-
- @Test
- public void testTaskContextUpdate() {
- ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
- event.addAttribute(AttributeName.helixmanager.name(), _helixManager);
- TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
-
- ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
- TaskDataCache taskDataCache = cache.getTaskDataCache();
- String testWorkflow = TestHelper.getTestMethodName();
- String testJobPrefix = testWorkflow + "_Job_";
-
- WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(testWorkflow));
- wfCtx.setJobState(testJobPrefix + "0", TaskState.IN_PROGRESS);
- wfCtx.setJobState(testJobPrefix + "1", TaskState.COMPLETED);
- wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
- wfCtx.setName(testWorkflow);
- wfCtx.setStartTime(System.currentTimeMillis());
-
- JobContext jbCtx0 = new JobContext(new ZNRecord(testJobPrefix + "0"));
- jbCtx0.setName(testJobPrefix + "0");
- jbCtx0.setStartTime(System.currentTimeMillis());
- jbCtx0.setPartitionState(0, TaskPartitionState.RUNNING);
-
- JobContext jbCtx1 = new JobContext((new ZNRecord(testJobPrefix + "1")));
- jbCtx1.setName(testJobPrefix + "1");
- jbCtx1.setStartTime(System.currentTimeMillis());
- jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
-
- taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
- taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
- taskDataCache.updateJobContext(testJobPrefix + "1", jbCtx1);
-
- event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
- persistDataStage.process(event);
-
- jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
- wfCtx.setJobState(testJobPrefix + "0", TaskState.FAILED);
- taskDataCache.updateJobContext(testJobPrefix + "0", jbCtx0);
-
- wfCtx.getJobStates().remove(testJobPrefix + "1");
- taskDataCache.removeContext(testJobPrefix + "1");
-
- JobContext jbCtx2 = new JobContext(new ZNRecord(testJobPrefix + "2"));
- jbCtx2.setName(testJobPrefix + "2");
- jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
- wfCtx.setJobState(testJobPrefix + "2", TaskState.IN_PROGRESS);
- taskDataCache.updateJobContext(testJobPrefix + "2", jbCtx2);
-
- taskDataCache.updateWorkflowContext(testWorkflow, wfCtx);
- persistDataStage.process(event);
-
- Assert.assertEquals(_driver.getWorkflowContext(testWorkflow), wfCtx);
- Assert.assertEquals(_driver.getJobContext(testJobPrefix + "0"), jbCtx0);
- Assert.assertEquals(_driver.getJobContext(testJobPrefix + "2"), jbCtx2);
- Assert.assertNull(_driver.getJobContext(testJobPrefix + "1"));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/befb1036/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
new file mode 100644
index 0000000..6dea943
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestTaskStage.java
@@ -0,0 +1,165 @@
+package org.apache.helix.controller.stages;
+
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.TaskDataCache;
+import org.apache.helix.controller.stages.task.TaskPersistDataStage;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestTaskStage extends TaskTestBase {
+ private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
+ private PropertyKey.Builder _keyBuilder;
+ private String _testWorkflow = TestHelper.getTestClassName();
+ private String _testJobPrefix = _testWorkflow + "_Job_";
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ // Stop the controller for isolated testing of the stage
+ _controller.syncStop();
+ _keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
+ }
+
+ @Test
+ public void testPersistContextData() {
+ _event.addAttribute(AttributeName.helixmanager.name(), _manager);
+
+ ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+ cache.setTaskCache(true);
+ TaskDataCache taskDataCache = cache.getTaskDataCache();
+
+ // Build a queue
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(_testWorkflow);
+    JobConfig.Builder jobBuilder_0 =
+        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
+    JobConfig.Builder jobBuilder_1 =
+        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1");
+    JobConfig.Builder jobBuilder_2 =
+        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1");
+    queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
+        .enqueueJob("Job_2", jobBuilder_2);
+
+ _driver.createQueue(queueBuilder.build());
+ // Manually trigger a cache refresh
+ cache.refresh(new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor));
+
+ // Create the IdealState ZNode for the jobs
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "0", 1,
+        TaskConstants.STATE_MODEL_NAME);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "1", 1,
+        TaskConstants.STATE_MODEL_NAME);
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "2", 1,
+        TaskConstants.STATE_MODEL_NAME);
+
+ // Create the context
+ WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
+ wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
+ wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
+ wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
+ wfCtx.setName(_testWorkflow);
+ wfCtx.setStartTime(System.currentTimeMillis());
+
+ JobContext jbCtx0 = new JobContext(new ZNRecord(_testJobPrefix + "0"));
+ jbCtx0.setName(_testJobPrefix + "0");
+ jbCtx0.setStartTime(System.currentTimeMillis());
+ jbCtx0.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+ JobContext jbCtx1 = new JobContext((new ZNRecord(_testJobPrefix + "1")));
+ jbCtx1.setName(_testJobPrefix + "1");
+ jbCtx1.setStartTime(System.currentTimeMillis());
+ jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
+
+ taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
+ taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
+ taskDataCache.updateJobContext(_testJobPrefix + "1", jbCtx1);
+
+ _event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+
+ // Write contexts to ZK first
+ TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
+ persistDataStage.process(_event);
+
+ Assert.assertNotNull(_driver.getWorkflowContext(_testWorkflow));
+ Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "0"));
+ Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "1"));
+
+ jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
+ wfCtx.setJobState(_testJobPrefix + "0", TaskState.FAILED);
+ taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
+
+ wfCtx.getJobStates().remove(_testJobPrefix + "1");
+ taskDataCache.removeContext(_testJobPrefix + "1");
+
+ JobContext jbCtx2 = new JobContext(new ZNRecord(_testJobPrefix + "2"));
+ jbCtx2.setName(_testJobPrefix + "2");
+ jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
+ wfCtx.setJobState(_testJobPrefix + "2", TaskState.IN_PROGRESS);
+ taskDataCache.updateJobContext(_testJobPrefix + "2", jbCtx2);
+
+ taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
+
+ persistDataStage.process(_event);
+
+ Assert.assertEquals(_driver.getWorkflowContext(_testWorkflow), wfCtx);
+ Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "0"), jbCtx0);
+ Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "2"), jbCtx2);
+ Assert.assertNull(_driver.getJobContext(_testJobPrefix + "1"));
+ }
+
+  /**
+   * Test that if there is a job in the DAG with JobConfig gone (due to ZK delete failure), the
+   * async job purge will try to delete it again.
+   */
+ @Test(dependsOnMethods = "testPersistContextData")
+ public void testPartialDataPurge() {
+ // Manually delete JobConfig
+ deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
+ deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
+ deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
+
+ // Then purge jobs
+ TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
+ garbageCollectionStage.execute(_event);
+
+    // Check that IdealStates and contexts have been purged for the jobs in both old and new ZNode paths
+ // IdealState check
+ checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "0");
+ checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "1");
+ checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
+ }
+
+ private void deleteJobConfigs(String workflowName, String jobName) {
+ String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
+ String newPath = _manager.getHelixDataAccessor().keyBuilder()
+ .jobConfigZNode(workflowName, jobName).getPath();
+ _baseAccessor.remove(oldPath, AccessOption.PERSISTENT);
+ _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
+ }
+
+ private void checkForIdealStateAndContextRemoval(String workflow, String job) {
+ // IdealState
+ Assert.assertFalse(
+ _baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT));
+
+ // JobContexts in old ZNode path
+ String oldPath =
+ String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
+ String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
+ Assert.assertFalse(_baseAccessor.exists(oldPath, AccessOption.PERSISTENT));
+ Assert.assertFalse(_baseAccessor.exists(newPath, AccessOption.PERSISTENT));
+ }
+}