| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.mina.transport.socket.nio; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.mina.common.ExceptionMonitor; |
| import org.apache.mina.common.IoAcceptor; |
| import org.apache.mina.common.IoHandler; |
| import org.apache.mina.common.IoServiceConfig; |
| import org.apache.mina.common.support.BaseIoAcceptor; |
| import org.apache.mina.util.Queue; |
| import org.apache.mina.util.NewThreadExecutor; |
| import org.apache.mina.util.NamePreservingRunnable; |
| import edu.emory.mathcs.backport.java.util.concurrent.Executor; |
| |
| /** |
| * {@link IoAcceptor} for socket transport (TCP/IP). |
| * |
| * @author The Apache Directory Project (mina-dev@directory.apache.org) |
| * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $ |
| */ |
| public class MultiThreadSocketAcceptor extends SocketAcceptor |
| { |
| /** |
| * @noinspection StaticNonFinalField |
| */ |
| private static volatile int nextId = 0; |
| |
| private final Executor executor; |
| private final Object lock = new Object(); |
| private final int id = nextId ++; |
| private final String threadName = "SocketAcceptor-" + id; |
| private final Map channels = new HashMap(); |
| |
| private final Queue registerQueue = new Queue(); |
| private final Queue cancelQueue = new Queue(); |
| |
| private final MultiThreadSocketIoProcessor[] ioProcessors; |
| private final int processorCount; |
| |
| /** |
| * @noinspection FieldAccessedSynchronizedAndUnsynchronized |
| */ |
| private Selector selector; |
| private Worker worker; |
| private int processorDistributor = 0; |
| |
/**
 * Creates an acceptor with a single I/O processor, using a
 * {@link NewThreadExecutor} to launch threads.
 */
public MultiThreadSocketAcceptor()
{
    this( 1, new NewThreadExecutor() );
}

/**
 * Creates an acceptor with the desired number of I/O processors.
 *
 * @param processorCount number of I/O processors to create; must be at least 1
 * @param executor       executor used to launch the acceptor worker; it is also
 *                       passed to every I/O processor
 * @throws IllegalArgumentException if <code>processorCount</code> is less than 1
 * @throws NullPointerException     if <code>executor</code> is <code>null</code>
 */
public MultiThreadSocketAcceptor( int processorCount, Executor executor )
{
    if( processorCount < 1 )
    {
        throw new IllegalArgumentException( "Must have at least one processor" );
    }

    // Fail fast: a null executor would otherwise only blow up when the worker
    // is first started, after the worker/selector state has already been set.
    if( executor == null )
    {
        throw new NullPointerException( "executor" );
    }

    this.executor = executor;
    this.processorCount = processorCount;
    ioProcessors = new MultiThreadSocketIoProcessor[processorCount];

    for( int i = 0; i < processorCount; i++ )
    {
        ioProcessors[i] = new MultiThreadSocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i, executor );
    }
}
| |
| |
| /** |
| * Binds to the specified <code>address</code> and handles incoming connections with the specified |
| * <code>handler</code>. Backlog value is configured to the value of <code>backlog</code> property. |
| * |
| * @throws IOException if failed to bind |
| */ |
| public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException |
| { |
| if( handler == null ) |
| { |
| throw new NullPointerException( "handler" ); |
| } |
| |
| if( address != null && !( address instanceof InetSocketAddress ) ) |
| { |
| throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() ); |
| } |
| |
| if( config == null ) |
| { |
| config = getDefaultConfig(); |
| } |
| |
| RegistrationRequest request = new RegistrationRequest( address, handler, config ); |
| |
| synchronized( registerQueue ) |
| { |
| registerQueue.push( request ); |
| } |
| |
| startupWorker(); |
| |
| selector.wakeup(); |
| |
| synchronized( request ) |
| { |
| while( !request.done ) |
| { |
| try |
| { |
| request.wait(); |
| } |
| catch( InterruptedException e ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e ); |
| } |
| } |
| } |
| |
| if( request.exception != null ) |
| { |
| throw request.exception; |
| } |
| } |
| |
| |
| private synchronized void startupWorker() throws IOException |
| { |
| synchronized( lock ) |
| { |
| if( worker == null ) |
| { |
| selector = Selector.open(); |
| worker = new Worker(); |
| |
| executor.execute( new NamePreservingRunnable( worker ) ); |
| } |
| } |
| } |
| |
| public void unbind( SocketAddress address ) |
| { |
| if( address == null ) |
| { |
| throw new NullPointerException( "address" ); |
| } |
| |
| CancellationRequest request = new CancellationRequest( address ); |
| |
| try |
| { |
| startupWorker(); |
| } |
| catch( IOException e ) |
| { |
| // IOException is thrown only when Worker thread is not |
| // running and failed to open a selector. We simply throw |
| // IllegalArgumentException here because we can simply |
| // conclude that nothing is bound to the selector. |
| throw new IllegalArgumentException( "Address not bound: " + address ); |
| } |
| |
| synchronized( cancelQueue ) |
| { |
| cancelQueue.push( request ); |
| } |
| |
| selector.wakeup(); |
| |
| synchronized( request ) |
| { |
| while( !request.done ) |
| { |
| try |
| { |
| request.wait(); |
| } |
| catch( InterruptedException e ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e ); |
| } |
| } |
| } |
| |
| if( request.exception != null ) |
| { |
| request.exception.fillInStackTrace(); |
| |
| throw request.exception; |
| } |
| } |
| |
| |
| private class Worker implements Runnable |
| { |
| public void run() |
| { |
| Thread.currentThread().setName(MultiThreadSocketAcceptor.this.threadName ); |
| |
| for( ; ; ) |
| { |
| try |
| { |
| int nKeys = selector.select(); |
| |
| registerNew(); |
| |
| if( nKeys > 0 ) |
| { |
| processSessions( selector.selectedKeys() ); |
| } |
| |
| cancelKeys(); |
| |
| if( selector.keys().isEmpty() ) |
| { |
| synchronized( lock ) |
| { |
| if( selector.keys().isEmpty() && |
| registerQueue.isEmpty() && |
| cancelQueue.isEmpty() ) |
| { |
| worker = null; |
| try |
| { |
| selector.close(); |
| } |
| catch( IOException e ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e ); |
| } |
| finally |
| { |
| selector = null; |
| } |
| break; |
| } |
| } |
| } |
| } |
| catch( IOException e ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e ); |
| |
| try |
| { |
| Thread.sleep( 1000 ); |
| } |
| catch( InterruptedException e1 ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e1 ); |
| } |
| } |
| } |
| } |
| |
| private void processSessions( Set keys ) throws IOException |
| { |
| Iterator it = keys.iterator(); |
| while( it.hasNext() ) |
| { |
| SelectionKey key = ( SelectionKey ) it.next(); |
| |
| it.remove(); |
| |
| if( !key.isAcceptable() ) |
| { |
| continue; |
| } |
| |
| ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel(); |
| |
| SocketChannel ch = ssc.accept(); |
| |
| if( ch == null ) |
| { |
| continue; |
| } |
| |
| boolean success = false; |
| try |
| { |
| |
| RegistrationRequest req = ( RegistrationRequest ) key.attachment(); |
| |
| MultiThreadSocketSessionImpl session = new MultiThreadSocketSessionImpl( |
| MultiThreadSocketAcceptor.this, nextProcessor(), getListeners(), |
| req.config, ch, req.handler, req.address ); |
| |
| // New Interface |
| // SocketSessionImpl session = new SocketSessionImpl( |
| // SocketAcceptor.this, nextProcessor(), getListeners(), |
| // req.config, ch, req.handler, req.address ); |
| |
| |
| getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); |
| req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() ); |
| req.config.getThreadModel().buildFilterChain( session.getFilterChain() ); |
| session.getIoProcessor().addNew( session ); |
| success = true; |
| } |
| catch( Throwable t ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( t ); |
| } |
| finally |
| { |
| if( !success ) |
| { |
| ch.close(); |
| } |
| } |
| } |
| } |
| } |
| |
| private MultiThreadSocketIoProcessor nextProcessor() |
| { |
| return ioProcessors[processorDistributor++ % processorCount]; |
| } |
| |
| |
| private void registerNew() |
| { |
| if( registerQueue.isEmpty() ) |
| { |
| return; |
| } |
| |
| for( ; ; ) |
| { |
| RegistrationRequest req; |
| |
| synchronized( registerQueue ) |
| { |
| req = ( RegistrationRequest ) registerQueue.pop(); |
| } |
| |
| if( req == null ) |
| { |
| break; |
| } |
| |
| ServerSocketChannel ssc = null; |
| |
| try |
| { |
| ssc = ServerSocketChannel.open(); |
| ssc.configureBlocking( false ); |
| |
| // Configure the server socket, |
| SocketAcceptorConfig cfg; |
| if( req.config instanceof SocketAcceptorConfig ) |
| { |
| cfg = ( SocketAcceptorConfig ) req.config; |
| } |
| else |
| { |
| cfg = ( SocketAcceptorConfig ) getDefaultConfig(); |
| } |
| |
| ssc.socket().setReuseAddress( cfg.isReuseAddress() ); |
| ssc.socket().setReceiveBufferSize( |
| ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() ); |
| |
| // and bind. |
| ssc.socket().bind( req.address, cfg.getBacklog() ); |
| if( req.address == null || req.address.getPort() == 0 ) |
| { |
| req.address = ( InetSocketAddress ) ssc.socket().getLocalSocketAddress(); |
| } |
| ssc.register( selector, SelectionKey.OP_ACCEPT, req ); |
| |
| synchronized( channels ) |
| { |
| channels.put( req.address, ssc ); |
| } |
| |
| getListeners().fireServiceActivated( |
| this, req.address, req.handler, req.config ); |
| } |
| catch( IOException e ) |
| { |
| req.exception = e; |
| } |
| finally |
| { |
| synchronized( req ) |
| { |
| req.done = true; |
| |
| req.notifyAll(); |
| } |
| |
| if( ssc != null && req.exception != null ) |
| { |
| try |
| { |
| ssc.close(); |
| } |
| catch( IOException e ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e ); |
| } |
| } |
| } |
| } |
| } |
| |
| |
| private void cancelKeys() |
| { |
| if( cancelQueue.isEmpty() ) |
| { |
| return; |
| } |
| |
| for( ; ; ) |
| { |
| CancellationRequest request; |
| |
| synchronized( cancelQueue ) |
| { |
| request = ( CancellationRequest ) cancelQueue.pop(); |
| } |
| |
| if( request == null ) |
| { |
| break; |
| } |
| |
| ServerSocketChannel ssc; |
| synchronized( channels ) |
| { |
| ssc = ( ServerSocketChannel ) channels.remove( request.address ); |
| } |
| |
| // close the channel |
| try |
| { |
| if( ssc == null ) |
| { |
| request.exception = new IllegalArgumentException( "Address not bound: " + request.address ); |
| } |
| else |
| { |
| SelectionKey key = ssc.keyFor( selector ); |
| request.registrationRequest = ( RegistrationRequest ) key.attachment(); |
| key.cancel(); |
| |
| selector.wakeup(); // wake up again to trigger thread death |
| |
| ssc.close(); |
| } |
| } |
| catch( IOException e ) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught( e ); |
| } |
| finally |
| { |
| synchronized( request ) |
| { |
| request.done = true; |
| request.notifyAll(); |
| } |
| |
| if( request.exception == null ) |
| { |
| getListeners().fireServiceDeactivated( |
| this, request.address, |
| request.registrationRequest.handler, |
| request.registrationRequest.config ); |
| } |
| } |
| } |
| } |
| |
| private static class RegistrationRequest |
| { |
| private InetSocketAddress address; |
| private final IoHandler handler; |
| private final IoServiceConfig config; |
| private IOException exception; |
| private boolean done; |
| |
| private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config ) |
| { |
| this.address = ( InetSocketAddress ) address; |
| this.handler = handler; |
| this.config = config; |
| } |
| } |
| |
| |
| private static class CancellationRequest |
| { |
| private final SocketAddress address; |
| private boolean done; |
| private RegistrationRequest registrationRequest; |
| private RuntimeException exception; |
| |
| private CancellationRequest( SocketAddress address ) |
| { |
| this.address = address; |
| } |
| } |
| } |