HDDS-3639. Maintain FileHandle Information in OMMetadataManager. (#1004)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index e5432da..35e6cfe 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -104,6 +104,16 @@
   String getOzoneKey(String volume, String bucket, String key);
 
   /**
+   * Given fileHandleInfo, return the corresponding DB key of the KeyIdTable.
+   *
+   * @param fileHandleInfo - unique keyId that is used by NFS to create a
+   *                       fileHandle.
+   * @return KeyIdTable DB key as String.
+   */
+
+  String getOzoneKeyIdTableKey(long fileHandleInfo);
+
+  /**
    * Given a volume, bucket and a key, return the corresponding DB directory
    * key.
    *
@@ -280,6 +290,13 @@
   Table<String, OmKeyInfo> getKeyTable();
 
   /**
+   * Returns the KeyIdTable.
+   * KeyIdTable maps KeyId to current KeyName.
+   * @return KeyIdTable.
+   */
+  Table<String, String> getKeyIdTable();
+
+  /**
    * Get Deleted Table.
    *
    * @return Deleted Table.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 54f1452..4acd342 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -118,12 +118,19 @@
    * |----------------------------------------------------------------------|
    * |  multipartInfoTable| /volumeName/bucketName/keyName/uploadId ->...   |
    * |----------------------------------------------------------------------|
+   * | keyIdTable         | keyId -> /volumeName/bucketName/keyName         |
+   * |----------------------------------------------------------------------|
+   *
+   * TBD : Renames need to be made keyIdTable aware. Also KeyId based lookups
+   * should be able to handle any possible race with renames/deletes.
+   *
    */
 
   public static final String USER_TABLE = "userTable";
   public static final String VOLUME_TABLE = "volumeTable";
   public static final String BUCKET_TABLE = "bucketTable";
   public static final String KEY_TABLE = "keyTable";
+  public static final String KEY_ID_TABLE = "keyIdTable";
   public static final String DELETED_TABLE = "deletedTable";
   public static final String OPEN_KEY_TABLE = "openKeyTable";
   public static final String S3_TABLE = "s3Table";
@@ -141,6 +148,7 @@
   private Table volumeTable;
   private Table bucketTable;
   private Table keyTable;
+  private Table keyIdTable;
   private Table deletedTable;
   private Table openKeyTable;
   private Table s3Table;
@@ -198,6 +206,11 @@
   }
 
   @Override
+  public Table<String, String> getKeyIdTable() {
+    return keyIdTable;
+  }
+
+  @Override
   public Table<String, RepeatedOmKeyInfo> getDeletedTable() {
     return deletedTable;
   }
@@ -282,6 +295,7 @@
         .addTable(DELEGATION_TOKEN_TABLE)
         .addTable(S3_SECRET_TABLE)
         .addTable(PREFIX_TABLE)
+        .addTable(KEY_ID_TABLE)
         .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
         .addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
         .addCodec(RepeatedOmKeyInfo.class, new RepeatedOmKeyInfoCodec())
@@ -320,6 +334,10 @@
     keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
     checkTableStatus(keyTable, KEY_TABLE);
 
+    keyIdTable = this.store.getTable(KEY_ID_TABLE, String.class,
+        String.class);
+    checkTableStatus(keyIdTable, KEY_ID_TABLE);
+
     deletedTable = this.store.getTable(DELETED_TABLE, String.class,
         RepeatedOmKeyInfo.class);
     checkTableStatus(deletedTable, DELETED_TABLE);
@@ -423,6 +441,12 @@
   }
 
   @Override
+
+  public String getOzoneKeyIdTableKey(long uniqueKeyId) {
+    return Long.toString(uniqueKeyId);
+  }
+
+  @Override
   public String getOzoneDirKey(String volume, String bucket, String key) {
     key = OzoneFSUtils.addTrailingSlashIfNeeded(key);
     return getOzoneKey(volume, bucket, key);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index 7f860fc..aa61eec 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -371,7 +371,8 @@
         .setReplicationType(HddsProtos.ReplicationType.RATIS)
         .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
         .setObjectID(objectId)
-        .setUpdateID(objectId);
+        .setUpdateID(objectId)
+        .setFileHandleInfo(objectId);
   }
 
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 0aec04d..43cdd13 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -254,6 +254,7 @@
         .addAllMetadata(KeyValueUtil.getFromProtobuf(keyArgs.getMetadataList()))
         .setObjectID(objectID)
         .setUpdateID(transactionLogIndex)
+        .setFileHandleInfo(objectID)
         .build();
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index f51cba8..1dc7fbf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -156,6 +156,7 @@
           .setReplicationFactor(keyArgs.getFactor())
           .setObjectID(objectID)
           .setUpdateID(transactionLogIndex)
+          .setFileHandleInfo(objectID)
           .build();
 
       omKeyInfo = new OmKeyInfo.Builder()
@@ -171,6 +172,7 @@
           .setAcls(OzoneAclUtil.fromProtobuf(keyArgs.getAclsList()))
           .setObjectID(objectID)
           .setUpdateID(transactionLogIndex)
+          .setFileHandleInfo(objectID)
           .build();
 
       // Add to cache
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponse.java
index 55e7585..2326a63 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMDirectoryCreateResponse.java
@@ -81,6 +81,12 @@
       omMetadataManager.getKeyTable().putWithBatch(batchOperation, dirKey,
           dirKeyInfo);
 
+      // We can also persist the fileHandle to KeyIdTable.
+      if (dirKeyInfo.getFileHandleInfo() != 0) {
+        omMetadataManager.getKeyIdTable().putWithBatch(batchOperation,
+            omMetadataManager.getOzoneKeyIdTableKey(
+                dirKeyInfo.getFileHandleInfo()), dirKey);
+      }
     } else {
       // When directory already exists, we don't add it to cache. And it is
       // not an error, in this case dirKeyInfo will be null.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index c21b283..cb320d4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -80,6 +80,12 @@
     if (omKeyInfo != null) {
       omMetadataManager.getKeyTable().putWithBatch(batchOperation, ozoneKeyName,
           omKeyInfo);
+      // We can also persist the fileHandle to KeyIdTable.
+      if (omKeyInfo.getFileHandleInfo() != 0) {
+        omMetadataManager.getKeyIdTable().putWithBatch(batchOperation,
+            omMetadataManager.getOzoneKeyIdTableKey(
+                omKeyInfo.getFileHandleInfo()), ozoneKeyName);
+      }
     }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
index afff73a..aaa6789 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyDeleteResponse.java
@@ -68,6 +68,12 @@
       omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
           ozoneKey);
 
+      if (omKeyInfo.getFileHandleInfo() != 0) {
+        omMetadataManager.getKeyIdTable().deleteWithBatch(batchOperation,
+            omMetadataManager.getOzoneKeyIdTableKey(
+                omKeyInfo.getFileHandleInfo()));
+      }
+
       // If Key is not empty add this to delete table.
       if (!isKeyEmpty(omKeyInfo)) {
         // If a deleted key is put in the table where a key with the same
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
index c147db9..11f4b5d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyRenameResponse.java
@@ -87,12 +87,21 @@
       omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
           omMetadataManager.getOzoneKey(volumeName, bucketName, fromKeyName));
     } else if (createToKeyAndDeleteFromKey()) {
+      // Repoint the KeyIdTable entry to the renamed key. Store the KeyTable DB
+      // key, not the bare key name, to match what key commit, multipart
+      // complete and directory create write into this table.
+      if (newKeyInfo.getFileHandleInfo() != 0) {
+        omMetadataManager.getKeyIdTable().putWithBatch(batchOperation,
+            omMetadataManager.getOzoneKeyIdTableKey(
+                newKeyInfo.getFileHandleInfo()),
+            omMetadataManager.getOzoneKey(volumeName, bucketName, toKeyName));
+      }
       // If both from and toKeyName are equal do nothing
-      omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
-          omMetadataManager.getOzoneKey(volumeName, bucketName, fromKeyName));
       omMetadataManager.getKeyTable().putWithBatch(batchOperation,
           omMetadataManager.getOzoneKey(volumeName, bucketName, toKeyName),
           newKeyInfo);
+      omMetadataManager.getKeyTable().deleteWithBatch(batchOperation,
+          omMetadataManager.getOzoneKey(volumeName, bucketName, fromKeyName));
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
index cfe967d..6508fe7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java
@@ -86,6 +86,11 @@
           omKeyInfo.getBucketName(), omKeyInfo.getKeyName());
       omMetadataManager.getKeyTable().putWithBatch(batchOperation,
           ozoneKey, omKeyInfo);
+      if (omKeyInfo.getFileHandleInfo() != 0) {
+        omMetadataManager.getKeyIdTable().putWithBatch(batchOperation,
+            omMetadataManager.getOzoneKeyIdTableKey(
+                omKeyInfo.getFileHandleInfo()), ozoneKey);
+      }
 
       if (!partsUnusedList.isEmpty()) {
         // Add unused parts to deleted key table.