// blob: 295c9d0bdd4166d2140e0ab8fed0073f52d70b8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
final class ConnectionPool {
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
// Pool size and per-connection usage constants. They are not referenced anywhere else in this class;
// presumably they act as defaults for Settings.ConnectionPoolSettings - TODO confirm against Settings.
public static final int MIN_POOL_SIZE = 2;
public static final int MAX_POOL_SIZE = 8;
public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
// The host this pool opens connections to.
public final Host host;
// Taken from client.cluster in the constructor; supplies settings, the executor and the load balancing strategy.
private final Cluster cluster;
// The client that owns this pool.
private final Client client;
// Connections that can currently be borrowed (a CopyOnWriteArrayList, so iteration works on a snapshot).
private final List<Connection> connections;
// Number of connections counted as open; compared against minPoolSize/maxPoolSize when growing or shrinking.
private final AtomicInteger open;
// Connections already removed from "connections" that still wait to be closed (closed once dead or no
// longer borrowed).
private final Set<Connection> bin = new CopyOnWriteArraySet<>();
// Pool size bounds: the constructor overrides if present, otherwise the cluster settings.
private final int minPoolSize;
private final int maxPoolSize;
// Thresholds on Connection.borrowed used by returnConnection() (shrink) and borrowConnection() (grow/wait).
private final int minSimultaneousUsagePerConnection;
private final int maxSimultaneousUsagePerConnection;
// returnConnection() replaces a connection whose availableInProcess() falls below this value.
private final int minInProcess;
// Pre-built label returned by toString().
private final String poolLabel;
// Number of connection-creation tasks that have been submitted to the executor and have not finished yet.
private final AtomicInteger scheduledForCreation = new AtomicInteger();
// Holds null until closeAsync() runs; a non-null value marks the pool as closed (see isClosed()).
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
// Number of threads currently blocked in awaitAvailableConnection(). It is only modified while waitLock is
// held; it is read without the lock by the announce methods, hence volatile.
private volatile int waiter = 0;
// Fair lock (ReentrantLock(true)) that guards the hasAvailableConnection condition.
private final Lock waitLock = new ReentrantLock(true);
// Signalled when a connection is announced as available and when the pool is being closed.
private final Condition hasAvailableConnection = waitLock.newCondition();
/**
 * Creates a pool for {@code host} whose minimum and maximum size come from the cluster's connection pool
 * settings (no overrides).
 *
 * @param host the host to open connections to
 * @param client the client that owns this pool
 */
public ConnectionPool(final Host host, final Client client) {
    this(host, client, Optional.<Integer>empty(), Optional.<Integer>empty());
}
/**
 * Creates a pool for {@code host} and eagerly opens {@code minPoolSize} connections on the cluster executor,
 * blocking until every creation attempt has finished.
 *
 * @param host the host to open connections to
 * @param client the client that owns this pool; its cluster supplies the settings and the executor
 * @param overrideMinPoolSize if present, used instead of the configured minimum pool size
 * @param overrideMaxPoolSize if present, used instead of the configured maximum pool size
 * @throws CancellationException if the initialization of the connections was cancelled
 * @throws CompletionException if any initial connection could not be created; the pool is closed first and
 * the underlying failure is attached as the cause
 */
public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize,
                      final Optional<Integer> overrideMaxPoolSize) {
    this.host = host;
    this.client = client;
    this.cluster = client.cluster;
    poolLabel = "Connection Pool {host=" + host + "}";

    final Settings.ConnectionPoolSettings settings = settings();
    this.minPoolSize = overrideMinPoolSize.orElse(settings.minSize);
    this.maxPoolSize = overrideMaxPoolSize.orElse(settings.maxSize);
    this.minSimultaneousUsagePerConnection = settings.minSimultaneousUsagePerConnection;
    this.maxSimultaneousUsagePerConnection = settings.maxSimultaneousUsagePerConnection;
    this.minInProcess = settings.minInProcessPerConnection;

    this.connections = new CopyOnWriteArrayList<>();

    // "open" has to exist before the try block: the failure path below calls closeAsync(), which
    // dereferences it for every connection that was created. Assigning it only after the try block made
    // that path throw a NullPointerException instead of the intended CompletionException.
    this.open = new AtomicInteger();

    try {
        final List<CompletableFuture<Void>> connCreationFutures = new ArrayList<>();
        for (int i = 0; i < minPoolSize; i++) {
            connCreationFutures.add(CompletableFuture.runAsync(() -> {
                try {
                    this.connections.add(new Connection(host.getHostUri(), this, settings.maxInProcessPerConnection));
                } catch (ConnectionException e) {
                    throw new CompletionException(e);
                }
            }, cluster.executor()));
        }

        CompletableFuture.allOf(connCreationFutures.toArray(new CompletableFuture[0])).join();
    } catch (CancellationException ce) {
        logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce);
        throw ce;
    } catch (CompletionException ce) {
        // Some connections might have been initialized. Close the connection pool gracefully to close them.
        this.open.set(this.connections.size());
        this.closeAsync();

        final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." +
                " Successful connections=" + this.connections.size() +
                ". Closing the connection pool.";

        // report the underlying failure rather than the CompletionException wrapper when there is one
        final Throwable cause = ce.getCause();
        throw new CompletionException(errMsg, null != cause ? cause : ce);
    }

    this.open.set(connections.size());

    logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize);
}
/**
 * Gets the connection pool settings of the cluster this pool belongs to.
 */
public Settings.ConnectionPoolSettings settings() {
    final Settings.ConnectionPoolSettings poolSettings = cluster.connectionPoolSettings();
    return poolSettings;
}
/**
 * Borrows the least used live connection from the pool, incrementing its {@code borrowed} counter.
 * <p>
 * If the pool is empty, creation of up to {@code minPoolSize} connections is scheduled and the call waits.
 * If no live connection can be selected, or the selected one is saturated, the call waits as well.
 *
 * @param timeout how long to wait for a connection when none can be handed out immediately
 * @param unit the unit of {@code timeout}
 * @return the borrowed connection
 * @throws TimeoutException if waiting for a connection timed out
 * @throws ConnectionException if the pool is closed
 */
public Connection borrowConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
    logger.debug("Borrowing connection from pool on {} - timeout in {} {}", host, timeout, unit);

    if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

    final Connection candidate = selectLeastUsed();

    if (connections.isEmpty()) {
        logger.debug("Tried to borrow connection but the pool was empty for {} - scheduling pool creation and waiting for connection", host);
        int attempt = 0;
        while (attempt < minPoolSize) {
            // when many borrows happen at once, this check keeps additional connections from being
            // scheduled beyond the minimum
            if (scheduledForCreation.get() < minPoolSize) {
                scheduledForCreation.incrementAndGet();
                newConnection();
            }
            attempt++;
        }

        return waitForConnection(timeout, unit);
    }

    if (candidate == null) {
        if (isClosed())
            throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
        logger.debug("Pool was initialized but a connection could not be selected earlier - waiting for connection on {}", host);
        return waitForConnection(timeout, unit);
    }

    // a saturated least-used connection while the pool is still below its maximum size is the trigger for
    // considering one more connection
    final int poolSize = connections.size();
    final boolean saturated = candidate.borrowed.get() >= maxSimultaneousUsagePerConnection;
    if (saturated && poolSize < maxPoolSize) {
        if (logger.isDebugEnabled())
            logger.debug("Least used {} on {} exceeds maxSimultaneousUsagePerConnection but pool size {} < maxPoolSize - consider new connection",
                    candidate.getConnectionInfo(), host, poolSize);
        considerNewConnection();
    }

    // claim a slot on the candidate with a CAS, retrying when another thread changed the counter first
    for (;;) {
        final int inUse = candidate.borrowed.get();
        final int availableInProcess = candidate.availableInProcess();

        if (inUse >= maxSimultaneousUsagePerConnection && candidate.availableInProcess() == 0) {
            logger.debug("Least used connection selected from pool for {} but borrowed [{}] >= availableInProcess [{}] - wait",
                    host, inUse, availableInProcess);
            return waitForConnection(timeout, unit);
        }

        if (!candidate.borrowed.compareAndSet(inUse, inUse + 1))
            continue;

        if (logger.isDebugEnabled())
            logger.debug("Return least used {} on {}", candidate.getConnectionInfo(), host);
        return candidate;
    }
}
/**
 * Hands a borrowed connection back to the pool and decrements its {@code borrowed} counter.
 * <p>
 * Depending on the state of the connection and the pool the connection is replaced (dead, or too few
 * available in-process slots), closed (already in the bin and idle), destroyed (pool above its minimum size
 * and the connection is lightly used) or announced as available to waiting borrowers.
 *
 * @param connection the connection being returned
 * @throws ConnectionException if the pool is closed
 */
public void returnConnection(final Connection connection) throws ConnectionException {
    logger.debug("Attempting to return {} on {}", connection, host);
    if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

    final int stillBorrowed = connection.borrowed.decrementAndGet();

    if (connection.isDead()) {
        logger.debug("Marking {} as dead", this.host);
        this.replaceConnection(connection);
        return;
    }

    if (bin.contains(connection) && stillBorrowed == 0) {
        logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection);
        if (bin.remove(connection))
            connection.closeAsync();
        return;
    }

    final int poolSize = connections.size();
    final int availableInProcess = connection.availableInProcess();

    // a connection beyond the minimum pool size is destroyed when it is not busy
    final boolean shrinkable = poolSize > minPoolSize && stillBorrowed <= minSimultaneousUsagePerConnection;
    if (shrinkable) {
        if (logger.isDebugEnabled())
            logger.debug("On {} pool size of {} > minPoolSize {} and borrowed of {} <= minSimultaneousUsagePerConnection {} so destroy {}",
                    host, poolSize, minPoolSize, stillBorrowed, minSimultaneousUsagePerConnection, connection.getConnectionInfo());
        destroyConnection(connection);
        return;
    }

    // a low available in-process count likely means requests are backing up, so the connection is
    // replaced - unless maxPoolSize is 1, in which case the single connection has to serve every request
    if (availableInProcess < minInProcess && maxPoolSize > 1) {
        if (logger.isDebugEnabled())
            logger.debug("On {} availableInProcess {} < minInProcess {} so replace {}", host, availableInProcess, minInProcess, connection.getConnectionInfo());
        replaceConnection(connection);
        return;
    }

    // nothing special about this connection - let waiting borrowers know it is available
    announceAvailableConnection();
}
/**
 * Gets the client this pool was created for.
 */
Client getClient() {
    return this.client;
}
/**
 * Gets the cluster of the client this pool was created for.
 */
Cluster getCluster() {
    return this.cluster;
}
/**
 * Determines if the pool is closing or closed, which is the case as soon as a close future has been stored.
 */
public boolean isClosed() {
    return null != closeFuture.get();
}
/**
 * Permanently kills the pool by closing all of its connections. Calling this method more than once returns
 * the future created by the first call.
 *
 * @return a future that completes when all connections that were in the pool have finished closing
 */
public synchronized CompletableFuture<Void> closeAsync() {
    final CompletableFuture<Void> existing = closeFuture.get();
    if (existing != null) return existing;

    logger.info("Signalled closing of connection pool on {} with core size of {}", host, minPoolSize);

    final CompletableFuture<Void> future = killAvailableConnections();
    closeFuture.set(future);

    // Wake blocked borrowers only after the pool is marked closed. They check isClosed() as soon as they
    // wake up; signalling before the close future was stored let them see an open pool, go back to waiting
    // with nobody left to signal them, and eventually time out and flag the host as unavailable.
    announceAllAvailableConnection();

    return future;
}
/**
 * Gets the number of connections sitting in the bin, i.e. removed from the pool but not yet closed.
 * Required for testing.
 */
int numConnectionsWaitingToCleanup() {
    final int pendingCleanup = bin.size();
    return pendingCleanup;
}
/**
 * Starts closing every connection currently in the pool and decrements the open count as each close
 * completes.
 *
 * @return a future that completes when all of the individual close futures have completed
 */
private CompletableFuture<Void> killAvailableConnections() {
    final List<CompletableFuture<Void>> pendingCloses = new ArrayList<>(connections.size());
    connections.forEach(conn -> {
        final CompletableFuture<Void> closed = conn.closeAsync();
        closed.thenRun(open::decrementAndGet);
        pendingCloses.add(closed);
    });

    final CompletableFuture<?>[] allCloses = pendingCloses.toArray(new CompletableFuture[0]);
    return CompletableFuture.allOf(allCloses);
}
/**
 * Schedules a replacement for {@code connection} and retires it. Only the first call for a given connection
 * has an effect: the {@code isBeingReplaced} flag is set on entry and later calls return right after
 * logging. Nothing is replaced when the pool is closing or closed.
 */
void replaceConnection(final Connection connection) {
    logger.info("Replace {}", connection);

    // the flag is set unconditionally, even when the pool turns out to be closed
    final boolean replacementUnderway = connection.isBeingReplaced.getAndSet(true);
    if (!replacementUnderway && !isClosed()) {
        considerNewConnection();
        definitelyDestroyConnection(connection);
    }
}
/**
 * Schedules the creation of a new connection unless one is already scheduled. The reservation on
 * {@code scheduledForCreation} is taken with a compare-and-set so that at most one creation is in flight
 * from this path.
 */
private void considerNewConnection() {
    logger.debug("Considering new connection on {} where pool size is {}", host, connections.size());

    boolean reserved = false;
    while (!reserved) {
        final int pending = scheduledForCreation.get();
        logger.debug("There are {} connections scheduled for creation on {}", pending, host);

        // don't create more than one at a time
        if (pending >= 1)
            return;

        reserved = scheduledForCreation.compareAndSet(pending, pending + 1);
    }

    newConnection();
}
/**
 * Submits a task to the cluster executor that tries to add one connection to the pool. Callers are expected
 * to have incremented {@code scheduledForCreation} beforehand; the task releases that reservation when it
 * finishes.
 */
private void newConnection() {
    cluster.executor().submit(() -> {
        try {
            addConnectionIfUnderMaximum();
        } finally {
            // Release the reservation even if the attempt throws. The returned future is discarded, so a
            // failure here would otherwise leave the counter stuck and considerNewConnection() would never
            // schedule another connection.
            scheduledForCreation.decrementAndGet();
        }
        return null;
    });
}
/**
 * Opens a new connection and adds it to the pool, provided the open count is below {@code maxPoolSize} and
 * the pool is not closed. A slot is reserved on {@code open} first and given back if the pool is closed or
 * the connection cannot be created; in the latter case the host is also considered unavailable.
 *
 * @return {@code true} if a connection was added
 */
private boolean addConnectionIfUnderMaximum() {
    int current;
    do {
        current = open.get();
        if (current >= maxPoolSize)
            return false;
    } while (!open.compareAndSet(current, current + 1));

    if (isClosed()) {
        open.decrementAndGet();
        return false;
    }

    final Connection created;
    try {
        created = new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection);
    } catch (ConnectionException ce) {
        logger.error("Connections were under max, but there was an error creating the connection.", ce);
        open.decrementAndGet();
        considerHostUnavailable();
        return false;
    }

    connections.add(created);
    announceAvailableConnection();
    return true;
}
/**
 * Retires {@code connection} if the open count is above {@code minPoolSize}. The connection is moved to the
 * bin and closed once it is dead or no longer borrowed.
 *
 * @param connection the connection to retire
 * @return {@code false} if the pool is already at or below its minimum size and nothing was done,
 * {@code true} otherwise
 */
private boolean destroyConnection(final Connection connection) {
    // atomically release one slot, refusing to shrink below the minimum pool size
    while (true) {
        final int opened = open.get();
        if (opened <= minPoolSize)
            return false;

        if (open.compareAndSet(opened, opened - 1))
            break;
    }

    // The slot of this connection was released by the compare-and-set above. The retirement is done here
    // instead of delegating to definitelyDestroyConnection() because that method decrements "open" itself,
    // which counted every shrink twice and let the pool grow past maxPoolSize later on.
    if (!bin.contains(connection) && !connection.isClosing()) {
        bin.add(connection);
        connections.remove(connection);
    } else {
        // the connection was already retired or is closing, and that path accounts for its slot - give
        // the reservation taken above back
        open.incrementAndGet();
    }

    // only close the connection for good once it is done being borrowed or when it is dead
    if (connection.isDead() || connection.borrowed.get() == 0) {
        if (bin.remove(connection)) {
            connection.closeAsync();
            logger.debug("{} destroyed", connection.getConnectionInfo());
        }
    }

    return true;
}
/**
 * Retires {@code connection} regardless of the pool size. Unless it is already in the bin or closing, the
 * connection is moved from the pool to the bin and the open count is decremented. It is then closed
 * immediately if it is dead or has no outstanding borrows; otherwise it stays in the bin.
 */
public void definitelyDestroyConnection(final Connection connection) {
    // only a connection that is neither binned nor closing gets moved to the bin
    final boolean alreadyBinned = bin.contains(connection);
    if (!alreadyBinned && !connection.isClosing()) {
        bin.add(connection);
        connections.remove(connection);
        open.decrementAndGet();
    }

    // closing for good has to wait until the connection is done being borrowed, unless it is dead
    final boolean safeToClose = connection.isDead() || connection.borrowed.get() == 0;
    if (safeToClose && bin.remove(connection)) {
        connection.closeAsync();
        // TODO: Log the following message on completion of the future returned by closeAsync.
        logger.debug("{} destroyed", connection.getConnectionInfo());
    }
}
/**
 * Blocks until a connection can be borrowed or the timeout elapses. After each wake-up the least used live
 * connection is selected and claimed if its borrowed count is below its available in-process count.
 * <p>
 * An interrupt restores the thread's interrupt flag and ends the waiting after the current attempt. When no
 * connection could be claimed in time the host is considered unavailable.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @return the borrowed connection
 * @throws TimeoutException if no connection could be claimed before the timeout elapsed
 * @throws ConnectionException if the pool was closed while waiting
 */
private Connection waitForConnection(final long timeout, final TimeUnit unit) throws TimeoutException, ConnectionException {
    final long startedAt = System.nanoTime();
    long budget = timeout;
    long remaining = timeout;

    do {
        try {
            awaitAvailableConnection(remaining, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // zero budget makes "remaining" non-positive below, so this is the last attempt
            budget = 0;
        }

        if (isClosed())
            throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");

        final Connection candidate = selectLeastUsed();
        if (candidate != null) {
            int inFlight = candidate.borrowed.get();
            int availableInProcess = candidate.availableInProcess();
            while (inFlight < availableInProcess) {
                if (candidate.borrowed.compareAndSet(inFlight, inFlight + 1)) {
                    if (logger.isDebugEnabled())
                        logger.debug("Return least used {} on {} after waiting", candidate.getConnectionInfo(), host);
                    return candidate;
                }

                // lost the race on the counter - re-read and try again
                inFlight = candidate.borrowed.get();
                availableInProcess = candidate.availableInProcess();
            }

            logger.debug("Least used {} on {} has requests borrowed [{}] >= availableInProcess [{}] - may timeout waiting for connection",
                    candidate, host, inFlight, availableInProcess);
        }

        remaining = budget - TimeUtil.timeSince(startedAt, unit);
        logger.debug("Continue to wait for connection on {} if {} > 0", host, remaining);
    } while (remaining > 0);

    logger.error("Timed-out ({} {}) waiting for connection on {} - possibly unavailable", timeout, unit, host);

    // timing out on a borrow might mean the host is dead (or the timeout was very short); either way the
    // host is flagged so that reconnect attempts get scheduled
    this.considerHostUnavailable();

    throw new TimeoutException("Timed-out waiting for connection on " + host + " - possibly unavailable");
}
/**
 * Marks the host as unavailable, retires every connection in the pool and notifies the load balancing
 * strategy. Called when a connection is "dead" due to a non-recoverable error.
 */
public void considerHostUnavailable() {
    // tryReconnect is handed to the host as the function used to re-establish connectivity
    host.makeUnavailable(this::tryReconnect);

    // an unavailable host should not keep its connections
    for (final Connection connection : connections) {
        definitelyDestroyConnection(connection);
    }

    // let the load-balancer know that the host is acting poorly
    this.cluster.loadBalancingStrategy().onUnavailable(host);
}
/**
 * Attempt to reconnect to the {@link Host} that was previously marked as unavailable by borrowing a
 * connection and sending the cluster's validation request over it. This method gets called as part of a
 * schedule in {@link Host} to periodically try to create working connections.
 *
 * @param h the host to reconnect to
 * @return {@code true} if the validation request completed, {@code false} if anything failed along the way
 */
private boolean tryReconnect(final Host h) {
    logger.debug("Trying to re-establish connection on {}", h);

    Connection probe = null;
    try {
        probe = borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
        final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create();
        final CompletableFuture<ResultSet> response = new CompletableFuture<>();
        probe.write(ping, response);

        // block until the full result of the validation request has arrived
        response.get().all().get();

        // host is reconnected and a connection is now available
        this.cluster.loadBalancingStrategy().onAvailable(h);
        return true;
    } catch (Exception ex) {
        logger.debug("Failed reconnect attempt on {}", h, ex);
        if (probe != null) definitelyDestroyConnection(probe);
        return false;
    }
}
/**
 * Wakes up one thread waiting for a connection, if there is any waiter at all.
 */
private void announceAvailableConnection() {
    logger.debug("Announce connection available on {}", host);

    if (waiter != 0) {
        waitLock.lock();
        try {
            hasAvailableConnection.signal();
        } finally {
            waitLock.unlock();
        }
    }
}
/**
 * Finds the live connection with the smallest {@code borrowed} count. On a tie the connection that comes
 * first in the pool wins.
 *
 * @return the least used connection that is not dead, or {@code null} if there is none
 */
private Connection selectLeastUsed() {
    Connection best = null;
    int fewestBorrowed = Integer.MAX_VALUE;

    for (final Connection candidate : connections) {
        final int borrowedNow = candidate.borrowed.get();
        if (candidate.isDead() || borrowedNow >= fewestBorrowed)
            continue;

        fewestBorrowed = borrowedNow;
        best = candidate;
    }

    return best;
}
/**
 * Blocks the calling thread on the "connection available" condition until it is signalled, the timeout
 * elapses or the thread is interrupted. The thread is counted in {@code waiter} for the duration of the
 * wait.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @throws InterruptedException if the thread is interrupted while waiting
 */
private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
    logger.debug("Wait {} {} for an available connection on {} with {}", timeout, unit, host, Thread.currentThread());

    waitLock.lock();
    // the counter is only touched while the lock is held
    waiter += 1;
    try {
        hasAvailableConnection.await(timeout, unit);
    } finally {
        waiter -= 1;
        waitLock.unlock();
    }
}
/**
 * Wakes up every thread waiting for a connection, if there is any waiter at all.
 */
private void announceAllAvailableConnection() {
    if (waiter != 0) {
        waitLock.lock();
        try {
            hasAvailableConnection.signalAll();
        } finally {
            waitLock.unlock();
        }
    }
}
/**
 * Collects the channel ID of every connection currently in the pool into a set.
 * Currently, only used for testing.
 */
Set<String> getConnectionIDs() {
    return connections.stream()
            .map(connection -> connection.getChannelId())
            .collect(Collectors.toSet());
}
/**
 * Builds a description of the pool consisting of the host followed by the string form of each connection,
 * every one of them terminated by a comma.
 */
public String getPoolInfo() {
    final StringBuilder info = new StringBuilder("ConnectionPool (").append(host).append(") - ");
    for (final Connection c : connections) {
        info.append(c).append(",");
    }
    return info.toString().trim();
}
/**
 * Returns the label built at construction time, which names the host of this pool.
 */
@Override
public String toString() {
    return this.poolLabel;
}
}