blob: df43f510b9f9663005c2c2db9475343ec2fa37f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;
import java.security.SecureRandom;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonService;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class ThriftTransportPool {
private static final SecureRandom random = new SecureRandom();
private long killTime = 1000 * 3;
private static class CachedConnections {
/*
* Items are added and removed from this queue in such a way that the queue is ordered from most
* recently used to least recently used. The first position being the most recently used and the
* last position being the least recently used. This is done in the following way.
*
* - Newly unreserved connections are be added using addFirst(). When a connection is added, its
* lastReturnTime is set.
*
* - When an unreserved connection is needed, its taken off using pollFirst().
*
* - Unreserved connections that haven been idle too long are removed using removeLast()
*
* The purpose of maintaining this ordering it to allow efficient removal of idle connection.
* The efficiency comes from avoiding a linear search for idle connection. Since this search is
* done by a background thread holding a lock, thats good for any thread attempting to reserve a
* connection.
*/
Deque<CachedConnection> unreserved = new ArrayDeque<>(); // stack - LIFO
Map<CachedTTransport,CachedConnection> reserved = new HashMap<>();
public CachedConnection reserveAny() {
CachedConnection cachedConnection = unreserved.pollFirst(); // safe pop
if (cachedConnection != null) {
cachedConnection.reserve();
reserved.put(cachedConnection.transport, cachedConnection);
if (log.isTraceEnabled()) {
log.trace("Using existing connection to {}", cachedConnection.transport.cacheKey);
}
}
return cachedConnection;
}
private void removeExpiredConnections(final ArrayList<CachedConnection> expired,
final long killTime) {
long currTime = System.currentTimeMillis();
while (isLastUnreservedExpired(currTime, killTime)) {
expired.add(unreserved.removeLast());
}
}
boolean isLastUnreservedExpired(final long currTime, final long killTime) {
return !unreserved.isEmpty() && (currTime - unreserved.peekLast().lastReturnTime) > killTime;
}
void checkReservedForStuckIO() {
reserved.values().forEach(c -> c.transport.checkForStuckIO(STUCK_THRESHOLD));
}
void closeAllTransports() {
closeTransports(unreserved);
closeTransports(reserved.values());
}
void closeTransports(final Iterable<CachedConnection> stream) {
stream.forEach((connection) -> {
try {
connection.transport.close();
} catch (Exception e) {
log.debug("Error closing transport during shutdown", e);
}
});
}
CachedConnection removeReserved(CachedTTransport transport) {
return reserved.remove(transport);
}
}
private static class ConnectionPool {
final Lock[] locks;
final ConcurrentHashMap<ThriftTransportKey,CachedConnections> connections =
new ConcurrentHashMap<>();
private volatile boolean shutdown = false;
ConnectionPool() {
// intentionally using a prime number, don't use 31
locks = new Lock[37];
for (int i = 0; i < locks.length; i++) {
locks[i] = new ReentrantLock();
}
}
Set<ThriftTransportKey> getThriftTransportKeys() {
return connections.keySet();
}
/**
* Reserve and return a new {@link CachedConnection} from the {@link CachedConnections} mapped
* to the specified transport key. If a {@link CachedConnections} is not found, one will be
* created.
*
* <p>
*
* This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
* until the operation completes.
*
* @param key
* the transport key
* @return the reserved {@link CachedConnection}
*/
CachedConnection reserveAny(final ThriftTransportKey key) {
// It's possible that multiple locks from executeWithinLock will overlap with a single lock
// inside the ConcurrentHashMap which can unnecessarily block threads. Access the
// ConcurrentHashMap outside of executeWithinLock to prevent this.
var connections = getOrCreateCachedConnections(key);
return executeWithinLock(key, connections::reserveAny);
}
/**
* Reserve and return a new {@link CachedConnection} from the {@link CachedConnections} mapped
* to the specified transport key. If a {@link CachedConnections} is not found, null will be
* returned.
*
* <p>
*
* This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
* until the operation completes.
*
* @param key
* the transport key
* @return the reserved {@link CachedConnection}, or null if none were available.
*/
CachedConnection reserveAnyIfPresent(final ThriftTransportKey key) {
// It's possible that multiple locks from executeWithinLock will overlap with a single lock
// inside the ConcurrentHashMap which can unnecessarily block threads. Access the
// ConcurrentHashMap outside of executeWithinLock to prevent this.
var connections = getCachedConnections(key);
return connections == null ? null : executeWithinLock(key, connections::reserveAny);
}
/**
* Puts the specified connection into the reserved map of the {@link CachedConnections} for the
* specified transport key. If a {@link CachedConnections} is not found, one will be created.
*
* <p>
*
* This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
* until the operation completes.
*
* @param key
* the transport key
* @param connection
* the reserved connection
*/
void putReserved(final ThriftTransportKey key, final CachedConnection connection) {
// It's possible that multiple locks from executeWithinLock will overlap with a single lock
// inside the ConcurrentHashMap which can unnecessarily block threads. Access the
// ConcurrentHashMap outside of executeWithinLock to prevent this.
var connections = getOrCreateCachedConnections(key);
executeWithinLock(key, () -> connections.reserved.put(connection.transport, connection));
}
/**
* Returns the connection for the specified transport back to the queue of unreserved
* connections for the {@link CachedConnections} for the specified transport's key. If a
* {@link CachedConnections} is not found, one will be created. If the transport saw an error,
* the connection for the transport will be unreserved, and it and all other unreserved
* connections will be added to the specified toBeClosed list, and the connections' unreserved
* list will be cleared.
*
* <p>
*
* This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
* until the operation completes.
*
* @param transport
* the transport
* @param toBeClosed
* the list to add connections that must be closed after this operation finishes
* @return true if the connection for the transport existed and was initially reserved, or false
* otherwise
*/
boolean returnTransport(final CachedTTransport transport,
final List<CachedConnection> toBeClosed) {
// It's possible that multiple locks from executeWithinLock will overlap with a single lock
// inside the ConcurrentHashMap which can unnecessarily block threads. Access the
// ConcurrentHashMap outside of executeWithinLock to prevent this.
var connections = getOrCreateCachedConnections(transport.getCacheKey());
return executeWithinLock(transport.getCacheKey(),
() -> unreserveConnection(transport, connections, toBeClosed)); // inline
}
@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK",
justification = "FindBugs doesn't recognize that all locks in ConnectionPool.locks are subsequently unlocked in the try-finally in ConnectionPool.shutdown()")
void shutdown() {
// Obtain all locks.
for (Lock lock : locks) {
lock.lock();
}
// All locks are now acquired, so nothing else should be able to run concurrently...
try {
// Check if an shutdown has already been initiated.
if (shutdown) {
return;
}
shutdown = true;
connections.values().forEach(CachedConnections::closeAllTransports);
} finally {
for (Lock lock : locks) {
lock.unlock();
}
}
}
<T> T executeWithinLock(final ThriftTransportKey key, Supplier<T> function) {
Lock lock = getLock(key);
try {
return function.get();
} finally {
lock.unlock();
}
}
void executeWithinLock(final ThriftTransportKey key, Consumer<ThriftTransportKey> consumer) {
Lock lock = getLock(key);
try {
consumer.accept(key);
} finally {
lock.unlock();
}
}
Lock getLock(final ThriftTransportKey key) {
Lock lock = locks[(key.hashCode() & Integer.MAX_VALUE) % locks.length];
lock.lock();
if (shutdown) {
lock.unlock();
throw new TransportPoolShutdownException(
"The Accumulo singleton for connection pooling is disabled. This is likely caused by "
+ "all AccumuloClients being closed or garbage collected.");
}
return lock;
}
CachedConnections getCachedConnections(final ThriftTransportKey key) {
return connections.get(key);
}
CachedConnections getOrCreateCachedConnections(final ThriftTransportKey key) {
return connections.computeIfAbsent(key, k -> new CachedConnections());
}
boolean unreserveConnection(final CachedTTransport transport,
final CachedConnections connections, final List<CachedConnection> toBeClosed) {
if (connections != null) {
CachedConnection connection = connections.removeReserved(transport);
if (connection != null) {
if (transport.sawError) {
unreserveConnectionAndClearUnreserved(connections, connection, toBeClosed);
} else {
returnConnectionToUnreserved(connections, connection);
}
return true;
}
}
return false;
}
void unreserveConnectionAndClearUnreserved(final CachedConnections connections,
final CachedConnection connection, final List<CachedConnection> toBeClosed) {
toBeClosed.add(connection);
connection.unreserve();
// Remove all unreserved cached connection when a sever has an error, not just the
// connection that was returned.
toBeClosed.addAll(connections.unreserved);
connections.unreserved.clear();
}
void returnConnectionToUnreserved(final CachedConnections connections,
final CachedConnection connection) {
log.trace("Returned connection {} ioCount: {}", connection.transport.getCacheKey(),
connection.transport.ioCount);
connection.lastReturnTime = System.currentTimeMillis();
connection.unreserve();
// Using LIFO ensures that when the number of pooled connections exceeds the working
// set size that the idle times at the end of the list grow. The connections with
// large idle times will be cleaned up. Using a FIFO could continually reset the idle
// times of all connections, even when there are more than the working set size.
connections.unreserved.addFirst(connection);
}
List<CachedConnection> removeExpiredConnections(final long killTime) {
ArrayList<CachedConnection> expired = new ArrayList<>();
for (Entry<ThriftTransportKey,CachedConnections> entry : connections.entrySet()) {
CachedConnections connections = entry.getValue();
executeWithinLock(entry.getKey(), (key) -> {
connections.removeExpiredConnections(expired, killTime);
connections.checkReservedForStuckIO();
});
}
return expired;
}
}
private final ConnectionPool connectionPool = new ConnectionPool();
private Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
private Supplier<Thread> checkThreadFactory = Suppliers.memoize(() -> {
var thread = Threads.createThread("Thrift Connection Pool Checker", new Closer());
thread.start();
return thread;
});
private static final Logger log = LoggerFactory.getLogger(ThriftTransportPool.class);
private static final Long ERROR_THRESHOLD = 20L;
private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
private static class CachedConnection {
public CachedConnection(CachedTTransport t) {
this.transport = t;
}
void reserve() {
Preconditions.checkState(!this.transport.reserved);
this.transport.setReserved(true);
}
void unreserve() {
Preconditions.checkState(this.transport.reserved);
this.transport.setReserved(false);
}
final CachedTTransport transport;
long lastReturnTime;
}
public static class TransportPoolShutdownException extends RuntimeException {
public TransportPoolShutdownException(String msg) {
super(msg);
}
private static final long serialVersionUID = 1L;
}
private class Closer implements Runnable {
private void closeConnections() throws InterruptedException {
while (!getConnectionPool().shutdown) {
closeExpiredConnections();
Thread.sleep(500);
}
}
@Override
public void run() {
try {
closeConnections();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TransportPoolShutdownException e) {}
}
}
static class CachedTTransport extends TTransport {
private final ThriftTransportKey cacheKey;
private final TTransport wrappedTransport;
private boolean sawError = false;
private volatile String ioThreadName = null;
private volatile long ioStartTime = 0;
private volatile boolean reserved = false;
private String stuckThreadName = null;
int ioCount = 0;
int lastIoCount = -1;
private void sawError() {
sawError = true;
}
final void setReserved(boolean reserved) {
this.reserved = reserved;
if (reserved) {
ioThreadName = Thread.currentThread().getName();
ioCount = 0;
lastIoCount = -1;
} else {
if ((ioCount & 1) == 1) {
// connection unreserved, but it seems io may still be happening
log.warn("Connection returned to thrift connection pool that may still be in use {} {}",
ioThreadName, Thread.currentThread().getName(), new Exception());
}
ioCount = 0;
lastIoCount = -1;
ioThreadName = null;
}
checkForStuckIO(STUCK_THRESHOLD);
}
final void checkForStuckIO(long threshold) {
// checking for stuck io needs to be light weight.
// Tried to call System.currentTimeMillis() and Thread.currentThread() before every io
// operation.... this dramatically slowed things down. So switched to
// incrementing a counter before and after each io operation.
if ((ioCount & 1) == 1) {
// when ioCount is odd, it means I/O is currently happening
if (ioCount == lastIoCount) {
// still doing same I/O operation as last time this
// functions was called
long delta = System.currentTimeMillis() - ioStartTime;
if (delta >= threshold && stuckThreadName == null) {
stuckThreadName = ioThreadName;
log.warn("Thread \"{}\" stuck on IO to {} for at least {} ms", ioThreadName, cacheKey,
delta);
}
} else {
// remember this ioCount and the time we saw it, need to see
// if it changes
lastIoCount = ioCount;
ioStartTime = System.currentTimeMillis();
if (stuckThreadName != null) {
// doing I/O, but ioCount changed so no longer stuck
log.info("Thread \"{}\" no longer stuck on IO to {} sawError = {}", stuckThreadName,
cacheKey, sawError);
stuckThreadName = null;
}
}
} else {
// I/O is not currently happening
if (stuckThreadName != null) {
// no longer stuck, and was stuck in the past
log.info("Thread \"{}\" no longer stuck on IO to {} sawError = {}", stuckThreadName,
cacheKey, sawError);
stuckThreadName = null;
}
}
}
public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
this.wrappedTransport = transport;
this.cacheKey = cacheKey2;
}
@Override
public boolean isOpen() {
return wrappedTransport.isOpen();
}
@Override
public void open() throws TTransportException {
try {
ioCount++;
wrappedTransport.open();
} catch (TTransportException tte) {
sawError();
throw tte;
} finally {
ioCount++;
}
}
@Override
public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
try {
ioCount++;
return wrappedTransport.read(arg0, arg1, arg2);
} catch (TTransportException tte) {
sawError();
throw tte;
} finally {
ioCount++;
}
}
@Override
public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
try {
ioCount++;
return wrappedTransport.readAll(arg0, arg1, arg2);
} catch (TTransportException tte) {
sawError();
throw tte;
} finally {
ioCount++;
}
}
@Override
public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
try {
ioCount++;
wrappedTransport.write(arg0, arg1, arg2);
} catch (TTransportException tte) {
sawError();
throw tte;
} finally {
ioCount++;
}
}
@Override
public void write(byte[] arg0) throws TTransportException {
try {
ioCount++;
wrappedTransport.write(arg0);
} catch (TTransportException tte) {
sawError();
throw tte;
} finally {
ioCount++;
}
}
@Override
public void close() {
try {
ioCount++;
if (wrappedTransport.isOpen()) {
wrappedTransport.close();
}
} finally {
ioCount++;
}
}
@Override
public void flush() throws TTransportException {
try {
ioCount++;
wrappedTransport.flush();
} catch (TTransportException tte) {
sawError();
throw tte;
} finally {
ioCount++;
}
}
@Override
public boolean peek() {
try {
ioCount++;
return wrappedTransport.peek();
} finally {
ioCount++;
}
}
@Override
public byte[] getBuffer() {
try {
ioCount++;
return wrappedTransport.getBuffer();
} finally {
ioCount++;
}
}
@Override
public int getBufferPosition() {
try {
ioCount++;
return wrappedTransport.getBufferPosition();
} finally {
ioCount++;
}
}
@Override
public int getBytesRemainingInBuffer() {
try {
ioCount++;
return wrappedTransport.getBytesRemainingInBuffer();
} finally {
ioCount++;
}
}
@Override
public void consumeBuffer(int len) {
try {
ioCount++;
wrappedTransport.consumeBuffer(len);
} finally {
ioCount++;
}
}
public ThriftTransportKey getCacheKey() {
return cacheKey;
}
}
private ThriftTransportPool() {}
public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context)
throws TTransportException {
ThriftTransportKey cacheKey = new ThriftTransportKey(location, milliseconds, context);
// compute hash code outside of lock, this lowers the time the lock is held
cacheKey.precomputeHashCode();
ConnectionPool pool = getConnectionPool();
CachedConnection connection = pool.reserveAny(cacheKey);
if (connection != null) {
log.trace("Using existing connection to {}", cacheKey.getServer());
return connection.transport;
} else {
return createNewTransport(cacheKey);
}
}
@VisibleForTesting
public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers,
boolean preferCachedConnection) throws TTransportException {
servers = new ArrayList<>(servers);
if (preferCachedConnection) {
HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
ConnectionPool pool = getConnectionPool();
// randomly pick a server from the connection cache
serversSet.retainAll(pool.getThriftTransportKeys());
if (!serversSet.isEmpty()) {
ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet);
Collections.shuffle(cachedServers, random);
for (ThriftTransportKey ttk : cachedServers) {
CachedConnection connection = pool.reserveAny(ttk);
if (connection != null) {
final String serverAddr = ttk.getServer().toString();
log.trace("Using existing connection to {}", serverAddr);
return new Pair<>(serverAddr, connection.transport);
}
}
}
}
ConnectionPool pool = getConnectionPool();
int retryCount = 0;
while (!servers.isEmpty() && retryCount < 10) {
int index = random.nextInt(servers.size());
ThriftTransportKey ttk = servers.get(index);
if (preferCachedConnection) {
CachedConnection connection = pool.reserveAnyIfPresent(ttk);
if (connection != null) {
return new Pair<>(ttk.getServer().toString(), connection.transport);
}
}
try {
return new Pair<>(ttk.getServer().toString(), createNewTransport(ttk));
} catch (TTransportException tte) {
log.debug("Failed to connect to {}", servers.get(index), tte);
servers.remove(index);
retryCount++;
}
}
throw new TTransportException("Failed to connect to a server");
}
private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(),
(int) cacheKey.getTimeout(), cacheKey.getSslParams(), cacheKey.getSaslParams());
log.trace("Creating new connection to connection to {}", cacheKey.getServer());
CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
CachedConnection connection = new CachedConnection(tsc);
connection.reserve();
try {
ConnectionPool pool = getConnectionPool();
pool.putReserved(cacheKey, connection);
} catch (TransportPoolShutdownException e) {
connection.transport.close();
throw e;
}
return connection.transport;
}
public void returnTransport(TTransport transport) {
if (transport == null) {
return;
}
CachedTTransport cachedTransport = (CachedTTransport) transport;
ArrayList<CachedConnection> closeList = new ArrayList<>();
ConnectionPool pool = getConnectionPool();
boolean existInCache = pool.returnTransport(cachedTransport, closeList);
// close outside of sync block
closeList.forEach((connection) -> {
try {
connection.transport.close();
} catch (Exception e) {
log.debug("Failed to close connection w/ errors", e);
}
});
if (cachedTransport.sawError) {
boolean shouldWarn = false;
Long ecount = null;
synchronized (errorCount) {
ecount = errorCount.merge(cachedTransport.getCacheKey(), 1L, Long::sum);
// logs the first time an error occurred
errorTime.computeIfAbsent(cachedTransport.getCacheKey(), k -> System.currentTimeMillis());
if (ecount >= ERROR_THRESHOLD && serversWarnedAbout.add(cachedTransport.getCacheKey())) {
// boolean facilitates logging outside of lock
shouldWarn = true;
}
}
log.trace("Returned connection had error {}", cachedTransport.getCacheKey());
if (shouldWarn) {
log.warn("Server {} had {} failures in a short time period, will not complain anymore",
cachedTransport.getCacheKey(), ecount);
}
}
if (!existInCache) {
log.warn("Returned tablet server connection to cache that did not come from cache");
// close outside of sync block
transport.close();
}
}
/**
* Set the time after which idle connections should be closed
*/
public synchronized void setIdleTime(long time) {
this.killTime = time;
log.debug("Set thrift transport pool idle time to {}", time);
}
private static ThriftTransportPool instance = null;
static {
SingletonManager.register(new SingletonService() {
@Override
public boolean isEnabled() {
return ThriftTransportPool.isEnabled();
}
@Override
public void enable() {
ThriftTransportPool.enable();
}
@Override
public void disable() {
ThriftTransportPool.disable();
}
});
}
public static synchronized ThriftTransportPool getInstance() {
Preconditions.checkState(instance != null,
"The Accumulo singleton for connection pooling is disabled. This is likely caused by all "
+ "AccumuloClients being closed or garbage collected.");
instance.startCheckerThread();
return instance;
}
private static synchronized boolean isEnabled() {
return instance != null;
}
private static synchronized void enable() {
if (instance == null) {
// this code intentionally does not start the thread that closes idle connections. That thread
// is created the first time something attempts to use this service.
instance = new ThriftTransportPool();
}
}
private static synchronized void disable() {
if (instance != null) {
try {
instance.shutdown();
} finally {
instance = null;
}
}
}
public void startCheckerThread() {
checkThreadFactory.get();
}
void closeExpiredConnections() {
List<CachedConnection> expiredConnections;
ConnectionPool pool = getConnectionPool();
expiredConnections = pool.removeExpiredConnections(killTime);
synchronized (errorCount) {
Iterator<Entry<ThriftTransportKey,Long>> iter = errorTime.entrySet().iterator();
while (iter.hasNext()) {
Entry<ThriftTransportKey,Long> entry = iter.next();
long delta = System.currentTimeMillis() - entry.getValue();
if (delta >= STUCK_THRESHOLD) {
errorCount.remove(entry.getKey());
iter.remove();
}
}
}
// Close connections outside of sync block
expiredConnections.forEach((c) -> c.transport.close());
}
private void shutdown() {
connectionPool.shutdown();
try {
checkThreadFactory.get().join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private ConnectionPool getConnectionPool() {
return connectionPool;
}
}