blob: 8b340c2718542f130963326ae2b2b6853ec82380 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.impl;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
public class ThriftTransportPool {
private static final SecureRandom random = new SecureRandom();
private long killTime = 1000 * 3;
private static class CachedConnections {
LinkedList<CachedConnection> unreserved = new LinkedList<>();
Map<CachedTTransport,CachedConnection> reserved = new HashMap<>();
public CachedConnection reserveAny() {
if (unreserved.size() > 0) {
CachedConnection cachedConnection = unreserved.removeFirst();
cachedConnection.reserve();
reserved.put(cachedConnection.transport, cachedConnection);
if (log.isTraceEnabled()) {
log.trace("Using existing connection to {}", cachedConnection.transport.cacheKey);
}
return cachedConnection;
}
return null;
}
}
private Map<ThriftTransportKey,CachedConnections> cache = new HashMap<>();
private Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
private CountDownLatch closerExitLatch;
private static final Logger log = LoggerFactory.getLogger(ThriftTransportPool.class);
private static final Long ERROR_THRESHOLD = 20L;
private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
private static class CachedConnection {
public CachedConnection(CachedTTransport t) {
this.transport = t;
}
void reserve() {
Preconditions.checkState(!this.transport.reserved);
this.transport.setReserved(true);
}
void unreserve() {
Preconditions.checkState(this.transport.reserved);
this.transport.setReserved(false);
}
final CachedTTransport transport;
long lastReturnTime;
}
public static class TransportPoolShutdownException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
private static class Closer implements Runnable {
final ThriftTransportPool pool;
private CountDownLatch closerExitLatch;
public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
this.pool = pool;
this.closerExitLatch = closerExitLatch;
}
private void closeConnections() {
while (true) {
ArrayList<CachedConnection> connectionsToClose = new ArrayList<>();
synchronized (pool) {
for (CachedConnections cachedConns : pool.getCache().values()) {
Iterator<CachedConnection> iter = cachedConns.unreserved.iterator();
while (iter.hasNext()) {
CachedConnection cachedConnection = iter.next();
if (System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
connectionsToClose.add(cachedConnection);
iter.remove();
}
}
for (CachedConnection cachedConnection : cachedConns.reserved.values()) {
cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
}
}
Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
while (iter.hasNext()) {
Entry<ThriftTransportKey,Long> entry = iter.next();
long delta = System.currentTimeMillis() - entry.getValue();
if (delta >= STUCK_THRESHOLD) {
pool.errorCount.remove(entry.getKey());
iter.remove();
}
}
}
// close connections outside of sync block
for (CachedConnection cachedConnection : connectionsToClose) {
cachedConnection.transport.close();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
log.debug("Sleep interrupted in closeConnections()", e);
}
}
}
@Override
public void run() {
try {
closeConnections();
} catch (TransportPoolShutdownException e) {} finally {
closerExitLatch.countDown();
}
}
}
static class CachedTTransport extends TTransport {
private ThriftTransportKey cacheKey;
private TTransport wrappedTransport;
private boolean sawError = false;
private volatile String ioThreadName = null;
private volatile long ioStartTime = 0;
private volatile boolean reserved = false;
private String stuckThreadName = null;
int ioCount = 0;
int lastIoCount = -1;
private void sawError(Exception e) {
sawError = true;
}
final void setReserved(boolean reserved) {
this.reserved = reserved;
if (reserved) {
ioThreadName = Thread.currentThread().getName();
ioCount = 0;
lastIoCount = -1;
} else {
if ((ioCount & 1) == 1) {
// connection unreserved, but it seems io may still be happening
log.warn("Connection returned to thrift connection pool that may still be in use {} {}",
ioThreadName, Thread.currentThread().getName(), new Exception());
}
ioCount = 0;
lastIoCount = -1;
ioThreadName = null;
}
checkForStuckIO(STUCK_THRESHOLD);
}
final void checkForStuckIO(long threshold) {
// checking for stuck io needs to be light weight.
// Tried to call System.currentTimeMillis() and Thread.currentThread() before every io
// operation.... this dramatically slowed things down. So switched to
// incrementing a counter before and after each io operation.
if ((ioCount & 1) == 1) {
// when ioCount is odd, it means I/O is currently happening
if (ioCount == lastIoCount) {
// still doing same I/O operation as last time this
// functions was called
long delta = System.currentTimeMillis() - ioStartTime;
if (delta >= threshold && stuckThreadName == null) {
stuckThreadName = ioThreadName;
log.warn("Thread \"{}\" stuck on IO to {} for at least {} ms", ioThreadName, cacheKey,
delta);
}
} else {
// remember this ioCount and the time we saw it, need to see
// if it changes
lastIoCount = ioCount;
ioStartTime = System.currentTimeMillis();
if (stuckThreadName != null) {
// doing I/O, but ioCount changed so no longer stuck
log.info("Thread \"{}\" no longer stuck on IO to {} sawError = {}", stuckThreadName,
cacheKey, sawError);
stuckThreadName = null;
}
}
} else {
// I/O is not currently happening
if (stuckThreadName != null) {
// no longer stuck, and was stuck in the past
log.info("Thread \"{}\" no longer stuck on IO to {} sawError = {}", stuckThreadName,
cacheKey, sawError);
stuckThreadName = null;
}
}
}
public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
this.wrappedTransport = transport;
this.cacheKey = cacheKey2;
}
@Override
public boolean isOpen() {
return wrappedTransport.isOpen();
}
@Override
public void open() throws TTransportException {
try {
ioCount++;
wrappedTransport.open();
} catch (TTransportException tte) {
sawError(tte);
throw tte;
} finally {
ioCount++;
}
}
@Override
public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
try {
ioCount++;
return wrappedTransport.read(arg0, arg1, arg2);
} catch (TTransportException tte) {
sawError(tte);
throw tte;
} finally {
ioCount++;
}
}
@Override
public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
try {
ioCount++;
return wrappedTransport.readAll(arg0, arg1, arg2);
} catch (TTransportException tte) {
sawError(tte);
throw tte;
} finally {
ioCount++;
}
}
@Override
public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
try {
ioCount++;
wrappedTransport.write(arg0, arg1, arg2);
} catch (TTransportException tte) {
sawError(tte);
throw tte;
} finally {
ioCount++;
}
}
@Override
public void write(byte[] arg0) throws TTransportException {
try {
ioCount++;
wrappedTransport.write(arg0);
} catch (TTransportException tte) {
sawError(tte);
throw tte;
} finally {
ioCount++;
}
}
@Override
public void close() {
try {
ioCount++;
wrappedTransport.close();
} finally {
ioCount++;
}
}
@Override
public void flush() throws TTransportException {
try {
ioCount++;
wrappedTransport.flush();
} catch (TTransportException tte) {
sawError(tte);
throw tte;
} finally {
ioCount++;
}
}
@Override
public boolean peek() {
try {
ioCount++;
return wrappedTransport.peek();
} finally {
ioCount++;
}
}
@Override
public byte[] getBuffer() {
try {
ioCount++;
return wrappedTransport.getBuffer();
} finally {
ioCount++;
}
}
@Override
public int getBufferPosition() {
try {
ioCount++;
return wrappedTransport.getBufferPosition();
} finally {
ioCount++;
}
}
@Override
public int getBytesRemainingInBuffer() {
try {
ioCount++;
return wrappedTransport.getBytesRemainingInBuffer();
} finally {
ioCount++;
}
}
@Override
public void consumeBuffer(int len) {
try {
ioCount++;
wrappedTransport.consumeBuffer(len);
} finally {
ioCount++;
}
}
public ThriftTransportKey getCacheKey() {
return cacheKey;
}
}
private ThriftTransportPool() {}
public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context)
throws TTransportException {
return getTransport(new ThriftTransportKey(location, milliseconds, context));
}
private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
// compute hash code outside of lock, this lowers the time the lock is held
cacheKey.precomputeHashCode();
synchronized (this) {
// atomically reserve location if it exist in cache
CachedConnections ccl = getCache().get(cacheKey);
if (ccl == null) {
ccl = new CachedConnections();
getCache().put(cacheKey, ccl);
}
CachedConnection cachedConnection = ccl.reserveAny();
if (cachedConnection != null) {
log.trace("Using existing connection to {}", cacheKey.getServer());
return cachedConnection.transport;
}
}
return createNewTransport(cacheKey);
}
@VisibleForTesting
public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers,
boolean preferCachedConnection) throws TTransportException {
servers = new ArrayList<>(servers);
if (preferCachedConnection) {
HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
synchronized (this) {
// randomly pick a server from the connection cache
serversSet.retainAll(getCache().keySet());
if (serversSet.size() > 0) {
ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet);
Collections.shuffle(cachedServers, random);
for (ThriftTransportKey ttk : cachedServers) {
CachedConnection cachedConnection = getCache().get(ttk).reserveAny();
if (cachedConnection != null) {
final String serverAddr = ttk.getServer().toString();
log.trace("Using existing connection to {}", serverAddr);
return new Pair<>(serverAddr, cachedConnection.transport);
}
}
}
}
}
int retryCount = 0;
while (servers.size() > 0 && retryCount < 10) {
int index = random.nextInt(servers.size());
ThriftTransportKey ttk = servers.get(index);
if (preferCachedConnection) {
synchronized (this) {
CachedConnections cachedConns = getCache().get(ttk);
if (cachedConns != null) {
CachedConnection cachedConnection = cachedConns.reserveAny();
if (cachedConnection != null) {
final String serverAddr = ttk.getServer().toString();
return new Pair<>(serverAddr, cachedConnection.transport);
}
}
}
}
try {
return new Pair<>(ttk.getServer().toString(), createNewTransport(ttk));
} catch (TTransportException tte) {
log.debug("Failed to connect to {}", servers.get(index), tte);
servers.remove(index);
retryCount++;
}
}
throw new TTransportException("Failed to connect to a server");
}
private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(),
(int) cacheKey.getTimeout(), cacheKey.getSslParams(), cacheKey.getSaslParams());
log.trace("Creating new connection to connection to {}", cacheKey.getServer());
CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
CachedConnection cc = new CachedConnection(tsc);
cc.reserve();
try {
synchronized (this) {
CachedConnections cachedConns = getCache().get(cacheKey);
if (cachedConns == null) {
cachedConns = new CachedConnections();
getCache().put(cacheKey, cachedConns);
}
cachedConns.reserved.put(cc.transport, cc);
}
} catch (TransportPoolShutdownException e) {
cc.transport.close();
throw e;
}
return cc.transport;
}
public void returnTransport(TTransport tsc) {
if (tsc == null) {
return;
}
boolean existInCache = false;
CachedTTransport ctsc = (CachedTTransport) tsc;
ArrayList<CachedConnection> closeList = new ArrayList<>();
synchronized (this) {
CachedConnections cachedConns = getCache().get(ctsc.getCacheKey());
if (cachedConns != null) {
CachedConnection cachedConnection = cachedConns.reserved.remove(ctsc);
if (cachedConnection != null) {
if (ctsc.sawError) {
closeList.add(cachedConnection);
log.trace("Returned connection had error {}", ctsc.getCacheKey());
Long ecount = errorCount.get(ctsc.getCacheKey());
if (ecount == null)
ecount = 0L;
ecount++;
errorCount.put(ctsc.getCacheKey(), ecount);
Long etime = errorTime.get(ctsc.getCacheKey());
if (etime == null) {
errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
}
if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
log.warn(
"Server {} had {} failures in a short time period, will not complain anymore",
ctsc.getCacheKey(), ecount);
serversWarnedAbout.add(ctsc.getCacheKey());
}
cachedConnection.unreserve();
// remove all unreserved cached connection when a sever has an error, not just the
// connection that was returned
closeList.addAll(cachedConns.unreserved);
cachedConns.unreserved.clear();
} else {
log.trace("Returned connection {} ioCount: {}", ctsc.getCacheKey(),
cachedConnection.transport.ioCount);
cachedConnection.lastReturnTime = System.currentTimeMillis();
cachedConnection.unreserve();
// Calling addFirst to use unreserved as LIFO queue. Using LIFO ensures that when the #
// of pooled connections exceeds the working set size that the
// idle times at the end of the list grow. The connections with large idle times will be
// cleaned up. Using a FIFO could continually reset the idle
// times of all connections, even when there are more than the working set size.
cachedConns.unreserved.addFirst(cachedConnection);
}
existInCache = true;
}
}
}
// close outside of sync block
for (CachedConnection cachedConnection : closeList) {
try {
cachedConnection.transport.close();
} catch (Exception e) {
log.debug("Failed to close connection w/ errors", e);
}
}
if (!existInCache) {
log.warn("Returned tablet server connection to cache that did not come from cache");
// close outside of sync block
tsc.close();
}
}
/**
* Set the time after which idle connections should be closed
*/
public synchronized void setIdleTime(long time) {
this.killTime = time;
log.debug("Set thrift transport pool idle time to {}", time);
}
private static ThriftTransportPool instance = new ThriftTransportPool();
private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
public static ThriftTransportPool getInstance() {
if (daemonStarted.compareAndSet(false, true)) {
CountDownLatch closerExitLatch = new CountDownLatch(1);
new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
instance.setCloserExitLatch(closerExitLatch);
}
return instance;
}
private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
this.closerExitLatch = closerExitLatch;
}
public void shutdown() {
synchronized (this) {
if (cache == null)
return;
// close any connections in the pool... even ones that are in use
for (CachedConnections cachedConn : getCache().values()) {
for (CachedConnection cc : Iterables.concat(cachedConn.reserved.values(),
cachedConn.unreserved)) {
try {
cc.transport.close();
} catch (Exception e) {
log.debug("Error closing transport during shutdown", e);
}
}
}
// this will render the pool unusable and cause the background thread to exit
this.cache = null;
}
try {
closerExitLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private Map<ThriftTransportKey,CachedConnections> getCache() {
if (cache == null)
throw new TransportPoolShutdownException();
return cache;
}
}