[#1596] fix(netty): Use a ChannelFutureListener callback mechanism to release readMemory (#1605)

### What changes were proposed in this pull request?

1. Add a `ChannelFutureListener` and use its callback to release `readMemory` only after the `writeAndFlush` operation has actually completed.
2. Update the descriptions of the configurations `rss.server.buffer.capacity.ratio` and `rss.server.read.buffer.capacity.ratio` to mention that the ratio applies to the off-heap size when Netty is enabled.

### Why are the changes needed?

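This fixes a bug introduced by PR https://github.com/apache/incubator-uniffle/pull/879, which has been present ever since the Netty feature was first integrated: `readMemory` was released in a `finally` block as soon as the response object had been built, i.e. before the asynchronous `writeAndFlush` had finished sending it.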
Fix https://github.com/apache/incubator-uniffle/issues/1596.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No new tests were added; the change was verified in our environment.
The new log output looks like this:
```
[2024-03-26 23:11:51.039] [epollEventLoopGroup-3-158] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getLocalShuffleData for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1328], offset[0], length[14693742]. Took 1457 ms and retrieved 14693742 bytes of data
[2024-03-26 23:11:51.040] [epollEventLoopGroup-3-130] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getMemoryShuffleData for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1262]. Took 1 ms and retrieved 0 bytes of data
[2024-03-26 23:11:51.068] [epollEventLoopGroup-3-177] [INFO] ShuffleServerNettyHandler.operationComplete - Successfully executed getLocalShuffleIndex for appId[application_1703049085550_7359933_1711463990606], shuffleId[0], partitionId[1366]. Took 918 ms and retrieved 1653600 bytes of data
```
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
index d80d0aa..b96c028 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
@@ -131,4 +131,9 @@
   public long getTimestamp() {
     return timestamp;
   }
+
+  @Override
+  public String getOperationType() {
+    return "getLocalShuffleData"; // operation label used in server log and error messages
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
index 1ccdfae..105fea0 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
@@ -93,4 +93,9 @@
   public int getPartitionNum() {
     return partitionNum;
   }
+
+  @Override
+  public String getOperationType() {
+    return "getLocalShuffleIndex"; // operation label used in server log and error messages
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
index d358cf7..13a2412 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
@@ -148,4 +148,9 @@
   public Roaring64NavigableMap getExpectedTaskIdsBitmap() {
     return expectedTaskIdsBitmap;
   }
+
+  @Override
+  public String getOperationType() {
+    return "getMemoryShuffleData"; // operation label used in server log and error messages
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
index cfa5528..946f906 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
@@ -35,4 +35,6 @@
   public long getRequestId() {
     return requestId;
   }
+
+  public abstract String getOperationType(); // short operation name for log/error messages
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
index 492b5b6..a77b0d3 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
@@ -145,4 +145,9 @@
   public void setTimestamp(long timestamp) {
     this.timestamp = timestamp;
   }
+
+  @Override
+  public String getOperationType() {
+    return "sendShuffleData"; // operation label, presumably for log messages — TODO confirm usage
+  }
 }
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 25224d6..efe0856 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -83,11 +83,11 @@
 | rss.server.netty.receive.buf                            | 0                                                                      | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system automatically estimates the receive buffer size based on default settings.                                                          |
 | rss.server.netty.send.buf                               | 0                                                                      | Send buffer size (SO_SNDBUF).                                                                                                                                                                                                                                                                                                                                                                |
 | rss.server.buffer.capacity                              | -1                                                                     | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used                                                                                                                                                                                                                                                                                           |
-| rss.server.buffer.capacity.ratio                        | 0.8                                                                    | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * ratio                                                                                                                                                                                                                                                                                                      |
+| rss.server.buffer.capacity.ratio                        | 0.8                                                                    | when `rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio                                                                                                                                                                                                                                                                |
 | rss.server.memory.shuffle.highWaterMark.percentage      | 75.0                                                                   | Threshold of spill data to storage, percentage of rss.server.buffer.capacity                                                                                                                                                                                                                                                                                                                 |
 | rss.server.memory.shuffle.lowWaterMark.percentage       | 25.0                                                                   | Threshold of keep data in memory, percentage of rss.server.buffer.capacity                                                                                                                                                                                                                                                                                                                   |
 | rss.server.read.buffer.capacity                         | -1                                                                     | Max size of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is used                                                                                                                                                                                                                                                                                                  |
-| rss.server.read.buffer.capacity.ratio                   | 0.4                                                                    | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size * ratio                                                                                                                                                                                                                                                                                                |
+| rss.server.read.buffer.capacity.ratio                   | 0.4                                                                    | when `rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap size or off-heap size(when enabling Netty) * ratio                                                                                                                                                                                                                                                          |
 | rss.server.heartbeat.interval                           | 10000                                                                  | Heartbeat interval to Coordinator (ms)                                                                                                                                                                                                                                                                                                                                                       |
 | rss.server.flush.localfile.threadPool.size              | 10                                                                     | Thread pool for flush data to local file                                                                                                                                                                                                                                                                                                                                                     |
 | rss.server.flush.hadoop.threadPool.size                 | 60                                                                     | Thread pool for flush data to hadoop storage                                                                                                                                                                                                                                                                                                                                                 |
@@ -104,7 +104,7 @@
 | rss.server.max.concurrency.of.per-partition.write       | 30                                                                     | The max concurrency of single partition writer, the data partition file number is equal to this value. Default value is 1. This config could improve the writing speed, especially for huge partition.                                                                                                                                                                                       |
 | rss.server.max.concurrency.limit.of.per-partition.write | -                                                                      | The limit for max concurrency per-partition write specified by client, this won't be enabled by default.                                                                                                                                                                                                                                                                                     |
 | rss.metrics.reporter.class                              | -                                                                      | The class of metrics reporter.                                                                                                                                                                                                                                                                                                                                                               |
-| rss.server.hybrid.storage.manager.selector.class       | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage.                                                                                                                                                               |
+| rss.server.hybrid.storage.manager.selector.class        | org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is `DefaultStorageManagerSelector`, and another `HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's data to cold storage.                                                                                                                                                               |
 | rss.server.disk-capacity.watermark.check.enabled        | false                                                                  | If it is co-located with other services, the high-low watermark check based on the uniffle used is not correct. Due to this, the whole disk capacity watermark check is necessary, which will reuse the current watermark value. It will be disabled by default.                                                                                                                             |
 
 ### Advanced Configurations
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index e72510e..9ea2e84 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -42,7 +42,7 @@
           .doubleType()
           .defaultValue(0.6)
           .withDescription(
-              "JVM heap size * ratio for the maximum memory of buffer manager for shuffle server, this "
+              "JVM heap size or off-heap size(when enabling Netty) * ratio for the maximum memory of buffer manager for shuffle server, this "
                   + "is only effective when `rss.server.buffer.capacity` is not explicitly set");
 
   public static final ConfigOption<Long> SERVER_READ_BUFFER_CAPACITY =
@@ -56,7 +56,7 @@
           .doubleType()
           .defaultValue(0.2)
           .withDescription(
-              "JVM heap size * ratio for read buffer size, this is only effective when "
+              "JVM heap size or off-heap size(when enabling Netty) * ratio for read buffer size, this is only effective when "
                   + "`rss.server.reader.buffer.capacity.ratio` is not explicitly set");
 
   public static final ConfigOption<Long> SERVER_HEARTBEAT_DELAY =
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ceca592..4d42b05 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -98,7 +98,12 @@
     this.readCapacity = conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
     if (this.readCapacity < 0) {
       this.readCapacity =
-          (long) (heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
+          nettyServerEnabled
+              ? (long)
+                  (NettyUtils.getMaxDirectMemory()
+                      * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO))
+              : (long)
+                  (heapSize * conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
     }
     LOG.info(
         "Init shuffle buffer manager with capacity: {}, read buffer capacity: {}.",
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ac8973e..2e0c070 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -24,6 +24,8 @@
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -253,7 +255,7 @@
             .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
       }
     }
-    long start = System.currentTimeMillis();
+    final long start = System.currentTimeMillis();
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetMemoryShuffleDataResponse response;
@@ -262,8 +264,9 @@
 
     // todo: if can get the exact memory size?
     if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize)) {
+      ShuffleDataResult shuffleDataResult = null;
       try {
-        ShuffleDataResult shuffleDataResult =
+        shuffleDataResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getInMemoryShuffleData(
@@ -281,19 +284,18 @@
           ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
           ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
         }
-        long costTime = System.currentTimeMillis() - start;
-        shuffleServer
-            .getNettyMetrics()
-            .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(), costTime);
-        LOG.info(
-            "Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" + " data for {}",
-            costTime,
-            data.size(),
-            requestInfo);
-
         response =
             new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, bufferSegments, data);
+        ReleaseMemoryAndRecordReadTimeListener listener =
+            new ReleaseMemoryAndRecordReadTimeListener(
+                start, readBufferSize, data.size(), requestInfo, req, client);
+        client.getChannel().writeAndFlush(response).addListener(listener);
+        return;
       } catch (Exception e) {
+        shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
+        if (shuffleDataResult != null) {
+          shuffleDataResult.release();
+        }
         status = StatusCode.INTERNAL_ERROR;
         msg =
             "Error happened when get in memory shuffle data for "
@@ -304,8 +306,6 @@
         response =
             new GetMemoryShuffleDataResponse(
                 req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
-      } finally {
-        shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
       }
     } else {
       status = StatusCode.INTERNAL_ERROR;
@@ -348,9 +348,10 @@
             .getShuffleServerConf()
             .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
     if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize)) {
+      ShuffleIndexResult shuffleIndexResult = null;
       try {
         final long start = System.currentTimeMillis();
-        ShuffleIndexResult shuffleIndexResult =
+        shuffleIndexResult =
             shuffleServer
                 .getShuffleTaskManager()
                 .getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
@@ -361,13 +362,16 @@
         response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, data, shuffleIndexResult.getDataFileLen());
-        long readTime = System.currentTimeMillis() - start;
-        LOG.info(
-            "Successfully getShuffleIndex cost {} ms for {}" + " bytes with {}",
-            readTime,
-            data.size(),
-            requestInfo);
+        ReleaseMemoryAndRecordReadTimeListener listener =
+            new ReleaseMemoryAndRecordReadTimeListener(
+                start, assumedFileSize, data.size(), requestInfo, req, client);
+        client.getChannel().writeAndFlush(response).addListener(listener);
+        return;
       } catch (FileNotFoundException indexFileNotFoundException) {
+        shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
+        if (shuffleIndexResult != null) {
+          shuffleIndexResult.release();
+        }
         LOG.warn(
             "Index file for {} is not found, maybe the data has been flushed to cold storage.",
             requestInfo,
@@ -376,14 +380,16 @@
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
       } catch (Exception e) {
+        shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
+        if (shuffleIndexResult != null) {
+          shuffleIndexResult.release();
+        }
         status = StatusCode.INTERNAL_ERROR;
         msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
         LOG.error(msg, e);
         response =
             new GetLocalShuffleIndexResponse(
                 req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
-      } finally {
-        shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
       }
     } else {
       status = StatusCode.INTERNAL_ERROR;
@@ -418,7 +424,6 @@
     StatusCode status = StatusCode.SUCCESS;
     String msg = "OK";
     GetLocalShuffleDataResponse response;
-    ShuffleDataResult sdr;
     String requestInfo =
         "appId["
             + appId
@@ -426,11 +431,9 @@
             + shuffleId
             + "], partitionId["
             + partitionId
-            + "]"
-            + "offset["
+            + "], offset["
             + offset
-            + "]"
-            + "length["
+            + "], length["
             + length
             + "]";
 
@@ -445,8 +448,9 @@
     }
 
     if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) {
+      ShuffleDataResult sdr = null;
       try {
-        long start = System.currentTimeMillis();
+        final long start = System.currentTimeMillis();
         sdr =
             shuffleServer
                 .getShuffleTaskManager()
@@ -459,29 +463,27 @@
                     storageType,
                     offset,
                     length);
-        long readTime = System.currentTimeMillis() - start;
-        ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
         ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
         ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
-        shuffleServer
-            .getNettyMetrics()
-            .recordProcessTime(GetLocalShuffleDataRequest.class.getName(), readTime);
-        LOG.info(
-            "Successfully getShuffleData cost {} ms for shuffle" + " data with {}",
-            readTime,
-            requestInfo);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, sdr.getManagedBuffer());
+        ReleaseMemoryAndRecordReadTimeListener listener =
+            new ReleaseMemoryAndRecordReadTimeListener(
+                start, length, sdr.getDataLength(), requestInfo, req, client);
+        client.getChannel().writeAndFlush(response).addListener(listener);
+        return;
       } catch (Exception e) {
+        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
+        if (sdr != null) {
+          sdr.release();
+        }
         status = StatusCode.INTERNAL_ERROR;
         msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
         LOG.error(msg, e);
         response =
             new GetLocalShuffleDataResponse(
                 req.getRequestId(), status, msg, new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
-      } finally {
-        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
       }
     } else {
       status = StatusCode.INTERNAL_ERROR;
@@ -522,4 +524,89 @@
     }
     return ret;
   }
+
+  class ReleaseMemoryAndRecordReadTimeListener implements ChannelFutureListener {
+    private final long readStartedTime; // epoch millis taken when the handler began the read
+    private final long readBufferSize; // read memory to release when the write completes
+    private final long dataSize; // payload size in bytes; only used in log messages
+    private final String requestInfo; // human-readable request identity for log messages
+    private final RequestMessage request; // original request; selects the error response type
+    private final TransportClient client; // supplies the channel for the error response
+    // Captured at construction; operationComplete may run after the handler has returned.
+    ReleaseMemoryAndRecordReadTimeListener(
+        long readStartedTime,
+        long readBufferSize,
+        long dataSize,
+        String requestInfo,
+        RequestMessage request,
+        TransportClient client) {
+      this.readStartedTime = readStartedTime;
+      this.readBufferSize = readBufferSize;
+      this.dataSize = dataSize;
+      this.requestInfo = requestInfo;
+      this.request = request;
+      this.client = client;
+    }
+    // Called when the response write finishes; the future reports success or failure.
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
+      long readTime = System.currentTimeMillis() - readStartedTime; // read + write duration
+      ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
+      shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), readTime);
+      if (!future.isSuccess()) { // the write did not succeed
+        Throwable cause = future.cause();
+        String errorMsg =
+            "Error happened when executing "
+                + request.getOperationType()
+                + " for "
+                + requestInfo
+                + ", "
+                + cause.getMessage();
+        LOG.error(errorMsg, future.cause());
+        RpcResponse errorResponse; // built below to match the request's type
+        if (request instanceof GetLocalShuffleDataRequest) {
+          errorResponse =
+              new GetLocalShuffleDataResponse(
+                  request.getRequestId(),
+                  StatusCode.INTERNAL_ERROR,
+                  errorMsg,
+                  new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
+        } else if (request instanceof GetLocalShuffleIndexRequest) {
+          errorResponse =
+              new GetLocalShuffleIndexResponse(
+                  request.getRequestId(),
+                  StatusCode.INTERNAL_ERROR,
+                  errorMsg,
+                  Unpooled.EMPTY_BUFFER,
+                  0L);
+        } else if (request instanceof GetMemoryShuffleDataRequest) {
+          errorResponse =
+              new GetMemoryShuffleDataResponse(
+                  request.getRequestId(),
+                  StatusCode.INTERNAL_ERROR,
+                  errorMsg,
+                  Lists.newArrayList(),
+                  Unpooled.EMPTY_BUFFER);
+        } else {
+          LOG.error("Cannot handle request {}", request.type());
+          return; // no error response is sent for any other request type
+        }
+        client.getChannel().writeAndFlush(errorResponse); // result of this write is not checked
+        LOG.error(
+            "Failed to execute {} for {}. Took {} ms and could not retrieve {} bytes of data",
+            request.getOperationType(),
+            requestInfo,
+            readTime,
+            dataSize);
+      } else {
+        LOG.info(
+            "Successfully executed {} for {}. Took {} ms and retrieved {} bytes of data",
+            request.getOperationType(),
+            requestInfo,
+            readTime,
+            dataSize);
+      }
+    }
+  }
 }