helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch distributed-lock updated: Implement Helix nonblocking lock (#718)
Date Thu, 20 Feb 2020 21:58:00 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch distributed-lock
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/distributed-lock by this push:
     new 29c99a0  Implement Helix nonblocking lock (#718)
29c99a0 is described below

commit 29c99a0de4a34246d2cb735052ce18e42068edb8
Author: mgao0 <31704180+mgao0@users.noreply.github.com>
AuthorDate: Thu Feb 20 13:57:51 2020 -0800

    Implement Helix nonblocking lock (#718)
    
    Implemented distributed nonblocking lock with acquire lock, release lock, get current
lock info, and check is current user functionalities.
---
 .../lock/{HelixLock.java => DistributedLock.java}  |  15 +-
 .../main/java/org/apache/helix/lock/LockInfo.java  | 111 ++++++++++++--
 .../helix/lock/{LockInfo.java => LockScope.java}   |  24 +--
 .../apache/helix/lock/helix/HelixLockScope.java    | 100 +++++++++++++
 .../lock/helix/ZKDistributedNonblockingLock.java   | 147 +++++++++++++++++++
 .../lock/helix/TestZKHelixNonblockingLock.java     | 163 +++++++++++++++++++++
 6 files changed, 520 insertions(+), 40 deletions(-)

diff --git a/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java
similarity index 74%
rename from helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
rename to helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java
index 01ef63b..594ccf4 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/DistributedLock.java
@@ -22,7 +22,7 @@ package org.apache.helix.lock;
 /**
  * Generic interface for Helix distributed lock
  */
-public interface HelixLock<T> {
+public interface DistributedLock {
   /**
    * Blocking call to acquire a lock
    * @return true if the lock was successfully acquired,
@@ -32,22 +32,21 @@ public interface HelixLock<T> {
 
   /**
    * Blocking call to release a lock
-   * @return true if the lock was successfully released,
-   * false if the locked is not locked or is not locked by the user,
-   * or the lock could not be released
+   * @return true if the lock was successfully released or if the locked is not currently
locked,
+   * false if the lock is not locked by the user or the release operation failed
    */
   boolean releaseLock();
 
   /**
-   * Retrieve the lock information, e.g. lock timeout, lock message, etc.
+   * Retrieve the information of the current lock on the resource this lock object specifies,
e.g. lock timeout, lock message, etc.
    * @return lock metadata information
    */
-  LockInfo<T> getLockInfo();
+  LockInfo getCurrentLockInfo();
 
   /**
-   * If the user is current lock owner
+   * If the user is current lock owner of the resource
    * @return true if the user is the lock owner,
    * false if the user is not the lock owner or the lock doesn't have a owner
    */
-  boolean isOwner();
+  boolean isCurrentOwner();
 }
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
index 30322bb..71f7258 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
@@ -19,28 +19,113 @@
 
 package org.apache.helix.lock;
 
-import java.util.Map;
+import org.apache.helix.ZNRecord;
 
 
 /**
- * Generic interface for a map contains the Helix lock information
- * @param <T> The type of the LockInfo value
+ * Structure represents a lock node information, implemented using ZNRecord
  */
-public interface LockInfo<T> {
+public class LockInfo {
 
-  //TODO: add specific setter and getter for any field that is determined to be universal
for all implementations of HelixLock
+  // Default values for each attribute if there are no current values set by user
+  public static final String DEFAULT_OWNER_TEXT = "";
+  public static final String DEFAULT_MESSAGE_TEXT = "";
+  public static final long DEFAULT_TIMEOUT_LONG = -1L;
+
+  // default lock info represents the status of a unlocked lock
+  public static final LockInfo defaultLockInfo =
+      new LockInfo(DEFAULT_OWNER_TEXT, DEFAULT_MESSAGE_TEXT, DEFAULT_TIMEOUT_LONG);
+
+  private static final String ZNODE_ID = "LOCK";
+  private ZNRecord _record;
+
+  /**
+   * The keys to lock information
+   */
+  public enum LockInfoAttribute {
+    OWNER,
+    MESSAGE,
+    TIMEOUT
+  }
+
+  /**
+   * Initialize a default LockInfo instance
+   */
+  private LockInfo() {
+    _record = new ZNRecord(ZNODE_ID);
+    setLockInfoFields(DEFAULT_OWNER_TEXT, DEFAULT_MESSAGE_TEXT, DEFAULT_TIMEOUT_LONG);
+  }
+
+  /**
+   * Initialize a LockInfo with a ZNRecord, set all info fields to default data
+   * @param znRecord The ZNRecord contains lock node data that used to initialize the LockInfo
+   */
+  public LockInfo(ZNRecord znRecord) {
+    this();
+    if (znRecord != null) {
+      String ownerId = znRecord.getSimpleField(LockInfoAttribute.OWNER.name());
+      String message = znRecord.getSimpleField(LockInfoAttribute.MESSAGE.name());
+      long timeout = znRecord.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_LONG);
+      setLockInfoFields(ownerId, message, timeout);
+    }
+  }
+
+  /**
+   * Initialize a LockInfo with data for each field, set all null info fields to default
data
+   * @param ownerId value of OWNER attribute
+   * @param message value of MESSAGE attribute
+   * @param timeout value of TIMEOUT attribute
+   */
+  public LockInfo(String ownerId, String message, long timeout) {
+    this();
+    setLockInfoFields(ownerId, message, timeout);
+  }
+
+  /**
+   * Set each field of lock info to user provided values if the values are not null, null
values are set to default values
+   * @param ownerId value of OWNER attribute
+   * @param message value of MESSAGE attribute
+   * @param timeout value of TIMEOUT attribute
+   */
+  private void setLockInfoFields(String ownerId, String message, long timeout) {
+    _record.setSimpleField(LockInfoAttribute.OWNER.name(),
+        ownerId == null ? DEFAULT_OWNER_TEXT : ownerId);
+    _record.setSimpleField(LockInfoAttribute.MESSAGE.name(),
+        message == null ? DEFAULT_MESSAGE_TEXT : message);
+    _record.setLongField(LockInfoAttribute.TIMEOUT.name(), timeout);
+  }
+
+  /**
+   * Get the value for OWNER attribute of the lock
+   * @return the owner id of the lock, empty string if there is no owner id set
+   */
+  public String getOwner() {
+    String owner = _record.getSimpleField(LockInfoAttribute.OWNER.name());
+    return owner == null ? DEFAULT_OWNER_TEXT : owner;
+  }
+
+  /**
+   * Get the value for MESSAGE attribute of the lock
+   * @return the message of the lock, empty string if there is no message set
+   */
+  public String getMessage() {
+    String message = _record.getSimpleField(LockInfoAttribute.MESSAGE.name());
+    return message == null ? DEFAULT_MESSAGE_TEXT : message;
+  }
 
   /**
-   * Create a single filed of LockInfo, or update the value of the field if it already exists
-   * @param infoKey the key of the LockInfo field
-   * @param infoValue the value of the LockInfo field
+   * Get the value for TIMEOUT attribute of the lock
+   * @return the expiring time of the lock, -1 if there is no timeout set
    */
-  void setInfoValue(String infoKey, T infoValue);
+  public Long getTimeout() {
+    return _record.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_LONG);
+  }
 
   /**
-   * Get the value of a field in LockInfo
-   * @param infoKey the key of the LockInfo field
-   * @return the value of the field or null if this key does not exist
+   * Get the underlying ZNRecord in a LockInfo
+   * @return lock information contained in a ZNRecord
    */
-  T getInfoValue(String infoKey);
+  public ZNRecord getRecord() {
+    return _record;
+  }
 }
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/LockScope.java
similarity index 53%
copy from helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
copy to helix-lock/src/main/java/org/apache/helix/lock/LockScope.java
index 30322bb..0fa5cfc 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/LockScope.java
@@ -19,28 +19,14 @@
 
 package org.apache.helix.lock;
 
-import java.util.Map;
-
-
 /**
- * Generic interface for a map contains the Helix lock information
- * @param <T> The type of the LockInfo value
+ * A predefined class to generate the lock path based on user input
  */
-public interface LockInfo<T> {
-
-  //TODO: add specific setter and getter for any field that is determined to be universal
for all implementations of HelixLock
-
-  /**
-   * Create a single filed of LockInfo, or update the value of the field if it already exists
-   * @param infoKey the key of the LockInfo field
-   * @param infoValue the value of the LockInfo field
-   */
-  void setInfoValue(String infoKey, T infoValue);
+public interface LockScope {
 
   /**
-   * Get the value of a field in LockInfo
-   * @param infoKey the key of the LockInfo field
-   * @return the value of the field or null if this key does not exist
+   * Get the lock path
+   * @return the path of the lock
    */
-  T getInfoValue(String infoKey);
+  String getPath();
 }
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/helix/HelixLockScope.java b/helix-lock/src/main/java/org/apache/helix/lock/helix/HelixLockScope.java
new file mode 100644
index 0000000..8eec472
--- /dev/null
+++ b/helix-lock/src/main/java/org/apache/helix/lock/helix/HelixLockScope.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix.lock.helix;
+
+import java.util.List;
+
+import org.apache.helix.lock.LockScope;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.util.StringTemplate;
+
+
+/**
+ *  Defines the various scopes of Helix locks, and how they are represented on Zookeeper
+ */
+public class HelixLockScope implements LockScope {
+
+  /**
+   * Define various properties of Helix lock, and associate them with the number of arguments
required for getting znode path
+   */
+  public enum LockScopeProperty {
+
+    CLUSTER(2),
+
+    PARTICIPANT(2),
+
+    RESOURCE(2);
+
+    //the number of arguments required to generate a full path for the specific scope
+    final int _pathArgNum;
+
+    /**
+     * Initialize a LockScopeProperty
+     * @param pathArgNum the number of arguments required to generate a full path for the
specific scope
+    \     */
+    private LockScopeProperty(int pathArgNum) {
+      _pathArgNum = pathArgNum;
+    }
+
+    /**
+     * Get the number of template arguments required to generate a full path
+     * @return number of template arguments in the path
+     */
+    public int getPathArgNum() {
+      return _pathArgNum;
+    }
+  }
+
+  /**
+   * string templates to generate znode path
+   */
+  private static final StringTemplate template = new StringTemplate();
+
+  //TODO: Enrich the logic of path generation once we have a more detailed design
+  static {
+    template.addEntry(LockScopeProperty.CLUSTER, 2, "/{clusterName}/LOCK/CLUSTER/{clusterName}");
+    template.addEntry(HelixLockScope.LockScopeProperty.PARTICIPANT, 2,
+        "/{clusterName}/LOCK/PARTICIPANT/{participantName}");
+    template.addEntry(HelixLockScope.LockScopeProperty.RESOURCE, 2,
+        "/{clusterName}/LOCK/RESOURCE/{resourceName}");
+  }
+
+  private final String _path;
+
+  /**
+   * Initialize with a type of scope and unique identifiers
+   * @param type the scope
+   * @param pathKeys keys identifying a ZNode location
+   */
+  public HelixLockScope(HelixLockScope.LockScopeProperty type, List<String> pathKeys)
{
+
+    if (pathKeys.size() != type.getPathArgNum()) {
+      throw new IllegalArgumentException(
+          type + " requires " + type.getPathArgNum() + " arguments to get znode, but was:
"
+              + pathKeys);
+    }
+    _path = template.instantiate(type, pathKeys.toArray(new String[0]));
+  }
+
+  @Override
+  public String getPath() {
+    return _path;
+  }
+}
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
new file mode 100644
index 0000000..8fe8c8c
--- /dev/null
+++ b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix.lock.helix;
+
+import java.util.Date;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.lock.DistributedLock;
+import org.apache.helix.lock.LockInfo;
+import org.apache.helix.lock.LockScope;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Helix nonblocking lock implementation based on Zookeeper
+ */
+public class ZKDistributedNonblockingLock implements DistributedLock {
+
+  private static final Logger LOG = Logger.getLogger(ZKDistributedNonblockingLock.class);
+
+  private final String _lockPath;
+  private final String _userId;
+  private final long _timeout;
+  private final String _lockMsg;
+  private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
+
+  /**
+   * Initialize the lock with user provided information, e.g.,cluster, scope, etc.
+   * @param scope the scope to lock
+   * @param zkAddress the zk address the cluster connects to
+   * @param timeout the timeout period of the lcok
+   * @param lockMsg the reason for having this lock
+   * @param userId a universal unique userId for lock owner identity
+   */
+  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout, String
lockMsg,
+      String userId) {
+    this(scope.getPath(), zkAddress, timeout, lockMsg, userId);
+  }
+
+  /**
+   * Initialize the lock with user provided information, e.g., lock path under zookeeper,
etc.
+   * @param lockPath the path of the lock under Zookeeper
+   * @param zkAddress the zk address of the cluster
+   * @param timeout the timeout period of the lcok
+   * @param lockMsg the reason for having this lock
+   * @param userId a universal unique userId for lock owner identity
+   */
+  private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long timeout, String
lockMsg,
+      String userId) {
+    _lockPath = lockPath;
+    if (timeout < 0) {
+      throw new IllegalArgumentException("The expiration time cannot be negative.");
+    }
+    _timeout = timeout;
+    _lockMsg = lockMsg;
+    _userId = userId;
+    _baseDataAccessor = new ZkBaseDataAccessor<>(zkAddress);
+  }
+
+  @Override
+  public boolean acquireLock() {
+
+    // Set lock information fields
+    long deadline;
+    // Prevent value overflow
+    if (_timeout > Long.MAX_VALUE - System.currentTimeMillis()) {
+      deadline = Long.MAX_VALUE;
+    } else {
+      deadline = System.currentTimeMillis() + _timeout;
+    }
+    LockUpdater updater = new LockUpdater(new LockInfo(_userId, _lockMsg, deadline));
+    return _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
+  }
+
+  //TODO: update release lock logic so it would not leave empty znodes after the lock is
released
+  @Override
+  public boolean releaseLock() {
+    // Initialize the lock updater with a default lock info represents the state of a unlocked
lock
+    LockUpdater updater = new LockUpdater(LockInfo.defaultLockInfo);
+    return _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
+  }
+
+  @Override
+  public LockInfo getCurrentLockInfo() {
+    ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
+    return new LockInfo(curLockInfo);
+  }
+
+  @Override
+  public boolean isCurrentOwner() {
+    LockInfo lockInfo = getCurrentLockInfo();
+    return lockInfo.getOwner().equals(_userId) && (System.currentTimeMillis() <
lockInfo
+        .getTimeout());
+  }
+
+  /**
+   * Class that specifies how a lock node should be updated with another lock node
+   */
+  private class LockUpdater implements DataUpdater<ZNRecord> {
+    final ZNRecord _record;
+
+    /**
+     * Initialize a structure for lock user to update a lock node value
+     * @param lockInfo the lock node value will be used to update the lock
+     */
+    public LockUpdater(LockInfo lockInfo) {
+      _record = lockInfo.getRecord();
+    }
+
+    @Override
+    public ZNRecord update(ZNRecord current) {
+      // If no one owns the lock, allow the update
+      // If the user is the current lock owner, allow the update
+      LockInfo curLockInfo = new LockInfo(current);
+      if (!(System.currentTimeMillis() < curLockInfo.getTimeout()) || isCurrentOwner())
{
+        return _record;
+      }
+      // For users who are not the lock owner and try to do an update on a lock that is held
by someone else, exception thrown is to be caught by data accessor, and return false for the
update
+      LOG.error(
+          "User " + _userId + " tried to update the lock at " + new Date(System.currentTimeMillis())
+              + ". Lock path: " + _lockPath);
+      throw new HelixException("User is not authorized to perform this operation.");
+    }
+  }
+}
diff --git a/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java
b/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java
new file mode 100644
index 0000000..08cf33b
--- /dev/null
+++ b/helix-lock/src/test/java/org/apache/helix/lock/helix/TestZKHelixNonblockingLock.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix.lock.helix;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.lock.LockInfo;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestZKHelixNonblockingLock extends ZkTestBase {
+
+  private final String _clusterName = TestHelper.getTestClassName();
+  private final String _lockMessage = "Test";
+  private String _lockPath;
+  private ZKDistributedNonblockingLock _lock;
+  private String _userId;
+  private HelixLockScope _participantScope;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+
+    System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5,
3,
+        "MasterSlave", true);
+    _userId = UUID.randomUUID().toString();
+
+    List<String> pathKeys = new ArrayList<>();
+    pathKeys.add(_clusterName);
+    pathKeys.add(_clusterName);
+
+    _participantScope = new HelixLockScope(HelixLockScope.LockScopeProperty.CLUSTER, pathKeys);
+    _lockPath = _participantScope.getPath();
+    _lock = new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE,
_lockMessage,
+        _userId);
+  }
+
+  @BeforeMethod
+  public void beforeMethod() {
+    _gZkClient.delete(_lockPath);
+    Assert.assertFalse(_gZkClient.exists(_lockPath));
+  }
+
+  @Test
+  public void testAcquireLock() {
+
+    // Acquire lock
+    _lock.acquireLock();
+    Assert.assertTrue(_gZkClient.exists(_lockPath));
+
+    // Get lock information
+    LockInfo lockInfo = _lock.getCurrentLockInfo();
+    Assert.assertEquals(lockInfo.getOwner(), _userId);
+    Assert.assertEquals(lockInfo.getMessage(), _lockMessage);
+
+    // Check if the user is lock owner
+    Assert.assertTrue(_lock.isCurrentOwner());
+
+    // Release lock
+    _lock.releaseLock();
+    Assert.assertFalse(_lock.isCurrentOwner());
+  }
+
+  @Test
+  public void testAcquireLockWhenExistingLockNotExpired() {
+
+    // Fake condition when the lock owner is not current user
+    String fakeUserID = UUID.randomUUID().toString();
+    ZNRecord fakeRecord = new ZNRecord(fakeUserID);
+    fakeRecord.setSimpleField(LockInfo.LockInfoAttribute.OWNER.name(), fakeUserID);
+    fakeRecord
+        .setSimpleField(LockInfo.LockInfoAttribute.TIMEOUT.name(), String.valueOf(Long.MAX_VALUE));
+    _gZkClient.create(_lockPath, fakeRecord, CreateMode.PERSISTENT);
+
+    // Check if the user is lock owner
+    Assert.assertFalse(_lock.isCurrentOwner());
+
+    // Acquire lock
+    Assert.assertFalse(_lock.acquireLock());
+    Assert.assertFalse(_lock.isCurrentOwner());
+
+    // Release lock
+    Assert.assertFalse(_lock.releaseLock());
+  }
+
+  @Test
+  public void testAcquireLockWhenExistingLockExpired() {
+
+    // Fake condition when the current lock already expired
+    String fakeUserID = UUID.randomUUID().toString();
+    ZNRecord fakeRecord = new ZNRecord(fakeUserID);
+    fakeRecord.setSimpleField(LockInfo.LockInfoAttribute.OWNER.name(), fakeUserID);
+    fakeRecord.setSimpleField(LockInfo.LockInfoAttribute.TIMEOUT.name(),
+        String.valueOf(System.currentTimeMillis()));
+    _gZkClient.create(_lockPath, fakeRecord, CreateMode.PERSISTENT);
+
+    // Acquire lock
+    Assert.assertTrue(_lock.acquireLock());
+    Assert.assertTrue(_lock.isCurrentOwner());
+
+    // Release lock
+    Assert.assertTrue(_lock.releaseLock());
+    Assert.assertFalse(_lock.isCurrentOwner());
+  }
+
+  @Test
+  public void testSimultaneousAcquire() {
+    List<Callable<Boolean>> threads = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      ZKDistributedNonblockingLock lock =
+          new ZKDistributedNonblockingLock(_participantScope, ZK_ADDR, Long.MAX_VALUE, _lockMessage,
+              UUID.randomUUID().toString());
+      threads.add(new TestSimultaneousAcquireLock(lock));
+    }
+    Map<String, Boolean> resultMap = TestHelper.startThreadsConcurrently(threads, 1000);
+    Assert.assertEquals(resultMap.size(), 2);
+    Assert.assertEqualsNoOrder(resultMap.values().toArray(), new Boolean[]{true, false});
+  }
+
+  private static class TestSimultaneousAcquireLock implements Callable<Boolean> {
+    final ZKDistributedNonblockingLock _lock;
+
+    TestSimultaneousAcquireLock(ZKDistributedNonblockingLock lock) {
+      _lock = lock;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      return _lock.acquireLock();
+    }
+  }
+}
+


Mime
View raw message