helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: Add forcefully workflow and job delete API into HelixAdmin and Helix Rest.
Date Thu, 19 Apr 2018 21:20:40 GMT
Repository: helix
Updated Branches:
  refs/heads/master b00d65961 -> b8355b9a6


Add forcefully workflow and job delete API into HelixAdmin and Helix Rest.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b8355b9a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b8355b9a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b8355b9a

Branch: refs/heads/master
Commit: b8355b9a6b1071c149305f147bdb524d8a67f2d2
Parents: b00d659
Author: Lei Xia <lxia@linkedin.com>
Authored: Wed Apr 11 10:28:22 2018 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Apr 19 14:18:16 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  |  91 ++++++++++++----
 .../java/org/apache/helix/task/TaskUtil.java    |  27 +++--
 .../integration/task/TestDeleteWorkflow.java    | 104 ++++++++++++++++++-
 .../server/resources/helix/JobAccessor.java     |   8 +-
 .../resources/helix/WorkflowAccessor.java       |   7 +-
 5 files changed, 197 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 174cae1..4a7707a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -228,12 +228,37 @@ public class TaskDriver {
    * @param job  job name
    */
   public void deleteJob(final String queue, final String job) {
-    WorkflowConfig workflowCfg =
-        TaskUtil.getWorkflowConfig(_accessor, queue);
+    deleteJob(queue, job, false);
+  }
+
+  /**
+   * Delete a job from an existing named queue,
+   * the queue has to be stopped prior to this call
+   *
+   * @param queue queue name
+   * @param job  job name
+   * @param forceDelete  CAUTION: if set true, all job's related zk nodes will
+   *                     be clean up from zookeeper even if its workflow information can not be found.
+   */
+  public void deleteJob(final String queue, final String job, boolean forceDelete) {
+    WorkflowConfig workflowCfg = TaskUtil.getWorkflowConfig(_accessor, queue);
 
     if (workflowCfg == null) {
-      throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
+      if (forceDelete) {
+        // remove all job znodes if its original workflow config was already gone.
+        LOG.info("Forcefully removing job: " + job + " from queue: " + queue);
+        boolean success = TaskUtil
+            .removeJob(_accessor, _propertyStore, TaskUtil.getNamespacedJobName(queue, job));
+        if (!success) {
+          LOG.info("Failed to delete job: " + job + " from queue: " + queue);
+          throw new HelixException("Failed to delete job: " + job + " from queue: " + queue);
+        }
+      } else {
+        throw new IllegalArgumentException("Queue " + queue + " does not yet exist!");
+      }
+      return;
     }
+
     if (workflowCfg.isTerminable()) {
       throw new IllegalArgumentException(queue + " is not a queue!");
     }
@@ -283,13 +308,14 @@ public class TaskDriver {
     }
 
     String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-    Set<String> jobs = new HashSet<String>(Arrays.asList(namespacedJobName));
+    Set<String> jobs = new HashSet<>(Arrays.asList(namespacedJobName));
     if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, jobs, true)) {
       LOG.error("Failed to delete job " + job + " from queue " + queue);
       throw new HelixException("Failed to delete job " + job + " from queue " + queue);
     }
   }
 
+
   /**
    * Adds a new job to the end an existing named queue.
    *
@@ -549,29 +575,56 @@ public class TaskDriver {
    * @param workflow
    */
   public void delete(String workflow) {
-    // After set DELETE state, rebalancer may remove the workflow instantly.
-    // So record context before set DELETE state.
-    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+    delete(workflow, false);
+  }
 
-    setWorkflowTargetState(workflow, TargetState.DELETE);
+  /**
+   * Public method to delete a workflow/queue.
+   *
+   * @param workflow
+   * @param forceDelete, CAUTION: if set true, workflow and all of its jobs' related zk nodes will
+   *                     be clean up immediately from zookeeper, no matter whether there are jobs
+   *                     are running or not.
+   */
+  public void delete(String workflow, boolean forceDelete) {
+    WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow);
+    if (forceDelete) {
+      // if forceDelete, remove the workflow and its jobs immediately from zookeeper.
+      LOG.info("Forcefully removing workflow: " + workflow);
+      removeWorkflowFromZK(workflow);
+    } else {
+      // Set the workflow target state to DELETE, and let Helix controller to remove the workflow.
+      // Controller may remove the workflow instantly, so record context before set DELETE state.
+      setWorkflowTargetState(workflow, TargetState.DELETE);
+    }
 
-    // Delete all finished scheduled workflows.
+    // Delete all previously scheduled workflows.
     if (wCtx != null && wCtx.getScheduledWorkflows() != null) {
       for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) {
-        WorkflowContext scheduledWorkflowCtx = TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
-        if (scheduledWorkflowCtx != null && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
-          Set<String> jobSet = new HashSet<String>();
-          // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow
-          WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, scheduledWorkflow);
-          if (wCfg != null) {
-            jobSet.addAll(wCfg.getJobDag().getAllNodes());
-          }
-          TaskUtil.removeWorkflow(_accessor, _propertyStore, scheduledWorkflow, jobSet);
+        WorkflowContext scheduledWorkflowCtx =
+            TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow);
+        if (scheduledWorkflowCtx != null
+            && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
+          removeWorkflowFromZK(scheduledWorkflow);
         }
       }
     }
   }
 
+  private void removeWorkflowFromZK(String workflow) {
+    Set<String> jobSet = new HashSet<>();
+    // Note that even WorkflowConfig is null, if WorkflowContext exists, still need to remove workflow
+    WorkflowConfig wCfg = TaskUtil.getWorkflowConfig(_accessor, workflow);
+    if (wCfg != null) {
+      jobSet.addAll(wCfg.getJobDag().getAllNodes());
+    }
+    boolean success = TaskUtil.removeWorkflow(_accessor, _propertyStore, workflow, jobSet);
+    if (!success) {
+      LOG.info("Failed to delete the workflow " + workflow);
+      throw new HelixException("Failed to delete the workflow " + workflow);
+    }
+  }
+
   /**
    * Public synchronized method to wait for a delete operation to fully complete with timeout.
    * When this method returns, it means that a queue (workflow) has been completely deleted, meaning
@@ -893,4 +946,4 @@ public class TaskDriver {
           "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS.");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 47a5cfd..c6d7a55 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -537,7 +537,7 @@ public class TaskUtil {
       String workflowJobResource) {
     boolean success = true;
     PropertyKey isKey = accessor.keyBuilder().idealStates(workflowJobResource);
-    if (accessor.getProperty(isKey) != null) {
+    if (accessor.getPropertyStat(isKey) != null) {
       if (!accessor.removeProperty(isKey)) {
         LOG.warn(String.format(
             "Error occurred while trying to remove IdealState for %s. Failed to remove node %s.",
@@ -548,7 +548,7 @@ public class TaskUtil {
 
     // Delete external view
     PropertyKey evKey = accessor.keyBuilder().externalView(workflowJobResource);
-    if (accessor.getProperty(evKey) != null) {
+    if (accessor.getPropertyStat(evKey) != null) {
       if (!accessor.removeProperty(evKey)) {
         LOG.warn(String.format(
             "Error occurred while trying to remove ExternalView of resource %s. Failed to remove node %s.",
@@ -583,17 +583,17 @@ public class TaskUtil {
       }
     }
 
+    if (!removeWorkflowConfig(accessor, workflow)) {
+      LOG.warn(
+          String.format("Error occurred while trying to remove workflow config for %s.", workflow));
+      success = false;
+    }
     if (!cleanupWorkflowIdealStateExtView(accessor, workflow)) {
       LOG.warn(String
           .format("Error occurred while trying to remove workflow idealstate/externalview for %s.",
               workflow));
       success = false;
     }
-    if (!removeWorkflowConfig(accessor, workflow)) {
-      LOG.warn(
-          String.format("Error occurred while trying to remove workflow config for %s.", workflow));
-      success = false;
-    }
     if (!removeWorkflowContext(propertyStore, workflow)) {
       LOG.warn(String
           .format("Error occurred while trying to remove workflow context for %s.", workflow));
@@ -679,16 +679,15 @@ public class TaskUtil {
   protected static boolean removeJob(HelixDataAccessor accessor, HelixPropertyStore propertyStore,
       String job) {
     boolean success = true;
-    if (!cleanupJobIdealStateExtView(accessor, job)) {
-      LOG.warn(String
-          .format("Error occurred while trying to remove job idealstate/externalview for %s.",
-              job));
-      success = false;
-    }
     if (!removeJobConfig(accessor, job)) {
       LOG.warn(String.format("Error occurred while trying to remove job config for %s.", job));
       success = false;
     }
+    if (!cleanupJobIdealStateExtView(accessor, job)) {
+      LOG.warn(String
+          .format("Error occurred while trying to remove job idealstate/externalview for %s.", job));
+      success = false;
+    }
     if (!removeJobContext(propertyStore, job)) {
       LOG.warn(String.format("Error occurred while trying to remove job context for %s.", job));
       success = false;
@@ -789,7 +788,7 @@ public class TaskUtil {
   private static boolean removeWorkflowJobConfig(HelixDataAccessor accessor,
       String workflowJobResource) {
     PropertyKey cfgKey = accessor.keyBuilder().resourceConfig(workflowJobResource);
-    if (accessor.getProperty(cfgKey) != null) {
+    if (accessor.getPropertyStat(cfgKey) != null) {
       if (!accessor.removeProperty(cfgKey)) {
         LOG.warn(String.format(
             "Error occurred while trying to remove config for %s. Failed to remove node %s.",

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
index 91b7f32..a151827 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestDeleteWorkflow.java
@@ -1,8 +1,12 @@
 package org.apache.helix.integration.task;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
@@ -14,7 +18,7 @@ import org.testng.annotations.Test;
 
 
 public class TestDeleteWorkflow extends TaskTestBase  {
-  private static final int DELETE_DELAY = 3000;
+  private static final int DELETE_DELAY = 1000;
 
   private HelixAdmin admin;
 
@@ -31,7 +35,7 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
         .setMaxAttemptsPerTask(1)
         .setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "100000"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1", jobBuilder);
@@ -64,4 +68,98 @@ public class TestDeleteWorkflow extends TaskTestBase  {
     Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
     Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testDeleteWorkflowForcefully() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job1"),
+        TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue
+    Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+
+    // Delete the idealstate of workflow
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuild = accessor.keyBuilder();
+    accessor.removeProperty(keyBuild.idealStates(jobQueueName));
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+
+    // Attempt the deletion and and it should time out since idealstate does not exist anymore.
+    try {
+      _driver.deleteAndWaitForCompletion(jobQueueName, DELETE_DELAY);
+      Assert.fail("Delete must time out and throw a HelixException with the Controller paused, but did not!");
+    } catch (HelixException e) {
+      // Pass
+    }
+
+    // delete forcefully
+    _driver.delete(jobQueueName, true);
+
+    Assert.assertNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+  }
+
+  @Test
+  public void testDeleteHangingJobs() throws InterruptedException {
+    String jobQueueName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setMaxAttemptsPerTask(1)
+        .setWorkflow(jobQueueName)
+        .setJobCommandConfigMap(ImmutableMap.of(MockTask.TIMEOUT_CONFIG, "1000000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("job1", jobBuilder);
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, TaskUtil.getNamespacedJobName(jobQueueName, "job1"),
+        TaskState.IN_PROGRESS);
+
+    // Check that WorkflowConfig, WorkflowContext, and IdealState are indeed created for this job queue
+    Assert.assertNotNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNotNull(_driver.getWorkflowContext(jobQueueName));
+    Assert.assertNotNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNotNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNotNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+
+    // Delete the idealstate, workflowconfig and context of workflow
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuild = accessor.keyBuilder();
+    accessor.removeProperty(keyBuild.idealStates(jobQueueName));
+    accessor.removeProperty(keyBuild.resourceConfig(jobQueueName));
+    accessor.removeProperty(keyBuild.workflowContext(jobQueueName));
+
+    Assert.assertNull(admin.getResourceIdealState(CLUSTER_NAME, jobQueueName));
+    Assert.assertNull(_driver.getWorkflowConfig(jobQueueName));
+    Assert.assertNull(_driver.getWorkflowContext(jobQueueName));
+
+    // attemp to delete the job and it should fail with exception.
+    try {
+      _driver.deleteJob(jobQueueName, "job1");
+      Assert.fail("Delete must be rejected and throw a HelixException, but did not!");
+    } catch (IllegalArgumentException e) {
+      // Pass
+    }
+
+    // delete forcefully
+    _driver.deleteJob(jobQueueName, "job1", true);
+
+    Assert.assertNull(_driver.getJobConfig(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNull(_driver.getJobContext(TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+    Assert.assertNull(admin
+        .getResourceIdealState(CLUSTER_NAME, TaskUtil.getNamespacedJobName(jobQueueName, "job1")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
index 2d27f51..9a085f1 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java
@@ -26,10 +26,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
 
 import org.apache.helix.HelixException;
@@ -128,11 +130,13 @@ public class JobAccessor extends AbstractHelixResource {
   @DELETE
   @Path("{jobName}")
   public Response deleteJob(@PathParam("clusterId") String clusterId,
-      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
+      @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName,
+      @QueryParam("force") @DefaultValue("false") String forceDelete) {
+    boolean force = Boolean.valueOf(forceDelete);
     TaskDriver driver = getTaskDriver(clusterId);
 
     try {
-      driver.deleteJob(workflowName, jobName);
+      driver.deleteJob(workflowName, jobName, force);
     } catch (Exception e) {
       return badRequest(e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/b8355b9a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
index 5efb26c..679b3cc 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/WorkflowAccessor.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -173,10 +174,12 @@ public class WorkflowAccessor extends AbstractHelixResource {
   @DELETE
   @Path("{workflowId}")
   public Response deleteWorkflow(@PathParam("clusterId") String clusterId,
-      @PathParam("workflowId") String workflowId) {
+      @PathParam("workflowId") String workflowId,
+      @QueryParam("force") @DefaultValue("false") String forceDelete) {
+    boolean force = Boolean.valueOf(forceDelete);
     TaskDriver driver = getTaskDriver(clusterId);
     try {
-      driver.delete(workflowId);
+      driver.delete(workflowId, force);
     } catch (HelixException e) {
       return badRequest(String
           .format("Failed to delete workflow %s for reason : %s", workflowId, e.getMessage()));


Mime
View raw message