HDDS-6581. Introduce KEY_PATH_LOCK in OMKeyCreateRequest class (#3560)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java
index 62e6d27..1a462c0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/OMHashCodeGenerator.java
@@ -28,6 +28,6 @@
    * @param resourceName
    * @return hash code of resourceName
    */
-  int getHashCode(String resourceName);
+  long getHashCode(String resourceName);
 
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java
index 652f253..3560b05 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/hashcodegenerator/StringOMHashCodeGeneratorImpl.java
@@ -24,7 +24,7 @@
 public class StringOMHashCodeGeneratorImpl implements OMHashCodeGenerator {
 
   @Override
-  public int getHashCode(String resourceName) {
+  public long getHashCode(String resourceName) {
     return resourceName.hashCode();
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java
index aa616f6..4e6e79d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OBSKeyPathLockStrategy.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.KEY_PATH_LOCK;
 
 /**
  * Implementation of OzoneLockStrategy interface. Concrete strategy for OBS
@@ -40,7 +41,6 @@
 
   @Override
   public boolean acquireWriteLock(OMMetadataManager omMetadataManager,
-                                  OzoneManagerLock.Resource resource,
                                   String volumeName, String bucketName,
                                   String keyName) throws IOException {
     boolean acquiredLock;
@@ -49,28 +49,29 @@
         volumeName, bucketName);
     OMFileRequest.validateBucket(omMetadataManager, volumeName, bucketName);
 
+
     Preconditions.checkArgument(acquiredLock,
         "BUCKET_LOCK should be acquired!");
 
     String resourceName = omMetadataManager.getLock()
-        .generateResourceName(resource, volumeName, bucketName, keyName);
-    int resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
+        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
+    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
     acquiredLock = omMetadataManager.getLock()
-        .acquireWriteHashedLock(resource, String.valueOf(resourceHashCode));
+        .acquireWriteHashedLock(KEY_PATH_LOCK,
+            String.valueOf(resourceHashCode));
 
     return acquiredLock;
   }
 
   @Override
   public void releaseWriteLock(OMMetadataManager omMetadataManager,
-                               OzoneManagerLock.Resource resource,
                                String volumeName, String bucketName,
-                               String keyName) throws IOException {
+                               String keyName) {
     String resourceName = omMetadataManager.getLock()
-        .generateResourceName(resource, volumeName, bucketName, keyName);
-    int resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
-    omMetadataManager.getLock()
-        .releaseWriteHashedLock(resource, String.valueOf(resourceHashCode));
+        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
+    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
+    omMetadataManager.getLock().releaseWriteHashedLock(KEY_PATH_LOCK,
+        String.valueOf(resourceHashCode));
 
     omMetadataManager.getLock()
         .releaseReadLock(BUCKET_LOCK, volumeName, bucketName);
@@ -80,7 +81,6 @@
 
   @Override
   public boolean acquireReadLock(OMMetadataManager omMetadataManager,
-                                 OzoneManagerLock.Resource resource,
                                  String volumeName, String bucketName,
                                  String keyName) throws IOException {
     boolean acquiredLock;
@@ -89,28 +89,28 @@
         .acquireReadLock(BUCKET_LOCK, volumeName, bucketName);
     OMFileRequest.validateBucket(omMetadataManager, volumeName, bucketName);
 
+
     Preconditions.checkArgument(acquiredLock,
         "BUCKET_LOCK should be acquired!");
 
     String resourceName = omMetadataManager.getLock()
-        .generateResourceName(resource, volumeName, bucketName, keyName);
-    int resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
+        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
+    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
     acquiredLock = omMetadataManager.getLock()
-        .acquireReadHashedLock(resource, String.valueOf(resourceHashCode));
+        .acquireReadHashedLock(KEY_PATH_LOCK, String.valueOf(resourceHashCode));
 
     return acquiredLock;
   }
 
   @Override
   public void releaseReadLock(OMMetadataManager omMetadataManager,
-                              OzoneManagerLock.Resource resource,
                               String volumeName, String bucketName,
-                              String keyName) throws IOException {
+                              String keyName) {
     String resourceName = omMetadataManager.getLock()
-        .generateResourceName(resource, volumeName, bucketName, keyName);
-    int resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
+        .generateResourceName(KEY_PATH_LOCK, volumeName, bucketName, keyName);
+    long resourceHashCode = omHashCodeGenerator.getHashCode(resourceName);
     omMetadataManager.getLock()
-        .releaseReadHashedLock(resource, String.valueOf(resourceHashCode));
+        .releaseReadHashedLock(KEY_PATH_LOCK, String.valueOf(resourceHashCode));
 
     omMetadataManager.getLock()
         .releaseReadLock(BUCKET_LOCK, volumeName, bucketName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneLockStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneLockStrategy.java
index 951e4a5..82591ec 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneLockStrategy.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneLockStrategy.java
@@ -31,19 +31,16 @@
  */
 public interface OzoneLockStrategy {
   boolean acquireWriteLock(OMMetadataManager omMetadataManager,
-                           OzoneManagerLock.Resource resource,
                            String volumeName, String bucketName, String keyName)
       throws IOException;
 
-  void releaseWriteLock(OMMetadataManager omMetadataManager,
-                        OzoneManagerLock.Resource resource, String volumeName,
-                        String bucketName, String keyName) throws IOException;
+  void releaseWriteLock(OMMetadataManager omMetadataManager, String volumeName,
+                        String bucketName, String keyName);
 
   boolean acquireReadLock(OMMetadataManager omMetadataManager,
-                          OzoneManagerLock.Resource resource, String volumeName,
-                          String bucketName, String keyName) throws IOException;
+                          String volumeName, String bucketName, String keyName)
+      throws IOException;
 
-  void releaseReadLock(OMMetadataManager omMetadataManager,
-                       OzoneManagerLock.Resource resource, String volumeName,
-                       String bucketName, String keyName) throws IOException;
+  void releaseReadLock(OMMetadataManager omMetadataManager, String volumeName,
+                       String bucketName, String keyName);
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/RegularBucketLockStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/RegularBucketLockStrategy.java
index 161bcec..41dfa0b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/RegularBucketLockStrategy.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/RegularBucketLockStrategy.java
@@ -32,7 +32,6 @@
 
   @Override
   public boolean acquireWriteLock(OMMetadataManager omMetadataManager,
-                                  OzoneManagerLock.Resource resource,
                                   String volumeName, String bucketName,
                                   String keyName) throws IOException {
     boolean acquiredLock;
@@ -46,9 +45,8 @@
 
   @Override
   public void releaseWriteLock(OMMetadataManager omMetadataManager,
-                               OzoneManagerLock.Resource resource,
                                String volumeName, String bucketName,
-                               String keyName) throws IOException {
+                               String keyName) {
     omMetadataManager.getLock()
         .releaseWriteLock(BUCKET_LOCK, volumeName, bucketName);
 
@@ -57,7 +55,6 @@
 
   @Override
   public boolean acquireReadLock(OMMetadataManager omMetadataManager,
-                                 OzoneManagerLock.Resource resource,
                                  String volumeName, String bucketName,
                                  String keyName) throws IOException {
     boolean acquiredLock;
@@ -71,9 +68,8 @@
 
   @Override
   public void releaseReadLock(OMMetadataManager omMetadataManager,
-                              OzoneManagerLock.Resource resource,
                               String volumeName, String bucketName,
-                              String keyName) throws IOException {
+                              String keyName) {
     omMetadataManager.getLock()
         .releaseReadLock(BUCKET_LOCK, volumeName, bucketName);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index 8d51e90..f526075 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
@@ -71,7 +72,6 @@
 import org.apache.hadoop.hdds.utils.UniqueId;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
 import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
 
@@ -198,6 +198,7 @@
     omMetrics.incNumKeyAllocates();
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    OzoneLockStrategy ozoneLockStrategy = getOzoneLockStrategy(ozoneManager);
     OmKeyInfo omKeyInfo = null;
     OmBucketInfo omBucketInfo = null;
     final List< OmKeyLocationInfo > locations = new ArrayList<>();
@@ -219,8 +220,8 @@
       checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
-      acquireLock = omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
-          volumeName, bucketName);
+      acquireLock = ozoneLockStrategy.acquireWriteLock(omMetadataManager,
+          volumeName, bucketName, keyName);
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       //TODO: We can optimize this get here, if getKmsProvider is null, then
       // bucket encryptionInfo will be not set. If this assumption holds
@@ -340,8 +341,8 @@
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
       if (acquireLock) {
-        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
-            bucketName);
+        ozoneLockStrategy.releaseWriteLock(omMetadataManager, volumeName,
+            bucketName, keyName);
       }
     }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 79e0d51..904b5c0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.lock.OzoneLockStrategy;
 import org.apache.hadoop.ozone.om.request.OMClientRequestUtils;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
@@ -810,4 +811,9 @@
     return OmUtils.prepareKeyForDelete(keyToDelete, keysToDelete,
           trxnLogIndex, isRatisEnabled);
   }
+
+  protected OzoneLockStrategy getOzoneLockStrategy(OzoneManager ozoneManager) {
+    return ozoneManager.getOzoneLockProvider()
+        .createLockStrategy(getBucketLayout());
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
index f0af5e7..5421273 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequest.java
@@ -21,6 +21,8 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 
@@ -30,6 +32,7 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.lock.OzoneLockProvider;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -44,6 +47,8 @@
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
@@ -58,8 +63,34 @@
 /**
  * Tests OMCreateKeyRequest class.
  */
+@RunWith(Parameterized.class)
 public class TestOMKeyCreateRequest extends TestOMKeyRequest {
 
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[]{true, true},
+        new Object[]{true, false},
+        new Object[]{false, true},
+        new Object[]{false, false});
+  }
+
+  public TestOMKeyCreateRequest(boolean setKeyPathLock,
+                                boolean setFileSystemPaths) {
+    // Ignored. Actual init done in initParam().
+    // This empty constructor is still required to avoid argument exception.
+  }
+
+  @Parameterized.BeforeParam
+  public static void initParam(boolean setKeyPathLock,
+                               boolean setFileSystemPaths) {
+    keyPathLockEnabled = setKeyPathLock;
+    enableFileSystemPaths = setFileSystemPaths;
+  }
+
+  private static boolean keyPathLockEnabled;
+  private static boolean enableFileSystemPaths;
+
   @Test
   public void testPreExecuteWithNormalKey() throws Exception {
     ReplicationConfig ratis3Config =
@@ -94,7 +125,8 @@
 
   @Test
   public void testValidateAndUpdateCache() throws Exception {
-
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(keyPathLockEnabled, enableFileSystemPaths));
     OMRequest modifiedOmRequest =
         doPreExecute(createKeyRequest(false, 0));
 
@@ -222,8 +254,8 @@
   @Test
   public void testValidateAndUpdateCacheWithNoSuchMultipartUploadError()
       throws Exception {
-
-
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(keyPathLockEnabled, enableFileSystemPaths));
     int partNumber = 1;
     OMRequest modifiedOmRequest =
         doPreExecute(createKeyRequest(true, partNumber));
@@ -266,7 +298,8 @@
 
   @Test
   public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
-
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(keyPathLockEnabled, enableFileSystemPaths));
     OMRequest modifiedOmRequest =
         doPreExecute(createKeyRequest(false, 0));
 
@@ -305,8 +338,8 @@
 
   @Test
   public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
-
-
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(keyPathLockEnabled, enableFileSystemPaths));
     OMRequest modifiedOmRequest =
         doPreExecute(createKeyRequest(
             false, 0));
@@ -482,6 +515,8 @@
     configuration.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, true);
     when(ozoneManager.getConfiguration()).thenReturn(configuration);
     when(ozoneManager.getEnableFileSystemPaths()).thenReturn(true);
+    when(ozoneManager.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(keyPathLockEnabled, true));
 
     // Add volume and bucket entries to DB.
     addVolumeAndBucketToDB(volumeName, bucketName,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
index abc1a34..1cdc2a1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCreateRequestWithFSO.java
@@ -42,6 +42,11 @@
  */
 public class TestOMKeyCreateRequestWithFSO extends TestOMKeyCreateRequest {
 
+  public TestOMKeyCreateRequestWithFSO(boolean setKeyPathLock,
+                                       boolean setFileSystemPaths) {
+    super(setKeyPathLock, setFileSystemPaths);
+  }
+
   @Override
   protected OzoneConfiguration getOzoneConfiguration() {
     OzoneConfiguration config = super.getOzoneConfiguration();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index f9d72b6..ab858e3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.lock.OzoneLockProvider;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
@@ -189,6 +190,8 @@
   public void testKeyCreateRequestSetsAllTouchedTableCachesForEviction() {
     OMKeyCreateRequest request = anOMKeyCreateRequest();
     when(om.getEnableFileSystemPaths()).thenReturn(true);
+    when(om.getOzoneLockProvider()).thenReturn(
+        new OzoneLockProvider(false, false));
 
     Map<String, Integer> cacheItemCount = recordCacheItemCounts();