helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] branch master updated: Add Builder to ZKDistributedNonblockingLock to enable ZooScalability use (#1409)
Date Fri, 02 Oct 2020 18:35:49 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new bf40f7e  Add Builder to ZKDistributedNonblockingLock to enable ZooScalability use (#1409)
bf40f7e is described below

commit bf40f7ea34e48f27c71fc247f50b68a5197dcef8
Author: Hunter Lee <hulee@linkedin.com>
AuthorDate: Fri Oct 2 11:35:38 2020 -0700

    Add Builder to ZKDistributedNonblockingLock to enable ZooScalability use (#1409)
    
    ZKDistributedNonblockingLock and its related helix-lock API were not ZooScalability-compliant,
    meaning they could not be used in a multi-ZK environment. This PR updates these APIs so
    that they can be used in a multi-ZK environment by adding a Builder that allows users to
    set multi-ZK parameters.
---
 .../helix/manager/zk/GenericZkHelixApiBuilder.java |  2 -
 .../lock/helix/ZKDistributedNonblockingLock.java   | 72 +++++++++++++++++++---
 2 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
index 0447eee..fd24eb3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
@@ -24,11 +24,9 @@ import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
-import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
-import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
index a58d33d..18a9a3a 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
@@ -24,9 +24,11 @@ import java.util.Date;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.lock.DistributedLock;
 import org.apache.helix.lock.LockInfo;
 import org.apache.helix.lock.LockScope;
+import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -34,7 +36,9 @@ import org.apache.log4j.Logger;
 
 
 /**
- * Helix nonblocking lock implementation based on Zookeeper
+ * Helix nonblocking lock implementation based on Zookeeper.
+ * NOTE: do NOT use ephemeral nodes in this implementation because ephemeral mode is not supported
+ * in ZooScalability mode.
  */
 public class ZKDistributedNonblockingLock implements DistributedLock {
   private static final Logger LOG = Logger.getLogger(ZKDistributedNonblockingLock.class);
@@ -55,19 +59,19 @@ public class ZKDistributedNonblockingLock implements DistributedLock {
    */
   public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout,
       String lockMsg, String userId) {
-    this(scope.getPath(), zkAddress, timeout, lockMsg, userId);
+    this(scope.getPath(), timeout, lockMsg, userId, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
   }
 
   /**
    * Initialize the lock with user provided information, e.g., lock path under zookeeper,
etc.
    * @param lockPath the path of the lock under Zookeeper
-   * @param zkAddress the zk address of the cluster
    * @param timeout the timeout period of the lock
    * @param lockMsg the reason for having this lock
    * @param userId a universal unique userId for lock owner identity
+   * @param baseDataAccessor baseDataAccessor instance to do I/O against ZK with
    */
-  private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long timeout,
-      String lockMsg, String userId) {
+  private ZKDistributedNonblockingLock(String lockPath, Long timeout, String lockMsg, String userId,
+      BaseDataAccessor<ZNRecord> baseDataAccessor) {
     _lockPath = lockPath;
     if (timeout < 0) {
       throw new IllegalArgumentException("The expiration time cannot be negative.");
@@ -75,7 +79,7 @@ public class ZKDistributedNonblockingLock implements DistributedLock {
     _timeout = timeout;
     _lockMsg = lockMsg;
     _userId = userId;
-    _baseDataAccessor = new ZkBaseDataAccessor<>(zkAddress);
+    _baseDataAccessor = baseDataAccessor;
   }
 
   @Override
@@ -143,11 +147,65 @@ public class ZKDistributedNonblockingLock implements DistributedLock {
       if (!(System.currentTimeMillis() < curLockInfo.getTimeout()) || isCurrentOwner())
{
         return _record;
       }
-      // For users who are not the lock owner and try to do an update on a lock that is held by someone else, exception thrown is to be caught by data accessor, and return false for the update
+      // For users who are not the lock owner and try to do an update on a lock that is held by
+      // someone else, exception thrown is to be caught by data accessor, and return false for
+      // the update
       LOG.error(
           "User " + _userId + " tried to update the lock at " + new Date(System.currentTimeMillis())
               + ". Lock path: " + _lockPath);
       throw new HelixException("User is not authorized to perform this operation.");
     }
   }
+
+  /**
+   * Builder class to use with ZKDistributedNonblockingLock.
+   */
+  public static class Builder extends GenericZkHelixApiBuilder<Builder> {
+    private LockScope _lockScope;
+    private String _userId;
+    private long _timeout;
+    private String _lockMsg;
+
+    public Builder() {
+    }
+
+    public Builder setLockScope(LockScope lockScope) {
+      _lockScope = lockScope;
+      return this;
+    }
+
+    public Builder setUserId(String userId) {
+      _userId = userId;
+      return this;
+    }
+
+    public Builder setTimeout(long timeout) {
+      _timeout = timeout;
+      return this;
+    }
+
+    public Builder setLockMsg(String lockMsg) {
+      _lockMsg = lockMsg;
+      return this;
+    }
+
+    public ZKDistributedNonblockingLock build() {
+      // Resolve which way we want to create BaseDataAccessor instance
+      BaseDataAccessor<ZNRecord> baseDataAccessor;
+      // If enabled via System.Properties config or the given zkAddress is null, use ZooScalability
+      if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || _zkAddress == null) {
+        // If the multi ZK config is enabled, use multi-realm mode with FederatedZkClient
+        baseDataAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>().setRealmMode(_realmMode)
+            .setRealmAwareZkClientConfig(_realmAwareZkClientConfig)
+            .setRealmAwareZkConnectionConfig(_realmAwareZkConnectionConfig).setZkAddress(_zkAddress)
+            .build();
+      } else {
+        baseDataAccessor = new ZkBaseDataAccessor<>(_zkAddress);
+      }
+
+      // Return a ZKDistributedNonblockingLock instance
+      return new ZKDistributedNonblockingLock(_lockScope.getPath(), _timeout, _lockMsg, _userId,
+          baseDataAccessor);
+    }
+  }
 }


Mime
View raw message