RATIS-1384.Change pending request limit unit to MB (#483)

diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index b662dff..989070f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -81,6 +81,15 @@
     };
   }
 
+  static BiConsumer<String, SizeInBytes> requireMinSizeInByte(SizeInBytes min) {
+    return (key, value) -> {
+      if (value.getSize() < min.getSize()) {
+        throw new IllegalArgumentException(
+            key + " = " + value + " < min = " + min);
+      }
+    };
+  }
+
   static BiConsumer<String, Long> requireMax(long max) {
     return (key, value) -> {
       if (value > max) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
index 160cf44..25667a3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -24,6 +24,7 @@
  */
 public final class SizeInBytes {
   public static final SizeInBytes ONE_KB = valueOf("1k");
+  public static final SizeInBytes ONE_MB = valueOf("1m");
 
   public static SizeInBytes valueOf(long size) {
     final String s = String.valueOf(size);
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 892cc16..586f359 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -114,7 +114,7 @@
     SizeInBytes BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("64MB");
     static SizeInBytes byteLimit(RaftProperties properties) {
       return getSizeInBytes(properties::getSizeInBytes,
-          BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog());
+          BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog(), requireMinSizeInByte(SizeInBytes.ONE_MB));
     }
     static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
       setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index cda61df..0812c29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -44,32 +44,46 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 class PendingRequests {
   public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class);
 
+  private static final int ONE_MB = SizeInBytes.ONE_MB.getSizeInt();
+
+  /**
+   * Round up to the nearest MB.
+   */
+  static int roundUpMb(long bytes) {
+    return Math.toIntExact((bytes - 1) / ONE_MB + 1);
+  }
+
   static class Permit {}
 
   static class RequestLimits extends ResourceSemaphore.Group {
-    RequestLimits(int elementLimit, SizeInBytes byteLimit) {
-      super(elementLimit, byteLimit.getSizeInt());
+    RequestLimits(int elementLimit, int megabyteLimit) {
+      super(elementLimit, megabyteLimit);
     }
 
     int getElementCount() {
       return get(0).used();
     }
 
-    int getByteSize() {
+    int getMegaByteSize() {
       return get(1).used();
     }
 
-    ResourceSemaphore.ResourceAcquireStatus tryAcquire(Message message) {
-      return tryAcquire(1, Message.getSize(message));
+    ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
+      return tryAcquire(1, messageSizeMb);
     }
 
-    void release(Message message) {
-      release(1, Message.getSize(message));
+    void releaseExtraMb(int extraMb) {
+      release(0, extraMb);
+    }
+
+    void release(int diffMb) {
+      release(1, diffMb);
     }
   }
 
@@ -82,19 +96,24 @@
     private final Map<Permit, Permit> permits = new HashMap<>();
     /** Track and limit the number of requests and the total message size. */
     private final RequestLimits resource;
+    /** The size (in byte) of all the requests in this map. */
+    private final AtomicLong requestSize = new AtomicLong();
 
-    RequestMap(Object name, int elementLimit, SizeInBytes byteLimit, RaftServerMetricsImpl raftServerMetrics) {
+
+    RequestMap(Object name, int elementLimit, int megabyteLimit, RaftServerMetricsImpl raftServerMetrics) {
       this.name = name;
-      this.resource = new RequestLimits(elementLimit, byteLimit);
+      this.resource = new RequestLimits(elementLimit, megabyteLimit);
       this.raftServerMetrics = raftServerMetrics;
 
       raftServerMetrics.addNumPendingRequestsGauge(resource::getElementCount);
-      raftServerMetrics.addNumPendingRequestsByteSize(resource::getByteSize);
+      raftServerMetrics.addNumPendingRequestsMegaByteSize(resource::getMegaByteSize);
     }
 
     Permit tryAcquire(Message message) {
-      final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(message);
-      LOG.trace("tryAcquire? {}", acquired);
+      final int messageSize = Message.getSize(message);
+      final int messageSizeMb = roundUpMb(messageSize );
+      final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
+      LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
       if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
         raftServerMetrics.onRequestQueueLimitHit();
         raftServerMetrics.onResourceLimitHit();
@@ -104,6 +123,14 @@
         raftServerMetrics.onResourceLimitHit();
         return null;
       }
+
+      // release extra MB
+      final long oldSize = requestSize.getAndAdd(messageSize);
+      final long newSize = oldSize + messageSize;
+      final int diffMb = roundUpMb(newSize) - roundUpMb(oldSize);
+      if (messageSizeMb > diffMb) {
+        resource.releaseExtraMb(messageSizeMb - diffMb);
+      }
       return putPermit();
     }
 
@@ -140,8 +167,12 @@
       if (r == null) {
         return null;
       }
-      resource.release(r.getRequest().getMessage());
-      LOG.trace("release");
+      final int messageSize = Message.getSize(r.getRequest().getMessage());
+      final long oldSize = requestSize.getAndAdd(-messageSize);
+      final long newSize = oldSize - messageSize;
+      final int diffMb = roundUpMb(oldSize) - roundUpMb(newSize);
+      resource.release(diffMb);
+      LOG.trace("release {} MB", diffMb);
       return r;
     }
 
@@ -183,7 +214,9 @@
     this.name = id + "-" + JavaUtils.getClassSimpleName(getClass());
     this.pendingRequests = new RequestMap(id,
         RaftServerConfigKeys.Write.elementLimit(properties),
-        RaftServerConfigKeys.Write.byteLimit(properties),
+        Math.toIntExact(
+            RaftServerConfigKeys.Write.byteLimit(properties).getSize()
+                / SizeInBytes.ONE_MB.getSize()), //round down
         raftServerMetrics);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index 28f2950..4a0a9f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -61,7 +61,7 @@
   public static final String RESOURCE_LIMIT_HIT_COUNTER = "leaderNumResourceLimitHits";
   public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = "numRequestsByteSizeLimitHits";
   public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
-  public static final String REQUEST_BYTE_SIZE = "numPendingRequestByteSize";
+  public static final String REQUEST_MEGA_BYTE_SIZE = "numPendingRequestMegaByteSize";
   public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = "retryCacheEntryCount";
   public static final String RETRY_CACHE_HIT_COUNT_METRIC = "retryCacheHitCount";
   public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -221,12 +221,12 @@
     return registry.remove(REQUEST_QUEUE_SIZE);
   }
 
-  public void addNumPendingRequestsByteSize(Gauge byteSize) {
-    registry.gauge(REQUEST_BYTE_SIZE, () -> byteSize);
+  public void addNumPendingRequestsMegaByteSize(Gauge megabyteSize) {
+    registry.gauge(REQUEST_MEGA_BYTE_SIZE, () -> megabyteSize);
   }
 
   public boolean removeNumPendingRequestsByteSize() {
-    return registry.remove(REQUEST_BYTE_SIZE);
+    return registry.remove(REQUEST_MEGA_BYTE_SIZE);
   }
 
   public void onRequestByteSizeLimitHit() {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index b170309..71783e8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -22,7 +22,7 @@
 import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WATCH_REQUEST;
 import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WRITE_REQUEST;
 import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
-import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE_SIZE;
+import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_MEGA_BYTE_SIZE;
 import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
 import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RESOURCE_LIMIT_HIT_COUNTER;
 
@@ -210,7 +210,7 @@
   public void testRaftServerMetrics() throws Exception {
     final RaftProperties p = getProperties();
     RaftServerConfigKeys.Write.setElementLimit(p, 10);
-    RaftServerConfigKeys.Write.setByteLimit(p, SizeInBytes.valueOf(110));
+    RaftServerConfigKeys.Write.setByteLimit(p, SizeInBytes.valueOf("1MB"));
     try {
       runWithNewCluster(3, this::testRequestMetrics);
     } finally {
@@ -242,10 +242,9 @@
 
 
       final SortedMap<String, Gauge> gaugeMap = getRaftServerMetrics(cluster.getLeader())
-          .getRegistry().getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
+          .getRegistry().getGauges((s, metric) -> s.contains(
+              REQUEST_MEGA_BYTE_SIZE));
 
-      RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
-              300, 5000);
 
       for (int i = 0; i < 10; i++) {
         client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
@@ -259,11 +258,12 @@
 
       stateMachine.unblockFlushStateMachineData();
 
-      // Send a message with 120, our byte size limit is 110, so it should fail
+      // Send a message with 1025kb , our byte size limit is 1024kb (1mb) , so it should fail
       // and byte size counter limit will be hit.
 
       client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
-      client.async().send(new SimpleMessage(RandomStringUtils.random(120, true, false)));
+      client.async().send(new SimpleMessage(RandomStringUtils
+          .random(SizeInBytes.valueOf("1025kb").getSizeInt(), true, false)));
       clients.add(client);
 
       RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java b/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java
index 0b56bef..bb386e8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java
@@ -22,6 +22,7 @@
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,6 +37,10 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.setSizeInBytes;
+import static org.apache.ratis.server.RaftServerConfigKeys.Write.BYTE_LIMIT_KEY;
+
 /**
  * Test cases to verify RaftServerConfigKeys.
  */
@@ -95,4 +100,19 @@
     Assert.assertEquals(directories.size(), storageDirs.size());
     Assert.assertEquals(0, actualDirs.size());
   }
+
+  /**
+   * Sets the value to <code>raft.server.write.byte-limit</code> via
+   * RaftServerConfigKeys and also verifies the same via RaftServerConfigKeys.
+   */
+  @Test public void testPendingRequestSize() {
+    RaftProperties properties = new RaftProperties();
+    // setting to 4GB
+    setSizeInBytes(properties::set, BYTE_LIMIT_KEY, SizeInBytes.valueOf("4gb"),
+        requireMin(1L));
+    int pendingRequestMegabyteLimit = Math.toIntExact(
+        RaftServerConfigKeys.Write.byteLimit(properties).getSize()
+            / SizeInBytes.ONE_MB.getSize());
+    Assert.assertEquals(4096, pendingRequestMegabyteLimit);
+  }
 }
\ No newline at end of file