[#1645] feat(server): Add gauge metrics for reading localfile data (#1646)

### What changes were proposed in this pull request?

Add two metrics for each reading memory/index/local files:
1. Add a gauge metric for its threads number.
2. Add a gauge metric for its reading size.

### Why are the changes needed?

Fix https://github.com/apache/incubator-uniffle/issues/1645.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 9f8f79e..47a0e80 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -693,6 +693,8 @@
         ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
         ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
+        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
+        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
         shuffleServer
             .getGrpcMetrics()
             .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
@@ -718,6 +720,8 @@
       } finally {
         if (sdr != null) {
           sdr.release();
+          ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
+          ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(length);
         }
         shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
       }
@@ -769,18 +773,23 @@
     if (shuffleServer.getShuffleBufferManager().requireReadMemory(assumedFileSize)) {
       ShuffleIndexResult shuffleIndexResult = null;
       try {
-        long start = System.currentTimeMillis();
+        final long start = System.currentTimeMillis();
         shuffleIndexResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
-        long readTime = System.currentTimeMillis() - start;
 
         ByteBuffer data = shuffleIndexResult.getIndexData();
         ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
         ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
+        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
+        ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
         GetLocalShuffleIndexResponse.Builder builder =
             GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg);
+        long readTime = System.currentTimeMillis() - start;
+        shuffleServer
+            .getGrpcMetrics()
+            .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD, readTime);
         LOG.info(
             "Successfully getShuffleIndex cost {} ms for {}" + " bytes with {}",
             readTime,
@@ -808,6 +817,8 @@
       } finally {
         if (shuffleIndexResult != null) {
           shuffleIndexResult.release();
+          ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
+          ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(assumedFileSize);
         }
         shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
       }
@@ -845,7 +856,6 @@
                 ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
       }
     }
-    long start = System.currentTimeMillis();
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetMemoryShuffleDataResponse reply;
@@ -856,6 +866,7 @@
     if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
       ShuffleDataResult shuffleDataResult = null;
       try {
+        final long start = System.currentTimeMillis();
         Roaring64NavigableMap expectedTaskIds = null;
         if (request.getSerializedExpectedTaskIdsBitmap() != null
             && !request.getSerializedExpectedTaskIdsBitmap().isEmpty()) {
@@ -875,6 +886,8 @@
           bufferSegments = shuffleDataResult.getBufferSegments();
           ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
           ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
+          ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
         }
         long costTime = System.currentTimeMillis() - start;
         shuffleServer
@@ -911,6 +924,8 @@
       } finally {
         if (shuffleDataResult != null) {
           shuffleDataResult.release();
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
+          ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
         }
         shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
       }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index f1f3736..b4566b3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -54,6 +54,13 @@
       "localfile_flush_thread_pool_queue_size";
   private static final String FALLBACK_FLUSH_THREAD_POOL_QUEUE_SIZE =
       "fallback_flush_thread_pool_queue_size";
+  private static final String READ_LOCAL_DATA_FILE_THREAD_NUM = "read_local_data_file_thread_num";
+  private static final String READ_LOCAL_INDEX_FILE_THREAD_NUM = "read_local_index_file_thread_num";
+  private static final String READ_MEMORY_DATA_THREAD_NUM = "read_memory_data_thread_num";
+  private static final String READ_LOCAL_DATA_FILE_BUFFER_SIZE = "read_local_data_file_buffer_size";
+  private static final String READ_LOCAL_INDEX_FILE_BUFFER_SIZE =
+      "read_local_index_file_buffer_size";
+  private static final String READ_MEMORY_DATA_BUFFER_SIZE = "read_memory_data_buffer_size";
   private static final String TOTAL_READ_DATA = "total_read_data";
   private static final String TOTAL_READ_LOCAL_DATA_FILE = "total_read_local_data_file";
   private static final String TOTAL_READ_LOCAL_INDEX_FILE = "total_read_local_index_file";
@@ -196,6 +203,12 @@
   public static Gauge.Child gaugeFallbackFlushThreadPoolQueueSize;
   public static Gauge.Child gaugeAppNum;
   public static Gauge.Child gaugeTotalPartitionNum;
+  public static Gauge.Child gaugeReadLocalDataFileThreadNum;
+  public static Gauge.Child gaugeReadLocalIndexFileThreadNum;
+  public static Gauge.Child gaugeReadMemoryDataThreadNum;
+  public static Gauge.Child gaugeReadLocalDataFileBufferSize;
+  public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
+  public static Gauge.Child gaugeReadMemoryDataBufferSize;
 
   public static Gauge gaugeTotalDataSizeUsage;
   public static Gauge gaugeInMemoryDataSizeUsage;
@@ -401,6 +414,17 @@
     gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);
     gaugeTotalPartitionNum = metricsManager.addLabeledGauge(PARTITION_NUM_WITH_NODE);
 
+    gaugeReadLocalDataFileThreadNum =
+        metricsManager.addLabeledGauge(READ_LOCAL_DATA_FILE_THREAD_NUM);
+    gaugeReadLocalIndexFileThreadNum =
+        metricsManager.addLabeledGauge(READ_LOCAL_INDEX_FILE_THREAD_NUM);
+    gaugeReadMemoryDataThreadNum = metricsManager.addLabeledGauge(READ_MEMORY_DATA_THREAD_NUM);
+    gaugeReadLocalDataFileBufferSize =
+        metricsManager.addLabeledGauge(READ_LOCAL_DATA_FILE_BUFFER_SIZE);
+    gaugeReadLocalIndexFileBufferSize =
+        metricsManager.addLabeledGauge(READ_LOCAL_INDEX_FILE_BUFFER_SIZE);
+    gaugeReadMemoryDataBufferSize = metricsManager.addLabeledGauge(READ_MEMORY_DATA_BUFFER_SIZE);
+
     gaugeHugePartitionNum = metricsManager.addLabeledGauge(HUGE_PARTITION_NUM);
     gaugeAppWithHugePartitionNum = metricsManager.addLabeledGauge(APP_WITH_HUGE_PARTITION_NUM);
 
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ed83c0b..dbda25a 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -26,6 +26,7 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -255,7 +256,6 @@
             .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
       }
     }
-    final long start = System.currentTimeMillis();
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetMemoryShuffleDataResponse response;
@@ -266,6 +266,7 @@
     if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
       ShuffleDataResult shuffleDataResult = null;
       try {
+        final long start = System.currentTimeMillis();
         shuffleDataResult =
             shuffleServer
                 .getShuffleTaskManager()
@@ -283,12 +284,14 @@
           bufferSegments = shuffleDataResult.getBufferSegments();
           ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
           ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
+          ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
         }
         response =
             new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, bufferSegments, data);
         ReleaseMemoryAndRecordReadTimeListener listener =
             new ReleaseMemoryAndRecordReadTimeListener(
-                start, readBufferSize, data.size(), requestInfo, req, client);
+                start, readBufferSize, data.size(), requestInfo, req, response, client);
         client.getChannel().writeAndFlush(response).addListener(listener);
         return;
       } catch (Exception e) {
@@ -359,12 +362,14 @@
         ManagedBuffer data = shuffleIndexResult.getManagedBuffer();
         ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
         ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.size());
+        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
+        ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
         response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, data, shuffleIndexResult.getDataFileLen());
         ReleaseMemoryAndRecordReadTimeListener listener =
             new ReleaseMemoryAndRecordReadTimeListener(
-                start, assumedFileSize, data.size(), requestInfo, req, client);
+                start, assumedFileSize, data.size(), requestInfo, req, response, client);
         client.getChannel().writeAndFlush(response).addListener(listener);
         return;
       } catch (FileNotFoundException indexFileNotFoundException) {
@@ -465,12 +470,14 @@
                     length);
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
         ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
+        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
+        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, sdr.getManagedBuffer());
         ReleaseMemoryAndRecordReadTimeListener listener =
             new ReleaseMemoryAndRecordReadTimeListener(
-                start, length, sdr.getDataLength(), requestInfo, req, client);
+                start, length, sdr.getDataLength(), requestInfo, req, response, client);
         client.getChannel().writeAndFlush(response).addListener(listener);
         return;
       } catch (Exception e) {
@@ -531,6 +538,7 @@
     private final long dataSize;
     private final String requestInfo;
     private final RequestMessage request;
+    private final RpcResponse response;
     private final TransportClient client;
 
     ReleaseMemoryAndRecordReadTimeListener(
@@ -539,12 +547,14 @@
         long dataSize,
         String requestInfo,
         RequestMessage request,
+        RpcResponse response,
         TransportClient client) {
       this.readStartedTime = readStartedTime;
       this.readBufferSize = readBufferSize;
       this.dataSize = dataSize;
       this.requestInfo = requestInfo;
       this.request = request;
+      this.response = response;
       this.client = client;
     }
 
@@ -554,6 +564,20 @@
       long readTime = System.currentTimeMillis() - readStartedTime;
       ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
       shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), readTime);
+      if (request instanceof GetLocalShuffleDataRequest) {
+        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
+        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(readBufferSize);
+      } else if (request instanceof GetLocalShuffleIndexRequest) {
+        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
+        ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(readBufferSize);
+      } else if (request instanceof GetMemoryShuffleDataRequest) {
+        GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
+            (GetMemoryShuffleDataResponse) response;
+        if (CollectionUtils.isNotEmpty(getMemoryShuffleDataResponse.getBufferSegments())) {
+          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
+          ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
+        }
+      }
       if (!future.isSuccess()) {
         Throwable cause = future.cause();
         String errorMsg =