| package org.apache.mina.filter.codec; |
| |
| |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| import org.apache.mina.common.*; |
| import org.apache.mina.common.support.DefaultWriteFuture; |
| import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; |
| import org.apache.mina.util.SessionLog; |
| import org.apache.mina.util.Queue; |
| |
| |
| public class QpidProtocolCodecFilter extends IoFilterAdapter |
| { |
| public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder"; |
| public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder"; |
| |
| private static final Class[] EMPTY_PARAMS = new Class[0]; |
| private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); |
| |
| private final ProtocolCodecFactory factory; |
| |
| public QpidProtocolCodecFilter( ProtocolCodecFactory factory ) |
| { |
| if( factory == null ) |
| { |
| throw new NullPointerException( "factory" ); |
| } |
| this.factory = factory; |
| } |
| |
| public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) |
| { |
| if( encoder == null ) |
| { |
| throw new NullPointerException( "encoder" ); |
| } |
| if( decoder == null ) |
| { |
| throw new NullPointerException( "decoder" ); |
| } |
| |
| this.factory = new ProtocolCodecFactory() |
| { |
| public ProtocolEncoder getEncoder() |
| { |
| return encoder; |
| } |
| |
| public ProtocolDecoder getDecoder() |
| { |
| return decoder; |
| } |
| }; |
| } |
| |
| public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) |
| { |
| if( encoderClass == null ) |
| { |
| throw new NullPointerException( "encoderClass" ); |
| } |
| if( decoderClass == null ) |
| { |
| throw new NullPointerException( "decoderClass" ); |
| } |
| if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) ) |
| { |
| throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() ); |
| } |
| if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) ) |
| { |
| throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() ); |
| } |
| try |
| { |
| encoderClass.getConstructor( EMPTY_PARAMS ); |
| } |
| catch( NoSuchMethodException e ) |
| { |
| throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." ); |
| } |
| try |
| { |
| decoderClass.getConstructor( EMPTY_PARAMS ); |
| } |
| catch( NoSuchMethodException e ) |
| { |
| throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." ); |
| } |
| |
| this.factory = new ProtocolCodecFactory() |
| { |
| public ProtocolEncoder getEncoder() throws Exception |
| { |
| return ( ProtocolEncoder ) encoderClass.newInstance(); |
| } |
| |
| public ProtocolDecoder getDecoder() throws Exception |
| { |
| return ( ProtocolDecoder ) decoderClass.newInstance(); |
| } |
| }; |
| } |
| |
| public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception |
| { |
| if( parent.contains( ProtocolCodecFilter.class ) ) |
| { |
| throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." ); |
| } |
| } |
| |
| public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception |
| { |
| if( !( message instanceof ByteBuffer ) ) |
| { |
| nextFilter.messageReceived( session, message ); |
| return; |
| } |
| |
| ByteBuffer in = ( ByteBuffer ) message; |
| ProtocolDecoder decoder = getDecoder( session ); |
| ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); |
| |
| try |
| { |
| decoder.decode( session, in, decoderOut ); |
| } |
| catch( Throwable t ) |
| { |
| ProtocolDecoderException pde; |
| if( t instanceof ProtocolDecoderException ) |
| { |
| pde = ( ProtocolDecoderException ) t; |
| } |
| else |
| { |
| pde = new ProtocolDecoderException( t ); |
| } |
| pde.setHexdump( in.getHexDump() ); |
| throw pde; |
| } |
| finally |
| { |
| // Dispose the decoder if this session is connectionless. |
| if( session.getTransportType().isConnectionless() ) |
| { |
| disposeDecoder( session ); |
| } |
| |
| // Release the read buffer. |
| in.release(); |
| |
| decoderOut.flush(); |
| } |
| } |
| |
| public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception |
| { |
| if( message instanceof HiddenByteBuffer ) |
| { |
| return; |
| } |
| |
| if( !( message instanceof MessageByteBuffer ) ) |
| { |
| nextFilter.messageSent( session, message ); |
| return; |
| } |
| |
| nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); |
| } |
| |
| public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception |
| { |
| Object message = writeRequest.getMessage(); |
| if( message instanceof ByteBuffer ) |
| { |
| nextFilter.filterWrite( session, writeRequest ); |
| return; |
| } |
| |
| ProtocolEncoder encoder = getEncoder( session ); |
| ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); |
| |
| try |
| { |
| encoder.encode( session, message, encoderOut ); |
| encoderOut.flush(); |
| nextFilter.filterWrite( |
| session, |
| new IoFilter.WriteRequest( |
| new MessageByteBuffer( writeRequest.getMessage() ), |
| writeRequest.getFuture(), writeRequest.getDestination() ) ); |
| } |
| catch( Throwable t ) |
| { |
| ProtocolEncoderException pee; |
| if( t instanceof ProtocolEncoderException ) |
| { |
| pee = ( ProtocolEncoderException ) t; |
| } |
| else |
| { |
| pee = new ProtocolEncoderException( t ); |
| } |
| throw pee; |
| } |
| finally |
| { |
| // Dispose the encoder if this session is connectionless. |
| if( session.getTransportType().isConnectionless() ) |
| { |
| disposeEncoder( session ); |
| } |
| } |
| } |
| |
| public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception |
| { |
| // Call finishDecode() first when a connection is closed. |
| ProtocolDecoder decoder = getDecoder( session ); |
| ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); |
| try |
| { |
| decoder.finishDecode( session, decoderOut ); |
| } |
| catch( Throwable t ) |
| { |
| ProtocolDecoderException pde; |
| if( t instanceof ProtocolDecoderException ) |
| { |
| pde = ( ProtocolDecoderException ) t; |
| } |
| else |
| { |
| pde = new ProtocolDecoderException( t ); |
| } |
| throw pde; |
| } |
| finally |
| { |
| // Dispose all. |
| disposeEncoder( session ); |
| disposeDecoder( session ); |
| |
| decoderOut.flush(); |
| } |
| |
| nextFilter.sessionClosed( session ); |
| } |
| |
| private ProtocolEncoder getEncoder( IoSession session ) throws Exception |
| { |
| ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER ); |
| if( encoder == null ) |
| { |
| encoder = factory.getEncoder(); |
| session.setAttribute( ENCODER, encoder ); |
| } |
| return encoder; |
| } |
| |
| private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) |
| { |
| return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); |
| } |
| |
| private ProtocolDecoder getDecoder( IoSession session ) throws Exception |
| { |
| ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); |
| if( decoder == null ) |
| { |
| decoder = factory.getDecoder(); |
| session.setAttribute( DECODER, decoder ); |
| } |
| return decoder; |
| } |
| |
| private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter ) |
| { |
| return new SimpleProtocolDecoderOutput( session, nextFilter ); |
| } |
| |
| private void disposeEncoder( IoSession session ) |
| { |
| ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); |
| if( encoder == null ) |
| { |
| return; |
| } |
| |
| try |
| { |
| encoder.dispose( session ); |
| } |
| catch( Throwable t ) |
| { |
| SessionLog.warn( |
| session, |
| "Failed to dispose: " + encoder.getClass().getName() + |
| " (" + encoder + ')' ); |
| } |
| } |
| |
| private void disposeDecoder( IoSession session ) |
| { |
| ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER ); |
| if( decoder == null ) |
| { |
| return; |
| } |
| |
| try |
| { |
| decoder.dispose( session ); |
| } |
| catch( Throwable t ) |
| { |
| SessionLog.warn( |
| session, |
| "Falied to dispose: " + decoder.getClass().getName() + |
| " (" + decoder + ')' ); |
| } |
| } |
| |
| private static class HiddenByteBuffer extends ByteBufferProxy |
| { |
| private HiddenByteBuffer( ByteBuffer buf ) |
| { |
| super( buf ); |
| } |
| } |
| |
| private static class MessageByteBuffer extends ByteBufferProxy |
| { |
| private final Object message; |
| |
| private MessageByteBuffer( Object message ) |
| { |
| super( EMPTY_BUFFER ); |
| this.message = message; |
| } |
| |
| public void acquire() |
| { |
| // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message |
| } |
| |
| public void release() |
| { |
| // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message |
| } |
| } |
| |
| private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput |
| { |
| private ByteBuffer buffer; |
| |
| private final IoSession session; |
| private final IoFilter.NextFilter nextFilter; |
| private final IoFilter.WriteRequest writeRequest; |
| |
| public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) |
| { |
| this.session = session; |
| this.nextFilter = nextFilter; |
| this.writeRequest = writeRequest; |
| } |
| |
| |
| |
| public void write( ByteBuffer buf ) |
| { |
| if(buffer != null) |
| { |
| flush(); |
| } |
| buffer = buf; |
| } |
| |
| public void mergeAll() |
| { |
| } |
| |
| public WriteFuture flush() |
| { |
| WriteFuture future = null; |
| if( buffer == null ) |
| { |
| return null; |
| } |
| else |
| { |
| ByteBuffer buf = buffer; |
| // Flush only when the buffer has remaining. |
| if( buf.hasRemaining() ) |
| { |
| future = doFlush( buf ); |
| } |
| |
| } |
| |
| return future; |
| } |
| |
| |
| protected WriteFuture doFlush( ByteBuffer buf ) |
| { |
| WriteFuture future = new DefaultWriteFuture( session ); |
| nextFilter.filterWrite( |
| session, |
| new IoFilter.WriteRequest( |
| buf, |
| future, writeRequest.getDestination() ) ); |
| return future; |
| } |
| } |
| } |
| |