Implementation of LockClient for Lattice. (#2457)

Implementation of LockClient for Lattice.

Co-authored-by: mapeng <mapeng@linkedin.com>
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
index 00fe441..3ed6928 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
@@ -19,15 +19,20 @@
  * under the License.
  */
 
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * The DataRecord object is a wrapper around ZNRecord.
  * TODO: Create an interface to decouple DataRecord and have a pluggable record store.
  */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class DataRecord extends ZNRecord {
   public DataRecord(String znodeId) {
     super(znodeId);
   }
-}
 
+  public DataRecord(ZNRecord record) {
+    super(record);
+  }
+}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
index cf3feb0..0453743 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
@@ -21,6 +21,9 @@
 
 
 import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +35,18 @@
   private static final Logger LOG = LoggerFactory.getLogger(MetaClientFactory.class);
 
   public MetaClientInterface getMetaClient(MetaClientConfig config) {
+    if (config == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
+          setConnectionAddress(config.getConnectionAddress())
+          .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
+          .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
+          .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
+          .build();
+      return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+    }
     return null;
   }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
index b3480b1..9eba28b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientFactory.java
@@ -27,7 +27,10 @@
 public class ZkMetaClientFactory extends MetaClientFactory {
   @Override
   public MetaClientInterface getMetaClient(MetaClientConfig config) {
-    if (config.getStoreType() == MetaClientConfig.StoreType.ZOOKEEPER
+    if (config == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())
         && config instanceof ZkMetaClientConfig) {
       return new ZkMetaClient((ZkMetaClientConfig) config);
     }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
new file mode 100644
index 0000000..cd5c3c2
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java
@@ -0,0 +1,123 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.api.Op;
+import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class LockClient implements LockClientInterface, AutoCloseable {
+  private final MetaClientInterface<LockInfo> _metaClient;
+  //NEW_METACLIENT is used to indicate whether the metaClient is created by the LockClient or not.
+  private static Boolean NEW_METACLIENT = false;
+  private static final Logger LOG = LoggerFactory.getLogger(LockClient.class);
+
+  public LockClient(MetaClientConfig config) {
+    if (config == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    LOG.info("Creating MetaClient for LockClient");
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
+          setConnectionAddress(config.getConnectionAddress())
+          // Currently only support ZNRecordSerializer. TODO: make this configurable
+          .setZkSerializer((new ZNRecordSerializer()))
+          .build();
+      _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+      NEW_METACLIENT = true;
+    } else {
+      throw new MetaClientException("Unsupported store type: " + config.getStoreType());
+    }
+  }
+
+  public LockClient(MetaClientInterface<LockInfo> client) {
+    if (client == null) {
+      throw new IllegalArgumentException("MetaClient cannot be null.");
+    }
+    _metaClient = client;
+    LOG.info("Connecting to existing MetaClient for LockClient");
+    _metaClient.connect();
+  }
+
+  @Override
+  public void acquireLock(String key, LockInfo lockInfo, MetaClientInterface.EntryMode mode) {
+    _metaClient.create(key, lockInfo, mode);
+  }
+
+  @Override
+  public void acquireLockWithTTL(String key, LockInfo lockInfo, long ttl) {
+    _metaClient.createWithTTL(key, lockInfo, ttl);
+  }
+
+  @Override
+  public void renewTTLLock(String key) {
+    _metaClient.renewTTLNode(key);
+  }
+
+  @Override
+  public void releaseLock(String key) {
+    MetaClientInterface.Stat stat = _metaClient.exists(key);
+    if (stat != null) {
+      int version = stat.getVersion();
+      List<Op> ops = Arrays.asList(
+          Op.check(key, version),
+          Op.delete(key, version));
+      _metaClient.transactionOP(ops);
+      if (_metaClient.exists(key) != null) {
+        throw new MetaClientException("Failed to release lock for key: " + key);
+      }
+    }
+  }
+
+  @Override
+  public LockInfo retrieveLock(String key) {
+    MetaClientInterface.Stat stat = _metaClient.exists(key);
+    if (stat == null) {
+      return null;
+    }
+    //Create a new DataRecord from underlying record
+    DataRecord dataRecord = new DataRecord(_metaClient.get(key));
+    //Create a new LockInfo from DataRecord
+    LockInfo lockInfo = new LockInfo(dataRecord, stat);
+    return lockInfo;
+  }
+
+  @Override
+  public void close() {
+    if (NEW_METACLIENT) {
+      LOG.info("Closing created MetaClient for LockClient");
+    } else {
+      LOG.warn("Closing existing MetaClient");
+    }
+    _metaClient.disconnect();
+  }
+}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClientInterface.java
index 81fe6a3..02de695 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClientInterface.java
@@ -27,27 +27,31 @@
    * @param key key to identify the entry
    * @param info Metadata of the lock
    * @param mode EntryMode identifying if the entry will be deleted upon client disconnect
-   *             (Persistent, Ephemeral, or TTL)
-   * @return True if the lock is acquired. Raises exception if fails.
+   *             (Persistent, Ephemeral, or Container)
    */
-  boolean acquireLock(String key, LockInfo info, MetaClientInterface.EntryMode mode);
+  void acquireLock(String key, LockInfo info, MetaClientInterface.EntryMode mode);
+
+  /**
+   * Acquires a lock at key with a TTL. The lock will be deleted after the TTL.
+   * @param key key to identify the entry
+   * @param info Metadata of the lock
+   * @param ttl Time to live in milliseconds
+   */
+  void acquireLockWithTTL(String key, LockInfo info, long ttl);
 
   /**
    * Renews lock for a TTL Node.
    * Will fail if key is an invalid path or isn't of type TTL.
    * @param key key to identify the entry
-   * @return True if the lock was renewed. Raises exception if fails.
    */
-  boolean renewTTLLock(String key);
+  void renewTTLLock(String key);
 
   /**
    * Releases the lock.
    * Will fail if key is an invalid path.
    * @param key key to identify the entry
-   * @return True if the lock was released or the lock had already been released.
-   * Raises exception if fails.
    */
-  boolean releaseLock(String key);
+  void releaseLock(String key);
 
   /**
    * Obtains the metadata of a lock (the LockInfo).
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockInfo.java
index d579427..26736f1 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockInfo.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockInfo.java
@@ -19,16 +19,16 @@
  * under the License.
  */
 
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
 
 
 /**
  * This structure represents a Lock node information, implemented using DataRecord
  */
-public class LockInfo {
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class LockInfo extends DataRecord {
 
   // Default values for each attribute if there are no current values set by user
   public static final String DEFAULT_LOCK_ID_TEXT = "";
@@ -37,9 +37,9 @@
   public static final String DEFAULT_CLIENT_DATA = "";
   public static final long DEFAULT_GRANTED_AT_LONG = -1L;
   public static final long DEFAULT_LAST_RENEWED_AT_LONG = -1L;
-  public static final Duration DEFAULT_TIMEOUT_DURATION = Duration.ofMillis(-1L);
-  private static final String ZNODE_ID = "LOCK";
-  private final DataRecord _record;
+  public static final long DEFAULT_TIMEOUT_DURATION = -1L;
+  private static final String DEFAULT_LOCK_INFO = "lockInfo.";
+  private DataRecord _dataRecord;
 
   /**
    * The keys to lock information
@@ -57,14 +57,15 @@
   /**
    * Initialize a default LockInfo instance
    */
-  private LockInfo() {
-    _record = new DataRecord(ZNODE_ID);
+  public LockInfo() {
+    super(DEFAULT_LOCK_INFO);
+    _dataRecord = new DataRecord(DEFAULT_LOCK_INFO);
     setLockInfoFields(DEFAULT_LOCK_ID_TEXT, DEFAULT_OWNER_ID_TEXT, DEFAULT_CLIENT_ID_TEXT, DEFAULT_CLIENT_DATA, DEFAULT_GRANTED_AT_LONG,
         DEFAULT_LAST_RENEWED_AT_LONG, DEFAULT_TIMEOUT_DURATION);
   }
 
   /**
-   * Initialize a LockInfo with a ZNRecord, set all info fields to default data
+   * Initialize a LockInfo with a DataRecord, set all info fields to default data
    * @param dataRecord The dataRecord contains lock node data that used to initialize the LockInfo
    */
   public LockInfo(DataRecord dataRecord) {
@@ -77,14 +78,25 @@
       long grantTime = dataRecord.getLongField(LockInfoAttribute.GRANTED_AT.name(), DEFAULT_GRANTED_AT_LONG);
       long lastRenewalTime =
           dataRecord.getLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), DEFAULT_LAST_RENEWED_AT_LONG);
-      long timeout = dataRecord.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_DURATION.toMillis());
-      Duration duration = Duration.of(timeout, ChronoUnit.MILLIS);
+      long timeout = dataRecord.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_DURATION);
       setLockInfoFields(lockId,ownerId, clientId, clientData, grantTime,
-          lastRenewalTime, duration);
+          lastRenewalTime, timeout);
     }
   }
 
   /**
+   * Initialize a LockInfo with a DataRecord and a Stat, set all info fields to default data
+   * @param dataRecord The dataRecord contains lock node data that used to initialize the LockInfo
+   * @param stat The stat of the lock node
+   */
+  public LockInfo(DataRecord dataRecord, MetaClientInterface.Stat stat) {
+    this(dataRecord);
+    //Synchronize the lockInfo with the stat
+    setGrantedAt(stat.getCreationTime());
+    setLastRenewedAt(stat.getModifiedTime());
+  }
+
+  /**
    * Initialize a LockInfo with data for each field, set all null info fields to default data
    * @param lockId value of LOCK_ID attribute
    * @param ownerId value of OWNER_ID attribute
@@ -95,7 +107,7 @@
    * @param timeout value of TIMEOUT attribute
    */
   public LockInfo(String lockId, String ownerId, String clientId,
-                  String clientData, long grantTime, long lastRenewalTime, Duration timeout) {
+                  String clientData, long grantTime, long lastRenewalTime, long timeout) {
     this();
     setLockInfoFields(lockId, ownerId, clientId, clientData, grantTime, lastRenewalTime, timeout);
   }
@@ -112,7 +124,7 @@
    * @param timeout value of TIMEOUT attribute
    */
   private void setLockInfoFields(String lockId, String ownerId, String clientId, String clientData, long grantTime, long lastRenewalTime,
-                                 Duration timeout) {
+                                 long timeout) {
     setLockId(lockId);
     setOwnerId(ownerId);
     setClientId(clientId);
@@ -128,7 +140,7 @@
    *               It is created by the lockClient and a new one is created for each time the lock is acquired.
    */
   public void setLockId(String lockId) {
-    _record.setSimpleField(LockInfoAttribute.LOCK_ID.name(), lockId == null ? DEFAULT_LOCK_ID_TEXT : lockId);
+    _dataRecord.setSimpleField(LockInfoAttribute.LOCK_ID.name(), lockId == null ? DEFAULT_LOCK_ID_TEXT : lockId);
   }
 
   /**
@@ -138,7 +150,7 @@
    *                by the same owner.
    */
   public void setOwnerId(String ownerId) {
-    _record.setSimpleField(LockInfoAttribute.OWNER_ID.name(), ownerId == null ? DEFAULT_OWNER_ID_TEXT : ownerId);
+    _dataRecord.setSimpleField(LockInfoAttribute.OWNER_ID.name(), ownerId == null ? DEFAULT_OWNER_ID_TEXT : ownerId);
   }
 
   /**
@@ -146,7 +158,7 @@
    * @param clientId Unique identifier that represents who will get the lock (the client).
    */
   public void setClientId(String clientId) {
-    _record.setSimpleField(LockInfoAttribute.CLIENT_ID.name(), clientId == null ? DEFAULT_CLIENT_ID_TEXT : clientId);
+    _dataRecord.setSimpleField(LockInfoAttribute.CLIENT_ID.name(), clientId == null ? DEFAULT_CLIENT_ID_TEXT : clientId);
   }
 
   /**
@@ -154,7 +166,7 @@
    * @param clientData String representing the serialized data object
    */
   public void setClientData(String clientData) {
-    _record.setSimpleField(LockInfoAttribute.CLIENT_DATA.name(), clientData == null ? DEFAULT_CLIENT_DATA : clientData);
+    _dataRecord.setSimpleField(LockInfoAttribute.CLIENT_DATA.name(), clientData == null ? DEFAULT_CLIENT_DATA : clientData);
   }
 
   /**
@@ -162,7 +174,7 @@
    * @param grantTime Long representing the time at which the lock was granted
    */
   public void setGrantedAt(Long grantTime) {
-    _record.setLongField(LockInfoAttribute.GRANTED_AT.name(), grantTime);
+    _dataRecord.setLongField(LockInfoAttribute.GRANTED_AT.name(), grantTime);
   }
 
   /**
@@ -170,16 +182,16 @@
    * @param lastRenewalTime Long representing the time at which the lock was last renewed
    */
   public void setLastRenewedAt(Long lastRenewalTime) {
-    _record.setLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), lastRenewalTime);
+    _dataRecord.setLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), lastRenewalTime);
   }
 
   /**
    * Get the value for TIMEOUT attribute of the lock
-   * @param timeout Duration object representing the duration of a lock.
+   * @param timeout Long representing the duration of a lock in milliseconds.
    */
-  public void setTimeout(Duration timeout) {
+  public void setTimeout(long timeout) {
     // Always store the timeout value in milliseconds for the sake of simplicity
-    _record.setLongField(LockInfoAttribute.TIMEOUT.name(), timeout.toMillis());
+    _dataRecord.setLongField(LockInfoAttribute.TIMEOUT.name(), timeout);
   }
 
   /**
@@ -187,7 +199,7 @@
    * @return the owner id of the lock, {@link #DEFAULT_OWNER_ID_TEXT} if there is no owner id set
    */
   public String getOwnerId() {
-    return _record.getStringField(LockInfoAttribute.OWNER_ID.name(), DEFAULT_OWNER_ID_TEXT);
+    return _dataRecord.getStringField(LockInfoAttribute.OWNER_ID.name(), DEFAULT_OWNER_ID_TEXT);
   }
 
   /**
@@ -195,7 +207,7 @@
    * @return the client id of the lock, {@link #DEFAULT_CLIENT_ID_TEXT} if there is no client id set
    */
   public String getClientId() {
-    return _record.getStringField(LockInfoAttribute.CLIENT_ID.name(), DEFAULT_CLIENT_ID_TEXT);
+    return _dataRecord.getStringField(LockInfoAttribute.CLIENT_ID.name(), DEFAULT_CLIENT_ID_TEXT);
   }
 
   /**
@@ -203,7 +215,7 @@
    * @return the id of the lock, {@link #DEFAULT_LOCK_ID_TEXT} if there is no lock id set
    */
   public String getLockId() {
-    return _record.getStringField(LockInfoAttribute.LOCK_ID.name(), DEFAULT_LOCK_ID_TEXT);
+    return _dataRecord.getStringField(LockInfoAttribute.LOCK_ID.name(), DEFAULT_LOCK_ID_TEXT);
   }
 
   /**
@@ -212,7 +224,7 @@
    * if there is no client data set.
    */
   public String getClientData() {
-    return _record.getStringField(LockInfoAttribute.CLIENT_DATA.name(), DEFAULT_CLIENT_DATA);
+    return _dataRecord.getStringField(LockInfoAttribute.CLIENT_DATA.name(), DEFAULT_CLIENT_DATA);
   }
 
   /**
@@ -221,7 +233,7 @@
    * if there is no grant time set
    */
   public Long getGrantedAt() {
-    return _record.getLongField(LockInfoAttribute.GRANTED_AT.name(), DEFAULT_GRANTED_AT_LONG);
+    return _dataRecord.getLongField(LockInfoAttribute.GRANTED_AT.name(), DEFAULT_GRANTED_AT_LONG);
   }
 
   /**
@@ -230,24 +242,15 @@
    * if there is no renewal time set
    */
   public Long getLastRenewedAt() {
-    return _record
-        .getLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), DEFAULT_LAST_RENEWED_AT_LONG);
+    return _dataRecord.getLongField(LockInfoAttribute.LAST_RENEWED_AT.name(), DEFAULT_LAST_RENEWED_AT_LONG);
   }
 
   /**
    * Get the value for TIMEOUT attribute of the lock
    * @return the expiring time of the lock, {@link #DEFAULT_TIMEOUT_DURATION} if there is no timeout set
    */
-  public Duration getTimeout() {
-    long timeout = _record.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_DURATION.toMillis());
-    return Duration.of(timeout, ChronoUnit.MILLIS);
+  public long getTimeout() {
+    return _dataRecord.getLongField(LockInfoAttribute.TIMEOUT.name(), DEFAULT_TIMEOUT_DURATION);
   }
 
-  /**
-   * Get the underlying DataRecord in a LockInfo
-   * @return lock information contained in a DataRecord
-   */
-  public DataRecord getRecord() {
-    return _record;
-  }
 }
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
index e00eb9e..2a5f4b9 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
@@ -23,7 +23,6 @@
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.metaclient.factories.MetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockClientTest.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockClientTest.java
new file mode 100644
index 0000000..a8ca387
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockClientTest.java
@@ -0,0 +1,110 @@
+package org.apache.helix.metaclient.recipes.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class LockClientTest extends ZkMetaClientTestBase {
+
+  private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
+
+  public LockClient createLockClient() {
+
+    MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER;
+    MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
+        .setStoreType(storeType).build();
+    return new LockClient(config);
+  }
+
+  @Test
+  public void testAcquireLock() {
+    final String key = "/TestLockClient_testAcquireLock";
+    LockClient lockClient = createLockClient();
+    LockInfo lockInfo = new LockInfo();
+    lockClient.acquireLock(key, lockInfo, MetaClientInterface.EntryMode.PERSISTENT);
+    Assert.assertNotNull(lockClient.retrieveLock(key));
+    try {
+      lockClient.acquireLock(TEST_INVALID_PATH, new LockInfo(), MetaClientInterface.EntryMode.PERSISTENT);
+      Assert.fail("Should not be able to acquire lock for key: " + key);
+    } catch (Exception e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testReleaseLock() {
+    final String key = "/TestLockClient_testReleaseLock";
+    LockClient lockClient = createLockClient();
+    LockInfo lockInfo = new LockInfo();
+    lockClient.acquireLock(key, lockInfo, MetaClientInterface.EntryMode.PERSISTENT);
+    Assert.assertNotNull(lockClient.retrieveLock(key));
+
+    lockClient.releaseLock(key);
+    Assert.assertNull(lockClient.retrieveLock(key));
+    lockClient.releaseLock(TEST_INVALID_PATH);
+  }
+
+  @Test
+  public void testAcquireTTLLock() {
+    final String key = "/TestLockClient_testAcquireTTLLock";
+    LockClient lockClient = createLockClient();
+    LockInfo lockInfo = new LockInfo();
+    lockClient.acquireLockWithTTL(key, lockInfo, 1L);
+    Assert.assertNotNull(lockClient.retrieveLock(key));
+    try {
+      lockClient.acquireLockWithTTL(TEST_INVALID_PATH, lockInfo, 1L);
+      Assert.fail("Should not be able to acquire lock for key: " + key);
+    } catch (Exception e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testRetrieveLock() {
+    final String key = "/TestLockClient_testRetrieveLock";
+    LockClient lockClient = createLockClient();
+    LockInfo lockInfo = new LockInfo();
+    lockClient.acquireLock(key, lockInfo, MetaClientInterface.EntryMode.PERSISTENT);
+    Assert.assertNotNull(lockClient.retrieveLock(key));
+    Assert.assertNull(lockClient.retrieveLock(TEST_INVALID_PATH));
+  }
+
+  @Test
+  public void testRenewTTLLock() {
+    final String key = "/TestLockClient_testRenewTTLLock";
+    LockClient lockClient = createLockClient();
+    LockInfo lockInfo = new LockInfo();
+    lockClient.acquireLockWithTTL(key, lockInfo, 1L);
+    Assert.assertNotNull(lockClient.retrieveLock(key));
+
+    lockClient.renewTTLLock(key);
+    Assert.assertNotSame(lockClient.retrieveLock(key).getGrantedAt(), lockInfo.getLastRenewedAt());
+    try {
+      lockClient.renewTTLLock(TEST_INVALID_PATH);
+      Assert.fail("Should not be able to renew lock for key: " + key);
+    } catch (Exception e) {
+      // expected
+    }
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockInfoTest.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockInfoTest.java
index c7f1321..4f48a5a 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockInfoTest.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/LockInfoTest.java
@@ -33,7 +33,7 @@
   private static final String LOCK_ID = "794c8a4c-c14b-4c23-b83f-4e1147fc6978";
   private static final long GRANT_TIME = System.currentTimeMillis();
   private static final long LAST_RENEWAL_TIME = System.currentTimeMillis();
-  private static final Duration TIMEOUT = Duration.ofMillis(100000);
+  private static final long TIMEOUT = 100000;
 
   public static final String DEFAULT_LOCK_ID_TEXT = "";
   public static final String DEFAULT_OWNER_ID_TEXT = "";
@@ -41,7 +41,7 @@
   public static final String DEFAULT_CLIENT_DATA = "";
   public static final long DEFAULT_GRANTED_AT_LONG = -1L;
   public static final long DEFAULT_LAST_RENEWED_AT_LONG = -1L;
-  public static final Duration DEFAULT_TIMEOUT_DURATION = Duration.ofMillis(-1L);
+  public static final long DEFAULT_TIMEOUT_DURATION = -1L;
 
   @Test
   public void testLockInfo() {