[HELIX-615] Naming problem of scheduled jobs from recurrent queue.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7569a0a7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7569a0a7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7569a0a7
Branch: refs/heads/helix-0.6.x
Commit: 7569a0a7b999fb6675447919d91e756200364ff5
Parents: d129d3a
Author: Lei Xia <lxia@linkedin.com>
Authored: Fri Nov 20 15:50:30 2015 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Fri Nov 20 15:50:30 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskConstants.java | 4 ----
.../main/java/org/apache/helix/task/TaskDriver.java | 16 +++++++++-------
.../java/org/apache/helix/task/TaskRebalancer.java | 7 +++++--
.../integration/task/TestRecurringJobQueue.java | 2 +-
4 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
index 34008d6..305323d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
@@ -39,8 +39,4 @@ public class TaskConstants {
* The root property store path at which the {@link TaskRebalancer} stores context information.
*/
public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer";
- /**
- * Resource prefix for scheduled workflows
- */
- public static final String SCHEDULED = "SCHEDULED";
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index b4b94f8..cc1eac1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -570,13 +570,15 @@ public class TaskDriver {
private void setWorkflowTargetState(String workflowName, TargetState state) {
setSingleWorkflowTargetState(workflowName, state);
- // For recurring schedules, child workflows must also be handled
- List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs());
- String prefix = workflowName + "_" + TaskConstants.SCHEDULED;
- for (String resource : resources) {
- if (resource.startsWith(prefix)) {
- setSingleWorkflowTargetState(resource, state);
- }
+ // For recurring schedules, last scheduled incomplete workflow must also be handled
+ WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName);
+ String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow();
+ WorkflowContext lastScheduledWorkflowCtx =
+ TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow);
+ if (lastScheduledWorkflowCtx != null &&
+ !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED
+ || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) {
+ setSingleWorkflowTargetState(lastScheduledWorkflow, state);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 5a86c3d..3842b66 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -19,6 +19,8 @@ package org.apache.helix.task;
* under the License.
*/
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -579,8 +581,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator
{
long timeToSchedule = period * offsetMultiplier + startTime.getTime();
// Now clone the workflow if this clone has not yet been created
- String newWorkflowName =
- workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
+ DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ");
+ // Now clone the workflow if this clone has not yet been created
+ String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule));
LOG.debug("Ready to start workflow " + newWorkflowName);
if (!newWorkflowName.equals(lastScheduled)) {
Workflow clonedWf =
http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index deca8a7..eef1ce6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;;
+import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
|