blob: 9cc289956078f8d756cfea16e4d858d2145ce92f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.rpc;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.net.ssl.SSLServerSocket;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.PropertyType;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Factory methods for creating Thrift server objects
*/
public class TServerUtils {
private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
/**
* Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client
* address of any incoming RPC.
*/
public static final ThreadLocal<String> clientAddress = new ThreadLocal<>();
/**
*
* @param hostname
* name of the host
* @param ports
* array of ports
* @return array of HostAndPort objects
*/
public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) {
return ports.mapToObj(port -> HostAndPort.fromParts(hostname, port))
.toArray(HostAndPort[]::new);
}
/**
*
* @param config
* Accumulo configuration
* @return A Map object with reserved port numbers as keys and Property objects as values
*/
public static Map<Integer,Property> getReservedPorts(AccumuloConfiguration config) {
return EnumSet.allOf(Property.class).stream()
.filter(p -> p.getType() == PropertyType.PORT && p != Property.TSERV_CLIENTPORT)
.flatMap(rp -> config.getPortStream(rp).mapToObj(portNum -> new Pair<>(portNum, rp)))
.collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
}
/**
* Start a server, at the given port, or higher, if that port is not available.
*
* @param service
* RPC configuration
* @param portHintProperty
* the port to attempt to open, can be zero, meaning "any available port"
* @param processor
* the service to be started
* @param serverName
* the name of the class that is providing the service
* @param threadName
* name this service's thread for better debugging
* @param portSearchProperty
* A boolean Property to control if port-search should be used, or null to disable
* @param minThreadProperty
* A Property to control the minimum number of threads in the pool
* @param timeBetweenThreadChecksProperty
* A Property to control the amount of time between checks to resize the thread pool
* @param maxMessageSizeProperty
* A Property to control the maximum Thrift message size accepted
* @return the server object created, and the port actually used
* @throws UnknownHostException
* when we don't know our own address
*/
public static ServerAddress startServer(MetricsSystem metricsSystem, ServerContext service,
String hostname, Property portHintProperty, TProcessor processor, String serverName,
String threadName, Property portSearchProperty, Property minThreadProperty,
Property threadTimeOutProperty, Property timeBetweenThreadChecksProperty,
Property maxMessageSizeProperty) throws UnknownHostException {
final AccumuloConfiguration config = service.getConfiguration();
final IntStream portHint = config.getPortStream(portHintProperty);
int minThreads = 2;
if (minThreadProperty != null) {
minThreads = config.getCount(minThreadProperty);
}
long threadTimeOut = ThreadPools.DEFAULT_TIMEOUT_MILLISECS;
if (threadTimeOutProperty != null) {
threadTimeOut = config.getTimeInMillis(threadTimeOutProperty);
}
long timeBetweenThreadChecks = 1000;
if (timeBetweenThreadChecksProperty != null) {
timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
}
long maxMessageSize = 10 * 1000 * 1000;
if (maxMessageSizeProperty != null) {
maxMessageSize = config.getAsBytes(maxMessageSizeProperty);
}
boolean portSearch = false;
if (portSearchProperty != null) {
portSearch = config.getBoolean(portSearchProperty);
}
final ThriftServerType serverType = service.getThriftServerType();
if (serverType == ThriftServerType.SASL) {
processor = updateSaslProcessor(serverType, processor);
}
// create the TimedProcessor outside the port search loop so we don't try to
// register the same
// metrics mbean more than once
TimedProcessor timedProcessor =
new TimedProcessor(metricsSystem, config, processor, serverName, threadName);
HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
try {
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis(),
addresses);
} catch (TTransportException e) {
if (portSearch) {
// Build a list of reserved ports - as identified by properties of type PropertyType.PORT
Map<Integer,Property> reservedPorts = getReservedPorts(config);
HostAndPort last = addresses[addresses.length - 1];
// Attempt to allocate a port outside of the specified port property
// Search sequentially over the next 1000 ports
for (int port = last.getPort() + 1; port < last.getPort() + 1001; port++) {
if (reservedPorts.containsKey(port)) {
log.debug("During port search, skipping reserved port {} - property {} ({})", port,
reservedPorts.get(port).getKey(), reservedPorts.get(port).getDescription());
continue;
}
if (PortRange.VALID_RANGE.isBefore(port)) {
break;
}
try {
HostAndPort addr = HostAndPort.fromParts(hostname, port);
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize,
service.getServerSslParams(), service.getSaslParams(),
service.getClientTimeoutInMillis(), addr);
} catch (TTransportException tte) {
log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
}
}
log.error("Unable to start TServer", e);
throw new UnknownHostException("Unable to find a listen port");
} else {
log.error("Unable to start TServer", e);
throw new UnknownHostException("Unable to find a listen port");
}
}
}
/**
* Create a non blocking server with multiple select threads and a custom thread pool that can
* dynamically resize itself.
*/
public static ServerAddress createThreadedSelectorServer(HostAndPort address,
TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf,
long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
final TNonblockingServerSocket transport =
new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()));
TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport);
options.selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 4);
log.info("selectorThreads : " + options.selectorThreads);
options.protocolFactory(protocolFactory);
options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
options.maxReadBufferBytes = maxMessageSize;
options.stopTimeoutVal(5);
// Create our own very special thread pool.
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
conf, timeBetweenThreadChecks);
options.executorService(pool);
options.processorFactory(new TProcessorFactory(processor));
if (address.getPort() == 0) {
address = HostAndPort.fromParts(address.getHost(), transport.getPort());
}
return new ServerAddress(new CustomThreadedSelectorServer(options), address);
}
/**
* Create a NonBlockingServer with a single select threads and a custom thread pool that can
* dynamically resize itself.
*/
public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, final String serverName, final int numThreads,
final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
long maxMessageSize) throws TTransportException {
final TNonblockingServerSocket transport =
new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()));
final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
options.protocolFactory(protocolFactory);
options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
options.maxReadBufferBytes = maxMessageSize;
options.stopTimeoutVal(5);
// Create our own very special thread pool.
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
conf, timeBetweenThreadChecks);
options.executorService(pool);
options.processorFactory(new TProcessorFactory(processor));
if (address.getPort() == 0) {
address = HostAndPort.fromParts(address.getHost(), transport.getPort());
}
return new ServerAddress(new CustomNonBlockingServer(options), address);
}
/**
* Creates a {@link ThreadPoolExecutor} which uses a ScheduledThreadPoolExecutor to inspect the
* core pool size and number of active threads of the {@link ThreadPoolExecutor} and increase or
* decrease the core pool size based on activity (excessive or lack thereof).
*
* @param serverName
* A name to describe the thrift server this executor will service
* @param executorThreads
* The minimum number of threads for the executor
* @param threadTimeOut
* The time after which threads are allowed to terminate including core threads. If set
* to 0, the core threads will indefinitely stay running waiting for work.
* @param conf
* Accumulo Configuration
* @param timeBetweenThreadChecks
* The amount of time, in millis, between attempts to resize the executor thread pool
* @return A {@link ThreadPoolExecutor} which will resize itself automatically
*/
public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName,
final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf,
long timeBetweenThreadChecks) {
final ThreadPoolExecutor pool = ThreadPools.createFixedThreadPool(executorThreads,
threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", false);
// periodically adjust the number of threads we need by checking how busy our threads are
ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> {
// there is a minor race condition between sampling the current state of the thread pool and
// adjusting it
// however, this isn't really an issue, since it adjusts periodically anyway
if (pool.getCorePoolSize() <= pool.getActiveCount()) {
int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
log.info("Increasing server thread pool size on {} to {}", serverName, larger);
pool.setMaximumPoolSize(larger);
pool.setCorePoolSize(larger);
} else {
if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
if (smaller != pool.getCorePoolSize()) {
log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
pool.setCorePoolSize(smaller);
}
}
}
}, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS);
return pool;
}
/**
* Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance
* against SSL or SASL transports.
*
* @param address
* Address to bind to
* @param processor
* TProcessor for the server
* @param maxMessageSize
* Maximum size of a Thrift message allowed
* @return A configured TThreadPoolServer and its bound address information
*/
public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads,
long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks)
throws TTransportException {
InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
// Must use an ISA, providing only a port would ignore the hostname given
TServerSocket transport = new TServerSocket(isa);
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
conf, timeBetweenThreadChecks);
TThreadPoolServer server = createTThreadPoolServer(transport, processor,
ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
if (address.getPort() == 0) {
address =
HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
log.info("Blocking Server bound on {}", address);
}
return new ServerAddress(server, address);
}
/**
* Create a {@link TThreadPoolServer} with the provided server transport, processor and transport
* factory.
*
* @param transport
* TServerTransport for the server
* @param processor
* TProcessor for the server
* @param transportFactory
* TTransportFactory for the server
*/
public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport,
TProcessor processor, TTransportFactory transportFactory, TProtocolFactory protocolFactory,
ExecutorService service) {
TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
options.protocolFactory(protocolFactory);
options.transportFactory(transportFactory);
options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
if (service != null) {
options.executorService(service);
}
return new TThreadPoolServer(options);
}
/**
* Create the Thrift server socket for RPC running over SSL.
*
* @param port
* Port of the server socket to bind to
* @param timeout
* Socket timeout
* @param address
* Address to bind the socket to
* @param params
* SSL parameters
* @return A configured TServerSocket configured to use SSL
*/
public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address,
SslConnectionParams params) throws TTransportException {
TServerSocket tServerSock;
if (params.useJsse()) {
tServerSock =
TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
} else {
tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address,
params.getTTransportParams());
}
final ServerSocket serverSock = tServerSock.getServerSocket();
if (serverSock instanceof SSLServerSocket) {
SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
String[] protocols = params.getServerProtocols();
// Be nice for the user and automatically remove protocols that might not exist in their JVM.
// Keeps us from forcing config alterations too
// e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
Set<String> socketEnabledProtocols =
new HashSet<>(Arrays.asList(sslServerSock.getEnabledProtocols()));
// Keep only the enabled protocols that were specified by the configuration
socketEnabledProtocols.retainAll(Arrays.asList(protocols));
if (socketEnabledProtocols.isEmpty()) {
// Bad configuration...
throw new RuntimeException(
"No available protocols available for secure socket. Availaable protocols: "
+ Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: "
+ Arrays.toString(protocols));
}
// Set the protocol(s) on the server socketlong
sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
}
return tServerSock;
}
/**
* Create a Thrift SSL server.
*
* @param address
* host and port to bind to
* @param processor
* TProcessor for the server
* @param socketTimeout
* Socket timeout
* @param sslParams
* SSL parameters
* @return A ServerAddress with the bound-socket information and the Thrift server
*/
public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams,
String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf,
long timeBetweenThreadChecks) throws TTransportException {
TServerSocket transport;
try {
transport = getSslServerSocket(address.getPort(), (int) socketTimeout,
InetAddress.getByName(address.getHost()), sslParams);
} catch (UnknownHostException e) {
throw new TTransportException(e);
}
if (address.getPort() == 0) {
address =
HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
log.info("SSL Thread Pool Server bound on {}", address);
}
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
conf, timeBetweenThreadChecks);
return new ServerAddress(createTThreadPoolServer(transport, processor,
ThriftUtil.transportFactory(), protocolFactory, pool), address);
}
public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params,
final String serverName, final int numThreads, final long threadTimeOut,
final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException {
// We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the
// TThreadPoolServer does,
// but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it
// open()'s which will fail
// when the server does an accept() to (presumably) wake up the eventing system.
log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHost(),
address.getPort());
InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
// Must use an ISA, providing only a port would ignore the hostname given
TServerSocket transport = new TServerSocket(isa, (int) socketTimeout);
String hostname, fqdn;
try {
hostname = InetAddress.getByName(address.getHost()).getCanonicalHostName();
fqdn = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
transport.close();
throw new TTransportException(e);
}
// If we can't get a real hostname from the provided host test, use the hostname from DNS for
// localhost
if ("0.0.0.0".equals(hostname)) {
hostname = fqdn;
}
// ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients
// and servers have to agree upon the FQDN
// so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for
// this host, fail quickly and inform them to update
// their configuration.
if (!hostname.equals(fqdn)) {
log.error("Expected hostname of '{}' but got '{}'. Ensure the entries in"
+ " the Accumulo hosts files (e.g. managers, tservers) are the FQDN for"
+ " each host when using SASL.", fqdn, hostname);
transport.close();
throw new RuntimeException("SASL requires that the address the thrift"
+ " server listens on is the same as the FQDN for this host");
}
final UserGroupInformation serverUser;
try {
serverUser = UserGroupInformation.getLoginUser();
} catch (IOException e) {
transport.close();
throw new TTransportException(e);
}
log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser,
params.getKerberosServerPrimary(), hostname);
// Make the SASL transport factory with the instance and primary from the kerberos server
// principal, SASL properties
// and the SASL callback handler from Hadoop to ensure authorization ID is the authentication
// ID. Despite the 'protocol' argument seeming to be useless, it
// *must* be the primary of the server.
TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(),
hostname, params.getSaslProperties(), new SaslRpcServer.SaslGssCallbackHandler());
if (params.getSecretManager() != null) {
log.info("Adding DIGEST-MD5 server definition for delegation tokens");
saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5,
params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
new SaslServerDigestCallbackHandler(params.getSecretManager()));
} else {
log.info("SecretManager is null, not adding support for delegation token authentication");
}
// Make sure the TTransportFactory is performing a UGI.doAs
TTransportFactory ugiTransportFactory =
new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
if (address.getPort() == 0) {
// If we chose a port dynamically, make a new use it (along with the proper hostname)
address =
HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
log.info("SASL thrift server bound on {}", address);
}
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
conf, timeBetweenThreadChecks);
final TThreadPoolServer server =
createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
return new ServerAddress(server, address);
}
public static ServerAddress startTServer(MetricsSystem metricsSystem,
final AccumuloConfiguration conf, ThriftServerType serverType, TProcessor processor,
String serverName, String threadName, int numThreads, long threadTimeOut,
long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses)
throws TTransportException {
if (serverType == ThriftServerType.SASL) {
processor = updateSaslProcessor(serverType, processor);
}
return startTServer(serverType,
new TimedProcessor(metricsSystem, conf, processor, serverName, threadName), serverName,
threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize,
sslParams, saslParams, serverSocketTimeout, addresses);
}
/**
* @see #startTServer(ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int,
* long, AccumuloConfiguration, long, long, SslConnectionParams, SaslServerConnectionParams,
* long, HostAndPort...)
*/
public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
String serverName, String threadName, int numThreads, long threadTimeOut,
final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
return startTServer(serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams,
saslParams, serverSocketTimeout, addresses);
}
/**
* Start the appropriate Thrift server (SSL or non-blocking server) for the given parameters.
* Non-null SSL parameters will cause an SSL server to be started.
*
* @return A ServerAddress encapsulating the Thrift server created and the host/port which it is
* bound to.
*/
public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads,
long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
// This is presently not supported. It's hypothetically possible, I believe, to work, but it
// would require changes in how the transports
// work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's
// quality of protection addresses privacy issues.
checkArgument(sslParams == null || saslParams == null,
"Cannot start a Thrift server using both SSL and SASL");
ServerAddress serverAddress = null;
for (HostAndPort address : addresses) {
try {
switch (serverType) {
case SSL:
log.debug("Instantiating SSL Thrift server");
serverAddress = createSslThreadPoolServer(address, processor, protocolFactory,
serverSocketTimeout, sslParams, serverName, numThreads, threadTimeOut, conf,
timeBetweenThreadChecks);
break;
case SASL:
log.debug("Instantiating SASL Thrift server");
serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory,
serverSocketTimeout, saslParams, serverName, numThreads, threadTimeOut, conf,
timeBetweenThreadChecks);
break;
case THREADPOOL:
log.debug("Instantiating unsecure TThreadPool Thrift server");
serverAddress =
createBlockingServer(address, processor, protocolFactory, maxMessageSize,
serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks);
break;
case THREADED_SELECTOR:
log.debug("Instantiating default, unsecure Threaded selector Thrift server");
serverAddress =
createThreadedSelectorServer(address, processor, protocolFactory, serverName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize);
break;
case CUSTOM_HS_HA:
log.debug("Instantiating unsecure custom half-async Thrift server");
serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize);
break;
default:
throw new IllegalArgumentException("Unknown server type " + serverType);
}
break;
} catch (TTransportException e) {
log.warn("Error attempting to create server at {}. Error: {}", address, e.getMessage());
}
}
if (serverAddress == null) {
throw new TTransportException(
"Unable to create server on addresses: " + Arrays.toString(addresses));
}
final TServer finalServer = serverAddress.server;
Threads.createThread(threadName, () -> {
try {
finalServer.serve();
} catch (Error e) {
Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
}
}).start();
// check for the special "bind to everything address"
if (serverAddress.address.getHost().equals("0.0.0.0")) {
// can't get the address from the bind, so we'll do our best to invent our hostname
try {
serverAddress = new ServerAddress(finalServer, HostAndPort
.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
} catch (UnknownHostException e) {
throw new TTransportException(e);
}
}
return serverAddress;
}
/**
* Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to
* forcibly shut down the threadpool.
*
* @param s
* The TServer to stop
*/
public static void stopTServer(TServer s) {
if (s == null) {
return;
}
s.stop();
try {
Field f = s.getClass().getDeclaredField("executorService_");
f.setAccessible(true);
ExecutorService es = (ExecutorService) f.get(s);
es.shutdownNow();
} catch (Exception e) {
log.error("Unable to call shutdownNow", e);
}
}
/**
* Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication
* works. Requires the <code>serverType</code> to be {@link ThriftServerType#SASL} and throws an
* exception when it is not.
*
* @return A {@link UGIAssumingProcessor} which wraps the provided processor
*/
private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
checkArgument(serverType == ThriftServerType.SASL);
// Wrap the provided processor in our special processor which proxies the provided UGI on the
// logged-in UGI
// Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are
// still reported
// as the logged-in user.
log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
return new UGIAssumingProcessor(processor);
}
}