helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch helix-0.9-release updated: Fix the race condition while Helix refresh cluster status cache. (#363)
Date Thu, 01 Aug 2019 20:44:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch helix-0.9-release
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-0.9-release by this push:
     new 37529e3  Fix the race condition while Helix refresh cluster status cache. (#363)
37529e3 is described below

commit 37529e39f6624d58bef5c4ec5110746cc59db243
Author: jiajunwang <1803880+jiajunwang@users.noreply.github.com>
AuthorDate: Tue Jul 30 14:41:32 2019 -0700

    Fix the race condition while Helix refresh cluster status cache. (#363)
    
    * Fix the race condition while Helix refresh cluster status cache.
    
    This change fixes issue #331.
    The design ensures that each change flag is read only once, which avoids locking during change notification.
However, a later update introduced an additional read. Because notification is lock-free, the two reads
may return different results, which leaves the cache in an inconsistent state.
The impact is that the expected rebalance might not happen.
---
 .../dataproviders/BaseControllerDataProvider.java  | 86 ++++++++++++++--------
 .../ResourceControllerDataProvider.java            | 29 ++++----
 .../WorkflowControllerDataProvider.java            | 30 ++++----
 3 files changed, 83 insertions(+), 62 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9402029..0024177 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -19,17 +19,6 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
@@ -54,6 +43,19 @@ import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Common building block for controller to cache their data. This common building block contains
  * information about cluster config, instance config, resource config, ideal states, current
state,
@@ -82,7 +84,7 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
   private ExecutorService _asyncTasksThreadPool;
 
   // A map recording what data has changed
-  protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
+  protected Map<HelixConstants.ChangeType, AtomicBoolean> _propertyDataChangedMap;
 
   // Property caches
   private final PropertyCache<ResourceConfig> _resourceConfigCache;
@@ -112,7 +114,8 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       // refresh every type when it is initialized
-      _propertyDataChangedMap.put(type, true);
+      _propertyDataChangedMap
+          .put(type, new AtomicBoolean(true));
     }
 
     // initialize caches
@@ -217,11 +220,11 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
   }
 
-  private void refreshIdealState(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.IDEAL_STATE, false);
-
+  private void refreshIdealState(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE).getAndSet(false))
{
       _idealStateCache.refresh(accessor);
+      refreshedType.add(HelixConstants.ChangeType.IDEAL_STATE);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No ideal state change for %s cluster, %s pipeline", _clusterName,
@@ -229,11 +232,12 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     }
   }
 
-  private void refreshLiveInstances(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, false);
+  private void refreshLiveInstances(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE).getAndSet(false))
{
       _liveInstanceCache.refresh(accessor);
       _updateInstanceOfflineTime = true;
+      refreshedType.add(HelixConstants.ChangeType.LIVE_INSTANCE);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No live instance change for %s cluster, %s pipeline", _clusterName,
@@ -241,13 +245,14 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     }
   }
 
-  private void refreshInstanceConfigs(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
+  private void refreshInstanceConfigs(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG).getAndSet(false))
{
       _instanceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: %s", _clusterName,
               getPipelineName(), _instanceConfigCache.getPropertyMap().keySet()));
+      refreshedType.add(HelixConstants.ChangeType.INSTANCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No instance config change for %s cluster, %s pipeline", _clusterName,
@@ -255,13 +260,14 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
     }
   }
 
-  private void refreshResourceConfig(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.RESOURCE_CONFIG, false);
+  private void refreshResourceConfig(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG).getAndSet(false))
{
       _resourceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded ResourceConfig for cluster %s, %s pipeline. Cnt: %s", _clusterName,
               getPipelineName(), _resourceConfigCache.getPropertyMap().keySet().size()));
+      refreshedType.add(HelixConstants.ChangeType.RESOURCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No resource config change for %s cluster, %s pipeline", _clusterName,
@@ -290,12 +296,22 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
+    doRefresh(accessor);
+  }
+
+  /**
+   * @param accessor
+   * @return The types that has been updated during the refresh.
+   */
+  protected synchronized Set<HelixConstants.ChangeType> doRefresh(HelixDataAccessor
accessor) {
+    Set<HelixConstants.ChangeType> refreshedTypes = new HashSet<>();
+
     // Refresh raw data
     _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
-    refreshIdealState(accessor);
-    refreshLiveInstances(accessor);
-    refreshInstanceConfigs(accessor);
-    refreshResourceConfig(accessor);
+    refreshIdealState(accessor, refreshedTypes);
+    refreshLiveInstances(accessor, refreshedTypes);
+    refreshInstanceConfigs(accessor, refreshedTypes);
+    refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
     updateMaintenanceInfo(accessor);
@@ -314,6 +330,8 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
 
     updateIdealRuleMap();
     updateDisabledInstances();
+
+    return refreshedTypes;
   }
 
   protected void dumpDebugInfo() {
@@ -595,9 +613,13 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
 
   /**
    * Notify the cache that some part of the cluster data has been changed.
+   *
+   * Don't lock the propertyDataChangedMap here because the refresh process, which also read
this map,
+   * may take a long time to finish. If a lock is required, the notification might be blocked
by refresh.
+   * In this case, the delayed notification processing might cause performance issue.
    */
   public void notifyDataChange(HelixConstants.ChangeType changeType) {
-    _propertyDataChangedMap.put(changeType, true);
+    _propertyDataChangedMap.get(changeType).set(true);
   }
 
   /**
@@ -669,7 +691,7 @@ public class BaseControllerDataProvider implements ControlContextProvider
{
   public void requireFullRefresh() {
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       if (!_noFullRefreshProperty.contains(type)) {
-        _propertyDataChangedMap.put(type, true);
+        _propertyDataChangedMap.get(type).set(true);
       }
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index cf21230..59c973b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -21,8 +21,11 @@ package org.apache.helix.controller.dataproviders;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
@@ -109,17 +112,17 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
 
-    // Invalidate cached information
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
+    // Refresh base
+    Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+
+    // Invalidate cached information if any of the important data has been refreshed
+    if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
       clearCachedResourceAssignments();
     }
 
-    // Refresh base
-    super.refresh(accessor);
-
     // Refresh resource controller specific property caches
     refreshExternalViews(accessor);
     refreshTargetExternalViews(accessor);
@@ -140,20 +143,16 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider
{
   private void refreshExternalViews(final HelixDataAccessor accessor) {
     // As we are not listening on external view change, external view will be
     // refreshed once during the cache's first refresh() call, or when full refresh is required
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW).getAndSet(false))
{
       _externalViewCache.refresh(accessor);
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, false);
     }
   }
 
   private void refreshTargetExternalViews(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW).getAndSet(false))
{
       if (getClusterConfig() != null && getClusterConfig().isTargetExternalViewEnabled())
{
+        // Only refresh with data accessor for the first time
         _targetExternalViewCache.refresh(accessor);
-
-        // Only set the change type back once we get refreshed with data accessor for the
-        // first time
-        _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
       }
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 2eee09e..e637e3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -19,9 +19,6 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
@@ -41,6 +38,11 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Data provider for workflow controller.
@@ -69,24 +71,22 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider
{
     _taskDataCache = new TaskDataCache(this);
   }
 
-  private void refreshClusterStateChangeFlags() {
+  private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> propertyRefreshed)
{
     // This is for targeted jobs' task assignment. It needs to watch for current state changes
for
     // when targeted resources' state transitions complete
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _existsLiveInstanceOrCurrentStateChange = true;
-
-      // BaseControllerDataProvider will take care of marking live instance change
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
-    } else {
-      _existsLiveInstanceOrCurrentStateChange = false;
-    }
+    _existsLiveInstanceOrCurrentStateChange =
+        // TODO read and update CURRENT_STATE in the BaseControllerDataProvider as well.
+        // This check (and set) is necessary for now since the current state flag in _propertyDataChangedMap
is not used by the BaseControllerDataProvider for now.
+        _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
-    refreshClusterStateChangeFlags();
-    super.refresh(accessor);
+    Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+
+    refreshClusterStateChangeFlags(propertyRefreshed);
 
     // Refresh TaskCache
     _taskDataCache.refresh(accessor, getResourceConfigMap());


Mime
View raw message