helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 28/44: Integrate customRestClient health check with instance service main logic
Date Sat, 25 May 2019 01:20:02 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d93f05c6c750346c2ccfe80e89aaa0fee150a982
Author: Yi Wang <ywang4@linkedin.com>
AuthorDate: Thu May 2 17:45:08 2019 -0700

    Integrate customRestClient health check with instance service main logic
    
    RB=1645567
    G=helix-reviewers
    A=jxue
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 .../main/java/org/apache/helix/ConfigAccessor.java |  10 +-
 .../java/org/apache/helix/model/RESTConfig.java    |  45 ++--
 .../apache/helix/util/InstanceValidationUtil.java  |  72 +++---
 .../helix/util/TestInstanceValidationUtil.java     |  28 +-
 .../apache/helix/rest/client/CustomRestClient.java |   3 +-
 .../helix/rest/client/CustomRestClientFactory.java |   3 +-
 .../helix/rest/client/CustomRestClientImpl.java    |   2 +-
 .../rest/server/json/cluster/PartitionHealth.java  |  40 ++-
 .../rest/server/json/instance/StoppableCheck.java  |  60 +++--
 .../server/resources/helix/InstancesAccessor.java  |   6 +-
 .../resources/helix/PerInstanceAccessor.java       |  58 ++---
 .../helix/rest/server/service/InstanceService.java |  27 +-
 .../rest/server/service/InstanceServiceImpl.java   | 287 ++++++++++++---------
 .../server/json/instance/TestStoppableCheck.java   |  26 +-
 .../rest/server/service/TestInstanceService.java   | 188 ++++++++++----
 15 files changed, 468 insertions(+), 387 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 1d4c5e8..15ea4a0 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,12 +27,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.model.ResourceConfig;
@@ -563,11 +563,11 @@ public class ConfigAccessor {
   }
 
   /**
-   * Get ClusterConfig of the given cluster.
+   * Get RESTConfig of the given cluster.
    *
-   * @param clusterName
+   * @param clusterName The cluster
    *
-   * @return
+   * @return The instance of {@link RESTConfig}
    */
   public RESTConfig getRESTConfig(String clusterName) {
     HelixConfigScope scope =
@@ -575,7 +575,7 @@ public class ConfigAccessor {
     ZNRecord record = getConfigZnRecord(scope);
 
     if (record == null) {
-      LOG.warn("No config found at " + scope.getZkPath());
+      LOG.warn("No rest config found at " + scope.getZkPath());
       return null;
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
index a47226c..6683f72 100644
--- a/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/RESTConfig.java
@@ -3,18 +3,20 @@ package org.apache.helix.model;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 
-public class RESTConfig extends HelixProperty {
-  public enum RESTConfigProperty {
-    CUSTOMIZED_HEALTH_URL // User customized URL for getting participant health status or partition
-                          // health status.
-  }
 
+/**
+ * The configuration entry for persisting the client-side REST endpoint.
+ * The REST endpoint is used by Helix to fetch the health status or other important status of the participant at runtime.
+ */
+public class RESTConfig extends HelixProperty {
   /**
-   * Instantiate REST config for a specific cluster
-   * @param cluster the cluster identifier
+   * Corresponds to "simpleFields" concept in ZnNode
    */
-  public RESTConfig(String cluster) {
-    super(cluster);
+  public enum SimpleFields {
+    /**
+     * Customized URL for getting participant(instance)'s health status or partition's health status.
+     */
+    CUSTOMIZED_HEALTH_URL
   }
 
   /**
@@ -26,26 +28,15 @@ public class RESTConfig extends HelixProperty {
     super(record);
   }
 
-  /**
-   * Set up the user defined URL for check per participant health / per partition health by combine
-   * URL and final endpoint. It must ended without "/"
-   *
-   * eg: http://*:12345/customized/path/check
-   *
-   * @param customizedHealthURL
-   */
-  public void setCustomizedHealthURL(String customizedHealthURL) {
-    _record.setSimpleField(RESTConfigProperty.CUSTOMIZED_HEALTH_URL.name(), customizedHealthURL);
+  public RESTConfig(String id) {
+    super(id);
   }
 
-  /**
-   * Get user defined URL to construct per participant health / partition health
-   * Return null if it does not exist.
-   *
-   * @return
-   */
-  public String getCustomizedHealthURL() {
-    return _record.getSimpleField(RESTConfigProperty.CUSTOMIZED_HEALTH_URL.name());
+  public void set(SimpleFields property, String value) {
+    _record.setSimpleField(property.name(), value);
   }
 
+  public String get(SimpleFields property) {
+    return _record.getSimpleField(property.name());
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index 6ac9605..7d23117 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -19,29 +19,21 @@ package org.apache.helix.util;
  * under the License.
  */
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.helix.ConfigAccessor;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.*;
 import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
 /**
  * Utility class for validating Helix properties
  * Warning: each method validates one single property of instance individually and independently.
@@ -53,27 +45,19 @@ public class InstanceValidationUtil {
   public static Set<String> UNHEALTHY_STATES =
       ImmutableSet.of(HelixDefinedState.DROPPED.name(), HelixDefinedState.ERROR.name());
 
-  public enum HealthStatusType {
-    instanceHealthStatus,
-    partitionHealthStatus
-  }
-
   private InstanceValidationUtil() {
   }
 
   /**
    * Method to check if the instance is enabled by configuration
    * @param dataAccessor
-   * @param configAccessor
-   * @param clusterId
    * @param instanceName
    * @return
    */
-  public static boolean isEnabled(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor,
-      String clusterId, String instanceName) {
+  public static boolean isEnabled(HelixDataAccessor dataAccessor, String instanceName) {
     PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
     InstanceConfig instanceConfig = dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instanceName));
-    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterId);
+    ClusterConfig clusterConfig = dataAccessor.getProperty(propertyKeyBuilder.clusterConfig());
     // TODO deprecate instance level config checks once migrated the enable status to cluster config only
     if (instanceConfig == null || clusterConfig == null) {
       throw new HelixException("InstanceConfig or ClusterConfig is NULL");
@@ -89,12 +73,10 @@ public class InstanceValidationUtil {
   /**
    * Method to check if the instance is up and running by configuration
    * @param dataAccessor
-   * @param clusterId
    * @param instanceName
    * @return
    */
-  public static boolean isAlive(HelixDataAccessor dataAccessor, String clusterId,
-      String instanceName) {
+  public static boolean isAlive(HelixDataAccessor dataAccessor, String instanceName) {
     LiveInstance liveInstance = dataAccessor.getProperty(dataAccessor.keyBuilder().liveInstance(instanceName));
     return liveInstance != null;
   }
@@ -193,34 +175,42 @@ public class InstanceValidationUtil {
   }
 
   /**
-   * Perform sibling node partition health check
-   * @param partitionHealthMap
-   * @return
+   * Get the problematic partitions on the to-be-stopped instance
+   * Requirement:
+   *  If the instance gets stopped and the partitions on the instance are OFFLINE,
+   *  the cluster still has enough "healthy" replicas on other sibling instances
+   *
+   *  - sibling instances mean those that share the same partitions (replicas) as the to-be-stopped instance
+   *
+   * @param globalPartitionHealthStatus (instance => (partition name, health status))
+   * @param instanceToBeStop The instance to be stopped
+   * @param dataAccessor The data accessor
+   * @return A list of problematic partitions if the instance is stopped
    */
   public static List<String> perPartitionHealthCheck(List<ExternalView> externalViews,
-      Map<String, Map<String, Boolean>> partitionHealthMap, String instanceName,
-      HelixDataAccessor accessor) {
+      Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
+      HelixDataAccessor dataAccessor) {
     List<String> unhealthyPartitions = new ArrayList<>();
 
     for (ExternalView externalView : externalViews) {
-      StateModelDefinition stateModelDefinition = accessor
-          .getProperty(accessor.keyBuilder().stateModelDef(externalView.getStateModelDefRef()));
+      StateModelDefinition stateModelDefinition = dataAccessor
+          .getProperty(dataAccessor.keyBuilder().stateModelDef(externalView.getStateModelDefRef()));
       for (String partition : externalView.getPartitionSet()) {
         Map<String, String> stateMap = externalView.getStateMap(partition);
         // Only check if instance holds top state
-        if (stateMap.containsKey(instanceName) && stateMap.get(instanceName)
-            .equals(stateModelDefinition.getTopState())) {
+        if (stateMap.containsKey(instanceToBeStop)
+            && stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
           for (String siblingInstance : stateMap.keySet()) {
             // Skip this self check
-            if (siblingInstance.equals(instanceName)) {
+            if (siblingInstance.equals(instanceToBeStop)) {
               continue;
             }
 
             // We are checking sibling partition healthy status. So if partition health does not
             // exist or it is not healthy. We should mark this partition is unhealthy.
-            if (!partitionHealthMap.containsKey(siblingInstance) || !partitionHealthMap
-                .get(siblingInstance).containsKey(partition)
-                || !partitionHealthMap.get(siblingInstance).get(partition)) {
+            if (!globalPartitionHealthStatus.containsKey(siblingInstance)
+                || !globalPartitionHealthStatus.get(siblingInstance).containsKey(partition)
+                || !globalPartitionHealthStatus.get(siblingInstance).get(partition)) {
               unhealthyPartitions.add(partition);
               break;
             }
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
index 5f72ee8..cbbbfd6 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSet;
 public class TestInstanceValidationUtil {
   private static final String TEST_CLUSTER = "testCluster";
   private static final String TEST_INSTANCE = "instance0";
+  private static final PropertyKey.Builder BUILDER = new PropertyKey.Builder(TEST_CLUSTER);
 
   @DataProvider
   Object[][] isEnabledTestSuite() {
@@ -52,15 +53,15 @@ public class TestInstanceValidationUtil {
     InstanceConfig instanceConfig = new InstanceConfig(TEST_INSTANCE);
     instanceConfig.setInstanceEnabled(instanceConfigEnabled);
     doReturn(instanceConfig).when(mock.dataAccessor)
-        .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS)));
+        .getProperty(BUILDER.instanceConfig(TEST_INSTANCE));
     ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER);
     if (!clusterConfigEnabled) {
       clusterConfig.setDisabledInstances(ImmutableMap.of(TEST_INSTANCE, "12345"));
     }
-    when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(clusterConfig);
+    doReturn(clusterConfig).when(mock.dataAccessor)
+        .getProperty(BUILDER.clusterConfig());
 
-    boolean isEnabled = InstanceValidationUtil.isEnabled(mock.dataAccessor, mock.configAccessor,
-        TEST_CLUSTER, TEST_INSTANCE);
+    boolean isEnabled = InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
 
     Assert.assertEquals(isEnabled, expected);
   }
@@ -70,11 +71,8 @@ public class TestInstanceValidationUtil {
     Mock mock = new Mock();
     doReturn(null).when(mock.dataAccessor)
         .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS)));
-    when(mock.configAccessor.getClusterConfig(TEST_CLUSTER))
-        .thenReturn(new ClusterConfig(TEST_CLUSTER));
 
-    InstanceValidationUtil.isEnabled(mock.dataAccessor, mock.configAccessor, TEST_CLUSTER,
-        TEST_INSTANCE);
+    InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
   }
 
   @Test(expectedExceptions = HelixException.class)
@@ -82,10 +80,10 @@ public class TestInstanceValidationUtil {
     Mock mock = new Mock();
     doReturn(new InstanceConfig(TEST_INSTANCE)).when(mock.dataAccessor)
         .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS)));
-    when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(null);
+    doReturn(null).when(mock.dataAccessor)
+        .getProperty(BUILDER.clusterConfig());
 
-    InstanceValidationUtil.isEnabled(mock.dataAccessor, mock.configAccessor, TEST_CLUSTER,
-        TEST_INSTANCE);
+    InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
   }
 
   @Test
@@ -94,8 +92,7 @@ public class TestInstanceValidationUtil {
     doReturn(new LiveInstance(TEST_INSTANCE)).when(mock.dataAccessor)
         .getProperty(argThat(new PropertyKeyArgument(PropertyType.LIVEINSTANCES)));
 
-    Assert
-        .assertTrue(InstanceValidationUtil.isAlive(mock.dataAccessor, TEST_CLUSTER, TEST_INSTANCE));
+    Assert.assertTrue(InstanceValidationUtil.isAlive(mock.dataAccessor, TEST_INSTANCE));
   }
 
   @Test
@@ -263,7 +260,8 @@ public class TestInstanceValidationUtil {
     IdealState idealState = mock(IdealState.class);
     when(idealState.isEnabled()).thenReturn(true);
     when(idealState.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
-    when(idealState.getInstanceStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master"));
+    when(idealState.getInstanceStateMap("db0"))
+        .thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master"));
     when(idealState.isValid()).thenReturn(true);
     when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
     doReturn(idealState).when(mock.dataAccessor)
@@ -416,7 +414,7 @@ public class TestInstanceValidationUtil {
     Mock() {
       this.dataAccessor = mock(HelixDataAccessor.class);
       this.configAccessor = mock(ConfigAccessor.class);
-      when(dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
+      when(dataAccessor.keyBuilder()).thenReturn(BUILDER);
     }
   }
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
index afa4ef3..8b7c5ba 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
@@ -39,7 +39,8 @@ public interface CustomRestClient {
       throws IOException;
 
   /**
-   * Get stoppable check result on partition
+   * Get stoppable check result on a list of partitions on the instance
+   *
    * @param baseUrl the base url of the participant
    * @param partitions a list of partitions maintained by the participant
    * @param customPayloads generic payloads required from client side and helix only works as proxy
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
index 362818c..7a1c58f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
@@ -33,12 +33,13 @@ public class CustomRestClientFactory {
   private CustomRestClientFactory() {
   }
 
-  public static CustomRestClient get(String jsonContent) {
+  public static CustomRestClient get() {
     if (INSTANCE == null) {
       synchronized (CustomRestClientFactory.class) {
         if (INSTANCE == null) {
           try {
             INSTANCE = new CustomRestClientImpl();
+            return INSTANCE;
           } catch (Exception e) {
             LOG.error("Exception when initializing CustomRestClient", e);
           }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index 0db5a9b..1a6af22 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -145,4 +145,4 @@ class CustomRestClientImpl implements CustomRestClient {
       throw e;
     }
   }
-}
+}
\ No newline at end of file
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java
index 1fc2564..a09e383 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java
@@ -20,52 +20,44 @@ package org.apache.helix.rest.server.json.cluster;
  */
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
 
 public class PartitionHealth {
   // Partition health map stores the global metadata about the partition health in the format of
   // instanceName -> partitionName -> isHealthy
-  private Map<String, Map<String, Boolean>> _paritionHealthMap;
-  private Map<String, List<String>> _instancesThatNeedDirectCallWithPartitions;
+  private Map<String, Map<String, Boolean>> _instanceToPartitionHealthMap;
+  private Map<String, List<String>> _instanceToPartitionsMap;
 
   public PartitionHealth() {
-    _paritionHealthMap = new HashMap<>();
-    _instancesThatNeedDirectCallWithPartitions = new HashMap<>();
+    _instanceToPartitionHealthMap = new HashMap<>();
+    _instanceToPartitionsMap = new HashMap<>();
   }
 
   public void addInstanceThatNeedDirectCallWithPartition(String instanceName,
       String partitionName) {
-    _instancesThatNeedDirectCallWithPartitions
+    _instanceToPartitionsMap
         .computeIfAbsent(instanceName, partitions -> new ArrayList<>()).add(partitionName);
   }
 
-  public void setPartitionHealthForInstance(String instanceName,
-      Map<String, Boolean> partitionHealth) {
-    _paritionHealthMap.put(instanceName, partitionHealth);
-  }
-
   public void addSinglePartitionHealthForInstance(String instanceName, String partitionName,
       Boolean isHealthy) {
-    _paritionHealthMap.computeIfAbsent(instanceName, partitionMap -> new HashMap<>())
+    _instanceToPartitionHealthMap.computeIfAbsent(instanceName, partitionMap -> new HashMap<>())
         .put(partitionName, isHealthy);
   }
 
-  public List<String> getInstanceThatNeedDirectCallWithPartitions(String instanceName) {
-    return _instancesThatNeedDirectCallWithPartitions.getOrDefault(instanceName,
-        Collections.EMPTY_LIST);
+  public Map<String, List<String>> getExpiredRecords() {
+      return _instanceToPartitionsMap;
   }
 
-  public Map<String, Map<String, Boolean>> getParitionHealthMap() {
-    return _paritionHealthMap;
+  public void updatePartitionHealth(String instance, String partition, boolean isHealthy) {
+    _instanceToPartitionHealthMap.get(instance).put(partition, isHealthy);
   }
 
-  public void removePartitionHealthForInstance(String instanceName) {
-    _paritionHealthMap.remove(instanceName);
+  public Map<String, Map<String, Boolean>> getGlobalPartitionHealth() {
+    return _instanceToPartitionHealthMap;
   }
 
   @Override
@@ -74,8 +66,8 @@ public class PartitionHealth {
       return false;
     }
 
-    return _paritionHealthMap.equals(((PartitionHealth) o)._paritionHealthMap)
-        && _instancesThatNeedDirectCallWithPartitions
-        .equals(((PartitionHealth) o)._instancesThatNeedDirectCallWithPartitions);
+    return _instanceToPartitionHealthMap.equals(((PartitionHealth) o)._instanceToPartitionHealthMap)
+        && _instanceToPartitionsMap
+        .equals(((PartitionHealth) o)._instanceToPartitionsMap);
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
index b78a9c6..44458b3 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/instance/StoppableCheck.java
@@ -19,54 +19,52 @@ package org.apache.helix.rest.server.json.instance;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 public class StoppableCheck {
-  private static final String HELIX_CHECK_PREFIX = "Helix:";
-  private static final String CUSTOM_CHECK_PREFIX = "Custom:";
+  // Category to differentiate which step the check fails
+  public enum Category {
+    HELIX_OWN_CHECK("Helix:"),
+    CUSTOM_INSTANCE_CHECK("CustomInstance:"),
+    CUSTOM_PARTITION_CHECK("CustomPartition:");
+
+    String prefix;
+
+    Category(String prefix) {
+      this.prefix = prefix;
+    }
+  }
 
   @JsonProperty("stoppable")
   private boolean isStoppable;
+  // The list of failed checks should be sorted so that tests pass consistently
   @JsonProperty("failedChecks")
   private List<String> failedChecks;
 
-  public StoppableCheck(boolean isStoppable, List<String> failedChecks) {
+  public StoppableCheck(boolean isStoppable, List<String> failedChecks, Category category) {
     this.isStoppable = isStoppable;
-    // sort the failed checks in order so that tests can always pass
-    Collections.sort(failedChecks);
-    this.failedChecks = failedChecks;
+    this.failedChecks = failedChecks.stream()
+        .sorted()
+        .map(checkName -> appendPrefix(checkName, category))
+        .collect(Collectors.toList());
   }
 
-  public static StoppableCheck mergeStoppableChecks(Map<String, Boolean> helixChecks, Map<String, Boolean> customChecks) {
-    Map<String, Boolean> mergedResult = ImmutableMap.<String, Boolean>builder()
-        .putAll(appendPrefix(helixChecks, HELIX_CHECK_PREFIX))
-        .putAll(appendPrefix(customChecks, CUSTOM_CHECK_PREFIX))
-        .build();
-
-    List<String> failedChecks = new ArrayList<>();
-    for (Map.Entry<String, Boolean> entry : mergedResult.entrySet()) {
-      if (!entry.getValue()) {
-        failedChecks.add(entry.getKey());
-      }
-    }
-
-    return new StoppableCheck(failedChecks.isEmpty(), failedChecks);
+  public StoppableCheck(Map<String, Boolean> checks, Category category) {
+    this.failedChecks = Maps.filterValues(checks, Boolean.FALSE::equals).keySet()
+        .stream()
+        .sorted()
+        .map(checkName -> appendPrefix(checkName, category))
+        .collect(Collectors.toList());
+    this.isStoppable = this.failedChecks.isEmpty();
   }
 
-  private static Map<String, Boolean> appendPrefix(Map<String, Boolean> checks, String prefix) {
-    Map<String, Boolean> result = new HashMap<>();
-    for (Map.Entry<String, Boolean> entry : checks.entrySet()) {
-      result.put(prefix + entry.getKey(), entry.getValue());
-    }
-
-    return result;
+  private String appendPrefix(String checkName, Category category) {
+    return category.prefix + checkName;
   }
 
   public boolean isStoppable() {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
index 3ff4e13..99449be 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java
@@ -20,8 +20,10 @@ import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.rest.common.HelixDataAccessorWrapper;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
 import org.apache.helix.rest.server.resources.exceptions.HelixHealthException;
 import org.apache.helix.rest.server.service.ClusterService;
@@ -171,13 +173,13 @@ public class InstancesAccessor extends AbstractHelixResource {
       ObjectNode failedStoppableInstances = result.putObject(
           InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
       InstanceService instanceService =
-          new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
+          new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) getDataAccssor(clusterId)), getConfigAccessor());
       switch (selectionBase) {
       case zone_based:
         List<String> zoneBasedInstance = getZoneBasedInstances(clusterId, instances, orderOfZone);
         for (String instance : zoneBasedInstance) {
           StoppableCheck stoppableCheckResult =
-              instanceService.checkSingleInstanceStoppable(clusterId, instance, customizedInput);
+              instanceService.getInstanceStoppableCheck(clusterId, instance, customizedInput);
           if (!stoppableCheckResult.isStoppable()) {
             ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance);
             for (String failedReason : stoppableCheckResult.getFailedChecks()) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index e95f0a2..6b615ee 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -19,32 +19,17 @@ package org.apache.helix.rest.server.resources.helix;
  * under the License.
  */
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.List;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
+
+import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
+
+import org.apache.helix.*;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.*;
 import org.apache.helix.model.Error;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.rest.common.HelixDataAccessorWrapper;
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
@@ -59,10 +44,11 @@ import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @Path("/clusters/{clusterId}/instances/{instanceName}")
 public class PerInstanceAccessor extends AbstractHelixResource {
-  private final static Logger _logger = LoggerFactory.getLogger(PerInstanceAccessor.class);
+  private final static Logger LOG = LoggerFactory.getLogger(PerInstanceAccessor.class);
 
   public enum PerInstanceProperties {
     config,
@@ -86,8 +72,7 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
     // TODO reduce GC by dependency injection
     InstanceService instanceService =
-        new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor),
-            getConfigAccessor());
+        new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
     InstanceInfo instanceInfo = instanceService.getInstanceInfo(clusterId, instanceName,
         InstanceService.HealthCheck.STARTED_AND_HEALTH_CHECK_LIST);
 
@@ -100,13 +85,16 @@ public class PerInstanceAccessor extends AbstractHelixResource {
   public Response isInstanceStoppable(String jsonContent, @PathParam("clusterId") String clusterId,
       @PathParam("instanceName") String instanceName) throws IOException {
     ObjectMapper objectMapper = new ObjectMapper();
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    InstanceService instanceService =
+        new InstanceServiceImpl(new HelixDataAccessorWrapper((ZKHelixDataAccessor) dataAccessor), getConfigAccessor());
     StoppableCheck stoppableCheck = null;
     try {
-      stoppableCheck = new InstanceServiceImpl(getDataAccssor(clusterId), getConfigAccessor())
-          .checkSingleInstanceStoppable(clusterId, instanceName, jsonContent);
+      stoppableCheck =
+          instanceService.getInstanceStoppableCheck(clusterId, instanceName, jsonContent);
     } catch (HelixException e) {
-      _logger
-          .error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
+      LOG.error(String.format("Current cluster %s has issue with health checks!", clusterId),
+          e);
       return serverError(e);
     }
     return OK(objectMapper.writeValueAsString(stoppableCheck));
@@ -120,14 +108,14 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     try {
       record = toZNRecord(content);
     } catch (IOException e) {
-      _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
+      LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e);
       return badRequest("Input is not a vaild ZNRecord!");
     }
 
     try {
       admin.addInstance(clusterId, new InstanceConfig(record));
     } catch (Exception ex) {
-      _logger.error("Error in adding an instance: " + instanceName, ex);
+      LOG.error("Error in adding an instance: " + instanceName, ex);
       return serverError(ex);
     }
 
@@ -207,11 +195,11 @@ public class PerInstanceAccessor extends AbstractHelixResource {
                     OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)));
         break;
       default:
-        _logger.error("Unsupported command :" + command);
+        LOG.error("Unsupported command :" + command);
         return badRequest("Unsupported command :" + command);
       }
     } catch (Exception e) {
-      _logger.error("Failed in updating instance : " + instanceName, e);
+      LOG.error("Failed in updating instance : " + instanceName, e);
       return badRequest(e.getMessage());
     }
     return OK();
@@ -266,7 +254,7 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     try {
       record = toZNRecord(content);
     } catch (IOException e) {
-      _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
+      LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e);
       return badRequest("Input is not a vaild ZNRecord!");
     }
     InstanceConfig instanceConfig = new InstanceConfig(record);
@@ -288,7 +276,7 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     } catch (HelixException ex) {
       return notFound(ex.getMessage());
     } catch (Exception ex) {
-      _logger.error(String.format("Error in update instance config for instance: %s", instanceName),
+      LOG.error(String.format("Error in update instance config for instance: %s", instanceName),
           ex);
       return serverError(ex);
     }
@@ -423,14 +411,14 @@ public class PerInstanceAccessor extends AbstractHelixResource {
     List<String> messageNames =
         accessor.getChildNames(accessor.keyBuilder().messages(instanceName));
     if (messageNames == null || messageNames.size() == 0) {
-      _logger.warn("Unable to get any messages on instance: " + instanceName);
+      LOG.warn("Unable to get any messages on instance: " + instanceName);
       return notFound();
     }
 
     for (String messageName : messageNames) {
       Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageName));
       if (message == null) {
-        _logger.warn("Message is deleted given message name: ", messageName);
+        LOG.warn("Message is deleted given message name: ", messageName);
         continue;
       }
       // if stateModelDef is valid, keep messages with StateModelDef equals to the parameter
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index 3e642cb..51bc354 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -22,12 +22,11 @@ package org.apache.helix.rest.server.service;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.helix.rest.server.json.instance.StoppableCheck;
 
 public interface InstanceService {
   enum HealthCheck {
@@ -79,24 +78,20 @@ public interface InstanceService {
   }
 
   /**
-   * Get the current instance stoppable checks based on Helix own business logic
-   * @param clusterId
-   * @param instanceName
-   * @return a map where key is stoppable check name and boolean value represents whether the check
-   *         succeeds
-   */
-  Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
-      List<HealthCheck> healthChecks);
-
-  /**
    * Get the overall status of the instance
-   * @param clusterId
-   * @param instanceName
-   * @return
+   * @param clusterId The cluster id
+   * @param instanceName The instance name
+   * @return An instance of {@link InstanceInfo} easily convertible to JSON
    */
   InstanceInfo getInstanceInfo(String clusterId, String instanceName,
       List<HealthCheck> healthChecks);
 
-  StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
+  /**
+   * Get the current instance stoppable checks
+   * @param clusterId The cluster id
+   * @param instanceName The instance name
+   * @return An instance of {@link StoppableCheck} easily convertible to JSON
+   */
+  StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
       String jsonContent) throws IOException;
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index a01912a..310e2b9 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -25,13 +25,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.RESTConfig;
@@ -44,69 +47,34 @@ import org.apache.helix.util.InstanceValidationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+
+
 public class InstanceServiceImpl implements InstanceService {
   private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
 
   private static final String PARTITION_HEALTH_KEY = "PARTITION_HEALTH";
   private static final String IS_HEALTHY_KEY = "IS_HEALTHY";
   private static final String EXPIRY_KEY = "EXPIRE";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   private final HelixDataAccessor _dataAccessor;
   private final ConfigAccessor _configAccessor;
+  private final CustomRestClient _customRestClient;
 
   public InstanceServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor) {
     _dataAccessor = dataAccessor;
     _configAccessor = configAccessor;
+    _customRestClient = CustomRestClientFactory.get();
   }
 
-  @Override
-  public Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
-      List<HealthCheck> healthChecks) {
-    Map<String, Boolean> healthStatus = new HashMap<>();
-    for (HealthCheck healthCheck : healthChecks) {
-      switch (healthCheck) {
-      case INVALID_CONFIG:
-        healthStatus.put(HealthCheck.INVALID_CONFIG.name(),
-            InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName));
-        if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) {
-          LOG.error("The instance {} doesn't have valid configuration", instanceName);
-          return healthStatus;
-        }
-      case INSTANCE_NOT_ENABLED:
-        healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(), InstanceValidationUtil
-            .isEnabled(_dataAccessor, _configAccessor, clusterId, instanceName));
-        break;
-      case INSTANCE_NOT_ALIVE:
-        healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(),
-            InstanceValidationUtil.isAlive(_dataAccessor, clusterId, instanceName));
-        break;
-      case INSTANCE_NOT_STABLE:
-        boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName);
-        healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable);
-        break;
-      case HAS_ERROR_PARTITION:
-        healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(),
-            !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName));
-        break;
-      case HAS_DISABLED_PARTITION:
-        healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(),
-            !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName));
-        break;
-      case EMPTY_RESOURCE_ASSIGNMENT:
-        healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
-            InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
-        break;
-      case MIN_ACTIVE_REPLICA_CHECK_FAILED:
-        healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
-            InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
-        break;
-      default:
-        LOG.error("Unsupported health check: {}", healthCheck);
-        break;
-      }
-    }
-
-    return healthStatus;
+  @VisibleForTesting
+  InstanceServiceImpl(HelixDataAccessor dataAccessor, ConfigAccessor configAccessor, CustomRestClient customRestClient) {
+    _dataAccessor = dataAccessor;
+    _configAccessor = configAccessor;
+    _customRestClient = customRestClient;
   }
 
   @Override
@@ -139,35 +107,116 @@ public class InstanceServiceImpl implements InstanceService {
       instanceInfoBuilder.partitions(partitions);
     }
     try {
-      Map<String, Boolean> healthStatus = getInstanceHealthStatus(clusterId, instanceName, healthChecks);
+      Map<String, Boolean> healthStatus =
+          getInstanceHealthStatus(clusterId, instanceName, healthChecks);
       instanceInfoBuilder.healthStatus(healthStatus);
     } catch (HelixException ex) {
-      LOG.error("Exception while getting health status: {}, reporting health status as unHealth", ex);
+      LOG.error(
+          "Exception while getting health status. Cluster: {}, Instance: {}, reporting health status as unHealth",
+          clusterId, instanceName, ex);
       instanceInfoBuilder.healthStatus(false);
     }
 
     return instanceInfoBuilder.build();
   }
 
-
+  /**
+   * {@inheritDoc}
+   * Step 1: Perform Helix's own instance-level health checks
+   * Step 2: Perform client-side instance-level health checks
+   * Step 3: Perform client-side partition-level health checks (all partitions on the instance)
+   * Note: if a check fails at any step, the remaining steps are skipped because the instance cannot be stopped
+   */
   @Override
-  public StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
+  public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
       String jsonContent) throws IOException {
-    // TODO reduce GC by dependency injection
-    Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId,
-        instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
-    CustomRestClient customClient = CustomRestClientFactory.get(jsonContent);
+    LOG.info("Perform instance level helix own health checks for {}/{}", clusterId, instanceName);
+    Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId, instanceName,
+        InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
+
+    StoppableCheck result =
+        new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
+    if (!result.isStoppable()) {
+      return result;
+    }
+    LOG.info("{} passed helix side health checks", instanceName);
+    return performCustomInstanceChecks(clusterId, instanceName, getCustomPayLoads(jsonContent));
+  }
+
+  @VisibleForTesting
+  protected StoppableCheck performCustomInstanceChecks(String clusterId, String instanceName,
+      Map<String, String> customPayLoads) throws IOException {
+    StoppableCheck defaultSucceed = new StoppableCheck(true, Collections.emptyList(),
+        StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
+    LOG.info("Perform instance level client side health checks for {}/{}", clusterId, instanceName);
+    Optional<String> maybeBaseUrl = getBaseUrl(instanceName, clusterId);
+    if (!maybeBaseUrl.isPresent()) {
+      LOG.warn("Unable to get custom client health endpoint: " + instanceName);
+      return defaultSucceed;
+    }
+    try {
+      String baseUrl = maybeBaseUrl.get();
+      StoppableCheck result =
+          new StoppableCheck(_customRestClient.getInstanceStoppableCheck(baseUrl, customPayLoads),
+              StoppableCheck.Category.CUSTOM_INSTANCE_CHECK);
+      if (!result.isStoppable()) {
+        return result;
+      }
+      LOG.info("{} passed client side instance level health checks", instanceName);
+      return performPartitionLevelChecks(clusterId, instanceName, baseUrl, customPayLoads);
+    } catch (IOException e) {
+      LOG.error("Failed to perform custom client side instance level health checks for {}/{}",
+          clusterId, instanceName, e);
+      throw e;
+    }
+  }
+
+  @VisibleForTesting
+  protected StoppableCheck performPartitionLevelChecks(String clusterId, String instanceName,
+      String baseUrl, Map<String, String> customPayLoads) throws IOException {
+    LOG.info("Perform partition level health checks for {}/{}", clusterId, instanceName);
+    // pull the health status from ZK
+    PartitionHealth clusterPartitionsHealth = generatePartitionHealthMapFromZK();
+    Map<String, List<String>> expiredPartitionsOnInstances =
+        clusterPartitionsHealth.getExpiredRecords();
+    // update the health status for those expired partitions on instances
     try {
-      Map<String, Boolean> customStoppableCheck =
-        customClient.getInstanceStoppableCheck("", Collections.emptyMap());
-      return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck);
+      for (Map.Entry<String, List<String>> entry : expiredPartitionsOnInstances.entrySet()) {
+        Map<String, Boolean> partitionHealthStatus =
+            _customRestClient.getPartitionStoppableCheck(baseUrl, entry.getValue(), customPayLoads);
+        partitionHealthStatus.entrySet().forEach(kv -> clusterPartitionsHealth
+            .updatePartitionHealth(instanceName, kv.getKey(), kv.getValue()));
+      }
     } catch (IOException e) {
-      LOG.error("Failed to perform customized health check for {}/{}", clusterId, instanceName, e);
+      LOG.error("Failed to perform client side partition level health checks for {}/{}", clusterId,
+          instanceName, e);
       throw e;
     }
+    // Sibling checks on partition health across the entire cluster
+    PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
+    List<ExternalView> externalViews =
+        _dataAccessor.getChildNames(propertyKeyBuilder.externalViews()).stream()
+            .map(externalView -> (ExternalView) _dataAccessor
+                .getProperty(propertyKeyBuilder.externalView(externalView)))
+            .collect(Collectors.toList());
+    List<String> unHealthyPartitions = InstanceValidationUtil.perPartitionHealthCheck(externalViews,
+        clusterPartitionsHealth.getGlobalPartitionHealth(), instanceName, _dataAccessor);
+    return new StoppableCheck(unHealthyPartitions.isEmpty(), unHealthyPartitions,
+        StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
   }
 
-  public PartitionHealth generatePartitionHealthMapFromZK() {
+  private Map<String, String> getCustomPayLoads(String jsonContent) throws IOException {
+    Map<String, String> result = new HashMap<>();
+    JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonContent);
+    // Parse each top-level JSON field into a string key-value pair
+    jsonNode.fields().forEachRemaining(kv ->
+        result.put(kv.getKey(), kv.getValue().asText())
+    );
+    return result;
+  }
+
+  @VisibleForTesting
+  protected PartitionHealth generatePartitionHealthMapFromZK() {
     PartitionHealth partitionHealth = new PartitionHealth();
 
     // Only checks the instances are online with valid reports
@@ -180,9 +229,10 @@ public class InstanceServiceImpl implements InstanceService {
       for (String partitionName : customizedHealth.getMapFields().keySet()) {
         try {
           Map<String, String> healthMap = customizedHealth.getMapField(partitionName);
-          if (healthMap == null || Long.parseLong(healthMap.get(EXPIRY_KEY)) < System
-              .currentTimeMillis()) {
-            // Clean all the existing checks. If we do not clean it, when we do the customized check,
+          if (healthMap == null
+              || Long.parseLong(healthMap.get(EXPIRY_KEY)) < System.currentTimeMillis()) {
+            // Clean all the existing checks. If we do not clean it, when we do the
+            // customized check,
             // Helix may think these partitions are only partitions holding on the instance.
             // But it could potentially have some partitions are unhealthy for expired ones.
             // It could problem for shutting down instances.
@@ -204,61 +254,68 @@ public class InstanceServiceImpl implements InstanceService {
     return partitionHealth;
   }
 
-  /**
-   * Get general customized URL from RESTConfig
-   *
-   * @param configAccessor
-   * @param clustername
-   *
-   * @return null if RESTConfig is null
-   */
-  protected String getGeneralCustomizedURL(ConfigAccessor configAccessor, String clustername) {
-    RESTConfig restConfig = configAccessor.getRESTConfig(clustername);
-    // If user customized URL is not ready, return true as the check
-    if (restConfig == null) {
-      return null;
-    }
-    return restConfig.getCustomizedHealthURL();
-  }
-
-  /**
-   * Use get user provided general URL to construct the stoppable status or partition status URL
-   *
-   * @param generalURL
-   * @param instanceName
-   * @param statusType
-   * @return null if URL is malformed
-   */
-  protected String getCustomizedURLWithEndPoint(String generalURL, String instanceName,
-      InstanceValidationUtil.HealthStatusType statusType) {
-    if (generalURL == null) {
-      LOG.warn("Failed to generate customized URL for instance {}", instanceName);
-      return null;
+  @VisibleForTesting
+  protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+      List<HealthCheck> healthChecks) {
+    Map<String, Boolean> healthStatus = new HashMap<>();
+    for (HealthCheck healthCheck : healthChecks) {
+      switch (healthCheck) {
+        case INVALID_CONFIG:
+          healthStatus.put(HealthCheck.INVALID_CONFIG.name(),
+              InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName));
+          if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) {
+            LOG.error("The instance {} doesn't have valid configuration", instanceName);
+            return healthStatus;
+          }
+        case INSTANCE_NOT_ENABLED:
+          healthStatus.put(HealthCheck.INSTANCE_NOT_ENABLED.name(), InstanceValidationUtil
+              .isEnabled(_dataAccessor, instanceName));
+          break;
+        case INSTANCE_NOT_ALIVE:
+          healthStatus.put(HealthCheck.INSTANCE_NOT_ALIVE.name(),
+              InstanceValidationUtil.isAlive(_dataAccessor, instanceName));
+          break;
+        case INSTANCE_NOT_STABLE:
+          boolean isStable = InstanceValidationUtil.isInstanceStable(_dataAccessor, instanceName);
+          healthStatus.put(HealthCheck.INSTANCE_NOT_STABLE.name(), isStable);
+          break;
+        case HAS_ERROR_PARTITION:
+          healthStatus.put(HealthCheck.HAS_ERROR_PARTITION.name(),
+              !InstanceValidationUtil.hasErrorPartitions(_dataAccessor, clusterId, instanceName));
+          break;
+        case HAS_DISABLED_PARTITION:
+          healthStatus.put(HealthCheck.HAS_DISABLED_PARTITION.name(),
+              !InstanceValidationUtil.hasDisabledPartitions(_dataAccessor, clusterId, instanceName));
+          break;
+        case EMPTY_RESOURCE_ASSIGNMENT:
+          healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
+              InstanceValidationUtil.hasResourceAssigned(_dataAccessor, clusterId, instanceName));
+          break;
+        case MIN_ACTIVE_REPLICA_CHECK_FAILED:
+          healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
+              InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
+          break;
+        default:
+          LOG.error("Unsupported health check: {}", healthCheck);
+          break;
+      }
     }
 
-    try {
-      // If user customized URL is not ready, return true as the check
-      String hostName = instanceName.split("_")[0];
-      return String.format("%s/%s", generalURL.replace("*", hostName), statusType.name());
-    } catch (Exception e) {
-      LOG.info("Failed to prepare customized check for generalURL {} instance {}", generalURL,
-          instanceName, e);
-      return null;
-    }
+    return healthStatus;
   }
 
-  /**
-   * Perform customized single instance health check map filtering
-   *
-   * Input map is user customized health out put. It will be HEALTH_ENTRY_KEY -> true/false
-   * @param statusMap
-   * @return
-   */
-  private Map<String, Boolean> perInstanceHealthCheck(Map<String, Boolean> statusMap) {
-    if (statusMap != null && !statusMap.isEmpty()) {
-      statusMap = statusMap.entrySet().stream().filter(entry -> !entry.getValue())
-          .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue()));
+  private Optional<String> getBaseUrl(String instance, String clusterId) {
+    RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
+    if (restConfig == null) {
+      LOG.error("The cluster {} hasn't enabled client side health checks yet", clusterId);
+      return Optional.empty();
     }
-    return statusMap;
+    String baseUrl = restConfig.get(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL);
+    // Precondition: the URL must have the form "http://*/path"; the wildcard is replaced with the instance VIP
+    assert baseUrl.contains("*");
+    // Precondition: the instance name must have the form <instanceVip>_<port>
+    assert instance.contains("_");
+    String instanceVip = instance.substring(0, instance.indexOf('_'));
+    return Optional.of(baseUrl.replace("*", instanceVip));
   }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java b/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java
index bcfc0e3..0249b8f 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/json/instance/TestStoppableCheck.java
@@ -19,42 +19,30 @@ package org.apache.helix.rest.server.json.instance;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Map;
-
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 public class TestStoppableCheck {
 
   @Test
   public void whenSerializingStoppableCheck() throws JsonProcessingException {
-    StoppableCheck stoppableCheck = new StoppableCheck(false, new ArrayList<String>() {
-      {
-        add("failedCheck");
-      }
-    });
+    StoppableCheck stoppableCheck = new StoppableCheck(false, ImmutableList.of("check"), StoppableCheck.Category.HELIX_OWN_CHECK);
 
     ObjectMapper mapper = new ObjectMapper();
     String result = mapper.writeValueAsString(stoppableCheck);
 
-    Assert.assertEquals(result, "{\"stoppable\":false,\"failedChecks\":[\"failedCheck\"]}");
+    Assert.assertEquals(result, "{\"stoppable\":false,\"failedChecks\":[\"Helix:check\"]}");
   }
 
   @Test
-  public void testMergeStoppableChecks() throws JsonProcessingException {
-    Map<String, Boolean> helixCheck = ImmutableMap.of("check0", false, "check1", false);
-    Map<String, Boolean> customCheck = ImmutableMap.of("check1", true, "check2", true);
-
-    StoppableCheck stoppableCheck = StoppableCheck.mergeStoppableChecks(helixCheck, customCheck);
-    ObjectMapper mapper = new ObjectMapper();
-    String result = mapper.writeValueAsString(stoppableCheck);
-
-    Assert.assertEquals(result,
-        "{\"stoppable\":false,\"failedChecks\":[\"Helix:check0\",\"Helix:check1\"]}");
+  public void testConstructorSortingOrder() {
+    StoppableCheck stoppableCheck = new StoppableCheck(ImmutableMap.of("a", true, "c", false, "b", false), StoppableCheck.Category.HELIX_OWN_CHECK);
+    Assert.assertFalse(stoppableCheck.isStoppable());
+    Assert.assertEquals(stoppableCheck.getFailedChecks(), ImmutableList.of("Helix:b", "Helix:c"));
   }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
index d304ea8..bc2e05c 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
@@ -19,82 +19,162 @@ package org.apache.helix.rest.server.service;
  * under the License.
  */
 
-import com.google.common.collect.ImmutableMap;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.HealthStat;
+import org.apache.helix.model.RESTConfig;
+import org.apache.helix.rest.client.CustomRestClient;
 import org.apache.helix.rest.server.json.cluster.PartitionHealth;
+import org.apache.helix.rest.server.json.instance.StoppableCheck;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class TestInstanceService {
   private static final String TEST_CLUSTER = "TestCluster";
+  private static final String TEST_INSTANCE = "instance0.linkedin.com_1235";
+
+  @Mock
+  private HelixDataAccessor _dataAccessor;
+  @Mock
+  private ConfigAccessor _configAccessor;
+  @Mock
+  private CustomRestClient _customRestClient;
+
+  @BeforeMethod
+  public void beforeMethod() {
+    MockitoAnnotations.initMocks(this);
+    RESTConfig restConfig = new RESTConfig("restConfig");
+    restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, "http://*:123/path");
+    when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
+  }
+
+  @Test
+  public void testGetInstanceStoppableCheck_fail_at_helix_own_checks() throws IOException {
+    StoppableCheck expected = new StoppableCheck(false, ImmutableList.of("FailedCheck"), StoppableCheck.Category.HELIX_OWN_CHECK);
+    InstanceService service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient) {
+      @Override
+      public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent)
+          throws IOException {
+        return expected;
+      }
+    };
+
+    String jsonContent = "";
+    StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
+
+    Assert.assertEquals(actual, expected);
+    verifyZeroInteractions(_customRestClient);
+    verifyZeroInteractions(_configAccessor);
+  }
+
+  @Test
+  public void testGetInstanceStoppableCheck_fail_at_custom_instance_checks() throws IOException {
+    InstanceService service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient) {
+      @Override
+      protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+          List<HealthCheck> healthChecks) {
+        return Collections.emptyMap();
+      }
+    };
+    when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap())).thenReturn(ImmutableMap.of("check1", false));
+
+    String jsonContent = "{\n" + "   \"param1\": \"value1\",\n" + "\"param2\": \"value2\"\n" + "}";
+    StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
+
+    Assert.assertFalse(actual.isStoppable());
+    verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any());
+    verify(_customRestClient, times(0)).getPartitionStoppableCheck(any(), any(), any());
+  }
+
+  @Test
+  public void testGetInstanceStoppableCheck_fail_at_custom_partition_checks() throws IOException {
+    InstanceService service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient) {
+      @Override
+      protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
+          List<HealthCheck> healthChecks) {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      protected StoppableCheck performPartitionLevelChecks(String clusterId, String instanceName, String baseUrl,
+          Map<String, String> customPayLoads) throws IOException {
+        return new StoppableCheck(false, Collections.emptyList(), StoppableCheck.Category.CUSTOM_PARTITION_CHECK);
+      }
+    };
+
+    // partition is healthy on the test instance but unhealthy on the sibling instance
+    when(_customRestClient.getInstanceStoppableCheck(anyString(), anyMap())).thenReturn(Collections.emptyMap());
+    String jsonContent = "{\n" + "   \"param1\": \"value1\",\n" + "\"param2\": \"value2\"\n" + "}";
+    StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, jsonContent);
+
+    Assert.assertFalse(actual.isStoppable());
+    verify(_customRestClient, times(1)).getInstanceStoppableCheck(any(), any());
+  }
 
   @Test
   public void testGeneratePartitionHealthMapFromZK() {
-    //Prepare for testing data.
-    Mock mock = new Mock();
-    when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
-    when(mock.dataAccessor.getChildNames(new PropertyKey.Builder(TEST_CLUSTER).liveInstances()))
-        .thenReturn(mock.liveInstances);
-    when(mock.dataAccessor.getProperty(
+    List<ZNRecord> healthData = generateHealthData();
+
+    InstanceServiceImpl service = new InstanceServiceImpl(_dataAccessor, _configAccessor, _customRestClient);
+    when(_dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
+    when(_dataAccessor.getChildNames(new PropertyKey.Builder(TEST_CLUSTER).liveInstances()))
+        .thenReturn(Arrays.asList("host0", "host1"));
+    when(_dataAccessor.getProperty(
         new PropertyKey.Builder(TEST_CLUSTER).healthReport("host0", "PARTITION_HEALTH")))
-        .thenReturn(new HealthStat(mock.healthData.get(0)));
-    when(mock.dataAccessor.getProperty(
+        .thenReturn(new HealthStat(healthData.get(0)));
+    when(_dataAccessor.getProperty(
         new PropertyKey.Builder(TEST_CLUSTER).healthReport("host1", "PARTITION_HEALTH")))
-        .thenReturn(new HealthStat(mock.healthData.get(1)));
-    PartitionHealth computeResult = new InstanceServiceImpl(mock.dataAccessor, mock.configAccessor)
-        .generatePartitionHealthMapFromZK();
+        .thenReturn(new HealthStat(healthData.get(1)));
+    PartitionHealth computeResult = service.generatePartitionHealthMapFromZK();
     PartitionHealth expectedResult = generateExpectedResult();
-    Assert.assertTrue(computeResult.equals(expectedResult));
+    Assert.assertEquals(computeResult, expectedResult);
   }
 
-
-  private final class Mock {
-    private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
-    private ConfigAccessor configAccessor = mock(ConfigAccessor.class);
-    private List<String> liveInstances = Arrays.asList("host0", "host1");
-    private List<ZNRecord> healthData = generateHealthData();
-
-    Mock() {
-    }
-
-    private List<ZNRecord> generateHealthData() {
-      // Set EXPIRY time 100000 that guarantees the test has enough time 
-      // Host 0 contains unhealthy partition but it does not matter.
-      ZNRecord record1 = new ZNRecord("PARTITION_HEALTH");
-      record1.setMapField("TESTDB0_0", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record1.setMapField("TESTDB0_1", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record1.setMapField("TESTDB0_2", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record1.setMapField("TESTDB1_0", ImmutableMap
-          .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record1.setMapField("TESTDB2_0", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-
-      // Host 1 has expired data, which requires immediate API querying.
-      ZNRecord record2 = new ZNRecord("PARTITION_HEALTH");
-      record2.setMapField("TESTDB0_0", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record2.setMapField("TESTDB0_1", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record2.setMapField("TESTDB0_2", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", "123456"));
-      record2.setMapField("TESTDB1_0", ImmutableMap
-          .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-      record2.setMapField("TESTDB2_0", ImmutableMap
-          .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
-
-      return Arrays.asList(record1, record2);
-    }
+  private List<ZNRecord> generateHealthData() {
+    // Set EXPIRE to 100000 ms in the future, which guarantees the test has enough time
+    // Host 0 contains an unhealthy partition, but it does not matter.
+    ZNRecord record1 = new ZNRecord("PARTITION_HEALTH");
+    record1.setMapField("TESTDB0_0", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record1.setMapField("TESTDB0_1", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record1.setMapField("TESTDB0_2", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record1.setMapField("TESTDB1_0", ImmutableMap
+        .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record1.setMapField("TESTDB2_0", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+
+    // Host 1 has expired data, which requires immediate API querying.
+    ZNRecord record2 = new ZNRecord("PARTITION_HEALTH");
+    record2.setMapField("TESTDB0_0", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record2.setMapField("TESTDB0_1", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record2.setMapField("TESTDB0_2", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", "123456"));
+    record2.setMapField("TESTDB1_0", ImmutableMap
+        .of("IS_HEALTHY", "false", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+    record2.setMapField("TESTDB2_0", ImmutableMap
+        .of("IS_HEALTHY", "true", "EXPIRE", String.valueOf(System.currentTimeMillis() + 100000)));
+
+    return Arrays.asList(record1, record2);
   }
 
   private PartitionHealth generateExpectedResult() {


Mime
View raw message