blob: b6d07ad623436b6a601bbdcea050e1cb052b67be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.asyncweb.server.transport.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.write.DefaultWriteRequest;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.ProtocolDecoderException;
import org.apache.asyncweb.common.HttpRequest;
import org.apache.asyncweb.common.HttpResponseStatus;
import org.apache.asyncweb.common.HttpVersion;
import org.apache.asyncweb.common.MutableHttpResponse;
import org.apache.mina.handler.multiton.SingleSessionIoHandler;
import org.apache.asyncweb.common.*;
import org.apache.asyncweb.common.codec.HttpCodecFactory;
import org.apache.asyncweb.common.codec.HttpRequestDecoderException;
import org.apache.asyncweb.server.ServiceContainer;
import org.apache.asyncweb.server.context.AbstractHttpServiceContext;
import org.apache.asyncweb.server.pipeline.RequestPipeline;
import org.apache.asyncweb.server.pipeline.RequestPipelineListener;
import org.apache.asyncweb.server.pipeline.StandardRequestPipeline;
import org.apache.asyncweb.server.HttpServiceContext;
import org.apache.asyncweb.server.HttpServiceFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The single session handler implementation.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
*/
public class SingleHttpSessionIoHandler implements SingleSessionIoHandler
{
private static final Logger LOG = LoggerFactory.getLogger( SingleHttpSessionIoHandler.class );
/** the default idle time */
private static final int DEFAULT_IDLE_TIME = 30000;
/** out default pipeline */
private static final int DEFAULT_PIPELINE = 100;
/** the HttpService container **/
private final ServiceContainer container;
/** the session bound to this single session handler */
protected final IoSession session;
/** the request pipeline */
private final RequestPipeline pipeline;
/** the current context being processed */
private DefaultHttpServiceContext currentContext;
/** idle time for request reads */
private int readIdleTime = DEFAULT_IDLE_TIME;
public SingleHttpSessionIoHandler( ServiceContainer container, IoSession session )
{
this.container = container;
this.session = session;
this.pipeline = new StandardRequestPipeline( DEFAULT_PIPELINE );
session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readIdleTime );
session.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new HttpCodecFactory() ) );
session.getFilterChain().addLast( "converter", new ContextConverter() );
session.getFilterChain().addLast( "pipeline", new RequestPipelineAdapter( pipeline ) );
int i = 0;
for ( HttpServiceFilter serviceFilter : container.getServiceFilters() )
{
session.getFilterChain().addLast( "serviceFilter." + i++, new ServiceFilterAdapter( serviceFilter ) );
}
}
public void sessionCreated()
{
LOG.info( "Session created: {}", session.getRemoteAddress() );
}
public void sessionOpened()
{
LOG.info( "Session opened: {}", session.getRemoteAddress() );
}
public void sessionClosed()
{
LOG.info( "Session closed: {}", session.getRemoteAddress() );
if ( currentContext != null )
{
currentContext.fireClientDisconnected();
}
}
/**
* Invoked when this connection idles out.
* If we are in the process of parsing a request, the current request
* is rejected with a {@link HttpResponseStatus#REQUEST_TIMEOUT} response status.
*/
public void sessionIdle( IdleStatus idleType )
{
if ( session.getIdleCount( idleType ) >= 1 )
{
// // FIXME currentRequest is always null now; we need to cooperate with a decoder.
// if (currentContext != null) {
// LOG.info("Read idled out while parsing request. Scheduling timeout response");
// handleReadFailure(currentContext, HttpResponseStatus.REQUEST_TIMEOUT, "Timeout while reading request");
// } else {
LOG.debug( "Session idle detected on context {} with idleType {}", currentContext, idleType );
if ( currentContext != null )
{
if ( IdleStatus.BOTH_IDLE == idleType || IdleStatus.READER_IDLE == idleType )
{
currentContext.fireClientIdle( session.getLastReaderIdleTime(), session.getReaderIdleCount() );
}
}
else
{
// TODO - look further into this - it may present serious issues when dealing with HTTP/1.1
LOG.debug( "Idled with no current request. Scheduling closure when pipeline empties" );
pipeline.runWhenEmpty( new Runnable()
{
public void run()
{
LOG.info( "Pipeline empty after idle. Closing session: {}", session.getRemoteAddress() );
session.close();
}
});
}
}
}
public void exceptionCaught( Throwable cause )
{
MutableHttpResponse response = null;
if ( cause instanceof ProtocolDecoderException )
{
HttpResponseStatus status;
if ( cause instanceof HttpRequestDecoderException )
{
status = ( ( HttpRequestDecoderException ) cause ).getResponseStatus();
}
else
{
status = HttpResponseStatus.BAD_REQUEST;
}
LOG.warn( "Bad request: {}", session.getRemoteAddress(), cause );
response = new DefaultHttpResponse();
response.setProtocolVersion( HttpVersion.HTTP_1_1 );
response.setStatus( status );
}
else if ( cause instanceof IOException )
{
LOG.warn( "IOException on HTTP connection", cause );
session.close();
}
else
{
response = new DefaultHttpResponse();
response.setProtocolVersion( HttpVersion.HTTP_1_1 );
response.setStatus( HttpResponseStatus.INTERNAL_SERVER_ERROR );
LOG.error( "Unexpected exception from a service : "+session.getRemoteAddress(), cause );
}
if ( response != null )
{
HttpServiceContext context = this.currentContext;
if ( context == null )
{
context = createContext( new DefaultHttpRequest() );
}
context.commitResponse( response );
}
}
public void messageReceived( Object message )
{
// FIXME messageReceived invoked only when whole message is built.
// When headers were built
//sendContinuationIfRequested(request);
// When body has been built
}
/**
* Invoked when we fail to parse an incoming request.
* We configure our parser to discard any further data received from the client,
* and schedule a response with the appropriate failure code for the
* current request
*
* @param context the service context
* @param status the status
* @param message failure message
*/
private void handleReadFailure( HttpServiceContext context, HttpResponseStatus status, String message )
{
LOG.info( "Failed to handle client {} request. Reason: {}", session.getRemoteAddress(), status );
MutableHttpResponse response = new DefaultHttpResponse();
response.setStatusReasonPhrase( message );
response.setStatus( status );
context.commitResponse( response );
}
/**
* Invoked when data wrote has been fully written.
* If we have scheduled closure after sending a final response, we will
* be provided with the <code>CLOSE_MARKER</code> as our marker object.<br/>
* This signals us to schedule closure of the connection
*
* @param message The marker provided when writing data. If this is
* our closure marker, we schedule closure of the connection
*/
public void messageSent( Object message )
{
}
/**
* Sets the read idle time for all connections
*
* @param readIdleTime The read idle time (seconds)
*/
public void setReadIdleTime( int readIdleTime )
{
this.readIdleTime = readIdleTime;
session.getConfig().setReaderIdleTime( readIdleTime );
}
protected DefaultHttpServiceContext createContext( HttpRequest request )
{
return new DefaultHttpServiceContext( request );
}
private class ContextConverter extends IoFilterAdapter
{
@Override
public void filterWrite( NextFilter nextFilter, IoSession session, WriteRequest writeRequest ) throws Exception
{
nextFilter.filterWrite( session, new DefaultWriteRequest(
( ( HttpServiceContext ) writeRequest.getMessage() )
.getCommittedResponse(), writeRequest.getFuture()));
}
@Override
public void messageReceived( NextFilter nextFilter, IoSession session, Object message ) throws Exception
{
HttpRequest request = ( HttpRequest ) message;
currentContext = createContext( request );
nextFilter.messageReceived( session, currentContext );
}
}
private class ServiceFilterAdapter extends IoFilterAdapter
{
private final HttpServiceFilter filter;
public ServiceFilterAdapter( HttpServiceFilter filter )
{
this.filter = filter;
}
@Override
public void messageReceived( final NextFilter nextFilter,
final IoSession session, final Object message ) throws Exception
{
HttpServiceFilter.NextFilter nextFilterAdapter = new HttpServiceFilter.NextFilter()
{
public void invoke()
{
nextFilter.messageReceived(session, message);
}
};
filter.handleRequest( nextFilterAdapter, ( HttpServiceContext ) message );
}
@Override
public void filterWrite( final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest )
throws Exception
{
HttpServiceFilter.NextFilter nextFilterAdapter = new HttpServiceFilter.NextFilter()
{
public void invoke()
{
nextFilter.filterWrite( session, writeRequest );
}
};
HttpServiceContext context = ( HttpServiceContext ) writeRequest.getMessage();
filter.handleResponse( nextFilterAdapter, context );
}
}
private class RequestPipelineAdapter extends IoFilterAdapter
{
private final RequestPipeline pipeline;
public RequestPipelineAdapter( final RequestPipeline pipeline )
{
this.pipeline = pipeline;
}
@Override
public void sessionOpened(final NextFilter nextFilter,
final IoSession session) {
pipeline.setPipelineListener(new RequestPipelineListener() {
public void responseReleased(HttpServiceContext context) {
nextFilter.filterWrite(session, new DefaultWriteRequest(
context, ((DefaultHttpServiceContext) context)
.getWriteFuture()));
}
});
nextFilter.sessionOpened(session);
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
HttpServiceContext context = (HttpServiceContext) message;
if (pipeline.addRequest(context)) {
if (LOG.isDebugEnabled())
LOG.debug("Allocated slot in request pipeline");
nextFilter.messageReceived(session, message);
} else {
// The client has filled their pipeline. Currently, this
// triggers closure. Another option would be to drop read interest
// until we drain.
LOG.warn("Could not allocate room in the pipeline for request");
handleReadFailure(context,
HttpResponseStatus.SERVICE_UNAVAILABLE, "Pipeline full");
}
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
DefaultHttpServiceContext context = (DefaultHttpServiceContext) writeRequest
.getMessage();
context.setWriteFuture(writeRequest.getFuture());
pipeline.releaseResponse(context);
// nextFilter will be invoked when pipeline listener is notified.
}
}
private class DefaultHttpServiceContext extends AbstractHttpServiceContext
{
private WriteFuture writeFuture;
private DefaultHttpServiceContext( HttpRequest request )
{
super( ( InetSocketAddress ) session.getLocalAddress(), ( InetSocketAddress ) session.getRemoteAddress(), request, container );
}
private WriteFuture getWriteFuture()
{
return writeFuture;
}
private void setWriteFuture( WriteFuture writeFuture )
{
if ( ! isResponseCommitted() )
{
throw new IllegalStateException();
}
this.writeFuture = writeFuture;
}
@Override
protected void doWrite( boolean requiresClosure )
{
currentContext = null;
WriteFuture future = session.write( this );
if ( requiresClosure )
{
LOG.debug( "Added CLOSE future listener." );
future.addListener( IoFutureListener.CLOSE );
}
}
public void fireClientIdle( long idleTime, int idleCount )
{
super.fireClientIdle( idleTime, idleCount );
}
public void fireClientDisconnected()
{
super.fireClientDisconnected();
}
}
}