helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 09/43: Add MetadataStoreRoutingDataWriter with DistributedLeaderElection (#727)
Date Tue, 17 Mar 2020 00:03:40 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ab62683bc91260d19598fc29518616c1fd215923
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Mon Feb 10 16:57:58 2020 -0800

    Add MetadataStoreRoutingDataWriter with DistributedLeaderElection (#727)
    
    We need a separate ZkClient-based writer that allows users to write routing data to ZK.
    This diff adds such an interface, an implementation, and a distributed lock implementation
    that helps users manipulate the routing data.
    
    Changelist:
    
    Add ZkRoutingDataWriter (+ interface)
    Add ZkDistributedLock (+ interface) to guarantee that there's at most one active writer
at a time (where there are multiple Helix REST deployables)
    Add a test for ZkRoutingDataWriter
    Integrate ZkRoutingDataWriter with ZkMetadataStoreDirectory
    Add test methods to TestZkMetadataStoreDirectory
    Add ZkDistributedElection to replace ZkDistributedLock (and move ZkDistributedLock to
a separate PR)
---
 .../metadatastore/ZkMetadataStoreDirectory.java    |  62 +++--
 .../MetadataStoreRoutingDataReader.java            |   3 +-
 .../accessor/MetadataStoreRoutingDataWriter.java   |  74 ++++++
 .../{ => accessor}/ZkRoutingDataReader.java        |  25 +-
 .../accessor/ZkRoutingDataWriter.java              | 253 +++++++++++++++++++++
 .../concurrency/ZkDistributedLeaderElection.java   | 142 ++++++++++++
 .../constant/MetadataStoreRoutingConstants.java    |   3 +
 .../{ => accessor}/TestZkRoutingDataReader.java    |   2 +-
 .../accessor/TestZkRoutingDataWriter.java          | 107 +++++++++
 .../helix/rest/server/AbstractTestClass.java       |   9 -
 10 files changed, 627 insertions(+), 53 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
index 5a88ca9..536d058 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -29,16 +29,11 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
-import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.HelixZkClient.ZkClientConfig;
-import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWriter;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter;
 import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +47,7 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory,
Routing
   // TODO: enable the line below when implementation is complete
   // The following maps' keys represent the namespace
   private final Map<String, MetadataStoreRoutingDataReader> _routingDataReaderMap;
+  private final Map<String, MetadataStoreRoutingDataWriter> _routingDataWriterMap;
   private final Map<String, MetadataStoreRoutingData> _routingDataMap;
   private final Map<String, String> _routingZkAddressMap;
   // <namespace, <realm, <list of sharding keys>> mappings
@@ -68,14 +64,17 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory,
Routing
       throw new InvalidRoutingDataException("Routing ZK Addresses given are invalid!");
     }
     _routingDataReaderMap = new HashMap<>();
+    _routingDataWriterMap = new HashMap<>();
     _routingZkAddressMap = routingZkAddressMap;
     _realmToShardingKeysMap = new ConcurrentHashMap<>();
     _routingDataMap = new ConcurrentHashMap<>();
 
-    // Create RoutingDataReaders
+    // Create RoutingDataReaders and RoutingDataWriters
     for (Map.Entry<String, String> routingEntry : _routingZkAddressMap.entrySet())
{
       _routingDataReaderMap.put(routingEntry.getKey(),
           new ZkRoutingDataReader(routingEntry.getKey(), routingEntry.getValue(), this));
+      _routingDataWriterMap.put(routingEntry.getKey(),
+          new ZkRoutingDataWriter(routingEntry.getKey(), routingEntry.getValue()));
 
       // Populate realmToShardingKeys with ZkRoutingDataReader
       _realmToShardingKeysMap.put(routingEntry.getKey(),
@@ -132,26 +131,38 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory,
Routing
 
   @Override
   public boolean addMetadataStoreRealm(String namespace, String realm) {
-    // TODO implement when MetadataStoreRoutingDataWriter is ready
-    throw new UnsupportedOperationException();
+    if (!_routingDataWriterMap.containsKey(namespace)) {
+      throw new IllegalArgumentException(
+          "Failed to add metadata store realm: Namespace " + namespace + " is not found!");
+    }
+    return _routingDataWriterMap.get(namespace).addMetadataStoreRealm(realm);
   }
 
   @Override
   public boolean deleteMetadataStoreRealm(String namespace, String realm) {
-    // TODO implement when MetadataStoreRoutingDataWriter is ready
-    throw new UnsupportedOperationException();
+    if (!_routingDataWriterMap.containsKey(namespace)) {
+      throw new IllegalArgumentException(
+          "Failed to delete metadata store realm: Namespace " + namespace + " is not found!");
+    }
+    return _routingDataWriterMap.get(namespace).deleteMetadataStoreRealm(realm);
   }
 
   @Override
   public boolean addShardingKey(String namespace, String realm, String shardingKey) {
-    // TODO implement when MetadataStoreRoutingDataWriter is ready
-    throw new UnsupportedOperationException();
+    if (!_routingDataWriterMap.containsKey(namespace)) {
+      throw new IllegalArgumentException(
+          "Failed to add sharding key: Namespace " + namespace + " is not found!");
+    }
+    return _routingDataWriterMap.get(namespace).addShardingKey(realm, shardingKey);
   }
 
   @Override
   public boolean deleteShardingKey(String namespace, String realm, String shardingKey) {
-    // TODO implement when MetadataStoreRoutingDataWriter is ready
-    throw new UnsupportedOperationException();
+    if (!_routingDataWriterMap.containsKey(namespace)) {
+      throw new IllegalArgumentException(
+          "Failed to delete sharding key: Namespace " + namespace + " is not found!");
+    }
+    return _routingDataWriterMap.get(namespace).deleteShardingKey(realm, shardingKey);
   }
 
   /**
@@ -165,20 +176,20 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory,
Routing
    */
   @Override
   public void refreshRoutingData(String namespace) {
-    // Safe to ignore the callback if any of the mapping is null.
+    // Safe to ignore the callback if any of the maps are null.
     // If routingDataMap is null, then it will be populated by the constructor anyway
     // If routingDataMap is not null, then it's safe for the callback function to update
it
-    if (_routingZkAddressMap == null || _routingDataMap == null
-        || _realmToShardingKeysMap == null) {
-      LOG.error("Construction is not completed! ");
+    if (_routingZkAddressMap == null || _routingDataMap == null || _realmToShardingKeysMap
== null
+        || _routingDataReaderMap == null || _routingDataWriterMap == null) {
+      LOG.warn(
+          "refreshRoutingData callback called before ZKMetadataStoreDirectory was fully initialized.
Skipping refresh!");
       return;
     }
 
     // Check if namespace exists; otherwise, return as a NOP and log it
     if (!_routingZkAddressMap.containsKey(namespace)) {
-      LOG.error("Failed to refresh internally-cached routing data! Namespace not found: {}",
-          namespace);
-      return;
+      LOG.error(
+          "Failed to refresh internally-cached routing data! Namespace not found: " + namespace);
     }
 
     try {
@@ -197,5 +208,6 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory,
Routing
   @Override
   public synchronized void close() {
     _routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close);
+    _routingDataWriterMap.values().forEach(MetadataStoreRoutingDataWriter::close);
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java
similarity index 93%
rename from helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java
rename to helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java
index 3cc9a06..f19e8ff 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.metadatastore;
+package org.apache.helix.rest.metadatastore.accessor;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -26,6 +26,7 @@ import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataExceptio
 /**
  * An interface for a DAO that fetches routing data from a source and return a key-value
mapping
  * that represent the said routing data.
+ * Note: Each data reader connects to a single namespace.
  */
 public interface MetadataStoreRoutingDataReader {
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java
new file mode 100644
index 0000000..349bbd0
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java
@@ -0,0 +1,74 @@
+package org.apache.helix.rest.metadatastore.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * An interface for a DAO that writes to the metadata store that stores routing data.
+ * Note: Each data writer connects to a single namespace.
+ */
+public interface MetadataStoreRoutingDataWriter {
+
+  /**
+   * Creates a realm. If the namespace does not exist, it creates one.
+   * @param realm
+   * @return true if successful or if the realm already exists. false otherwise.
+   */
+  boolean addMetadataStoreRealm(String realm);
+
+  /**
+   * Deletes a realm.
+   * @param realm
+   * @return true if successful or the realm or namespace does not exist. false otherwise.
+   */
+  boolean deleteMetadataStoreRealm(String realm);
+
+  /**
+   * Creates a mapping between the sharding key to the realm. If realm doesn't exist, it
will be created (this call is idempotent).
+   * @param realm
+   * @param shardingKey
+   * @return false if failed
+   */
+  boolean addShardingKey(String realm, String shardingKey);
+
+  /**
+   * Deletes the mapping between the sharding key to the realm.
+   * @param realm
+   * @param shardingKey
+   * @return false if failed; true if the deletion is successful or the key does not exist.
+   */
+  boolean deleteShardingKey(String realm, String shardingKey);
+
+  /**
+   * Sets (overwrites) the routing data with the given <realm, list of sharding keys>
mapping.
+   * WARNING: This overwrites all existing routing data. Use with care!
+   * @param routingData
+   * @return
+   */
+  boolean setRoutingData(Map<String, List<String>> routingData);
+
+  /**
+   * Closes any stateful resources such as connections or threads.
+   */
+  void close();
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
similarity index 92%
rename from helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
rename to helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
index 453180f..ea8c290 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.metadatastore;
+package org.apache.helix.rest.metadatastore.accessor;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.helix.rest.metadatastore.RoutingDataListener;
 import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
 import org.apache.zookeeper.Watcher;
@@ -42,10 +43,6 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   private final HelixZkClient _zkClient;
   private final RoutingDataListener _routingDataListener;
 
-  public ZkRoutingDataReader(String namespace, String zkAddress) {
-    this(namespace, zkAddress, null);
-  }
-
   public ZkRoutingDataReader(String namespace, String zkAddress,
       RoutingDataListener routingDataListener) {
     if (namespace == null || namespace.isEmpty()) {
@@ -115,8 +112,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   }
 
   @Override
-  public synchronized void handleDataChange(String s, Object o)
-      throws Exception {
+  public synchronized void handleDataChange(String s, Object o) {
     if (_zkClient.isClosed()) {
       return;
     }
@@ -124,8 +120,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   }
 
   @Override
-  public synchronized void handleDataDeleted(String s)
-      throws Exception {
+  public synchronized void handleDataDeleted(String s) {
     if (_zkClient.isClosed()) {
       return;
     }
@@ -140,8 +135,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   }
 
   @Override
-  public synchronized void handleChildChange(String s, List<String> list)
-      throws Exception {
+  public synchronized void handleChildChange(String s, List<String> list) {
     if (_zkClient.isClosed()) {
       return;
     }
@@ -156,8 +150,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   }
 
   @Override
-  public synchronized void handleStateChanged(Watcher.Event.KeeperState state)
-      throws Exception {
+  public synchronized void handleStateChanged(Watcher.Event.KeeperState state) {
     if (_zkClient.isClosed()) {
       return;
     }
@@ -165,8 +158,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   }
 
   @Override
-  public synchronized void handleNewSession(String sessionId)
-      throws Exception {
+  public synchronized void handleNewSession(String sessionId) {
     if (_zkClient.isClosed()) {
       return;
     }
@@ -174,8 +166,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader,
IZkD
   }
 
   @Override
-  public synchronized void handleSessionEstablishmentError(Throwable error)
-      throws Exception {
+  public synchronized void handleSessionEstablishmentError(Throwable error) {
     if (_zkClient.isClosed()) {
       return;
     }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
new file mode 100644
index 0000000..3e43202
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
@@ -0,0 +1,253 @@
+package org.apache.helix.rest.metadatastore.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkRoutingDataWriter.class);
+
+  private final String _namespace;
+  private final HelixZkClient _zkClient;
+  private final ZkDistributedLeaderElection _leaderElection;
+
+  public ZkRoutingDataWriter(String namespace, String zkAddress) {
+    if (namespace == null || namespace.isEmpty()) {
+      throw new IllegalArgumentException("namespace cannot be null or empty!");
+    }
+    _namespace = namespace;
+    if (zkAddress == null || zkAddress.isEmpty()) {
+      throw new IllegalArgumentException("Zk address cannot be null or empty!");
+    }
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+
+    // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create
+    // create() semantic will fail if it already exists
+    try {
+      _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
+    } catch (ZkNodeExistsException e) {
+      // This is okay
+    }
+
+    // Get the hostname (REST endpoint) from System property
+    // TODO: Fill in when Helix REST implementations are ready
+    ZNRecord myServerInfo = new ZNRecord("dummy hostname");
+    _leaderElection = new ZkDistributedLeaderElection(_zkClient,
+        MetadataStoreRoutingConstants.LEADER_ELECTION_ZNODE, myServerInfo);
+  }
+
+  @Override
+  public synchronized boolean addMetadataStoreRealm(String realm) {
+    if (_leaderElection.isLeader()) {
+      if (_zkClient.isClosed()) {
+        throw new IllegalStateException("ZkClient is closed!");
+      }
+      return createZkRealm(realm);
+    }
+
+    // TODO: Forward the request to leader
+    return true;
+  }
+
+  @Override
+  public synchronized boolean deleteMetadataStoreRealm(String realm) {
+    if (_leaderElection.isLeader()) {
+      if (_zkClient.isClosed()) {
+        throw new IllegalStateException("ZkClient is closed!");
+      }
+      return _zkClient.delete(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm);
+    }
+
+    // TODO: Forward the request to leader
+    return true;
+  }
+
+  /**
+   * Maps the given sharding key to the given realm, creating the realm ZNode first if it
+   * does not exist. Sharding keys already stored under the realm are preserved, and adding
+   * a key that is already present changes nothing.
+   * @param realm name of the realm ZNode under ROUTING_DATA_PATH
+   * @param shardingKey sharding key to add to the realm's list field
+   * @return false if the realm could not be created, read, or written; true otherwise
+   *         (including when this instance is not the leader, where nothing is written yet)
+   * @throws IllegalStateException if this instance is the leader and the ZkClient is closed
+   */
+  @Override
+  public synchronized boolean addShardingKey(String realm, String shardingKey) {
+    if (_leaderElection.isLeader()) {
+      if (_zkClient.isClosed()) {
+        throw new IllegalStateException("ZkClient is closed!");
+      }
+      // If the realm does not exist already, then create the realm
+      String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm;
+      if (!_zkClient.exists(realmPath)) {
+        // Create the realm
+        if (!createZkRealm(realm)) {
+          // Failed to create the realm - log and return false
+          LOG.error(
+              "Failed to add sharding key because ZkRealm creation failed! Namespace: {}, Realm: {}, Sharding key: {}",
+              _namespace, realm, shardingKey);
+          return false;
+        }
+      }
+
+      // Read the realm's current ZNRecord so that existing sharding keys are kept
+      ZNRecord znRecord;
+      try {
+        znRecord = _zkClient.readData(realmPath);
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to read the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+            _namespace, realm, shardingKey, e);
+        return false;
+      }
+      if (znRecord == null) {
+        // The ZNode exists but carries no payload; start from an empty record
+        znRecord = new ZNRecord(realm);
+      }
+
+      List<String> shardingKeys =
+          znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+      if (shardingKeys == null) {
+        // First sharding key for this realm
+        znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+            Collections.singletonList(shardingKey));
+      } else if (!shardingKeys.contains(shardingKey)) {
+        // Append in place, mirroring how deleteShardingKey() edits the same list
+        shardingKeys.add(shardingKey);
+      } else {
+        // The mapping already exists; nothing to write
+        return true;
+      }
+
+      try {
+        _zkClient.writeData(realmPath, znRecord);
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to write the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+            _namespace, realm, shardingKey, e);
+        return false;
+      }
+      return true;
+    }
+
+    // TODO: Forward the request to leader
+    return true;
+  }
+
+  /**
+   * Removes the given sharding key from the given realm's list of sharding keys.
+   * @param realm name of the realm ZNode under ROUTING_DATA_PATH
+   * @param shardingKey sharding key to remove
+   * @return false if writing the updated record back failed; true if the key was removed,
+   *         or if the realm, its key list, or the key itself does not exist (also true when
+   *         this instance is not the leader, where nothing is written yet)
+   * @throws IllegalStateException if this instance is the leader and the ZkClient is closed
+   */
+  @Override
+  public synchronized boolean deleteShardingKey(String realm, String shardingKey) {
+    if (_leaderElection.isLeader()) {
+      if (_zkClient.isClosed()) {
+        throw new IllegalStateException("ZkClient is closed!");
+      }
+      String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm;
+      ZNRecord znRecord = _zkClient.readData(realmPath, true);
+      // createZkRealm() writes a ZNRecord without any list field, so the key list may be
+      // absent (null) even though the realm itself exists
+      List<String> shardingKeys = znRecord == null ? null
+          : znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+      if (shardingKeys == null || !shardingKeys.contains(shardingKey)) {
+        // This realm does not exist or shardingKey doesn't exist. Return true!
+        return true;
+      }
+      shardingKeys.remove(shardingKey);
+      // Overwrite this ZNRecord with the sharding key removed
+      try {
+        _zkClient.writeData(realmPath, znRecord);
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to write the data back in deleteShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+            _namespace, realm, shardingKey, e);
+        return false;
+      }
+      return true;
+    }
+
+    // TODO: Forward the request to leader
+    return true;
+  }
+
+  @Override
+  public synchronized boolean setRoutingData(Map<String, List<String>> routingData)
{
+    if (_leaderElection.isLeader()) {
+      if (_zkClient.isClosed()) {
+        throw new IllegalStateException("ZkClient is closed!");
+      }
+      if (routingData == null) {
+        throw new IllegalArgumentException("routingData given is null!");
+      }
+
+      // Remove existing routing data
+      for (String zkRealm : _zkClient
+          .getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+        if (!_zkClient.delete(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm))
{
+          LOG.error(
+              "Failed to delete existing routing data in setRoutingData()! Namespace: {},
Realm: {}",
+              _namespace, zkRealm);
+          return false;
+        }
+      }
+
+      // For each ZkRealm, write the given routing data to ZooKeeper
+      for (Map.Entry<String, List<String>> routingDataEntry : routingData.entrySet())
{
+        String zkRealm = routingDataEntry.getKey();
+        List<String> shardingKeyList = routingDataEntry.getValue();
+
+        ZNRecord znRecord = new ZNRecord(zkRealm);
+        znRecord
+            .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, shardingKeyList);
+
+        String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm;
+        try {
+          if (!_zkClient.exists(realmPath)) {
+            _zkClient.createPersistent(realmPath);
+          }
+          _zkClient.writeData(realmPath, znRecord);
+        } catch (Exception e) {
+          LOG.error("Failed to write data in setRoutingData()! Namespace: {}, Realm: {}",
+              _namespace, zkRealm, e);
+          return false;
+        }
+      }
+      return true;
+    }
+
+    // TODO: Forward the request to leader
+    return true;
+  }
+
+  @Override
+  public synchronized void close() {
+    _zkClient.close();
+  }
+
+  /**
+   * Creates a ZK realm ZNode and populates it with an empty ZNRecord if it doesn't exist already.
+   * @param realm name of the realm ZNode to create under ROUTING_DATA_PATH
+   * @return true if the realm already exists or was created; false if creation failed
+   */
+  private boolean createZkRealm(String realm) {
+    String realmPath = MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm;
+    if (_zkClient.exists(realmPath)) {
+      LOG.warn("createZkRealm() called for realm: {}, but this realm already exists! Namespace: {}",
+          realm, _namespace);
+      return true;
+    }
+    try {
+      _zkClient.createPersistent(realmPath);
+      _zkClient.writeData(realmPath, new ZNRecord(realm));
+    } catch (Exception e) {
+      // Trailing Throwable argument makes SLF4J log the stack trace as well
+      LOG.error("Failed to create ZkRealm: {}, Namespace: {}", realm, _namespace, e);
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
new file mode 100644
index 0000000..c9b6bb2
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
@@ -0,0 +1,142 @@
+package org.apache.helix.rest.metadatastore.concurrency;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkDistributedLeaderElection implements IZkDataListener, IZkStateListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLeaderElection.class);
+  private static final String PREFIX = "MSDS_SERVER_";
+
+  private final HelixZkClient _zkClient;
+  private final String _basePath;
+  private final ZNRecord _participantInfo;
+  private ZNRecord _currentLeaderInfo;
+
+  private String _myEphemeralSequentialPath;
+  private volatile boolean _isLeader;
+
+  public ZkDistributedLeaderElection(HelixZkClient zkClient, String basePath,
+      ZNRecord participantInfo) {
+    synchronized (this) {
+      if (zkClient == null || zkClient.isClosed()) {
+        throw new IllegalArgumentException("ZkClient cannot be null or closed!");
+      }
+      _zkClient = zkClient;
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
+      if (basePath == null || basePath.isEmpty()) {
+        throw new IllegalArgumentException("lockBasePath cannot be null or empty!");
+      }
+      _basePath = basePath;
+      _participantInfo = participantInfo;
+      _isLeader = false;
+    }
+    init();
+  }
+
+  /**
+   * Create the base path if it doesn't exist and create an ephemeral sequential ZNode.
+   */
+  private void init() {
+    try {
+      _zkClient.createPersistent(_basePath, true);
+    } catch (ZkNodeExistsException e) {
+      // Okay if it exists already
+    }
+
+    // Create my ephemeral sequential node with my information
+    _myEphemeralSequentialPath = _zkClient
+        .create(_basePath + "/" + PREFIX, _participantInfo, CreateMode.EPHEMERAL_SEQUENTIAL);
+    if (_myEphemeralSequentialPath == null) {
+      throw new IllegalStateException(
+          "Unable to create ephemeral sequential node at path: " + _basePath);
+    }
+    tryAcquiringLeadership();
+  }
+
+  private void tryAcquiringLeadership() {
+    List<String> children = _zkClient.getChildren(_basePath);
+    Collections.sort(children);
+    String leaderName = children.get(0);
+    ZNRecord leaderInfo = _zkClient.readData(_basePath + "/" + leaderName, true);
+
+    String[] myNameArray = _myEphemeralSequentialPath.split("/");
+    String myName = myNameArray[myNameArray.length - 1];
+
+    if (leaderName.equals(myName)) {
+      // My turn for leadership
+      _isLeader = true;
+      _currentLeaderInfo = leaderInfo;
+      LOG.info("{} acquired leadership! Info: {}", myName, leaderInfo);
+    } else {
+      // Watch the ephemeral ZNode before me for a deletion event
+      String beforeMe = children.get(children.indexOf(myName) - 1);
+      _zkClient.subscribeDataChanges(_basePath + "/" + beforeMe, this);
+    }
+  }
+
+  public synchronized boolean isLeader() {
+    return _isLeader;
+  }
+
+  public synchronized ZNRecord getCurrentLeaderInfo() {
+    return _currentLeaderInfo;
+  }
+
+  @Override
+  public synchronized void handleStateChanged(Watcher.Event.KeeperState state) {
+    if (state == Watcher.Event.KeeperState.SyncConnected) {
+      init();
+    }
+  }
+
+  @Override
+  public void handleNewSession(String sessionId) {
+    return;
+  }
+
+  @Override
+  public void handleSessionEstablishmentError(Throwable error) {
+    return;
+  }
+
+  @Override
+  public void handleDataChange(String s, Object o) {
+    return;
+  }
+
+  @Override
+  public void handleDataDeleted(String s) {
+    tryAcquiringLeadership();
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
index fda355b..e4240e7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
@@ -24,4 +24,7 @@ public class MetadataStoreRoutingConstants {
 
   // For ZK only
   public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
+
+  // Leader election ZNode for ZkRoutingDataWriter
+  public static final String LEADER_ELECTION_ZNODE = "/_ZK_ROUTING_DATA_WRITER_LEADER";
 }
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java
similarity index 99%
rename from helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
rename to helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java
index 4479f68..77eb5eb 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.metadatastore;
+package org.apache.helix.rest.metadatastore.accessor;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
new file mode 100644
index 0000000..441bf65
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
@@ -0,0 +1,107 @@
+package org.apache.helix.rest.metadatastore.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.rest.server.AbstractTestClass;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Verifies that {@link ZkRoutingDataWriter} writes realms and sharding keys to ZooKeeper.
+ * The tests are chained via dependsOnMethods because each one builds on the ZNodes left behind
+ * by the previous one.
+ */
+public class TestZkRoutingDataWriter extends AbstractTestClass {
+  private static final String DUMMY_NAMESPACE = "NAMESPACE";
+  private static final String DUMMY_REALM = "REALM";
+  private static final String DUMMY_SHARDING_KEY = "SHARDING_KEY";
+  private static final String DUMMY_REALM_PATH =
+      MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM;
+  private MetadataStoreRoutingDataWriter _zkRoutingDataWriter;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkRoutingDataWriter = new ZkRoutingDataWriter(DUMMY_NAMESPACE, ZK_ADDR);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    try {
+      _baseAccessor
+          .remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, AccessOption.PERSISTENT);
+    } finally {
+      // Release the writer even when the ZNode cleanup above throws
+      _zkRoutingDataWriter.close();
+    }
+  }
+
+  @Test
+  public void testAddMetadataStoreRealm() {
+    _zkRoutingDataWriter.addMetadataStoreRealm(DUMMY_REALM);
+    Assert.assertNotNull(readRealmRecord());
+  }
+
+  @Test(dependsOnMethods = "testAddMetadataStoreRealm")
+  public void testDeleteMetadataStoreRealm() {
+    _zkRoutingDataWriter.deleteMetadataStoreRealm(DUMMY_REALM);
+    Assert.assertFalse(_baseAccessor.exists(DUMMY_REALM_PATH, AccessOption.PERSISTENT));
+  }
+
+  @Test(dependsOnMethods = "testDeleteMetadataStoreRealm")
+  public void testAddShardingKey() {
+    _zkRoutingDataWriter.addShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY);
+    ZNRecord znRecord = readRealmRecord();
+    Assert.assertNotNull(znRecord);
+    Assert.assertTrue(shardingKeysOf(znRecord).contains(DUMMY_SHARDING_KEY));
+  }
+
+  @Test(dependsOnMethods = "testAddShardingKey")
+  public void testDeleteShardingKey() {
+    _zkRoutingDataWriter.deleteShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY);
+    ZNRecord znRecord = readRealmRecord();
+    Assert.assertNotNull(znRecord);
+    Assert.assertFalse(shardingKeysOf(znRecord).contains(DUMMY_SHARDING_KEY));
+  }
+
+  @Test(dependsOnMethods = "testDeleteShardingKey")
+  public void testSetRoutingData() {
+    Map<String, List<String>> testRoutingDataMap =
+        ImmutableMap.of(DUMMY_REALM, Collections.singletonList(DUMMY_SHARDING_KEY));
+    _zkRoutingDataWriter.setRoutingData(testRoutingDataMap);
+    ZNRecord znRecord = readRealmRecord();
+    Assert.assertNotNull(znRecord);
+    // org.junit.Assert expects (expected, actual); keeping that order gives a correct message
+    Assert.assertEquals(1, shardingKeysOf(znRecord).size());
+    Assert.assertTrue(shardingKeysOf(znRecord).contains(DUMMY_SHARDING_KEY));
+  }
+
+  /** Reads the ZNRecord currently stored at the dummy realm's routing data path. */
+  private ZNRecord readRealmRecord() {
+    return _baseAccessor.get(DUMMY_REALM_PATH, null, AccessOption.PERSISTENT);
+  }
+
+  /** Returns the sharding-key list field of the given realm record. */
+  private static List<String> shardingKeysOf(ZNRecord znRecord) {
+    return znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index e6ecb82..c5ffd41 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -269,19 +269,10 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       _gZkClient = null;
     }
 
-    if (_zkServer != null) {
-      TestHelper.stopZkServer(_zkServer);
-      _zkServer = null;
-    }
-
     if (_gZkClientTestNS != null) {
       _gZkClientTestNS.close();
       _gZkClientTestNS = null;
     }
-    if (_zkServerTestNS != null) {
-      TestHelper.stopZkServer(_zkServerTestNS);
-      _zkServerTestNS = null;
-    }
 
     if (_helixRestServer != null) {
       _helixRestServer.shutdown();


Mime
View raw message