blob: 21bce9ce745fd83e2c23d4f1448e0c89cd192262 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
final class ConnectionPool {
private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
public static final int MIN_POOL_SIZE = 2;
public static final int MAX_POOL_SIZE = 8;
public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
// A small buffer in millis used for comparing if a connection was created within a certain amount of time.
private static final int CONNECTION_SETUP_TIME_DELTA = 25;
public final Host host;
private final Cluster cluster;
private final Client client;
private final List<Connection> connections;
private final AtomicInteger open;
private final Set<Connection> bin = new CopyOnWriteArraySet<>();
private final int minPoolSize;
private final int maxPoolSize;
private final int minSimultaneousUsagePerConnection;
private final int maxSimultaneousUsagePerConnection;
private final int minInProcess;
private final String poolLabel;
private final AtomicInteger scheduledForCreation = new AtomicInteger();
private final AtomicReference<ConnectionResult> latestConnectionResult = new AtomicReference<>(new ConnectionResult());
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
private volatile int waiter = 0;
private final Lock waitLock = new ReentrantLock(true);
private final Condition hasAvailableConnection = waitLock.newCondition();
ConnectionFactory connectionFactory;
/**
* The result of a connection attempt. A null value for failureCause means that the connection attempt was successful.
*/
public class ConnectionResult {
private long timeOfConnectionAttempt;
private Throwable failureCause;
public ConnectionResult() {}
public Throwable getFailureCause() { return failureCause; }
public long getTime() { return timeOfConnectionAttempt; }
public void setFailureCause(Throwable cause) { failureCause = cause; }
public void setTimeNow() { timeOfConnectionAttempt = System.currentTimeMillis(); }
}
public ConnectionPool(final Host host, final Client client) {
this(host, client, Optional.empty(), Optional.empty());
}
public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
final Optional<Integer> overrideMaxPoolSize) {
this(host, client, overrideMinPoolSize, overrideMaxPoolSize, new ConnectionFactory.DefaultConnectionFactory());
}
ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
final Optional<Integer> overrideMaxPoolSize, final ConnectionFactory connectionFactory) {
this.host = host;
this.client = client;
this.cluster = client.cluster;
this.connectionFactory = connectionFactory;
poolLabel = "Connection Pool {host=" + host + "}";
final Settings.ConnectionPoolSettings settings = settings();
this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
this.maxPoolSize = overrideMaxPoolSize.orElse(settings.maxSize);
this.minSimultaneousUsagePerConnection = settings.minSimultaneousUsagePerConnection;
this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
this.minInProcess = settings.minInProcessPerConnection;
this.connections = new CopyOnWriteArrayList<>();
this.open = new AtomicInteger();
try {
final List<CompletableFuture<Void>> connectionCreationFutures = new ArrayList<>();
for (int i = 0; i < minPoolSize; i++) {
connectionCreationFutures.add(CompletableFuture.runAsync(() -> {
final ConnectionResult result = new ConnectionResult();
try {
this.connections.add(connectionFactory.create(this));
this.open.incrementAndGet();
} catch (ConnectionException e) {
result.setFailureCause(e);
throw new CompletionException(e);
} finally {
result.setTimeNow();
if ((latestConnectionResult.get() == null) ||
(latestConnectionResult.get().getTime() < result.getTime())) {
latestConnectionResult.set(result);
}
}
}, cluster.connectionScheduler()));
}
CompletableFuture.allOf(connectionCreationFutures.toArray(new CompletableFuture[0])).join();
} catch (CancellationException ce) {
logger.warn("Initialization of connections cancelled for {}", this.getPoolInfo(), ce);
throw ce;
} catch (CompletionException ce) {
// Some connections might have been initialized. Close the connection pool gracefully to close them.
this.closeAsync();
final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
" Successful connections=" + this.connections.size() +
". Closing the connection pool.";
Throwable cause;
Throwable result = ce;
if (null != (cause = result.getCause())) {
result = cause;
}
throw new CompletionException(errMsg, result);
}
logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);
}
public Settings.ConnectionPoolSettings settings() {
return cluster.connectionPoolSettings();
}
public Connection borrowConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
logger.debug("Borrowing connection from pool on {} - timeout in {} {}", host, timeout, unit);
if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
if (connections.isEmpty()) {
logger.debug("Tried to borrow connection but the pool was empty for {} - scheduling pool creation and waiting for connection", host);
for (int i = 0; i < minPoolSize; i++) {
// If many connections are borrowed at the same time there needs to be a check to make sure no
// additional ones get scheduled for creation
if (scheduledForCreation.get() < minPoolSize) {
scheduledForCreation.incrementAndGet();
newConnection();
}
}
return waitForConnection(timeout, unit);
}
// Get the least used valid connection
final Connection leastUsedConn = getLeastUsedValidConnection();
if (null == leastUsedConn) {
if (isClosed())
throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
logger.debug("Pool was initialized but a connection could not be selected earlier - waiting for connection on {}", host);
return waitForConnection(timeout, unit);
}
if (logger.isDebugEnabled())
logger.debug("Return least used {} on {}", leastUsedConn.getConnectionInfo(), host);
return leastUsedConn;
}
public void returnConnection(final Connection connection) throws ConnectionException {
logger.debug("Attempting to return {} on {}", connection, host);
if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
final int borrowed = connection.borrowed.decrementAndGet();
if (connection.isDead()) {
logger.debug("Marking {} as dead", this.host);
this.replaceConnection(connection);
} else {
if (bin.contains(connection) && borrowed == 0) {
logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
if (bin.remove(connection))
connection.closeAsync();
return;
}
// destroy a connection that exceeds the minimum pool size - it does not have the right to live if it
// isn't busy. replace a connection that has a low available in process count which likely means that
// it's backing up with requests that might never have returned. consider the maxPoolSize in this condition
// because if it is equal to 1 (which it is for a session) then there is no need to replace the connection
// as it will be responsible for every single request. if neither of these scenarios are met then let the
// world know the connection is available.
final int poolSize = connections.size();
final int availableInProcess = connection.availableInProcess();
if (poolSize > minPoolSize && borrowed <= minSimultaneousUsagePerConnection) {
if (logger.isDebugEnabled())
logger.debug("On {} pool size of {} > minPoolSize {} and borrowed of {} <= minSimultaneousUsagePerConnection {} so destroy {}",
host, poolSize, minPoolSize, borrowed, minSimultaneousUsagePerConnection, connection.getConnectionInfo());
destroyConnection(connection);
} else if (availableInProcess < minInProcess && maxPoolSize > 1) {
if (logger.isDebugEnabled())
logger.debug("On {} availableInProcess {} < minInProcess {} so replace {}", host, availableInProcess, minInProcess, connection.getConnectionInfo());
replaceConnection(connection);
} else
announceAvailableConnection();
}
}
Client getClient() {
return client;
}
Cluster getCluster() {
return cluster;
}
public boolean isClosed() {
return this.closeFuture.get() != null;
}
/**
* Permanently kills the pool.
*/
public synchronized CompletableFuture<Void> closeAsync() {
if (closeFuture.get() != null) return closeFuture.get();
logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);
announceAllAvailableConnection();
final CompletableFuture<Void> future = killAvailableConnections();
closeFuture.set(future);
return future;
}
/**
* Required for testing
*/
int numConnectionsWaitingToCleanup() {
return bin.size();
}
/**
* Calls close on connections in the pool gathering close futures from both active connections and ones in the
* bin.
*/
private CompletableFuture<Void> killAvailableConnections() {
final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size() + bin.size());
for (Connection connection : connections) {
final CompletableFuture<Void> future = connection.closeAsync();
future.thenRun(open::decrementAndGet);
futures.add(future);
}
// Without the ones in the bin the close for the ConnectionPool won't account for their shutdown and could
// lead to scenario where the bin connections stay open after the channel executor is closed which then
// leads to close operation getting rejected in Connection.close() for channel.newPromise().
for (Connection connection : bin) {
futures.add(connection.closeAsync());
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
/**
* This method is not idempotent and should only be called once per connection.
*/
void replaceConnection(final Connection connection) {
logger.info("Replace {}", connection);
// Do not replace connection if the conn pool is closing/closed.
// Do not replace connection if it is already being replaced.
if (connection.isBeingReplaced.getAndSet(true) || isClosed()) {
return;
}
considerNewConnection();
definitelyDestroyConnection(connection);
}
private void considerNewConnection() {
logger.debug("Considering new connection on {} where pool size is {}", host, connections.size());
while (true) {
int inCreation = scheduledForCreation.get();
logger.debug("There are {} connections scheduled for creation on {}", inCreation, host);
// don't create more than one at a time
if (inCreation >= 1)
return;
if (scheduledForCreation.compareAndSet(inCreation, inCreation + 1))
break;
}
newConnection();
}
private void newConnection() {
cluster.connectionScheduler().submit(() -> {
// seems like this should be decremented first because if addConnectionIfUnderMaximum fails there is
// nothing that wants to decrement this number and so it leaves things in a state where you could
// newConnection() doesn't seem to get called at all because it believes connections are being currently
// created. this seems to lead to situations where the client can never borrow a connection and it's as
// though it can't reconnect at all. this was hard to test but it seemed to happen regularly after
// introduced the ConnectionFactory that enabled a way to introduce connection jitters (i.e. failures in
// creation of a Connection) at which point it seemed to happen with some regularity.
scheduledForCreation.decrementAndGet();
addConnectionIfUnderMaximum();
return null;
});
}
private boolean addConnectionIfUnderMaximum() {
final int openCountToActOn;
while (true) {
final int opened = open.get();
if (opened >= maxPoolSize)
return false;
if (open.compareAndSet(opened, opened + 1)) {
openCountToActOn = opened;
break;
}
}
if (isClosed()) {
open.decrementAndGet();
return false;
}
final ConnectionResult result = new ConnectionResult();
try {
connections.add(connectionFactory.create(this));
} catch (Exception ex) {
open.decrementAndGet();
logger.error(String.format(
"Connections[%s] were under maximum allowed[%s], but there was an error creating a new connection",
openCountToActOn, maxPoolSize),
ex);
considerHostUnavailable();
result.setFailureCause(ex);
return false;
} finally {
result.setTimeNow();
if (latestConnectionResult.get().getTime() < result.getTime()) {
latestConnectionResult.set(result);
}
}
announceAllAvailableConnection();
return true;
}
private boolean destroyConnection(final Connection connection) {
while (true) {
final int opened = open.get();
if (opened <= minPoolSize)
return false;
if (open.compareAndSet(opened, opened - 1))
break;
}
definitelyDestroyConnection(connection);
return true;
}
public void definitelyDestroyConnection(final Connection connection) {
// only add to the bin for future removal if its not already there.
if (!bin.contains(connection) && !connection.isClosing()) {
bin.add(connection);
connections.remove(connection);
open.decrementAndGet();
}
// only close the connection for good once it is done being borrowed or when it is dead
if (connection.isDead() || connection.borrowed.get() == 0) {
if (bin.remove(connection)) {
final CompletableFuture<Void> closeFuture = connection.closeAsync();
closeFuture.whenComplete((v, t) -> {
logger.debug("Destroyed {}{}{}", connection.getConnectionInfo(), System.lineSeparator(), this.getPoolInfo());
});
}
}
}
private Connection waitForConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
long start = System.nanoTime();
long remaining = timeout;
long to = timeout;
do {
try {
awaitAvailableConnection(remaining, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
to = 0;
}
if (isClosed())
throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
final Connection leastUsed = getLeastUsedValidConnection();
if (leastUsed != null) {
if (logger.isDebugEnabled())
logger.debug("Return least used {} on {} after waiting", leastUsed.getConnectionInfo(), host);
return leastUsed;
}
remaining = to - TimeUtil.timeSince(start, unit);
logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
} while (remaining > 0);
final StringBuilder cause = new StringBuilder("Potential Cause: ");
final ConnectionResult res = latestConnectionResult.get();
// If a connection attempt failed within a request's maxWaitForConnection then it likely contributed to that
// request getting a TimeoutException.
if (((System.currentTimeMillis() - res.getTime()) < (cluster.getMaxWaitForConnection() + CONNECTION_SETUP_TIME_DELTA)) &&
(res.getFailureCause() != null)) {
// Assumes that the root cause will give better information about why the connection failed.
cause.append(ExceptionHelper.getRootCause(res.getFailureCause()).getMessage());
} else if (open.get() >= maxPoolSize) {
cause.append(Client.TOO_MANY_IN_FLIGHT_REQUESTS);
} else {
cause.setLength(0);
}
// if we get to this point, we waited up to maxWaitForConnection from the pool and did not get a Connection.
// this can either mean the pool is exhausted or the host is unavailable. for the former, this could mean
// the connection pool and/or server is not sized correctly for the workload as connections are not freeing up
// to keep up requests and for the latter it could mean anything from a network hiccup to the server simply
// being down. it is critical that the caller be able to discern between these two. the following error
// message written to the log should provide some insight into the state of the connection pool telling the
// user if the pool was running at maximum.
final String timeoutErrorMessage = String.format(
"Timed-out (%s %s) waiting for connection on %s. %s%s%s",
timeout, unit, host, cause.toString(), System.lineSeparator(), this.getPoolInfo());
logger.error(timeoutErrorMessage);
// if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short).
// either way supply a function to reconnect
final TimeoutException timeoutException = new TimeoutException(timeoutErrorMessage);
this.considerHostUnavailable();
throw timeoutException;
}
/**
* On a failure to get a {@link Connection} this method is called to determine if the {@link Host} should be
* marked as unavailable and to establish a background reconnect operation.
*/
public void considerHostUnavailable() {
// if there is at least one available connection the host has to still be around (or is perhaps on its way out
// but we'll stay optimistic in this check). no connections also means "unhealthy". unsure if there is an ok
// "no connections" state we can get into. in any event if there are no connections then we'd just try to
// immediately reconnect below anyway so perhaps that state isn't really something to worry about.
final boolean maybeUnhealthy = connections.stream().allMatch(Connection::isDead);
if (maybeUnhealthy) {
// immediately fire off an attempt to reconnect because there are no active connections.
host.tryReconnectingImmediately(this::tryReconnect);
// let the load-balancer know that the host is acting poorly
if (!host.isAvailable()) {
// if the host is unavailable then we should release the connections
connections.forEach(this::definitelyDestroyConnection);
this.cluster.loadBalancingStrategy().onUnavailable(host);
}
}
}
/**
* Attempt to reconnect to the {@link Host} that was previously marked as unavailable. This method gets called
* as part of a schedule in {@link Host} to periodically try to create working connections.
*/
private boolean tryReconnect(final Host h) {
logger.debug("Trying to re-establish connection on {}", h);
Connection connection = null;
try {
// rather than rely on borrowConnection() infrastructure and the pool create a brand new Connection
// instance solely for the purpose of this ping. this ensures that if the pool is overloaded that we
// make an honest attempt at validating host health without failing over some timeout waiting for a
// connection in the pool. not sure if we should try to keep this connection if it succeeds and if the
// pool needs it. for now that seems like an unnecessary added bit of complexity for dealing with this
// error state
connection = connectionFactory.create(this);
final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
final CompletableFuture<ResultSet> f = new CompletableFuture<>();
connection.write(ping, f);
f.get().all().get();
// host is reconnected and a connection is now available
this.cluster.loadBalancingStrategy().onAvailable(h);
return true;
} catch (Exception ex) {
logger.error(String.format("Failed reconnect attempt on %s%s%s",
h, System.lineSeparator(), this.getPoolInfo()), ex);
return false;
} finally {
if (connection != null) {
connection.closeAsync();
}
}
}
private void announceAvailableConnection() {
logger.debug("Announce connection available on {}", host);
if (waiter == 0)
return;
waitLock.lock();
try {
hasAvailableConnection.signal();
} finally {
waitLock.unlock();
}
}
/**
* Get the least-used connection from the pool. Also triggers consideration of a new connection if the least-used
* connection has hit the usage maximum or no valid connection could be retrieved from the pool.
*
* @return The least-used connection from the pool. Returns null if no valid connection could be retrieved from the
* pool.
*/
private synchronized Connection getLeastUsedValidConnection() {
int minInFlight = Integer.MAX_VALUE;
Connection leastBusy = null;
for (Connection connection : connections) {
final int inFlight = connection.borrowed.get();
if (!connection.isDead() && inFlight < minInFlight && inFlight < maxSimultaneousUsagePerConnection) {
minInFlight = inFlight;
leastBusy = connection;
}
}
if (leastBusy != null) {
// Increment borrow count and consider making a new connection if least used connection hits usage maximum
if (leastBusy.borrowed.incrementAndGet() >= maxSimultaneousUsagePerConnection
&& connections.size() < maxPoolSize) {
if (logger.isDebugEnabled())
logger.debug("Least used {} on {} reached maxSimultaneousUsagePerConnection but pool size {} < maxPoolSize - consider new connection",
leastBusy.getConnectionInfo(), host, connections.size());
considerNewConnection();
}
} else if (connections.size() < maxPoolSize) {
// A safeguard for scenarios where consideration of a new connection was somehow not triggered by an
// existing connection hitting the usage maximum
considerNewConnection();
}
return leastBusy;
}
private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());
waitLock.lock();
waiter++;
try {
hasAvailableConnection.await(timeout, unit);
} finally {
waiter--;
waitLock.unlock();
}
}
private void announceAllAvailableConnection() {
if (waiter == 0)
return;
waitLock.lock();
try {
hasAvailableConnection.signalAll();
} finally {
waitLock.unlock();
}
}
/**
* Returns the set of Channel IDs maintained by the connection pool.
* Currently, only used for testing.
*/
Set<String> getConnectionIDs() {
return connections.stream().map(Connection::getChannelId).collect(Collectors.toSet());
}
/**
* Gets a message that describes the state of the connection pool.
*/
public String getPoolInfo() {
return getPoolInfo(null);
}
/**
* Gets a message that describes the state of the connection pool.
*
* @param connectionToCallout the connection from the pool to identify more clearly in the message
*/
public String getPoolInfo(final Connection connectionToCallout) {
final StringBuilder sb = new StringBuilder("ConnectionPool (");
sb.append(host.toString());
sb.append(")");
if (connections.isEmpty()) {
sb.append("- no connections in pool");
} else {
final int connectionCount = connections.size();
sb.append(System.lineSeparator());
sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s bin=%s)",
connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), bin.size()));
sb.append(System.lineSeparator());
appendConnections(sb, connectionToCallout, connections);
sb.append(System.lineSeparator());
sb.append("-- bin --");
sb.append(System.lineSeparator());
appendConnections(sb, connectionToCallout, new ArrayList<>(bin));
}
return sb.toString().trim();
}
private void appendConnections(final StringBuilder sb, final Connection connectionToCallout,
final List<Connection> connections) {
final int connectionCount = connections.size();
for (int ix = 0; ix < connectionCount; ix++) {
final Connection c = connections.get(ix);
if (c.equals(connectionToCallout))
sb.append("==> ");
else
sb.append("> ");
sb.append(c.getConnectionInfo(false));
if (ix < connectionCount - 1)
sb.append(System.lineSeparator());
}
}
@Override
public String toString() {
return poolLabel;
}
}