helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch zooscalability updated: Implement request forwarding for ZkRoutingDataWriter (#788)
Date Thu, 27 Feb 2020 01:21:52 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/zooscalability by this push:
     new 7804319  Implement request forwarding for ZkRoutingDataWriter (#788)
7804319 is described below

commit 7804319c5456ca27183f43212f461592010e3aaa
Author: Neal Sun <nealsun.0428@gmail.com>
AuthorDate: Wed Feb 26 17:21:44 2020 -0800

    Implement request forwarding for ZkRoutingDataWriter (#788)
    
    This PR added the request forwarding feature to ZkRoutingDataWriter. It also included a lot of other work, most notably: changing ZkMetadataStoreDirectory to a singleton in order to allow leader forwarding, adding integration tests for the request forwarding flow, modifying MetadataStoreDirectoryAccessor so that it respects underlying return values, and fixing numerous behavior bugs.
---
 .../apache/helix/rest/common/HttpConstants.java    |  29 +++
 .../metadatastore/ZkMetadataStoreDirectory.java    |  83 ++++---
 .../accessor/ZkRoutingDataReader.java              |   6 +-
 .../accessor/ZkRoutingDataWriter.java              | 256 +++++++++++++++------
 .../concurrency/ZkDistributedLeaderElection.java   |   5 +-
 .../apache/helix/rest/server/HelixRestServer.java  |   2 +-
 .../apache/helix/rest/server/ServerContext.java    |   7 +
 .../MetadataStoreDirectoryAccessor.java            |  37 +--
 .../TestZkMetadataStoreDirectory.java              |  21 +-
 .../accessor/TestZkRoutingDataWriter.java          |  84 +++++++
 .../MetadataStoreDirectoryAccessorTestBase.java    | 131 +++++++++++
 .../rest/server/TestMSDAccessorLeaderElection.java | 227 ++++++++++++++++++
 .../server/TestMetadataStoreDirectoryAccessor.java | 165 ++-----------
 .../mock/MockMetadataStoreDirectoryAccessor.java   | 124 ++++++++++
 .../constant/MetadataStoreRoutingConstants.java    |  10 +
 15 files changed, 914 insertions(+), 273 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HttpConstants.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HttpConstants.java
new file mode 100644
index 0000000..369f1a4
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HttpConstants.java
@@ -0,0 +1,29 @@
+package org.apache.helix.rest.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class HttpConstants {
+  public enum RestVerbs {
+    GET,
+    POST,
+    PUT,
+    DELETE
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
index 5b64f7b..9d54c82 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -21,7 +21,6 @@ package org.apache.helix.rest.metadatastore;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +28,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.msdcommon.callback.RoutingDataListener;
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.datamodel.TrieRoutingData;
@@ -42,49 +42,68 @@ import org.slf4j.LoggerFactory;
 
 
 /**
+ * NOTE: This is a singleton class. DO NOT EXTEND!
  * ZK-based MetadataStoreDirectory that listens on the routing data in routing ZKs with a update
  * callback.
  */
 public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, RoutingDataListener {
   private static final Logger LOG = LoggerFactory.getLogger(ZkMetadataStoreDirectory.class);
 
-  // TODO: enable the line below when implementation is complete
   // The following maps' keys represent the namespace
-  private final Map<String, MetadataStoreRoutingDataReader> _routingDataReaderMap;
-  private final Map<String, MetadataStoreRoutingDataWriter> _routingDataWriterMap;
-  private final Map<String, MetadataStoreRoutingData> _routingDataMap;
-  private final Map<String, String> _routingZkAddressMap;
+  // NOTE: made protected for testing reasons. DO NOT MODIFY!
+  protected final Map<String, MetadataStoreRoutingDataReader> _routingDataReaderMap;
+  protected final Map<String, MetadataStoreRoutingDataWriter> _routingDataWriterMap;
+  protected final Map<String, MetadataStoreRoutingData> _routingDataMap;
+  protected final Map<String, String> _routingZkAddressMap;
   // <namespace, <realm, <list of sharding keys>> mappings
-  private final Map<String, Map<String, List<String>>> _realmToShardingKeysMap;
+  protected final Map<String, Map<String, List<String>>> _realmToShardingKeysMap;
+
+  private static volatile ZkMetadataStoreDirectory _zkMetadataStoreDirectoryInstance;
+
+  public static ZkMetadataStoreDirectory getInstance() {
+    if (_zkMetadataStoreDirectoryInstance == null) {
+      synchronized (ZkMetadataStoreDirectory.class) {
+        if (_zkMetadataStoreDirectoryInstance == null) {
+          _zkMetadataStoreDirectoryInstance = new ZkMetadataStoreDirectory();
+        }
+      }
+    }
+    return _zkMetadataStoreDirectoryInstance;
+  }
+
+  public static ZkMetadataStoreDirectory getInstance(String namespace, String zkAddress)
+      throws InvalidRoutingDataException {
+    getInstance().init(namespace, zkAddress);
+    return _zkMetadataStoreDirectoryInstance;
+  }
 
   /**
-   * Creates a ZkMetadataStoreDirectory based on the given routing ZK addresses.
-   * @param routingZkAddressMap (namespace, routing ZK connect string)
-   * @throws InvalidRoutingDataException
+   * Note: this is a singleton class. The constructor is made protected for testing. DO NOT EXTEND!
    */
-  public ZkMetadataStoreDirectory(Map<String, String> routingZkAddressMap)
-      throws InvalidRoutingDataException {
-    if (routingZkAddressMap == null || routingZkAddressMap.isEmpty()) {
-      throw new InvalidRoutingDataException("Routing ZK Addresses given are invalid!");
-    }
-    _routingDataReaderMap = new HashMap<>();
-    _routingDataWriterMap = new HashMap<>();
-    _routingZkAddressMap = routingZkAddressMap;
+  @VisibleForTesting
+  protected ZkMetadataStoreDirectory() {
+    _routingDataReaderMap = new ConcurrentHashMap<>();
+    _routingDataWriterMap = new ConcurrentHashMap<>();
+    _routingZkAddressMap = new ConcurrentHashMap<>();
     _realmToShardingKeysMap = new ConcurrentHashMap<>();
     _routingDataMap = new ConcurrentHashMap<>();
+  }
 
-    // Create RoutingDataReaders and RoutingDataWriters
-    for (Map.Entry<String, String> routingEntry : _routingZkAddressMap.entrySet()) {
-      _routingDataReaderMap.put(routingEntry.getKey(),
-          new ZkRoutingDataReader(routingEntry.getKey(), routingEntry.getValue(), this));
-      _routingDataWriterMap.put(routingEntry.getKey(),
-          new ZkRoutingDataWriter(routingEntry.getKey(), routingEntry.getValue()));
+  private void init(String namespace, String zkAddress) throws InvalidRoutingDataException {
+    if (!_routingZkAddressMap.containsKey(namespace)) {
+      synchronized (_routingZkAddressMap) {
+        if (!_routingZkAddressMap.containsKey(namespace)) {
+          _routingZkAddressMap.put(namespace, zkAddress);
+          _routingDataReaderMap.put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this));
+          _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress));
 
-      // Populate realmToShardingKeys with ZkRoutingDataReader
-      _realmToShardingKeysMap.put(routingEntry.getKey(),
-          _routingDataReaderMap.get(routingEntry.getKey()).getRoutingData());
-      _routingDataMap.put(routingEntry.getKey(),
-          new TrieRoutingData(_realmToShardingKeysMap.get(routingEntry.getKey())));
+          // Populate realmToShardingKeys with ZkRoutingDataReader
+          _realmToShardingKeysMap
+              .put(namespace, _routingDataReaderMap.get(namespace).getRoutingData());
+          _routingDataMap
+              .put(namespace, new TrieRoutingData(_realmToShardingKeysMap.get(namespace)));
+        }
+      }
     }
   }
 
@@ -238,5 +257,11 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
   public synchronized void close() {
     _routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close);
     _routingDataWriterMap.values().forEach(MetadataStoreRoutingDataWriter::close);
+    _routingDataReaderMap.clear();
+    _routingDataWriterMap.clear();
+    _routingZkAddressMap.clear();
+    _realmToShardingKeysMap.clear();
+    _routingDataMap.clear();
+    _zkMetadataStoreDirectoryInstance = null;
   }
 }
\ No newline at end of file
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
index 12de9e2..76465f9 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.metadatastore.accessor;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -91,9 +92,8 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
           _zkClient.readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realmAddress);
       List<String> shardingKeys =
           record.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
-      if (shardingKeys != null) {
-        routingData.put(realmAddress, shardingKeys);
-      }
+      routingData
+          .put(realmAddress, shardingKeys != null ? shardingKeys : Collections.emptyList());
     }
     return routingData;
   }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
index 48aeb13..061372c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
@@ -19,27 +19,45 @@ package org.apache.helix.rest.metadatastore.accessor;
  * under the License.
  */
 
-import java.util.Collections;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import javax.ws.rs.core.Response;
+
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.rest.common.HttpConstants;
 import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
+  // Timeout, in milliseconds, for HTTP requests that are forwarded to the leader instance
+  private static final int HTTP_REQUEST_FORWARDING_TIMEOUT = 60 * 1000;
   private static final Logger LOG = LoggerFactory.getLogger(ZkRoutingDataWriter.class);
 
   private final String _namespace;
   private final HelixZkClient _zkClient;
   private final ZkDistributedLeaderElection _leaderElection;
+  private final CloseableHttpClient _forwardHttpClient;
+  private final String _myHostName;
 
   public ZkRoutingDataWriter(String namespace, String zkAddress) {
     if (namespace == null || namespace.isEmpty()) {
@@ -62,10 +80,26 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
     }
 
     // Get the hostname (REST endpoint) from System property
-    // TODO: Fill in when Helix REST implementations are ready
-    ZNRecord myServerInfo = new ZNRecord("dummy hostname");
+    String hostName = System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
+    if (hostName == null || hostName.isEmpty()) {
+      throw new IllegalStateException(
+          "Unable to get the hostname of this server instance. System.getProperty fails to fetch "
+              + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY + ".");
+    }
+    // Remove the trailing slash, if present
+    if (hostName.charAt(hostName.length() - 1) == '/') {
+      hostName = hostName.substring(0, hostName.length() - 1);
+    }
+    _myHostName = hostName;
+    ZNRecord myServerInfo = new ZNRecord(_myHostName);
+
     _leaderElection = new ZkDistributedLeaderElection(_zkClient,
         MetadataStoreRoutingConstants.LEADER_ELECTION_ZNODE, myServerInfo);
+
+    RequestConfig config = RequestConfig.custom().setConnectTimeout(HTTP_REQUEST_FORWARDING_TIMEOUT)
+        .setConnectionRequestTimeout(HTTP_REQUEST_FORWARDING_TIMEOUT)
+        .setSocketTimeout(HTTP_REQUEST_FORWARDING_TIMEOUT).build();
+    _forwardHttpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
   }
 
   @Override
@@ -77,8 +111,10 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
       return createZkRealm(realm);
     }
 
-    // TODO: Forward the request to leader
-    return true;
+    String urlSuffix =
+        constructUrlSuffix(MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, realm);
+    return forwardRequestToLeader(urlSuffix, HttpConstants.RestVerbs.PUT,
+        Response.Status.CREATED.getStatusCode());
   }
 
   @Override
@@ -87,11 +123,13 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
       if (_zkClient.isClosed()) {
         throw new IllegalStateException("ZkClient is closed!");
       }
-      return _zkClient.delete(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm);
+      return deleteZkRealm(realm);
     }
 
-    // TODO: Forward the request to leader
-    return true;
+    String urlSuffix =
+        constructUrlSuffix(MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, realm);
+    return forwardRequestToLeader(urlSuffix, HttpConstants.RestVerbs.DELETE,
+        Response.Status.OK.getStatusCode());
   }
 
   @Override
@@ -100,44 +138,14 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
       if (_zkClient.isClosed()) {
         throw new IllegalStateException("ZkClient is closed!");
       }
-      // If the realm does not exist already, then create the realm
-      String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm;
-      if (!_zkClient.exists(realmPath)) {
-        // Create the realm
-        if (!createZkRealm(realm)) {
-          // Failed to create the realm - log and return false
-          LOG.error(
-              "Failed to add sharding key because ZkRealm creation failed! Namespace: {}, Realm: {}, Sharding key: {}",
-              _namespace, realm, shardingKey);
-          return false;
-        }
-      }
-
-      // Add the sharding key to an empty ZNRecord
-      ZNRecord znRecord;
-      try {
-        znRecord = _zkClient.readData(realmPath);
-      } catch (Exception e) {
-        LOG.error(
-            "Failed to read the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
-            _namespace, realm, shardingKey, e);
-        return false;
-      }
-      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
-          Collections.singletonList(shardingKey));
-      try {
-        _zkClient.writeData(realmPath, znRecord);
-      } catch (Exception e) {
-        LOG.error(
-            "Failed to write the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
-            _namespace, realm, shardingKey, e);
-        return false;
-      }
-      return true;
+      return createZkShardingKey(realm, shardingKey);
     }
 
-    // TODO: Forward the request to leader
-    return true;
+    String urlSuffix =
+        constructUrlSuffix(MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, realm,
+            MetadataStoreRoutingConstants.MSDS_GET_ALL_SHARDING_KEYS_ENDPOINT, shardingKey);
+    return forwardRequestToLeader(urlSuffix, HttpConstants.RestVerbs.PUT,
+        Response.Status.CREATED.getStatusCode());
   }
 
   @Override
@@ -146,31 +154,14 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
       if (_zkClient.isClosed()) {
         throw new IllegalStateException("ZkClient is closed!");
       }
-      ZNRecord znRecord =
-          _zkClient.readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, true);
-      if (znRecord == null || !znRecord
-          .getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
-          .contains(shardingKey)) {
-        // This realm does not exist or shardingKey doesn't exist. Return true!
-        return true;
-      }
-      znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
-          .remove(shardingKey);
-      // Overwrite this ZNRecord with the sharding key removed
-      try {
-        _zkClient
-            .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord);
-      } catch (Exception e) {
-        LOG.error(
-            "Failed to write the data back in deleteShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
-            _namespace, realm, shardingKey, e);
-        return false;
-      }
-      return true;
+      return deleteZkShardingKey(realm, shardingKey);
     }
 
-    // TODO: Forward the request to leader
-    return true;
+    String urlSuffix =
+        constructUrlSuffix(MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, realm,
+            MetadataStoreRoutingConstants.MSDS_GET_ALL_SHARDING_KEYS_ENDPOINT, shardingKey);
+    return forwardRequestToLeader(urlSuffix, HttpConstants.RestVerbs.DELETE,
+        Response.Status.OK.getStatusCode());
   }
 
   @Override
@@ -225,6 +216,11 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
   @Override
   public synchronized void close() {
     _zkClient.close();
+    try {
+      _forwardHttpClient.close();
+    } catch (IOException e) {
+      LOG.error("HttpClient failed to close. ", e);
+    }
   }
 
   /**
@@ -232,7 +228,7 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
    * @param realm
    * @return
    */
-  private boolean createZkRealm(String realm) {
+  protected boolean createZkRealm(String realm) {
     if (_zkClient.exists(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm)) {
       LOG.warn("createZkRealm() called for realm: {}, but this realm already exists! Namespace: {}",
           realm, _namespace);
@@ -249,4 +245,128 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
 
     return true;
   }
+
+  protected boolean deleteZkRealm(String realm) {
+    return _zkClient.delete(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm);
+  }
+
+  protected boolean createZkShardingKey(String realm, String shardingKey) {
+    // If the realm does not exist already, then create the realm
+    String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm;
+    if (!_zkClient.exists(realmPath)) {
+      // Create the realm
+      if (!createZkRealm(realm)) {
+        // Failed to create the realm - log and return false
+        LOG.error(
+            "Failed to add sharding key because ZkRealm creation failed! Namespace: {}, Realm: {}, Sharding key: {}",
+            _namespace, realm, shardingKey);
+        return false;
+      }
+    }
+
+    ZNRecord znRecord;
+    try {
+      znRecord = _zkClient.readData(realmPath);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to read the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+          _namespace, realm, shardingKey, e);
+      return false;
+    }
+    List<String> shardingKeys =
+        znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+    if (shardingKeys == null || shardingKeys.isEmpty()) {
+      shardingKeys = new ArrayList<>();
+    }
+    shardingKeys.add(shardingKey);
+    znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, shardingKeys);
+    try {
+      _zkClient.writeData(realmPath, znRecord);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to write the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+          _namespace, realm, shardingKey, e);
+      return false;
+    }
+    return true;
+  }
+
+  protected boolean deleteZkShardingKey(String realm, String shardingKey) {
+    ZNRecord znRecord =
+        _zkClient.readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, true);
+    if (znRecord == null || !znRecord
+        .getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+        .contains(shardingKey)) {
+      // This realm does not exist or shardingKey doesn't exist. Return true!
+      return true;
+    }
+    znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+        .remove(shardingKey);
+    // Overwrite this ZNRecord with the sharding key removed
+    try {
+      _zkClient.writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to write the data back in deleteShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+          _namespace, realm, shardingKey, e);
+      return false;
+    }
+    return true;
+  }
+
+  private String constructUrlSuffix(String... urlParams) {
+    List<String> allUrlParameters = new ArrayList<>(
+        Arrays.asList(MetadataStoreRoutingConstants.MSDS_NAMESPACES_URL_PREFIX, "/", _namespace));
+    for (String urlParam : urlParams) {
+      if (urlParam.charAt(0) != '/') {
+        urlParam = "/" + urlParam;
+      }
+      allUrlParameters.add(urlParam);
+    }
+    return String.join("", allUrlParameters);
+  }
+
+  private boolean forwardRequestToLeader(String urlSuffix, HttpConstants.RestVerbs request_method,
+      int expectedResponseCode) throws IllegalArgumentException {
+    String leaderHostName = _leaderElection.getCurrentLeaderInfo().getId();
+    String url = leaderHostName + urlSuffix;
+    HttpUriRequest request;
+    switch (request_method) {
+      case PUT:
+        request = new HttpPut(url);
+        break;
+      case DELETE:
+        request = new HttpDelete(url);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported request_method: " + request_method);
+    }
+
+    return sendRequestToLeader(request, expectedResponseCode, leaderHostName);
+  }
+
+  // Set to be protected for testing purposes
+  protected boolean sendRequestToLeader(HttpUriRequest request, int expectedResponseCode,
+      String leaderHostName) {
+    try {
+      HttpResponse response = _forwardHttpClient.execute(request);
+      if (response.getStatusLine().getStatusCode() != expectedResponseCode) {
+        HttpEntity respEntity = response.getEntity();
+        String errorLog = "The forwarded request to leader has failed. Uri: " + request.getURI()
+            + ". Error code: " + response.getStatusLine().getStatusCode() + " Current hostname: "
+            + _myHostName + " Leader hostname: " + leaderHostName;
+        if (respEntity != null) {
+          errorLog += " Response: " + EntityUtils.toString(respEntity);
+        }
+        LOG.error(errorLog);
+        return false;
+      }
+    } catch (IOException e) {
+      LOG.error(
+          "The forwarded request to leader raised an exception. Uri: {} Current hostname: {} Leader hostname: {}",
+          request.getURI(), _myHostName, leaderHostName, e);
+      return false;
+    }
+    return true;
+  }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
index 330611f..fca7b76 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
@@ -88,7 +88,7 @@ public class ZkDistributedLeaderElection implements IZkDataListener, IZkStateLis
     List<String> children = _zkClient.getChildren(_basePath);
     Collections.sort(children);
     String leaderName = children.get(0);
-    ZNRecord leaderInfo = _zkClient.readData(_basePath + "/" + leaderName, true);
+    _currentLeaderInfo = _zkClient.readData(_basePath + "/" + leaderName, true);
 
     String[] myNameArray = _myEphemeralSequentialPath.split("/");
     String myName = myNameArray[myNameArray.length - 1];
@@ -96,8 +96,7 @@ public class ZkDistributedLeaderElection implements IZkDataListener, IZkStateLis
     if (leaderName.equals(myName)) {
       // My turn for leadership
       _isLeader = true;
-      _currentLeaderInfo = leaderInfo;
-      LOG.info("{} acquired leadership! Info: {}", myName, leaderInfo);
+      LOG.info("{} acquired leadership! Info: {}", myName, _currentLeaderInfo);
     } else {
       // Watch the ephemeral ZNode before me for a deletion event
       String beforeMe = children.get(children.indexOf(myName) - 1);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index b1880f4..64c1139 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -141,7 +141,7 @@ public class HelixRestServer {
     return String.format("%s_%s", type.name(), namespace.getName());
   }
 
-  private ResourceConfig getResourceConfig(HelixRestNamespace namespace, ServletType type) {
+  protected ResourceConfig getResourceConfig(HelixRestNamespace namespace, ServletType type) {
     ResourceConfig cfg = new ResourceConfig();
     cfg.packages(type.getServletPackageArray());
     cfg.setApplicationName(namespace.getName());
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 5a0530a..b845356 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -27,6 +27,7 @@ import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.InstanceType;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -54,6 +55,7 @@ public class ServerContext {
   private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
   // 1 Cluster name will correspond to 1 task driver
   private final Map<String, TaskDriver> _taskDriverPool;
+  private ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
 
   public ServerContext(String zkAddr) {
     _zkAddr = zkAddr;
@@ -64,6 +66,8 @@ public class ServerContext {
     // cannot be started correctly.
     _helixDataAccessorPool = new HashMap<>();
     _taskDriverPool = new HashMap<>();
+    // Initialize the singleton ZkMetadataStoreDirectory instance to allow it to be closed later
+    _zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance();
   }
 
   public HelixZkClient getHelixZkClient() {
@@ -158,5 +162,8 @@ public class ServerContext {
     if (_zkClient != null) {
       _zkClient.close();
     }
+    if (_zkMetadataStoreDirectory != null) {
+      _zkMetadataStoreDirectory.close();
+    }
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadatastore/MetadataStoreDirectoryAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadatastore/MetadataStoreDirectoryAccessor.java
index 0f22d81..38b764d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadatastore/MetadataStoreDirectoryAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/metadatastore/MetadataStoreDirectoryAccessor.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
@@ -58,7 +57,7 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
   private static final Logger LOG = LoggerFactory.getLogger(MetadataStoreDirectoryAccessor.class);
 
   private String _namespace;
-  private MetadataStoreDirectory _metadataStoreDirectory;
+  protected MetadataStoreDirectory _metadataStoreDirectory;
 
   @PostConstruct
   private void postConstruct() {
@@ -68,11 +67,6 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
     buildMetadataStoreDirectory(_namespace, helixRestNamespace.getMetadataStoreAddress());
   }
 
-  @PreDestroy
-  private void preDestroy() {
-    _metadataStoreDirectory.close();
-  }
-
   /**
    * Gets all existing namespaces in the routing metadata store at endpoint:
    * "GET /metadata-store-namespaces"
@@ -121,7 +115,9 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
   @Path("/metadata-store-realms/{realm}")
   public Response addMetadataStoreRealm(@PathParam("realm") String realm) {
     try {
-      _metadataStoreDirectory.addMetadataStoreRealm(_namespace, realm);
+      if (!_metadataStoreDirectory.addMetadataStoreRealm(_namespace, realm)) {
+        return serverError();
+      }
     } catch (IllegalArgumentException ex) {
       return notFound(ex.getMessage());
     }
@@ -133,7 +129,9 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
   @Path("/metadata-store-realms/{realm}")
   public Response deleteMetadataStoreRealm(@PathParam("realm") String realm) {
     try {
-      _metadataStoreDirectory.deleteMetadataStoreRealm(_namespace, realm);
+      if (!_metadataStoreDirectory.deleteMetadataStoreRealm(_namespace, realm)) {
+        return serverError();
+      }
     } catch (IllegalArgumentException ex) {
       return notFound(ex.getMessage());
     }
@@ -249,8 +247,11 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
   @Path("/metadata-store-realms/{realm}/sharding-keys/{sharding-key: .+}")
   public Response addShardingKey(@PathParam("realm") String realm,
       @PathParam("sharding-key") String shardingKey) {
+    shardingKey = "/" + shardingKey;
     try {
-      _metadataStoreDirectory.addShardingKey(_namespace, realm, shardingKey);
+      if (!_metadataStoreDirectory.addShardingKey(_namespace, realm, shardingKey)) {
+        return serverError();
+      }
     } catch (IllegalArgumentException ex) {
       return notFound(ex.getMessage());
     }
@@ -262,8 +263,11 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
   @Path("/metadata-store-realms/{realm}/sharding-keys/{sharding-key: .+}")
   public Response deleteShardingKey(@PathParam("realm") String realm,
       @PathParam("sharding-key") String shardingKey) {
+    shardingKey = "/" + shardingKey;
     try {
-      _metadataStoreDirectory.deleteShardingKey(_namespace, realm, shardingKey);
+      if (!_metadataStoreDirectory.deleteShardingKey(_namespace, realm, shardingKey)) {
+        return serverError();
+      }
     } catch (IllegalArgumentException ex) {
       return notFound(ex.getMessage());
     }
@@ -295,15 +299,12 @@ public class MetadataStoreDirectoryAccessor extends AbstractResource {
     return helixRestNamespace;
   }
 
-  private void buildMetadataStoreDirectory(String namespace, String address) {
-    Map<String, String> routingZkAddressMap = ImmutableMap.of(namespace, address);
+  protected void buildMetadataStoreDirectory(String namespace, String address) {
     try {
-      _metadataStoreDirectory = new ZkMetadataStoreDirectory(routingZkAddressMap);
+      _metadataStoreDirectory = ZkMetadataStoreDirectory.getInstance(namespace, address);
     } catch (InvalidRoutingDataException ex) {
-      // In this case, the InvalidRoutingDataException should not happen because routing
-      // ZK address is always valid here.
-      LOG.warn("Unable to create metadata store directory for routing ZK address: {}",
-          routingZkAddressMap, ex);
+      LOG.warn("Unable to create metadata store directory for namespace: {}, ZK address: {}",
+          namespace, address, ex);
     }
   }
 
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
index 604d331..6fe5f32 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkMetadataStoreDirectory.java
@@ -62,8 +62,7 @@ public class TestZkMetadataStoreDirectory extends AbstractTestClass {
   private MetadataStoreDirectory _metadataStoreDirectory;
 
   @BeforeClass
-  public void beforeClass()
-      throws InvalidRoutingDataException {
+  public void beforeClass() throws InvalidRoutingDataException {
     _zkList = new ArrayList<>(ZK_SERVER_MAP.keySet());
 
     // Populate routingZkAddrMap
@@ -101,8 +100,14 @@ public class TestZkMetadataStoreDirectory extends AbstractTestClass {
               znRecord);
     });
 
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY,
+        getBaseUri().toString());
+
     // Create metadataStoreDirectory
-    _metadataStoreDirectory = new ZkMetadataStoreDirectory(_routingZkAddrMap);
+    for (Map.Entry<String, String> entry : _routingZkAddrMap.entrySet()) {
+      _metadataStoreDirectory =
+          ZkMetadataStoreDirectory.getInstance(entry.getKey(), entry.getValue());
+    }
   }
 
   @AfterClass
@@ -110,11 +115,13 @@ public class TestZkMetadataStoreDirectory extends AbstractTestClass {
     _metadataStoreDirectory.close();
     _zkList.forEach(zk -> ZK_SERVER_MAP.get(zk).getZkClient()
         .deleteRecursive(MetadataStoreRoutingConstants.ROUTING_DATA_PATH));
+    System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
   }
 
   @Test
   public void testGetAllNamespaces() {
-    Assert.assertEquals(_metadataStoreDirectory.getAllNamespaces(), _routingZkAddrMap.keySet());
+    Assert.assertTrue(
+        _metadataStoreDirectory.getAllNamespaces().containsAll(_routingZkAddrMap.keySet()));
   }
 
   @Test(dependsOnMethods = "testGetAllNamespaces")
@@ -187,8 +194,7 @@ public class TestZkMetadataStoreDirectory extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testGetMetadataStoreRealm")
-  public void testDataChangeCallback()
-      throws Exception {
+  public void testDataChangeCallback() throws Exception {
     // For all namespaces (Routing ZKs), add an extra sharding key to TEST_REALM_1
     String newKey = "/a/b/c/d/e";
     _zkList.forEach(zk -> {
@@ -216,8 +222,7 @@ public class TestZkMetadataStoreDirectory extends AbstractTestClass {
   }
 
   @Test(dependsOnMethods = "testDataChangeCallback")
-  public void testChildChangeCallback()
-      throws Exception {
+  public void testChildChangeCallback() throws Exception {
     // For all namespaces (Routing ZKs), add a realm with a sharding key list
     _zkList.forEach(zk -> {
       ZK_SERVER_MAP.get(zk).getZkClient()
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
index 1aba067..e61e905 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.metadatastore.accessor;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.rest.server.AbstractTestClass;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -40,15 +42,35 @@ public class TestZkRoutingDataWriter extends AbstractTestClass {
   private static final String DUMMY_SHARDING_KEY = "SHARDING_KEY";
   private MetadataStoreRoutingDataWriter _zkRoutingDataWriter;
 
+  // MockWriter is used for testing request forwarding features in non-leader situations.
+  class MockWriter extends ZkRoutingDataWriter {
+    HttpUriRequest calledRequest; // last request passed to sendRequestToLeader(); read by the tests
+
+    MockWriter(String namespace, String zkAddress) {
+      super(namespace, zkAddress);
+    }
+
+    // Captures the request and returns false; super is not called so no http call is actually made
+    @Override
+    protected boolean sendRequestToLeader(HttpUriRequest request, int expectedResponseCode,
+        String leaderHostName) {
+      calledRequest = request;
+      return false;
+    }
+  }
+
   @BeforeClass
   public void beforeClass() {
     _baseAccessor.remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, AccessOption.PERSISTENT);
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY,
+        getBaseUri().toString());
     _zkRoutingDataWriter = new ZkRoutingDataWriter(DUMMY_NAMESPACE, ZK_ADDR);
   }
 
   @AfterClass
   public void afterClass() {
     _baseAccessor.remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, AccessOption.PERSISTENT);
+    System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
     _zkRoutingDataWriter.close();
   }
 
@@ -105,4 +127,66 @@ public class TestZkRoutingDataWriter extends AbstractTestClass {
     Assert.assertTrue(znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
         .contains(DUMMY_SHARDING_KEY));
   }
+
+  @Test(dependsOnMethods = "testSetRoutingData")
+  public void testAddMetadataStoreRealmNonLeader() {
+    MockWriter mockWriter = new MockWriter(DUMMY_NAMESPACE, ZK_ADDR); // captures, never sends
+    try {
+      mockWriter.addMetadataStoreRealm(DUMMY_REALM);
+      Assert.assertEquals("PUT", mockWriter.calledRequest.getMethod()); // JUnit: expected first
+      String path = String.join("/", MetadataStoreRoutingConstants.MSDS_NAMESPACES_URL_PREFIX,
+          DUMMY_NAMESPACE, MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, DUMMY_REALM);
+      String expectedUrl = getBaseUri().toString() + path.replaceAll("//", "/").substring(1);
+      Assert.assertEquals(expectedUrl, mockWriter.calledRequest.getURI().toString());
+    } finally {
+      mockWriter.close(); // always release the writer, even when an assertion above fails
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddMetadataStoreRealmNonLeader")
+  public void testDeleteMetadataStoreRealmNonLeader() {
+    MockWriter mockWriter = new MockWriter(DUMMY_NAMESPACE, ZK_ADDR); // captures, never sends
+    try {
+      mockWriter.deleteMetadataStoreRealm(DUMMY_REALM);
+      Assert.assertEquals("DELETE", mockWriter.calledRequest.getMethod()); // JUnit: expected first
+      String path = String.join("/", MetadataStoreRoutingConstants.MSDS_NAMESPACES_URL_PREFIX,
+          DUMMY_NAMESPACE, MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, DUMMY_REALM);
+      String expectedUrl = getBaseUri().toString() + path.replaceAll("//", "/").substring(1);
+      Assert.assertEquals(expectedUrl, mockWriter.calledRequest.getURI().toString());
+    } finally {
+      mockWriter.close(); // always release the writer, even when an assertion above fails
+    }
+  }
+
+  @Test(dependsOnMethods = "testDeleteMetadataStoreRealmNonLeader")
+  public void testAddShardingKeyNonLeader() {
+    MockWriter mockWriter = new MockWriter(DUMMY_NAMESPACE, ZK_ADDR); // captures, never sends
+    try {
+      mockWriter.addShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY);
+      Assert.assertEquals("PUT", mockWriter.calledRequest.getMethod()); // JUnit: expected first
+      String path = String.join("/", MetadataStoreRoutingConstants.MSDS_NAMESPACES_URL_PREFIX,
+          DUMMY_NAMESPACE, MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, DUMMY_REALM,
+          MetadataStoreRoutingConstants.MSDS_GET_ALL_SHARDING_KEYS_ENDPOINT, DUMMY_SHARDING_KEY);
+      String expectedUrl = getBaseUri().toString() + path.replaceAll("//", "/").substring(1);
+      Assert.assertEquals(expectedUrl, mockWriter.calledRequest.getURI().toString());
+    } finally {
+      mockWriter.close(); // always release the writer, even when an assertion above fails
+    }
+  }
+
+  @Test(dependsOnMethods = "testAddShardingKeyNonLeader")
+  public void testDeleteShardingKeyNonLeader() {
+    MockWriter mockWriter = new MockWriter(DUMMY_NAMESPACE, ZK_ADDR); // captures, never sends
+    try {
+      mockWriter.deleteShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY);
+      Assert.assertEquals("DELETE", mockWriter.calledRequest.getMethod()); // JUnit: expected first
+      String path = String.join("/", MetadataStoreRoutingConstants.MSDS_NAMESPACES_URL_PREFIX,
+          DUMMY_NAMESPACE, MetadataStoreRoutingConstants.MSDS_GET_ALL_REALMS_ENDPOINT, DUMMY_REALM,
+          MetadataStoreRoutingConstants.MSDS_GET_ALL_SHARDING_KEYS_ENDPOINT, DUMMY_SHARDING_KEY);
+      String expectedUrl = getBaseUri().toString() + path.replaceAll("//", "/").substring(1);
+      Assert.assertEquals(expectedUrl, mockWriter.calledRequest.getURI().toString());
+    } finally {
+      mockWriter.close(); // always release the writer, even when an assertion above fails
+    }
+  }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/MetadataStoreDirectoryAccessorTestBase.java b/helix-rest/src/test/java/org/apache/helix/rest/server/MetadataStoreDirectoryAccessorTestBase.java
new file mode 100644
index 0000000..f234795
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/MetadataStoreDirectoryAccessorTestBase.java
@@ -0,0 +1,131 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+public class MetadataStoreDirectoryAccessorTestBase extends AbstractTestClass {
+  /*
+   * The following are constants to be used for testing.
+   */
+  protected static final String TEST_NAMESPACE_URI_PREFIX = "/namespaces/" + TEST_NAMESPACE;
+  protected static final String NON_EXISTING_NAMESPACE_URI_PREFIX =
+      "/namespaces/not-existed-namespace/metadata-store-realms/";
+  protected static final String TEST_REALM_1 = "testRealm1";
+  protected static final List<String> TEST_SHARDING_KEYS_1 =
+      Arrays.asList("/sharding/key/1/a", "/sharding/key/1/b", "/sharding/key/1/c");
+  protected static final String TEST_REALM_2 = "testRealm2";
+  protected static final List<String> TEST_SHARDING_KEYS_2 =
+      Arrays.asList("/sharding/key/1/d", "/sharding/key/1/e", "/sharding/key/1/f");
+  protected static final String TEST_REALM_3 = "testRealm3";
+  protected static final String TEST_SHARDING_KEY = "/sharding/key/1/x";
+
+  // List of all ZK addresses, each of which corresponds to a namespace/routing ZK
+  protected List<String> _zkList;
+  protected MetadataStoreRoutingDataReader _routingDataReader;
+
+  // Seeds every ZK in ZK_SERVER_MAP with two realms and their sharding keys, then creates the
+  // routing data reader and publishes this server's base URI as a system property.
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Begin with no routing data on any of the ZK servers.
+    _zkList = new ArrayList<>(ZK_SERVER_MAP.keySet());
+    deleteRoutingDataPath();
+
+    // Paths of the two realm znodes and the list-field key their sharding keys are stored under.
+    String realm1Path = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1;
+    String realm2Path = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2;
+    String listFieldKey = MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY;
+
+    // A single record is reused; its list field is overwritten before each write.
+    ZNRecord routingRecord = new ZNRecord("RoutingInfo");
+
+    // Repeat the same writes against every ZK server.
+    for (String zkAddress : _zkList) {
+      ZK_SERVER_MAP.get(zkAddress).getZkClient().setZkSerializer(new ZNRecordSerializer());
+
+      // First realm: create its znode, then store TEST_SHARDING_KEYS_1 in it
+      routingRecord.setListField(listFieldKey, TEST_SHARDING_KEYS_1);
+      ZK_SERVER_MAP.get(zkAddress).getZkClient().createPersistent(realm1Path, true);
+      ZK_SERVER_MAP.get(zkAddress).getZkClient().writeData(realm1Path, routingRecord);
+
+      // Second realm: same steps with TEST_SHARDING_KEYS_2
+      routingRecord.setListField(listFieldKey, TEST_SHARDING_KEYS_2);
+      ZK_SERVER_MAP.get(zkAddress).getZkClient().createPersistent(realm2Path, true);
+      ZK_SERVER_MAP.get(zkAddress).getZkClient().writeData(realm2Path, routingRecord);
+    }
+
+    // Reader queried by getAllRealms() and getAllShardingKeysInTestRealm1().
+    _routingDataReader = new ZkRoutingDataReader(TEST_NAMESPACE, _zkAddrTestNS, null);
+
+    // Expose this server's base URI under the MSDS hostname key; afterClass() clears it.
+    String serverBaseUri = getBaseUri().toString();
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY, serverBaseUri);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY); // set in beforeClass()
+    deleteRoutingDataPath(); // removes the routing data path from every ZK in _zkList
+  }
+
+  // Deletes the routing data path on every ZK in _zkList and asserts that none still has it.
+  // NOTE(review): TestHelper.verify presumably re-runs the check until it passes or
+  // WAIT_DURATION elapses — confirm.
+  protected void deleteRoutingDataPath() throws Exception {
+    boolean pathGoneEverywhere = TestHelper.verify(() -> {
+      for (String zkAddress : _zkList) {
+        ZK_SERVER_MAP.get(zkAddress).getZkClient()
+            .deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
+      }
+      // Pass only if no server still has the path.
+      return _zkList.stream().noneMatch(zkAddress -> ZK_SERVER_MAP.get(zkAddress).getZkClient()
+          .exists(MetadataStoreRoutingConstants.ROUTING_DATA_PATH));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(pathGoneEverywhere, "Routing data path should be deleted after the tests.");
+  }
+
+  // Reads the realms (the keys of the routing data) for the test namespace via _routingDataReader
+  // and returns them in a new HashSet, because the test cases modify the returned set.
+  protected Set<String> getAllRealms() throws InvalidRoutingDataException {
+    return new HashSet<>(_routingDataReader.getRoutingData().keySet());
+  }
+
+  // Returns a modifiable copy of the sharding keys _routingDataReader reports for TEST_REALM_1
+  protected Set<String> getAllShardingKeysInTestRealm1() throws InvalidRoutingDataException {
+    return new HashSet<>(_routingDataReader.getRoutingData().get(TEST_REALM_1)); // NOTE(review): would NPE if get() returns null — confirm
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestMSDAccessorLeaderElection.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestMSDAccessorLeaderElection.java
new file mode 100644
index 0000000..9a122a9
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestMSDAccessorLeaderElection.java
@@ -0,0 +1,227 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.common.ContextPropertyKeys;
+import org.apache.helix.rest.common.HelixRestNamespace;
+import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.rest.common.ServletType;
+import org.apache.helix.rest.server.auditlog.AuditLogger;
+import org.apache.helix.rest.server.filters.CORSFilter;
+import org.apache.helix.rest.server.mock.MockMetadataStoreDirectoryAccessor;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestMSDAccessorLeaderElection extends MetadataStoreDirectoryAccessorTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestMSDAccessorLeaderElection.class);
+  private static final String MOCK_URL_PREFIX = "/mock";
+
+  private HelixRestServer _mockHelixRestServer;
+  private String _mockBaseUri;
+  private String _leaderBaseUri;
+  private CloseableHttpClient _httpClient;
+  private HelixZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _leaderBaseUri = getBaseUri().toString();
+    _leaderBaseUri = _leaderBaseUri.substring(0, _leaderBaseUri.length() - 1);
+    int newPort = getBaseUri().getPort() + 1;
+
+    // Start a second server for testing Distributed Leader Election for writes
+    _mockBaseUri = getBaseUri().getScheme() + "://" + getBaseUri().getHost() + ":" + newPort;
+    try {
+      List<HelixRestNamespace> namespaces = new ArrayList<>();
+      // Add test namespace
+      namespaces.add(new HelixRestNamespace(TEST_NAMESPACE,
+          HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false));
+      _mockHelixRestServer = new MockHelixRestServer(namespaces, newPort, getBaseUri().getPath(),
+          Collections.singletonList(_auditLogger));
+      _mockHelixRestServer.start();
+    } catch (InterruptedException e) {
+      LOG.error("MockHelixRestServer starting encounter an exception.", e);
+    }
+
+    // Calling the original endpoint to create an instance of MetadataStoreDirectory in case
+    // it didn't exist yet.
+    get(TEST_NAMESPACE_URI_PREFIX + "/metadata-store-realms", null,
+        Response.Status.OK.getStatusCode(), true);
+
+    // Set the new uri to be used in leader election
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY, _mockBaseUri);
+
+    // Start http client for testing
+    _httpClient = HttpClients.createDefault();
+
+    // Start zkclient to verify leader election behavior
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass(); // clears the hostname property and deletes the routing data path
+    MockMetadataStoreDirectoryAccessor._mockMSDInstance.close();
+    _mockHelixRestServer.shutdown(); // the second server started in beforeClass()
+    _httpClient.close();
+    _zkClient.close();
+  }
+
+  @Test
+  public void testAddMetadataStoreRealmRequestForwarding()
+      throws InvalidRoutingDataException, IOException {
+    Set<String> expectedRealmsSet = getAllRealms(); // modifiable copy of the current realms
+    Assert.assertFalse(expectedRealmsSet.contains(TEST_REALM_3),
+        "Metadata store directory should not have realm: " + TEST_REALM_3);
+    sendRequestAndValidate("/metadata-store-realms/" + TEST_REALM_3, HttpConstants.RestVerbs.PUT,
+        Response.Status.CREATED.getStatusCode()); // the PUT goes to the server at _mockBaseUri
+    expectedRealmsSet.add(TEST_REALM_3);
+    Assert.assertEquals(getAllRealms(), expectedRealmsSet); // the new realm must now be readable
+    MockMetadataStoreDirectoryAccessor._mockMSDInstance.close(); // NOTE(review): presumably resets the follower for the next test — confirm
+  }
+
+  @Test(dependsOnMethods = "testAddMetadataStoreRealmRequestForwarding")
+  public void testDeleteMetadataStoreRealmRequestForwarding()
+      throws InvalidRoutingDataException, IOException {
+    Set<String> expectedRealmsSet = getAllRealms(); // modifiable copy of the current realms
+    sendRequestAndValidate("/metadata-store-realms/" + TEST_REALM_3, HttpConstants.RestVerbs.DELETE,
+        Response.Status.OK.getStatusCode()); // the DELETE goes to the server at _mockBaseUri
+    expectedRealmsSet.remove(TEST_REALM_3);
+    Assert.assertEquals(getAllRealms(), expectedRealmsSet); // the realm must no longer be readable
+    MockMetadataStoreDirectoryAccessor._mockMSDInstance.close(); // NOTE(review): presumably resets the follower for the next test — confirm
+  }
+
+  @Test(dependsOnMethods = "testDeleteMetadataStoreRealmRequestForwarding")
+  public void testAddShardingKeyRequestForwarding()
+      throws InvalidRoutingDataException, IOException {
+    Set<String> expectedShardingKeysSet = getAllShardingKeysInTestRealm1(); // modifiable copy
+    Assert.assertFalse(expectedShardingKeysSet.contains(TEST_SHARDING_KEY),
+        "Realm should not have sharding key: " + TEST_SHARDING_KEY); // shown when the key IS present
+    sendRequestAndValidate(
+        "/metadata-store-realms/" + TEST_REALM_1 + "/sharding-keys/" + TEST_SHARDING_KEY,
+        HttpConstants.RestVerbs.PUT, Response.Status.CREATED.getStatusCode());
+    expectedShardingKeysSet.add(TEST_SHARDING_KEY);
+    Assert.assertEquals(getAllShardingKeysInTestRealm1(), expectedShardingKeysSet);
+    MockMetadataStoreDirectoryAccessor._mockMSDInstance.close();
+  }
+
+  @Test(dependsOnMethods = "testAddShardingKeyRequestForwarding")
+  public void testDeleteShardingKeyRequestForwarding()
+      throws InvalidRoutingDataException, IOException {
+    Set<String> expectedShardingKeysSet = getAllShardingKeysInTestRealm1(); // modifiable copy
+    sendRequestAndValidate(
+        "/metadata-store-realms/" + TEST_REALM_1 + "/sharding-keys/" + TEST_SHARDING_KEY,
+        HttpConstants.RestVerbs.DELETE, Response.Status.OK.getStatusCode()); // sent to _mockBaseUri
+    expectedShardingKeysSet.remove(TEST_SHARDING_KEY);
+    Assert.assertEquals(getAllShardingKeysInTestRealm1(), expectedShardingKeysSet); // key must be gone
+    MockMetadataStoreDirectoryAccessor._mockMSDInstance.close(); // NOTE(review): presumably resets the follower for the next test — confirm
+  }
+
+  // Sends the request to the second server, then checks the status and the leader election nodes.
+  private void sendRequestAndValidate(String urlSuffix, HttpConstants.RestVerbs requestMethod,
+      int expectedResponseCode) throws IllegalArgumentException, IOException {
+    String url = _mockBaseUri + TEST_NAMESPACE_URI_PREFIX + MOCK_URL_PREFIX + urlSuffix;
+    HttpUriRequest request;
+    switch (requestMethod) {
+      case PUT:
+        request = new HttpPut(url);
+        break;
+      case DELETE:
+        request = new HttpDelete(url);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported requestMethod: " + requestMethod);
+    }
+    HttpResponse response = _httpClient.execute(request);
+    int statusCode = response.getStatusLine().getStatusCode();
+    org.apache.http.util.EntityUtils.consumeQuietly(response.getEntity()); // release the connection
+    Assert.assertEquals(statusCode, expectedResponseCode);
+
+    // Validate leader election behavior: two nodes, sorted by name, leader's node first
+    String electionPath = MetadataStoreRoutingConstants.LEADER_ELECTION_ZNODE;
+    List<String> leaderSelectionNodes = _zkClient.getChildren(electionPath);
+    leaderSelectionNodes.sort(Comparator.naturalOrder());
+    Assert.assertEquals(leaderSelectionNodes.size(), 2);
+    ZNRecord firstEphemeralNode = _zkClient.readData(electionPath + "/" + leaderSelectionNodes.get(0));
+    ZNRecord secondEphemeralNode = _zkClient.readData(electionPath + "/" + leaderSelectionNodes.get(1));
+    Assert.assertEquals(firstEphemeralNode.getId(), _leaderBaseUri);
+    Assert.assertEquals(secondEphemeralNode.getId(), _mockBaseUri);
+    // Make sure the operation is not done by the follower instance
+    Assert.assertFalse(MockMetadataStoreDirectoryAccessor.operatedOnZk);
+  }
+
+  /**
+   * A class that mocks HelixRestServer for testing. It overrides getResourceConfig to add the
+   * package of MockMetadataStoreDirectoryAccessor to the packages registered with the config.
+   */
+  class MockHelixRestServer extends HelixRestServer {
+    public MockHelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix,
+        List<AuditLogger> auditLoggers) {
+      super(namespaces, port, urlPrefix, auditLoggers);
+    }
+
+    public MockHelixRestServer(String zkAddr, int port, String urlPrefix) {
+      super(zkAddr, port, urlPrefix);
+    }
+
+    @Override
+    protected ResourceConfig getResourceConfig(HelixRestNamespace namespace, ServletType type) {
+      ResourceConfig cfg = new ResourceConfig();
+      List<String> packages = new ArrayList<>(Arrays.asList(type.getServletPackageArray()));
+      packages.add(MockMetadataStoreDirectoryAccessor.class.getPackage().getName()); // the mock's package
+      cfg.packages(packages.toArray(new String[0]));
+      cfg.setApplicationName(namespace.getName());
+      cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
+          new ServerContext(namespace.getMetadataStoreAddress()));
+      cfg.property(ContextPropertyKeys.METADATA.name(), namespace);
+      cfg.register(new CORSFilter());
+      return cfg;
+    }
+  }
+}
\ No newline at end of file
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestMetadataStoreDirectoryAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestMetadataStoreDirectoryAccessor.java
index ee49239..b6179aa 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestMetadataStoreDirectoryAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestMetadataStoreDirectoryAccessor.java
@@ -20,8 +20,6 @@ package org.apache.helix.rest.server;
  */
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -33,89 +31,14 @@ import javax.ws.rs.core.Response;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.helix.TestHelper;
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
-import org.apache.helix.rest.common.HelixRestNamespace;
-import org.apache.helix.rest.metadatastore.MetadataStoreDirectory;
-import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-// TODO: enable asserts and add verify for refreshed MSD once write operations are ready.
-public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
-  /*
-   * The following are constants to be used for testing.
-   */
-  private static final String TEST_NAMESPACE_URI_PREFIX = "/namespaces/" + TEST_NAMESPACE;
-  private static final String NON_EXISTING_NAMESPACE_URI_PREFIX =
-      "/namespaces/not-existed-namespace/metadata-store-realms/";
-  private static final String TEST_REALM_1 = "testRealm1";
-  private static final List<String> TEST_SHARDING_KEYS_1 =
-      Arrays.asList("/sharding/key/1/a", "/sharding/key/1/b", "/sharding/key/1/c");
-  private static final String TEST_REALM_2 = "testRealm2";
-  private static final List<String> TEST_SHARDING_KEYS_2 =
-      Arrays.asList("/sharding/key/1/d", "/sharding/key/1/e", "/sharding/key/1/f");
-  private static final String TEST_REALM_3 = "testRealm3";
-  private static final String TEST_SHARDING_KEY = "/sharding/key/3/x";
-
-  // List of all ZK addresses, each of which corresponds to a namespace/routing ZK
-  private List<String> _zkList;
-  private MetadataStoreDirectory _metadataStoreDirectory;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    _zkList = new ArrayList<>(ZK_SERVER_MAP.keySet());
-
-    deleteRoutingDataPath();
-
-    // Populate routingZkAddrMap according namespaces in helix rest server.
-    // <Namespace, ZkAddr> mapping
-    Map<String, String> routingZkAddrMap = ImmutableMap
-        .of(HelixRestNamespace.DEFAULT_NAMESPACE_NAME, ZK_ADDR, TEST_NAMESPACE, _zkAddrTestNS);
-
-    // Write dummy mappings in ZK
-    // Create a node that represents a realm address and add 3 sharding keys to it
-    ZNRecord znRecord = new ZNRecord("RoutingInfo");
-
-    _zkList.forEach(zk -> {
-      ZK_SERVER_MAP.get(zk).getZkClient().setZkSerializer(new ZNRecordSerializer());
-      // Write first realm and sharding keys pair
-      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
-          TEST_SHARDING_KEYS_1);
-      ZK_SERVER_MAP.get(zk).getZkClient()
-          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
-              true);
-      ZK_SERVER_MAP.get(zk).getZkClient()
-          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_1,
-              znRecord);
-
-      // Create another realm and sharding keys pair
-      znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
-          TEST_SHARDING_KEYS_2);
-      ZK_SERVER_MAP.get(zk).getZkClient()
-          .createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2,
-              true);
-      ZK_SERVER_MAP.get(zk).getZkClient()
-          .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + TEST_REALM_2,
-              znRecord);
-    });
-
-    // Create metadataStoreDirectory
-    _metadataStoreDirectory = new ZkMetadataStoreDirectory(routingZkAddrMap);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _metadataStoreDirectory.close();
-    deleteRoutingDataPath();
-  }
-
+public class TestMetadataStoreDirectoryAccessor extends MetadataStoreDirectoryAccessorTestBase {
   /*
    * Tests REST endpoint: "GET /namespaces/{namespace}/metadata-store-namespaces"
    */
@@ -168,6 +91,7 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
    * Tests REST endpoint: "GET /metadata-store-realms?sharding-key={sharding-key}"
    */
   @Test(dependsOnMethods = "testGetAllMetadataStoreRealms")
+
   public void testGetMetadataStoreRealmWithShardingKey() throws IOException {
     String shardingKey = TEST_SHARDING_KEYS_1.get(0);
 
@@ -194,11 +118,8 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
    * Tests REST endpoint: "PUT /metadata-store-realms/{realm}"
    */
   @Test(dependsOnMethods = "testGetMetadataStoreRealmWithShardingKey")
-  public void testAddMetadataStoreRealm() {
-    Collection<String> previousRealms =
-        _metadataStoreDirectory.getAllMetadataStoreRealms(TEST_NAMESPACE);
-    Set<String> expectedRealmsSet = new HashSet<>(previousRealms);
-
+  public void testAddMetadataStoreRealm() throws InvalidRoutingDataException {
+    Set<String> expectedRealmsSet = getAllRealms();
     Assert.assertFalse(expectedRealmsSet.contains(TEST_REALM_3),
         "Metadata store directory should not have realm: " + TEST_REALM_3);
 
@@ -212,26 +133,16 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
         Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
         Response.Status.CREATED.getStatusCode());
 
-    Collection<String> updatedRealms =
-        _metadataStoreDirectory.getAllMetadataStoreRealms(TEST_NAMESPACE);
-    Set<String> updateRealmsSet = new HashSet<>(updatedRealms);
     expectedRealmsSet.add(TEST_REALM_3);
-
-//    Assert.assertEquals(updateRealmsSet, previousRealms);
+    Assert.assertEquals(getAllRealms(), expectedRealmsSet);
   }
 
   /*
    * Tests REST endpoint: "DELETE /metadata-store-realms/{realm}"
    */
   @Test(dependsOnMethods = "testAddMetadataStoreRealm")
-  public void testDeleteMetadataStoreRealm() {
-    Collection<String> previousRealms =
-        _metadataStoreDirectory.getAllMetadataStoreRealms(TEST_NAMESPACE);
-    Set<String> expectedRealmsSet = new HashSet<>(previousRealms);
-
-//    Assert.assertTrue(expectedRealmsSet.contains(TEST_REALM_3),
-//        "Metadata store directory should have realm: " + TEST_REALM_3);
-
+  public void testDeleteMetadataStoreRealm() throws InvalidRoutingDataException {
+    Set<String> expectedRealmsSet = getAllRealms();
     // Test a request that has not found response.
     delete(NON_EXISTING_NAMESPACE_URI_PREFIX + TEST_REALM_3,
         Response.Status.NOT_FOUND.getStatusCode());
@@ -240,12 +151,9 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
     delete(TEST_NAMESPACE_URI_PREFIX + "/metadata-store-realms/" + TEST_REALM_3,
         Response.Status.OK.getStatusCode());
 
-    Collection<String> updatedRealms =
-        _metadataStoreDirectory.getAllMetadataStoreRealms(TEST_NAMESPACE);
-    Set<String> updateRealmsSet = new HashSet<>(updatedRealms);
+    Set<String> updateRealmsSet = getAllRealms();
     expectedRealmsSet.remove(TEST_REALM_3);
-
-//    Assert.assertEquals(updateRealmsSet, previousRealms);
+    Assert.assertEquals(updateRealmsSet, expectedRealmsSet);
   }
 
   /*
@@ -299,9 +207,8 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
      *   } ]
      * }
      */
-    String responseBody =
-        new JerseyUriRequestBuilder(TEST_NAMESPACE_URI_PREFIX + "/routing-data")
-            .isBodyReturnExpected(true).get(this);
+    String responseBody = new JerseyUriRequestBuilder(TEST_NAMESPACE_URI_PREFIX + "/routing-data")
+        .isBodyReturnExpected(true).get(this);
 
     // It is safe to cast the object and suppress warnings.
     @SuppressWarnings("unchecked")
@@ -463,54 +370,42 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
    * Tests REST endpoint: "PUT /metadata-store-realms/{realm}/sharding-keys/{sharding-key}"
    */
   @Test(dependsOnMethods = "testGetRealmShardingKeysUnderPath")
-  public void testAddShardingKey() {
-    Set<String> expectedShardingKeysSet = new HashSet<>(
-        _metadataStoreDirectory.getAllShardingKeysInRealm(TEST_NAMESPACE, TEST_REALM_1));
-
+  public void testAddShardingKey() throws InvalidRoutingDataException {
+    Set<String> expectedShardingKeysSet = getAllShardingKeysInTestRealm1();
     Assert.assertFalse(expectedShardingKeysSet.contains(TEST_SHARDING_KEY),
         "Realm does not have sharding key: " + TEST_SHARDING_KEY);
 
     // Request that gets not found response.
-    put(NON_EXISTING_NAMESPACE_URI_PREFIX + TEST_REALM_1 + "/sharding-keys/" + TEST_SHARDING_KEY,
+    put(NON_EXISTING_NAMESPACE_URI_PREFIX + TEST_REALM_1 + "/sharding-keys" + TEST_SHARDING_KEY,
         null, Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
         Response.Status.NOT_FOUND.getStatusCode());
 
     // Successful request.
-    put(TEST_NAMESPACE_URI_PREFIX + "/metadata-store-realms/" + TEST_REALM_1 + "/sharding-keys/"
+    put(TEST_NAMESPACE_URI_PREFIX + "/metadata-store-realms/" + TEST_REALM_1 + "/sharding-keys"
             + TEST_SHARDING_KEY, null, Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
         Response.Status.CREATED.getStatusCode());
 
-    Set<String> updatedShardingKeysSet = new HashSet<>(
-        _metadataStoreDirectory.getAllShardingKeysInRealm(TEST_NAMESPACE, TEST_REALM_1));
     expectedShardingKeysSet.add(TEST_SHARDING_KEY);
-
-//    Assert.assertEquals(updatedShardingKeysSet, expectedShardingKeysSet);
+    Assert.assertEquals(getAllShardingKeysInTestRealm1(), expectedShardingKeysSet);
   }
 
   /*
    * Tests REST endpoint: "DELETE /metadata-store-realms/{realm}/sharding-keys/{sharding-key}"
    */
   @Test(dependsOnMethods = "testAddShardingKey")
-  public void testDeleteShardingKey() {
-    Set<String> expectedShardingKeysSet = new HashSet<>(
-        _metadataStoreDirectory.getAllShardingKeysInRealm(TEST_NAMESPACE, TEST_REALM_1));
-
-//    Assert.assertTrue(expectedShardingKeysSet.contains(TEST_SHARDING_KEY),
-//        "Realm should have sharding key: " + TEST_SHARDING_KEY);
+  public void testDeleteShardingKey() throws InvalidRoutingDataException {
+    Set<String> expectedShardingKeysSet = getAllShardingKeysInTestRealm1();
 
     // Request that gets not found response.
-    delete(NON_EXISTING_NAMESPACE_URI_PREFIX + TEST_REALM_1 + "/sharding-keys/" + TEST_SHARDING_KEY,
+    delete(NON_EXISTING_NAMESPACE_URI_PREFIX + TEST_REALM_1 + "/sharding-keys" + TEST_SHARDING_KEY,
         Response.Status.NOT_FOUND.getStatusCode());
 
     // Successful request.
-    delete(TEST_NAMESPACE_URI_PREFIX + "/metadata-store-realms/" + TEST_REALM_1 + "/sharding-keys/"
+    delete(TEST_NAMESPACE_URI_PREFIX + "/metadata-store-realms/" + TEST_REALM_1 + "/sharding-keys"
         + TEST_SHARDING_KEY, Response.Status.OK.getStatusCode());
 
-    Set<String> updatedShardingKeysSet = new HashSet<>(
-        _metadataStoreDirectory.getAllShardingKeysInRealm(TEST_NAMESPACE, TEST_REALM_1));
     expectedShardingKeysSet.remove(TEST_SHARDING_KEY);
-
-//    Assert.assertEquals(updatedShardingKeysSet, expectedShardingKeysSet);
+    Assert.assertEquals(getAllShardingKeysInTestRealm1(), expectedShardingKeysSet);
   }
 
   private void verifyRealmShardingKeys(String responseBody) throws IOException {
@@ -536,20 +431,4 @@ public class TestMetadataStoreDirectoryAccessor extends AbstractTestClass {
 
     Assert.assertEquals(queriedShardingKeys, expectedShardingKeys);
   }
-
-  private void deleteRoutingDataPath() throws Exception {
-    Assert.assertTrue(TestHelper.verify(() -> {
-      _zkList.forEach(zk -> ZK_SERVER_MAP.get(zk).getZkClient()
-          .deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH));
-
-      for (String zk : _zkList) {
-        if (ZK_SERVER_MAP.get(zk).getZkClient()
-            .exists(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
-          return false;
-        }
-      }
-
-      return true;
-    }, TestHelper.WAIT_DURATION), "Routing data path should be deleted after the tests.");
-  }
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/mock/MockMetadataStoreDirectoryAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/mock/MockMetadataStoreDirectoryAccessor.java
new file mode 100644
index 0000000..b5452f1
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/mock/MockMetadataStoreDirectoryAccessor.java
@@ -0,0 +1,124 @@
+package org.apache.helix.rest.server.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.ws.rs.Path;
+
+import org.apache.helix.msdcommon.datamodel.TrieRoutingData;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.metadatastore.MetadataStoreDirectory;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWriter;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter;
+import org.apache.helix.rest.server.resources.metadatastore.MetadataStoreDirectoryAccessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An accessor that mocks the MetadataStoreDirectoryAccessor for testing purpose.
+ */
+@Path("/mock")
+public class MockMetadataStoreDirectoryAccessor extends MetadataStoreDirectoryAccessor {
+  //TODO: use this class as a template for https://github.com/apache/helix/issues/816
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockMetadataStoreDirectoryAccessor.class);
+  // A flag that will be modified if the underlying MockZkRoutingDataWriter makes an operation
+  // against ZooKeeper
+  public static boolean operatedOnZk = false;
+  // The instance of mockMSD that's created by this accessor; it's saved here to be closed later
+  public static MetadataStoreDirectory _mockMSDInstance;
+
+  /**
+   * This method is overridden so that an instance of MockZkMetadataStoreDirectory can be passed in
+   */
+  @Override
+  protected void buildMetadataStoreDirectory(String namespace, String address) {
+    try {
+      _metadataStoreDirectory = new MockZkMetadataStoreDirectory(namespace, address);
+      _mockMSDInstance = _metadataStoreDirectory;
+    } catch (InvalidRoutingDataException e) {
+      LOG.error("buildMetadataStoreDirectory encountered an exception.", e);
+    }
+  }
+
+  /**
+   * Used to artificially create another instance of ZkMetadataStoreDirectory.
+   * ZkMetadataStoreDirectory being a singleton makes it difficult to test it,
+   * therefore this is the only way to create another instance.
+   */
+  class MockZkMetadataStoreDirectory extends ZkMetadataStoreDirectory {
+    MockZkMetadataStoreDirectory(String namespace, String zkAddress)
+        throws InvalidRoutingDataException {
+      super();
+
+      // Manually populate the map so that MockZkRoutingDataWriter can be passed in
+      _routingZkAddressMap.put(namespace, zkAddress);
+      _routingDataReaderMap.put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this));
+      _routingDataWriterMap.put(namespace, new MockZkRoutingDataWriter(namespace, zkAddress));
+      _realmToShardingKeysMap.put(namespace, _routingDataReaderMap.get(namespace).getRoutingData());
+      _routingDataMap.put(namespace, new TrieRoutingData(_realmToShardingKeysMap.get(namespace)));
+    }
+
+    @Override
+    public void close() {
+      _routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close);
+      _routingDataWriterMap.values().forEach(MetadataStoreRoutingDataWriter::close);
+    }
+  }
+
+  /**
+   * A mock to ZkRoutingDataWriter. The only purpose is to set the static flag signifying that
+   * this writer is used for zookeeper operations.
+   */
+  class MockZkRoutingDataWriter extends ZkRoutingDataWriter {
+    public MockZkRoutingDataWriter(String namespace, String zkAddress) {
+      super(namespace, zkAddress);
+      operatedOnZk = false;
+    }
+
+    @Override
+    protected boolean createZkRealm(String realm) {
+      operatedOnZk = true;
+      return super.createZkRealm(realm);
+    }
+
+    @Override
+    protected boolean deleteZkRealm(String realm) {
+      operatedOnZk = true;
+      return super.deleteZkRealm(realm);
+    }
+
+    @Override
+    protected boolean createZkShardingKey(String realm, String shardingKey) {
+      operatedOnZk = true;
+      return super.createZkShardingKey(realm, shardingKey);
+    }
+
+    @Override
+    protected boolean deleteZkShardingKey(String realm, String shardingKey) {
+      operatedOnZk = true;
+      return super.deleteZkShardingKey(realm, shardingKey);
+    }
+  }
+}
diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
index e3d541b..766f98a 100644
--- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
+++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
@@ -66,9 +66,19 @@ public class MetadataStoreRoutingConstants {
   // System Property Metadata Store Directory Server endpoint key
   public static final String MSDS_SERVER_ENDPOINT_KEY = "metadataStoreDirectoryServerEndpoint";
 
+  // Prefix to MSDS resource endpoints
+  public static final String MSDS_NAMESPACES_URL_PREFIX = "/namespaces";
+
   // MSDS resource getAllRealms endpoint string
   public static final String MSDS_GET_ALL_REALMS_ENDPOINT = "/metadata-store-realms";
 
   // MSDS resource get all routing data endpoint string
   public static final String MSDS_GET_ALL_ROUTING_DATA_ENDPOINT = "/routing-data";
+
+  // MSDS resource get all sharding keys endpoint string
+  public static final String MSDS_GET_ALL_SHARDING_KEYS_ENDPOINT = "/sharding-keys";
+
+  // The key for system properties that contains the hostname of the
+  // MetadataStoreDirectoryService server instance
+  public static final String MSDS_SERVER_HOSTNAME_KEY = "msds_hostname";
 }


Mime
View raw message