helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 07/11: Add MetadataStoreDirectory and ZkMetadataStoreDirectory (#720)
Date Wed, 12 Feb 2020 02:52:27 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a1be1b3ae69485ab00c9d6e71a98e33551ee96ad
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Wed Feb 5 11:12:50 2020 -0800

    Add MetadataStoreDirectory and ZkMetadataStoreDirectory (#720)
    
    MetadataStoreDirectory is an object that provides Metadata Store Directory APIs (routing APIs, CRUD of routing data, etc.). Helix REST will use this object to serve Metadata Store Directory Service REST endpoints.
    Also, it will make appropriate changes to the ZK access layer to listen on changes on the routing data.
    
    Changelist:
    1. Refactor AbstractTestClass to make multi-ZK setup work
    2. Add implementation of MetadataStoreDirectory
    3. Add TestZkMetadataStoreDirectory
---
 .../rest/metadatastore/MetadataStoreDirectory.java | 119 ++++++++++
 .../rest/metadatastore/RoutingDataListener.java    |  27 +++
 .../metadatastore/ZkMetadataStoreDirectory.java    | 193 +++++++++++++++++
 .../rest/metadatastore/ZkRoutingDataReader.java    | 143 ++++++++++--
 .../constant/MetadataStoreRoutingConstants.java    |  27 +++
 .../TestZkMetadataStoreDirectory.java              | 240 +++++++++++++++++++++
 .../metadatastore/TestZkRoutingDataReader.java     |  52 ++---
 .../helix/rest/server/AbstractTestClass.java       | 158 +++++++-------
 8 files changed, 843 insertions(+), 116 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreDirectory.java
new file mode 100644
index 0000000..032362a
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreDirectory.java
@@ -0,0 +1,119 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+
+/**
+ * MetadataStoreDirectory interface that provides methods that are used to route requests to appropriate metadata store realm.
+ *
+ * namespace: tied to a namespace used in Helix REST (Metadata Store Directory Service endpoints will be served by Helix REST deployables)
+ * realm: a metadata store deployable/ensemble. for example, if an application wishes to use 3 ZK quorums, then each ZK quorum would be considered a realm (ZK realm)
+ * metadata store path sharding key: assuming the metadata store uses a file system APIs, this sharding key denotes the key that maps to a particular metadata store realm. an example of a key is a cluster name mapping to a particular ZK realm (ZK address)
+ */
+public interface MetadataStoreDirectory extends AutoCloseable {
+
+  /**
+   * Retrieves all existing namespaces in the routing metadata store.
+   * @return
+   */
+  Collection<String> getAllNamespaces();
+
+  /**
+   * Returns all metadata store realms in the given namespace.
+   * @return
+   */
+  Collection<String> getAllMetadataStoreRealms(String namespace);
+
+  /**
+   * Returns all path-based sharding keys in the given namespace.
+   * @return
+   */
+  Collection<String> getAllShardingKeys(String namespace);
+
+  /**
+   * Returns all path-based sharding keys in the given namespace and the realm.
+   * @param namespace
+   * @param realm
+   * @return
+   */
+  Collection<String> getAllShardingKeysInRealm(String namespace, String realm);
+
+  /**
+   * Returns all sharding keys that have the given path as the prefix substring.
+   * E.g) Given that there are sharding keys: /a/b/c, /a/b/d, /a/e,
+   * getAllShardingKeysUnderPath(namespace, "/a/b") returns ["/a/b/c": "realm", "/a/b/d": "realm].
+   * @param namespace
+   * @param path
+   * @return
+   */
+  Map<String, String> getAllMappingUnderPath(String namespace, String path);
+
+  /**
+   * Returns the name of the metadata store realm based on the namespace and the sharding key given.
+   * @param namespace
+   * @param shardingKey
+   * @return
+   */
+  String getMetadataStoreRealm(String namespace, String shardingKey)
+      throws NoSuchElementException;
+
+  /**
+   * Creates a realm. If the namespace does not exist, it creates one.
+   * @param namespace
+   * @param realm
+   * @return true if successful or if the realm already exists. false otherwise.
+   */
+  boolean addMetadataStoreRealm(String namespace, String realm);
+
+  /**
+   * Deletes a realm.
+   * @param namespace
+   * @param realm
+   * @return true if successful or the realm or namespace does not exist. false otherwise.
+   */
+  boolean deleteMetadataStoreRealm(String namespace, String realm);
+
+  /**
+   * Creates a mapping between the sharding key to the realm in the given namespace.
+   * @param namespace
+   * @param realm
+   * @param shardingKey
+   * @return false if failed
+   */
+  boolean addShardingKey(String namespace, String realm, String shardingKey);
+
+  /**
+   * Deletes the mapping between the sharding key to the realm in the given namespace.
+   * @param namespace
+   * @param realm
+   * @param shardingKey
+   * @return false if failed; true if the deletion is successful or the key does not exist.
+   */
+  boolean deleteShardingKey(String namespace, String realm, String shardingKey);
+
+  /**
+   * Close MetadataStoreDirectory.
+   */
+  void close();
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/RoutingDataListener.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/RoutingDataListener.java
new file mode 100644
index 0000000..a44fc17
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/RoutingDataListener.java
@@ -0,0 +1,27 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface RoutingDataListener {
+  /**
+   * Callback for updating the internally-cached routing data.
+   */
+  void refreshRoutingData(String namespace);
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
new file mode 100644
index 0000000..85f8f4a
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -0,0 +1,193 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient.ZkClientConfig;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ZK-based MetadataStoreDirectory that listens on the routing data in routing ZKs with a update callback.
+ */
+public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, RoutingDataListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkMetadataStoreDirectory.class);
+
+  // TODO: enable the line below when implementation is complete
+  // The following maps' keys represent the namespace
+  private final Map<String, MetadataStoreRoutingDataReader> _routingDataReaderMap;
+  private final Map<String, MetadataStoreRoutingData> _routingDataMap;
+  private final Map<String, String> _routingZkAddressMap;
+  // <namespace, <realm, <list of sharding keys>> mappings
+  private final Map<String, Map<String, List<String>>> _realmToShardingKeysMap;
+
+  /**
+   * Creates a ZkMetadataStoreDirectory based on the given routing ZK addresses.
+   * @param routingZkAddressMap (namespace, routing ZK connect string)
+   * @throws InvalidRoutingDataException
+   */
+  public ZkMetadataStoreDirectory(Map<String, String> routingZkAddressMap)
+      throws InvalidRoutingDataException {
+    if (routingZkAddressMap == null || routingZkAddressMap.isEmpty()) {
+      throw new InvalidRoutingDataException("Routing ZK Addresses given are invalid!");
+    }
+    _routingDataReaderMap = new HashMap<>();
+    _routingZkAddressMap = routingZkAddressMap;
+    _realmToShardingKeysMap = new ConcurrentHashMap<>();
+    _routingDataMap = new ConcurrentHashMap<>();
+
+    // Create RoutingDataReaders
+    for (Map.Entry<String, String> routingEntry : _routingZkAddressMap.entrySet()) {
+      _routingDataReaderMap.put(routingEntry.getKey(),
+          new ZkRoutingDataReader(routingEntry.getKey(), routingEntry.getValue(), this));
+
+      // Populate realmToShardingKeys with ZkRoutingDataReader
+      _realmToShardingKeysMap.put(routingEntry.getKey(),
+          _routingDataReaderMap.get(routingEntry.getKey()).getRoutingData());
+    }
+  }
+
+  @Override
+  public Collection<String> getAllNamespaces() {
+    return Collections.unmodifiableCollection(_routingZkAddressMap.keySet());
+  }
+
+  @Override
+  public Collection<String> getAllMetadataStoreRealms(String namespace) {
+    if (!_realmToShardingKeysMap.containsKey(namespace)) {
+      throw new NoSuchElementException("Namespace " + namespace + " does not exist!");
+    }
+    return Collections.unmodifiableCollection(_realmToShardingKeysMap.get(namespace).keySet());
+  }
+
+  @Override
+  public Collection<String> getAllShardingKeys(String namespace) {
+    if (!_realmToShardingKeysMap.containsKey(namespace)) {
+      throw new NoSuchElementException("Namespace " + namespace + " does not exist!");
+    }
+    Set<String> allShardingKeys = new HashSet<>();
+    _realmToShardingKeysMap.get(namespace).values().forEach(keys -> allShardingKeys.addAll(keys));
+    return allShardingKeys;
+  }
+
+  @Override
+  public Collection<String> getAllShardingKeysInRealm(String namespace, String realm) {
+    if (!_realmToShardingKeysMap.containsKey(namespace)) {
+      throw new NoSuchElementException("Namespace " + namespace + " does not exist!");
+    }
+    if (!_realmToShardingKeysMap.get(namespace).containsKey(realm)) {
+      throw new NoSuchElementException(
+          "Realm " + realm + " does not exist in namespace " + namespace);
+    }
+    return Collections.unmodifiableCollection(_realmToShardingKeysMap.get(namespace).get(realm));
+  }
+
+  @Override
+  public Map<String, String> getAllMappingUnderPath(String namespace, String path) {
+    // TODO: get it from routingData
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getMetadataStoreRealm(String namespace, String shardingKey) {
+    // TODO: get it from routingData
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addMetadataStoreRealm(String namespace, String realm) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean deleteMetadataStoreRealm(String namespace, String realm) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addShardingKey(String namespace, String realm, String shardingKey) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean deleteShardingKey(String namespace, String realm, String shardingKey) {
+    // TODO implement when MetadataStoreRoutingDataWriter is ready
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Callback for updating the cached routing data.
+   * Note: this method should not synchronize on the class or the map. We do not want namespaces blocking each other.
+   * Threadsafe map is used for _realmToShardingKeysMap.
+   * The global consistency of the in-memory routing data is not a requirement (eventual consistency is enough).
+   * @param namespace
+   */
+  @Override
+  public void refreshRoutingData(String namespace) {
+    // Safe to ignore the callback if routingDataMap is null.
+    // If routingDataMap is null, then it will be populated by the constructor anyway
+    // If routingDataMap is not null, then it's safe for the callback function to update it
+
+    // Check if namespace exists; otherwise, return as a NOP and log it
+    if (!_routingZkAddressMap.containsKey(namespace)) {
+      LOG.error("Failed to refresh internally-cached routing data! Namespace not found: " + namespace);
+    }
+
+    try {
+      _realmToShardingKeysMap.put(namespace, _routingDataReaderMap.get(namespace).getRoutingData());
+    } catch (InvalidRoutingDataException e) {
+      LOG.error("Failed to get routing data for namespace: " + namespace + "!");
+    }
+
+    if (_routingDataMap != null) {
+      MetadataStoreRoutingData newRoutingData =
+          new TrieRoutingData(new TrieRoutingData.TrieNode(null, null, false, null));
+      // TODO call constructRoutingData() here.
+      _routingDataMap.put(namespace, newRoutingData);
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    _routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close);
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
index a4c7e1c..453180f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
@@ -22,54 +22,163 @@ package org.apache.helix.rest.metadatastore;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
+import org.apache.zookeeper.Watcher;
 
-public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader {
-  static final String ROUTING_DATA_PATH = "/METADATA_STORE_ROUTING_DATA";
-  static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
 
+public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkDataListener, IZkChildListener, IZkStateListener {
+  private final String _namespace;
   private final String _zkAddress;
   private final HelixZkClient _zkClient;
+  private final RoutingDataListener _routingDataListener;
+
+  public ZkRoutingDataReader(String namespace, String zkAddress) {
+    this(namespace, zkAddress, null);
+  }
 
-  public ZkRoutingDataReader(String zkAddress) {
+  public ZkRoutingDataReader(String namespace, String zkAddress,
+      RoutingDataListener routingDataListener) {
+    if (namespace == null || namespace.isEmpty()) {
+      throw new IllegalArgumentException("namespace cannot be null or empty!");
+    }
+    _namespace = namespace;
+    if (zkAddress == null || zkAddress.isEmpty()) {
+      throw new IllegalArgumentException("Zk address cannot be null or empty!");
+    }
     _zkAddress = zkAddress;
-    _zkClient = DedicatedZkClientFactory.getInstance().buildZkClient(
-        new HelixZkClient.ZkConnectionConfig(zkAddress),
-        new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+    _routingDataListener = routingDataListener;
+    if (_routingDataListener != null) {
+      // Subscribe child changes
+      _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
+      // Subscribe data changes
+      for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+        _zkClient
+            .subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
+                this);
+      }
+    }
   }
 
-  public Map<String, List<String>> getRoutingData() throws InvalidRoutingDataException {
+  /**
+   * Returns (realm, list of ZK path sharding keys) mappings.
+   * @return
+   * @throws InvalidRoutingDataException
+   */
+  public Map<String, List<String>> getRoutingData()
+      throws InvalidRoutingDataException {
     Map<String, List<String>> routingData = new HashMap<>();
     List<String> children;
     try {
-      children = _zkClient.getChildren(ROUTING_DATA_PATH);
+      children = _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
     } catch (ZkNoNodeException e) {
-      throw new InvalidRoutingDataException("Routing data directory ZNode " + ROUTING_DATA_PATH
-          + " does not exist. Routing ZooKeeper address: " + _zkAddress);
+      throw new InvalidRoutingDataException(
+          "Routing data directory ZNode " + MetadataStoreRoutingConstants.ROUTING_DATA_PATH
+              + " does not exist. Routing ZooKeeper address: " + _zkAddress);
     }
     if (children == null || children.isEmpty()) {
       throw new InvalidRoutingDataException(
           "There are no metadata store realms defined. Routing ZooKeeper address: " + _zkAddress);
     }
     for (String child : children) {
-      ZNRecord record = _zkClient.readData(ROUTING_DATA_PATH + "/" + child);
-      List<String> shardingKeys = record.getListField(ZNRECORD_LIST_FIELD_KEY);
+      ZNRecord record =
+          _zkClient.readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child);
+      List<String> shardingKeys =
+          record.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
       if (shardingKeys == null || shardingKeys.isEmpty()) {
-        throw new InvalidRoutingDataException("Realm address ZNode " + ROUTING_DATA_PATH + "/"
-            + child + " does not have a value for key " + ZNRECORD_LIST_FIELD_KEY
-            + ". Routing ZooKeeper address: " + _zkAddress);
+        throw new InvalidRoutingDataException(
+            "Realm address ZNode " + MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child
+                + " does not have a value for key "
+                + MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY
+                + ". Routing ZooKeeper address: " + _zkAddress);
       }
       routingData.put(child, shardingKeys);
     }
     return routingData;
   }
 
-  public void close() {
+  public synchronized void close() {
+    _zkClient.unsubscribeAll();
     _zkClient.close();
   }
+
+  @Override
+  public synchronized void handleDataChange(String s, Object o)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleDataDeleted(String s)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+
+    // Renew subscription
+    _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
+    for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+      _zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
+          this);
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleChildChange(String s, List<String> list)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+
+    // Subscribe data changes again because some children might have been deleted or added
+    _zkClient.unsubscribeAll();
+    for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+      _zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
+          this);
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleStateChanged(Watcher.Event.KeeperState state)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleNewSession(String sessionId)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
+
+  @Override
+  public synchronized void handleSessionEstablishmentError(Throwable error)
+      throws Exception {
+    if (_zkClient.isClosed()) {
+      return;
+    }
+    _routingDataListener.refreshRoutingData(_namespace);
+  }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
new file mode 100644
index 0000000..fda355b
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
@@ -0,0 +1,27 @@
+package org.apache.helix.rest.metadatastore.constant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class MetadataStoreRoutingConstants {
+  public static final String ROUTING_DATA_PATH = "/METADATA_STORE_ROUTING_DATA";
+
+  // For ZK only
+  public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
new file mode 100644
index 0000000..7b0a4f0
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
@@ -0,0 +1,240 @@
+package org.apache.helix.rest.metadatastore;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
+import org.apache.helix.rest.server.AbstractTestClass;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZkMetadataStoreDirectory extends AbstractTestClass {
+  /**
+   * The following are constants to be used for testing.
+   */
+  private static final String TEST_REALM_1 = "testRealm1";
+  private static final List<String> TEST_SHARDING_KEYS_1 =
+      Arrays.asList("/sharding/key/1/a", "/sharding/key/1/b", "/sharding/key/1/c");
+  private static final String TEST_REALM_2 = "testRealm2";
+  private static final List<String> TEST_SHARDING_KEYS_2 =
+      Arrays.asList("/sharding/key/1/d", "/sharding/key/1/e", "/sharding/key/1/f");
+  private static final String TEST_REALM_3 = "testRealm3";
+  private static final List<String> TEST_SHARDING_KEYS_3 =
+      Arrays.asList("/sharding/key/1/x", "/sharding/key/1/y", "/sharding/key/1/z");
+
+  // List of all ZK addresses, each of which corresponds to a namespace/routing ZK
+  private List<String> _zkList;
+  // <Namespace, ZkAddr> mapping
+  private Map<String, String> _routingZkAddrMap;
+  private MetadataStoreDirectory _metadataStoreDirectory;
+
+  @BeforeClass
+  public void beforeClass()
+      throws InvalidRoutingDataException {
+    _zkList = new ArrayList<>(ZK_SERVER_MAP.keySet());
+
+    // Populate routingZkAddrMap
+    _routingZkAddrMap = new LinkedHashMap<>();
+    int namespaceIndex = 0;
+    String namespacePrefix = "namespace_";
+    for (String zk : _zkList) {
+      _routingZkAddrMap.put(namespacePrefix + namespaceIndex, zk);
+    }
+
+    // Write dummy mappings in ZK
+    // Create a node that represents a realm address and add 3 sharding keys to it
+    ZNRecord znRecord = new ZNRecord("RoutingInfo");
+
+    _zkList.forEach(zk -> {
+      ZK_SERVER_MAP.get(zk).getZkClient().setZkSerializer(new ZNRecordSerializer());
+      // Write first realm and sharding keys pair
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          TEST_SHARDING_KEYS_1);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
+              true);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
+              znRecord);
+
+      // Create another realm and sharding keys pair
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          TEST_SHARDING_KEYS_2);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2,
+              true);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2,
+              znRecord);
+    });
+
+    // Create metadataStoreDirectory
+    _metadataStoreDirectory = new ZkMetadataStoreDirectory(_routingZkAddrMap);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _metadataStoreDirectory.close();
+    _zkList.forEach(zk -> ZK_SERVER_MAP.get(zk).getZkClient()
+        .deleteRecursive(MetadataStoreRoutingConstants.ROUTING_DATA_PATH));
+  }
+
+  @Test
+  public void testGetAllNamespaces() {
+    Assert.assertEquals(_metadataStoreDirectory.getAllNamespaces(), _routingZkAddrMap.keySet());
+  }
+
+  @Test(dependsOnMethods = "testGetAllNamespaces")
+  public void testGetAllMetadataStoreRealms() {
+    Set<String> realms = new HashSet<>();
+    realms.add(TEST_REALM_1);
+    realms.add(TEST_REALM_2);
+
+    for (String namespace : _routingZkAddrMap.keySet()) {
+      Assert.assertEquals(_metadataStoreDirectory.getAllMetadataStoreRealms(namespace), realms);
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetAllMetadataStoreRealms")
+  public void testGetAllShardingKeys() {
+    Set<String> allShardingKeys = new HashSet<>();
+    allShardingKeys.addAll(TEST_SHARDING_KEYS_1);
+    allShardingKeys.addAll(TEST_SHARDING_KEYS_2);
+
+    for (String namespace : _routingZkAddrMap.keySet()) {
+      Assert.assertEquals(_metadataStoreDirectory.getAllShardingKeys(namespace), allShardingKeys);
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetAllShardingKeys")
+  public void testGetAllShardingKeysInRealm() {
+    for (String namespace : _routingZkAddrMap.keySet()) {
+      // Test two realms independently
+      Assert
+          .assertEquals(_metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_1),
+              TEST_SHARDING_KEYS_1);
+      Assert
+          .assertEquals(_metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_2),
+              TEST_SHARDING_KEYS_2);
+    }
+  }
+
+  @Test(dependsOnMethods = "testGetAllShardingKeysInRealm")
+  public void testDataChangeCallback()
+      throws Exception {
+    // For all namespaces (Routing ZKs), add an extra sharding key to TEST_REALM_1
+    String newKey = "/a/b/c/d/e";
+    _zkList.forEach(zk -> {
+      ZNRecord znRecord = ZK_SERVER_MAP.get(zk).getZkClient()
+          .readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1);
+      znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY).add(newKey);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
+              znRecord);
+    });
+
+    // Verify that the sharding keys field have been updated
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String namespace : _routingZkAddrMap.keySet()) {
+        try {
+          return _metadataStoreDirectory.getAllShardingKeys(namespace).contains(newKey)
+              && _metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_1)
+              .contains(newKey);
+        } catch (NoSuchElementException e) {
+          // Pass - wait until callback is called
+        }
+      }
+      return false;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  @Test(dependsOnMethods = "testDataChangeCallback")
+  public void testChildChangeCallback()
+      throws Exception {
+    // For all namespaces (Routing ZKs), add a realm with a sharding key list
+    _zkList.forEach(zk -> {
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3,
+              true);
+      ZNRecord znRecord = new ZNRecord("RoutingInfo");
+      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+          TEST_SHARDING_KEYS_3);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3,
+              znRecord);
+    });
+
+    // Verify that the new realm and sharding keys have been updated in-memory via callback
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String namespace : _routingZkAddrMap.keySet()) {
+        try {
+          return _metadataStoreDirectory.getAllMetadataStoreRealms(namespace).contains(TEST_REALM_3)
+              && _metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_3)
+              .containsAll(TEST_SHARDING_KEYS_3);
+        } catch (NoSuchElementException e) {
+          // Pass - wait until callback is called
+        }
+      }
+      return false;
+    }, TestHelper.WAIT_DURATION));
+
+    // Since there was a child change callback, make sure data change works on the new child (realm) as well by adding a key
+    // This tests removing all subscriptions and subscribing with new children list
+    // For all namespaces (Routing ZKs), add an extra sharding key to TEST_REALM_3
+    String newKey = "/a/b/c/d/e";
+    _zkList.forEach(zk -> {
+      ZNRecord znRecord = ZK_SERVER_MAP.get(zk).getZkClient()
+          .readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3);
+      znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY).add(newKey);
+      ZK_SERVER_MAP.get(zk).getZkClient()
+          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_3,
+              znRecord);
+    });
+
+    // Verify that the sharding keys field have been updated
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String namespace : _routingZkAddrMap.keySet()) {
+        try {
+          return _metadataStoreDirectory.getAllShardingKeys(namespace).contains(newKey)
+              && _metadataStoreDirectory.getAllShardingKeysInRealm(namespace, TEST_REALM_3)
+              .contains(newKey);
+        } catch (NoSuchElementException e) {
+          // Pass - wait until callback is called
+        }
+      }
+      return false;
+    }, TestHelper.WAIT_DURATION));
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
index d06c38d..4479f68 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
@@ -23,8 +23,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
 import org.apache.helix.rest.server.AbstractTestClass;
 import org.testng.Assert;
@@ -33,12 +35,14 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+
 public class TestZkRoutingDataReader extends AbstractTestClass {
+  private static final String DUMMY_NAMESPACE = "NAMESPACE";
   private MetadataStoreRoutingDataReader _zkRoutingDataReader;
 
   @BeforeClass
   public void beforeClass() {
-    _zkRoutingDataReader = new ZkRoutingDataReader(ZK_ADDR);
+    _zkRoutingDataReader = new ZkRoutingDataReader(DUMMY_NAMESPACE, ZK_ADDR, null);
   }
 
   @AfterClass
@@ -48,7 +52,7 @@ public class TestZkRoutingDataReader extends AbstractTestClass {
 
   @AfterMethod
   public void afterMethod() {
-    _baseAccessor.remove(ZkRoutingDataReader.ROUTING_DATA_PATH, AccessOption.PERSISTENT);
+    _baseAccessor.remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, AccessOption.PERSISTENT);
   }
 
   @Test
@@ -57,23 +61,24 @@ public class TestZkRoutingDataReader extends AbstractTestClass {
     ZNRecord testZnRecord1 = new ZNRecord("testZnRecord1");
     List<String> testShardingKeys1 =
         Arrays.asList("/sharding/key/1/a", "/sharding/key/1/b", "/sharding/key/1/c");
-    testZnRecord1.setListField(ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY, testShardingKeys1);
+    testZnRecord1
+        .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, testShardingKeys1);
 
     // Create another node that represents a realm address and add 3 sharding keys to it
     ZNRecord testZnRecord2 = new ZNRecord("testZnRecord2");
-    List<String> testShardingKeys2 = Arrays.asList("/sharding/key/2/a", "/sharding/key/2/b",
-        "/sharding/key/2/c", "/sharding/key/2/d");
-    testZnRecord2.setListField(ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY, testShardingKeys2);
+    List<String> testShardingKeys2 = Arrays
+        .asList("/sharding/key/2/a", "/sharding/key/2/b", "/sharding/key/2/c", "/sharding/key/2/d");
+    testZnRecord2
+        .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, testShardingKeys2);
 
     // Add both nodes as children nodes to ZkRoutingDataReader.ROUTING_DATA_PATH
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH + "/testRealmAddress1",
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/testRealmAddress1",
         testZnRecord1, AccessOption.PERSISTENT);
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH + "/testRealmAddress2",
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/testRealmAddress2",
         testZnRecord2, AccessOption.PERSISTENT);
 
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new ZkRoutingDataReader(ZK_ADDR);
     try {
-      Map<String, List<String>> routingData = zkRoutingDataReader.getRoutingData();
+      Map<String, List<String>> routingData = _zkRoutingDataReader.getRoutingData();
       Assert.assertEquals(routingData.size(), 2);
       Assert.assertEquals(routingData.get("testRealmAddress1"), testShardingKeys1);
       Assert.assertEquals(routingData.get("testRealmAddress2"), testShardingKeys2);
@@ -84,24 +89,22 @@ public class TestZkRoutingDataReader extends AbstractTestClass {
 
   @Test
   public void testGetRoutingDataMissingMSRD() {
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new ZkRoutingDataReader(ZK_ADDR);
     try {
-      zkRoutingDataReader.getRoutingData();
+      _zkRoutingDataReader.getRoutingData();
       Assert.fail("Expecting InvalidRoutingDataException");
     } catch (InvalidRoutingDataException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains("Routing data directory ZNode " + ZkRoutingDataReader.ROUTING_DATA_PATH
+      Assert.assertTrue(e.getMessage().contains(
+          "Routing data directory ZNode " + MetadataStoreRoutingConstants.ROUTING_DATA_PATH
               + " does not exist. Routing ZooKeeper address: " + ZK_ADDR));
     }
   }
 
   @Test
   public void testGetRoutingDataMissingMSRDChildren() {
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH, new ZNRecord("test"),
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, new ZNRecord("test"),
         AccessOption.PERSISTENT);
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new ZkRoutingDataReader(ZK_ADDR);
     try {
-      zkRoutingDataReader.getRoutingData();
+      _zkRoutingDataReader.getRoutingData();
       Assert.fail("Expecting InvalidRoutingDataException");
     } catch (InvalidRoutingDataException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -112,20 +115,19 @@ public class TestZkRoutingDataReader extends AbstractTestClass {
   @Test
   public void testGetRoutingDataMSRDChildEmptyValue() {
     ZNRecord testZnRecord1 = new ZNRecord("testZnRecord1");
-    testZnRecord1.setListField(ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY,
+    testZnRecord1.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
         Collections.emptyList());
-    _baseAccessor.create(ZkRoutingDataReader.ROUTING_DATA_PATH + "/testRealmAddress1",
+    _baseAccessor.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/testRealmAddress1",
         testZnRecord1, AccessOption.PERSISTENT);
-    MetadataStoreRoutingDataReader zkRoutingDataReader = new ZkRoutingDataReader(ZK_ADDR);
     try {
-      zkRoutingDataReader.getRoutingData();
+      _zkRoutingDataReader.getRoutingData();
       Assert.fail("Expecting InvalidRoutingDataException");
     } catch (InvalidRoutingDataException e) {
-      Assert.assertTrue(e.getMessage()
-          .contains("Realm address ZNode " + ZkRoutingDataReader.ROUTING_DATA_PATH
+      Assert.assertTrue(e.getMessage().contains(
+          "Realm address ZNode " + MetadataStoreRoutingConstants.ROUTING_DATA_PATH
               + "/testRealmAddress1 does not have a value for key "
-              + ZkRoutingDataReader.ZNRECORD_LIST_FIELD_KEY + ". Routing ZooKeeper address: "
-              + ZK_ADDR));
+              + MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY
+              + ". Routing ZooKeeper address: " + ZK_ADDR));
     }
   }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index 0302758..e6ecb82 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -66,8 +66,6 @@ import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
@@ -92,13 +90,19 @@ import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+
 public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
+  /**
+   * Constants for multi-ZK environment.
+   */
   private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
   private static final String NUM_ZK_PROPERTY_KEY = "numZk";
-  private static final String ZK_PREFIX = "localhost:";
-  private static final int ZK_START_PORT = 2123;
-  protected Map<String, ZkServer> _zkServerMap;
+  protected static final String ZK_PREFIX = "localhost:";
+  protected static final int ZK_START_PORT = 2123;
+  // The following map must be a static map because it needs to be shared throughout tests
+  protected static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
 
+  // For a single-ZK/Helix environment
   protected static final String ZK_ADDR = "localhost:2123";
   protected static final String WORKFLOW_PREFIX = "Workflow_";
   protected static final String JOB_PREFIX = "Job_";
@@ -154,58 +158,19 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
 
   @Override
   protected Application configure() {
-    // start zk
-    _zkServerMap = new HashMap<>();
-    try {
-      if (_zkServer == null) {
-        _zkServer = TestHelper.startZkServer(ZK_ADDR);
-        Assert.assertNotNull(_zkServer);
-        _zkServerMap.put(ZK_ADDR, _zkServer);
-        ZKClientPool.reset();
-      }
-
-      if (_zkServerTestNS == null) {
-        _zkServerTestNS = TestHelper.startZkServer(_zkAddrTestNS);
-        Assert.assertNotNull(_zkServerTestNS);
-        _zkServerMap.put(_zkAddrTestNS, _zkServerTestNS);
-        ZKClientPool.reset();
-      }
-    } catch (Exception e) {
-      Assert.fail(String.format("Failed to start ZK server: %s", e.toString()));
-    }
-
-    // Start additional ZKs in a multi-ZK setup
-    String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
-    if (multiZkConfig != null && multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) {
-      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
-      if (numZkFromConfig != null) {
-        try {
-          int numZkFromConfigInt = Integer.parseInt(numZkFromConfig);
-          // Start (numZkFromConfigInt - 2) ZooKeepers
-          for (int i = 2; i < numZkFromConfigInt; i++) {
-            String zkAddr = ZK_PREFIX + (ZK_START_PORT + i);
-            ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-            Assert.assertNotNull(zkServer);
-            _zkServerMap.put(zkAddr, zkServer);
-          }
-        } catch (Exception e) {
-          Assert.fail("Failed to create multiple ZooKeepers!");
-        }
-      }
-    }
-
     // Configure server context
     ResourceConfig resourceConfig = new ResourceConfig();
     resourceConfig.packages(AbstractResource.class.getPackage().getName());
     ServerContext serverContext = new ServerContext(ZK_ADDR);
     resourceConfig.property(ContextPropertyKeys.SERVER_CONTEXT.name(), serverContext);
-    resourceConfig.register(new AuditLogFilter(Arrays.<AuditLogger>asList(new MockAuditLogger())));
+    resourceConfig.register(new AuditLogFilter(Collections.singletonList(new MockAuditLogger())));
 
     return resourceConfig;
   }
 
   @Override
-  protected TestContainerFactory getTestContainerFactory() throws TestContainerException {
+  protected TestContainerFactory getTestContainerFactory()
+      throws TestContainerException {
     return new TestContainerFactory() {
       @Override
       public TestContainer create(final URI baseUri, DeploymentContext deploymentContext) {
@@ -234,7 +199,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
               try {
                 _helixRestServer =
                     new HelixRestServer(namespaces, baseUri.getPort(), baseUri.getPath(),
-                        Arrays.<AuditLogger>asList(_auditLogger));
+                        Collections.singletonList(_auditLogger));
                 _helixRestServer.start();
               } catch (Exception ex) {
                 throw new TestContainerException(ex);
@@ -251,8 +216,11 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   }
 
   @BeforeSuite
-  public void beforeSuite() throws Exception {
+  public void beforeSuite()
+      throws Exception {
     if (!_init) {
+      setupZooKeepers();
+
       // TODO: use logging.properties file to config java.util.logging.Logger levels
       java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
       topJavaLogger.setLevel(Level.WARNING);
@@ -260,12 +228,12 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
 
       clientConfig.setZkSerializer(new ZNRecordSerializer());
-      _gZkClient = DedicatedZkClientFactory
-          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
+      _gZkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
 
       clientConfig.setZkSerializer(new ZNRecordSerializer());
-      _gZkClientTestNS = DedicatedZkClientFactory
-          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig);
+      _gZkClientTestNS = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig);
 
       _gSetupTool = new ClusterSetup(_gZkClient);
       _configAccessor = new ConfigAccessor(_gZkClient);
@@ -274,14 +242,14 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
 
       // wait for the web service to start
       Thread.sleep(100);
-
-      setup();
+      setupHelixResources();
       _init = true;
     }
   }
 
   @AfterSuite
-  public void afterSuite() throws Exception {
+  public void afterSuite()
+      throws Exception {
     // tear down orphan-ed threads
     for (ClusterControllerManager cm : _clusterControllerManagers) {
       if (cm != null && cm.isConnected()) {
@@ -289,7 +257,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       }
     }
 
-    for (MockParticipantManager mm: _mockParticipantManagers) {
+    for (MockParticipantManager mm : _mockParticipantManagers) {
       if (mm != null && mm.isConnected()) {
         mm.syncStop();
       }
@@ -315,16 +283,57 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       _zkServerTestNS = null;
     }
 
-    // Stop all ZkServers
-    _zkServerMap.forEach((zkAddr, zkServer) -> TestHelper.stopZkServer(zkServer));
-
     if (_helixRestServer != null) {
       _helixRestServer.shutdown();
       _helixRestServer = null;
     }
+
+    // Stop all ZkServers
+    ZK_SERVER_MAP.forEach((zkAddr, zkServer) -> TestHelper.stopZkServer(zkServer));
+  }
+
+  private void setupZooKeepers() {
+    // start zk
+    try {
+      if (_zkServer == null) {
+        _zkServer = TestHelper.startZkServer(ZK_ADDR);
+        Assert.assertNotNull(_zkServer);
+        ZK_SERVER_MAP.put(ZK_ADDR, _zkServer);
+        ZKClientPool.reset();
+      }
+
+      if (_zkServerTestNS == null) {
+        _zkServerTestNS = TestHelper.startZkServer(_zkAddrTestNS);
+        Assert.assertNotNull(_zkServerTestNS);
+        ZK_SERVER_MAP.put(_zkAddrTestNS, _zkServerTestNS);
+        ZKClientPool.reset();
+      }
+    } catch (Exception e) {
+      Assert.fail(String.format("Failed to start ZK servers: %s", e.toString()));
+    }
+
+    // Start additional ZKs in a multi-ZK setup if applicable
+    String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
+    if (multiZkConfig != null && multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) {
+      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
+      if (numZkFromConfig != null) {
+        try {
+          int numZkFromConfigInt = Integer.parseInt(numZkFromConfig);
+          // Start (numZkFromConfigInt - 2) ZooKeepers
+          for (int i = 2; i < numZkFromConfigInt; i++) {
+            String zkAddr = ZK_PREFIX + (ZK_START_PORT + i);
+            ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+            Assert.assertNotNull(zkServer);
+            ZK_SERVER_MAP.put(zkAddr, zkServer);
+          }
+        } catch (Exception e) {
+          Assert.fail("Failed to create multiple ZooKeepers!");
+        }
+      }
+    }
   }
 
-  protected void setup() throws Exception {
+  protected void setupHelixResources() {
     _clusters = createClusters(3);
     _gSetupTool.addCluster(_superCluster, true);
     _gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
@@ -347,7 +356,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
   }
 
-  protected Set<String> createInstances(String cluster, int numInstances) throws Exception {
+  protected Set<String> createInstances(String cluster, int numInstances) {
     Set<String> instances = new HashSet<>();
     for (int i = 0; i < numInstances; i++) {
       String instanceName = cluster + "localhost_" + (12918 + i);
@@ -362,7 +371,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     for (int i = 0; i < numResources; i++) {
       String resource = cluster + "_db_" + i;
       _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave");
-      IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
       idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA);
       _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
       _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA);
@@ -390,12 +400,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     int i = 0;
     for (String instance : instances) {
       MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, cluster, instance);
-      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-      taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-        @Override public Task createNewTask(TaskCallbackContext context) {
-          return new MockTask(context);
-        }
-      });
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
+      taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
       StateMachineEngine stateMachineEngine = participant.getStateMachineEngine();
       stateMachineEngine.registerStateModelFactory("Task",
           new TaskStateModelFactory(participant, taskFactoryReg));
@@ -431,8 +437,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
 
   protected Map<String, Workflow> createWorkflows(String cluster, int numWorkflows) {
     Map<String, Workflow> workflows = new HashMap<>();
-    HelixPropertyStore<ZNRecord> propertyStore = new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
-        PropertyPathBuilder.propertyStore(cluster), null);
+    HelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
+            PropertyPathBuilder.propertyStore(cluster), null);
 
     for (int i = 0; i < numWorkflows; i++) {
       Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i);
@@ -489,11 +496,13 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     return jobCfgs;
   }
 
-  protected static ZNRecord toZNRecord(String data) throws IOException {
+  protected static ZNRecord toZNRecord(String data)
+      throws IOException {
     return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
   }
 
-  protected String get(String uri, Map<String, String> queryParams, int expectedReturnStatus, boolean expectBodyReturned) {
+  protected String get(String uri, Map<String, String> queryParams, int expectedReturnStatus,
+      boolean expectBodyReturned) {
     WebTarget webTarget = target(uri);
     if (queryParams != null) {
       for (Map.Entry<String, String> entry : queryParams.entrySet()) {
@@ -552,7 +561,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
     return new TaskDriver(_gZkClient, clusterName);
   }
 
-  private void preSetupForParallelInstancesStoppableTest(String clusterName, List<String> instances) {
+  private void preSetupForParallelInstancesStoppableTest(String clusterName,
+      List<String> instances) {
     _gSetupTool.addCluster(clusterName, true);
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
     clusterConfig.setFaultZoneType("helixZoneId");


Mime
View raw message