| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.asyncweb.server.transport.mina; |
| |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| |
| import org.apache.mina.core.write.DefaultWriteRequest; |
| import org.apache.mina.core.session.IdleStatus; |
| import org.apache.mina.core.filterchain.IoFilterAdapter; |
| import org.apache.mina.core.future.IoFutureListener; |
| import org.apache.mina.core.session.IoSession; |
| import org.apache.mina.core.future.WriteFuture; |
| import org.apache.mina.core.write.WriteRequest; |
| import org.apache.mina.filter.codec.ProtocolCodecFilter; |
| import org.apache.mina.filter.codec.ProtocolDecoderException; |
| import org.apache.asyncweb.common.HttpRequest; |
| import org.apache.asyncweb.common.HttpResponseStatus; |
| import org.apache.asyncweb.common.HttpVersion; |
| import org.apache.asyncweb.common.MutableHttpResponse; |
| import org.apache.mina.handler.multiton.SingleSessionIoHandler; |
| import org.apache.asyncweb.common.*; |
| import org.apache.asyncweb.common.codec.HttpCodecFactory; |
| import org.apache.asyncweb.common.codec.HttpRequestDecoderException; |
| import org.apache.asyncweb.server.ServiceContainer; |
| import org.apache.asyncweb.server.context.AbstractHttpServiceContext; |
| import org.apache.asyncweb.server.pipeline.RequestPipeline; |
| import org.apache.asyncweb.server.pipeline.RequestPipelineListener; |
| import org.apache.asyncweb.server.pipeline.StandardRequestPipeline; |
| import org.apache.asyncweb.server.HttpServiceContext; |
| import org.apache.asyncweb.server.HttpServiceFilter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * The single session handler implementation. |
| * |
| * @author The Apache MINA Project (dev@mina.apache.org) |
| * @version $Rev$, $Date$ |
| */ |
| public class SingleHttpSessionIoHandler implements SingleSessionIoHandler |
| { |
| private static final Logger LOG = LoggerFactory.getLogger( SingleHttpSessionIoHandler.class ); |
| |
| /** the default idle time */ |
| private static final int DEFAULT_IDLE_TIME = 30000; |
| |
| /** the default value passed to the StandardRequestPipeline constructor */ |
| private static final int DEFAULT_PIPELINE = 100; |
| |
| /** the HttpService container **/ |
| private final ServiceContainer container; |
| |
| /** the session bound to this single session handler */ |
| protected final IoSession session; |
| |
| /** the request pipeline */ |
| private final RequestPipeline pipeline; |
| |
| /** the current context being processed */ |
| private DefaultHttpServiceContext currentContext; |
| |
| /** idle time for request reads */ |
| private int readIdleTime = DEFAULT_IDLE_TIME; |
| |
| |
| public SingleHttpSessionIoHandler( ServiceContainer container, IoSession session ) |
| { |
| this.container = container; |
| this.session = session; |
| this.pipeline = new StandardRequestPipeline( DEFAULT_PIPELINE ); |
| |
| session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readIdleTime ); |
| session.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new HttpCodecFactory() ) ); |
| session.getFilterChain().addLast( "converter", new ContextConverter() ); |
| session.getFilterChain().addLast( "pipeline", new RequestPipelineAdapter( pipeline ) ); |
| |
| int i = 0; |
| for ( HttpServiceFilter serviceFilter : container.getServiceFilters() ) |
| { |
| session.getFilterChain().addLast( "serviceFilter." + i++, new ServiceFilterAdapter( serviceFilter ) ); |
| } |
| } |
| |
| |
| public void sessionCreated() |
| { |
| LOG.info( "Session created: {}", session.getRemoteAddress() ); |
| } |
| |
| |
| public void sessionOpened() |
| { |
| LOG.info( "Session opened: {}", session.getRemoteAddress() ); |
| } |
| |
| |
| public void sessionClosed() |
| { |
| LOG.info( "Session closed: {}", session.getRemoteAddress() ); |
| |
| if ( currentContext != null ) |
| { |
| currentContext.fireClientDisconnected(); |
| } |
| } |
| |
| |
| /** |
| * Invoked when this connection idles out. |
| * If we are in the process of parsing a request, the current request |
| * is rejected with a {@link HttpResponseStatus#REQUEST_TIMEOUT} response status. |
| */ |
| public void sessionIdle( IdleStatus idleType ) |
| { |
| if ( session.getIdleCount( idleType ) >= 1 ) |
| { |
| // // FIXME currentRequest is always null now; we need to cooperate with a decoder. |
| // if (currentContext != null) { |
| // LOG.info("Read idled out while parsing request. Scheduling timeout response"); |
| // handleReadFailure(currentContext, HttpResponseStatus.REQUEST_TIMEOUT, "Timeout while reading request"); |
| // } else { |
| LOG.debug( "Session idle detected on context {} with idleType {}", currentContext, idleType ); |
| |
| if ( currentContext != null ) |
| { |
| if ( IdleStatus.BOTH_IDLE == idleType || IdleStatus.READER_IDLE == idleType ) |
| { |
| currentContext.fireClientIdle( session.getLastReaderIdleTime(), session.getReaderIdleCount() ); |
| } |
| } |
| else |
| { |
| // TODO - look further into this - it may present serious issues when dealing with HTTP/1.1 |
| LOG.debug( "Idled with no current request. Scheduling closure when pipeline empties" ); |
| pipeline.runWhenEmpty( new Runnable() |
| { |
| public void run() |
| { |
| LOG.info( "Pipeline empty after idle. Closing session: {}", session.getRemoteAddress() ); |
| session.close(); |
| } |
| }); |
| } |
| } |
| } |
| |
| |
| public void exceptionCaught( Throwable cause ) |
| { |
| MutableHttpResponse response = null; |
| |
| if ( cause instanceof ProtocolDecoderException ) |
| { |
| HttpResponseStatus status; |
| |
| if ( cause instanceof HttpRequestDecoderException ) |
| { |
| status = ( ( HttpRequestDecoderException ) cause ).getResponseStatus(); |
| } |
| else |
| { |
| status = HttpResponseStatus.BAD_REQUEST; |
| } |
| |
| LOG.warn( "Bad request: {}", session.getRemoteAddress(), cause ); |
| |
| response = new DefaultHttpResponse(); |
| response.setProtocolVersion( HttpVersion.HTTP_1_1 ); |
| response.setStatus( status ); |
| } |
| else if ( cause instanceof IOException ) |
| { |
| LOG.warn( "IOException on HTTP connection", cause ); |
| session.close(); |
| } |
| else |
| { |
| response = new DefaultHttpResponse(); |
| response.setProtocolVersion( HttpVersion.HTTP_1_1 ); |
| response.setStatus( HttpResponseStatus.INTERNAL_SERVER_ERROR ); |
| LOG.error( "Unexpected exception from a service : "+session.getRemoteAddress(), cause ); |
| } |
| |
| if ( response != null ) |
| { |
| HttpServiceContext context = this.currentContext; |
| if ( context == null ) |
| { |
| context = createContext( new DefaultHttpRequest() ); |
| } |
| context.commitResponse( response ); |
| } |
| } |
| |
| |
| public void messageReceived( Object message ) |
| { |
| // FIXME messageReceived invoked only when whole message is built. |
| |
| // When headers were built |
| //sendContinuationIfRequested(request); |
| |
| // When body has been built |
| } |
| |
| |
| /** |
| * Invoked when we fail to parse an incoming request. |
| * We configure our parser to discard any further data received from the client, |
| * and schedule a response with the appropriate failure code for the |
| * current request |
| * |
| * @param context the service context |
| * @param status the status |
| * @param message failure message |
| */ |
| private void handleReadFailure( HttpServiceContext context, HttpResponseStatus status, String message ) |
| { |
| LOG.info( "Failed to handle client {} request. Reason: {}", session.getRemoteAddress(), status ); |
| MutableHttpResponse response = new DefaultHttpResponse(); |
| response.setStatusReasonPhrase( message ); |
| response.setStatus( status ); |
| context.commitResponse( response ); |
| } |
| |
| |
| /** |
| * Invoked when data wrote has been fully written. |
| * If we have scheduled closure after sending a final response, we will |
| * be provided with the <code>CLOSE_MARKER</code> as our marker object.<br/> |
| * This signals us to schedule closure of the connection |
| * |
| * @param message The marker provided when writing data. If this is |
| * our closure marker, we schedule closure of the connection |
| */ |
| public void messageSent( Object message ) |
| { |
| } |
| |
| |
| /** |
| * Sets the read idle time for all connections |
| * |
| * @param readIdleTime The read idle time (seconds) |
| */ |
| public void setReadIdleTime( int readIdleTime ) |
| { |
| this.readIdleTime = readIdleTime; |
| session.getConfig().setReaderIdleTime( readIdleTime ); |
| } |
| |
| |
| protected DefaultHttpServiceContext createContext( HttpRequest request ) |
| { |
| return new DefaultHttpServiceContext( request ); |
| } |
| |
| |
| private class ContextConverter extends IoFilterAdapter |
| { |
| @Override |
| public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception |
| { |
| nextFilter.filterWrite( session, new DefaultWriteRequest( |
| ( ( HttpServiceContext ) writeRequest.getMessage() ) |
| .getCommittedResponse(), writeRequest.getFuture())); |
| } |
| |
| @Override |
| public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception |
| { |
| HttpRequest request = ( HttpRequest ) message; |
| currentContext = createContext( request ); |
| nextFilter.messageReceived( session, currentContext ); |
| } |
| } |
| |
| |
| private class ServiceFilterAdapter extends IoFilterAdapter |
| { |
| private final HttpServiceFilter filter; |
| |
| public ServiceFilterAdapter( HttpServiceFilter filter ) |
| { |
| this.filter = filter; |
| } |
| |
| @Override |
| public void messageReceived( final NextFilter nextFilter, |
| final IoSession session, final Object message ) throws Exception |
| { |
| HttpServiceFilter.NextFilter nextFilterAdapter = new HttpServiceFilter.NextFilter() |
| { |
| public void invoke() |
| { |
| nextFilter.messageReceived(session, message); |
| } |
| }; |
| filter.handleRequest( nextFilterAdapter, ( HttpServiceContext ) message ); |
| } |
| |
| @Override |
| public void filterWrite( final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest ) |
| throws Exception |
| { |
| HttpServiceFilter.NextFilter nextFilterAdapter = new HttpServiceFilter.NextFilter() |
| { |
| public void invoke() |
| { |
| nextFilter.filterWrite( session, writeRequest ); |
| } |
| }; |
| |
| HttpServiceContext context = ( HttpServiceContext ) writeRequest.getMessage(); |
| filter.handleResponse( nextFilterAdapter, context ); |
| } |
| } |
| |
| |
| private class RequestPipelineAdapter extends IoFilterAdapter |
| { |
| private final RequestPipeline pipeline; |
| |
| public RequestPipelineAdapter( final RequestPipeline pipeline ) |
| { |
| this.pipeline = pipeline; |
| } |
| |
| @Override |
| public void sessionOpened(final NextFilter nextFilter, |
| final IoSession session) { |
| pipeline.setPipelineListener(new RequestPipelineListener() { |
| public void responseReleased(HttpServiceContext context) { |
| nextFilter.filterWrite(session, new DefaultWriteRequest( |
| context, ((DefaultHttpServiceContext) context) |
| .getWriteFuture())); |
| } |
| }); |
| |
| nextFilter.sessionOpened(session); |
| } |
| |
| @Override |
| public void messageReceived(NextFilter nextFilter, IoSession session, |
| Object message) throws Exception { |
| HttpServiceContext context = (HttpServiceContext) message; |
| if (pipeline.addRequest(context)) { |
| if (LOG.isDebugEnabled()) |
| LOG.debug("Allocated slot in request pipeline"); |
| nextFilter.messageReceived(session, message); |
| } else { |
| // The client has filled their pipeline. Currently, this |
| // triggers closure. Another option would be to drop read interest |
| // until we drain. |
| LOG.warn("Could not allocate room in the pipeline for request"); |
| handleReadFailure(context, |
| HttpResponseStatus.SERVICE_UNAVAILABLE, "Pipeline full"); |
| } |
| } |
| |
| @Override |
| public void filterWrite(NextFilter nextFilter, IoSession session, |
| WriteRequest writeRequest) throws Exception { |
| DefaultHttpServiceContext context = (DefaultHttpServiceContext) writeRequest |
| .getMessage(); |
| context.setWriteFuture(writeRequest.getFuture()); |
| pipeline.releaseResponse(context); |
| // nextFilter will be invoked when pipeline listener is notified. |
| } |
| } |
| |
| |
| private class DefaultHttpServiceContext extends AbstractHttpServiceContext |
| { |
| private WriteFuture writeFuture; |
| |
| private DefaultHttpServiceContext( HttpRequest request ) |
| { |
| super( ( InetSocketAddress ) session.getLocalAddress(), ( InetSocketAddress ) session.getRemoteAddress(), request, container ); |
| } |
| |
| private WriteFuture getWriteFuture() |
| { |
| return writeFuture; |
| } |
| |
| private void setWriteFuture( WriteFuture writeFuture ) |
| { |
| if ( ! isResponseCommitted() ) |
| { |
| throw new IllegalStateException(); |
| } |
| this.writeFuture = writeFuture; |
| } |
| |
| @Override |
| protected void doWrite( boolean requiresClosure ) |
| { |
| currentContext = null; |
| WriteFuture future = session.write( this ); |
| if ( requiresClosure ) |
| { |
| LOG.debug( "Added CLOSE future listener." ); |
| future.addListener( IoFutureListener.CLOSE ); |
| } |
| } |
| |
| |
| public void fireClientIdle( long idleTime, int idleCount ) |
| { |
| super.fireClientIdle( idleTime, idleCount ); |
| } |
| |
| |
| public void fireClientDisconnected() |
| { |
| super.fireClientDisconnected(); |
| } |
| } |
| } |