helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch zooscalability updated: Make ConfigAccessor and ZkUtil realm-aware (#838)
Date Tue, 03 Mar 2020 04:48:16 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/zooscalability by this push:
     new 08f5ec7  Make ConfigAccessor and ZkUtil realm-aware (#838)
08f5ec7 is described below

commit 08f5ec7ec88eb8db95cf63c89d6062a4d19c31f1
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Mon Mar 2 20:48:10 2020 -0800

    Make ConfigAccessor and ZkUtil realm-aware (#838)
    
    To make Helix Java APIs realm-aware, we first make ConfigAccessor and ZkUtil realm-aware by instrumenting these APIs with a Builder and RealmAwareZkClients.
    
    The Builder pattern is chosen because it is a scalable option when there are a lot of configurable parameters. It makes it easy to validate the given parameters as well.
---
 .../main/java/org/apache/helix/ConfigAccessor.java | 120 ++++++++++++++++++---
 .../java/org/apache/helix/manager/zk/ZKUtil.java   |  54 ++++++----
 metadata-store-directory-common/pom.xml            |   7 --
 .../helix/zookeeper/api/client/HelixZkClient.java  |   5 +-
 .../zookeeper/api/client/RealmAwareZkClient.java   | 112 +++++++++++++++++--
 .../impl/client/RealmAwareZkClientTestBase.java    |   7 +-
 6 files changed, 248 insertions(+), 57 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index b5f06ef..7fc82ac 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -19,6 +19,7 @@ package org.apache.helix;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,13 +38,17 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.RESTConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.util.StringTemplate;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Provides access to the persistent configuration of the cluster, the instances that live
on it,
  * and the logical resources assigned to it.
@@ -52,6 +57,7 @@ public class ConfigAccessor {
   private static Logger LOG = LoggerFactory.getLogger(ConfigAccessor.class);
 
   private static final StringTemplate template = new StringTemplate();
+
   static {
     // @formatter:off
     template.addEntry(ConfigScopeProperty.CLUSTER, 1, "/{clusterName}/CONFIGS/CLUSTER");
@@ -70,12 +76,37 @@ public class ConfigAccessor {
     // @formatter:on
   }
 
-  private final HelixZkClient _zkClient;
+  private final RealmAwareZkClient _zkClient;
   // true if ConfigAccessor was instantiated with a HelixZkClient, false otherwise
   // This is used for close() to determine how ConfigAccessor should close the underlying
ZkClient
   private final boolean _usesExternalZkClient;
 
   /**
+   * Constructor that creates a realm-aware ConfigAccessor using a builder.
+   * @param builder
+   */
+  private ConfigAccessor(Builder builder) throws IOException, InvalidRoutingDataException
{
+    switch (builder._realmMode) {
+      case MULTI_REALM:
+        // TODO: make sure FederatedZkClient is created correctly
+        // TODO: pass in MSDS endpoint or pass in _realmAwareZkConnectionConfig
+        String msdsEndpoint = builder._realmAwareZkConnectionConfig.getMsdsEndpoint();
+        _zkClient = new FederatedZkClient();
+        break;
+      case SINGLE_REALM:
+        // Create a HelixZkClient: Use a SharedZkClient because ConfigAccessor does not need
to do
+        // ephemeral operations
+        _zkClient = SharedZkClientFactory.getInstance()
+            .buildZkClient(builder._realmAwareZkConnectionConfig.createZkConnectionConfig(),
+                builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+        break;
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
+    }
+    _usesExternalZkClient = false;
+  }
+
+  /**
    * Initialize an accessor with a Zookeeper client
    * Note: it is recommended to use the other constructor instead to avoid having to create
a
    * HelixZkClient.
@@ -94,9 +125,9 @@ public class ConfigAccessor {
    * @param zkAddress
    */
   public ConfigAccessor(String zkAddress) {
-    _zkClient = SharedZkClientFactory.getInstance().buildZkClient(
-        new HelixZkClient.ZkConnectionConfig(zkAddress),
-        new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
     _usesExternalZkClient = false;
   }
 
@@ -157,7 +188,6 @@ public class ConfigAccessor {
       }
     }
     return map;
-
   }
 
   /**
@@ -265,8 +295,8 @@ public class ConfigAccessor {
       String instanceName = scopeStr.substring(scopeStr.lastIndexOf('/') + 1);
       if (!ZKUtil.isInstanceSetup(_zkClient, scope.getClusterName(), instanceName,
           InstanceType.PARTICIPANT)) {
-        throw new HelixException("instance: " + instanceName + " is NOT setup in cluster:
"
-            + clusterName);
+        throw new HelixException(
+            "instance: " + instanceName + " is NOT setup in cluster: " + clusterName);
       }
     }
 
@@ -499,7 +529,6 @@ public class ConfigAccessor {
 
         if (splits[1].startsWith("SIMPLEKEYS")) {
           retKeys = new ArrayList<String>(record.getSimpleFields().keySet());
-
         } else if (splits[1].startsWith("MAPKEYS")) {
           retKeys = new ArrayList<String>(record.getMapFields().keySet());
         } else if (splits[1].startsWith("MAPMAPKEYS")) {
@@ -516,7 +545,6 @@ public class ConfigAccessor {
     } catch (Exception e) {
       return Collections.emptyList();
     }
-
   }
 
   /**
@@ -638,8 +666,8 @@ public class ConfigAccessor {
     updateClusterConfig(clusterName, clusterConfig, false);
   }
 
-
-  private void updateClusterConfig(String clusterName, ClusterConfig clusterConfig, boolean
overwrite) {
+  private void updateClusterConfig(String clusterName, ClusterConfig clusterConfig,
+      boolean overwrite) {
     if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) {
       throw new HelixException("fail to update config. cluster: " + clusterName + " is NOT
setup.");
     }
@@ -775,7 +803,6 @@ public class ConfigAccessor {
   public void setInstanceConfig(String clusterName, String instanceName,
       InstanceConfig instanceConfig) {
     updateInstanceConfig(clusterName, instanceName, instanceConfig, true);
-
   }
 
   /**
@@ -830,4 +857,71 @@ public class ConfigAccessor {
       _zkClient.close();
     }
   }
+
+  public static class Builder {
+    private String _zkAddress;
+    private RealmAwareZkClient.RealmMode _realmMode;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
+
+    public Builder() {
+    }
+
+    public Builder setZkAddress(String zkAddress) {
+      _zkAddress = zkAddress;
+      return this;
+    }
+
+    public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      _realmMode = realmMode;
+      return this;
+    }
+
+    public Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    public Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      _realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    public ConfigAccessor build() throws Exception {
+      validate();
+      return new ConfigAccessor(this);
+    }
+
+    /**
+     * Validate the given parameters before creating an instance of ConfigAccessor.
+     */
+    private void validate() {
+      // Resolve RealmMode based on other parameters
+      boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet)
{
+        throw new HelixException(
+            "ConfigAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+      if (_realmMode == null) {
+        _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      boolean isZkClientConfigSet = _realmAwareZkClientConfig != null;
+      // Resolve which clientConfig to use
+      _realmAwareZkClientConfig =
+          isZkClientConfigSet ? _realmAwareZkClientConfig.createHelixZkClientConfig()
+              : new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer());
+
+      // Resolve RealmAwareZkConnectionConfig
+      if (_realmAwareZkConnectionConfig == null) {
+        // If not set, create a default one
+        _realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 126ae38..ccf1a87 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -23,11 +23,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -36,6 +36,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Using this ZKUtil class for production purposes is NOT recommended since a lot of the
static
  * methods require a ZkClient instance to be passed in.
@@ -65,7 +66,7 @@ public final class ZKUtil {
     return result;
   }
 
-  public static boolean isClusterSetup(String clusterName, HelixZkClient zkClient) {
+  public static boolean isClusterSetup(String clusterName, RealmAwareZkClient zkClient) {
     if (clusterName == null) {
       logger.info("Fail to check cluster setup : cluster name is null!");
       return false;
@@ -75,7 +76,7 @@ public final class ZKUtil {
       logger.info("Fail to check cluster setup : zookeeper client is null!");
       return false;
     }
-    ArrayList<String> requiredPaths = new ArrayList<String>();
+    List<String> requiredPaths = new ArrayList<>();
     requiredPaths.add(PropertyPathBuilder.idealState(clusterName));
     requiredPaths.add(PropertyPathBuilder.clusterConfig(clusterName));
     requiredPaths.add(PropertyPathBuilder.instanceConfig(clusterName));
@@ -92,15 +93,21 @@ public final class ZKUtil {
     requiredPaths.add(PropertyPathBuilder.controllerHistory(clusterName));
     boolean isValid = true;
 
-    BaseDataAccessor<Object> baseAccessor = new ZkBaseDataAccessor<Object>(zkClient);
-    boolean[] ret = baseAccessor.exists(requiredPaths, 0);
+    boolean[] ret = new boolean[requiredPaths.size()];
+    for (int i = 0; i < requiredPaths.size(); i++) {
+      try {
+        ret[i] = zkClient.exists(requiredPaths.get(i));
+      } catch (Exception e) {
+        ret[i] = false;
+      }
+    }
     StringBuilder errorMsg = new StringBuilder();
 
     for (int i = 0; i < ret.length; i++) {
       if (!ret[i]) {
         isValid = false;
-        errorMsg
-            .append(("Invalid cluster setup, missing znode path: " + requiredPaths.get(i))
+ "\n");
+        errorMsg.append("Invalid cluster setup, missing znode path: ").append(requiredPaths.get(i))
+            .append("\n");
       }
     }
 
@@ -132,10 +139,10 @@ public final class ZKUtil {
     return result;
   }
 
-  public static boolean isInstanceSetup(HelixZkClient zkclient, String clusterName,
+  public static boolean isInstanceSetup(RealmAwareZkClient zkclient, String clusterName,
       String instanceName, InstanceType type) {
     if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT)
{
-      ArrayList<String> requiredPaths = new ArrayList<>();
+      List<String> requiredPaths = new ArrayList<>();
       requiredPaths.add(PropertyPathBuilder.instanceConfig(clusterName, instanceName));
       requiredPaths.add(PropertyPathBuilder.instanceMessage(clusterName, instanceName));
       requiredPaths.add(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName));
@@ -157,7 +164,6 @@ public final class ZKUtil {
         if (!zkclient.exists(historyPath)) {
           zkclient.createPersistent(historyPath, true);
         }
-
       }
       return isValid;
     }
@@ -181,7 +187,8 @@ public final class ZKUtil {
     }
   }
 
-  public static void createChildren(HelixZkClient client, String parentPath, List<ZNRecord>
list) {
+  public static void createChildren(RealmAwareZkClient client, String parentPath,
+      List<ZNRecord> list) {
     client.createPersistent(parentPath, true);
     if (list != null) {
       for (ZNRecord record : list) {
@@ -206,7 +213,8 @@ public final class ZKUtil {
     }
   }
 
-  public static void createChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord)
{
+  public static void createChildren(RealmAwareZkClient client, String parentPath,
+      ZNRecord nodeRecord) {
     client.createPersistent(parentPath, true);
 
     String id = nodeRecord.getId();
@@ -230,7 +238,8 @@ public final class ZKUtil {
     }
   }
 
-  public static void dropChildren(HelixZkClient client, String parentPath, List<ZNRecord>
list) {
+  public static void dropChildren(RealmAwareZkClient client, String parentPath,
+      List<ZNRecord> list) {
     // TODO: check if parentPath exists
     if (list != null) {
       for (ZNRecord record : list) {
@@ -255,7 +264,8 @@ public final class ZKUtil {
     }
   }
 
-  public static void dropChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord)
{
+  public static void dropChildren(RealmAwareZkClient client, String parentPath,
+      ZNRecord nodeRecord) {
     // TODO: check if parentPath exists
     String id = nodeRecord.getId();
     String temp = parentPath + "/" + id;
@@ -280,7 +290,7 @@ public final class ZKUtil {
     return result;
   }
 
-  public static List<ZNRecord> getChildren(HelixZkClient client, String path) {
+  public static List<ZNRecord> getChildren(RealmAwareZkClient client, String path)
{
     // parent watch will be set by zkClient
     List<String> children = client.getChildren(path);
     if (children == null || children.size() == 0) {
@@ -321,7 +331,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void updateIfExists(HelixZkClient client, String path, final ZNRecord record,
+  public static void updateIfExists(RealmAwareZkClient client, String path, final ZNRecord
record,
       boolean mergeOnUpdate) {
     if (client.exists(path)) {
       DataUpdater<Object> updater = new DataUpdater<Object>() {
@@ -353,7 +363,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrMerge(HelixZkClient client, String path, final ZNRecord record,
+  public static void createOrMerge(RealmAwareZkClient client, String path, final ZNRecord
record,
       final boolean persistent, final boolean mergeOnUpdate) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -408,7 +418,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrUpdate(HelixZkClient client, String path, final ZNRecord record,
+  public static void createOrUpdate(RealmAwareZkClient client, String path, final ZNRecord
record,
       final boolean persistent, final boolean mergeOnUpdate) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -457,8 +467,8 @@ public final class ZKUtil {
     }
   }
 
-  public static void asyncCreateOrMerge(HelixZkClient client, String path, final ZNRecord
record,
-      final boolean persistent, final boolean mergeOnUpdate) {
+  public static void asyncCreateOrMerge(RealmAwareZkClient client, String path,
+      final ZNRecord record, final boolean persistent, final boolean mergeOnUpdate) {
     try {
       if (client.exists(path)) {
         if (mergeOnUpdate) {
@@ -510,7 +520,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrReplace(HelixZkClient client, String path, final ZNRecord record,
+  public static void createOrReplace(RealmAwareZkClient client, String path, final ZNRecord
record,
       final boolean persistent) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -553,7 +563,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void subtract(HelixZkClient client, final String path,
+  public static void subtract(RealmAwareZkClient client, final String path,
       final ZNRecord recordTosubtract) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
diff --git a/metadata-store-directory-common/pom.xml b/metadata-store-directory-common/pom.xml
index 1b0d964..a38d287 100644
--- a/metadata-store-directory-common/pom.xml
+++ b/metadata-store-directory-common/pom.xml
@@ -33,8 +33,6 @@ under the License.
 
   <properties>
     <osgi.import>
-      org.apache.commons.cli*,
-      org.apache.commons.io*;version="[1.4,2)",
       org.slf4j*;version="[1.6,2)",
       *
     </osgi.import>
@@ -69,11 +67,6 @@ under the License.
       <version>3.8.1</version>
     </dependency>
     <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-      <version>1.2</version>
-    </dependency>
-    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
index 03bf000..d9f7461 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.api.client;
  * under the License.
  */
 
-import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -150,8 +149,8 @@ public interface HelixZkClient extends RealmAwareZkClient {
     }
 
     @Override
-    public ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) {
-      this._connectInitTimeout = _connectInitTimeout;
+    public ZkClientConfig setConnectInitTimeout(long connectInitTimeout) {
+      this._connectInitTimeout = connectInitTimeout;
       return this;
     }
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index e466d36..fb10073 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -19,9 +19,13 @@ package org.apache.helix.zookeeper.api.client;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -54,9 +58,8 @@ public interface RealmAwareZkClient {
    * SINGLE_REALM: CRUD, change subscription, and EPHEMERAL CreateMode are supported.
    * MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL
CreateMode will throw an UnsupportedOperationException.
    */
-  enum MODE {
-    SINGLE_REALM,
-    MULTI_REALM
+  enum RealmMode {
+    SINGLE_REALM, MULTI_REALM
   }
 
   int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
@@ -325,16 +328,18 @@ public interface RealmAwareZkClient {
    * ZkConnection-related configs for creating an instance of RealmAwareZkClient.
    */
   class RealmAwareZkConnectionConfig {
-
     /**
      * zkRealmShardingKey: used to deduce which ZK realm this RealmAwareZkClientConfig should
connect to.
-     * NOTE: this field will be ignored if MODE is MULTI_REALM!
+     * NOTE: this field will be ignored if RealmMode is MULTI_REALM!
      */
-    private final String _zkRealmShardingKey;
-    private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
-
-    public RealmAwareZkConnectionConfig(String zkRealmShardingKey) {
-      _zkRealmShardingKey = zkRealmShardingKey;
+    private String _zkRealmShardingKey;
+    private String _msdsEndpoint;
+    private int _sessionTimeout;
+
+    private RealmAwareZkConnectionConfig(Builder builder) {
+      _zkRealmShardingKey = builder._zkRealmShardingKey;
+      _msdsEndpoint = builder._msdsEndpoint;
+      _sessionTimeout = builder._sessionTimeout;
     }
 
     @Override
@@ -373,6 +378,81 @@ public interface RealmAwareZkClient {
     public int getSessionTimeout() {
       return _sessionTimeout;
     }
+
+    public String getMsdsEndpoint() {
+      return _msdsEndpoint;
+    }
+
+    public HelixZkClient.ZkConnectionConfig createZkConnectionConfig()
+        throws IOException, InvalidRoutingDataException {
+      // Convert to a single-realm HelixZkClient's ZkConnectionConfig
+      if (_zkRealmShardingKey == null || _zkRealmShardingKey.isEmpty()) {
+        throw new ZkClientException(
+            "Cannot create ZkConnectionConfig because ZK realm sharding key is either null
or empty!");
+      }
+
+      String zkAddress;
+      // Look up the ZK address for the given ZK realm sharding key
+      if (_msdsEndpoint == null || _msdsEndpoint.isEmpty()) {
+        zkAddress = HttpRoutingDataReader.getMetadataStoreRoutingData()
+            .getMetadataStoreRealm(_zkRealmShardingKey);
+      } else {
+        zkAddress = HttpRoutingDataReader.getMetadataStoreRoutingData(_msdsEndpoint)
+            .getMetadataStoreRealm(_zkRealmShardingKey);
+      }
+
+      return new HelixZkClient.ZkConnectionConfig(zkAddress).setSessionTimeout(_sessionTimeout);
+    }
+
+    public static class Builder {
+      private RealmMode _realmMode;
+      private String _zkRealmShardingKey;
+      private String _msdsEndpoint;
+      private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+
+      public Builder() {
+      }
+
+      public Builder setRealmMode(RealmMode mode) {
+        _realmMode = mode;
+        return this;
+      }
+
+      public Builder setZkRealmShardingKey(String shardingKey) {
+        _zkRealmShardingKey = shardingKey;
+        return this;
+      }
+
+      public Builder setMsdsEndpoint(String msdsEndpoint) {
+        _msdsEndpoint = msdsEndpoint;
+        return this;
+      }
+
+      public Builder setSessionTimeout(int sessionTimeout) {
+        _sessionTimeout = sessionTimeout;
+        return this;
+      }
+
+      public RealmAwareZkConnectionConfig build() {
+        validate();
+        return new RealmAwareZkConnectionConfig(this);
+      }
+
+      /**
+       * Validate the internal fields of the builder before creating an instance.
+       */
+      private void validate() {
+        boolean isShardingKeySet = _zkRealmShardingKey != null && !_zkRealmShardingKey.isEmpty();
+        if (_realmMode == RealmMode.MULTI_REALM && isShardingKeySet) {
+          throw new IllegalArgumentException(
+              "ZK sharding key cannot be set on multi-realm mode! Sharding key: "
+                  + _zkRealmShardingKey);
+        }
+        if (_realmMode == RealmMode.SINGLE_REALM && !isShardingKeySet) {
+          throw new IllegalArgumentException("ZK sharding key must be set on single-realm
mode!");
+        }
+      }
+    }
   }
 
   /**
@@ -479,5 +559,17 @@ public interface RealmAwareZkClient {
     public long getConnectInitTimeout() {
       return _connectInitTimeout;
     }
+
+    /**
+     * Create HelixZkClient.ZkClientConfig based on RealmAwareZkClientConfig.
+     * @return
+     */
+    public HelixZkClient.ZkClientConfig createHelixZkClientConfig() {
+      return new HelixZkClient.ZkClientConfig().setZkSerializer(_zkSerializer)
+          .setMonitorType(_monitorType).setMonitorKey(_monitorKey)
+          .setMonitorInstanceName(_monitorInstanceName).setMonitorRootPathOnly(_monitorRootPathOnly)
+          .setOperationRetryTimeout(_operationRetryTimeout)
+          .setConnectInitTimeout(_connectInitTimeout);
+    }
   }
 }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
index d500ce4..323d5f4 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -85,8 +85,10 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
         new RealmAwareZkClient.RealmAwareZkClientConfig();
 
     // Create a connection config with the invalid sharding key
+    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder builder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
     RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
-        new RealmAwareZkClient.RealmAwareZkConnectionConfig(invalidShardingKey);
+        builder.setZkRealmShardingKey(invalidShardingKey).build();
 
     try {
       _realmAwareZkClient = _realmAwareZkClientFactory
@@ -98,7 +100,8 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
 
     // Use a valid sharding key this time around
     String validShardingKey = ZK_SHARDING_KEY_PREFIX + "_" + 0; // Use TEST_SHARDING_KEY_0
-    connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig(validShardingKey);
+    builder.setZkRealmShardingKey(validShardingKey);
+    connectionConfig = builder.build();
     _realmAwareZkClient = _realmAwareZkClientFactory
         .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData);
   }


Mime
View raw message