| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| |
| package org.apache.thrift.server; |
| |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.thrift.TProcessor; |
| import org.apache.thrift.TProcessorFactory; |
| import org.apache.thrift.protocol.TBinaryProtocol; |
| import org.apache.thrift.protocol.TProtocolFactory; |
| import org.apache.thrift.transport.TFramedTransport; |
| import org.apache.thrift.transport.TNonblockingServerTransport; |
| |
| /** |
| * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. |
| * Like TNonblockingServer, it relies on the use of TFramedTransport. |
| */ |
| public class THsHaServer extends TNonblockingServer { |
| |
| // This wraps all the functionality of queueing and thread pool management |
| // for the passing of Invocations from the Selector to workers. |
| private ExecutorService invoker; |
| |
| protected final int MIN_WORKER_THREADS; |
| protected final int MAX_WORKER_THREADS; |
| protected final int STOP_TIMEOUT_VAL; |
| protected final TimeUnit STOP_TIMEOUT_UNIT; |
| |
| /** |
| * Create server with given processor, and server transport. Default server |
| * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on |
| * both input and output transports. A TProcessorFactory will be created that |
| * always returns the specified processor. |
| */ |
| public THsHaServer( TProcessor processor, |
| TNonblockingServerTransport serverTransport) { |
| this(processor, serverTransport, new Options()); |
| } |
| |
| /** |
| * Create server with given processor, server transport, and server options |
| * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on |
| * both input and output transports. A TProcessorFactory will be created that |
| * always returns the specified processor. |
| */ |
| public THsHaServer( TProcessor processor, |
| TNonblockingServerTransport serverTransport, |
| Options options) { |
| this(new TProcessorFactory(processor), serverTransport, options); |
| } |
| |
| /** |
| * Create server with specified processor factory and server transport. Uses |
| * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is |
| * used on both input and output transports. |
| */ |
| public THsHaServer( TProcessorFactory processorFactory, |
| TNonblockingServerTransport serverTransport) { |
| this(processorFactory, serverTransport, new Options()); |
| } |
| |
| /** |
| * Create server with specified processor factory, server transport, and server |
| * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on |
| * both input and output transports. |
| */ |
| public THsHaServer( TProcessorFactory processorFactory, |
| TNonblockingServerTransport serverTransport, |
| Options options) { |
| this(processorFactory, serverTransport, new TFramedTransport.Factory(), |
| new TBinaryProtocol.Factory(), options); |
| } |
| |
| /** |
| * Server with specified processor, server transport, and in/out protocol |
| * factory. Defaults will be used for in/out transport factory and server |
| * options. |
| */ |
| public THsHaServer( TProcessor processor, |
| TNonblockingServerTransport serverTransport, |
| TProtocolFactory protocolFactory) { |
| this(processor, serverTransport, protocolFactory, new Options()); |
| } |
| |
| /** |
| * Server with specified processor, server transport, and in/out protocol |
| * factory. Defaults will be used for in/out transport factory and server |
| * options. |
| */ |
| public THsHaServer( TProcessor processor, |
| TNonblockingServerTransport serverTransport, |
| TProtocolFactory protocolFactory, |
| Options options) { |
| this(processor, serverTransport, new TFramedTransport.Factory(), |
| protocolFactory); |
| } |
| |
| /** |
| * Create server with specified processor, server transport, in/out |
| * transport factory, in/out protocol factory, and default server options. A |
| * processor factory will be created that always returns the specified |
| * processor. |
| */ |
| public THsHaServer( TProcessor processor, |
| TNonblockingServerTransport serverTransport, |
| TFramedTransport.Factory transportFactory, |
| TProtocolFactory protocolFactory) { |
| this(new TProcessorFactory(processor), serverTransport, |
| transportFactory, protocolFactory); |
| } |
| |
| /** |
| * Create server with specified processor factory, server transport, in/out |
| * transport factory, in/out protocol factory, and default server options. |
| */ |
| public THsHaServer( TProcessorFactory processorFactory, |
| TNonblockingServerTransport serverTransport, |
| TFramedTransport.Factory transportFactory, |
| TProtocolFactory protocolFactory) { |
| this(processorFactory, serverTransport, |
| transportFactory, transportFactory, |
| protocolFactory, protocolFactory, new Options()); |
| } |
| |
| /** |
| * Create server with specified processor factory, server transport, in/out |
| * transport factory, in/out protocol factory, and server options. |
| */ |
| public THsHaServer( TProcessorFactory processorFactory, |
| TNonblockingServerTransport serverTransport, |
| TFramedTransport.Factory transportFactory, |
| TProtocolFactory protocolFactory, |
| Options options) { |
| this(processorFactory, serverTransport, |
| transportFactory, transportFactory, |
| protocolFactory, protocolFactory, |
| options); |
| } |
| |
| /** |
| * Create server with everything specified, except use default server options. |
| */ |
| public THsHaServer( TProcessor processor, |
| TNonblockingServerTransport serverTransport, |
| TFramedTransport.Factory inputTransportFactory, |
| TFramedTransport.Factory outputTransportFactory, |
| TProtocolFactory inputProtocolFactory, |
| TProtocolFactory outputProtocolFactory) { |
| this(new TProcessorFactory(processor), serverTransport, |
| inputTransportFactory, outputTransportFactory, |
| inputProtocolFactory, outputProtocolFactory); |
| } |
| |
| /** |
| * Create server with everything specified, except use default server options. |
| */ |
| public THsHaServer( TProcessorFactory processorFactory, |
| TNonblockingServerTransport serverTransport, |
| TFramedTransport.Factory inputTransportFactory, |
| TFramedTransport.Factory outputTransportFactory, |
| TProtocolFactory inputProtocolFactory, |
| TProtocolFactory outputProtocolFactory) |
| { |
| this(processorFactory, serverTransport, |
| inputTransportFactory, outputTransportFactory, |
| inputProtocolFactory, outputProtocolFactory, new Options()); |
| } |
| |
| /** |
| * Create server with every option fully specified. |
| */ |
| public THsHaServer( TProcessorFactory processorFactory, |
| TNonblockingServerTransport serverTransport, |
| TFramedTransport.Factory inputTransportFactory, |
| TFramedTransport.Factory outputTransportFactory, |
| TProtocolFactory inputProtocolFactory, |
| TProtocolFactory outputProtocolFactory, |
| Options options) |
| { |
| super(processorFactory, serverTransport, |
| inputTransportFactory, outputTransportFactory, |
| inputProtocolFactory, outputProtocolFactory, |
| options); |
| |
| MIN_WORKER_THREADS = options.minWorkerThreads; |
| MAX_WORKER_THREADS = options.maxWorkerThreads; |
| STOP_TIMEOUT_VAL = options.stopTimeoutVal; |
| STOP_TIMEOUT_UNIT = options.stopTimeoutUnit; |
| } |
| |
| /** @inheritDoc */ |
| @Override |
| public void serve() { |
| if (!startInvokerPool()) { |
| return; |
| } |
| |
| // start listening, or exit |
| if (!startListening()) { |
| return; |
| } |
| |
| // start the selector, or exit |
| if (!startSelectorThread()) { |
| return; |
| } |
| |
| // this will block while we serve |
| joinSelector(); |
| |
| gracefullyShutdownInvokerPool(); |
| |
| // do a little cleanup |
| stopListening(); |
| |
| // ungracefully shut down the invoker pool? |
| } |
| |
| protected boolean startInvokerPool() { |
| // start the invoker pool |
| LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); |
| invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS, |
| STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue); |
| |
| return true; |
| } |
| |
| protected void gracefullyShutdownInvokerPool() { |
| // try to gracefully shut down the executor service |
| invoker.shutdown(); |
| |
| // Loop until awaitTermination finally does return without a interrupted |
| // exception. If we don't do this, then we'll shut down prematurely. We want |
| // to let the executorService clear it's task queue, closing client sockets |
| // appropriately. |
| long timeoutMS = 10000; |
| long now = System.currentTimeMillis(); |
| while (timeoutMS >= 0) { |
| try { |
| invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); |
| break; |
| } catch (InterruptedException ix) { |
| long newnow = System.currentTimeMillis(); |
| timeoutMS -= (newnow - now); |
| now = newnow; |
| } |
| } |
| } |
| |
| /** |
| * We override the standard invoke method here to queue the invocation for |
| * invoker service instead of immediately invoking. The thread pool takes care of the rest. |
| */ |
| @Override |
| protected void requestInvoke(FrameBuffer frameBuffer) { |
| invoker.execute(new Invocation(frameBuffer)); |
| } |
| |
| /** |
| * An Invocation represents a method call that is prepared to execute, given |
| * an idle worker thread. It contains the input and output protocols the |
| * thread's processor should use to perform the usual Thrift invocation. |
| */ |
| private class Invocation implements Runnable { |
| |
| private final FrameBuffer frameBuffer; |
| |
| public Invocation(final FrameBuffer frameBuffer) { |
| this.frameBuffer = frameBuffer; |
| } |
| |
| public void run() { |
| frameBuffer.invoke(); |
| } |
| } |
| |
| public static class Options extends TNonblockingServer.Options { |
| public int minWorkerThreads = 5; |
| public int maxWorkerThreads = Integer.MAX_VALUE; |
| public int stopTimeoutVal = 60; |
| public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; |
| } |
| } |