helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 39/43: Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)
Date Tue, 17 Mar 2020 00:04:10 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 57ce75ca58d96fac87d1bbcb6329e72325d08f36
Author: Huizhi Lu <ihuizhi.lu@gmail.com>
AuthorDate: Thu Mar 12 09:44:38 2020 -0700

    Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)
    
    To make Helix Java APIs realm-aware, we need to make both ZKHelixAdmin and ZKHelixManager realm-aware. This commit adds a Builder to set client config and connection config for building realm-aware ZkClients underneath.
---
 .../apache/helix/manager/zk/CallbackHandler.java   |   7 +-
 .../helix/manager/zk/ParticipantManager.java       |   5 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 202 +++++++++++++++++----
 .../apache/helix/manager/zk/ZKHelixManager.java    | 102 ++++++++---
 .../test/java/org/apache/helix/ZkTestHelper.java   |  21 ++-
 .../integration/TestResourceGroupEndtoEnd.java     |   3 +-
 .../controller/TestControllerLeadershipChange.java |   4 +-
 .../manager/ClusterControllerManager.java          |   3 +-
 .../manager/ClusterDistributedController.java      |   3 +-
 .../manager/MockParticipantManager.java            |   3 +-
 .../helix/integration/manager/ZkTestManager.java   |   5 +-
 .../helix/manager/zk/TestHandleNewSession.java     |   5 +-
 .../impl/factory/HelixZkClientFactory.java         |   2 +-
 13 files changed, 281 insertions(+), 84 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 82c9b29..bbb8788 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -64,6 +64,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -107,7 +108,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private final Set<EventType> _eventTypes;
   private final HelixDataAccessor _accessor;
   private final ChangeType _changeType;
-  private final HelixZkClient _zkClient;
+  private final RealmAwareZkClient _zkClient;
   private final AtomicLong _lastNotificationTimeStamp;
   private final HelixManager _manager;
   private final PropertyKey _propertyKey;
@@ -191,12 +192,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
-  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+  public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
     this(manager, client, propertyKey, listener, eventTypes, changeType, null);
   }
 
-  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+  public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType,
       HelixCallbackMonitor monitor) {
     if (listener == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 411d937..5090a86 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -47,6 +47,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -62,7 +63,7 @@ import org.slf4j.LoggerFactory;
 public class ParticipantManager {
   private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
 
-  final HelixZkClient _zkclient;
+  final RealmAwareZkClient _zkclient;
   final HelixManager _manager;
   final PropertyKey.Builder _keyBuilder;
   final String _clusterName;
@@ -81,7 +82,7 @@ public class ParticipantManager {
   // session race condition when handling new session for the participant.
   private final String _sessionId;
 
-  public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
+  public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
       LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks,
       final String sessionId) {
     _zkclient = zkclient;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d7c40ff..ea9b55a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -47,9 +47,9 @@ import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -74,12 +74,14 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -90,18 +92,27 @@ import org.slf4j.LoggerFactory;
 
 
 public class ZKHelixAdmin implements HelixAdmin {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class);
+
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
   private static final String MAINTENANCE_ZNODE_ID = "maintenance";
   private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
 
   private final RealmAwareZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
-  // true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
+  // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false otherwise
   // This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
   private final boolean _usesExternalZkClient;
 
   private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
 
+  /**
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
+   * instead to avoid having to manually create and maintain a RealmAwareZkClient
+   * outside of ZKHelixAdmin.
+   *
+   * @param zkClient A created RealmAwareZkClient
+   */
   @Deprecated
   public ZKHelixAdmin(RealmAwareZkClient zkClient) {
     _zkClient = zkClient;
@@ -109,14 +120,69 @@ public class ZKHelixAdmin implements HelixAdmin {
     _usesExternalZkClient = true;
   }
 
+  /**
+   * There are 2 realm-aware modes to connect to ZK:
+   * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>,
+   * it will connect on multi-realm mode;
+   * 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided.
+   *
+   * @param zkAddress ZK address
+   * @exception HelixException if not able to connect on multi-realm mode
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
+   */
+  @Deprecated
   public ZKHelixAdmin(String zkAddress) {
     int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30"));
-    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-    clientConfig.setZkSerializer(new ZNRecordSerializer())
-        .setConnectInitTimeout(timeOutInSec * 1000);
-    _zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
-    _zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig()
+            .setConnectInitTimeout(timeOutInSec * 1000L)
+            .setZkSerializer(new ZNRecordSerializer());
+
+    RealmAwareZkClient zkClient;
+
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      try {
+        zkClient = new FederatedZkClient(
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
+      } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on multi-realm mode.", e);
+      }
+    } else {
+      zkClient = SharedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+              clientConfig.createHelixZkClientConfig());
+      zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+    }
+
+    _zkClient = zkClient;
+    _configAccessor = new ConfigAccessor(_zkClient);
+    _usesExternalZkClient = false;
+  }
+
+  private ZKHelixAdmin(Builder builder) {
+    RealmAwareZkClient zkClient;
+    switch (builder.realmMode) {
+      case MULTI_REALM:
+        try {
+          zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig,
+              builder.realmAwareZkClientConfig);
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          throw new HelixException("Not able to connect on multi-realm mode.", e);
+        }
+        break;
+      case SINGLE_REALM:
+        // Create a HelixZkClient: Use a SharedZkClient because ZKHelixAdmin does not need to do
+        // ephemeral operations
+        zkClient = SharedZkClientFactory.getInstance()
+            .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+                builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+        break;
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder.realmMode);
+    }
+
+    _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(_zkClient);
     _usesExternalZkClient = false;
   }
@@ -206,7 +272,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
   }
@@ -382,7 +448,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         reason == null ? "NULL" : reason);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     if (enabled) {
       accessor.removeProperty(keyBuilder.pause());
@@ -407,7 +473,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public boolean isInMaintenanceMode(String clusterName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getBaseDataAccessor()
         .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
   }
@@ -448,7 +514,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       final MaintenanceSignal.TriggeringEntity triggeringEntity) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
         triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically"
             : "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
@@ -516,7 +582,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         instanceName, clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     // check the instance is alive
     LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
@@ -634,7 +700,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         instanceNames == null ? "NULL" : HelixUtil.serializeByComma(instanceNames), clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
 
     Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
@@ -662,7 +728,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         resourceNames == null ? "NULL" : HelixUtil.serializeByComma(resourceNames), clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
 
     Set<String> resetResourceNames = new HashSet<String>(resourceNames);
@@ -790,7 +856,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     for (String instanceName : instances) {
       InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -909,7 +975,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     for (String resourceName : getResourcesInCluster(clusterName)) {
       IdealState is = accessor.getProperty(keyBuilder.idealStates(resourceName));
@@ -925,7 +991,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public IdealState getResourceIdealState(String clusterName, String resourceName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.idealStates(resourceName));
   }
@@ -938,7 +1004,7 @@ public class ZKHelixAdmin implements HelixAdmin {
             clusterName, idealState == null ? "NULL" : idealState.toString());
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
   }
@@ -981,7 +1047,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public ExternalView getResourceExternalView(String clusterName, String resourceName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.externalView(resourceName));
   }
 
@@ -1015,7 +1081,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
   }
 
@@ -1024,7 +1090,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info("Drop resource {} from cluster {}", resourceName, clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.removeProperty(keyBuilder.idealStates(resourceName));
     accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
@@ -1039,7 +1105,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
   }
@@ -1049,7 +1115,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info("Deleting cluster {}.", clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     String root = "/" + clusterName;
     if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0) {
@@ -1093,7 +1159,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
   }
@@ -1288,7 +1354,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         constraintId, clusterName);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
-    Builder keyBuilder = new Builder(clusterName);
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
 
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1311,7 +1377,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         constraintId, clusterName);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
-    Builder keyBuilder = new Builder(clusterName);
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
 
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1333,7 +1399,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
 
-    Builder keyBuilder = new Builder(clusterName);
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     return accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
   }
 
@@ -1415,7 +1481,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
     config.addTag(tag);
@@ -1436,7 +1502,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
     config.removeTag(tag);
@@ -1457,7 +1523,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
     config.setZoneId(zoneId);
@@ -1649,7 +1715,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates());
     List<String> nullIdealStates = new ArrayList<>();
     for (int i = 0; i < idealStates.size(); i++) {
@@ -1690,7 +1756,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     // Ensure that all instances are valid
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
     if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) {
       throw new HelixException(String
@@ -1800,4 +1866,103 @@ public class ZKHelixAdmin implements HelixAdmin {
             clusterConfig));
     return true;
   }
+
+  // TODO: refactor builder to reduce duplicate code with other Helix Java APIs
+  /**
+   * Builder for {@link ZKHelixAdmin}. It collects the ZK address, realm mode, connection config
+   * and client config, fills in defaults for whatever was left unset, and builds a ZKHelixAdmin
+   * that creates its own ZkClient.
+   */
+  public static class Builder {
+    private String zkAddress;
+    private RealmAwareZkClient.RealmMode realmMode;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
+
+    public Builder() {
+    }
+
+    /**
+     * @param zkAddress ZK address to connect to; must be left unset on multi-realm mode
+     * @return this builder
+     */
+    public ZKHelixAdmin.Builder setZkAddress(String zkAddress) {
+      this.zkAddress = zkAddress;
+      return this;
+    }
+
+    /**
+     * @param realmMode SINGLE_REALM or MULTI_REALM; if left unset it is derived in validate()
+     * @return this builder
+     */
+    public ZKHelixAdmin.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      this.realmMode = realmMode;
+      return this;
+    }
+
+    /**
+     * @param realmAwareZkConnectionConfig connection config used on multi-realm mode
+     * @return this builder
+     */
+    public ZKHelixAdmin.Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      // "this." is required: the parameter shadows the field, so a bare assignment is a no-op
+      this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    /**
+     * @param realmAwareZkClientConfig client config used to create the underlying ZkClient
+     * @return this builder
+     */
+    public ZKHelixAdmin.Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      // "this." is required: the parameter shadows the field, so a bare assignment is a no-op
+      this.realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    /**
+     * Validates the parameters, fills in defaults and creates the ZKHelixAdmin.
+     * @return a new ZKHelixAdmin
+     * @throws HelixException if the realm mode and ZK address are inconsistent, or if the
+     *           ZkClient cannot be created on multi-realm mode
+     */
+    public ZKHelixAdmin build() {
+      validate();
+      return new ZKHelixAdmin(this);
+    }
+
+    /*
+     * Validates the given parameters before creating an instance of ZKHelixAdmin.
+     */
+    private void validate() {
+      // Resolve RealmMode based on other parameters
+      boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty();
+      if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+        throw new HelixException(
+            "RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+      if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException(
+            "ZkAddress cannot be set on multi-realm mode!");
+      }
+      if (realmMode == null) {
+        realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      if (realmAwareZkClientConfig == null) {
+        realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
+
+      // Resolve RealmAwareZkConnectionConfig
+      if (realmAwareZkConnectionConfig == null) {
+        // If not set, create a default one
+        realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b452b4c..f7180b2 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -73,14 +74,17 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
 import org.apache.helix.monitoring.mbeans.MonitorLevel;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -117,7 +121,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final String _version;
   private int _reportLatency;
 
-  protected HelixZkClient _zkclient = null;
+  protected RealmAwareZkClient _zkclient;
   private final DefaultMessagingService _messagingService;
   private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
 
@@ -652,30 +656,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   void createClient() throws Exception {
-    PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
-
-    HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
-    connectionConfig.setSessionTimeout(_sessionTimeout);
-    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-    clientConfig
-        .setZkSerializer(zkSerializer)
-        .setConnectInitTimeout(_connectionInitTimeout)
-        .setMonitorType(_instanceType.name())
-        .setMonitorKey(_clusterName)
-        .setMonitorInstanceName(_instanceName)
-        .setMonitorRootPathOnly(isMonitorRootPathOnly());
-
-    HelixZkClient newClient;
-    switch (_instanceType) {
-    case ADMINISTRATOR:
-      newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
-      break;
-    default:
-      newClient = DedicatedZkClientFactory
-          .getInstance().buildZkClient(connectionConfig, clientConfig);
-      break;
-    }
+    final RealmAwareZkClient newClient = createSingleRealmZkClient();
 
     synchronized (this) {
       if (_zkclient != null) {
@@ -1285,4 +1266,82 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public Long getSessionStartTime() {
     return _sessionStartTime;
   }
+
+  /*
+   * Prepares connection config and client config based on the internal parameters given to
+   * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient
+   * instance will be created if connecting as an ADMINISTRATOR to minimize the cost of creating
+   * ZkConnections.
+   */
+  private RealmAwareZkClient createSingleRealmZkClient() {
+    final String shardingKey = buildShardingKey();
+    PathBasedZkSerializer zkSerializer =
+        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
+
+    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+            .setZkRealmShardingKey(shardingKey)
+            .setSessionTimeout(_sessionTimeout).build();
+
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig();
+
+    clientConfig.setZkSerializer(zkSerializer)
+        .setConnectInitTimeout(_connectionInitTimeout)
+        .setMonitorType(_instanceType.name())
+        .setMonitorKey(_clusterName)
+        .setMonitorInstanceName(_instanceName)
+        .setMonitorRootPathOnly(isMonitorRootPathOnly());
+
+    // ADMINISTRATOR shares a ZkClient; every other instance type gets a dedicated one
+    if (_instanceType == InstanceType.ADMINISTRATOR) {
+      return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig,
+          clientConfig);
+    }
+
+    return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig,
+        clientConfig);
+  }
+
+  /*
+   * Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED
+   * System config is set or not. Two types of ZkClients are available:
+   * 1) If MULTI_ZK_ENABLED is set to true, we create, through the given factory, a
+   * RealmAwareZkClient that provides full ZkClient functionalities and connects to the correct ZK
+   * by querying MetadataStoreDirectoryService.
+   * 2) Otherwise, we create a HelixZkClient through the given factory, which plainly connects to
+   * the ZK address given (_zkAddress), reusing the session timeout of the connection config.
+   * Throws HelixException if the realm-aware ZkClient cannot be created.
+   */
+  private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      try {
+        // Create realm-aware ZkClient.
+        return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
+      } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on realm-aware mode for sharding key: "
+            + connectionConfig.getZkRealmShardingKey(), e);
+      }
+    }
+
+    // If multi-zk mode is not enabled, create HelixZkClient with the provided zk address.
+    HelixZkClient.ZkClientConfig helixZkClientConfig = clientConfig.createHelixZkClientConfig();
+    HelixZkClient.ZkConnectionConfig helixZkConnectionConfig =
+        new HelixZkClient.ZkConnectionConfig(_zkAddress)
+            .setSessionTimeout(connectionConfig.getSessionTimeout());
+
+    return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig);
+  }
+
+  /*
+   * Builds the ZK realm sharding key from the cluster name by making sure it starts with "/".
+   * startsWith is used instead of charAt(0) so that an empty cluster name does not throw
+   * StringIndexOutOfBoundsException.
+   */
+  private String buildShardingKey() {
+    return _clusterName.startsWith("/") ? _clusterName : "/" + _clusterName;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index c2b3d35..73dded4 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -43,6 +43,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
@@ -71,7 +72,7 @@ public class ZkTestHelper {
   /**
    * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
    */
-  public static void simulateZkStateReconnected(HelixZkClient client) {
+  public static void simulateZkStateReconnected(RealmAwareZkClient client) {
     ZkClient zkClient = (ZkClient) client;
     WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
     zkClient.process(event);
@@ -84,7 +85,7 @@ public class ZkTestHelper {
    * @param client
    * @return
    */
-  public static String getSessionId(HelixZkClient client) {
+  public static String getSessionId(RealmAwareZkClient client) {
     ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
     ZooKeeper curZookeeper = connection.getZookeeper();
     return Long.toHexString(curZookeeper.getSessionId());
@@ -146,7 +147,7 @@ public class ZkTestHelper {
     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
   }
 
-  public static void expireSession(HelixZkClient client) throws Exception {
+  public static void expireSession(RealmAwareZkClient client) throws Exception {
     final CountDownLatch waitNewSession = new CountDownLatch(1);
     final ZkClient zkClient = (ZkClient) client;
 
@@ -213,7 +214,7 @@ public class ZkTestHelper {
    * @param client
    * @throws Exception
    */
-  public static void asyncExpireSession(HelixZkClient client) throws Exception {
+  public static void asyncExpireSession(RealmAwareZkClient client) throws Exception {
     final ZkClient zkClient = (ZkClient) client;
     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper curZookeeper = connection.getZookeeper();
@@ -245,7 +246,7 @@ public class ZkTestHelper {
   /*
    * stateMap: partition->instance->state
    */
-  public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
+  public static boolean verifyState(RealmAwareZkClient zkclient, String clusterName, String resourceName,
       Map<String, Map<String, String>> expectStateMap, String op) {
     boolean result = true;
     ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -391,7 +392,7 @@ public class ZkTestHelper {
     }
   }
 
-  public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
+  public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client) throws Exception {
     Map<String, List<String>> lists = new HashMap<String, List<String>>();
     ZkClient zkClient = (ZkClient) client;
 
@@ -424,7 +425,7 @@ public class ZkTestHelper {
     return lists;
   }
 
-  public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
+  public static Map<String, Set<IZkDataListener>> getZkDataListener(RealmAwareZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
     field.setAccessible(true);
@@ -433,7 +434,7 @@ public class ZkTestHelper {
     return dataListener;
   }
 
-  public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
+  public static Map<String, Set<IZkChildListener>> getZkChildListener(RealmAwareZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
     field.setAccessible(true);
@@ -442,7 +443,7 @@ public class ZkTestHelper {
     return childListener;
   }
 
-  public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
+  public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception {
     java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
     field.setAccessible(true);
     Object eventThread = field.get(zkclient);
@@ -468,7 +469,7 @@ public class ZkTestHelper {
     return false;
   }
 
-  public static void injectExpire(HelixZkClient client)
+  public static void injectExpire(RealmAwareZkClient client)
       throws ExecutionException, InterruptedException {
     final ZkClient zkClient = (ZkClient) client;
     Future future = _executor.submit(new Runnable() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index 0aa5787..0313a77 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -47,6 +47,7 @@ import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -427,7 +428,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
     }
 
     @Override
-    public HelixZkClient getZkClient() {
+    public RealmAwareZkClient getZkClient() {
       return _zkclient;
     }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
index 8a1ff03..5aa9a91 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
@@ -34,12 +34,12 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -148,7 +148,7 @@ public class TestControllerLeadershipChange extends ZkTestBase {
     callbackHandlers.forEach(callbackHandler -> Assert.assertTrue(callbackHandler.isReady()));
 
     // check the zk connection is open
-    HelixZkClient zkClient = controller.getZkClient();
+    RealmAwareZkClient zkClient = controller.getZkClient();
     Assert.assertFalse(zkClient.isClosed());
     Long sessionId = zkClient.getSessionId();
     Assert.assertNotNull(sessionId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 7d810fb..9281e2d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -27,6 +27,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,7 +95,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
   }
 
   @Override
-  public HelixZkClient getZkClient() {
+  public RealmAwareZkClient getZkClient() {
     return _zkclient;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 8e15928..397fae5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +84,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
   }
 
   @Override
-  public HelixZkClient getZkClient() {
+  public RealmAwareZkClient getZkClient() {
     return _zkclient;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 0036e0d..84bc334 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -33,6 +33,7 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,7 +129,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   }
 
   @Override
-  public HelixZkClient getZkClient() {
+  public RealmAwareZkClient getZkClient() {
     return _zkclient;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
index 1a6903a..93e4b53 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
@@ -22,10 +22,11 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+
 
 public interface ZkTestManager {
-  HelixZkClient getZkClient();
+  RealmAwareZkClient getZkClient();
 
   List<CallbackHandler> getHandlers();
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index 00bedcf..e4c6695 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -35,6 +35,7 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.model.LiveInstance;
 import org.testng.Assert;
@@ -492,7 +493,7 @@ public class TestHandleNewSession extends ZkTestBase {
       return _handlers;
     }
 
-    HelixZkClient getZkClient() {
+    RealmAwareZkClient getZkClient() {
       return _zkclient;
     }
 
@@ -538,7 +539,7 @@ public class TestHandleNewSession extends ZkTestBase {
       return _handlers;
     }
 
-    HelixZkClient getZkClient() {
+    RealmAwareZkClient getZkClient() {
       return _zkclient;
     }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
index ca13321..1f792ee 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
@@ -29,7 +29,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection;
 /**
  * Abstract class of the ZkClient factory.
  */
-abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
+public abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
 
   /**
    * Build a ZkClient using specified connection config and client config


Mime
View raw message