helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] helix git commit: [HELIX-782] TASK: Make TaskDriver use ZKClient's create when creating workflows
Date Fri, 02 Nov 2018 00:06:49 GMT
Repository: helix
Updated Branches:
  refs/heads/master b64a72c48 -> befb1036f


[HELIX-782] TASK: Make TaskDriver use ZKClient's create when creating workflows

TaskDriver should use create() but currently is using set(), which just overwrites ZNodes
that are in ZK. This is undesirable and we need to fix it, especially in the wake of ZNode
restructuring.

AC:
1. Make TaskDriver use create() instead of set()
2. Add an integration test: TestWorkflowCreation:testWorkflowCreationNoDuplicates()


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3844ad60
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3844ad60
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3844ad60

Branch: refs/heads/master
Commit: 3844ad60034b029f3bbd916f629a7969117c1b26
Parents: b64a72c
Author: narendly <narendly@gmail.com>
Authored: Thu Nov 1 16:54:48 2018 -0700
Committer: narendly <narendly@gmail.com>
Committed: Thu Nov 1 16:54:48 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 27 +++++-----
 .../java/org/apache/helix/task/TaskUtil.java    | 32 ++++++++++--
 .../apache/helix/task/TestWorkflowCreation.java | 55 ++++++++++++++++++++
 3 files changed, 99 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3844ad60/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 27670e9..a522a86 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -129,7 +129,6 @@ public class TaskDriver {
    * @param flow
    */
   public void start(Workflow flow) {
-    // TODO: check that namespace for workflow is available
     LOG.info("Starting workflow " + flow.getName());
     flow.validate();
 
@@ -154,8 +153,17 @@ public class TaskDriver {
     newWorkflowConfig.setJobTypes(jobTypes);
 
     // add workflow config.
-    if (!TaskUtil.setWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
-      LOG.error("Failed to add workflow configuration for workflow " + flow.getName());
+    if (!TaskUtil.createWorkflowConfig(_accessor, flow.getName(), newWorkflowConfig)) {
+      // workflow config creation failed; try to delete job configs back
+      Set<String> failedJobRemoval = new HashSet<>();
+      for (String job : flow.getJobConfigs().keySet()) {
+        if (!TaskUtil.removeJobConfig(_accessor, job)) {
+          failedJobRemoval.add(job);
+        }
+      }
+      throw new HelixException(String.format(
+          "Failed to add workflow configuration for workflow %s. It's possible that a workflow
of the same name already exists or there was a connection issue. JobConfig deletion attempted
but failed for the following jobs: %s",
+          flow.getName(), failedJobRemoval));
     }
 
     // Finally add workflow resource.
@@ -171,19 +179,14 @@ public class TaskDriver {
    * the new configuration will be applied to the next scheduled runs of the workflow.
    * For non-recurrent workflow, the new configuration may (or may not) be applied
    * on the current running jobs, but it will be applied on the following unscheduled jobs.
-   *
    * Example:
-   *
    * _driver = new TaskDriver ...
    * WorkflowConfig currentWorkflowConfig = _driver.getWorkflowCfg(_manager, workflow);
    * WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(currentWorkflowConfig);
-
    * // make needed changes to the config here
    * configBuilder.setXXX();
-   *
    * // update workflow configuration
    * _driver.updateWorkflow(workflow, configBuilder.build());
-   *
    * @param workflow
    * @param newWorkflowConfig
    */
@@ -575,21 +578,21 @@ public class TaskDriver {
   }
 
   /**
-   * Add new job config to cluster
+   * Add new job config to cluster by way of create
    */
   private void addJobConfig(String job, JobConfig jobConfig) {
     LOG.info("Add job configuration " + job);
 
     // Set the job configuration
     JobConfig newJobCfg = new JobConfig(job, jobConfig);
-    if (!TaskUtil.setJobConfig(_accessor, job, newJobCfg)) {
-      throw new HelixException("Failed to add job configuration for job " + job);
+    if (!TaskUtil.createJobConfig(_accessor, job, newJobCfg)) {
+      throw new HelixException("Failed to add job configuration for job " + job
+          + ". It's possible that a job of the same name already exists or there was a connection
issue");
     }
   }
 
   /**
    * Public method to resume a workflow/queue.
-   *
    * @param workflow
    */
   public void resume(String workflow) {

http://git-wip-us.apache.org/repos/asf/helix/blob/3844ad60/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 379026d..958805b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -86,15 +86,15 @@ public class TaskUtil {
   }
 
   /**
-   * Set the job config
+   * Creates a job config. Returns false if the job of the same name already exists.
    * @param accessor Accessor to Helix configs
    * @param job The job name
    * @param jobConfig The job config to be set
    * @return True if set successfully, otherwise false
    */
-  protected static boolean setJobConfig(HelixDataAccessor accessor, String job,
+  protected static boolean createJobConfig(HelixDataAccessor accessor, String job,
       JobConfig jobConfig) {
-    return setResourceConfig(accessor, job, jobConfig);
+    return createResourceConfig(accessor, job, jobConfig);
   }
 
   /**
@@ -139,6 +139,18 @@ public class TaskUtil {
   }
 
   /**
+   * Create the workflow config. Fails if the ZNode already exists.
+   * @param accessor
+   * @param workflow
+   * @param workflowConfig
+   * @return
+   */
+  protected static boolean createWorkflowConfig(HelixDataAccessor accessor, String workflow,
+      WorkflowConfig workflowConfig) {
+    return createResourceConfig(accessor, workflow, workflowConfig);
+  }
+
+  /**
    * Set the workflow config
    * @param accessor Accessor to Helix configs
    * @param workflow The workflow name
@@ -854,6 +866,20 @@ public class TaskUtil {
   }
 
   /**
+   * Create the resource config. Fails if it already exists in ZK.
+   * @param accessor
+   * @param resource
+   * @param resourceConfig
+   * @return
+   */
+  private static boolean createResourceConfig(HelixDataAccessor accessor, String resource,
+      ResourceConfig resourceConfig) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    return accessor.getBaseDataAccessor().create(keyBuilder.resourceConfig(resource).getPath(),
+        resourceConfig.getRecord(), AccessOption.PERSISTENT);
+  }
+
+  /**
    * Set the resource config
    * @param accessor Accessor to Helix configs
    * @param resource The resource name

http://git-wip-us.apache.org/repos/asf/helix/blob/3844ad60/helix-core/src/test/java/org/apache/helix/task/TestWorkflowCreation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestWorkflowCreation.java b/helix-core/src/test/java/org/apache/helix/task/TestWorkflowCreation.java
new file mode 100644
index 0000000..bddef11
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestWorkflowCreation.java
@@ -0,0 +1,55 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestWorkflowCreation extends TaskSynchronizedTestBase {
+
+  /**
+   * Test that submitting workflows of the same name throws an exception.
+   */
+  @Test(expectedExceptions = HelixException.class)
+  public void testWorkflowCreationNoDuplicates() {
+    String queue = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queue);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setExpiry(1L);
+    for (int i = 0; i < 8; i++) {
+      builder.enqueueJob("JOB" + i, jobBuilder);
+    }
+
+    // First try
+    _driver.createQueue(builder.build());
+    Assert.assertNotNull(_driver.getWorkflowConfig(queue));
+
+    // Second try (this should throw an exception)
+    _driver.createQueue(builder.build());
+  }
+}


Mime
View raw message