Route-segmented pool: fix timeout race; add direct hand-off; enforce TTL on lease; bound scans. (#575)

Off-thread discard with saturation fallback; tiny RR drainer (pending≄4); disposer min(cores,8).
No public API changes. JMH: OFFLOCK 50/20 collapse gone.
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java
index 4a18618..40dca76 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java
@@ -26,12 +26,32 @@
  */
 package org.apache.hc.core5.benchmark;
 
-
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
 
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.ModalCloseable;
@@ -40,7 +60,6 @@
 import org.apache.hc.core5.pool.ManagedConnPool;
 import org.apache.hc.core5.pool.PoolEntry;
 import org.apache.hc.core5.pool.PoolReusePolicy;
-import org.apache.hc.core5.pool.PoolStats;
 import org.apache.hc.core5.pool.RouteSegmentedConnPool;
 import org.apache.hc.core5.pool.StrictConnPool;
 import org.apache.hc.core5.util.TimeValue;
@@ -51,7 +70,6 @@
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
@@ -62,143 +80,245 @@
 import org.openjdk.jmh.annotations.Warmup;
 
 /**
- * Compare StrictConnPool, LaxConnPool, and RouteSegmentedConnPool (“OFFLOCK”)
- * under different contention patterns and slow-disposal rates.
+ * JMH harness that drives StrictConnPool, LaxConnPool, and RouteSegmentedConnPool
+ * against a local HTTP/1.1 mini-cluster using real sockets and keep-alive.
  */
-@BenchmarkMode({Mode.Throughput, Mode.SampleTime})
-@Warmup(iterations = 3, time = 2, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode({Mode.Throughput})
+@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
 @Fork(1)
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@OutputTimeUnit(TimeUnit.SECONDS)
 public class RoutePoolsJmh {
 
-    /**
-     * Minimal connection that can simulate slow close.
-     */
-    public static final class FakeConn implements ModalCloseable {
-        private final int closeDelayMs;
+    // ---------------------------------------------------------------
+    // Utilities
+    // ---------------------------------------------------------------
+    static ThreadFactory daemonFactory(final String prefix) {
+        final AtomicInteger n = new AtomicInteger(1);
+        return r -> {
+            final Thread t = new Thread(r, prefix + "-" + n.getAndIncrement());
+            t.setDaemon(true);
+            return t;
+        };
+    }
 
-        public FakeConn(final int closeDelayMs) {
+    // ---------------------------------------------------------------
+    // Real HTTP/1.1 persistent connection used by the pool
+    // ---------------------------------------------------------------
+    public static final class RealConn implements ModalCloseable {
+        private final String host;
+        private final int port;
+        private final int closeDelayMs;
+        private final Socket socket;
+        private final BufferedInputStream in;
+        private final BufferedOutputStream out;
+
+        public RealConn(
+                final String host,
+                final int port,
+                final int closeDelayMs,
+                final int soTimeoutMs,
+                final int connectTimeoutMs) throws IOException {
+            this.host = host;
+            this.port = port;
             this.closeDelayMs = closeDelayMs;
+            final Socket s = new Socket();
+            s.setTcpNoDelay(true);
+            s.setSoTimeout(Math.max(1000, soTimeoutMs)); // read timeout
+            s.setKeepAlive(true);
+            s.connect(new InetSocketAddress(host, port), Math.max(1, connectTimeoutMs));
+            this.socket = s;
+            this.in = new BufferedInputStream(s.getInputStream(), 32 * 1024);
+            this.out = new BufferedOutputStream(s.getOutputStream(), 32 * 1024);
+        }
+
+        public void getOnce(final boolean keepAlive) throws IOException {
+            final String req = "GET / HTTP/1.1\r\n" +
+                    "Host: " + host + ":" + port + "\r\n" +
+                    (keepAlive ? "Connection: keep-alive\r\n" : "Connection: close\r\n") +
+                    "\r\n";
+            out.write(req.getBytes(StandardCharsets.ISO_8859_1));
+            out.flush();
+
+            final String status = readLine();
+            if (status == null) {
+                throw new IOException("No status line");
+            }
+            final String[] parts = status.split(" ", 3);
+            if (parts.length < 2 || !parts[0].startsWith("HTTP/1.")) {
+                throw new IOException("Bad status: " + status);
+            }
+            final int code;
+            try {
+                code = Integer.parseInt(parts[1]);
+            } catch (final NumberFormatException nfe) {
+                throw new IOException("Bad status code in: " + status);
+            }
+            if (code != 200) {
+                throw new IOException("Unexpected status: " + status);
+            }
+
+            int contentLength = -1;
+            for (; ; ) {
+                final String line = readLine();
+                if (line == null) {
+                    throw new IOException("EOF in headers");
+                }
+                if (line.isEmpty()) {
+                    break;
+                }
+                final int colon = line.indexOf(':');
+                if (colon > 0) {
+                    final String name = line.substring(0, colon).trim();
+                    if ("Content-Length".equalsIgnoreCase(name)) {
+                        try {
+                            contentLength = Integer.parseInt(line.substring(colon + 1).trim());
+                        } catch (final NumberFormatException ignore) {
+                            // ignore
+                        }
+                    }
+                }
+            }
+            if (contentLength < 0) {
+                throw new IOException("Missing Content-Length");
+            }
+
+            int remaining = contentLength;
+            final byte[] buf = new byte[8192];
+            while (remaining > 0) {
+                final int r = in.read(buf, 0, Math.min(buf.length, remaining));
+                if (r == -1) {
+                    throw new IOException("unexpected EOF in body");
+                }
+                remaining -= r;
+            }
+        }
+
+        private String readLine() throws IOException {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
+            for (; ; ) {
+                final int b = in.read();
+                if (b == -1) {
+                    if (baos.size() == 0) {
+                        return null;
+                    }
+                    break;
+                }
+                if (b == '\n') {
+                    break;
+                }
+                baos.write(b);
+            }
+            final byte[] raw = baos.toByteArray();
+            final int len = raw.length;
+            final int eff = (len > 0 && raw[len - 1] == '\r') ? len - 1 : len;
+            return new String(raw, 0, eff, StandardCharsets.ISO_8859_1);
         }
 
         @Override
         public void close(final CloseMode closeMode) {
-            if (closeDelayMs <= 0) {
-                return;
+            if (closeDelayMs > 0) {
+                try {
+                    Thread.sleep(closeDelayMs);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
             }
             try {
-                Thread.sleep(closeDelayMs);
-            } catch (final InterruptedException ignore) {
-                Thread.currentThread().interrupt();
+                socket.close();
+            } catch (final IOException ignore) {
+                // ignore
             }
         }
 
         @Override
         public void close() throws IOException {
-
+            if (closeDelayMs > 0) {
+                try {
+                    Thread.sleep(closeDelayMs);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            socket.close();
         }
     }
 
-    /**
-     * All benchmark parameters & shared state live here (required by JMH).
-     */
+    // ---------------------------------------------------------------
+    // Benchmark state & setup
+    // ---------------------------------------------------------------
     @State(Scope.Benchmark)
     public static class BenchState {
-
-        /**
-         * Which pool to benchmark.
-         * STRICT  -> StrictConnPool
-         * LAX     -> LaxConnPool
-         * OFFLOCK -> RouteSegmentedConnPool
-         */
-        @Param({"STRICT", "LAX", "OFFLOCK"})
+        @Param({"OFFLOCK", "STRICT", "LAX"})
         public String policy;
-
-        /**
-         * Number of distinct routes to spread load across.
-         * 1 = hot single route; 10 = multi-route scenario.
-         */
-        @Param({"1", "10"})
+        @Param({"1", "4", "10", "25", "50"})
         public int routes;
-
-        /**
-         * Percent (0..100) of releases that will be non-reusable,
-         * triggering a discard (and thus a potentially slow close).
-         */
-        @Param({"0", "5", "20"})
-        public int slowClosePct;
-
-        /**
-         * Sleep (ms) when a connection is discarded (slow close path).
-         */
-        @Param({"0", "200"})
-        public int closeSleepMs;
-
-        /**
-         * Max total, default per-route — tuned to create contention.
-         */
-        @Param({"32"})
+        @Param({"128"})
+        public int payloadBytes;
+        @Param({"100"})
         public int maxTotal;
-        @Param({"8"})
+        @Param({"5"})
         public int defMaxPerRoute;
-
-        /**
-         * Keep-alive on reusable releases.
-         */
+        @Param({"true"})
+        public boolean keepAlive;
         @Param({"5000"})
         public int keepAliveMs;
+        @Param({"0", "20"})
+        public int slowClosePct;
+        @Param({"0", "200"})
+        public int closeSleepMs;
+        @Param({"10000"})
+        public int soTimeoutMs;
+        @Param({"30000"})
+        public int requestTimeoutMs;
+        @Param({"1000"})
+        public int connectTimeoutMs;
 
-        ManagedConnPool<String, FakeConn> pool;
+        ManagedConnPool<String, RealConn> pool;
+        DisposalCallback<RealConn> disposal;
+        MiniCluster cluster;
         String[] routeKeys;
-        DisposalCallback<FakeConn> disposal;
+        ScheduledExecutorService maint;
 
         @Setup(Level.Trial)
-        public void setUp() {
-            // routes list
-            routeKeys = new String[routes];
-            for (int i = 0; i < routes; i++) {
-                routeKeys[i] = "route-" + i;
-            }
-
+        public void setUp() throws Exception {
+            cluster = new MiniCluster(routes, payloadBytes);
+            routeKeys = cluster.routeKeys();
             disposal = (c, m) -> {
                 if (c != null) {
                     c.close(m);
                 }
             };
-
             final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND;
-
             switch (policy.toUpperCase(Locale.ROOT)) {
-                case "STRICT":
-                    pool = new StrictConnPool<>(
-                            defMaxPerRoute,
-                            maxTotal,
-                            ttl,
-                            PoolReusePolicy.LIFO,
-                            disposal,
-                            null);
+                case "STRICT": {
+                    pool = new StrictConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal, null);
                     break;
-                case "LAX":
-                    pool = new LaxConnPool<>(
-                            defMaxPerRoute,
-                            ttl,
-                            PoolReusePolicy.LIFO,
-                            disposal,
-                            null);
-                    pool.setMaxTotal(maxTotal);
+                }
+                case "LAX": {
+                    final LaxConnPool<String, RealConn> lax = new LaxConnPool<>(defMaxPerRoute, ttl, PoolReusePolicy.LIFO, disposal, null);
+                    lax.setMaxTotal(maxTotal);
+                    pool = lax;
                     break;
-                case "OFFLOCK":
-                    pool = new RouteSegmentedConnPool<>(
-                            defMaxPerRoute,
-                            maxTotal,
-                            ttl,
-                            PoolReusePolicy.LIFO,
-                            disposal);
+                }
+                case "OFFLOCK": {
+                    pool = new RouteSegmentedConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal);
                     break;
-                default:
+                }
+                default: {
                     throw new IllegalArgumentException("Unknown policy: " + policy);
+                }
             }
+            // Light periodic maintenance, close idle/expired like real clients do
+            maint = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(daemonFactory("pool-maint"));
+            maint.scheduleAtFixedRate(() -> {
+                try {
+                    pool.closeIdle(TimeValue.ofSeconds(5));
+                    pool.closeExpired();
+                } catch (final Exception ignore) {
+                    // ignore in benchmark
+                }
+            }, 5, 5, TimeUnit.SECONDS);
         }
 
         @TearDown(Level.Trial)
@@ -206,6 +326,17 @@ public void tearDown() {
             if (pool != null) {
                 pool.close(CloseMode.IMMEDIATE);
             }
+            if (cluster != null) {
+                cluster.close();
+            }
+            if (maint != null) {
+                maint.shutdownNow();
+                try {
+                    maint.awaitTermination(5, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
         }
 
         String pickRoute() {
@@ -214,47 +345,195 @@ String pickRoute() {
         }
 
         boolean shouldDiscard() {
-            if (slowClosePct <= 0) return false;
-            return ThreadLocalRandom.current().nextInt(100) < slowClosePct;
+            return slowClosePct > 0 && ThreadLocalRandom.current().nextInt(100) < slowClosePct;
         }
     }
 
-    /**
-     * Lease+release on a randomly chosen route.
-     * Mix of reusable and non-reusable releases (to trigger discard/close).
-     */
+    // ---------------------------------------------------------------
+    // Benchmark body
+    // ---------------------------------------------------------------
     @Benchmark
     @Threads(50)
-    public void leaseReleaseMixed(final BenchState s) throws Exception {
+    public void lease_io_release(final BenchState s) {
+        final String key = s.pickRoute();
+        final Future<PoolEntry<String, RealConn>> f = s.pool.lease(key, null, Timeout.DISABLED, null);
+        final PoolEntry<String, RealConn> e;
         try {
-            final Future<PoolEntry<String, FakeConn>> f = s.pool.lease(s.pickRoute(), null, Timeout.ofMilliseconds(500), null);
-            final PoolEntry<String, FakeConn> e = f.get(500, TimeUnit.MILLISECONDS);
-            if (!e.hasConnection()) e.assignConnection(new FakeConn(s.closeSleepMs));
-            final boolean reusable = !s.shouldDiscard();
-            if (reusable) {
-                e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs));
-                s.pool.release(e, true);
-            } else {
-                s.pool.release(e, false);
+            e = f.get(s.requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException te) {
+            // IMPORTANT: drop waiter on pools that queue
+            f.cancel(true);
+            return;
+        } catch (final ExecutionException ee) {
+            if (ee.getCause() instanceof TimeoutException) {
+                f.cancel(true);
             }
-        } catch (final IllegalStateException ignored) {
+            return;
+        } catch (final InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            return;
+        }
+        if (e == null) {
+            return; // defensive
+        }
 
+        RealConn c = e.getConnection();
+        if (c == null) {
+            // parse host:port defensively
+            final int colon = key.indexOf(':');
+            if (colon <= 0 || colon >= key.length() - 1) {
+                s.pool.release(e, false);
+                return;
+            }
+            final String host = key.substring(0, colon);
+            final int port;
+            try {
+                port = Integer.parseInt(key.substring(colon + 1));
+            } catch (final NumberFormatException nfe) {
+                s.pool.release(e, false);
+                return;
+            }
+            RealConn fresh = null;
+            try {
+                fresh = new RealConn(host, port, s.closeSleepMs, s.soTimeoutMs, s.connectTimeoutMs);
+                // Double-check before assigning to avoid races
+                final RealConn existing = e.getConnection();
+                if (existing == null) {
+                    try {
+                        e.assignConnection(fresh);
+                        c = fresh;
+                        fresh = null; // ownership transferred
+                    } catch (final IllegalStateException already) {
+                        // someone else assigned concurrently
+                        c = e.getConnection();
+                        if (c == null) {
+                            s.pool.release(e, false);
+                            try {
+                                fresh.close(CloseMode.IMMEDIATE);
+                            } catch (final Exception ignore) {
+                            }
+                            return;
+                        }
+                    }
+                } else {
+                    c = existing;
+                }
+            } catch (final IOException ioe) {
+                s.pool.release(e, false);
+                if (fresh != null) {
+                    try {
+                        fresh.close(CloseMode.IMMEDIATE);
+                    } catch (final Exception ignore) {
+                    }
+                }
+                return;
+            } finally {
+                if (fresh != null) { // we created but didn't assign -> close to avoid leak
+                    try {
+                        fresh.close(CloseMode.IMMEDIATE);
+                    } catch (final Exception ignore) {
+                    }
+                }
+            }
+        }
+
+        if (c == null) {
+            s.pool.release(e, false);
+            return;
+        }
+
+        try {
+            c.getOnce(s.keepAlive);
+        } catch (final IOException ioe) {
+            s.pool.release(e, false);
+            return;
+        }
+
+        final boolean reusable = s.keepAlive && !s.shouldDiscard();
+        if (reusable) {
+            e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs));
+            s.pool.release(e, true);
+        } else {
+            s.pool.release(e, false);
         }
     }
 
+    // ---------------------------------------------------------------
+    // Local HTTP mini-cluster
+    // ---------------------------------------------------------------
+    static final class MiniCluster {
+        private final List<HttpServer> servers = new ArrayList<>();
+        private final String[] keys;
+        private final byte[] body;
+        private final ExecutorService exec;
 
-    /**
-     * Optional stats probe to ensure the benchmark does "something".
-     * Not a measured benchmark; use only for sanity runs.
-     */
-    @Benchmark
-    @Threads(1)
-    @OperationsPerInvocation(1)
-    @BenchmarkMode(Mode.SingleShotTime)
-    public void statsProbe(final BenchState s, final org.openjdk.jmh.infra.Blackhole bh) {
-        final PoolStats stats = s.pool.getTotalStats();
-        bh.consume(stats.getAvailable());
-        bh.consume(stats.getLeased());
-        bh.consume(stats.getPending());
+        MiniCluster(final int n, final int payloadBytes) throws IOException {
+            this.keys = new String[n];
+            this.body = new byte[payloadBytes];
+            // Bounded, CPU-sized pool to keep the com.sun server in check
+            final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
+            final int coreThreads = Math.min(64, Math.max(cores, n * 2));
+            final int maxThreads = Math.min(128, Math.max(coreThreads, n * 4));
+            this.exec = new java.util.concurrent.ThreadPoolExecutor(
+                    coreThreads, maxThreads,
+                    60L, TimeUnit.SECONDS,
+                    new java.util.concurrent.LinkedBlockingQueue<>(2048),
+                    daemonFactory("mini-http"),
+                    new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
+            for (int i = 0; i < n; i++) {
+                final InetSocketAddress bind = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+                final HttpServer s = HttpServer.create(bind, 4096);
+                s.createContext("/", new FixedHandler(body));
+                s.setExecutor(exec);
+                s.start();
+                servers.add(s);
+                keys[i] = "127.0.0.1:" + s.getAddress().getPort();
+            }
+        }
+
+        String[] routeKeys() {
+            return keys;
+        }
+
+        void close() {
+            for (final HttpServer s : servers) {
+                try {
+                    s.stop(0);
+                } catch (final Exception ignore) {
+                }
+            }
+            exec.shutdownNow();
+            try {
+                exec.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    static final class FixedHandler implements HttpHandler {
+        private final byte[] body;
+
+        FixedHandler(final byte[] body) {
+            this.body = body;
+        }
+
+        @Override
+        public void handle(final HttpExchange ex) throws IOException {
+            try (InputStream in = ex.getRequestBody()) {
+                final byte[] buf = new byte[1024];
+                while (in.read(buf) != -1) {
+                    // drain
+                }
+            }
+            ex.getResponseHeaders().set("Content-Type", "text/plain; charset=US-ASCII");
+            ex.sendResponseHeaders(200, body.length);
+            try (OutputStream os = ex.getResponseBody()) {
+                if (body.length > 0) {
+                    os.write(body);
+                }
+                os.flush();
+            }
+        }
     }
 }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
index 0fc2bd0..d49744a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
@@ -27,6 +27,8 @@
 package org.apache.hc.core5.pool;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -38,8 +40,12 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,15 +62,15 @@
 import org.apache.hc.core5.util.Timeout;
 
 /**
- * Lock-free, route-segmented connection pool.
+ * Lock-free, route-segmented connection pool with tiny, conditional round-robin assistance.
  *
- * <p>This implementation keeps per-route state in independent segments and avoids
- * holding a global lock while disposing of connections. Under slow closes
- * (for example TLS shutdown or OS-level socket stalls), threads leasing
- * connections on other routes are not blocked by disposal work.</p>
+ * <p>Per-route state is kept in independent segments. Disposal of connections is offloaded
+ * to a bounded executor so slow closes do not block threads leasing on other routes.
+ * A minimal round-robin drainer is engaged only when there are many pending routes and
+ * there is global headroom; it never scans all routes.</p>
  *
  * @param <R> route key type
- * @param <C> connection type (must be {@link org.apache.hc.core5.io.ModalCloseable})
+ * @param <C> connection type (must be {@link ModalCloseable})
  * @see ManagedConnPool
  * @see PoolReusePolicy
  * @see DisposalCallback
@@ -74,6 +80,10 @@
 @Experimental
 public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implements ManagedConnPool<R, C> {
 
+    // Tiny RR assist: only engage when there are many distinct routes waiting and there is headroom.
+    private static final int RR_MIN_PENDING_ROUTES = 12;
+    private static final int RR_BUDGET = 64;
+
     private final PoolReusePolicy reusePolicy;
     private final TimeValue timeToLive;
     private final DisposalCallback<C> disposal;
@@ -89,6 +99,17 @@ public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implement
 
     private final ScheduledExecutorService timeouts;
 
+    /**
+     * Dedicated executor for asynchronous, best-effort disposal.
+     * Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread.
+     */
+    private final ThreadPoolExecutor disposer;
+
+    // Minimal fair round-robin over routes with waiters (no global scans).
+    private final ConcurrentLinkedQueue<R> pendingQueue = new ConcurrentLinkedQueue<>();
+    private final AtomicBoolean draining = new AtomicBoolean(false);
+    private final AtomicInteger pendingRouteCount = new AtomicInteger(0);
+
     public RouteSegmentedConnPool(
             final int defaultMaxPerRoute,
             final int maxTotal,
@@ -108,12 +129,29 @@ public RouteSegmentedConnPool(
             return t;
         };
         this.timeouts = Executors.newSingleThreadScheduledExecutor(tf);
+
+        // Asynchronous disposer for slow GRACEFUL closes.
+        final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
+        final int nThreads = Math.min(8, Math.max(2, cores)); // allow up to 8 on bigger boxes
+        final int qsize = 1024;
+        final ThreadFactory df = r -> {
+            final Thread t = new Thread(r, "seg-pool-disposer");
+            t.setDaemon(true);
+            return t;
+        };
+        this.disposer = new ThreadPoolExecutor(
+                nThreads, nThreads,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(qsize),
+                df,
+                new ThreadPoolExecutor.AbortPolicy()); // but we preflight capacity to avoid exception storms
     }
 
     final class Segment {
         final ConcurrentLinkedDeque<PoolEntry<R, C>> available = new ConcurrentLinkedDeque<>();
-        final ConcurrentLinkedQueue<Waiter> waiters = new ConcurrentLinkedQueue<>();
+        final ConcurrentLinkedDeque<Waiter> waiters = new ConcurrentLinkedDeque<>();
         final AtomicInteger allocated = new AtomicInteger(0);
+        final AtomicBoolean enqueued = new AtomicBoolean(false);
 
         int limitPerRoute(final R route) {
             final Integer v = maxPerRoute.get(route);
@@ -122,13 +160,18 @@ int limitPerRoute(final R route) {
     }
 
     final class Waiter extends CompletableFuture<PoolEntry<R, C>> {
+        final R route;
         final Timeout requestTimeout;
         final Object state;
         volatile boolean cancelled;
+        volatile ScheduledFuture<?> timeoutTask;
 
-        Waiter(final Timeout t, final Object s) {
+        Waiter(final R route, final Timeout t, final Object s) {
+            this.route = route;
             this.requestTimeout = t != null ? t : Timeout.DISABLED;
             this.state = s;
+            this.cancelled = false;
+            this.timeoutTask = null;
         }
     }
 
@@ -142,6 +185,7 @@ public Future<PoolEntry<R, C>> lease(
         ensureOpen();
         final Segment seg = segments.computeIfAbsent(route, r -> new Segment());
 
+        // 1) Try available
         PoolEntry<R, C> hit;
         for (; ; ) {
             hit = pollAvailable(seg, state);
@@ -162,40 +206,46 @@ public Future<PoolEntry<R, C>> lease(
             return CompletableFuture.completedFuture(hit);
         }
 
-        for (; ; ) {
-            final int tot = totalAllocated.get();
-            if (tot >= maxTotal.get()) {
-                break;
+        // 2) Try to allocate new within caps
+        if (tryAllocateOne(route, seg)) {
+            final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
+            if (callback != null) {
+                callback.completed(entry);
             }
-            if (totalAllocated.compareAndSet(tot, tot + 1)) {
-                for (; ; ) {
-                    final int per = seg.allocated.get();
-                    if (per >= seg.limitPerRoute(route)) {
-                        totalAllocated.decrementAndGet();
-                        break;
-                    }
-                    if (seg.allocated.compareAndSet(per, per + 1)) {
-                        final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
-                        if (callback != null) {
-                            callback.completed(entry);
-                        }
-                        return CompletableFuture.completedFuture(entry);
-                    }
-                }
-                break;
-            }
+            return CompletableFuture.completedFuture(entry);
         }
 
-        final Waiter w = new Waiter(requestTimeout, state);
-        seg.waiters.add(w);
+        // 3) Enqueue waiter with timeout
+        final Waiter w = new Waiter(route, requestTimeout, state);
+        seg.waiters.addLast(w);
+        enqueueIfNeeded(route, seg);
 
+        // Late hit after enqueuing
         final PoolEntry<R, C> late = pollAvailable(seg, state);
-        if (late != null && seg.waiters.remove(w)) {
-            if (callback != null) {
-                callback.completed(late);
+        if (late != null) {
+            if (seg.waiters.remove(w)) {
+                cancelTimeout(w);
+                if (callback != null) {
+                    callback.completed(late);
+                }
+                w.complete(late);
+                dequeueIfDrained(seg);
+                return w;
+            } else {
+                boolean handedOff = false;
+                for (Waiter other; (other = seg.waiters.pollFirst()) != null; ) {
+                    if (!other.cancelled && compatible(other.state, late.getState())) {
+                        cancelTimeout(other);
+                        handedOff = other.complete(late);
+                        if (handedOff) {
+                            break;
+                        }
+                    }
+                }
+                if (!handedOff) {
+                    offerAvailable(seg, late);
+                }
             }
-            w.complete(late);
-            return w;
         }
 
         scheduleTimeout(w, seg);
@@ -209,6 +259,8 @@ public Future<PoolEntry<R, C>> lease(
                 }
             });
         }
+
+        triggerDrainIfMany();
         return w;
     }
 
@@ -220,7 +272,8 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
         final R route = entry.getRoute();
         final Segment seg = segments.get(route);
         if (seg == null) {
-            entry.discardConnection(CloseMode.GRACEFUL);
+            // Segment got removed; dispose off-thread and bail.
+            discardEntry(entry, CloseMode.GRACEFUL);
             return;
         }
 
@@ -228,21 +281,11 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
         final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now);
 
         if (stillValid) {
-            for (; ; ) {
-                final Waiter w = seg.waiters.poll();
-                if (w == null) {
-                    break;
-                }
-                if (w.cancelled) {
-                    continue;
-                }
-                if (compatible(w.state, entry.getState())) {
-                    if (w.complete(entry)) {
-                        return;
-                    }
-                }
+            if (!handOffToCompatibleWaiter(entry, seg)) {
+                offerAvailable(seg, entry);
+                enqueueIfNeeded(route, seg);
+                triggerDrainIfMany();
             }
-            offerAvailable(seg, entry);
         } else {
             discardAndDecr(entry, CloseMode.GRACEFUL);
         }
@@ -266,15 +309,19 @@ public void close(final CloseMode closeMode) {
         for (final Map.Entry<R, Segment> e : segments.entrySet()) {
             final Segment seg = e.getValue();
 
-            // cancel waiters
             for (final Waiter w : seg.waiters) {
                 w.cancelled = true;
+                cancelTimeout(w);
                 w.completeExceptionally(new TimeoutException("Pool closed"));
             }
             seg.waiters.clear();
+            if (seg.enqueued.getAndSet(false)) {
+                pendingRouteCount.decrementAndGet();
+            }
 
+            // discard available
             for (final PoolEntry<R, C> p : seg.available) {
-                p.discardConnection(orImmediate(closeMode));
+                discardEntry(p, closeMode);
             }
             seg.available.clear();
 
@@ -284,6 +331,11 @@ public void close(final CloseMode closeMode) {
             }
         }
         segments.clear();
+        pendingQueue.clear();
+        pendingRouteCount.set(0);
+
+        // Let in-flight graceful closes progress; no blocking here.
+        disposer.shutdown();
     }
 
     @Override
@@ -418,42 +470,40 @@ private boolean isPastTtl(final PoolEntry<R, C> p) {
         if (timeToLive == null || timeToLive.getDuration() < 0) {
             return false;
         }
-        return (System.currentTimeMillis() - p.getCreated()) >= timeToLive.toMilliseconds();
+        return System.currentTimeMillis() - p.getCreated() >= timeToLive.toMilliseconds();
     }
 
-    private void scheduleTimeout(
-            final Waiter w,
-            final Segment seg) {
-
+    private void scheduleTimeout(final Waiter w, final Segment seg) {
         if (!TimeValue.isPositive(w.requestTimeout)) {
             return;
         }
-        timeouts.schedule(() -> {
+        w.timeoutTask = timeouts.schedule(() -> {
             if (w.isDone()) {
                 return;
             }
             w.cancelled = true;
-            final TimeoutException tex = new TimeoutException("Lease timed out");
-            w.completeExceptionally(tex);
+            seg.waiters.remove(w);
+            w.completeExceptionally(new TimeoutException("Lease timed out"));
+            dequeueIfDrained(seg);
+            maybeCleanupSegment(w.route, seg);
 
             final PoolEntry<R, C> p = pollAvailable(seg, w.state);
             if (p != null) {
-                boolean handedOff = false;
-                for (Waiter other; (other = seg.waiters.poll()) != null; ) {
-                    if (!other.cancelled && compatible(other.state, p.getState())) {
-                        handedOff = other.complete(p);
-                        if (handedOff) {
-                            break;
-                        }
-                    }
-                }
-                if (!handedOff) {
+                // Try to hand off that available entry to some other compatible waiter.
+                if (!handOffToCompatibleWaiter(p, seg)) {
                     offerAvailable(seg, p);
                 }
             }
         }, w.requestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
     }
 
+    private void cancelTimeout(final Waiter w) {
+        final ScheduledFuture<?> t = w.timeoutTask;
+        if (t != null) {
+            t.cancel(false);
+        }
+    }
+
     private void offerAvailable(final Segment seg, final PoolEntry<R, C> p) {
         if (reusePolicy == PoolReusePolicy.LIFO) {
             seg.available.addFirst(p);
@@ -463,6 +513,9 @@ private void offerAvailable(final Segment seg, final PoolEntry<R, C> p) {
     }
 
     private PoolEntry<R, C> pollAvailable(final Segment seg, final Object neededState) {
+        if (neededState == null) {
+            return seg.available.pollFirst();
+        }
         for (final Iterator<PoolEntry<R, C>> it = seg.available.iterator(); it.hasNext(); ) {
             final PoolEntry<R, C> p = it.next();
             if (compatible(neededState, p.getState())) {
@@ -477,13 +530,42 @@ private boolean compatible(final Object needed, final Object have) {
         return needed == null || Objects.equals(needed, have);
     }
 
+    private boolean handOffToCompatibleWaiter(final PoolEntry<R, C> entry, final Segment seg) {
+        final Deque<Waiter> skipped = new ArrayDeque<>();
+        boolean handedOff = false;
+        for (; ; ) {
+            final Waiter w = seg.waiters.pollFirst();
+            if (w == null) {
+                break;
+            }
+            if (w.cancelled || w.isDone()) {
+                continue;
+            }
+            if (compatible(w.state, entry.getState())) {
+                cancelTimeout(w);
+                handedOff = w.complete(entry);
+                if (handedOff) {
+                    dequeueIfDrained(seg);
+                    break;
+                }
+            } else {
+                skipped.addLast(w);
+            }
+        }
+        // Restore non-compatible waiters to the head to preserve ordering.
+        while (!skipped.isEmpty()) {
+            seg.waiters.addFirst(skipped.pollLast());
+        }
+        return handedOff;
+    }
+
     private void discardAndDecr(final PoolEntry<R, C> p, final CloseMode mode) {
-        p.discardConnection(orImmediate(mode));
         totalAllocated.decrementAndGet();
         final Segment seg = segments.get(p.getRoute());
         if (seg != null) {
             seg.allocated.decrementAndGet();
         }
+        discardEntry(p, mode);
     }
 
     private CloseMode orImmediate(final CloseMode m) {
@@ -493,6 +575,143 @@ private CloseMode orImmediate(final CloseMode m) {
     private void maybeCleanupSegment(final R route, final Segment seg) {
         if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) {
             segments.remove(route, seg);
+            if (seg.enqueued.getAndSet(false)) {
+                pendingRouteCount.decrementAndGet();
+            }
+        }
+    }
+
+    private boolean tryAllocateOne(final R route, final Segment seg) {
+        for (; ; ) {
+            final int tot = totalAllocated.get();
+            if (tot >= maxTotal.get()) {
+                return false;
+            }
+            if (!totalAllocated.compareAndSet(tot, tot + 1)) {
+                continue;
+            }
+            for (; ; ) {
+                final int per = seg.allocated.get();
+                if (per >= seg.limitPerRoute(route)) {
+                    totalAllocated.decrementAndGet();
+                    return false;
+                }
+                if (seg.allocated.compareAndSet(per, per + 1)) {
+                    return true;
+                }
+            }
+        }
+    }
+
+    private void enqueueIfNeeded(final R route, final Segment seg) {
+        if (seg.enqueued.compareAndSet(false, true)) {
+            pendingQueue.offer(route);
+            pendingRouteCount.incrementAndGet();
+        }
+    }
+
+    private void dequeueIfDrained(final Segment seg) {
+        if (seg.waiters.isEmpty() && seg.enqueued.getAndSet(false)) {
+            pendingRouteCount.decrementAndGet();
+        }
+    }
+
+    private void triggerDrainIfMany() {
+        // Engage RR only if there is global headroom and many distinct routes pending
+        if (pendingRouteCount.get() < RR_MIN_PENDING_ROUTES) {
+            return;
+        }
+        if (totalAllocated.get() >= maxTotal.get()) {
+            return;
+        }
+        if (!draining.compareAndSet(false, true)) {
+            return;
+        }
+        disposer.execute(() -> {
+            try {
+                serveRoundRobin(RR_BUDGET);
+            } finally {
+                draining.set(false);
+                if (pendingRouteCount.get() >= RR_MIN_PENDING_ROUTES
+                        && totalAllocated.get() < maxTotal.get()
+                        && !pendingQueue.isEmpty()) {
+                    triggerDrainIfMany();
+                }
+            }
+        });
+    }
+
+    private void serveRoundRobin(final int budget) {
+        int created = 0;
+        for (; created < budget; ) {
+            final R route = pendingQueue.poll();
+            if (route == null) {
+                break;
+            }
+            final Segment seg = segments.get(route);
+            if (seg == null) {
+                continue;
+            }
+            if (seg.waiters.isEmpty()) {
+                if (seg.enqueued.getAndSet(false)) {
+                    pendingRouteCount.decrementAndGet();
+                }
+                continue;
+            }
+
+            if (!tryAllocateOne(route, seg)) {
+                // No headroom or hit per-route cap. Re-queue for later.
+                pendingQueue.offer(route);
+                continue;
+            }
+
+            final Waiter w = seg.waiters.pollFirst();
+            if (w == null || w.cancelled) {
+                seg.allocated.decrementAndGet();
+                totalAllocated.decrementAndGet();
+            } else {
+                final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
+                cancelTimeout(w);
+                w.complete(entry);
+                created++;
+            }
+
+            if (!seg.waiters.isEmpty()) {
+                pendingQueue.offer(route);
+            } else {
+                if (seg.enqueued.getAndSet(false)) {
+                    pendingRouteCount.decrementAndGet();
+                }
+            }
+        }
+    }
+
+    /**
+     * Dispose a pool entry's connection asynchronously if possible; under pressure fall back to IMMEDIATE on caller.
+     */
+    private void discardEntry(final PoolEntry<R, C> p, final CloseMode preferred) {
+        final CloseMode mode = orImmediate(preferred);
+        // Pre-flight capacity to avoid exception storms under saturation
+        if (disposer.isShutdown()) {
+            p.discardConnection(CloseMode.IMMEDIATE);
+            return;
+        }
+        final LinkedBlockingQueue<Runnable> q = (LinkedBlockingQueue<Runnable>) disposer.getQueue();
+        if (q.remainingCapacity() == 0) {
+            p.discardConnection(CloseMode.IMMEDIATE);
+            return;
+        }
+        try {
+            disposer.execute(() -> {
+                try {
+                    p.discardConnection(mode);
+                } catch (final RuntimeException ignore) {
+                    // best-effort
+                }
+            });
+        } catch (final RejectedExecutionException saturated) {
+            // Saturated or shutting down: never block caller
+            p.discardConnection(CloseMode.IMMEDIATE);
         }
     }
 }
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java
index 7d6b81b..ce4b919 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java
@@ -39,10 +39,12 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.ModalCloseable;
@@ -208,7 +210,6 @@ void poolCloseCancelsWaitersAndDrainsAvailable() throws Exception {
         assertEquals("Pool closed", ex.getCause().getMessage());
     }
 
-
     @Test
     void reusePolicyLifoVsFifoIsObservable() throws Exception {
         final RouteSegmentedConnPool<String, FakeConnection> lifo =
@@ -250,9 +251,14 @@ void reusePolicyLifoVsFifoIsObservable() throws Exception {
     @Test
     void disposalIsCalledOnDiscard() throws Exception {
         final List<FakeConnection> closed = new ArrayList<>();
+        final CountDownLatch disposed = new CountDownLatch(1);
         final DisposalCallback<FakeConnection> disposal = (c, m) -> {
-            c.close(m);
-            closed.add(c);
+            try {
+                c.close(m);
+            } finally {
+                closed.add(c);
+                disposed.countDown();
+            }
         };
         final RouteSegmentedConnPool<String, FakeConnection> pool =
                 newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
@@ -261,6 +267,9 @@ void disposalIsCalledOnDiscard() throws Exception {
         final FakeConnection conn = new FakeConnection();
         e.assignConnection(conn);
         pool.release(e, false);
+
+        // Wait for async disposer to run
+        assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time");
         assertEquals(1, closed.size());
         assertEquals(1, closed.get(0).closeCount());
         pool.close(CloseMode.IMMEDIATE);
@@ -268,23 +277,37 @@ void disposalIsCalledOnDiscard() throws Exception {
 
     @Test
     void slowDisposalDoesNotBlockOtherRoutes() throws Exception {
-        final DisposalCallback<FakeConnection> disposal = FakeConnection::close;
+        final CountDownLatch disposed = new CountDownLatch(1);
+        final AtomicLong closedAt = new AtomicLong(0L);
+        final DisposalCallback<FakeConnection> disposal = (c, m) -> {
+            try {
+                c.close(m); // FakeConnection sleeps closeDelayMs internally
+            } finally {
+                closedAt.set(System.nanoTime());
+                disposed.countDown();
+            }
+        };
         final RouteSegmentedConnPool<String, FakeConnection> pool =
                 newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
 
         final PoolEntry<String, FakeConnection> e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
-        e1.assignConnection(new FakeConnection(600));
-        final long startDiscard = System.nanoTime();
-        pool.release(e1, false);
+        e1.assignConnection(new FakeConnection(600)); // close sleeps ~600ms
 
+        final long startDiscard = System.nanoTime();
+        pool.release(e1, false); // triggers async disposal
+
+        // Lease on another route must not be blocked by slow disposal
         final long t0 = System.nanoTime();
         final PoolEntry<String, FakeConnection> e2 = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
         final long tLeaseMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
         assertTrue(tLeaseMs < 200, "Other route lease blocked by disposal: " + tLeaseMs + "ms");
 
         pool.release(e2, false);
-        final long discardMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startDiscard);
-        assertTrue(discardMs >= 600, "Discard should reflect slow close path");
+
+        // Wait for disposer to finish, then assert the slow path really took ~600ms
+        assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time");
+        final long discardMs = TimeUnit.NANOSECONDS.toMillis(closedAt.get() - startDiscard);
+        assertTrue(discardMs >= 600, "Discard should reflect slow close path (took " + discardMs + "ms)");
 
         pool.close(CloseMode.IMMEDIATE);
     }
@@ -296,40 +319,43 @@ void getRoutesCoversAllocatedAvailableAndWaiters() throws Exception {
 
         assertTrue(pool.getRoutes().isEmpty(), "Initially there should be no routes");
 
+        // Allocate on rA
         final PoolEntry<String, FakeConnection> a =
                 pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
         assertEquals(new HashSet<String>(Collections.singletonList("rA")), pool.getRoutes(),
                 "rA must be listed because it is leased (allocated > 0)");
 
+        // Make rA available
         a.assignConnection(new FakeConnection());
         a.updateExpiry(TimeValue.ofSeconds(30));
         pool.release(a, true);
         assertEquals(new HashSet<>(Collections.singletonList("rA")), pool.getRoutes(),
                 "rA must be listed because it has AVAILABLE entries");
 
+        // Enqueue waiter on rB (will time out)
         final Future<PoolEntry<String, FakeConnection>> waiterB =
-                pool.lease("rB", null, Timeout.ofMilliseconds(300), null); // enqueues immediately
+                pool.lease("rB", null, Timeout.ofMilliseconds(300), null);
         final Set<String> routesNow = pool.getRoutes();
         assertTrue(routesNow.contains("rA") && routesNow.contains("rB"),
                 "Both rA (available) and rB (waiter) must be listed");
 
-        final PoolEntry<String, FakeConnection> a2 =
-                pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
-        pool.release(a2, false); // discard
-        final Set<String> afterDropA = pool.getRoutes();
-        assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up");
-        assertTrue(afterDropA.contains("rB"), "rB (waiter) should remain listed");
-
+        // Let rB time out (do NOT free capacity before the timeout fires)
         final ExecutionException ex = assertThrows(
                 ExecutionException.class,
                 () -> waiterB.get(600, TimeUnit.MILLISECONDS));
         assertInstanceOf(TimeoutException.class, ex.getCause());
         assertEquals("Lease timed out", ex.getCause().getMessage());
 
-        // Final cleanup: after close everything is cleared
+        // Now drain rA by leasing and discarding to trigger segment cleanup
+        final PoolEntry<String, FakeConnection> a2 =
+                pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
+        pool.release(a2, false); // discard
+        final Set<String> afterDropA = pool.getRoutes();
+        assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up");
+        assertFalse(afterDropA.contains("rB"), "rB waiter timed out; should not remain listed");
+
+        // Final cleanup
         pool.close(CloseMode.IMMEDIATE);
         assertTrue(pool.getRoutes().isEmpty(), "All routes must be gone after close()");
     }
-
-
 }