helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 04/10: HELIX: Fix all DataUpdaters so that they check for null previous records
Date Thu, 28 Mar 2019 19:31:48 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 177a3fb8485391e353cfc99e394d212bf8bc9950
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Thu Mar 28 12:26:58 2019 -0700

    HELIX: Fix all DataUpdaters so that they check for null previous records
    
    Some DataUpdater implementations are missing a null check on the current ZNRecord data
    before applying the update. This is a serious bug that can cause NullPointerExceptions when
    the target ZNode does not exist. This diff goes through the codebase and addresses it.
    Changelist:
    1. Add null checks for all DataUpdater implementations
---
 .../java/org/apache/helix/manager/zk/ZKUtil.java   |  8 +++++-
 .../java/org/apache/helix/task/TaskDriver.java     | 11 ++++++--
 .../main/java/org/apache/helix/task/TaskUtil.java  | 29 ++++++++++++++++------
 .../java/org/apache/helix/tools/ClusterSetup.java  | 10 ++++++--
 .../tools/commandtools/CurrentStateCleanUp.java    |  8 ++++--
 5 files changed, 52 insertions(+), 14 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 2dbb31b..59e81fa 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
@@ -315,7 +316,8 @@ public final class ZKUtil {
     }
   }
 
-  public static void subtract(HelixZkClient client, String path, final ZNRecord recordTosubtract)
{
+  public static void subtract(HelixZkClient client, final String path,
+      final ZNRecord recordTosubtract) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
       try {
@@ -323,6 +325,10 @@ public final class ZKUtil {
           DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
             @Override
             public ZNRecord update(ZNRecord currentData) {
+              if (currentData == null) {
+                throw new HelixException(
+                    String.format("subtract DataUpdater: ZNode at path %s is not found!",
path));
+              }
               currentData.subtract(recordTosubtract);
               return currentData;
             }
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a522a86..f31c68b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -417,6 +417,13 @@ public class TaskDriver {
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
+        if (currentData == null) {
+          // For some reason, the WorkflowConfig for this JobQueue doesn't exist
+          // In this case, we cannot proceed and must alert the user
+          throw new HelixException(
+              String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!",
queue));
+        }
+
         // Add the node to the existing DAG
         JobDag jobDag = JobDag.fromJson(
             currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
@@ -1078,8 +1085,8 @@ public class TaskDriver {
    */
   public void addOrUpdateWorkflowUserContentMap(String workflowName,
       final Map<String, String> contentToAddOrUpdate) {
-    TaskUtil
-        .addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName, contentToAddOrUpdate);
+    TaskUtil.addOrUpdateWorkflowJobUserContentMap(_propertyStore, workflowName,
+        contentToAddOrUpdate);
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 42467eb..4565209 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -366,18 +366,26 @@ public class TaskUtil {
   static void addOrUpdateWorkflowJobUserContentMap(final HelixPropertyStore<ZNRecord>
propertyStore,
       String workflowJobResource, final Map<String, String> contentToAddOrUpdate) {
     if (workflowJobResource == null) {
-      throw new IllegalArgumentException("workflowJobResource must be not null when adding
workflow / job user content");
+      throw new IllegalArgumentException(
+          "workflowJobResource must be not null when adding workflow / job user content");
     }
-    String path = Joiner.on("/")
-        .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE);
+    String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource,
+        USER_CONTENT_NODE);
 
-    propertyStore.update(path, new DataUpdater<ZNRecord>() {
+    if (!propertyStore.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord znRecord) {
+        if (znRecord == null) {
+          // This indicates that somehow the UserContentStore ZNode is missing
+          // This should not happen, but if it is missing, create one
+          znRecord = new ZNRecord(new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+        }
         znRecord.getSimpleFields().putAll(contentToAddOrUpdate);
         return znRecord;
       }
-    }, AccessOption.PERSISTENT);
+    }, AccessOption.PERSISTENT)) {
+      LOG.error("Failed to update the UserContentStore for {}", workflowJobResource);
+    }
   }
 
   /**
@@ -433,16 +441,23 @@ public class TaskUtil {
     String path =
         Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE);
 
-    propertyStore.update(path, new DataUpdater<ZNRecord>() {
+    if (!propertyStore.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord znRecord) {
+        if (znRecord == null) {
+          // This indicates that somehow the UserContentStore ZNode is missing
+          // This should not happen, but if it is missing, create one
+          znRecord = new ZNRecord(new ZNRecord(TaskUtil.USER_CONTENT_NODE));
+        }
         if (znRecord.getMapField(task) == null) {
           znRecord.setMapField(task, new HashMap<String, String>());
         }
         znRecord.getMapField(task).putAll(contentToAddOrUpdate);
         return znRecord;
       }
-    }, AccessOption.PERSISTENT);
+    }, AccessOption.PERSISTENT)) {
+      LOG.error("Failed to update the task UserContentStore for task {} in job {}", task,
job);
+    }
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index d21877f..e8903dc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -293,7 +293,7 @@ public class ClusterSetup {
     List<String> existingIdealStateNames =
         accessor.getChildNames(accessor.keyBuilder().idealStates());
 
-    for (String resourceName : existingIdealStateNames) {
+    for (final String resourceName : existingIdealStateNames) {
       IdealState resourceIdealState =
           accessor.getProperty(accessor.keyBuilder().idealStates(resourceName));
       if (resourceIdealState.getRebalanceMode().equals(RebalanceMode.FULL_AUTO)) {
@@ -306,7 +306,13 @@ public class ClusterSetup {
       // Update ideal state
       accessor.updateProperty(accessor.keyBuilder().idealStates(resourceName),
           new DataUpdater<ZNRecord>() {
-            @Override public ZNRecord update(ZNRecord znRecord) {
+            @Override
+            public ZNRecord update(ZNRecord znRecord) {
+              if (znRecord == null) {
+                throw new HelixException(String.format(
+                    "swapInstance DataUpdater: IdealState for resource %s no longer exists!",
+                    resourceName));
+              }
               // Need to swap again in case there are added partition with old instance
               swapInstanceInIdealState(new IdealState(znRecord), oldInstanceName, newInstanceName);
               return znRecord;
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
index b2ceed7..3bdc290 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/CurrentStateCleanUp.java
@@ -81,14 +81,18 @@ public class CurrentStateCleanUp {
   public static void cleanupCurrentStatesForCluster(String zkConnectString, String clusterName,
       String instanceName, String session) throws Exception {
     HelixManager manager = HelixManagerFactory
-        .getZKHelixManager(clusterName, "Administorator", InstanceType.ADMINISTRATOR,
+        .getZKHelixManager(clusterName, "Administrator", InstanceType.ADMINISTRATOR,
             zkConnectString);
     manager.connect();
     try {
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
       DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
-        @Override public ZNRecord update(ZNRecord currentData) {
+        @Override
+        public ZNRecord update(ZNRecord currentData) {
+          if (currentData == null) {
+            return null;
+          }
           Set<String> partitionToRemove = new HashSet<>();
           for (String partition : currentData.getMapFields().keySet()) {
             if (currentData.getMapField(partition).get("CURRENT_STATE")


Mime
View raw message