helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 25/44: Support partition level health mapping fetch from ZK
Date Sat, 25 May 2019 01:19:59 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 393c1a4eace5d6bf6a13be809f04e02d01a272c8
Author: Junkai Xue <jxue@linkedin.com>
AuthorDate: Thu Apr 11 14:38:45 2019 -0700

    Support partition level health mapping fetch from ZK
    
    Partition-level health status is obtained differently from per-instance querying. Helix
    first tries to read the data from ZK under the HEALTH_REPORT folder. If the data has expired
    (checked via the EXPIRE entry), Helix calls the participant's API directly to get the latest
    data.
    
    Otherwise, the customized check is assumed to have failed.
    
    RB=1628988
    BUG=HELIX-1785
    G=helix-reviewers
    A=hulee
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 .../apache/helix/util/InstanceValidationUtil.java  |   6 +-
 .../rest/server/json/cluster/PartitionHealth.java  |  81 ++++++++++++++
 .../rest/server/service/InstanceServiceImpl.java   |  95 ++++++++++++++++-
 .../rest/server/service/TestInstanceService.java   | 118 +++++++++++++++++++++
 4 files changed, 292 insertions(+), 8 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index 244ecad..6ac9605 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -19,12 +19,13 @@ package org.apache.helix.util;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
@@ -41,9 +42,6 @@ import org.apache.helix.task.TaskConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
 /**
  * Utility class for validating Helix properties
  * Warning: each method validates one single property of instance individually and independently.
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java
new file mode 100644
index 0000000..1fc2564
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/json/cluster/PartitionHealth.java
@@ -0,0 +1,81 @@
+package org.apache.helix.rest.server.json.cluster;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class PartitionHealth {
+  // Partition health map stores the global metadata about the partition health in the format
of
+  // instanceName -> partitionName -> isHealthy
+  private Map<String, Map<String, Boolean>> _paritionHealthMap;
+  private Map<String, List<String>> _instancesThatNeedDirectCallWithPartitions;
+
+  public PartitionHealth() {
+    _paritionHealthMap = new HashMap<>();
+    _instancesThatNeedDirectCallWithPartitions = new HashMap<>();
+  }
+
+  public void addInstanceThatNeedDirectCallWithPartition(String instanceName,
+      String partitionName) {
+    _instancesThatNeedDirectCallWithPartitions
+        .computeIfAbsent(instanceName, partitions -> new ArrayList<>()).add(partitionName);
+  }
+
+  public void setPartitionHealthForInstance(String instanceName,
+      Map<String, Boolean> partitionHealth) {
+    _paritionHealthMap.put(instanceName, partitionHealth);
+  }
+
+  public void addSinglePartitionHealthForInstance(String instanceName, String partitionName,
+      Boolean isHealthy) {
+    _paritionHealthMap.computeIfAbsent(instanceName, partitionMap -> new HashMap<>())
+        .put(partitionName, isHealthy);
+  }
+
+  public List<String> getInstanceThatNeedDirectCallWithPartitions(String instanceName)
{
+    return _instancesThatNeedDirectCallWithPartitions.getOrDefault(instanceName,
+        Collections.EMPTY_LIST);
+  }
+
+  public Map<String, Map<String, Boolean>> getParitionHealthMap() {
+    return _paritionHealthMap;
+  }
+
+  public void removePartitionHealthForInstance(String instanceName) {
+    _paritionHealthMap.remove(instanceName);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof PartitionHealth)) {
+      return false;
+    }
+
+    return _paritionHealthMap.equals(((PartitionHealth) o)._paritionHealthMap)
+        && _instancesThatNeedDirectCallWithPartitions
+        .equals(((PartitionHealth) o)._instancesThatNeedDirectCallWithPartitions);
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index 0c1d9ba..8db4d42 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -29,11 +29,14 @@ import java.util.stream.Collectors;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.RESTConfig;
 import org.apache.helix.rest.client.CustomRestClient;
 import org.apache.helix.rest.client.CustomRestClientFactory;
+import org.apache.helix.rest.server.json.cluster.PartitionHealth;
 import org.apache.helix.rest.server.json.instance.InstanceInfo;
 import org.apache.helix.rest.server.json.instance.StoppableCheck;
 import org.apache.helix.util.InstanceValidationUtil;
@@ -41,7 +44,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class InstanceServiceImpl implements InstanceService {
-  private static final Logger _logger = LoggerFactory.getLogger(InstanceServiceImpl.class);
+  private static final Logger LOG = LoggerFactory.getLogger(InstanceServiceImpl.class);
+
+  private static final String PARTITION_HEALTH_KEY = "PARTITION_HEALTH";
+  private static final String IS_HEALTHY_KEY = "IS_HEALTHY";
+  private static final String EXPIRY_KEY = "EXPIRE";
 
   private final HelixDataAccessor _dataAccessor;
   private final ConfigAccessor _configAccessor;
@@ -61,7 +68,7 @@ public class InstanceServiceImpl implements InstanceService {
         healthStatus.put(HealthCheck.INVALID_CONFIG.name(),
             InstanceValidationUtil.hasValidConfig(_dataAccessor, clusterId, instanceName));
         if (!healthStatus.get(HealthCheck.INVALID_CONFIG.name())) {
-          _logger.error("The instance {} doesn't have valid configuration", instanceName);
+          LOG.error("The instance {} doesn't have valid configuration", instanceName);
           return healthStatus;
         }
       case INSTANCE_NOT_ENABLED:
@@ -93,7 +100,7 @@ public class InstanceServiceImpl implements InstanceService {
             InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
         break;
       default:
-        _logger.error("Unsupported health check: {}", healthCheck);
+        LOG.error("Unsupported health check: {}", healthCheck);
         break;
       }
     }
@@ -134,7 +141,7 @@ public class InstanceServiceImpl implements InstanceService {
       Map<String, Boolean> healthStatus = getInstanceHealthStatus(clusterId, instanceName,
healthChecks);
       instanceInfoBuilder.healthStatus(healthStatus);
     } catch (HelixException ex) {
-      _logger.error("Exception while getting health status: {}, reporting health status as
unHealth", ex);
+      LOG.error("Exception while getting health status: {}, reporting health status as unHealth",
ex);
       instanceInfoBuilder.healthStatus(false);
     }
 
@@ -155,6 +162,86 @@ public class InstanceServiceImpl implements InstanceService {
     return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck);
   }
 
+  public PartitionHealth generatePartitionHealthMapFromZK() {
+    PartitionHealth partitionHealth = new PartitionHealth();
+
+    // Only checks the instances are online with valid reports
+    List<String> liveInstances =
+        _dataAccessor.getChildNames(_dataAccessor.keyBuilder().liveInstances());
+    for (String instance : liveInstances) {
+      ZNRecord customizedHealth = _dataAccessor
+          .getProperty(_dataAccessor.keyBuilder().healthReport(instance, PARTITION_HEALTH_KEY))
+          .getRecord();
+      for (String partitionName : customizedHealth.getMapFields().keySet()) {
+        try {
+          Map<String, String> healthMap = customizedHealth.getMapField(partitionName);
+          if (healthMap == null || Long.parseLong(healthMap.get(EXPIRY_KEY)) < System
+              .currentTimeMillis()) {
+            // Clean all the existing checks. If we do not clean it, when we do the customized
check,
+            // Helix may think these partitions are only partitions holding on the instance.
+            // But it could potentially have some partitions are unhealthy for expired ones.
+            // It could problem for shutting down instances.
+            partitionHealth.addInstanceThatNeedDirectCallWithPartition(instance, partitionName);
+            continue;
+          }
+
+          partitionHealth.addSinglePartitionHealthForInstance(instance, partitionName,
+              Boolean.valueOf(healthMap.get(IS_HEALTHY_KEY)));
+        } catch (Exception e) {
+          LOG.warn(
+              "Error in processing partition level health for instance {}, partition {},
directly querying API",
+              instance, partitionName, e);
+          partitionHealth.addInstanceThatNeedDirectCallWithPartition(instance, partitionName);
+        }
+      }
+    }
+
+    return partitionHealth;
+  }
+
+  /**
+   * Look up the general customized health URL stored in the cluster's RESTConfig.
+   *
+   * @param configAccessor accessor used to read the RESTConfig
+   * @param clustername name of the cluster whose RESTConfig is read
+   * @return the customized health URL from the RESTConfig, or null if the cluster has no
+   *         RESTConfig
+   */
+  protected String getGeneralCustomizedURL(ConfigAccessor configAccessor, String clustername) {
+    RESTConfig config = configAccessor.getRESTConfig(clustername);
+    return config == null ? null : config.getCustomizedHealthURL();
+  }
+
+  /**
+   * Build the instance-specific customized URL for a status type. Every "*" in the general URL is
+   * replaced with the part of the instance name before the first "_", and "/" followed by the
+   * status type's name is appended.
+   *
+   * @param generalURL user-provided general URL; may be null
+   * @param instanceName instance whose host part is substituted into the URL
+   * @param statusType status type whose name becomes the final path segment
+   * @return the constructed URL, or null if generalURL is null or building the URL throws
+   */
+  protected String getCustomizedURLWithEndPoint(String generalURL, String instanceName,
+      InstanceValidationUtil.HealthStatusType statusType) {
+    if (generalURL != null) {
+      try {
+        String host = instanceName.split("_")[0];
+        String resolvedURL = generalURL.replace("*", host);
+        return String.format("%s/%s", resolvedURL, statusType.name());
+      } catch (Exception e) {
+        // Best effort: any failure while building the URL is logged and reported as null
+        LOG.info("Failed to prepare customized check for generalURL {} instance {}", generalURL,
+            instanceName, e);
+        return null;
+      }
+    }
+    LOG.warn("Failed to generate customized URL for instance {}", instanceName);
+    return null;
+  }
+
   /**
    * Perform customized single instance health check map filtering
    *
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
new file mode 100644
index 0000000..d304ea8
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestInstanceService.java
@@ -0,0 +1,118 @@
+package org.apache.helix.rest.server.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.HealthStat;
+import org.apache.helix.rest.server.json.cluster.PartitionHealth;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestInstanceService {
+  private static final String TEST_CLUSTER = "TestCluster";
+
+  /**
+   * Mocks PARTITION_HEALTH reports for two live instances. All of host0's entries are unexpired.
+   * host1 has one expired partition (TESTDB0_2). Unexpired entries must be reported with their
+   * IS_HEALTHY value, and the expired one must be flagged for a direct call instead.
+   */
+  @Test
+  public void testGeneratePartitionHealthMapFromZK() {
+    // Prepare for testing data.
+    Mock mock = new Mock();
+    when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
+    when(mock.dataAccessor.getChildNames(new PropertyKey.Builder(TEST_CLUSTER).liveInstances()))
+        .thenReturn(mock.liveInstances);
+    when(mock.dataAccessor.getProperty(
+        new PropertyKey.Builder(TEST_CLUSTER).healthReport("host0", "PARTITION_HEALTH")))
+        .thenReturn(new HealthStat(mock.healthData.get(0)));
+    when(mock.dataAccessor.getProperty(
+        new PropertyKey.Builder(TEST_CLUSTER).healthReport("host1", "PARTITION_HEALTH")))
+        .thenReturn(new HealthStat(mock.healthData.get(1)));
+    PartitionHealth computeResult = new InstanceServiceImpl(mock.dataAccessor, mock.configAccessor)
+        .generatePartitionHealthMapFromZK();
+    PartitionHealth expectedResult = generateExpectedResult();
+    // assertEquals (rather than assertTrue(a.equals(b))) reports both values on failure
+    Assert.assertEquals(computeResult, expectedResult);
+  }
+
+  private final class Mock {
+    private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
+    private ConfigAccessor configAccessor = mock(ConfigAccessor.class);
+    private List<String> liveInstances = Arrays.asList("host0", "host1");
+    private List<ZNRecord> healthData = generateHealthData();
+
+    Mock() {
+    }
+
+    private List<ZNRecord> generateHealthData() {
+      // EXPIRE is set 100000 ms in the future so these entries stay valid while the test runs.
+      String validExpiry = String.valueOf(System.currentTimeMillis() + 100000);
+
+      // Host 0 contains an unhealthy partition, but all of its data is unexpired.
+      ZNRecord record1 = new ZNRecord("PARTITION_HEALTH");
+      record1.setMapField("TESTDB0_0", healthEntry("true", validExpiry));
+      record1.setMapField("TESTDB0_1", healthEntry("true", validExpiry));
+      record1.setMapField("TESTDB0_2", healthEntry("true", validExpiry));
+      record1.setMapField("TESTDB1_0", healthEntry("false", validExpiry));
+      record1.setMapField("TESTDB2_0", healthEntry("true", validExpiry));
+
+      // Host 1 has expired data (TESTDB0_2), which requires immediate API querying.
+      ZNRecord record2 = new ZNRecord("PARTITION_HEALTH");
+      record2.setMapField("TESTDB0_0", healthEntry("true", validExpiry));
+      record2.setMapField("TESTDB0_1", healthEntry("true", validExpiry));
+      record2.setMapField("TESTDB0_2", healthEntry("true", "123456"));
+      record2.setMapField("TESTDB1_0", healthEntry("false", validExpiry));
+      record2.setMapField("TESTDB2_0", healthEntry("true", validExpiry));
+
+      return Arrays.asList(record1, record2);
+    }
+
+    // One partition's health entry as stored in a PARTITION_HEALTH map field
+    private ImmutableMap<String, String> healthEntry(String isHealthy, String expire) {
+      return ImmutableMap.of("IS_HEALTHY", isHealthy, "EXPIRE", expire);
+    }
+  }
+
+  private PartitionHealth generateExpectedResult() {
+    PartitionHealth partitionHealth = new PartitionHealth();
+
+    partitionHealth.addSinglePartitionHealthForInstance("host0", "TESTDB0_0", true);
+    partitionHealth.addSinglePartitionHealthForInstance("host0", "TESTDB0_1", true);
+    partitionHealth.addSinglePartitionHealthForInstance("host0", "TESTDB0_2", true);
+    partitionHealth.addSinglePartitionHealthForInstance("host0", "TESTDB1_0", false);
+    partitionHealth.addSinglePartitionHealthForInstance("host0", "TESTDB2_0", true);
+
+    partitionHealth.addSinglePartitionHealthForInstance("host1", "TESTDB0_0", true);
+    partitionHealth.addSinglePartitionHealthForInstance("host1", "TESTDB0_1", true);
+    partitionHealth.addSinglePartitionHealthForInstance("host1", "TESTDB1_0", false);
+    partitionHealth.addSinglePartitionHealthForInstance("host1", "TESTDB2_0", true);
+    // host1's TESTDB0_2 is expired in ZK, so it must be queried directly
+    partitionHealth.addInstanceThatNeedDirectCallWithPartition("host1", "TESTDB0_2");
+
+    return partitionHealth;
+  }
+}


Mime
View raw message