helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch helix-cloud updated: Modify participant manager to add cluster auto registration logic (#695)
Date Fri, 07 Feb 2020 23:41:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch helix-cloud
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-cloud by this push:
     new 4a82b96  Modify participant manager to add cluster auto registration logic (#695)
4a82b96 is described below

commit 4a82b96c0864dfe8ad34c01c0628113652e9c592
Author: zhangmeng916 <56051770+zhangmeng916@users.noreply.github.com>
AuthorDate: Fri Feb 7 15:41:36 2020 -0800

    Modify participant manager to add cluster auto registration logic (#695)
    
    Modify the participant logic to support instance auto registration to a Helix cluster.
    Auto registration differs from the existing auto join in that auto registration will retrieve
    the fault domain information for the instance and populate it under instanceConfig. With auto
    registration on, the user does not need to manually populate instance information in ZooKeeper.
---
 .../helix/manager/zk/ParticipantManager.java       | 100 ++++++++++++++++-----
 .../main/java/org/apache/helix/util/HelixUtil.java |  19 ++++
 .../manager/MockParticipantManager.java            |  10 ++-
 3 files changed, 107 insertions(+), 22 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 9c5d602..32e0575 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -20,6 +20,8 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +33,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperty;
@@ -40,6 +43,8 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordBucketizer;
+import org.apache.helix.api.cloud.CloudInstanceInformation;
+import org.apache.helix.api.cloud.CloudInstanceInformationProcessor;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.CurrentState;
@@ -52,6 +57,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.util.HelixUtil;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +67,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ParticipantManager {
   private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
+  private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud.";
 
   final HelixZkClient _zkclient;
   final HelixManager _manager;
@@ -107,7 +114,7 @@ public class ParticipantManager {
   }
 
   /**
-   * Handle new session for a participang.
+   * Handle new session for a participant.
    * @throws Exception
    */
   public void handleNewSession() throws Exception {
@@ -132,18 +139,31 @@ public class ParticipantManager {
   }
 
   private void joinCluster() {
-    // Read cluster config and see if instance can auto join the cluster
+    // Read cluster config and see if an instance can auto join or auto register to the cluster
     boolean autoJoin = false;
+    boolean autoRegistration = false;
+
+    // Read "allowParticipantAutoJoin" flag to see if an instance can auto join to the cluster
     try {
-      HelixConfigScope scope =
-          new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
-              _manager.getClusterName()).build();
-      autoJoin =
-          Boolean.parseBoolean(_configAccessor.get(scope,
-              ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+      HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER)
+          .forCluster(_manager.getClusterName()).build();
+      autoJoin = Boolean
+          .parseBoolean(_configAccessor.get(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
       LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
     } catch (Exception e) {
-      // autoJoin is false
+      LOG.info("auto join is false for cluster" + _clusterName);
+    }
+
+    // Read cloud config and see if an instance can auto register to the cluster
+    // Difference between auto join and auto registration is that the latter will also populate the
+    // domain information in instance config
+    try {
+      autoRegistration =
+          Boolean.valueOf(_helixManagerProperty.getHelixCloudProperty().getCloudEnabled());
+      LOG.info("instance: " + _instanceName + " auto-register " + _clusterName + " is "
+          + autoRegistration);
+    } catch (Exception e) {
+      LOG.info("auto registration is false for cluster" + _clusterName);
     }
 
     if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
@@ -151,23 +171,61 @@ public class ParticipantManager {
         throw new HelixException("Initial cluster structure is not set up for instance: "
             + _instanceName + ", instanceType: " + _instanceType);
       } else {
-        LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
-        InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
-        String hostName = _instanceName;
-        String port = "";
-        int lastPos = _instanceName.lastIndexOf("_");
-        if (lastPos > 0) {
-          hostName = _instanceName.substring(0, lastPos);
-          port = _instanceName.substring(lastPos + 1);
+        if (!autoRegistration) {
+          LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
+          _helixAdmin.addInstance(_clusterName, HelixUtil.composeInstanceConfig(_instanceName));
+        } else {
+          LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
+          CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
+          String domain = cloudInstanceInformation
+              .get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name());
+
+          // Disable the verification for now
+          /*String cloudIdInRemote = cloudInstanceInformation
+              .get(CloudInstanceInformation.CloudInstanceField.INSTANCE_SET_NAME.name());
+          String cloudIdInConfig = _configAccessor.getCloudConfig(_clusterName).getCloudID();
+
+          // validate that the instance is auto registering to the correct cluster
+          if (!cloudIdInRemote.equals(cloudIdInConfig)) {
+            throw new IllegalArgumentException(String.format(
+                "cloudId in config: %s is not consistent with cloudId from remote: %s. The instance is auto registering to a wrong cluster.",
+                cloudIdInConfig, cloudIdInRemote));
+          }*/
+
+          InstanceConfig instanceConfig = HelixUtil.composeInstanceConfig(_instanceName);
+          instanceConfig.setDomain(domain);
+          _helixAdmin.addInstance(_clusterName, instanceConfig);
         }
-        instanceConfig.setHostName(hostName);
-        instanceConfig.setPort(port);
-        instanceConfig.setInstanceEnabled(true);
-        _helixAdmin.addInstance(_clusterName, instanceConfig);
       }
     }
   }
 
+  private CloudInstanceInformation getCloudInstanceInformation() {
+    String cloudInstanceInformationProcessorName =
+        _helixManagerProperty.getHelixCloudProperty().getCloudInfoProcessorName();
+    try {
+      // fetch cloud instance information for the instance
+      String cloudInstanceInformationProcessorClassName = CLOUD_PROCESSOR_PATH_PREFIX
+          + _helixManagerProperty.getHelixCloudProperty().getCloudProvider().toLowerCase() + "."
+          + cloudInstanceInformationProcessorName;
+      Class processorClass = Class.forName(cloudInstanceInformationProcessorClassName);
+      Constructor constructor = processorClass.getConstructor(HelixCloudProperty.class);
+      CloudInstanceInformationProcessor processor = (CloudInstanceInformationProcessor) constructor
+          .newInstance(_helixManagerProperty.getHelixCloudProperty());
+      List<String> responses = processor.fetchCloudInstanceInformation();
+
+      // parse cloud instance information for the participant
+      CloudInstanceInformation cloudInstanceInformation =
+          processor.parseCloudInstanceInformation(responses);
+      return cloudInstanceInformation;
+    } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
+        | IllegalAccessException | InvocationTargetException ex) {
+      throw new HelixException(
+          "Failed to create a new instance for the class: " + cloudInstanceInformationProcessorName,
+          ex);
+    }
+  }
+
   private void createLiveInstance() {
     String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
     LiveInstance liveInstance = new LiveInstance(_instanceName);
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index a31c3fe..3b41b83 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -290,4 +290,23 @@ public final class HelixUtil {
     return propertyDefaultValue;
   }
 
+  /**
+   * Compose the config for an instance
+   * @param instanceName
+   * @return InstanceConfig
+   */
+  public static InstanceConfig composeInstanceConfig(String instanceName) {
+    InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+    String hostName = instanceName;
+    String port = "";
+    int lastPos = instanceName.lastIndexOf("_");
+    if (lastPos > 0) {
+      hostName = instanceName.substring(0, lastPos);
+      port = instanceName.substring(lastPos + 1);
+    }
+    instanceConfig.setHostName(hostName);
+    instanceConfig.setPort(port);
+    instanceConfig.setInstanceEnabled(true);
+    return instanceConfig;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index ed4612e..e2623cf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.helix.HelixCloudProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
@@ -48,6 +49,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   protected MockMSModelFactory _msModelFactory;
   protected DummyLeaderStandbyStateModelFactory _lsModelFactory;
   protected DummyOnlineOfflineStateModelFactory _ofModelFactory;
+  protected HelixCloudProperty _helixCloudProperty;
 
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName) {
     this(zkAddr, clusterName, instanceName, 10);
@@ -55,11 +57,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
 
   public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
       int transDelay) {
+    this(zkAddr, clusterName, instanceName, transDelay, null);
+  }
+
+  public MockParticipantManager(String zkAddr, String clusterName, String instanceName,
+      int transDelay, HelixCloudProperty helixCloudProperty) {
     super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
     _transDelay = transDelay;
     _msModelFactory = new MockMSModelFactory(null);
     _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay);
     _ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay);
+    _helixCloudProperty = helixCloudProperty;
   }
 
   public void setTransition(MockTransition transition) {
@@ -136,4 +144,4 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   public List<CallbackHandler> getHandlers() {
     return _handlers;
   }
-}
+}
\ No newline at end of file


Mime
View raw message