[improve][ml] Use lock-free queue in InflightReadsLimiter since there's no concurrent access (#23962)

(cherry picked from commit 38a41e0d29192d0e29cc172cccf6c187cf7cb542)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index 1f4d2c2..1d5e0a3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -22,6 +22,7 @@
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.metrics.ObservableLongCounter;
 import io.prometheus.client.Gauge;
+import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +32,6 @@
 import org.apache.pulsar.opentelemetry.Constants;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization;
 import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
-import org.jctools.queues.SpscArrayQueue;
 
 @Slf4j
 public class InflightReadsLimiter implements AutoCloseable {
@@ -51,6 +51,7 @@
     public static final String INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME =
             "pulsar.broker.managed_ledger.inflight.read.usage";
     private final ObservableLongCounter inflightReadsUsageCounter;
+    private final int maxReadsInFlightAcquireQueueSize;
 
     @PulsarDeprecatedMetric(newMetricName = INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
     @Deprecated
@@ -82,9 +83,10 @@
         this.remainingBytes = maxReadsInFlightSize;
         this.acquireTimeoutMillis = acquireTimeoutMillis;
         this.timeOutExecutor = timeOutExecutor;
+        this.maxReadsInFlightAcquireQueueSize = maxReadsInFlightAcquireQueueSize;
         if (maxReadsInFlightSize > 0) {
             enabled = true;
-            this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
+            this.queuedHandles = new ArrayDeque<>();
         } else {
             enabled = false;
             this.queuedHandles = null;
@@ -174,13 +176,14 @@
             updateMetrics();
             return Optional.of(new Handle(maxReadsInFlightSize, handle.creationTime, true));
         } else {
-            if (queuedHandles.offer(new QueuedHandle(handle, callback))) {
-                scheduleTimeOutCheck(acquireTimeoutMillis);
-                return Optional.empty();
-            } else {
+            if (queuedHandles.size() >= maxReadsInFlightAcquireQueueSize) {
                 log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}",
                         permits, handle.creationTime, remainingBytes);
                 return Optional.of(new Handle(0, handle.creationTime, false));
+            } else {
+                queuedHandles.offer(new QueuedHandle(handle, callback));
+                scheduleTimeOutCheck(acquireTimeoutMillis);
+                return Optional.empty();
             }
         }
     }