blob: 8da917384de9556c92f4a965a15d7051ee7d7010 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A bounded thread pool server customized for HBase.
*/
@InterfaceAudience.Private
public class TBoundedThreadPoolServer extends TServer {
private static final String QUEUE_FULL_MSG = "Queue is full, closing connection";
/**
* The "core size" of the thread pool. New threads are created on every connection until this many
* threads are created.
*/
public static final String MIN_WORKER_THREADS_CONF_KEY = "hbase.thrift.minWorkerThreads";
/**
* This default core pool size should be enough for many test scenarios. We want to override this
* with a much larger number (e.g. at least 200) for a large-scale production setup.
*/
public static final int DEFAULT_MIN_WORKER_THREADS = 16;
/**
* The maximum size of the thread pool. When the pending request queue overflows, new threads are
* created until their number reaches this number. After that, the server starts dropping
* connections.
*/
public static final String MAX_WORKER_THREADS_CONF_KEY = "hbase.thrift.maxWorkerThreads";
public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
/**
* The maximum number of pending connections waiting in the queue. If there are no idle threads in
* the pool, the server queues requests. Only when the queue overflows, new threads are added, up
* to hbase.thrift.maxQueuedRequests threads.
*/
public static final String MAX_QUEUED_REQUESTS_CONF_KEY = "hbase.thrift.maxQueuedRequests";
public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
/**
* Default amount of time in seconds to keep a thread alive. Worker threads are stopped after
* being idle for this long.
*/
public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
"hbase.thrift.threadKeepAliveTimeSec";
private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
/**
* Time to wait after interrupting all worker threads. This is after a clean shutdown has been
* attempted.
*/
public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
private static final Logger LOG =
LoggerFactory.getLogger(TBoundedThreadPoolServer.class.getName());
private final CallQueue callQueue;
public static class Args extends TThreadPoolServer.Args {
int maxQueuedRequests;
int threadKeepAliveTimeSec;
public Args(TServerTransport transport, Configuration conf) {
super(transport);
minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY, DEFAULT_MIN_WORKER_THREADS);
maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY, DEFAULT_MAX_WORKER_THREADS);
maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY, DEFAULT_MAX_QUEUED_REQUESTS);
threadKeepAliveTimeSec =
conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY, DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
}
@Override
public String toString() {
return "min worker threads=" + minWorkerThreads + ", max worker threads=" + maxWorkerThreads
+ ", max queued requests=" + maxQueuedRequests;
}
}
/** Executor service for handling client connections */
private ThreadPoolExecutor executorService;
/** Flag for stopping the server */
private volatile boolean stopped;
private Args serverOptions;
public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
super(options);
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
if (options.maxQueuedRequests > 0) {
this.callQueue = new CallQueue(new LinkedBlockingQueue<>(options.maxQueuedRequests), metrics);
minWorkerThreads = maxWorkerThreads;
} else {
this.callQueue = new CallQueue(new SynchronousQueue<>(), metrics);
}
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d");
executorService = new THBaseThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
options.threadKeepAliveTimeSec, TimeUnit.SECONDS, this.callQueue, tfb.build(), metrics);
executorService.allowCoreThreadTimeOut(true);
serverOptions = options;
}
@Override
public void serve() {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOG.error("Error occurred during listening.", ttx);
return;
}
Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
@Override
public void run() {
TBoundedThreadPoolServer.this.stop();
}
});
stopped = false;
while (!stopped && !Thread.interrupted()) {
TTransport client = null;
try {
client = serverTransport_.accept();
} catch (TTransportException ttx) {
if (!stopped) {
LOG.warn("Transport error when accepting message", ttx);
continue;
} else {
// The server has been stopped
break;
}
}
ClientConnnection command = new ClientConnnection(client);
try {
executorService.execute(command);
} catch (RejectedExecutionException rex) {
if (client.getClass() == TSocket.class) {
// We expect the client to be TSocket.
LOG.warn(
QUEUE_FULL_MSG + " from " + ((TSocket) client).getSocket().getRemoteSocketAddress());
} else {
LOG.warn(QUEUE_FULL_MSG, rex);
}
client.close();
}
}
shutdownServer();
}
/**
* Loop until {@link ExecutorService#awaitTermination} finally does return without an interrupted
* exception. If we don't do this, then we'll shut down prematurely. We want to let the executor
* service clear its task queue, closing client sockets appropriately.
*/
private void shutdownServer() {
executorService.shutdown();
long msLeftToWait = serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
long timeMillis = EnvironmentEdgeManager.currentTime();
LOG
.info("Waiting for up to " + msLeftToWait + " ms to finish processing" + " pending requests");
boolean interrupted = false;
while (msLeftToWait >= 0) {
try {
executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long timePassed = EnvironmentEdgeManager.currentTime() - timeMillis;
msLeftToWait -= timePassed;
timeMillis += timePassed;
interrupted = true;
}
}
LOG.info("Interrupting all worker threads and waiting for " + TIME_TO_WAIT_AFTER_SHUTDOWN_MS
+ " ms longer");
// This will interrupt all the threads, even those running a task.
executorService.shutdownNow();
Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
// Preserve the interrupted status.
if (interrupted) {
Thread.currentThread().interrupt();
}
LOG.info("Thrift server shutdown complete");
}
@Override
public void stop() {
stopped = true;
serverTransport_.interrupt();
}
private final class ClientConnnection implements Runnable {
private TTransport client;
/**
* Default constructor.
* @param client Transport to process
*/
private ClientConnnection(TTransport client) {
this.client = client;
}
/**
* Loops on processing a client forever
*/
@Override
public void run() {
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
try {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
while (true) {
if (stopped) {
break;
}
processor.process(inputProtocol, outputProtocol);
}
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
LOG.error("Thrift error occurred during processing of message.", tx);
} catch (Exception x) {
LOG.error("Error occurred during processing of message.", x);
}
if (inputTransport != null) {
inputTransport.close();
}
if (outputTransport != null) {
outputTransport.close();
}
}
}
}