blob: 3aa60941f8d79d0259012271b836d081691cdd7a [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.pool;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicMarkableReference;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Deadline;
import org.apache.hc.core5.util.DeadlineTimeoutException;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
/**
* Connection pool with higher concurrency but with lax connection limit guarantees.
*
* @param <T> route
* @param <C> connection object
*
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.SAFE)
@Experimental
public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
private final TimeValue timeToLive;
private final PoolReusePolicy policy;
private final DisposalCallback<C> disposalCallback;
private final ConnPoolListener<T> connPoolListener;
private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
private final AtomicBoolean isShutDown;
private volatile int defaultMaxPerRoute;
/**
* @since 5.0
*/
public LaxConnPool(
final int defaultMaxPerRoute,
final TimeValue timeToLive,
final PoolReusePolicy policy,
final DisposalCallback<C> disposalCallback,
final ConnPoolListener<T> connPoolListener) {
super();
Args.positive(defaultMaxPerRoute, "Max per route value");
this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
this.disposalCallback = disposalCallback;
this.connPoolListener = connPoolListener;
this.routeToPool = new ConcurrentHashMap<>();
this.isShutDown = new AtomicBoolean(false);
this.defaultMaxPerRoute = defaultMaxPerRoute;
}
/**
* @since 5.0
*/
public LaxConnPool(
final int defaultMaxPerRoute,
final TimeValue timeToLive,
final PoolReusePolicy policy,
final ConnPoolListener<T> connPoolListener) {
this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
}
public LaxConnPool(final int defaultMaxPerRoute) {
this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
}
public boolean isShutdown() {
return isShutDown.get();
}
@Override
public void close(final CloseMode closeMode) {
if (isShutDown.compareAndSet(false, true)) {
for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
final PerRoutePool<T, C> routePool = it.next();
routePool.shutdown(closeMode);
}
routeToPool.clear();
}
}
@Override
public void close() {
close(CloseMode.GRACEFUL);
}
private PerRoutePool<T, C> getPool(final T route) {
PerRoutePool<T, C> routePool = routeToPool.get(route);
if (routePool == null) {
final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
route,
defaultMaxPerRoute,
timeToLive,
policy,
this,
disposalCallback,
connPoolListener);
routePool = routeToPool.putIfAbsent(route, newRoutePool);
if (routePool == null) {
routePool = newRoutePool;
}
}
return routePool;
}
@Override
public Future<PoolEntry<T, C>> lease(
final T route, final Object state,
final Timeout requestTimeout,
final FutureCallback<PoolEntry<T, C>> callback) {
Args.notNull(route, "Route");
Asserts.check(!isShutDown.get(), "Connection pool shut down");
final PerRoutePool<T, C> routePool = getPool(route);
return routePool.lease(state, requestTimeout, callback);
}
public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
return lease(route, state, Timeout.DISABLED, null);
}
@Override
public void release(final PoolEntry<T, C> entry, final boolean reusable) {
if (entry == null) {
return;
}
if (isShutDown.get()) {
return;
}
final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
routePool.release(entry, reusable);
}
public void validatePendingRequests() {
for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
routePool.validatePendingRequests();
}
}
@Override
public void setMaxTotal(final int max) {
}
@Override
public int getMaxTotal() {
return 0;
}
@Override
public void setDefaultMaxPerRoute(final int max) {
Args.positive(max, "Max value");
defaultMaxPerRoute = max;
}
@Override
public int getDefaultMaxPerRoute() {
return defaultMaxPerRoute;
}
@Override
public void setMaxPerRoute(final T route, final int max) {
Args.notNull(route, "Route");
final PerRoutePool<T, C> routePool = getPool(route);
routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
}
@Override
public int getMaxPerRoute(final T route) {
Args.notNull(route, "Route");
final PerRoutePool<T, C> routePool = getPool(route);
return routePool.getMax();
}
@Override
public PoolStats getTotalStats() {
int leasedTotal = 0;
int pendingTotal = 0;
int availableTotal = 0;
int maxTotal = 0;
for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
leasedTotal += routePool.getLeasedCount();
pendingTotal += routePool.getPendingCount();
availableTotal += routePool.getAvailableCount();
maxTotal += routePool.getMax();
}
return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
}
@Override
public PoolStats getStats(final T route) {
Args.notNull(route, "Route");
final PerRoutePool<T, C> routePool = getPool(route);
return new PoolStats(
routePool.getLeasedCount(),
routePool.getPendingCount(),
routePool.getAvailableCount(),
routePool.getMax());
}
@Override
public Set<T> getRoutes() {
return new HashSet<>(routeToPool.keySet());
}
public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
routePool.enumAvailable(callback);
}
}
public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
routePool.enumLeased(callback);
}
}
@Override
public void closeIdle(final TimeValue idleTime) {
final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
enumAvailable(new Callback<PoolEntry<T, C>>() {
@Override
public void execute(final PoolEntry<T, C> entry) {
if (entry.getUpdated() <= deadline) {
entry.discardConnection(CloseMode.GRACEFUL);
}
}
});
}
@Override
public void closeExpired() {
final long now = System.currentTimeMillis();
enumAvailable(new Callback<PoolEntry<T, C>>() {
@Override
public void execute(final PoolEntry<T, C> entry) {
if (entry.getExpiryDeadline().isBefore(now)) {
entry.discardConnection(CloseMode.GRACEFUL);
}
}
});
}
@Override
public String toString() {
final PoolStats totalStats = getTotalStats();
final StringBuilder buffer = new StringBuilder();
buffer.append("[leased: ");
buffer.append(totalStats.getLeased());
buffer.append("][available: ");
buffer.append(totalStats.getAvailable());
buffer.append("][pending: ");
buffer.append(totalStats.getPending());
buffer.append("]");
return buffer.toString();
}
static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
private final Object state;
private final Deadline deadline;
private final BasicFuture<PoolEntry<T, C>> future;
LeaseRequest(
final Object state,
final Timeout requestTimeout,
final BasicFuture<PoolEntry<T, C>> future) {
super();
this.state = state;
this.deadline = Deadline.calculate(requestTimeout);
this.future = future;
}
BasicFuture<PoolEntry<T, C>> getFuture() {
return this.future;
}
public Object getState() {
return this.state;
}
public Deadline getDeadline() {
return this.deadline;
}
public boolean isDone() {
return this.future.isDone();
}
public boolean completed(final PoolEntry<T, C> result) {
return future.completed(result);
}
public boolean failed(final Exception ex) {
return future.failed(ex);
}
@Override
public boolean cancel() {
return future.cancel();
}
}
static class PerRoutePool<T, C extends ModalCloseable> {
private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
private final T route;
private final TimeValue timeToLive;
private final PoolReusePolicy policy;
private final DisposalCallback<C> disposalCallback;
private final ConnPoolListener<T> connPoolListener;
private final ConnPoolStats<T> connPoolStats;
private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
private final Deque<LeaseRequest<T, C>> pending;
private final AtomicBoolean terminated;
private final AtomicInteger allocated;
private final AtomicLong releaseSeqNum;
private volatile int max;
PerRoutePool(
final T route,
final int max,
final TimeValue timeToLive,
final PoolReusePolicy policy,
final ConnPoolStats<T> connPoolStats,
final DisposalCallback<C> disposalCallback,
final ConnPoolListener<T> connPoolListener) {
super();
this.route = route;
this.timeToLive = timeToLive;
this.policy = policy;
this.connPoolStats = connPoolStats;
this.disposalCallback = disposalCallback;
this.connPoolListener = connPoolListener;
this.leased = new ConcurrentHashMap<>();
this.available = new ConcurrentLinkedDeque<>();
this.pending = new ConcurrentLinkedDeque<>();
this.terminated = new AtomicBoolean(false);
this.allocated = new AtomicInteger(0);
this.releaseSeqNum = new AtomicLong(0);
this.max = max;
}
public void shutdown(final CloseMode closeMode) {
if (terminated.compareAndSet(false, true)) {
AtomicMarkableReference<PoolEntry<T, C>> entryRef;
while ((entryRef = available.poll()) != null) {
entryRef.getReference().discardConnection(closeMode);
}
for (final PoolEntry<T, C> entry : leased.keySet()) {
entry.discardConnection(closeMode);
}
leased.clear();
LeaseRequest<T, C> leaseRequest;
while ((leaseRequest = pending.poll()) != null) {
leaseRequest.cancel();
}
}
}
private PoolEntry<T, C> createPoolEntry() {
final int poolmax = max;
int prev, next;
do {
prev = allocated.get();
next = (prev<poolmax)? prev+1 : prev;
} while (!allocated.compareAndSet(prev, next));
return (prev < next)? new PoolEntry<T,C>(route, timeToLive, disposalCallback) : null;
}
private void deallocatePoolEntry() {
allocated.decrementAndGet();
}
private void addLeased(final PoolEntry<T, C> entry) {
if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
throw new IllegalStateException("Pool entry already present in the set of leased entries");
} else if (connPoolListener != null) {
connPoolListener.onLease(route, connPoolStats);
}
}
private void removeLeased(final PoolEntry<T, C> entry) {
if (connPoolListener != null) {
connPoolListener.onRelease(route, connPoolStats);
}
if (!leased.remove(entry, Boolean.TRUE)) {
throw new IllegalStateException("Pool entry is not present in the set of leased entries");
}
}
private PoolEntry<T, C> getAvailableEntry(final Object state) {
for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
final PoolEntry<T, C> entry = ref.getReference();
if (ref.compareAndSet(entry, entry, false, true)) {
it.remove();
if (entry.getExpiryDeadline().isExpired()) {
entry.discardConnection(CloseMode.GRACEFUL);
}
if (!Objects.equals(entry.getState(), state)) {
entry.discardConnection(CloseMode.GRACEFUL);
}
return entry;
}
}
return null;
}
public Future<PoolEntry<T, C>> lease(
final Object state,
final Timeout requestTimeout,
final FutureCallback<PoolEntry<T, C>> callback) {
Asserts.check(!terminated.get(), "Connection pool shut down");
final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
final long releaseState = releaseSeqNum.get();
PoolEntry<T, C> entry = null;
if (pending.isEmpty()) {
entry = getAvailableEntry(state);
if (entry == null) {
entry = createPoolEntry();
}
}
if (entry != null) {
addLeased(entry);
future.completed(entry);
} else {
pending.add(new LeaseRequest<>(state, requestTimeout, future));
if (releaseState != releaseSeqNum.get()) {
servicePendingRequest();
}
}
return future;
}
public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
removeLeased(releasedEntry);
if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
releasedEntry.discardConnection(CloseMode.GRACEFUL);
}
if (releasedEntry.hasConnection()) {
switch (policy) {
case LIFO:
available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
break;
case FIFO:
available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
break;
default:
throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
}
}
else {
deallocatePoolEntry();
}
releaseSeqNum.incrementAndGet();
servicePendingRequest();
}
private void servicePendingRequest() {
servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
}
private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
LeaseRequest<T, C> leaseRequest;
while ((leaseRequest = pending.poll()) != null) {
if (leaseRequest.isDone()) {
continue;
}
final Object state = leaseRequest.getState();
final Deadline deadline = leaseRequest.getDeadline();
if (deadline.isExpired()) {
leaseRequest.failed(DeadlineTimeoutException.from(deadline));
} else {
final long releaseState = releaseSeqNum.get();
PoolEntry<T, C> entry = getAvailableEntry(state);
if (entry == null) {
entry = createPoolEntry();
}
if (entry != null) {
addLeased(entry);
if (!leaseRequest.completed(entry)) {
release(entry, true);
}
if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
break;
}
}
else {
pending.addFirst(leaseRequest);
if (releaseState == releaseSeqNum.get()) {
break;
}
}
}
}
}
public void validatePendingRequests() {
final Iterator<LeaseRequest<T, C>> it = pending.iterator();
while (it.hasNext()) {
final LeaseRequest<T, C> request = it.next();
final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
if (future.isCancelled() && !request.isDone()) {
it.remove();
} else {
final Deadline deadline = request.getDeadline();
if (deadline.isExpired()) {
request.failed(DeadlineTimeoutException.from(deadline));
}
if (request.isDone()) {
it.remove();
}
}
}
}
public final T getRoute() {
return route;
}
public int getMax() {
return max;
}
public void setMax(final int max) {
this.max = max;
}
public int getPendingCount() {
return pending.size();
}
public int getLeasedCount() {
return leased.size();
}
public int getAvailableCount() {
return available.size();
}
public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
final PoolEntry<T, C> entry = ref.getReference();
if (ref.compareAndSet(entry, entry, false, true)) {
callback.execute(entry);
if (!entry.hasConnection()) {
deallocatePoolEntry();
it.remove();
}
else {
ref.set(entry, false);
}
}
}
releaseSeqNum.incrementAndGet();
servicePendingRequests(RequestServiceStrategy.ALL);
}
public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
final PoolEntry<T, C> entry = it.next();
callback.execute(entry);
if (!entry.hasConnection()) {
deallocatePoolEntry();
it.remove();
}
}
}
@Override
public String toString() {
final StringBuilder buffer = new StringBuilder();
buffer.append("[route: ");
buffer.append(route);
buffer.append("][leased: ");
buffer.append(leased.size());
buffer.append("][available: ");
buffer.append(available.size());
buffer.append("][pending: ");
buffer.append(pending.size());
buffer.append("]");
return buffer.toString();
}
}
}