helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 05/10: TASK: Fix bug where JobDispatcher does not create UserContentStore for new jobs
Date Thu, 28 Mar 2019 19:31:49 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3c3318805d16af3a40187ff67544f294c52b8019
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:27:26 2019 -0700

    TASK: Fix bug where JobDispatcher does not create UserContentStore for new jobs
    
        It was observed that there are multiple logic paths through which a new job could get scheduled:
1. scheduleJobs() 2. processJobStatusUpdateAndAssignment(). When a job is being assigned by
the latter, JobDispatcher would fail to create the UserContentStore for the job, causing all
subsequent reads/writes to this UserContentStore to fail. This is a temporary fix; further
refactoring of code paths would be required in order to consolidate where new jobs get scheduled.
        Changelist:
        1. Add UserContentStore for jobs with null contexts
---
 .../java/org/apache/helix/task/JobDispatcher.java  | 25 ++++++++++++++++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  |  2 +-
 .../org/apache/helix/task/WorkflowDispatcher.java  | 24 +++++++++++++++++++--
 3 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 596b54b..6530f3a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -1,6 +1,24 @@
 package org.apache.helix.task;
 
-import com.google.common.base.Joiner;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.Collection;
@@ -11,7 +29,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -98,6 +115,10 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
       jobCtx.setStartTime(System.currentTimeMillis());
       jobCtx.setName(jobName);
+      // This job's JobContext has not been created yet. Since we are creating a new JobContext
+      // here, we must also create its UserContentStore
+      TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName,
+          new ZNRecord(TaskUtil.USER_CONTENT_NODE));
       workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 4565209..d15cf8f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -315,7 +315,7 @@ public class TaskUtil {
    * @param workflowJobResource the name of workflow or job
    * @param record the initial data
    */
-  protected static void createUserContent(HelixPropertyStore propertyStore,
+  protected static void createUserContent(HelixPropertyStore<ZNRecord> propertyStore,
       String workflowJobResource, ZNRecord record) {
     propertyStore.create(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
         workflowJobResource, TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT);
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index eec2e87..80d9afb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -1,5 +1,24 @@
 package org.apache.helix.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import com.google.common.collect.Lists;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -305,11 +324,12 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher {
     }
 
     // Set up job resource based on partitions from target resource
+
+    // Create the UserContentStore for the job first
     TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource,
         new ZNRecord(TaskUtil.USER_CONTENT_NODE));
-    int numIndependentTasks = jobConfig.getTaskConfigMap().size();
 
-    int numPartitions = numIndependentTasks;
+    int numPartitions = jobConfig.getTaskConfigMap().size();
     if (numPartitions == 0) {
       IdealState targetIs =
           admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());


Mime
View raw message