/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
/**
* An RPC server with a native Java NIO implementation, derived from Hadoop, that hosts
* protobuf-described Services. It is the original implementation predating HBASE-17262,
* and for now the default RPC server.
*
* <p>An RpcServer instance has a Listener that hosts the socket. The Listener has a fixed
* number of Readers in an ExecutorPool, 10 by default. The Listener accepts each connection
* and hands it to a Reader chosen round-robin. The Reader is registered on a Selector; it
* reads everything available off the channel, parses a Call from the bytes, wraps the Call
* in a CallRunner, and passes it to the scheduler to be run. The Reader then checks whether
* more work is pending and loops until done.
*
* <p>The Scheduler can be variously implemented, but the default simple scheduler has
* handlers to which it has given the queues into which calls (i.e. CallRunner instances) are
* inserted. Handlers take from their queue, run the CallRunner#run method on each item, and
* keep taking while the server is up.
*
* <p>CallRunner#run executes the call. When done, it asks the included Call to put itself on
* a new queue for the Responder to pull from and return the result to the client.
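*
* <p>A minimal lifecycle sketch (illustrative only; the prepared service list and the choice
* of FifoRpcScheduler are assumptions, not requirements):
* <pre>{@code
* Configuration conf = HBaseConfiguration.create();
* List<BlockingServiceAndInterface> services = ...; // the protobuf services to host
* SimpleRpcServer rpcServer = new SimpleRpcServer(null, "sketchRpcServer", services,
*     new InetSocketAddress("localhost", 0), conf, new FifoRpcScheduler(conf, 1), true);
* rpcServer.start();
* InetSocketAddress bound = rpcServer.getListenerAddress(); // actual, possibly ephemeral, port
* // ... issue calls against the bound address, then shut down:
* rpcServer.stop();
* }</pre>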
*
* @see BlockingRpcClient
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.CONFIG})
public class SimpleRpcServer extends RpcServer {
protected int port; // port we listen on
protected InetSocketAddress address; // inet address we listen on
private int readThreads; // number of read threads
protected int socketSendBufferSize;
protected final long purgeTimeout; // in milliseconds
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
private Listener listener = null;
protected SimpleRpcServerResponder responder = null;
/** Listens on the socket. Creates jobs for the handler threads. */
private class Listener extends Thread {
private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null;
private int currentReader = 0;
private final int readerPendingConnectionQueueLength;
private ExecutorService readPool;
public Listener(final String name) throws IOException {
super(name);
// The backlog of requests that we will have the serversocket carry.
int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
readerPendingConnectionQueueLength =
conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the bind address (can be different from the default interface)
bind(acceptChannel.socket(), bindAddress, backlogLength);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
// Create a selector.
selector = Selector.open();
readers = new Reader[readThreads];
// Unlike Hadoop, which starts all reader threads directly, we run the Readers in an
// ExecutorService; this has the advantage of making the pool easy to shut down.
readPool = Executors.newFixedThreadPool(readThreads,
new ThreadFactoryBuilder().setNameFormat(
"Reader=%d,bindAddress=" + bindAddress.getHostName() +
",port=" + port).setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
for (int i = 0; i < readThreads; ++i) {
Reader reader = new Reader();
readers[i] = reader;
readPool.execute(reader);
}
LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("Listener,port=" + port);
this.setDaemon(true);
}
private class Reader implements Runnable {
private final LinkedBlockingQueue<SimpleServerRpcConnection> pendingConnections;
private final Selector readSelector;
Reader() throws IOException {
this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
this.readSelector = Selector.open();
}
@Override
public void run() {
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
}
}
}
private synchronized void doRunLoop() {
while (running) {
try {
// Consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i = size; i > 0; i--) {
SimpleServerRpcConnection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
readSelector.select();
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isValid() && key.isReadable()) {
  doRead(key);
}
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (CancelledKeyException e) {
LOG.error(getName() + ": CancelledKeyException in Reader", e);
} catch (IOException ex) {
LOG.info(getName() + ": IOException in Reader", ex);
}
}
}
/**
* Updating the readSelector while it's being used is not thread-safe,
* so the connection must be queued. The reader will drain the queue
* and update its readSelector before performing the next select.
*/
public void addConnection(SimpleServerRpcConnection conn) throws IOException {
pendingConnections.add(conn);
readSelector.wakeup();
}
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="selector access is not synchronized; seems fine but concerned changing " +
"it will have per impact")
public void run() {
LOG.info(getName() + ": starting");
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isAcceptable()) {
  doAccept(key);
}
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) {
  LOG.trace("ignored", ignored);
}
}
key = null;
}
} catch (OutOfMemoryError e) {
if (errorHandler != null) {
if (errorHandler.checkOOME(e)) {
LOG.info(getName() + ": exiting on OutOfMemoryError");
closeCurrentConnection(key, e);
connectionManager.closeIdle(true);
return;
}
} else {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn(getName() + ": OutOfMemoryError in server select", e);
closeCurrentConnection(key, e);
connectionManager.closeIdle(true);
try {
Thread.sleep(60000);
} catch (InterruptedException ex) {
LOG.debug("Interrupted while sleeping");
}
}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
LOG.info(getName() + ": stopping");
synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) {
  LOG.trace("ignored", ignored);
}
}
selector = null;
acceptChannel = null;
// close all connections
connectionManager.stopIdleScan();
connectionManager.closeAll();
}
}
private void closeCurrentConnection(SelectionKey key, Throwable e) {
if (key != null) {
SimpleServerRpcConnection c = (SimpleServerRpcConnection)key.attachment();
if (c != null) {
closeConnection(c);
key.attach(null);
}
}
}
InetSocketAddress getAddress() {
return address;
}
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(tcpKeepAlive);
Reader reader = getReader();
SimpleServerRpcConnection c = connectionManager.register(channel);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanupWithLogger(LOG, channel);
}
continue;
}
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
}
}
void doRead(SelectionKey key) throws InterruptedException {
int count;
SimpleServerRpcConnection c = (SimpleServerRpcConnection) key.attachment();
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Caught exception while reading:", e);
}
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
closeConnection(c);
c = null;
} else {
c.setLastContact(System.currentTimeMillis());
}
}
synchronized void doStop() {
if (selector != null) {
selector.wakeup();
Thread.yield();
}
if (acceptChannel != null) {
try {
acceptChannel.socket().close();
} catch (IOException e) {
LOG.info(getName() + ": exception in closing listener socket. " + e);
}
}
readPool.shutdownNow();
}
// Returns the next Reader to use, chosen by simple round-robin.
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}
}
/**
* Constructs a server listening on the named port and address.
* @param server hosting instance of {@link Server}. If an instance is passed, we do
* authentication checks; pass null for no authentication check.
* @param name Used for keying this RPC server's metrics and for naming the Listener thread.
* @param services A list of services.
* @param bindAddress Where to listen.
* @param conf The configuration to use.
* @param scheduler The scheduler that hands calls (CallRunner instances) to handlers.
* @param reservoirEnabled Enable the ByteBufferPool or not.
*/
public SimpleRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler, boolean reservoirEnabled) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
this.socketSendBufferSize = 0;
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
// Start the listener here and let it bind to the port
listener = new Listener(name);
this.port = listener.getAddress().getPort();
// Create the responder here
responder = new SimpleRpcServerResponder(this);
connectionManager = new ConnectionManager();
initReconfigurable(conf);
this.scheduler.init(new RpcSchedulerContext(this));
}
/**
* Subclasses can override this to provide their own
* SimpleServerRpcConnection implementations.
*/
protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
return new SimpleServerRpcConnection(this, channel, time);
}
protected void closeConnection(SimpleServerRpcConnection connection) {
connectionManager.close(connection);
}
/** Sets the socket buffer size used for responding to RPCs.
* @param size the send buffer size
*/
@Override
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
/** Starts the service. Must be called before any calls will be handled. */
@Override
public synchronized void start() {
if (started) {
return;
}
authTokenSecretMgr = createSecretManager();
if (authTokenSecretMgr != null) {
setSecretManager(authTokenSecretMgr);
authTokenSecretMgr.start();
}
this.authManager = new ServiceAuthorizationManager();
HBasePolicyProvider.init(conf, authManager);
responder.start();
listener.start();
scheduler.start();
started = true;
}
/** Stops the service. No new calls will be handled after this is called. */
@Override
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
if (authTokenSecretMgr != null) {
authTokenSecretMgr.stop();
authTokenSecretMgr = null;
}
listener.interrupt();
listener.doStop();
responder.interrupt();
scheduler.stop();
notifyAll();
}
/**
* Wait for the server to be stopped. Does not wait for all subthreads to finish.
* @see #stop()
*/
@Override
public synchronized void join() throws InterruptedException {
while (running) {
wait();
}
}
/**
* Return the socket (ip+port) on which the RPC server is listening. May return null if
* the listener channel is closed.
* @return the address this server is listening on, or null if this
* information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
if (listener == null) {
return null;
}
return listener.getAddress();
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),
0);
}
@Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
long startTime, int timeout) throws IOException {
SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null);
return call(fakeCall, status);
}
/**
* This is a wrapper around
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
* If the amount of data is large, it writes to the channel in smaller chunks. This avoids
* the JDK creating many direct buffers as the buffer size grows, and minimizes the extra
* copies in the NIO layer that would otherwise result from the multiple write operations
* required for a large buffer.
*
* @param channel writable byte channel to write to
* @param bufferChain Chain of buffers to write
* @return number of bytes written
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
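*
* <p>An illustrative call site (the Responder does something along these lines when writing
* a response; {@code conn} here stands for a SimpleServerRpcConnection):
* <pre>{@code
* long written = channelWrite(conn.channel, bufferChain);
* }</pre>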
*/
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
throws IOException {
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
if (count > 0) {
this.metrics.sentBytes(count);
}
return count;
}
/**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
* @param socket the socket to bind
* @param address the address to bind to
* @param backlog the number of connections allowed in the queue
* @throws BindException if the address can't be bound
* @throws UnknownHostException if the address isn't a valid host name
* @throws IOException for other errors from bind
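*
* <p>Illustrative use, mirroring what the Listener constructor does at startup (the address
* and backlog are example values):
* <pre>{@code
* ServerSocketChannel acceptChannel = ServerSocketChannel.open();
* acceptChannel.configureBlocking(false);
* bind(acceptChannel.socket(), new InetSocketAddress("0.0.0.0", 16020), 128);
* }</pre>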
*/
public static void bind(ServerSocket socket, InetSocketAddress address, int backlog)
throws IOException {
try {
socket.bind(address, backlog);
} catch (BindException e) {
BindException bindException =
new BindException("Problem binding to " + address + " : " + e.getMessage());
bindException.initCause(e);
throw bindException;
} catch (SocketException e) {
// If they try to bind to a different host's address, give a better
// error message.
if ("Unresolved address".equals(e.getMessage())) {
throw new UnknownHostException("Invalid hostname for server: " + address.getHostName());
}
throw e;
}
}
/**
* The number of open RPC connections
* @return the number of open rpc connections
*/
@Override
public int getNumOpenConnections() {
return connectionManager.size();
}
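/**
* Maintains the set of live client connections and periodically closes the idle ones. The
* idle scan is tunable; an illustrative sketch of the keys read below, with their defaults:
* <pre>{@code
* conf.setInt("hbase.ipc.client.idlethreshold", 4000); // stop closing once below this many connections
* conf.setInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000); // scan period in ms
* conf.setInt("hbase.ipc.client.connection.maxidletime", 10000); // idle time in ms; doubled internally
* conf.setInt("hbase.ipc.client.kill.max", 10); // max connections closed per scan
* }</pre>
*/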
private class ConnectionManager {
private final AtomicInteger count = new AtomicInteger();
private final Set<SimpleServerRpcConnection> connections;
private final Timer idleScanTimer;
private final int idleScanThreshold;
private final int idleScanInterval;
private final int maxIdleTime;
private final int maxIdleToClose;
ConnectionManager() {
this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
this.idleScanInterval =
conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
int maxConnectionQueueSize =
handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
// create a set with concurrency -and- a thread-safe iterator, add 2
// for listener and idle closer threads
this.connections = Collections.newSetFromMap(
new ConcurrentHashMap<SimpleServerRpcConnection,Boolean>(
maxConnectionQueueSize, 0.75f, readThreads+2));
}
private boolean add(SimpleServerRpcConnection connection) {
boolean added = connections.add(connection);
if (added) {
count.getAndIncrement();
}
return added;
}
private boolean remove(SimpleServerRpcConnection connection) {
boolean removed = connections.remove(connection);
if (removed) {
count.getAndDecrement();
}
return removed;
}
int size() {
return count.get();
}
SimpleServerRpcConnection[] toArray() {
return connections.toArray(new SimpleServerRpcConnection[0]);
}
SimpleServerRpcConnection register(SocketChannel channel) {
SimpleServerRpcConnection connection = getConnection(channel, System.currentTimeMillis());
add(connection);
if (LOG.isTraceEnabled()) {
LOG.trace("Connection from " + connection +
"; connections=" + size() +
", queued calls size (bytes)=" + callQueueSizeInBytes.sum() +
", general queued calls=" + scheduler.getGeneralQueueLength() +
", priority queued calls=" + scheduler.getPriorityQueueLength() +
", meta priority queued calls=" + scheduler.getMetaPriorityQueueLength());
}
return connection;
}
boolean close(SimpleServerRpcConnection connection) {
boolean exists = remove(connection);
if (exists) {
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getName() +
": disconnecting client " + connection +
". Number of active connections: "+ size());
}
// only close if actually removed to avoid double-closing due
// to possible races
connection.close();
}
return exists;
}
// Synchronized so an explicit invocation (e.g. upon OOM) cannot collide with the
// timer task firing.
synchronized void closeIdle(boolean scanAll) {
long minLastContact = System.currentTimeMillis() - maxIdleTime;
// concurrent iterator might miss new connections added
// during the iteration, but that's ok because they won't
// be idle yet anyway and will be caught on next scan
int closed = 0;
for (SimpleServerRpcConnection connection : connections) {
// stop if connections dropped below threshold unless scanning all
if (!scanAll && size() < idleScanThreshold) {
break;
}
// close idle, stale connections; when not scanning all, stop once maxIdleToClose have been closed
if (connection.isIdle() &&
connection.getLastContact() < minLastContact &&
close(connection) &&
!scanAll && (++closed == maxIdleToClose)) {
break;
}
}
}
void closeAll() {
// use a copy of the connections to be absolutely sure the concurrent
// iterator doesn't miss a connection
for (SimpleServerRpcConnection connection : toArray()) {
close(connection);
}
}
void startIdleScan() {
scheduleIdleScanTask();
}
void stopIdleScan() {
idleScanTimer.cancel();
}
private void scheduleIdleScanTask() {
if (!running) {
return;
}
TimerTask idleScanTask = new TimerTask(){
@Override
public void run() {
if (!running) {
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("running");
}
try {
closeIdle(false);
} finally {
// explicitly reschedule so next execution occurs relative
// to the end of this scan, not the beginning
scheduleIdleScanTask();
}
}
};
idleScanTimer.schedule(idleScanTask, idleScanInterval);
}
}
}