// blob: b8c6f297205f9fb24e40f70983df743f849cbafd [file] [log] [blame]
package org.apache.mina.filter.codec;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
import org.apache.mina.common.*;
import org.apache.mina.common.support.DefaultWriteFuture;
import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
import org.apache.mina.util.SessionLog;
import org.apache.mina.util.Queue;
/**
 * An {@link IoFilter} that sits between the raw byte stream and the application:
 * incoming {@link ByteBuffer}s are handed to a {@link ProtocolDecoder} and outgoing
 * non-buffer messages are handed to a {@link ProtocolEncoder}.
 * <p>
 * Encoder and decoder instances are obtained from a {@link ProtocolCodecFactory} the
 * first time they are needed for a session and are then cached on that session under
 * the {@link #ENCODER} and {@link #DECODER} attribute keys until they are disposed.
 */
public class QpidProtocolCodecFilter extends IoFilterAdapter
{
    /** Session attribute key under which the session's {@link ProtocolEncoder} is cached. */
    public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder";

    /** Session attribute key under which the session's {@link ProtocolDecoder} is cached. */
    public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder";

    /** Parameter list used to look up a public no-argument constructor reflectively. */
    private static final Class[] EMPTY_PARAMS = new Class[0];

    /** Shared zero-length buffer that backs every {@link MessageByteBuffer} marker. */
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] );

    /** Source of the encoder and decoder instances used by this filter. */
    private final ProtocolCodecFactory factory;

    /**
     * Creates a filter that asks the given factory for its encoders and decoders.
     *
     * @param factory the codec factory; must not be <code>null</code>
     * @throws NullPointerException if <code>factory</code> is <code>null</code>
     */
    public QpidProtocolCodecFilter( ProtocolCodecFactory factory )
    {
        if( factory == null )
        {
            throw new NullPointerException( "factory" );
        }
        this.factory = factory;
    }

    /**
     * Creates a filter that always uses the given encoder and decoder instances,
     * i.e. the same two objects are handed out for every session.
     *
     * @param encoder the encoder to use; must not be <code>null</code>
     * @param decoder the decoder to use; must not be <code>null</code>
     * @throws NullPointerException if either argument is <code>null</code>
     */
    public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder )
    {
        if( encoder == null )
        {
            throw new NullPointerException( "encoder" );
        }
        if( decoder == null )
        {
            throw new NullPointerException( "decoder" );
        }
        this.factory = new ProtocolCodecFactory()
        {
            public ProtocolEncoder getEncoder()
            {
                return encoder;
            }

            public ProtocolDecoder getDecoder()
            {
                return decoder;
            }
        };
    }

    /**
     * Creates a filter that instantiates a fresh encoder/decoder through the public
     * no-argument constructor of the given classes whenever the factory is consulted.
     *
     * @param encoderClass a class assignable to {@link ProtocolEncoder}
     * @param decoderClass a class assignable to {@link ProtocolDecoder}
     * @throws NullPointerException     if either argument is <code>null</code>
     * @throws IllegalArgumentException if a class has the wrong type or lacks a
     *                                  public no-argument constructor
     */
    public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass )
    {
        if( encoderClass == null )
        {
            throw new NullPointerException( "encoderClass" );
        }
        if( decoderClass == null )
        {
            throw new NullPointerException( "decoderClass" );
        }
        if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) )
        {
            throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() );
        }
        if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) )
        {
            throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() );
        }
        // Fail fast here rather than on first use if the classes cannot be instantiated.
        try
        {
            encoderClass.getConstructor( EMPTY_PARAMS );
        }
        catch( NoSuchMethodException e )
        {
            throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." );
        }
        try
        {
            decoderClass.getConstructor( EMPTY_PARAMS );
        }
        catch( NoSuchMethodException e )
        {
            throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." );
        }
        this.factory = new ProtocolCodecFactory()
        {
            public ProtocolEncoder getEncoder() throws Exception
            {
                return ( ProtocolEncoder ) encoderClass.newInstance();
            }

            public ProtocolDecoder getDecoder() throws Exception
            {
                return ( ProtocolDecoder ) decoderClass.newInstance();
            }
        };
    }

    /**
     * Rejects the addition of this filter to a chain that already holds a codec filter.
     * <p>
     * Both the stock {@link ProtocolCodecFilter} and this class are checked: this class
     * does not extend {@link ProtocolCodecFilter}, so looking for that type alone would
     * never notice a second <code>QpidProtocolCodecFilter</code>.
     *
     * @throws IllegalStateException if the chain already contains a codec filter
     */
    public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception
    {
        if( parent.contains( ProtocolCodecFilter.class ) || parent.contains( QpidProtocolCodecFilter.class ) )
        {
            throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." );
        }
    }

    /**
     * Decodes an incoming {@link ByteBuffer} with the session's decoder and flushes the
     * decoder output to the next filter. Messages that are not buffers are forwarded
     * unchanged.
     *
     * @throws ProtocolDecoderException if decoding fails; the exception carries a hex
     *                                  dump of the buffer that was being decoded
     */
    public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
    {
        if( !( message instanceof ByteBuffer ) )
        {
            nextFilter.messageReceived( session, message );
            return;
        }
        ByteBuffer in = ( ByteBuffer ) message;
        ProtocolDecoder decoder = getDecoder( session );
        ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
        try
        {
            decoder.decode( session, in, decoderOut );
        }
        catch( Throwable t )
        {
            ProtocolDecoderException pde = asDecoderException( t );
            pde.setHexdump( in.getHexDump() );
            throw pde;
        }
        finally
        {
            // Dispose the decoder if this session is connectionless.
            if( session.getTransportType().isConnectionless() )
            {
                disposeDecoder( session );
            }
            // Release the read buffer.
            in.release();
            // Runs on the failure path too, so whatever was written to the output is still delivered.
            decoderOut.flush();
        }
    }

    /**
     * Translates "message sent" notifications back into application terms.
     * <ul>
     * <li>a {@link HiddenByteBuffer} notification is swallowed;</li>
     * <li>a {@link MessageByteBuffer} marker is replaced by the original message it carries;</li>
     * <li>anything else is forwarded unchanged.</li>
     * </ul>
     */
    public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception
    {
        if( message instanceof HiddenByteBuffer )
        {
            return;
        }
        if( !( message instanceof MessageByteBuffer ) )
        {
            nextFilter.messageSent( session, message );
            return;
        }
        nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message );
    }

    /**
     * Encodes an outgoing message. {@link ByteBuffer} messages pass through untouched.
     * Any other message is given to the session's encoder, the encoded output is flushed
     * to the next filter, and finally a zero-length {@link MessageByteBuffer} marker that
     * carries the original message and the original write future is written, which lets
     * {@link #messageSent} report the original message instead of raw bytes.
     *
     * @throws ProtocolEncoderException if encoding or forwarding fails
     */
    public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception
    {
        Object message = writeRequest.getMessage();
        if( message instanceof ByteBuffer )
        {
            nextFilter.filterWrite( session, writeRequest );
            return;
        }
        ProtocolEncoder encoder = getEncoder( session );
        ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest );
        try
        {
            encoder.encode( session, message, encoderOut );
            encoderOut.flush();
            nextFilter.filterWrite(
                    session,
                    new IoFilter.WriteRequest(
                            new MessageByteBuffer( writeRequest.getMessage() ),
                            writeRequest.getFuture(), writeRequest.getDestination() ) );
        }
        catch( Throwable t )
        {
            throw asEncoderException( t );
        }
        finally
        {
            // Dispose the encoder if this session is connectionless.
            if( session.getTransportType().isConnectionless() )
            {
                disposeEncoder( session );
            }
        }
    }

    /**
     * Gives the decoder a final chance to emit messages, disposes both codecs, flushes
     * the decoder output and then propagates the close event. If
     * <code>finishDecode</code> fails, the clean-up still runs but the close event is
     * not propagated because the exception is rethrown.
     *
     * @throws ProtocolDecoderException if <code>finishDecode</code> fails
     */
    public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception
    {
        // Call finishDecode() first when a connection is closed.
        ProtocolDecoder decoder = getDecoder( session );
        ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter );
        try
        {
            decoder.finishDecode( session, decoderOut );
        }
        catch( Throwable t )
        {
            throw asDecoderException( t );
        }
        finally
        {
            // Dispose all.
            disposeEncoder( session );
            disposeDecoder( session );
            decoderOut.flush();
        }
        nextFilter.sessionClosed( session );
    }

    /** Returns <code>t</code> itself if it already is a decoder exception, otherwise wraps it as the cause. */
    private static ProtocolDecoderException asDecoderException( Throwable t )
    {
        if( t instanceof ProtocolDecoderException )
        {
            return ( ProtocolDecoderException ) t;
        }
        return new ProtocolDecoderException( t );
    }

    /** Returns <code>t</code> itself if it already is an encoder exception, otherwise wraps it as the cause. */
    private static ProtocolEncoderException asEncoderException( Throwable t )
    {
        if( t instanceof ProtocolEncoderException )
        {
            return ( ProtocolEncoderException ) t;
        }
        return new ProtocolEncoderException( t );
    }

    /** Returns the encoder cached on the session, creating and caching one via the factory if absent. */
    private ProtocolEncoder getEncoder( IoSession session ) throws Exception
    {
        ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER );
        if( encoder == null )
        {
            encoder = factory.getEncoder();
            session.setAttribute( ENCODER, encoder );
        }
        return encoder;
    }

    /** Creates a new encoder output bound to the given session, next filter and write request. */
    private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
    {
        return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest );
    }

    /** Returns the decoder cached on the session, creating and caching one via the factory if absent. */
    private ProtocolDecoder getDecoder( IoSession session ) throws Exception
    {
        ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER );
        if( decoder == null )
        {
            decoder = factory.getDecoder();
            session.setAttribute( DECODER, decoder );
        }
        return decoder;
    }

    /** Creates a new decoder output bound to the given session and next filter. */
    private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter )
    {
        return new SimpleProtocolDecoderOutput( session, nextFilter );
    }

    /**
     * Removes the cached encoder from the session, if any, and disposes it.
     * Disposal is best-effort: a failure is logged (with its cause) and not rethrown.
     */
    private void disposeEncoder( IoSession session )
    {
        ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER );
        if( encoder == null )
        {
            return;
        }
        try
        {
            encoder.dispose( session );
        }
        catch( Throwable t )
        {
            // Pass the throwable along so the reason for the failure is not lost.
            SessionLog.warn(
                    session,
                    "Failed to dispose: " + encoder.getClass().getName() +
                    " (" + encoder + ')',
                    t );
        }
    }

    /**
     * Removes the cached decoder from the session, if any, and disposes it.
     * Disposal is best-effort: a failure is logged (with its cause) and not rethrown.
     */
    private void disposeDecoder( IoSession session )
    {
        ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER );
        if( decoder == null )
        {
            return;
        }
        try
        {
            decoder.dispose( session );
        }
        catch( Throwable t )
        {
            // Pass the throwable along so the reason for the failure is not lost.
            SessionLog.warn(
                    session,
                    "Failed to dispose: " + decoder.getClass().getName() +
                    " (" + decoder + ')',
                    t );
        }
    }

    /**
     * Marker wrapper whose "message sent" notifications are swallowed by
     * {@link QpidProtocolCodecFilter#messageSent}.
     * NOTE(review): nothing in this class creates an instance; encoded buffers are
     * forwarded unwrapped by {@link ProtocolEncoderOutputImpl#doFlush} - confirm this
     * is intentional.
     */
    private static class HiddenByteBuffer extends ByteBufferProxy
    {
        private HiddenByteBuffer( ByteBuffer buf )
        {
            super( buf );
        }
    }

    /**
     * Zero-length buffer that only exists to carry the original application message
     * through the write path so it can be handed back in
     * {@link QpidProtocolCodecFilter#messageSent}.
     */
    private static class MessageByteBuffer extends ByteBufferProxy
    {
        /** The application message that was encoded. */
        private final Object message;

        private MessageByteBuffer( Object message )
        {
            super( EMPTY_BUFFER );
            this.message = message;
        }

        public void acquire()
        {
            // no-op since we are wrapping a shared zero-byte buffer; this instance just carries the message
        }

        public void release()
        {
            // no-op since we are wrapping a shared zero-byte buffer; this instance just carries the message
        }
    }

    /**
     * Encoder output that holds at most one pending buffer. Writing a second buffer
     * pushes the pending one downstream first, so buffers reach the next filter in
     * the order they were written.
     */
    private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput
    {
        /** The buffer written most recently and not yet forwarded, or <code>null</code>. */
        private ByteBuffer buffer;
        private final IoSession session;
        private final IoFilter.NextFilter nextFilter;
        private final IoFilter.WriteRequest writeRequest;

        public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest )
        {
            this.session = session;
            this.nextFilter = nextFilter;
            this.writeRequest = writeRequest;
        }

        /** Forwards any previously written buffer, then remembers <code>buf</code> as pending. */
        public void write( ByteBuffer buf )
        {
            if( buffer != null )
            {
                flush();
            }
            buffer = buf;
        }

        /** Intentionally empty: only a single buffer is ever held, so there is nothing to merge. */
        public void mergeAll()
        {
        }

        /**
         * Forwards the pending buffer, if there is one and it has bytes remaining.
         *
         * @return the future of the forwarded write, or <code>null</code> if nothing was forwarded
         */
        public WriteFuture flush()
        {
            ByteBuffer pending = buffer;
            if( pending == null )
            {
                return null;
            }
            // Clear the slot before forwarding so a repeated flush() (e.g. one issued by the
            // encoder followed by the one in filterWrite) cannot send the same buffer twice.
            buffer = null;
            // Flush only when the buffer has remaining.
            if( pending.hasRemaining() )
            {
                return doFlush( pending );
            }
            return null;
        }

        /**
         * Hands <code>buf</code> to the next filter as a new write request with its own
         * future, addressed to the destination of the original request.
         */
        protected WriteFuture doFlush( ByteBuffer buf )
        {
            WriteFuture future = new DefaultWriteFuture( session );
            nextFilter.filterWrite(
                    session,
                    new IoFilter.WriteRequest(
                            buf,
                            future, writeRequest.getDestination() ) );
            return future;
        }
    }
}