Route-segmented pool: fix timeout race; add direct hand-off; enforce TTL on lease; bound scans. (#575)
Off-thread discard with saturation fallback; tiny RR drainer (pendingā„4); disposer min(cores,8).
No public API changes. JMH: OFFLOCK 50/20 collapse gone.
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java
index 4a18618..40dca76 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/RoutePoolsJmh.java
@@ -26,12 +26,32 @@
*/
package org.apache.hc.core5.benchmark;
-
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
@@ -40,7 +60,6 @@
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.pool.PoolReusePolicy;
-import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.pool.RouteSegmentedConnPool;
import org.apache.hc.core5.pool.StrictConnPool;
import org.apache.hc.core5.util.TimeValue;
@@ -51,7 +70,6 @@
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
@@ -62,143 +80,245 @@
import org.openjdk.jmh.annotations.Warmup;
/**
- * Compare StrictConnPool, LaxConnPool, and RouteSegmentedConnPool (“OFFLOCK”)
- * under different contention patterns and slow-disposal rates.
+ * JMH harness that drives StrictConnPool, LaxConnPool, and RouteSegmentedConnPool
+ * against a local HTTP/1.1 mini-cluster using real sockets and keep-alive.
*/
-@BenchmarkMode({Mode.Throughput, Mode.SampleTime})
-@Warmup(iterations = 3, time = 2, timeUnit = TimeUnit.SECONDS)
-@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode({Mode.Throughput})
+@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(1)
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@OutputTimeUnit(TimeUnit.SECONDS)
public class RoutePoolsJmh {
- /**
- * Minimal connection that can simulate slow close.
- */
- public static final class FakeConn implements ModalCloseable {
- private final int closeDelayMs;
+ // ---------------------------------------------------------------
+ // Utilities
+ // ---------------------------------------------------------------
+ static ThreadFactory daemonFactory(final String prefix) {
+ final AtomicInteger n = new AtomicInteger(1);
+ return r -> {
+ final Thread t = new Thread(r, prefix + "-" + n.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ };
+ }
- public FakeConn(final int closeDelayMs) {
+ // ---------------------------------------------------------------
+ // Real HTTP/1.1 persistent connection used by the pool
+ // ---------------------------------------------------------------
+ public static final class RealConn implements ModalCloseable {
+ private final String host;
+ private final int port;
+ private final int closeDelayMs;
+ private final Socket socket;
+ private final BufferedInputStream in;
+ private final BufferedOutputStream out;
+
+ public RealConn(
+ final String host,
+ final int port,
+ final int closeDelayMs,
+ final int soTimeoutMs,
+ final int connectTimeoutMs) throws IOException {
+ this.host = host;
+ this.port = port;
this.closeDelayMs = closeDelayMs;
+ final Socket s = new Socket();
+ s.setTcpNoDelay(true);
+ s.setSoTimeout(Math.max(1000, soTimeoutMs)); // read timeout
+ s.setKeepAlive(true);
+ s.connect(new InetSocketAddress(host, port), Math.max(1, connectTimeoutMs));
+ this.socket = s;
+ this.in = new BufferedInputStream(s.getInputStream(), 32 * 1024);
+ this.out = new BufferedOutputStream(s.getOutputStream(), 32 * 1024);
+ }
+
+ public void getOnce(final boolean keepAlive) throws IOException {
+ final String req = "GET / HTTP/1.1\r\n" +
+ "Host: " + host + ":" + port + "\r\n" +
+ (keepAlive ? "Connection: keep-alive\r\n" : "Connection: close\r\n") +
+ "\r\n";
+ out.write(req.getBytes(StandardCharsets.ISO_8859_1));
+ out.flush();
+
+ final String status = readLine();
+ if (status == null) {
+ throw new IOException("No status line");
+ }
+ final String[] parts = status.split(" ", 3);
+ if (parts.length < 2 || !parts[0].startsWith("HTTP/1.")) {
+ throw new IOException("Bad status: " + status);
+ }
+ final int code;
+ try {
+ code = Integer.parseInt(parts[1]);
+ } catch (final NumberFormatException nfe) {
+ throw new IOException("Bad status code in: " + status);
+ }
+ if (code != 200) {
+ throw new IOException("Unexpected status: " + status);
+ }
+
+ int contentLength = -1;
+ for (; ; ) {
+ final String line = readLine();
+ if (line == null) {
+ throw new IOException("EOF in headers");
+ }
+ if (line.isEmpty()) {
+ break;
+ }
+ final int colon = line.indexOf(':');
+ if (colon > 0) {
+ final String name = line.substring(0, colon).trim();
+ if ("Content-Length".equalsIgnoreCase(name)) {
+ try {
+ contentLength = Integer.parseInt(line.substring(colon + 1).trim());
+ } catch (final NumberFormatException ignore) {
+ // ignore
+ }
+ }
+ }
+ }
+ if (contentLength < 0) {
+ throw new IOException("Missing Content-Length");
+ }
+
+ int remaining = contentLength;
+ final byte[] buf = new byte[8192];
+ while (remaining > 0) {
+ final int r = in.read(buf, 0, Math.min(buf.length, remaining));
+ if (r == -1) {
+ throw new IOException("unexpected EOF in body");
+ }
+ remaining -= r;
+ }
+ }
+
+ private String readLine() throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
+ for (; ; ) {
+ final int b = in.read();
+ if (b == -1) {
+ if (baos.size() == 0) {
+ return null;
+ }
+ break;
+ }
+ if (b == '\n') {
+ break;
+ }
+ baos.write(b);
+ }
+ final byte[] raw = baos.toByteArray();
+ final int len = raw.length;
+ final int eff = (len > 0 && raw[len - 1] == '\r') ? len - 1 : len;
+ return new String(raw, 0, eff, StandardCharsets.ISO_8859_1);
}
@Override
public void close(final CloseMode closeMode) {
- if (closeDelayMs <= 0) {
- return;
+ if (closeDelayMs > 0) {
+ try {
+ Thread.sleep(closeDelayMs);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
}
try {
- Thread.sleep(closeDelayMs);
- } catch (final InterruptedException ignore) {
- Thread.currentThread().interrupt();
+ socket.close();
+ } catch (final IOException ignore) {
+ // ignore
}
}
@Override
public void close() throws IOException {
-
+ if (closeDelayMs > 0) {
+ try {
+ Thread.sleep(closeDelayMs);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ socket.close();
}
}
- /**
- * All benchmark parameters & shared state live here (required by JMH).
- */
+ // ---------------------------------------------------------------
+ // Benchmark state & setup
+ // ---------------------------------------------------------------
@State(Scope.Benchmark)
public static class BenchState {
-
- /**
- * Which pool to benchmark.
- * STRICT -> StrictConnPool
- * LAX -> LaxConnPool
- * OFFLOCK -> RouteSegmentedConnPool
- */
- @Param({"STRICT", "LAX", "OFFLOCK"})
+ @Param({"OFFLOCK", "STRICT", "LAX"})
public String policy;
-
- /**
- * Number of distinct routes to spread load across.
- * 1 = hot single route; 10 = multi-route scenario.
- */
- @Param({"1", "10"})
+ @Param({"1", "4", "10", "25", "50"})
public int routes;
-
- /**
- * Percent (0..100) of releases that will be non-reusable,
- * triggering a discard (and thus a potentially slow close).
- */
- @Param({"0", "5", "20"})
- public int slowClosePct;
-
- /**
- * Sleep (ms) when a connection is discarded (slow close path).
- */
- @Param({"0", "200"})
- public int closeSleepMs;
-
- /**
- * Max total, default per-route — tuned to create contention.
- */
- @Param({"32"})
+ @Param({"128"})
+ public int payloadBytes;
+ @Param({"100"})
public int maxTotal;
- @Param({"8"})
+ @Param({"5"})
public int defMaxPerRoute;
-
- /**
- * Keep-alive on reusable releases.
- */
+ @Param({"true"})
+ public boolean keepAlive;
@Param({"5000"})
public int keepAliveMs;
+ @Param({"0", "20"})
+ public int slowClosePct;
+ @Param({"0", "200"})
+ public int closeSleepMs;
+ @Param({"10000"})
+ public int soTimeoutMs;
+ @Param({"30000"})
+ public int requestTimeoutMs;
+ @Param({"1000"})
+ public int connectTimeoutMs;
- ManagedConnPool<String, FakeConn> pool;
+ ManagedConnPool<String, RealConn> pool;
+ DisposalCallback<RealConn> disposal;
+ MiniCluster cluster;
String[] routeKeys;
- DisposalCallback<FakeConn> disposal;
+ ScheduledExecutorService maint;
@Setup(Level.Trial)
- public void setUp() {
- // routes list
- routeKeys = new String[routes];
- for (int i = 0; i < routes; i++) {
- routeKeys[i] = "route-" + i;
- }
-
+ public void setUp() throws Exception {
+ cluster = new MiniCluster(routes, payloadBytes);
+ routeKeys = cluster.routeKeys();
disposal = (c, m) -> {
if (c != null) {
c.close(m);
}
};
-
final TimeValue ttl = TimeValue.NEG_ONE_MILLISECOND;
-
switch (policy.toUpperCase(Locale.ROOT)) {
- case "STRICT":
- pool = new StrictConnPool<>(
- defMaxPerRoute,
- maxTotal,
- ttl,
- PoolReusePolicy.LIFO,
- disposal,
- null);
+ case "STRICT": {
+ pool = new StrictConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal, null);
break;
- case "LAX":
- pool = new LaxConnPool<>(
- defMaxPerRoute,
- ttl,
- PoolReusePolicy.LIFO,
- disposal,
- null);
- pool.setMaxTotal(maxTotal);
+ }
+ case "LAX": {
+ final LaxConnPool<String, RealConn> lax = new LaxConnPool<>(defMaxPerRoute, ttl, PoolReusePolicy.LIFO, disposal, null);
+ lax.setMaxTotal(maxTotal);
+ pool = lax;
break;
- case "OFFLOCK":
- pool = new RouteSegmentedConnPool<>(
- defMaxPerRoute,
- maxTotal,
- ttl,
- PoolReusePolicy.LIFO,
- disposal);
+ }
+ case "OFFLOCK": {
+ pool = new RouteSegmentedConnPool<>(defMaxPerRoute, maxTotal, ttl, PoolReusePolicy.LIFO, disposal);
break;
- default:
+ }
+ default: {
throw new IllegalArgumentException("Unknown policy: " + policy);
+ }
}
+ // Light periodic maintenance, close idle/expired like real clients do
+ maint = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(daemonFactory("pool-maint"));
+ maint.scheduleAtFixedRate(() -> {
+ try {
+ pool.closeIdle(TimeValue.ofSeconds(5));
+ pool.closeExpired();
+ } catch (final Exception ignore) {
+ // ignore in benchmark
+ }
+ }, 5, 5, TimeUnit.SECONDS);
}
@TearDown(Level.Trial)
@@ -206,6 +326,17 @@ public void tearDown() {
if (pool != null) {
pool.close(CloseMode.IMMEDIATE);
}
+ if (cluster != null) {
+ cluster.close();
+ }
+ if (maint != null) {
+ maint.shutdownNow();
+ try {
+ maint.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
String pickRoute() {
@@ -214,47 +345,195 @@ String pickRoute() {
}
boolean shouldDiscard() {
- if (slowClosePct <= 0) return false;
- return ThreadLocalRandom.current().nextInt(100) < slowClosePct;
+ return slowClosePct > 0 && ThreadLocalRandom.current().nextInt(100) < slowClosePct;
}
}
- /**
- * Lease+release on a randomly chosen route.
- * Mix of reusable and non-reusable releases (to trigger discard/close).
- */
+ // ---------------------------------------------------------------
+ // Benchmark body
+ // ---------------------------------------------------------------
@Benchmark
@Threads(50)
- public void leaseReleaseMixed(final BenchState s) throws Exception {
+ public void lease_io_release(final BenchState s) {
+ final String key = s.pickRoute();
+ final Future<PoolEntry<String, RealConn>> f = s.pool.lease(key, null, Timeout.DISABLED, null);
+ final PoolEntry<String, RealConn> e;
try {
- final Future<PoolEntry<String, FakeConn>> f = s.pool.lease(s.pickRoute(), null, Timeout.ofMilliseconds(500), null);
- final PoolEntry<String, FakeConn> e = f.get(500, TimeUnit.MILLISECONDS);
- if (!e.hasConnection()) e.assignConnection(new FakeConn(s.closeSleepMs));
- final boolean reusable = !s.shouldDiscard();
- if (reusable) {
- e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs));
- s.pool.release(e, true);
- } else {
- s.pool.release(e, false);
+ e = f.get(s.requestTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (final TimeoutException te) {
+ // IMPORTANT: drop waiter on pools that queue
+ f.cancel(true);
+ return;
+ } catch (final ExecutionException ee) {
+ if (ee.getCause() instanceof TimeoutException) {
+ f.cancel(true);
}
- } catch (final IllegalStateException ignored) {
+ return;
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (e == null) {
+ return; // defensive
+ }
+ RealConn c = e.getConnection();
+ if (c == null) {
+ // parse host:port defensively
+ final int colon = key.indexOf(':');
+ if (colon <= 0 || colon >= key.length() - 1) {
+ s.pool.release(e, false);
+ return;
+ }
+ final String host = key.substring(0, colon);
+ final int port;
+ try {
+ port = Integer.parseInt(key.substring(colon + 1));
+ } catch (final NumberFormatException nfe) {
+ s.pool.release(e, false);
+ return;
+ }
+ RealConn fresh = null;
+ try {
+ fresh = new RealConn(host, port, s.closeSleepMs, s.soTimeoutMs, s.connectTimeoutMs);
+ // Double-check before assigning to avoid races
+ final RealConn existing = e.getConnection();
+ if (existing == null) {
+ try {
+ e.assignConnection(fresh);
+ c = fresh;
+ fresh = null; // ownership transferred
+ } catch (final IllegalStateException already) {
+ // someone else assigned concurrently
+ c = e.getConnection();
+ if (c == null) {
+ s.pool.release(e, false);
+ try {
+ fresh.close(CloseMode.IMMEDIATE);
+ } catch (final Exception ignore) {
+ }
+ return;
+ }
+ }
+ } else {
+ c = existing;
+ }
+ } catch (final IOException ioe) {
+ s.pool.release(e, false);
+ if (fresh != null) {
+ try {
+ fresh.close(CloseMode.IMMEDIATE);
+ } catch (final Exception ignore) {
+ }
+ }
+ return;
+ } finally {
+ if (fresh != null) { // we created but didn't assign -> close to avoid leak
+ try {
+ fresh.close(CloseMode.IMMEDIATE);
+ } catch (final Exception ignore) {
+ }
+ }
+ }
+ }
+
+ if (c == null) {
+ s.pool.release(e, false);
+ return;
+ }
+
+ try {
+ c.getOnce(s.keepAlive);
+ } catch (final IOException ioe) {
+ s.pool.release(e, false);
+ return;
+ }
+
+ final boolean reusable = s.keepAlive && !s.shouldDiscard();
+ if (reusable) {
+ e.updateExpiry(TimeValue.ofMilliseconds(s.keepAliveMs));
+ s.pool.release(e, true);
+ } else {
+ s.pool.release(e, false);
}
}
+ // ---------------------------------------------------------------
+ // Local HTTP mini-cluster
+ // ---------------------------------------------------------------
+ static final class MiniCluster {
+ private final List<HttpServer> servers = new ArrayList<>();
+ private final String[] keys;
+ private final byte[] body;
+ private final ExecutorService exec;
- /**
- * Optional stats probe to ensure the benchmark does "something".
- * Not a measured benchmark; use only for sanity runs.
- */
- @Benchmark
- @Threads(1)
- @OperationsPerInvocation(1)
- @BenchmarkMode(Mode.SingleShotTime)
- public void statsProbe(final BenchState s, final org.openjdk.jmh.infra.Blackhole bh) {
- final PoolStats stats = s.pool.getTotalStats();
- bh.consume(stats.getAvailable());
- bh.consume(stats.getLeased());
- bh.consume(stats.getPending());
+ MiniCluster(final int n, final int payloadBytes) throws IOException {
+ this.keys = new String[n];
+ this.body = new byte[payloadBytes];
+ // Bounded, CPU-sized pool to keep the com.sun server in check
+ final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
+ final int coreThreads = Math.min(64, Math.max(cores, n * 2));
+ final int maxThreads = Math.min(128, Math.max(coreThreads, n * 4));
+ this.exec = new java.util.concurrent.ThreadPoolExecutor(
+ coreThreads, maxThreads,
+ 60L, TimeUnit.SECONDS,
+ new java.util.concurrent.LinkedBlockingQueue<>(2048),
+ daemonFactory("mini-http"),
+ new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
+ for (int i = 0; i < n; i++) {
+ final InetSocketAddress bind = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+ final HttpServer s = HttpServer.create(bind, 4096);
+ s.createContext("/", new FixedHandler(body));
+ s.setExecutor(exec);
+ s.start();
+ servers.add(s);
+ keys[i] = "127.0.0.1:" + s.getAddress().getPort();
+ }
+ }
+
+ String[] routeKeys() {
+ return keys;
+ }
+
+ void close() {
+ for (final HttpServer s : servers) {
+ try {
+ s.stop(0);
+ } catch (final Exception ignore) {
+ }
+ }
+ exec.shutdownNow();
+ try {
+ exec.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ static final class FixedHandler implements HttpHandler {
+ private final byte[] body;
+
+ FixedHandler(final byte[] body) {
+ this.body = body;
+ }
+
+ @Override
+ public void handle(final HttpExchange ex) throws IOException {
+ try (InputStream in = ex.getRequestBody()) {
+ final byte[] buf = new byte[1024];
+ while (in.read(buf) != -1) {
+ // drain
+ }
+ }
+ ex.getResponseHeaders().set("Content-Type", "text/plain; charset=US-ASCII");
+ ex.sendResponseHeaders(200, body.length);
+ try (OutputStream os = ex.getResponseBody()) {
+ if (body.length > 0) {
+ os.write(body);
+ }
+ os.flush();
+ }
+ }
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
index 0fc2bd0..d49744a 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java
@@ -27,6 +27,8 @@
package org.apache.hc.core5.pool;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -38,8 +40,12 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,15 +62,15 @@
import org.apache.hc.core5.util.Timeout;
/**
- * Lock-free, route-segmented connection pool.
+ * Lock-free, route-segmented connection pool with tiny, conditional round-robin assistance.
*
- * <p>This implementation keeps per-route state in independent segments and avoids
- * holding a global lock while disposing of connections. Under slow closes
- * (for example TLS shutdown or OS-level socket stalls), threads leasing
- * connections on other routes are not blocked by disposal work.</p>
+ * <p>Per-route state is kept in independent segments. Disposal of connections is offloaded
+ * to a bounded executor so slow closes do not block threads leasing on other routes.
+ * A minimal round-robin drainer is engaged only when there are many pending routes and
+ * there is global headroom; it never scans all routes.</p>
*
* @param <R> route key type
- * @param <C> connection type (must be {@link org.apache.hc.core5.io.ModalCloseable})
+ * @param <C> connection type (must be {@link ModalCloseable})
* @see ManagedConnPool
* @see PoolReusePolicy
* @see DisposalCallback
@@ -74,6 +80,10 @@
@Experimental
public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implements ManagedConnPool<R, C> {
+ // Tiny RR assist: only engage when there are many distinct routes waiting and there is headroom.
+ private static final int RR_MIN_PENDING_ROUTES = 12;
+ private static final int RR_BUDGET = 64;
+
private final PoolReusePolicy reusePolicy;
private final TimeValue timeToLive;
private final DisposalCallback<C> disposal;
@@ -89,6 +99,17 @@ public final class RouteSegmentedConnPool<R, C extends ModalCloseable> implement
private final ScheduledExecutorService timeouts;
+ /**
+ * Dedicated executor for asynchronous, best-effort disposal.
+ * Bounded queue; on saturation we fall back to IMMEDIATE close on the caller thread.
+ */
+ private final ThreadPoolExecutor disposer;
+
+ // Minimal fair round-robin over routes with waiters (no global scans).
+ private final ConcurrentLinkedQueue<R> pendingQueue = new ConcurrentLinkedQueue<>();
+ private final AtomicBoolean draining = new AtomicBoolean(false);
+ private final AtomicInteger pendingRouteCount = new AtomicInteger(0);
+
public RouteSegmentedConnPool(
final int defaultMaxPerRoute,
final int maxTotal,
@@ -108,12 +129,29 @@ public RouteSegmentedConnPool(
return t;
};
this.timeouts = Executors.newSingleThreadScheduledExecutor(tf);
+
+ // Asynchronous disposer for slow GRACEFUL closes.
+ final int cores = Math.max(2, Runtime.getRuntime().availableProcessors());
+ final int nThreads = Math.min(8, Math.max(2, cores)); // allow up to 8 on bigger boxes
+ final int qsize = 1024;
+ final ThreadFactory df = r -> {
+ final Thread t = new Thread(r, "seg-pool-disposer");
+ t.setDaemon(true);
+ return t;
+ };
+ this.disposer = new ThreadPoolExecutor(
+ nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(qsize),
+ df,
+ new ThreadPoolExecutor.AbortPolicy()); // but we preflight capacity to avoid exception storms
}
final class Segment {
final ConcurrentLinkedDeque<PoolEntry<R, C>> available = new ConcurrentLinkedDeque<>();
- final ConcurrentLinkedQueue<Waiter> waiters = new ConcurrentLinkedQueue<>();
+ final ConcurrentLinkedDeque<Waiter> waiters = new ConcurrentLinkedDeque<>();
final AtomicInteger allocated = new AtomicInteger(0);
+ final AtomicBoolean enqueued = new AtomicBoolean(false);
int limitPerRoute(final R route) {
final Integer v = maxPerRoute.get(route);
@@ -122,13 +160,18 @@ int limitPerRoute(final R route) {
}
final class Waiter extends CompletableFuture<PoolEntry<R, C>> {
+ final R route;
final Timeout requestTimeout;
final Object state;
volatile boolean cancelled;
+ volatile ScheduledFuture<?> timeoutTask;
- Waiter(final Timeout t, final Object s) {
+ Waiter(final R route, final Timeout t, final Object s) {
+ this.route = route;
this.requestTimeout = t != null ? t : Timeout.DISABLED;
this.state = s;
+ this.cancelled = false;
+ this.timeoutTask = null;
}
}
@@ -142,6 +185,7 @@ public Future<PoolEntry<R, C>> lease(
ensureOpen();
final Segment seg = segments.computeIfAbsent(route, r -> new Segment());
+ // 1) Try available
PoolEntry<R, C> hit;
for (; ; ) {
hit = pollAvailable(seg, state);
@@ -162,40 +206,46 @@ public Future<PoolEntry<R, C>> lease(
return CompletableFuture.completedFuture(hit);
}
- for (; ; ) {
- final int tot = totalAllocated.get();
- if (tot >= maxTotal.get()) {
- break;
+ // 2) Try to allocate new within caps
+ if (tryAllocateOne(route, seg)) {
+ final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
+ if (callback != null) {
+ callback.completed(entry);
}
- if (totalAllocated.compareAndSet(tot, tot + 1)) {
- for (; ; ) {
- final int per = seg.allocated.get();
- if (per >= seg.limitPerRoute(route)) {
- totalAllocated.decrementAndGet();
- break;
- }
- if (seg.allocated.compareAndSet(per, per + 1)) {
- final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
- if (callback != null) {
- callback.completed(entry);
- }
- return CompletableFuture.completedFuture(entry);
- }
- }
- break;
- }
+ return CompletableFuture.completedFuture(entry);
}
- final Waiter w = new Waiter(requestTimeout, state);
- seg.waiters.add(w);
+ // 3) Enqueue waiter with timeout
+ final Waiter w = new Waiter(route, requestTimeout, state);
+ seg.waiters.addLast(w);
+ enqueueIfNeeded(route, seg);
+ // Late hit after enqueuing
final PoolEntry<R, C> late = pollAvailable(seg, state);
- if (late != null && seg.waiters.remove(w)) {
- if (callback != null) {
- callback.completed(late);
+ if (late != null) {
+ if (seg.waiters.remove(w)) {
+ cancelTimeout(w);
+ if (callback != null) {
+ callback.completed(late);
+ }
+ w.complete(late);
+ dequeueIfDrained(seg);
+ return w;
+ } else {
+ boolean handedOff = false;
+ for (Waiter other; (other = seg.waiters.pollFirst()) != null; ) {
+ if (!other.cancelled && compatible(other.state, late.getState())) {
+ cancelTimeout(other);
+ handedOff = other.complete(late);
+ if (handedOff) {
+ break;
+ }
+ }
+ }
+ if (!handedOff) {
+ offerAvailable(seg, late);
+ }
}
- w.complete(late);
- return w;
}
scheduleTimeout(w, seg);
@@ -209,6 +259,8 @@ public Future<PoolEntry<R, C>> lease(
}
});
}
+
+ triggerDrainIfMany();
return w;
}
@@ -220,7 +272,8 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
final R route = entry.getRoute();
final Segment seg = segments.get(route);
if (seg == null) {
- entry.discardConnection(CloseMode.GRACEFUL);
+ // Segment got removed; dispose off-thread and bail.
+ discardEntry(entry, CloseMode.GRACEFUL);
return;
}
@@ -228,21 +281,11 @@ public void release(final PoolEntry<R, C> entry, final boolean reusable) {
final boolean stillValid = reusable && !isPastTtl(entry) && !entry.getExpiryDeadline().isBefore(now);
if (stillValid) {
- for (; ; ) {
- final Waiter w = seg.waiters.poll();
- if (w == null) {
- break;
- }
- if (w.cancelled) {
- continue;
- }
- if (compatible(w.state, entry.getState())) {
- if (w.complete(entry)) {
- return;
- }
- }
+ if (!handOffToCompatibleWaiter(entry, seg)) {
+ offerAvailable(seg, entry);
+ enqueueIfNeeded(route, seg);
+ triggerDrainIfMany();
}
- offerAvailable(seg, entry);
} else {
discardAndDecr(entry, CloseMode.GRACEFUL);
}
@@ -266,15 +309,19 @@ public void close(final CloseMode closeMode) {
for (final Map.Entry<R, Segment> e : segments.entrySet()) {
final Segment seg = e.getValue();
- // cancel waiters
for (final Waiter w : seg.waiters) {
w.cancelled = true;
+ cancelTimeout(w);
w.completeExceptionally(new TimeoutException("Pool closed"));
}
seg.waiters.clear();
+ if (seg.enqueued.getAndSet(false)) {
+ pendingRouteCount.decrementAndGet();
+ }
+ // discard available
for (final PoolEntry<R, C> p : seg.available) {
- p.discardConnection(orImmediate(closeMode));
+ discardEntry(p, closeMode);
}
seg.available.clear();
@@ -284,6 +331,11 @@ public void close(final CloseMode closeMode) {
}
}
segments.clear();
+ pendingQueue.clear();
+ pendingRouteCount.set(0);
+
+ // Let in-flight graceful closes progress; no blocking here.
+ disposer.shutdown();
}
@Override
@@ -418,42 +470,40 @@ private boolean isPastTtl(final PoolEntry<R, C> p) {
if (timeToLive == null || timeToLive.getDuration() < 0) {
return false;
}
- return (System.currentTimeMillis() - p.getCreated()) >= timeToLive.toMilliseconds();
+ return System.currentTimeMillis() - p.getCreated() >= timeToLive.toMilliseconds();
}
- private void scheduleTimeout(
- final Waiter w,
- final Segment seg) {
-
+ private void scheduleTimeout(final Waiter w, final Segment seg) {
if (!TimeValue.isPositive(w.requestTimeout)) {
return;
}
- timeouts.schedule(() -> {
+ w.timeoutTask = timeouts.schedule(() -> {
if (w.isDone()) {
return;
}
w.cancelled = true;
- final TimeoutException tex = new TimeoutException("Lease timed out");
- w.completeExceptionally(tex);
+ seg.waiters.remove(w);
+ w.completeExceptionally(new TimeoutException("Lease timed out"));
+ dequeueIfDrained(seg);
+ maybeCleanupSegment(w.route, seg);
final PoolEntry<R, C> p = pollAvailable(seg, w.state);
if (p != null) {
- boolean handedOff = false;
- for (Waiter other; (other = seg.waiters.poll()) != null; ) {
- if (!other.cancelled && compatible(other.state, p.getState())) {
- handedOff = other.complete(p);
- if (handedOff) {
- break;
- }
- }
- }
- if (!handedOff) {
+ // Try to hand off that available entry to some other compatible waiter.
+ if (!handOffToCompatibleWaiter(p, seg)) {
offerAvailable(seg, p);
}
}
}, w.requestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
+ private void cancelTimeout(final Waiter w) {
+ final ScheduledFuture<?> t = w.timeoutTask;
+ if (t != null) {
+ t.cancel(false);
+ }
+ }
+
private void offerAvailable(final Segment seg, final PoolEntry<R, C> p) {
if (reusePolicy == PoolReusePolicy.LIFO) {
seg.available.addFirst(p);
@@ -463,6 +513,9 @@ private void offerAvailable(final Segment seg, final PoolEntry<R, C> p) {
}
private PoolEntry<R, C> pollAvailable(final Segment seg, final Object neededState) {
+ if (neededState == null) {
+ return seg.available.pollFirst();
+ }
for (final Iterator<PoolEntry<R, C>> it = seg.available.iterator(); it.hasNext(); ) {
final PoolEntry<R, C> p = it.next();
if (compatible(neededState, p.getState())) {
@@ -477,13 +530,42 @@ private boolean compatible(final Object needed, final Object have) {
return needed == null || Objects.equals(needed, have);
}
+ private boolean handOffToCompatibleWaiter(final PoolEntry<R, C> entry, final Segment seg) {
+ final Deque<Waiter> skipped = new ArrayDeque<>();
+ boolean handedOff = false;
+ for (; ; ) {
+ final Waiter w = seg.waiters.pollFirst();
+ if (w == null) {
+ break;
+ }
+ if (w.cancelled || w.isDone()) {
+ continue;
+ }
+ if (compatible(w.state, entry.getState())) {
+ cancelTimeout(w);
+ handedOff = w.complete(entry);
+ if (handedOff) {
+ dequeueIfDrained(seg);
+ break;
+ }
+ } else {
+ skipped.addLast(w);
+ }
+ }
+ // Restore non-compatible waiters to the head to preserve ordering.
+ while (!skipped.isEmpty()) {
+ seg.waiters.addFirst(skipped.pollLast());
+ }
+ return handedOff;
+ }
+
private void discardAndDecr(final PoolEntry<R, C> p, final CloseMode mode) {
- p.discardConnection(orImmediate(mode));
totalAllocated.decrementAndGet();
final Segment seg = segments.get(p.getRoute());
if (seg != null) {
seg.allocated.decrementAndGet();
}
+ discardEntry(p, mode);
}
private CloseMode orImmediate(final CloseMode m) {
@@ -493,6 +575,143 @@ private CloseMode orImmediate(final CloseMode m) {
private void maybeCleanupSegment(final R route, final Segment seg) {
if (seg.allocated.get() == 0 && seg.available.isEmpty() && seg.waiters.isEmpty()) {
segments.remove(route, seg);
+ if (seg.enqueued.getAndSet(false)) {
+ pendingRouteCount.decrementAndGet();
+ }
+ }
+ }
+
+ private boolean tryAllocateOne(final R route, final Segment seg) {
+ for (; ; ) {
+ final int tot = totalAllocated.get();
+ if (tot >= maxTotal.get()) {
+ return false;
+ }
+ if (!totalAllocated.compareAndSet(tot, tot + 1)) {
+ continue;
+ }
+ for (; ; ) {
+ final int per = seg.allocated.get();
+ if (per >= seg.limitPerRoute(route)) {
+ totalAllocated.decrementAndGet();
+ return false;
+ }
+ if (seg.allocated.compareAndSet(per, per + 1)) {
+ return true;
+ }
+ }
+ }
+ }
+
+ private void enqueueIfNeeded(final R route, final Segment seg) {
+ if (seg.enqueued.compareAndSet(false, true)) {
+ pendingQueue.offer(route);
+ pendingRouteCount.incrementAndGet();
+ }
+ }
+
+ private void dequeueIfDrained(final Segment seg) {
+ if (seg.waiters.isEmpty() && seg.enqueued.getAndSet(false)) {
+ pendingRouteCount.decrementAndGet();
+ }
+ }
+
+ private void triggerDrainIfMany() {
+ // Engage RR only if there is global headroom and many distinct routes pending
+ if (pendingRouteCount.get() < RR_MIN_PENDING_ROUTES) {
+ return;
+ }
+ if (totalAllocated.get() >= maxTotal.get()) {
+ return;
+ }
+ if (!draining.compareAndSet(false, true)) {
+ return;
+ }
+ disposer.execute(() -> {
+ try {
+ serveRoundRobin(RR_BUDGET);
+ } finally {
+ draining.set(false);
+ if (pendingRouteCount.get() >= RR_MIN_PENDING_ROUTES
+ && totalAllocated.get() < maxTotal.get()
+ && !pendingQueue.isEmpty()) {
+ triggerDrainIfMany();
+ }
+ }
+ });
+ }
+
+ private void serveRoundRobin(final int budget) {
+ int created = 0;
+ for (; created < budget; ) {
+ final R route = pendingQueue.poll();
+ if (route == null) {
+ break;
+ }
+ final Segment seg = segments.get(route);
+ if (seg == null) {
+ continue;
+ }
+ if (seg.waiters.isEmpty()) {
+ if (seg.enqueued.getAndSet(false)) {
+ pendingRouteCount.decrementAndGet();
+ }
+ continue;
+ }
+
+ if (!tryAllocateOne(route, seg)) {
+ // No headroom or hit per-route cap. Re-queue for later.
+ pendingQueue.offer(route);
+ continue;
+ }
+
+ final Waiter w = seg.waiters.pollFirst();
+ if (w == null || w.cancelled) {
+ seg.allocated.decrementAndGet();
+ totalAllocated.decrementAndGet();
+ } else {
+ final PoolEntry<R, C> entry = new PoolEntry<>(route, timeToLive, disposal);
+ cancelTimeout(w);
+ w.complete(entry);
+ created++;
+ }
+
+ if (!seg.waiters.isEmpty()) {
+ pendingQueue.offer(route);
+ } else {
+ if (seg.enqueued.getAndSet(false)) {
+ pendingRouteCount.decrementAndGet();
+ }
+ }
+ }
+ }
+
+ /**
+ * Dispose a pool entry's connection asynchronously if possible; under pressure fall back to IMMEDIATE on caller.
+ */
+ private void discardEntry(final PoolEntry<R, C> p, final CloseMode preferred) {
+ final CloseMode mode = orImmediate(preferred);
+ // Pre-flight capacity to avoid exception storms under saturation
+ if (disposer.isShutdown()) {
+ p.discardConnection(CloseMode.IMMEDIATE);
+ return;
+ }
+ final LinkedBlockingQueue<Runnable> q = (LinkedBlockingQueue<Runnable>) disposer.getQueue();
+ if (q.remainingCapacity() == 0) {
+ p.discardConnection(CloseMode.IMMEDIATE);
+ return;
+ }
+ try {
+ disposer.execute(() -> {
+ try {
+ p.discardConnection(mode);
+ } catch (final RuntimeException ignore) {
+ // best-effort
+ }
+ });
+ } catch (final RejectedExecutionException saturated) {
+ // Saturated or shutting down: never block caller
+ p.discardConnection(CloseMode.IMMEDIATE);
}
}
}
diff --git a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java
index 7d6b81b..ce4b919 100644
--- a/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java
+++ b/httpcore5/src/test/java/org/apache/hc/core5/pool/RouteSegmentedConnPoolTest.java
@@ -39,10 +39,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
@@ -208,7 +210,6 @@ void poolCloseCancelsWaitersAndDrainsAvailable() throws Exception {
assertEquals("Pool closed", ex.getCause().getMessage());
}
-
@Test
void reusePolicyLifoVsFifoIsObservable() throws Exception {
final RouteSegmentedConnPool<String, FakeConnection> lifo =
@@ -250,9 +251,14 @@ void reusePolicyLifoVsFifoIsObservable() throws Exception {
@Test
void disposalIsCalledOnDiscard() throws Exception {
final List<FakeConnection> closed = new ArrayList<>();
+ final CountDownLatch disposed = new CountDownLatch(1);
final DisposalCallback<FakeConnection> disposal = (c, m) -> {
- c.close(m);
- closed.add(c);
+ try {
+ c.close(m);
+ } finally {
+ closed.add(c);
+ disposed.countDown();
+ }
};
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(1, 1, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
@@ -261,6 +267,9 @@ void disposalIsCalledOnDiscard() throws Exception {
final FakeConnection conn = new FakeConnection();
e.assignConnection(conn);
pool.release(e, false);
+
+ // Wait for async disposer to run
+ assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time");
assertEquals(1, closed.size());
assertEquals(1, closed.get(0).closeCount());
pool.close(CloseMode.IMMEDIATE);
@@ -268,23 +277,37 @@ void disposalIsCalledOnDiscard() throws Exception {
@Test
void slowDisposalDoesNotBlockOtherRoutes() throws Exception {
- final DisposalCallback<FakeConnection> disposal = FakeConnection::close;
+ final CountDownLatch disposed = new CountDownLatch(1);
+ final AtomicLong closedAt = new AtomicLong(0L);
+ final DisposalCallback<FakeConnection> disposal = (c, m) -> {
+ try {
+ c.close(m); // FakeConnection sleeps closeDelayMs internally
+ } finally {
+ closedAt.set(System.nanoTime());
+ disposed.countDown();
+ }
+ };
final RouteSegmentedConnPool<String, FakeConnection> pool =
newPool(2, 2, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, disposal);
final PoolEntry<String, FakeConnection> e1 = pool.lease("r1", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
- e1.assignConnection(new FakeConnection(600));
- final long startDiscard = System.nanoTime();
- pool.release(e1, false);
+ e1.assignConnection(new FakeConnection(600)); // close sleeps ~600ms
+ final long startDiscard = System.nanoTime();
+ pool.release(e1, false); // triggers async disposal
+
+ // Lease on another route must not be blocked by slow disposal
final long t0 = System.nanoTime();
final PoolEntry<String, FakeConnection> e2 = pool.lease("r2", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
final long tLeaseMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
assertTrue(tLeaseMs < 200, "Other route lease blocked by disposal: " + tLeaseMs + "ms");
pool.release(e2, false);
- final long discardMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startDiscard);
- assertTrue(discardMs >= 600, "Discard should reflect slow close path");
+
+ // Wait for disposer to finish, then assert the slow path really took ~600ms
+ assertTrue(disposed.await(2, TimeUnit.SECONDS), "Disposal did not complete in time");
+ final long discardMs = TimeUnit.NANOSECONDS.toMillis(closedAt.get() - startDiscard);
+ assertTrue(discardMs >= 600, "Discard should reflect slow close path (took " + discardMs + "ms)");
pool.close(CloseMode.IMMEDIATE);
}
@@ -296,40 +319,43 @@ void getRoutesCoversAllocatedAvailableAndWaiters() throws Exception {
assertTrue(pool.getRoutes().isEmpty(), "Initially there should be no routes");
+ // Allocate on rA
final PoolEntry<String, FakeConnection> a =
pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
assertEquals(new HashSet<String>(Collections.singletonList("rA")), pool.getRoutes(),
"rA must be listed because it is leased (allocated > 0)");
+ // Make rA available
a.assignConnection(new FakeConnection());
a.updateExpiry(TimeValue.ofSeconds(30));
pool.release(a, true);
assertEquals(new HashSet<>(Collections.singletonList("rA")), pool.getRoutes(),
"rA must be listed because it has AVAILABLE entries");
+ // Enqueue waiter on rB (will time out)
final Future<PoolEntry<String, FakeConnection>> waiterB =
- pool.lease("rB", null, Timeout.ofMilliseconds(300), null); // enqueues immediately
+ pool.lease("rB", null, Timeout.ofMilliseconds(300), null);
final Set<String> routesNow = pool.getRoutes();
assertTrue(routesNow.contains("rA") && routesNow.contains("rB"),
"Both rA (available) and rB (waiter) must be listed");
- final PoolEntry<String, FakeConnection> a2 =
- pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
- pool.release(a2, false); // discard
- final Set<String> afterDropA = pool.getRoutes();
- assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up");
- assertTrue(afterDropA.contains("rB"), "rB (waiter) should remain listed");
-
+ // Let rB time out (do NOT free capacity before the timeout fires)
final ExecutionException ex = assertThrows(
ExecutionException.class,
() -> waiterB.get(600, TimeUnit.MILLISECONDS));
assertInstanceOf(TimeoutException.class, ex.getCause());
assertEquals("Lease timed out", ex.getCause().getMessage());
- // Final cleanup: after close everything is cleared
+ // Now drain rA by leasing and discarding to trigger segment cleanup
+ final PoolEntry<String, FakeConnection> a2 =
+ pool.lease("rA", null, Timeout.ofSeconds(1), null).get(1, TimeUnit.SECONDS);
+ pool.release(a2, false); // discard
+ final Set<String> afterDropA = pool.getRoutes();
+ assertFalse(afterDropA.contains("rA"), "rA segment should be cleaned up");
+ assertFalse(afterDropA.contains("rB"), "rB waiter timed out; should not remain listed");
+
+ // Final cleanup
pool.close(CloseMode.IMMEDIATE);
assertTrue(pool.getRoutes().isEmpty(), "All routes must be gone after close()");
}
-
-
}