helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 11/44: Check sibling nodes to guarantee MIN_ACTIVE_REPLICAS satisfied
Date Sat, 25 May 2019 01:19:45 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5b972f1971d3ca959906ac31353787fb1f9bff32
Author: Yi Wang <ywang4@linkedin.com>
AuthorDate: Fri Mar 29 17:28:07 2019 -0700

    Check sibling nodes to guarantee MIN_ACTIVE_REPLICAS satisfied
    
    RB=1614128
    G=helix-reviewers
    A=jxue,hulee
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 .../java/org/apache/helix/model/ExternalView.java  |  21 +++-
 .../apache/helix/util/InstanceValidationUtil.java  | 105 +++++++++++++++---
 .../helix/util/TestInstanceValidationUtil.java     | 121 +++++++++++++++++++++
 .../helix/rest/server/service/InstanceService.java |   7 +-
 .../rest/server/service/InstanceServiceImpl.java   |   4 +
 .../helix/rest/server/AbstractTestClass.java       |   7 +-
 6 files changed, 244 insertions(+), 21 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
index 7b201b0..aba8d85 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java
@@ -38,7 +38,9 @@ public class ExternalView extends HelixProperty {
   public enum ExternalViewProperty {
     INSTANCE_GROUP_TAG,
     RESOURCE_GROUP_NAME,
-    GROUP_ROUTING_ENABLED
+    GROUP_ROUTING_ENABLED,
+    MIN_ACTIVE_REPLICAS,
+    STATE_MODEL_DEF_REF
   }
 
   /**
@@ -131,6 +133,23 @@ public class ExternalView extends HelixProperty {
     return _record.getSimpleField(ExternalViewProperty.INSTANCE_GROUP_TAG.toString());
   }
 
+  /**
+   * Get the number of minimum active partitions for this resource.
+   *
+   * @return
+   */
+  public int getMinActiveReplicas() {
+    return _record.getIntField(ExternalViewProperty.MIN_ACTIVE_REPLICAS.toString(), -1);
+  }
+
+  /**
+   * Get the state model associated with this resource
+   * @return an identifier of the state model
+   */
+  public String getStateModelDefRef() {
+    return _record.getSimpleField(ExternalViewProperty.STATE_MODEL_DEF_REF.toString());
+  }
+
   @Override
   public boolean isValid() {
     return true;
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index 385920f..dba8f94 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -19,9 +19,11 @@ package org.apache.helix.util;
  * under the License.
  */
 
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
@@ -35,10 +37,14 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.RESTConfig;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
 /**
  * Utility class for validating Helix properties
  * Warning: each method validates one single property of instance individually and independently.
@@ -48,6 +54,9 @@ import org.slf4j.LoggerFactory;
 public class InstanceValidationUtil {
   private static final Logger _logger = LoggerFactory.getLogger(InstanceValidationUtil.class);
 
+  public static Set<String> UNHEALTHY_STATES =
+      ImmutableSet.of(HelixDefinedState.DROPPED.name(), HelixDefinedState.ERROR.name());
+
   public enum HealthStatusType {
     instanceHealthStatus,
     partitionHealthStatus
@@ -115,12 +124,10 @@ public class InstanceValidationUtil {
     if (liveInstance != null) {
       String sessionId = liveInstance.getSessionId();
 
-      PropertyKey currentStatesKey =
-          propertyKeyBuilder.currentStates(instanceName, sessionId);
+      PropertyKey currentStatesKey = propertyKeyBuilder.currentStates(instanceName, sessionId);
       List<String> resourceNames = dataAccessor.getChildNames(currentStatesKey);
       for (String resourceName : resourceNames) {
-        PropertyKey key =
-            propertyKeyBuilder.currentState(instanceName, sessionId, resourceName);
+        PropertyKey key = propertyKeyBuilder.currentState(instanceName, sessionId, resourceName);
         CurrentState currentState = dataAccessor.getProperty(key);
         if (currentState != null && currentState.getPartitionStateMap().size() > 0) {
           return true;
@@ -179,12 +186,10 @@ public class InstanceValidationUtil {
     if (liveInstance != null) {
       String sessionId = liveInstance.getSessionId();
 
-      PropertyKey currentStatesKey =
-          propertyKeyBuilder.currentStates(instanceName, sessionId);
+      PropertyKey currentStatesKey = propertyKeyBuilder.currentStates(instanceName, sessionId);
       List<String> resourceNames = dataAccessor.getChildNames(currentStatesKey);
       for (String resourceName : resourceNames) {
-        PropertyKey key =
-            propertyKeyBuilder.currentState(instanceName, sessionId, resourceName);
+        PropertyKey key = propertyKeyBuilder.currentState(instanceName, sessionId, resourceName);
 
         CurrentState currentState = dataAccessor.getProperty(key);
         if (currentState != null
@@ -200,10 +205,9 @@ public class InstanceValidationUtil {
 
   /**
    * Check the overall health status for instance including:
-   *  1. Per instance health status with application customized key-value entries
-   *  2. Sibling partitions (replicas for same partition holding on different node
-   *     health status for the entire cluster.
-   *
+   * 1. Per instance health status with application customized key-value entries
+   * 2. Sibling partitions (replicas for same partition holding on different node
+   * health status for the entire cluster.
    * @param configAccessor
    * @param clustername
    * @param hostName
@@ -221,8 +225,8 @@ public class InstanceValidationUtil {
       return isHealthy;
     }
     // TODO : 1. Call REST with customized URL
-    //        2. Parse mapping result with string -> boolean value and return out for per instance
-    //        3. Check sibling nodes for partition health
+    // 2. Parse mapping result with string -> boolean value and return out for per instance
+    // 3. Check sibling nodes for partition health
     isHealthy =
         perInstanceHealthCheck(instanceHealthMap) || perPartitionHealthCheck(partitionHealthMap);
 
@@ -254,9 +258,7 @@ public class InstanceValidationUtil {
   /**
    * Check instance is already in the stable state. Here stable means all the ideal state mapping
    * matches external view (view of current state).
-   *
-   * It requires persist assignment on!
-   *
+   * It requires PERSIST_INTERMEDIATE_ASSIGNMENT turned on!
    * @param dataAccessor
    * @param instanceName
    * @return
@@ -295,4 +297,71 @@ public class InstanceValidationUtil {
     }
     return true;
   }
+
+  /**
+   * Check if sibling nodes of the instance meet min active replicas constraint
+   * Two instances are sibling of each other if they host the same partition
+   *
+   * WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy
+   * due to external view propagation latency
+   *
+   * TODO: Use in memory cache and query instance's currentStates
+   *
+   * @param dataAccessor
+   * @param instanceName
+   * @return
+   */
+  public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) {
+    PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
+    List<String> idealStates = dataAccessor.getChildNames(propertyKeyBuilder.idealStates());
+    List<String> externalViews = dataAccessor.getChildNames(propertyKeyBuilder.externalViews());
+    if (idealStates.size() != externalViews.size()) {
+      throw new HelixException(
+          "The following resources in IdealStates are not found in ExternalViews: "
+              + Sets.difference(new HashSet<>(idealStates), new HashSet<>(externalViews)));
+    }
+
+    for (String externalViewName : externalViews) {
+      ExternalView externalView =
+          dataAccessor.getProperty(propertyKeyBuilder.externalView(externalViewName));
+      if (externalView == null) {
+        _logger.error("ExternalView for {} doesn't exist", externalViewName);
+        continue;
+      }
+      // Get the minActiveReplicas constraint for the resource
+      int minActiveReplicas = externalView.getMinActiveReplicas();
+      if (minActiveReplicas == -1) {
+        throw new HelixException(
+            "ExternalView " + externalViewName + " is missing minActiveReplica field");
+      }
+      String stateModeDef = externalView.getStateModelDefRef();
+      StateModelDefinition stateModelDefinition =
+          dataAccessor.getProperty(propertyKeyBuilder.stateModelDef(stateModeDef));
+      Set<String> unhealthyStates = new HashSet<>(UNHEALTHY_STATES);
+      if (stateModelDefinition != null) {
+        unhealthyStates.add(stateModelDefinition.getInitialState());
+      }
+      for (String partition : externalView.getPartitionSet()) {
+        Map<String, String> stateByInstanceMap = externalView.getStateMap(partition);
+        // found the resource hosted on the instance
+        if (stateByInstanceMap.containsKey(instanceName)) {
+          int numHealthySiblings = 0;
+          for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
+            if (!entry.getKey().equals(instanceName)
+                && !unhealthyStates.contains(entry.getValue())) {
+              numHealthySiblings++;
+            }
+          }
+          if (numHealthySiblings < minActiveReplicas) {
+            _logger.info(
+                "Partition {} doesn't have enough active replicas in sibling nodes. NumHealthySiblings: {}, minActiveReplicas: {}",
+                partition, numHealthySiblings, minActiveReplicas);
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
new file mode 100644
index 0000000..f26d2bb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -0,0 +1,121 @@
+package org.apache.helix.util;
+
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.StateModelDefinition;
+import org.mockito.ArgumentMatcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+
+public class TestInstanceValidationUtil {
+  private static final String TEST_CLUSTER = "testCluster";
+  private static final String TEST_INSTANCE = "instance0";
+
+  @Test
+  public void TestSiblingNodesActiveReplicaCheck_success() {
+    String resource = "resource";
+    Mock mock = new Mock();
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+    ExternalView externalView = mock(ExternalView.class);
+    when(externalView.getMinActiveReplicas()).thenReturn(2);
+    when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
+    when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(
+        TEST_INSTANCE, "Master",
+        "instance1", "Slave",
+        "instance2", "Slave",
+        "instance3", "Slave"));
+    doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
+    when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
+    doReturn(stateModelDefinition).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
+
+    boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
+
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void TestSiblingNodesActiveReplicaCheck_fail() {
+    String resource = "resource";
+    Mock mock = new Mock();
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    ExternalView externalView = mock(ExternalView.class);
+    when(externalView.getMinActiveReplicas()).thenReturn(3);
+    when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
+    when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(
+        TEST_INSTANCE, "Master",
+        "instance1", "Slave",
+        "instance2", "Slave"));
+    doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
+    when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
+    doReturn(stateModelDefinition).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));
+
+    boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
+
+    Assert.assertFalse(result);
+  }
+
+  @Test (expectedExceptions = HelixException.class)
+  public void TestSiblingNodesActiveReplicaCheck_exception_whenIdealStatesMisMatch() {
+    String resource = "resource";
+    Mock mock = new Mock();
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+    doReturn(Collections.emptyList()).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+    ExternalView externalView = mock(ExternalView.class);
+    when(externalView.getMinActiveReplicas()).thenReturn(-1);
+    doReturn(externalView).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+
+    InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
+  }
+
+  @Test (expectedExceptions = HelixException.class)
+  public void TestSiblingNodesActiveReplicaCheck_exception_whenMissingMinActiveReplicas() {
+    String resource = "resource";
+    Mock mock = new Mock();
+    doReturn(ImmutableList.of(resource)).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
+    doReturn(Collections.emptyList()).when(mock.dataAccessor).getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
+
+    InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
+  }
+
+  private class Mock {
+    HelixDataAccessor dataAccessor;
+
+    Mock() {
+      this.dataAccessor = mock(HelixDataAccessor.class);
+      when(dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
+    }
+  }
+
+  public static class PropertyKeyArgument extends ArgumentMatcher<PropertyKey> {
+    private PropertyType propertyType;
+
+    public PropertyKeyArgument(PropertyType propertyType) {
+      this.propertyType = propertyType;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      PropertyKey propertyKey = (PropertyKey) o;
+
+      return this.propertyType == propertyKey.getType();
+    }
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index f32551b..471a4ec 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -58,7 +58,12 @@ public interface InstanceService {
     /**
      * Check if instance has error partitions
      */
-    HAS_ERROR_PARTITION;
+    HAS_ERROR_PARTITION,
+    /**
+     * Check if all resources hosted on the instance can still meet the min active replica
+     * constraint if this instance is shutdown
+     */
+    MIN_ACTIVE_REPLICA_CHECK_FAILED;
 
     /**
      * Pre-defined list of checks to test if an instance can be stopped at runtime
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index a928b98..ff12678 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -87,6 +87,10 @@ public class InstanceServiceImpl implements InstanceService {
         healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
             InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
         break;
+      case MIN_ACTIVE_REPLICA_CHECK_FAILED:
+        healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
+            InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
+        break;
       default:
         _logger.error("Unsupported health check: {}", healthCheck);
         break;
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index ab30419..e5c42a0 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -54,6 +54,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.rest.common.ContextPropertyKeys;
@@ -98,7 +99,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   protected static final String WORKFLOW_PREFIX = "Workflow_";
   protected static final String JOB_PREFIX = "Job_";
   protected static int NUM_PARTITIONS = 10;
-  protected static int NUM_REPLICA = 3;
+  protected static int NUM_REPLICA = 2;
+  protected static int MIN_ACTIVE_REPLICA = 3;
   protected static ZkServer _zkServer;
   protected static HelixZkClient _gZkClient;
   protected static ClusterSetup _gSetupTool;
@@ -316,6 +318,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     for (int i = 0; i < numResources; i++) {
       String resource = cluster + "_db_" + i;
       _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave");
+      IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA);
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
       _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA);
       resources.add(resource);
     }


Mime
View raw message