| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.thrift.server; |
| |
| import org.apache.thrift.transport.TNonblockingServerTransport; |
| import org.apache.thrift.transport.TNonblockingTransport; |
| import org.apache.thrift.transport.TTransportException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.nio.channels.spi.SelectorProvider; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Set; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * A Half-Sync/Half-Async server with a separate pool of threads to handle |
| * non-blocking I/O. Accepts are handled on a single thread, and a configurable |
| * number of nonblocking selector threads manage reading and writing of client |
| * connections. A synchronous worker thread pool handles processing of requests. |
| * |
| * Performs better than TNonblockingServer/THsHaServer in multi-core |
| * environments when the the bottleneck is CPU on the single selector thread |
| * handling I/O. In addition, because the accept handling is decoupled from |
| * reads/writes and invocation, the server has better ability to handle back- |
| * pressure from new connections (e.g. stop accepting when busy). |
| * |
| * Like TNonblockingServer, it relies on the use of TFramedTransport. |
| * |
| * <p/> |
| * This implementation fixes the issue where TThreadedSelectorServer (v0.9.0) would hang |
| * during stop, which can happen when Thrift server is stopped when AcceptThread is |
| * blocked on adding a connection to acceptedQueue. |
| * The fix is to block accept thread for a set time when adding new connections |
| * to accepted queue, this way the accept thread can check for stop flag every now and then. |
| * @see <a href="https://issues.cask.co/browse/TEPHRA-132">TEPHRA-132</a> |
| */ |
| public class TThreadedSelectorServerWithFix extends AbstractNonblockingServer { |
| private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServerWithFix.class.getName()); |
| |
| public static class Args extends AbstractNonblockingServerArgs<Args> { |
| |
| /** The number of threads for selecting on already-accepted connections */ |
| public int selectorThreads = 2; |
| /** |
| * The size of the executor service (if none is specified) that will handle |
| * invocations. This may be set to 0, in which case invocations will be |
| * handled directly on the selector threads (as is in TNonblockingServer) |
| */ |
| private int workerThreads = 5; |
| /** Time to wait for server to stop gracefully */ |
| private int stopTimeoutVal = 60; |
| private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; |
| /** The ExecutorService for handling dispatched requests */ |
| private ExecutorService executorService = null; |
| /** |
| * The size of the blocking queue per selector thread for passing accepted |
| * connections to the selector thread |
| */ |
| private int acceptQueueSizePerThread = 4; |
| |
| /** |
| * Determines the strategy for handling new accepted connections. |
| */ |
| public static enum AcceptPolicy { |
| /** |
| * Require accepted connection registration to be handled by the executor. |
| * If the worker pool is saturated, further accepts will be closed |
| * immediately. Slightly increases latency due to an extra scheduling. |
| */ |
| FAIR_ACCEPT, |
| /** |
| * Handle the accepts as fast as possible, disregarding the status of the |
| * executor service. |
| */ |
| FAST_ACCEPT |
| } |
| |
| private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT; |
| |
| public Args(TNonblockingServerTransport transport) { |
| super(transport); |
| } |
| |
| public Args selectorThreads(int i) { |
| selectorThreads = i; |
| return this; |
| } |
| |
| public int getSelectorThreads() { |
| return selectorThreads; |
| } |
| |
| public Args workerThreads(int i) { |
| workerThreads = i; |
| return this; |
| } |
| |
| public int getWorkerThreads() { |
| return workerThreads; |
| } |
| |
| public int getStopTimeoutVal() { |
| return stopTimeoutVal; |
| } |
| |
| public Args stopTimeoutVal(int stopTimeoutVal) { |
| this.stopTimeoutVal = stopTimeoutVal; |
| return this; |
| } |
| |
| public TimeUnit getStopTimeoutUnit() { |
| return stopTimeoutUnit; |
| } |
| |
| public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) { |
| this.stopTimeoutUnit = stopTimeoutUnit; |
| return this; |
| } |
| |
| public ExecutorService getExecutorService() { |
| return executorService; |
| } |
| |
| public Args executorService(ExecutorService executorService) { |
| this.executorService = executorService; |
| return this; |
| } |
| |
| public int getAcceptQueueSizePerThread() { |
| return acceptQueueSizePerThread; |
| } |
| |
| public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) { |
| this.acceptQueueSizePerThread = acceptQueueSizePerThread; |
| return this; |
| } |
| |
| public AcceptPolicy getAcceptPolicy() { |
| return acceptPolicy; |
| } |
| |
| public Args acceptPolicy(AcceptPolicy acceptPolicy) { |
| this.acceptPolicy = acceptPolicy; |
| return this; |
| } |
| |
| public void validate() { |
| if (selectorThreads <= 0) { |
| throw new IllegalArgumentException("selectorThreads must be positive."); |
| } |
| if (workerThreads < 0) { |
| throw new IllegalArgumentException("workerThreads must be non-negative."); |
| } |
| if (acceptQueueSizePerThread <= 0) { |
| throw new IllegalArgumentException("acceptQueueSizePerThread must be positive."); |
| } |
| } |
| } |
| |
| // Flag for stopping the server |
| private volatile boolean stopped_ = true; |
| |
| // The thread handling all accepts |
| private AcceptThread acceptThread; |
| |
| // Threads handling events on client transports |
| private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); |
| |
| // This wraps all the functionality of queueing and thread pool management |
| // for the passing of Invocations from the selector thread(s) to the workers |
| // (if any). |
| private final ExecutorService invoker; |
| |
| private final Args args; |
| |
| /** |
| * Create the server with the specified Args configuration |
| */ |
| public TThreadedSelectorServerWithFix(Args args) { |
| super(args); |
| args.validate(); |
| invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService; |
| this.args = args; |
| } |
| |
| /** |
| * Start the accept and selector threads running to deal with clients. |
| * |
| * @return true if everything went ok, false if we couldn't start for some |
| * reason. |
| */ |
| @Override |
| protected boolean startThreads() { |
| LOGGER.info("Starting {}", TThreadedSelectorServerWithFix.class.getSimpleName()); |
| try { |
| for (int i = 0; i < args.selectorThreads; ++i) { |
| selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); |
| } |
| acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, |
| createSelectorThreadLoadBalancer(selectorThreads)); |
| stopped_ = false; |
| for (SelectorThread thread : selectorThreads) { |
| thread.start(); |
| } |
| acceptThread.start(); |
| return true; |
| } catch (IOException e) { |
| LOGGER.error("Failed to start threads!", e); |
| return false; |
| } |
| } |
| |
| /** |
| * Joins the accept and selector threads and shuts down the executor service. |
| */ |
| @Override |
| protected void waitForShutdown() { |
| try { |
| joinThreads(); |
| } catch (InterruptedException e) { |
| // Non-graceful shutdown occurred |
| LOGGER.error("Interrupted while joining threads!", e); |
| } |
| gracefullyShutdownInvokerPool(); |
| } |
| |
| protected void joinThreads() throws InterruptedException { |
| // wait until the io threads exit |
| acceptThread.join(); |
| for (SelectorThread thread : selectorThreads) { |
| thread.join(); |
| } |
| } |
| |
| /** |
| * Stop serving and shut everything down. |
| */ |
| @Override |
| public void stop() { |
| stopped_ = true; |
| |
| // Stop queuing connect attempts asap |
| stopListening(); |
| |
| if (acceptThread != null) { |
| acceptThread.wakeupSelector(); |
| } |
| if (selectorThreads != null) { |
| for (SelectorThread thread : selectorThreads) { |
| if (thread != null) |
| thread.wakeupSelector(); |
| } |
| } |
| } |
| |
| protected void gracefullyShutdownInvokerPool() { |
| // try to gracefully shut down the executor service |
| invoker.shutdown(); |
| |
| // Loop until awaitTermination finally does return without a interrupted |
| // exception. If we don't do this, then we'll shut down prematurely. We want |
| // to let the executorService clear it's task queue, closing client sockets |
| // appropriately. |
| long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); |
| long now = System.currentTimeMillis(); |
| while (timeoutMS >= 0) { |
| try { |
| invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); |
| break; |
| } catch (InterruptedException ix) { |
| long newnow = System.currentTimeMillis(); |
| timeoutMS -= (newnow - now); |
| now = newnow; |
| } |
| } |
| } |
| |
| /** |
| * We override the standard invoke method here to queue the invocation for |
| * invoker service instead of immediately invoking. If there is no thread |
| * pool, handle the invocation inline on this thread |
| */ |
| @Override |
| protected boolean requestInvoke(FrameBuffer frameBuffer) { |
| Runnable invocation = getRunnable(frameBuffer); |
| if (invoker != null) { |
| try { |
| invoker.execute(invocation); |
| return true; |
| } catch (RejectedExecutionException rx) { |
| LOGGER.warn("ExecutorService rejected execution!", rx); |
| return false; |
| } |
| } else { |
| // Invoke on the caller's thread |
| invocation.run(); |
| return true; |
| } |
| } |
| |
| protected Runnable getRunnable(FrameBuffer frameBuffer) { |
| return new Invocation(frameBuffer); |
| } |
| |
| /** |
| * Helper to create the invoker if one is not specified |
| */ |
| protected static ExecutorService createDefaultExecutor(Args options) { |
| return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null; |
| } |
| |
| private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) { |
| if (queueSize == 0) { |
| // Unbounded queue |
| return new LinkedBlockingQueue<TNonblockingTransport>(); |
| } |
| return new ArrayBlockingQueue<TNonblockingTransport>(queueSize); |
| } |
| |
| /** |
| * The thread that selects on the server transport (listen socket) and accepts |
| * new connections to hand off to the IO selector threads |
| */ |
| protected class AcceptThread extends Thread { |
| |
| // The listen socket to accept on |
| private final TNonblockingServerTransport serverTransport; |
| private final Selector acceptSelector; |
| |
| private final SelectorThreadLoadBalancer threadChooser; |
| |
| /** |
| * Set up the AcceptThead |
| * |
| * @throws IOException |
| */ |
| public AcceptThread(TNonblockingServerTransport serverTransport, |
| SelectorThreadLoadBalancer threadChooser) throws IOException { |
| this.serverTransport = serverTransport; |
| this.threadChooser = threadChooser; |
| this.acceptSelector = SelectorProvider.provider().openSelector(); |
| this.serverTransport.registerSelector(acceptSelector); |
| } |
| |
| /** |
| * The work loop. Selects on the server transport and accepts. If there was |
| * a server transport that had blocking accepts, and returned on blocking |
| * client transports, that should be used instead |
| */ |
| public void run() { |
| try { |
| while (!stopped_) { |
| select(); |
| } |
| } catch (Throwable t) { |
| LOGGER.error("run() exiting due to uncaught error", t); |
| } finally { |
| // This will wake up the selector threads |
| TThreadedSelectorServerWithFix.this.stop(); |
| } |
| } |
| |
| /** |
| * If the selector is blocked, wake it up. |
| */ |
| public void wakeupSelector() { |
| acceptSelector.wakeup(); |
| } |
| |
| /** |
| * Select and process IO events appropriately: If there are connections to |
| * be accepted, accept them. |
| */ |
| private void select() { |
| try { |
| // wait for connect events. |
| acceptSelector.select(); |
| |
| // process the io events we received |
| Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator(); |
| while (!stopped_ && selectedKeys.hasNext()) { |
| SelectionKey key = selectedKeys.next(); |
| selectedKeys.remove(); |
| |
| // skip if not valid |
| if (!key.isValid()) { |
| continue; |
| } |
| |
| if (key.isAcceptable()) { |
| handleAccept(); |
| } else { |
| LOGGER.warn("Unexpected state in select! " + key.interestOps()); |
| } |
| } |
| } catch (IOException e) { |
| LOGGER.warn("Got an IOException while selecting!", e); |
| } |
| } |
| |
| /** |
| * Accept a new connection. |
| */ |
| private void handleAccept() { |
| final TNonblockingTransport client = doAccept(); |
| if (client != null) { |
| // Pass this connection to a selector thread |
| final SelectorThread targetThread = threadChooser.nextThread(); |
| |
| if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { |
| doAddAccept(targetThread, client); |
| } else { |
| // FAIR_ACCEPT |
| try { |
| invoker.submit(new Runnable() { |
| public void run() { |
| doAddAccept(targetThread, client); |
| } |
| }); |
| } catch (RejectedExecutionException rx) { |
| LOGGER.warn("ExecutorService rejected accept registration!", rx); |
| // close immediately |
| client.close(); |
| } |
| } |
| } |
| } |
| |
| private TNonblockingTransport doAccept() { |
| try { |
| return (TNonblockingTransport) serverTransport.accept(); |
| } catch (TTransportException tte) { |
| // something went wrong accepting. |
| LOGGER.warn("Exception trying to accept!", tte); |
| return null; |
| } |
| } |
| |
| private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { |
| if (!thread.addAcceptedConnection(client)) { |
| client.close(); |
| } |
| } |
| } // AcceptThread |
| |
| /** |
| * The SelectorThread(s) will be doing all the selecting on accepted active |
| * connections. |
| */ |
| protected class SelectorThread extends AbstractSelectThread { |
| |
| // Accepted connections added by the accept thread. |
| private final BlockingQueue<TNonblockingTransport> acceptedQueue; |
| |
| /** |
| * Set up the SelectorThread with an unbounded queue for incoming accepts. |
| * |
| * @throws IOException |
| * if a selector cannot be created |
| */ |
| public SelectorThread() throws IOException { |
| this(new LinkedBlockingQueue<TNonblockingTransport>()); |
| } |
| |
| /** |
| * Set up the SelectorThread with an bounded queue for incoming accepts. |
| * |
| * @throws IOException |
| * if a selector cannot be created |
| */ |
| public SelectorThread(int maxPendingAccepts) throws IOException { |
| this(createDefaultAcceptQueue(maxPendingAccepts)); |
| } |
| |
| /** |
| * Set up the SelectorThread with a specified queue for connections. |
| * |
| * @param acceptedQueue |
| * The BlockingQueue implementation for holding incoming accepted |
| * connections. |
| * @throws IOException |
| * if a selector cannot be created. |
| */ |
| public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException { |
| this.acceptedQueue = acceptedQueue; |
| } |
| |
| /** |
| * Hands off an accepted connection to be handled by this thread. This |
| * method will block if the queue for new connections is at capacity. |
| * |
| * @param accepted |
| * The connection that has been accepted. |
| * @return true if the connection has been successfully added. |
| */ |
| public boolean addAcceptedConnection(TNonblockingTransport accepted) { |
| try { |
| while (!acceptedQueue.offer(accepted, 200, TimeUnit.MILLISECONDS)) { |
| // If server is stopped, then return false. |
| if (stopped_) { |
| return false; |
| } |
| } |
| } catch (InterruptedException e) { |
| LOGGER.warn("Interrupted while adding accepted connection!", e); |
| return false; |
| } |
| selector.wakeup(); |
| return true; |
| } |
| |
| /** |
| * The work loop. Handles selecting (read/write IO), dispatching, and |
| * managing the selection preferences of all existing connections. |
| */ |
| public void run() { |
| try { |
| while (!stopped_) { |
| select(); |
| processAcceptedConnections(); |
| processInterestChanges(); |
| } |
| for (SelectionKey selectionKey : selector.keys()) { |
| cleanupSelectionKey(selectionKey); |
| } |
| } catch (Throwable t) { |
| LOGGER.error("run() exiting due to uncaught error", t); |
| } finally { |
| // This will wake up the accept thread and the other selector threads |
| TThreadedSelectorServerWithFix.this.stop(); |
| } |
| } |
| |
| /** |
| * Select and process IO events appropriately: If there are existing |
| * connections with data waiting to be read, read it, buffering until a |
| * whole frame has been read. If there are any pending responses, buffer |
| * them until their target client is available, and then send the data. |
| */ |
| private void select() { |
| try { |
| // wait for io events. |
| selector.select(); |
| |
| // process the io events we received |
| Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); |
| while (!stopped_ && selectedKeys.hasNext()) { |
| SelectionKey key = selectedKeys.next(); |
| selectedKeys.remove(); |
| |
| // skip if not valid |
| if (!key.isValid()) { |
| cleanupSelectionKey(key); |
| continue; |
| } |
| |
| if (key.isReadable()) { |
| // deal with reads |
| handleRead(key); |
| } else if (key.isWritable()) { |
| // deal with writes |
| handleWrite(key); |
| } else { |
| LOGGER.warn("Unexpected state in select! " + key.interestOps()); |
| } |
| } |
| } catch (IOException e) { |
| LOGGER.warn("Got an IOException while selecting!", e); |
| } |
| } |
| |
| private void processAcceptedConnections() { |
| // Register accepted connections |
| while (!stopped_) { |
| TNonblockingTransport accepted = acceptedQueue.poll(); |
| if (accepted == null) { |
| break; |
| } |
| registerAccepted(accepted); |
| } |
| } |
| |
| private void registerAccepted(TNonblockingTransport accepted) { |
| SelectionKey clientKey = null; |
| try { |
| clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); |
| |
| FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); |
| clientKey.attach(frameBuffer); |
| } catch (IOException e) { |
| LOGGER.warn("Failed to register accepted connection to selector!", e); |
| if (clientKey != null) { |
| cleanupSelectionKey(clientKey); |
| } |
| accepted.close(); |
| } |
| } |
| } // SelectorThread |
| |
| /** |
| * Creates a SelectorThreadLoadBalancer to be used by the accept thread for |
| * assigning newly accepted connections across the threads. |
| */ |
| protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) { |
| return new SelectorThreadLoadBalancer(threads); |
| } |
| |
| /** |
| * A round robin load balancer for choosing selector threads for new |
| * connections. |
| */ |
| protected class SelectorThreadLoadBalancer { |
| private final Collection<? extends SelectorThread> threads; |
| private Iterator<? extends SelectorThread> nextThreadIterator; |
| |
| public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) { |
| if (threads.isEmpty()) { |
| throw new IllegalArgumentException("At least one selector thread is required"); |
| } |
| this.threads = Collections.unmodifiableList(new ArrayList<T>(threads)); |
| nextThreadIterator = this.threads.iterator(); |
| } |
| |
| public SelectorThread nextThread() { |
| // Choose a selector thread (round robin) |
| if (!nextThreadIterator.hasNext()) { |
| nextThreadIterator = threads.iterator(); |
| } |
| return nextThreadIterator.next(); |
| } |
| } |
| } |