helix-commits mailing list archives

From: j...@apache.org
Subject: [helix] 01/02: Add InstanceServiceImpl#batchGetInstancesStoppableChecks to solve performance issue #366
Date: Tue, 13 Aug 2019 21:40:40 GMT
This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4d3cd0c32c2c97891e43d73f8577f03a48b38726
Author: Yi Wang <i3.wangyi@gmail.com>
AuthorDate: Mon Aug 5 18:33:38 2019 -0700

    Add InstanceServiceImpl#batchGetInstancesStoppableChecks to solve performance issue #366
---
 .../rest/common/HelixDataAccessorWrapper.java      |  63 +++--
 .../server/resources/helix/InstancesAccessor.java  |  24 +-
 .../helix/rest/server/service/InstanceService.java | 140 +++++-----
 .../rest/server/service/InstanceServiceImpl.java   | 310 +++++++++++++--------
 .../rest/server/service/TestInstanceService.java   |  19 +-
 5 files changed, 319 insertions(+), 237 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
index 1a26831..a32b4d7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixDataAccessorWrapper.java
@@ -1,46 +1,45 @@
 package org.apache.helix.rest.common;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 
 /**
- * A read-only wrapper of {@link ZKHelixDataAccessor} with transient cache
- * The caches is of the value from get methods and short lived for the lifecycle of one rest request
- * TODO: add more cached read method based on needs
+ * This is a wrapper for {@link ZKHelixDataAccessor} that caches the result of the batch reads it performs.
+ * Note that the usage of this object is valid for one REST request.
  */
-public final class HelixDataAccessorWrapper extends ZKHelixDataAccessor {
-  private final Map<PropertyKey, HelixProperty> _propertyCache = new HashMap<>();
-  private final Map<PropertyKey, List<String>> _batchNameCache = new HashMap<>();
-
-  public HelixDataAccessorWrapper(ZKHelixDataAccessor dataAccessor) {
-    super(dataAccessor);
-  }
-
-  @Override
-  public <T extends HelixProperty> T getProperty(PropertyKey key) {
-    if (_propertyCache.containsKey(key)) {
-      return (T) _propertyCache.get(key);
+public class HelixDataAccessorWrapper extends ZKHelixDataAccessor {
+    private final Map<PropertyKey, HelixProperty> _propertyCache = new HashMap<>();
+    private final Map<PropertyKey, List<String>> _batchNameCache = new HashMap<>();
+
+    public HelixDataAccessorWrapper(ZKHelixDataAccessor dataAccessor) {
+        super(dataAccessor);
     }
-    T property = super.getProperty(key);
-    _propertyCache.put(key, property);
-    return property;
-  }
-
-  @Override
-  public List<String> getChildNames(PropertyKey key) {
-    if (_batchNameCache.containsKey(key)) {
-      return _batchNameCache.get(key);
+
+    @Override
+    public <T extends HelixProperty> T getProperty(PropertyKey key) {
+        if (_propertyCache.containsKey(key)) {
+            return (T) _propertyCache.get(key);
+        }
+        T property = super.getProperty(key);
+        _propertyCache.put(key, property);
+        return property;
     }
 
-    List<String> names = super.getChildNames(key);
-    _batchNameCache.put(key, names);
+    @Override
+    public List<String> getChildNames(PropertyKey key) {
+        if (_batchNameCache.containsKey(key)) {
+            return _batchNameCache.get(key);
+        }
+
+        List<String> names = super.getChildNames(key);
+        _batchNameCache.put(key, names);
 
-    return names;
-  }
+        return names;
+    }
 }
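
A minimal usage sketch of the wrapper above (not part of the commit; the ZKHelixDataAccessor is assumed to be supplied by the REST server's per-request context, and "MyDB" is a hypothetical resource name). Within one REST request, repeated reads of the same PropertyKey are served from the in-memory caches instead of going back to ZooKeeper:

    void readWithCache(ZKHelixDataAccessor zkAccessor) {
      HelixDataAccessorWrapper cached = new HelixDataAccessorWrapper(zkAccessor);
      PropertyKey.Builder keyBuilder = cached.keyBuilder();
      PropertyKey evNamesKey = keyBuilder.externalViews();
      List<String> names = cached.getChildNames(evNamesKey);      // first call reads ZooKeeper and caches
      List<String> again = cached.getChildNames(evNamesKey);      // same key: served from _batchNameCache
      HelixProperty view = cached.getProperty(keyBuilder.externalView("MyDB"));  // cached in _propertyCache for this request
    }
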
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 36e7249..7191e517 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -8,14 +8,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Response;
-
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -129,7 +127,7 @@ public class InstancesAccessor extends AbstractHelixResource {
         admin.enableInstance(clusterId, enableInstances, false);
         break;
       case stoppable:
-        return getParallelStoppableInstances(clusterId, node);
+        return batchGetStoppableInstances(clusterId, node);
       default:
         _logger.error("Unsupported command :" + command);
         return badRequest("Unsupported command :" + command);
@@ -145,8 +143,7 @@ public class InstancesAccessor extends AbstractHelixResource {
     return OK();
   }
 
-  private Response getParallelStoppableInstances(String clusterId, JsonNode node)
-      throws IOException {
+  private Response batchGetStoppableInstances(String clusterId, JsonNode node) throws IOException {
     try {
       // TODO: Process input data from the content
       InstancesAccessor.InstanceHealthSelectionBase selectionBase =
@@ -184,19 +181,20 @@ public class InstancesAccessor extends AbstractHelixResource {
       case zone_based:
         List<String> zoneBasedInstance =
             getZoneBasedInstances(instances, orderOfZone, clusterTopology.toZoneMapping());
-        for (String instance : zoneBasedInstance) {
-          StoppableCheck stoppableCheckResult =
-              instanceService.getInstanceStoppableCheck(clusterId, instance, customizedInput);
-          if (!stoppableCheckResult.isStoppable()) {
+        Map<String, StoppableCheck> instancesStoppableChecks = instanceService.batchGetInstancesStoppableChecks(
+            clusterId, zoneBasedInstance, customizedInput);
+        for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
+          String instance = instanceStoppableCheck.getKey();
+          StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
+          if (!stoppableCheck.isStoppable()) {
             ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
-            for (String failedReason : stoppableCheckResult.getFailedChecks()) {
+            for (String failedReason : stoppableCheck.getFailedChecks()) {
               failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason));
             }
           } else {
             stoppableInstances.add(instance);
           }
         }
-
         // Adding following logic to check whether instances exist or not. An instance exist could be
         // checking following scenario:
         // 1. Instance got dropped. (InstanceConfig is gone.)
@@ -224,8 +222,8 @@ public class InstancesAccessor extends AbstractHelixResource {
       throw new HelixHealthException(e);
     } catch (Exception e) {
       _logger.error(String.format(
-          "Failed to get parallel stoppable instances for cluster %s with a HelixException!",
-          clusterId), e);
+              "Failed to get parallel stoppable instances for cluster %s with a HelixException!",
+              clusterId), e);
       throw e;
     }
   }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index 51bc354..36e54b0 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -19,79 +19,95 @@ package org.apache.helix.rest.server.service;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableList;
+import org.apache.helix.rest.server.json.instance.InstanceInfo;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.helix.rest.server.json.instance.InstanceInfo;
-import org.apache.helix.rest.server.json.instance.StoppableCheck;
+public interface InstanceService {
+    enum HealthCheck {
+        /**
+         * Check if instance is alive
+         */
+        INSTANCE_NOT_ALIVE,
+        /**
+         * Check if instance is enabled both in instance config and cluster config
+         */
+        INSTANCE_NOT_ENABLED,
+        /**
+         * Check if instance is stable
+         * Stable means all the ideal state mapping matches external view (view of current state).
+         */
+        INSTANCE_NOT_STABLE,
+        /**
+         * Check if instance has 0 resource assigned
+         */
+        EMPTY_RESOURCE_ASSIGNMENT,
+        /**
+         * Check if instance has disabled partitions
+         */
+        HAS_DISABLED_PARTITION,
+        /**
+         * Check if instance has valid configuration (pre-requisite for all checks)
+         */
+        INVALID_CONFIG,
+        /**
+         * Check if instance has error partitions
+         */
+        HAS_ERROR_PARTITION,
+        /**
+         * Check if all resources hosted on the instance can still meet the min active replica
+         * constraint if this instance is shutdown
+         */
+        MIN_ACTIVE_REPLICA_CHECK_FAILED;
 
-import com.google.common.collect.ImmutableList;
+        /**
+         * Pre-defined list of checks to test if an instance can be stopped at runtime
+         */
+        public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.asList(HealthCheck.values());
+        /**
+         * Pre-defined list of checks to test if an instance is in healthy running state
+         */
+        public static List<HealthCheck> STARTED_AND_HEALTH_CHECK_LIST =
+                ImmutableList.of(HealthCheck.INSTANCE_NOT_ALIVE, HealthCheck.INSTANCE_NOT_ENABLED,
+                        HealthCheck.INSTANCE_NOT_STABLE, HealthCheck.EMPTY_RESOURCE_ASSIGNMENT);
+    }
 
-public interface InstanceService {
-  enum HealthCheck {
-    /**
-     * Check if instance is alive
-     */
-    INSTANCE_NOT_ALIVE,
-    /**
-     * Check if instance is enabled both in instance config and cluster config
-     */
-    INSTANCE_NOT_ENABLED,
     /**
-     * Check if instance is stable
-     * Stable means all the ideal state mapping matches external view (view of current state).
+     * Get the overall status of the instance
+     *
+     * @param clusterId    The cluster id
+     * @param instanceName The instance name
+     * @return An instance of {@link InstanceInfo} easily convertible to JSON
      */
-    INSTANCE_NOT_STABLE,
-    /**
-     * Check if instance has 0 resource assigned
-     */
-    EMPTY_RESOURCE_ASSIGNMENT,
-    /**
-     * Check if instance has disabled partitions
-     */
-    HAS_DISABLED_PARTITION,
-    /**
-     * Check if instance has valid configuration (pre-requisite for all checks)
-     */
-    INVALID_CONFIG,
-    /**
-     * Check if instance has error partitions
-     */
-    HAS_ERROR_PARTITION,
-    /**
-     * Check if all resources hosted on the instance can still meet the min active replica
-     * constraint if this instance is shutdown
-     */
-    MIN_ACTIVE_REPLICA_CHECK_FAILED;
+    InstanceInfo getInstanceInfo(String clusterId, String instanceName,
+                                 List<HealthCheck> healthChecks);
 
     /**
-     * Pre-defined list of checks to test if an instance can be stopped at runtime
+     * Get the current instance stoppable checks
+     *
+     * @param clusterId    The cluster id
+     * @param instanceName The instance name
+     * @param jsonContent  The json payloads from client side
+     * @return An instance of {@link StoppableCheck} easily convertible to JSON
+     * @throws IOException in case of network failure
      */
-    public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.asList(HealthCheck.values());
+    StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
+                                             String jsonContent) throws IOException;
+
     /**
-     * Pre-defined list of checks to test if an instance is in healthy running state
+     * Batch get StoppableCheck results for a list of instances in one cluster
+     *
+     * @param clusterId   The cluster id
+     * @param instances   The list of instances
+     * @param jsonContent The json payloads from client side
+     * @return A map contains the instance as key and the StoppableCheck as the value
+     * @throws IOException in case of network failure
      */
-    public static List<HealthCheck> STARTED_AND_HEALTH_CHECK_LIST =
-        ImmutableList.of(HealthCheck.INSTANCE_NOT_ALIVE, HealthCheck.INSTANCE_NOT_ENABLED,
-            HealthCheck.INSTANCE_NOT_STABLE, HealthCheck.EMPTY_RESOURCE_ASSIGNMENT);
-  }
-
-  /**
-   * Get the overall status of the instance
-   * @param clusterId The cluster id
-   * @param instanceName The instance name
-   * @return An instance of {@link InstanceInfo} easily convertible to JSON
-   */
-  InstanceInfo getInstanceInfo(String clusterId, String instanceName,
-      List<HealthCheck> healthChecks);
-
-  /**
-   * Get the current instance stoppable checks
-   * @param clusterId The cluster id
-   * @param instanceName The instance name
-   * @return An instance of {@link StoppableCheck} easily convertible to JSON
-   */
-  StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
-      String jsonContent) throws IOException;
+    Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId, List<String> instances, String jsonContent)
+            throws IOException;
 }
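
A short caller-side sketch of the batch API defined above (illustrative only; the accessor wrapper and config accessor are assumed to be constructed elsewhere, and the cluster name, instance names, and empty JSON payload are placeholders, not values from the commit):

    Map<String, StoppableCheck> checkStoppable(HelixDataAccessorWrapper dataAccessorWrapper,
        ConfigAccessor configAccessor) throws IOException {
      InstanceService service = new InstanceServiceImpl(dataAccessorWrapper, configAccessor);
      Map<String, StoppableCheck> checks = service.batchGetInstancesStoppableChecks(
          "TestCluster", ImmutableList.of("host1_12000", "host2_12000"), "{}");
      for (Map.Entry<String, StoppableCheck> entry : checks.entrySet()) {
        if (!entry.getValue().isStoppable()) {
          // getFailedChecks() carries the reasons an instance cannot be stopped
          System.out.println(entry.getKey() + " not stoppable: " + entry.getValue().getFailedChecks());
        }
      }
      return checks;
    }

Compared with calling getInstanceStoppableCheck once per instance, the batch call lets the implementation fan the per-instance checks out to a thread pool and reuse the per-request ZooKeeper cache, which is the performance issue this commit targets.
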
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index 79188bc..bda4b0c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -21,15 +21,18 @@ package org.apache.helix.rest.server.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
@@ -41,6 +44,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.client.CustomRestClient;
 import org.apache.helix.rest.client.CustomRestClientFactory;
+import org.apache.helix.rest.common.HelixDataAccessorWrapper;
 import org.apache.helix.rest.server.json.cluster.PartitionHealth;
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
@@ -51,7 +55,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-
+import com.google.common.collect.ImmutableList;
 
 public class InstanceServiceImpl implements InstanceService {
   private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
@@ -60,19 +64,21 @@ public class InstanceServiceImpl implements InstanceService {
   private static final String IS_HEALTHY_KEY = "IS_HEALTHY";
   private static final String EXPIRY_KEY = "EXPIRE";
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ExecutorService POOL = Executors.newCachedThreadPool();
 
-  private final HelixDataAccessor _dataAccessor;
+  private final HelixDataAccessorWrapper _dataAccessor;
   private final ConfigAccessor _configAccessor;
   private final CustomRestClient _customRestClient;
 
-  public InstanceServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor) {
+  public InstanceServiceImpl(HelixDataAccessorWrapper dataAccessor, ConfigAccessor configAccessor) {
     _dataAccessor = dataAccessor;
     _configAccessor = configAccessor;
     _customRestClient = CustomRestClientFactory.get();
   }
 
   @VisibleForTesting
-  InstanceServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor, CustomRestClient customRestClient) {
+  InstanceServiceImpl(HelixDataAccessorWrapper dataAccessor, ConfigAccessor configAccessor,
+      CustomRestClient customRestClient) {
     _dataAccessor = dataAccessor;
     _configAccessor = configAccessor;
     _customRestClient = customRestClient;
@@ -123,96 +129,100 @@ public class InstanceServiceImpl implements InstanceService {
 
   /**
    * {@inheritDoc}
+   * Single instance stoppable check implementation is a special case of
+   * {@link #batchGetInstancesStoppableChecks(String, List, String)}
+   * <p>
    * Step 1: Perform instance level Helix own health checks
    * Step 2: Perform instance level client side health checks
    * Step 3: Perform partition level (all partitions on the instance) client side health checks
-   * Note: if the check fails at one step, all the following steps won't be executed because the instance cannot be stopped
+   * <p>
+   * Note: if the check fails at one step, the rest steps won't be executed because the instance
+   * cannot be stopped
    */
   @Override
   public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
       String jsonContent) throws IOException {
-    LOG.info("Perform instance level helix own health checks for {}/{}", clusterId, instanceName);
-    Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
-        InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
+    return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent)
+        .get(instanceName);
+  }
 
-    StoppableCheck result =
-        new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
-    if (!result.isStoppable()) {
-      return result;
+  @Override
+  public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
+      List<String> instances, String jsonContent) throws IOException {
+    Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
+    Map<String, Future<StoppableCheck>> helixInstanceChecks =
+        instances.stream().collect(Collectors.toMap(Function.identity(),
+            instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
+    List<String> instancesForCustomInstanceLevelChecks =
+        filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
+    if (instancesForCustomInstanceLevelChecks.isEmpty()) {
+      // if all instances failed at helix custom level checks
+      return finalStoppableChecks;
     }
-    LOG.info("{} passed helix side health checks", instanceName);
-    return performCustomInstanceChecks(clusterId, instanceName, getCustomPayLoads(jsonContent));
-  }
 
-  @VisibleForTesting
-  protected StoppableCheck performCustomInstanceChecks(String clusterId, String instanceName,
-      Map<String, String> customPayLoads) throws IOException {
-    StoppableCheck defaultSucceed = new StoppableCheck(true, Collections.emptyList(),
-        StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
-    LOG.info("Perform instance level client side health checks for {}/{}", clusterId, instanceName);
-    Optional<String> maybeBaseUrl = getBaseUrl(instanceName, clusterId);
-    if (!maybeBaseUrl.isPresent()) {
-      LOG.warn("Unable to get custom client health endpoint: " + instanceName);
-      return defaultSucceed;
+    RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
+    if (restConfig == null) {
+      String errorMessage =
+          String.format("The cluster %s hasn't enabled client side health checks yet, "
+              + "thus the stoppable check result is inaccurate", clusterId);
+      LOG.error(errorMessage);
+      throw new HelixException(errorMessage);
     }
-    try {
-      String baseUrl = maybeBaseUrl.get();
-      StoppableCheck result =
-          new StoppableCheck(_customRestClient.getInstanceStoppableCheck(baseUrl, customPayLoads),
-              StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
-      if (!result.isStoppable()) {
-        return result;
+    Map<String, String> customPayLoads = getCustomPayLoads(jsonContent);
+    Map<String, Future<StoppableCheck>> customInstanceLevelChecks =
+        instancesForCustomInstanceLevelChecks.stream()
+            .collect(Collectors.toMap(Function.identity(),
+                instance -> POOL.submit(() -> performCustomInstanceCheck(clusterId, instance,
+                    getBaseUrl(instance, restConfig), customPayLoads))));
+    List<String> instancesForCustomPartitionLevelChecks =
+        filterInstancesForNextCheck(customInstanceLevelChecks, finalStoppableChecks);
+    if (!instancesForCustomPartitionLevelChecks.isEmpty()) {
+      Map<String, StoppableCheck> instancePartitionLevelChecks = performPartitionsCheck(
+          instancesForCustomPartitionLevelChecks, restConfig, customPayLoads);
+      for (Map.Entry<String, StoppableCheck> instancePartitionStoppableCheckEntry : instancePartitionLevelChecks
+          .entrySet()) {
+        finalStoppableChecks.put(instancePartitionStoppableCheckEntry.getKey(),
+            instancePartitionStoppableCheckEntry.getValue());
       }
-      LOG.info("{} passed client side instance level health checks", instanceName);
-      return performPartitionLevelChecks(clusterId, instanceName, baseUrl, customPayLoads);
-    } catch (IOException e) {
-      LOG.error("Failed to perform custom client side instance level health checks for {}/{}",
-          clusterId, instanceName, e);
-      throw e;
     }
+
+    return finalStoppableChecks;
   }
 
-  @VisibleForTesting
-  protected StoppableCheck performPartitionLevelChecks(String clusterId, String instanceName,
-      String baseUrl, Map<String, String> customPayLoads) throws IOException {
-    LOG.info("Perform partition level health checks for {}/{}", clusterId, instanceName);
-    // pull the health status from ZK
-    PartitionHealth clusterPartitionsHealth = generatePartitionHealthMapFromZK();
-    Map<String, List<String>> expiredPartitionsOnInstances =
-        clusterPartitionsHealth.getExpiredRecords();
-    // update the health status for those expired partitions on instances
-    try {
-      for (Map.Entry<String, List<String>> entry : expiredPartitionsOnInstances.entrySet()) {
-        Map<String, Boolean> partitionHealthStatus =
-            _customRestClient.getPartitionStoppableCheck(baseUrl, entry.getValue(), customPayLoads);
-        partitionHealthStatus.entrySet().forEach(kv -> clusterPartitionsHealth
-            .updatePartitionHealth(instanceName, kv.getKey(), kv.getValue()));
+  private List<String> filterInstancesForNextCheck(
+      Map<String, Future<StoppableCheck>> futureStoppableChecks,
+      Map<String, StoppableCheck> finalStoppableChecks) {
+    List<String> instancesForNextCheck = new ArrayList<>();
+    for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableChecks.entrySet()) {
+      String instance = entry.getKey();
+      try {
+        StoppableCheck stoppableCheck = entry.getValue().get();
+        if (!stoppableCheck.isStoppable()) {
+          finalStoppableChecks.put(instance, stoppableCheck);
+        } else {
+          instancesForNextCheck.add(instance);
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Failed to get StoppableChecks in parallel. Instance: {}", instance);
       }
-    } catch (IOException e) {
-      LOG.error("Failed to perform client side partition level health checks for {}/{}",
clusterId,
-          instanceName, e);
-      throw e;
     }
-    // sibling checks on partitions health for entire cluster
-    PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
-    List<ExternalView> externalViews =
-        _dataAccessor.getChildNames(propertyKeyBuilder.externalViews()).stream()
-            .map(externalView -> (ExternalView) _dataAccessor
-                .getProperty(propertyKeyBuilder.externalView(externalView)))
-            .collect(Collectors.toList());
-    List<String> unHealthyPartitions = InstanceValidationUtil.perPartitionHealthCheck(externalViews,
-        clusterPartitionsHealth.getGlobalPartitionHealth(), instanceName, _dataAccessor);
-    return new StoppableCheck(unHealthyPartitions.isEmpty(), unHealthyPartitions,
-        StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+
+    return instancesForNextCheck;
+  }
+
+  private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
+    LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
+    Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
+        InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
+
+    return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
   }
 
   private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException {
     Map<String, String> result = new HashMap<>();
     JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent);
     // parsing the inputs as string key value pairs
-    jsonNode.fields().forEachRemaining(kv ->
-        result.put(kv.getKey(), kv.getValue().asText())
-    );
+    jsonNode.fields().forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asText()));
     return result;
   }
 
@@ -230,7 +240,8 @@ public class InstanceServiceImpl implements InstanceService {
     for (int i = 0; i < liveInstances.size(); i++) {
       String instance = liveInstances.get(i);
       // TODO: Check ZNRecord is null or not. Need logic to check whether the healthreports exist
-      // or not. If it does not exist, we should query the participant directly for the health report.
+      // or not. If it does not exist, we should query the participant directly for the health
+      // report.
       ZNRecord customizedHealth = healthReports.get(i).getRecord();
       for (String partitionName : customizedHealth.getMapFields().keySet()) {
         try {
@@ -260,68 +271,125 @@ public class InstanceServiceImpl implements InstanceService {
     return partitionHealth;
   }
 
+  private StoppableCheck performCustomInstanceCheck(String clusterId, String instanceName,
+      String baseUrl, Map<String, String> customPayLoads) throws IOException {
+    try {
+      return new StoppableCheck(
+          _customRestClient.getInstanceStoppableCheck(baseUrl, customPayLoads),
+          StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
+    } catch (IOException e) {
+      LOG.error("Failed to perform custom client side instance level health checks for {}/{}",
+          clusterId, instanceName, e);
+      throw e;
+    }
+  }
+
+  private Map<String, StoppableCheck> performPartitionsCheck(List<String> instances,
+      RESTConfig restConfig, Map<String, String> customPayLoads) {
+    PartitionHealth clusterPartitionsHealth = generatePartitionHealthMapFromZK();
+    // update the health status for those expired partitions on instances
+    Map<String, List<String>> expiredPartitionsByInstance =
+        clusterPartitionsHealth.getExpiredRecords();
+    Map<String, Future<Map<String, Boolean>>> updatedPartitionsHealthStatusByInstance =
+        new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : expiredPartitionsByInstance.entrySet()) {
+      String instance = entry.getKey();
+      List<String> expiredPartitions = entry.getValue();
+      Callable<Map<String, Boolean>> refreshTask =
+          () -> _customRestClient.getPartitionStoppableCheck(getBaseUrl(instance, restConfig),
+              expiredPartitions, customPayLoads);
+      updatedPartitionsHealthStatusByInstance.put(instance, POOL.submit(refreshTask));
+    }
+    updatedPartitionsHealthStatusByInstance.entrySet().forEach(instanceToPartitionStatus -> {
+      String instance = instanceToPartitionStatus.getKey();
+      try {
+        Map<String, Boolean> updatedPartitionsHealth = instanceToPartitionStatus.getValue().get();
+        updatedPartitionsHealth.entrySet().forEach(
+            partitionToHealthStatus -> clusterPartitionsHealth.updatePartitionHealth(instance,
+                partitionToHealthStatus.getKey(), partitionToHealthStatus.getValue()));
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Failed to get partition status on instance {}", instance, e);
+      }
+    });
+    // sibling checks on partitions health for entire cluster
+    PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
+    List<ExternalView> externalViews =
+        _dataAccessor.getChildNames(propertyKeyBuilder.externalViews()).stream()
+            .map(externalView -> (ExternalView) _dataAccessor
+                .getProperty(propertyKeyBuilder.externalView(externalView)))
+            .collect(Collectors.toList());
+
+    Map<String, StoppableCheck> instanceStoppableChecks = new HashMap<>();
+    for (String instanceName : instances) {
+      List<String> unHealthyPartitions =
+          InstanceValidationUtil.perPartitionHealthCheck(externalViews,
+              clusterPartitionsHealth.getGlobalPartitionHealth(), instanceName, _dataAccessor);
+      StoppableCheck stoppableCheck = new StoppableCheck(unHealthyPartitions.isEmpty(),
+          unHealthyPartitions, StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+      instanceStoppableChecks.put(instanceName, stoppableCheck);
+    }
+
+    return instanceStoppableChecks;
+  }
+
   @VisibleForTesting
   protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
       List<HealthCheck> healthChecks) {
     Map<String, Boolean> healthStatus = new HashMap<>();
     for (HealthCheck healthCheck : healthChecks) {
       switch (healthCheck) {
-        case INVALID_CONFIG:
-          healthStatus.put(HealthCheck.INVALID_CONFIG.name(),
-              InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName));
-          if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) {
-            LOG.error("The instance {} doesn't have valid configuration", instanceName);
-            return healthStatus;
-          }
-        case INSTANCE_NOT_ENABLED:
-          healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(), InstanceValidationUtil
-              .isEnabled(_dataAccessor, instanceName));
-          break;
-        case INSTANCE_NOT_ALIVE:
-          healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(),
-              InstanceValidationUtil.isAlive(_dataAccessor, instanceName));
-          break;
-        case INSTANCE_NOT_STABLE:
-          boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName);
-          healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable);
-          break;
-        case HAS_ERROR_PARTITION:
-          healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(),
-              !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName));
-          break;
-        case HAS_DISABLED_PARTITION:
-          healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(),
-              !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName));
-          break;
-        case EMPTY_RESOURCE_ASSIGNMENT:
-          healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
-              InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
-          break;
-        case MIN_ACTIVE_REPLICA_CHECK_FAILED:
-          healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
-              InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
-          break;
-        default:
-          LOG.error("Unsupported health check: {}", healthCheck);
-          break;
+      case INVALID_CONFIG:
+        healthStatus.put(HealthCheck.INVALID_CONFIG.name(),
+            InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName));
+        if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) {
+          LOG.error("The instance {} doesn't have valid configuration", instanceName);
+          return healthStatus;
+        }
+      case INSTANCE_NOT_ENABLED:
+        healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(),
+            InstanceValidationUtil.isEnabled(_dataAccessor, instanceName));
+        break;
+      case INSTANCE_NOT_ALIVE:
+        healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(),
+            InstanceValidationUtil.isAlive(_dataAccessor, instanceName));
+        break;
+      case INSTANCE_NOT_STABLE:
+        boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName);
+        healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable);
+        break;
+      case HAS_ERROR_PARTITION:
+        healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(),
+            !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName));
+        break;
+      case HAS_DISABLED_PARTITION:
+        healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(),
+            !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName));
+        break;
+      case EMPTY_RESOURCE_ASSIGNMENT:
+        healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
+            InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
+        break;
+      case MIN_ACTIVE_REPLICA_CHECK_FAILED:
+        healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
+            InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
+        break;
+      default:
+        LOG.error("Unsupported health check: {}", healthCheck);
+        break;
       }
     }
 
     return healthStatus;
   }
 
-  private Optional<String> getBaseUrl(String instance, String clusterId) {
-    RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
-    if (restConfig == null) {
-      LOG.error("The cluster {} hasn't enabled client side health checks yet", clusterId);
-      return Optional.empty();
-    }
+  private String getBaseUrl(String instance, RESTConfig restConfig) {
     String baseUrl = restConfig.get(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL);
-    // pre-assumption of the url, must be format of "http://*/path", the wildcard is replaceable by the instance vip
+    // pre-assumption of the url, must be format of "http://*/path", the wildcard is replaceable by
+    // the instance vip
     assert baseUrl.contains("*");
     // pre-assumption of the instance name, must be format of <instanceVip>_<port>
     assert instance.contains("_");
     String instanceVip = instance.substring(0, instance.indexOf('_'));
-    return Optional.of(baseUrl.replace("*", instanceVip));
+    return baseUrl.replace("*", instanceVip);
   }
 }
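
A worked example of the substitution getBaseUrl performs above (the CUSTOMIZED_HEALTH_URL value is a placeholder; the instance name follows the <instanceVip>_<port> format and matches the one used in the test below):

    String baseUrl = "http://*/healthCheck";                            // assumed RESTConfig CUSTOMIZED_HEALTH_URL value
    String instance = "instance0.linkedin.com_1235";                    // <instanceVip>_<port>
    String instanceVip = instance.substring(0, instance.indexOf('_'));  // "instance0.linkedin.com"
    String resolved = baseUrl.replace("*", instanceVip);                // "http://instance0.linkedin.com/healthCheck"
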
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
index 2945580..8e17947 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
@@ -19,7 +19,13 @@ package org.apache.helix.rest.server.service;
  * under the License.
  */
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -28,12 +34,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.client.CustomRestClient;
+import org.apache.helix.rest.common.HelixDataAccessorWrapper;
 import org.apache.helix.rest.server.json.cluster.PartitionHealth;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
 import org.mockito.Mock;
@@ -45,12 +51,13 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+
 public class TestInstanceService {
   private static final String TEST_CLUSTER = "TestCluster";
   private static final String TEST_INSTANCE = "instance0.linkedin.com_1235";
 
   @Mock
-  private HelixDataAccessor _dataAccessor;
+  private HelixDataAccessorWrapper _dataAccessor;
   @Mock
   private ConfigAccessor _configAccessor;
   @Mock
@@ -110,12 +117,6 @@ public class TestInstanceService {
           List<HealthCheck> healthChecks) {
         return Collections.emptyMap();
       }
-
-      @Override
-      protected StoppableCheck performPartitionLevelChecks(String clusterId, String instanceName, String baseUrl,
-          Map<String, String> customPayLoads) throws IOException {
-        return new StoppableCheck(false, Collections.emptyList(), StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
-      }
     };
 
     // partition is health on the test instance but unhealthy on the sibling instance

