blob: b145d9c51d73e5d76ba58fe1a68d2736969a0734 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.mina.filter.codec;
import java.nio.ByteBuffer;
import org.apache.mina.api.AbstractIoFilter;
import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoSession;
import org.apache.mina.codec.ProtocolDecoder;
import org.apache.mina.codec.ProtocolDecoderException;
import org.apache.mina.codec.ProtocolEncoder;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
import org.apache.mina.session.AttributeKey;
import org.apache.mina.session.WriteRequest;
import org.apache.mina.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link IoFilter} which translates binary or protocol specific data into
 * message objects and vice versa, using a {@link ProtocolEncoder} and a
 * {@link ProtocolDecoder}.
 * <p>
 * The encoder and decoder each create a state object when a session is
 * opened; the filter stores both as session attributes and hands them back
 * to the codec on every encode / decode call for that session.
 *
 * @param <MESSAGE>
 *            the kind of high level business message this filter will encode
 *            and decode.
 * @param <ENCODED>
 *            the kind of low level message (most of time {@link ByteBuffer})
 *            this filter will produce or consume.
 * @param <ENCODING_STATE>
 *            the type of the state object created by the encoder and passed
 *            to each encode call.
 * @param <DECODING_STATE>
 *            the type of the state object created by the decoder and passed
 *            to each decode call.
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public class ProtocolCodecFilter<MESSAGE, ENCODED, ENCODING_STATE, DECODING_STATE> extends AbstractIoFilter {
    /** A logger for this class */
    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);

    /** Session attribute key under which the encoding state is stored */
    private static final AttributeKey<Object> ENCODER = new AttributeKey<Object>(Object.class, "internal_encoder");

    /** Session attribute key under which the decoding state is stored */
    private static final AttributeKey<Object> DECODER = new AttributeKey<Object>(Object.class, "internal_decoder");

    /** The encoder used for every outgoing message; never reassigned */
    private final ProtocolEncoder<MESSAGE, ENCODED, ENCODING_STATE> encoder;

    /** The decoder used for every incoming message; never reassigned */
    private final ProtocolDecoder<ENCODED, MESSAGE, DECODING_STATE> decoder;

    /**
     * Creates a new instance of ProtocolCodecFilter, with the specified encoder
     * and decoder. Both arguments are checked with
     * {@link Assert#assertNotNull} before being stored.
     *
     * @param encoder
     *            the encoder applied to outgoing messages
     * @param decoder
     *            the decoder applied to incoming data
     */
    public ProtocolCodecFilter(ProtocolEncoder<MESSAGE, ENCODED, ENCODING_STATE> encoder,
            ProtocolDecoder<ENCODED, MESSAGE, DECODING_STATE> decoder) {
        Assert.assertNotNull(encoder, "encoder");
        Assert.assertNotNull(decoder, "decoder");

        this.decoder = decoder;
        this.encoder = encoder;
    }

    /**
     * Process the incoming data by calling the decoder with the session's
     * decoding state. As the incoming buffer might contain more than one
     * message, the decoder is called repeatedly with the same input until it
     * returns <code>null</code>; every decoded message is handed to the
     * superclass implementation.
     * <p>
     * A {@link ProtocolDecoderException} raised while looping is logged at
     * debug level and then rethrown unchanged.
     *
     * @param session
     *            the session the data was received on
     * @param in
     *            the received data, cast (unchecked) to <code>ENCODED</code>
     * @param controller
     *            the read chain controller, passed through to the superclass
     */
    @Override
    public void messageReceived(IoSession session, Object in, ReadFilterChainController controller) {
        LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session);

        final DECODING_STATE decodingState = getDecodingState(session);

        // Unchecked by nature: the filter cannot verify the runtime type of
        // the incoming object against the ENCODED type parameter.
        @SuppressWarnings("unchecked")
        final ENCODED encodedInput = (ENCODED) in;

        try {
            // Keep asking the decoder for messages until it has nothing more
            // to produce from this input.
            MESSAGE decoded = decoder.decode(encodedInput, decodingState);

            while (decoded != null) {
                super.messageReceived(session, decoded, controller);
                decoded = decoder.decode(encodedInput, decodingState);
            }
        } catch (ProtocolDecoderException e) {
            LOGGER.debug("decoding exception : ", e);
            throw e;
        }
    }

    /**
     * Encodes the message carried by the write request with the session's
     * encoding state, replaces the request's message with the encoded form,
     * and then delegates to the superclass implementation.
     *
     * {@inheritDoc}
     */
    @Override
    public void messageWriting(IoSession session, WriteRequest message, WriteFilterChainController controller) {
        LOGGER.debug("Processing a MESSAGE_WRITTING for session {}", session);

        // Unchecked by nature: the request exposes its payload without the
        // MESSAGE type parameter.
        @SuppressWarnings("unchecked")
        final MESSAGE payload = (MESSAGE) message.getMessage();
        final ENCODING_STATE encodingState = getEncodingState(session);

        message.setMessage(encoder.encode(payload, encodingState));
        super.messageWriting(session, message, controller);
    }

    /**
     * Creates a fresh encoding state and decoding state for the session and
     * stores them as session attributes before delegating to the superclass.
     *
     * {@inheritDoc}
     */
    @Override
    public void sessionOpened(IoSession session) {
        // Encoder state first, then decoder state
        session.setAttribute(ENCODER, encoder.createEncoderState());
        session.setAttribute(DECODER, decoder.createDecoderState());

        super.sessionOpened(session);
    }

    /**
     * Lets the decoder finish with the session's decoding state, then
     * delegates to the superclass.
     *
     * {@inheritDoc}
     */
    @Override
    public void sessionClosed(IoSession session) {
        final DECODING_STATE decodingState = getDecodingState(session);
        decoder.finishDecode(decodingState);

        super.sessionClosed(session);
    }

    // ----------- Helper methods ---------------------------------------------

    /**
     * Reads the decoding state stored in the session attributes.
     *
     * @param session
     *            the session to read the attribute from
     * @return the value stored under the decoder attribute key, cast
     *         (unchecked) to <code>DECODING_STATE</code>
     */
    protected DECODING_STATE getDecodingState(IoSession session) {
        @SuppressWarnings("unchecked")
        final DECODING_STATE decodingState = (DECODING_STATE) session.getAttribute(DECODER);

        return decodingState;
    }

    /**
     * Reads the encoding state stored in the session attributes.
     *
     * @param session
     *            the session to read the attribute from
     * @return the value stored under the encoder attribute key, cast
     *         (unchecked) to <code>ENCODING_STATE</code>
     */
    protected ENCODING_STATE getEncodingState(IoSession session) {
        @SuppressWarnings("unchecked")
        final ENCODING_STATE encodingState = (ENCODING_STATE) session.getAttribute(ENCODER);

        return encodingState;
    }
}