HDDS-7203. LookupKey Latency breakdown (#3742)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 3eeb3e9..0013617 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -1279,6 +1279,7 @@
 
     StorageContainerLocationProtocol sclProtocolMock = mock(
         StorageContainerLocationProtocol.class);
+    OMPerformanceMetrics metrics = mock(OMPerformanceMetrics.class);
 
     List<Long> containerIDs = new ArrayList<>();
     containerIDs.add(100L);
@@ -1334,7 +1335,7 @@
     omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
 
     KeyManagerImpl keyManagerImpl =
-        new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1");
+        new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1", metrics);
 
     keyManagerImpl.refresh(omKeyInfo);
 
@@ -1357,6 +1358,7 @@
 
     ScmClient scmClientMock = mock(ScmClient.class);
     when(scmClientMock.getContainerClient()).thenReturn(sclProtocolMock);
+    OMPerformanceMetrics metrics = mock(OMPerformanceMetrics.class);
 
     OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo("v1",
         "b1", "k1", ReplicationType.RATIS,
@@ -1374,7 +1376,7 @@
     omKeyInfo.appendNewBlocks(omKeyLocationInfoList, false);
 
     KeyManagerImpl keyManagerImpl =
-        new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1");
+        new KeyManagerImpl(ozoneManager, scmClientMock, conf, "om1", metrics);
 
     try {
       keyManagerImpl.refresh(omKeyInfo);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index aade4f3..6f7229c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -170,37 +170,43 @@
 
   private final boolean enableFileSystemPaths;
   private BackgroundService dirDeletingService;
+  private final OMPerformanceMetrics metrics;
 
   private BackgroundService openKeyCleanupService;
 
   @VisibleForTesting
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
-      OzoneBlockTokenSecretManager secretManager) {
+      OzoneBlockTokenSecretManager secretManager,
+      OMPerformanceMetrics metrics) {
     this(null, new ScmClient(scmBlockClient, null), metadataManager,
-        conf, omId, secretManager, null, null);
+        conf, omId, secretManager, null, null, metrics);
   }
 
   @VisibleForTesting
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       StorageContainerLocationProtocol scmContainerClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
-      OzoneBlockTokenSecretManager secretManager) {
+      OzoneBlockTokenSecretManager secretManager,
+      OMPerformanceMetrics metrics) {
     this(null, new ScmClient(scmBlockClient, scmContainerClient),
-        metadataManager, conf, omId, secretManager, null, null);
+        metadataManager, conf, omId, secretManager, null, null,
+        metrics);
   }
 
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
-      OzoneConfiguration conf, String omId) {
+      OzoneConfiguration conf, String omId, OMPerformanceMetrics metrics) {
     this (om, scmClient, om.getMetadataManager(), conf, omId,
-        om.getBlockTokenMgr(), om.getKmsProvider(), om.getPrefixManager());
+        om.getBlockTokenMgr(), om.getKmsProvider(), om.getPrefixManager(),
+        metrics);
   }
 
   @SuppressWarnings("parameternumber")
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager,
-      KeyProviderCryptoExtension kmsProvider, PrefixManager prefixManager) {
+      KeyProviderCryptoExtension kmsProvider, PrefixManager prefixManager,
+      OMPerformanceMetrics metrics) {
     this.scmBlockSize = (long) conf
         .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
             StorageUnit.BYTES);
@@ -226,6 +232,7 @@
     this.prefixManager = prefixManager;
     this.secretManager = secretManager;
     this.kmsProvider = kmsProvider;
+    this.metrics = metrics;
 
   }
 
@@ -359,7 +366,7 @@
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
-
+    long start = Time.monotonicNowNanos();
     metadataManager.getLock().acquireReadLock(BUCKET_LOCK, volumeName,
         bucketName);
 
@@ -390,6 +397,7 @@
       metadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
           bucketName);
     }
+    metrics.addLookupReadKeyInfoLatency(Time.monotonicNowNanos() - start);
 
     if (value == null) {
       if (LOG.isDebugEnabled()) {
@@ -409,12 +417,17 @@
     if (!args.isHeadOp()) {
 
       // add block token for read.
+      start = Time.monotonicNowNanos();
       addBlockToken4Read(value);
+      metrics.addLookupGenerateBlockTokenLatency(
+          Time.monotonicNowNanos() - start);
 
       // Refresh container pipeline info from SCM
       // based on OmKeyArgs.refreshPipeline flag
       // value won't be null as the check is done inside try/catch block.
+      start = Time.monotonicNowNanos();
       refresh(value);
+      metrics.addLookupRefreshLocationLatency(Time.monotonicNowNanos() - start);
 
       if (args.getSortDatanodes()) {
         sortDatanodes(clientAddress, value);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java
new file mode 100644
index 0000000..e86264a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPerformanceMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * Including OM performance related metrics.
+ */
+public class OMPerformanceMetrics {
+  private static final String SOURCE_NAME =
+      OMPerformanceMetrics.class.getSimpleName();
+
+  public static OMPerformanceMetrics register() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+            "OzoneManager Request Performance",
+            new OMPerformanceMetrics());
+  }
+
+  public static void unregister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  @Metric(about = "Overall lookupKey in nanoseconds")
+  private MutableRate lookupLatencyNs;
+
+  @Metric(about = "Read key info from meta in nanoseconds")
+  private MutableRate lookupReadKeyInfoLatencyNs;
+
+  @Metric(about = "Block token generation latency in nanoseconds")
+  private MutableRate lookupGenerateBlockTokenLatencyNs;
+
+  @Metric(about = "Refresh location nanoseconds")
+  private MutableRate lookupRefreshLocationLatencyNs;
+
+  @Metric(about = "ACLs check nanoseconds")
+  private MutableRate lookupAclCheckLatencyNs;
+
+  @Metric(about = "resolveBucketLink latency nanoseconds")
+  private MutableRate lookupResolveBucketLatencyNs;
+
+  @Metric(about = "s3VolumeInfo latency nanoseconds")
+  private MutableRate s3VolumeContextLatencyNs;
+
+
+  public void addLookupLatency(long latencyInNs) {
+    lookupLatencyNs.add(latencyInNs);
+  }
+
+  public void addLookupGenerateBlockTokenLatency(long latencyInNs) {
+    lookupGenerateBlockTokenLatencyNs.add(latencyInNs);
+  }
+
+  public void addLookupRefreshLocationLatency(long latencyInNs) {
+    lookupRefreshLocationLatencyNs.add(latencyInNs);
+  }
+
+  public void addLookupAclCheckLatency(long latencyInNs) {
+    lookupAclCheckLatencyNs.add(latencyInNs);
+  }
+
+  public void addLookupReadKeyInfoLatency(long latencyInNs) {
+    lookupReadKeyInfoLatencyNs.add(latencyInNs);
+  }
+
+  public void addS3VolumeContextLatencyNs(long latencyInNs) {
+    s3VolumeContextLatencyNs.add(latencyInNs);
+  }
+
+  public void addLookupResolveBucketLatencyNs(long latencyInNs) {
+    lookupResolveBucketLatencyNs.add(latencyInNs);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index baaa9f9..1d7282c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -410,6 +410,7 @@
   private static boolean testSecureOmFlag = false;
 
   private final OzoneLockProvider ozoneLockProvider;
+  private OMPerformanceMetrics perfMetrics;
 
   /**
    * OM Startup mode.
@@ -719,8 +720,9 @@
     }
 
     prefixManager = new PrefixManagerImpl(metadataManager, isRatisEnabled);
+    perfMetrics = OMPerformanceMetrics.register();
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
-        omStorage.getOmId());
+        omStorage.getOmId(), perfMetrics);
 
     if (withNewSnapshot) {
       Integer layoutVersionInDB = getLayoutVersionInDB();
@@ -2077,6 +2079,7 @@
       if (omSnapshotProvider != null) {
         omSnapshotProvider.stop();
       }
+      OMPerformanceMetrics.unregister();
     } catch (Exception e) {
       LOG.error("OzoneManager stop failed.", e);
     }
@@ -2812,11 +2815,16 @@
    */
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    long start = Time.monotonicNowNanos();
     ResolvedBucket bucket = resolveBucketLink(args);
+    perfMetrics.addLookupResolveBucketLatencyNs(
+        Time.monotonicNowNanos() - start);
 
     if (isAclEnabled) {
+      long startAcl = Time.monotonicNowNanos();
       checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
           bucket.realVolume(), bucket.realBucket(), args.getKeyName());
+      perfMetrics.addLookupAclCheckLatency(Time.monotonicNowNanos() - startAcl);
     }
 
     boolean auditSuccess = true;
@@ -2838,6 +2846,8 @@
         AUDIT.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
             auditMap));
       }
+
+      perfMetrics.addLookupLatency(Time.monotonicNowNanos() - start);
     }
   }
 
@@ -3322,6 +3332,7 @@
 
   @Override
   public S3VolumeContext getS3VolumeContext() throws IOException {
+    long start = Time.monotonicNowNanos();
     // Unless the OM request contains S3 authentication info with an access
     // ID that corresponds to a tenant volume, the request will be directed
     // to the default S3 volume.
@@ -3404,7 +3415,7 @@
     final S3VolumeContext.Builder s3VolumeContext = S3VolumeContext.newBuilder()
         .setOmVolumeArgs(getVolumeInfo(s3Volume))
         .setUserPrincipal(userPrincipal);
-
+    perfMetrics.addS3VolumeContextLatencyNs(Time.monotonicNowNanos() - start);
     return s3VolumeContext.build();
   }
 
@@ -4138,7 +4149,6 @@
 
   public ResolvedBucket resolveBucketLink(Pair<String, String> requested)
       throws IOException {
-
     Pair<String, String> resolved;
     if (isAclEnabled) {
       UserGroupInformation ugi = getRemoteUser();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index c223293..a4906d8 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
 import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.KeyManager;
@@ -92,6 +93,7 @@
   protected ScmClient scmClient;
   protected OzoneBlockTokenSecretManager ozoneBlockTokenSecretManager;
   protected ScmBlockLocationProtocol scmBlockLocationProtocol;
+  protected OMPerformanceMetrics metrics;
 
   protected static final long CONTAINER_ID = 1000L;
   protected static final long LOCAL_ID = 100L;
@@ -144,8 +146,9 @@
     ozoneBlockTokenSecretManager =
         Mockito.mock(OzoneBlockTokenSecretManager.class);
     scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
+    metrics = Mockito.mock(OMPerformanceMetrics.class);
     keyManager = new KeyManagerImpl(ozoneManager, scmClient, ozoneConfiguration,
-        "");
+        "", metrics);
     when(ozoneManager.getScmClient()).thenReturn(scmClient);
     when(ozoneManager.getBlockTokenSecretManager())
         .thenReturn(ozoneBlockTokenSecretManager);