initial work on parallel codec api
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
index 23a54c0..2997e6a 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolDecoderOutput.java
@@ -19,41 +19,49 @@
*/
package org.apache.mina.filter.codec;
-import java.util.LinkedList;
+import java.util.ArrayDeque;
import java.util.Queue;
+import org.apache.mina.core.filterchain.IoFilter.NextFilter;
+import org.apache.mina.core.session.IoSession;
+
/**
* A {@link ProtocolDecoderOutput} based on queue.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractProtocolDecoderOutput implements ProtocolDecoderOutput {
- /** The queue where decoded messages are stored */
- private final Queue<Object> messageQueue = new LinkedList<>();
+ /** The queue where decoded messages are stored */
+ protected final Queue<Object> messageQueue = new ArrayDeque<>();
- /**
- * Creates a new instance of a AbstractProtocolDecoderOutput
- */
- public AbstractProtocolDecoderOutput() {
- // Do nothing
- }
+ /**
+ * Creates a new instance of an AbstractProtocolDecoderOutput
+ */
+ public AbstractProtocolDecoderOutput() {
+ // Do nothing
+ }
- /**
- * @return The decoder's message queue
- */
- public Queue<Object> getMessageQueue() {
- return messageQueue;
- }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(Object message) {
+ if (message == null) {
+ throw new IllegalArgumentException("message");
+ }
- /**
- * {@inheritDoc}
- */
- @Override
- public void write(Object message) {
- if (message == null) {
- throw new IllegalArgumentException("message");
- }
+ messageQueue.add(message);
+ }
- messageQueue.add(message);
- }
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Drains the message queue, handing every polled message to
+ * {@code nextFilter.messageReceived(session, ...)} until the queue is empty.
+ */
+ @Override
+ public void flush(NextFilter nextFilter, IoSession session) {
+ for (Object decoded = messageQueue.poll(); decoded != null; decoded = messageQueue.poll()) {
+ nextFilter.messageReceived(session, decoded);
+ }
+ }
}
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
index e369ba9..58b8852 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/AbstractProtocolEncoderOutput.java
@@ -19,10 +19,8 @@
*/
package org.apache.mina.filter.codec;
+import java.util.ArrayDeque;
import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.mina.core.buffer.IoBuffer;
/**
* A {@link ProtocolEncoderOutput} based on queue.
@@ -30,80 +28,25 @@
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
- /** The queue where the decoded messages are stored */
- private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<>();
+ /** The queue where the encoded messages are stored */
+ protected final Queue<Object> messageQueue = new ArrayDeque<>();
- private boolean buffersOnly = true;
+ /**
+ * Creates an instance of AbstractProtocolEncoderOutput
+ */
+ public AbstractProtocolEncoderOutput() {
+ // Do nothing
+ }
- /**
- * Creates an instance of AbstractProtocolEncoderOutput
- */
- public AbstractProtocolEncoderOutput() {
- // Do nothing
- }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(Object message) {
+ if (message == null) {
+ throw new IllegalArgumentException("message");
+ }
- /**
- * @return The message queue
- */
- public Queue<Object> getMessageQueue() {
- return messageQueue;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void write(Object encodedMessage) {
- if (encodedMessage instanceof IoBuffer) {
- IoBuffer buf = (IoBuffer) encodedMessage;
- if (buf.hasRemaining()) {
- messageQueue.offer(buf);
- } else {
- throw new IllegalArgumentException("buf is empty. Forgot to call flip()?");
- }
- } else {
- messageQueue.offer(encodedMessage);
- buffersOnly = false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void mergeAll() {
- if (!buffersOnly) {
- throw new IllegalStateException("the encoded message list contains a non-buffer.");
- }
-
- final int size = messageQueue.size();
-
- if (size < 2) {
- // no need to merge!
- return;
- }
-
- // Get the size of merged BB
- int sum = 0;
- for (Object b : messageQueue) {
- sum += ((IoBuffer) b).remaining();
- }
-
- // Allocate a new BB that will contain all fragments
- IoBuffer newBuf = IoBuffer.allocate(sum);
-
- // and merge all.
- for (;;) {
- IoBuffer buf = (IoBuffer) messageQueue.poll();
- if (buf == null) {
- break;
- }
-
- newBuf.put(buf);
- }
-
- // Push the new buffer finally.
- newBuf.flip();
- messageQueue.add(newBuf);
- }
+ messageQueue.offer(message);
+ }
}
\ No newline at end of file
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
index a460b3d..93039e8 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
@@ -27,13 +27,10 @@
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.filterchain.IoFilterChain;
-import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
-import org.apache.mina.core.session.AbstractIoSession;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.DefaultWriteRequest;
-import org.apache.mina.core.write.NothingWrittenException;
import org.apache.mina.core.write.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,509 +44,418 @@
* @org.apache.xbean.XBean
*/
public class ProtocolCodecFilter extends IoFilterAdapter {
- /** A logger for this class */
- private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
+ /** A logger for this class */
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
- private static final Class<?>[] EMPTY_PARAMS = new Class[0];
+ private static final Class<?>[] EMPTY_PARAMS = new Class[0];
- private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
+ private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
- private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
+ private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
- private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
+ private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
- private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
+ private static final ProtocolDecoderOutputLocal DECODER_OUTPUT = new ProtocolDecoderOutputLocal();
- private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
+ private static final ProtocolEncoderOutputLocal ENCODER_OUTPUT = new ProtocolEncoderOutputLocal();
- /** The factory responsible for creating the encoder and decoder */
- private final ProtocolCodecFactory factory;
+ /** The factory responsible for creating the encoder and decoder */
+ private final ProtocolCodecFactory factory;
- /**
- * Creates a new instance of ProtocolCodecFilter, associating a factory
- * for the creation of the encoder and decoder.
- *
- * @param factory The associated factory
- */
- public ProtocolCodecFilter(ProtocolCodecFactory factory) {
- if (factory == null) {
- throw new IllegalArgumentException("factory");
- }
-
- this.factory = factory;
- }
-
- /**
- * Creates a new instance of ProtocolCodecFilter, without any factory.
- * The encoder/decoder factory will be created as an inner class, using
- * the two parameters (encoder and decoder).
- *
- * @param encoder The class responsible for encoding the message
- * @param decoder The class responsible for decoding the message
- */
- public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
- if (encoder == null) {
- throw new IllegalArgumentException("encoder");
- }
- if (decoder == null) {
- throw new IllegalArgumentException("decoder");
- }
-
- // Create the inner Factory based on the two parameters
- this.factory = new ProtocolCodecFactory() {
- /**
- * {@inheritDoc}
- */
- @Override
- public ProtocolEncoder getEncoder(IoSession session) {
- return encoder;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ProtocolDecoder getDecoder(IoSession session) {
- return decoder;
- }
- };
- }
-
- /**
- * Creates a new instance of ProtocolCodecFilter, without any factory.
- * The encoder/decoder factory will be created as an inner class, using
- * the two parameters (encoder and decoder), which are class names. Instances
- * for those classes will be created in this constructor.
- *
- * @param encoderClass The class responsible for encoding the message
- * @param decoderClass The class responsible for decoding the message
- */
- public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
- final Class<? extends ProtocolDecoder> decoderClass) {
- if (encoderClass == null) {
- throw new IllegalArgumentException("encoderClass");
- }
- if (decoderClass == null) {
- throw new IllegalArgumentException("decoderClass");
- }
- if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
- throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
- }
- if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
- throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
- }
- try {
- encoderClass.getConstructor(EMPTY_PARAMS);
- } catch (NoSuchMethodException e) {
- throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
- }
- try {
- decoderClass.getConstructor(EMPTY_PARAMS);
- } catch (NoSuchMethodException e) {
- throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
- }
-
- final ProtocolEncoder encoder;
-
- try {
- encoder = encoderClass.newInstance();
- } catch (Exception e) {
- throw new IllegalArgumentException("encoderClass cannot be initialized");
- }
-
- final ProtocolDecoder decoder;
-
- try {
- decoder = decoderClass.newInstance();
- } catch (Exception e) {
- throw new IllegalArgumentException("decoderClass cannot be initialized");
- }
-
- // Create the inner factory based on the two parameters.
- this.factory = new ProtocolCodecFactory() {
- /**
- * {@inheritDoc}
- */
- @Override
- public ProtocolEncoder getEncoder(IoSession session) throws Exception {
- return encoder;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public ProtocolDecoder getDecoder(IoSession session) throws Exception {
- return decoder;
- }
- };
- }
-
- /**
- * Get the encoder instance from a given session.
- *
- * @param session The associated session we will get the encoder from
- * @return The encoder instance, if any
- */
- public ProtocolEncoder getEncoder(IoSession session) {
- return (ProtocolEncoder) session.getAttribute(ENCODER);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
- if (parent.contains(this)) {
- throw new IllegalArgumentException(
- "You can't add the same filter instance more than once. Create another instance and add it.");
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
- // Clean everything
- disposeCodec(parent.getSession());
- }
-
- /**
- * Process the incoming message, calling the session decoder. As the incoming
- * buffer might contains more than one messages, we have to loop until the decoder
- * throws an exception.
- *
- * while ( buffer not empty )
- * try
- * decode ( buffer )
- * catch
- * break;
- *
- */
- @Override
- public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
- }
-
- if (!(message instanceof IoBuffer)) {
- nextFilter.messageReceived(session, message);
- return;
- }
-
- IoBuffer in = (IoBuffer) message;
- ProtocolDecoder decoder = factory.getDecoder(session);
- ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
-
- // Loop until we don't have anymore byte in the buffer,
- // or until the decoder throws an unrecoverable exception or
- // can't decoder a message, because there are not enough
- // data in the buffer
- while (in.hasRemaining()) {
- int oldPos = in.position();
- try {
- synchronized (session) {
- // Call the decoder with the read bytes
- decoder.decode(session, in, decoderOut);
- }
- // Finish decoding if no exception was thrown.
- decoderOut.flush(nextFilter, session);
- } catch (Exception e) {
- ProtocolDecoderException pde;
- if (e instanceof ProtocolDecoderException) {
- pde = (ProtocolDecoderException) e;
- } else {
- pde = new ProtocolDecoderException(e);
- }
- if (pde.getHexdump() == null) {
- // Generate a message hex dump
- int curPos = in.position();
- in.position(oldPos);
- pde.setHexdump(in.getHexDump());
- in.position(curPos);
- }
- // Fire the exceptionCaught event.
- decoderOut.flush(nextFilter, session);
- nextFilter.exceptionCaught(session, pde);
- // Retry only if the type of the caught exception is
- // recoverable and the buffer position has changed.
- // We check buffer position additionally to prevent an
- // infinite loop.
- if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
- break;
- }
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
- if (writeRequest instanceof EncodedWriteRequest) {
- return;
- }
-
- nextFilter.messageSent(session, writeRequest);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
- Object message = writeRequest.getMessage();
-
- // Bypass the encoding if the message is contained in a IoBuffer,
- // as it has already been encoded before
- if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
- nextFilter.filterWrite(session, writeRequest);
- return;
- }
-
- // Get the encoder in the session
- ProtocolEncoder encoder = factory.getEncoder(session);
-
- ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
-
- if (encoder == null) {
- throw new ProtocolEncoderException("The encoder is null for the session " + session);
- }
-
- try {
- // Now we can try to encode the response
- encoder.encode(session, message, encoderOut);
-
- // Send it directly
- Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
-
- // Write all the encoded messages now
- while (!bufferQueue.isEmpty()) {
- Object encodedMessage = bufferQueue.poll();
-
- if (encodedMessage == null) {
- break;
- }
-
- // Flush only when the buffer has remaining.
- if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
- if (bufferQueue.isEmpty()) {
- writeRequest.setMessage(encodedMessage);
- nextFilter.filterWrite(session, writeRequest);
- } else {
- SocketAddress destination = writeRequest.getDestination();
- WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
- nextFilter.filterWrite(session, encodedWriteRequest);
- }
+ /**
+ * Creates a new instance of ProtocolCodecFilter, associating a factory for the
+ * creation of the encoder and decoder.
+ *
+ * @param factory The associated factory
+ */
+ public ProtocolCodecFilter(ProtocolCodecFactory factory) {
+ if (factory == null) {
+ throw new IllegalArgumentException("factory");
}
- }
- } catch (Exception e) {
- ProtocolEncoderException pee;
- // Generate the correct exception
- if (e instanceof ProtocolEncoderException) {
- pee = (ProtocolEncoderException) e;
- } else {
- pee = new ProtocolEncoderException(e);
- }
+ this.factory = factory;
+ }
- throw pee;
- }
- }
+ /**
+ * Creates a new instance of ProtocolCodecFilter, without any factory. The
+ * encoder/decoder factory will be created as an inner class, using the two
+ * parameters (encoder and decoder).
+ *
+ * @param encoder The class responsible for encoding the message
+ * @param decoder The class responsible for decoding the message
+ */
+ public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
+ if (encoder == null) {
+ throw new IllegalArgumentException("encoder");
+ }
+ if (decoder == null) {
+ throw new IllegalArgumentException("decoder");
+ }
- /**
- * {@inheritDoc}
- */
- @Override
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
- // Call finishDecode() first when a connection is closed.
- ProtocolDecoder decoder = factory.getDecoder(session);
- ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
+ // Create the inner Factory based on the two parameters
+ this.factory = new ProtocolCodecFactory() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ProtocolEncoder getEncoder(IoSession session) {
+ return encoder;
+ }
- try {
- decoder.finishDecode(session, decoderOut);
- } catch (Exception e) {
- ProtocolDecoderException pde;
- if (e instanceof ProtocolDecoderException) {
- pde = (ProtocolDecoderException) e;
- } else {
- pde = new ProtocolDecoderException(e);
- }
- throw pde;
- } finally {
- // Dispose everything
- disposeCodec(session);
- decoderOut.flush(nextFilter, session);
- }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ProtocolDecoder getDecoder(IoSession session) {
+ return decoder;
+ }
+ };
+ }
- // Call the next filter
- nextFilter.sessionClosed(session);
- }
+ /**
+ * Creates a new instance of ProtocolCodecFilter, without any factory. The
+ * encoder/decoder factory will be created as an inner class, using the two
+ * parameters (encoder and decoder), which are class names. Instances for those
+ * classes will be created in this constructor.
+ *
+ * @param encoderClass The class responsible for encoding the message
+ * @param decoderClass The class responsible for decoding the message
+ */
+ public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
+ final Class<? extends ProtocolDecoder> decoderClass) {
+ if (encoderClass == null) {
+ throw new IllegalArgumentException("encoderClass");
+ }
+ if (decoderClass == null) {
+ throw new IllegalArgumentException("decoderClass");
+ }
+ if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
+ throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
+ }
+ if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
+ throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
+ }
+ try {
+ encoderClass.getConstructor(EMPTY_PARAMS);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
+ }
+ try {
+ decoderClass.getConstructor(EMPTY_PARAMS);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
+ }
- private static class EncodedWriteRequest extends DefaultWriteRequest {
- public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
- super(encodedMessage, future, destination);
- }
+ final ProtocolEncoder encoder;
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isEncoded() {
- return true;
- }
- }
+ try {
+ encoder = encoderClass.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("encoderClass cannot be initialized");
+ }
- private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
- public ProtocolDecoderOutputImpl() {
- // Do nothing
- }
+ final ProtocolDecoder decoder;
- /**
- * {@inheritDoc}
- */
- @Override
- public void flush(NextFilter nextFilter, IoSession session) {
- Queue<Object> messageQueue = getMessageQueue();
+ try {
+ decoder = decoderClass.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("decoderClass cannot be initialized");
+ }
- while (!messageQueue.isEmpty()) {
- nextFilter.messageReceived(session, messageQueue.poll());
- }
- }
- }
+ // Create the inner factory based on the two parameters.
+ this.factory = new ProtocolCodecFactory() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ProtocolEncoder getEncoder(IoSession session) throws Exception {
+ return encoder;
+ }
- private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
- private final IoSession session;
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ProtocolDecoder getDecoder(IoSession session) throws Exception {
+ return decoder;
+ }
+ };
+ }
- private final NextFilter nextFilter;
+ /**
+ * Get the encoder instance from a given session.
+ *
+ * @param session The associated session we will get the encoder from
+ * @return The encoder instance, if any
+ */
+ public ProtocolEncoder getEncoder(IoSession session) {
+ return (ProtocolEncoder) session.getAttribute(ENCODER);
+ }
- /** The WriteRequest destination */
- private final SocketAddress destination;
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+ if (parent.contains(this)) {
+ throw new IllegalArgumentException(
+ "You can't add the same filter instance more than once. Create another instance and add it.");
+ }
+ }
- public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
- this.session = session;
- this.nextFilter = nextFilter;
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+ // Clean everything
+ disposeCodec(parent.getSession());
+ }
- // Only store the destination, not the full WriteRequest.
- destination = writeRequest.getDestination();
- }
+ /**
+ * Process the incoming message, calling the session decoder. As the incoming
+ * buffer might contain more than one message, we loop until the buffer is
+ * empty or the decoder throws an exception that cannot be retried.
+ *
+ * <pre>
+ * while ( buffer not empty )
+ *     try
+ *         decode ( buffer )
+ *     catch
+ *         break;
+ * </pre>
+ *
+ */
+ @Override
+ public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message)
+ throws Exception {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
+ }
- /**
- * {@inheritDoc}
- */
- @Override
- public WriteFuture flush() {
- Queue<Object> bufferQueue = getMessageQueue();
- WriteFuture future = null;
+ if (!(message instanceof IoBuffer)) {
+ nextFilter.messageReceived(session, message);
+ return;
+ }
- while (!bufferQueue.isEmpty()) {
- Object encodedMessage = bufferQueue.poll();
+ final IoBuffer in = (IoBuffer) message;
+ final ProtocolDecoder decoder = factory.getDecoder(session);
+ final ProtocolDecoderOutputImpl decoderOut = DECODER_OUTPUT.get();
- if (encodedMessage == null) {
- break;
- }
+ // Loop until we don't have any more bytes in the buffer,
+ // or until the decoder throws an unrecoverable exception or
+ // can't decode a message, because there is not enough
+ // data in the buffer
+ while (in.hasRemaining()) {
+ int oldPos = in.position();
+ try {
+ // Call the decoder with the read bytes
+ decoder.decode(session, in, decoderOut);
+ // Finish decoding if no exception was thrown.
+ decoderOut.flush(nextFilter, session);
+ } catch (Exception e) {
+ ProtocolDecoderException pde;
+ if (e instanceof ProtocolDecoderException) {
+ pde = (ProtocolDecoderException) e;
+ } else {
+ pde = new ProtocolDecoderException(e);
+ }
+ if (pde.getHexdump() == null) {
+ // Generate a message hex dump
+ int curPos = in.position();
+ in.position(oldPos);
+ pde.setHexdump(in.getHexDump());
+ in.position(curPos);
+ }
+ // Fire the exceptionCaught event.
+ decoderOut.flush(nextFilter, session);
+ nextFilter.exceptionCaught(session, pde);
+ // Retry only if the type of the caught exception is
+ // recoverable and the buffer position has changed.
+ // We check buffer position additionally to prevent an
+ // infinite loop.
+ if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
+ break;
+ }
+ }
+ }
+ }
- // Flush only when the buffer has remaining.
- if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
- future = new DefaultWriteFuture(session);
- nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
- }
- }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+ if (writeRequest instanceof EncodedWriteRequest) {
+ return;
+ }
- if (future == null) {
- // Creates an empty writeRequest containing the destination
- future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
- }
+ nextFilter.messageSent(session, writeRequest);
+ }
- return future;
- }
- }
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A message that is already an {@link IoBuffer} or a {@link FileRegion} is
+ * forwarded untouched. Any other message is encoded by the encoder obtained from
+ * the factory into the per-thread encoder output, and every encoded message is
+ * forwarded to the next filter; the last one re-uses the original
+ * {@link WriteRequest}. If the encoder produced nothing, an empty buffer is
+ * written with the original request instead.
+ * <p>
+ * If encoding or forwarding fails, whatever is still queued in the per-thread
+ * output is discarded, so that it cannot be written by a later, unrelated call
+ * running on the same thread.
+ *
+ * @throws ProtocolEncoderException if the encoder is null, or if encoding or
+ * forwarding the encoded messages fails
+ */
+ @Override
+ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
+ throws Exception {
+ final Object message = writeRequest.getMessage();
- //----------- Helper methods ---------------------------------------------
- /**
- * Dispose the encoder, decoder, and the callback for the decoded
- * messages.
- */
- private void disposeCodec(IoSession session) {
- // We just remove the two instances of encoder/decoder to release resources
- // from the session
- disposeEncoder(session);
- disposeDecoder(session);
+ // Bypass the encoding if the message is contained in an IoBuffer,
+ // as it has already been encoded before
+ if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
+ nextFilter.filterWrite(session, writeRequest);
+ return;
+ }
- // We also remove the callback
- disposeDecoderOut(session);
- }
+ // Get the encoder in the session
+ final ProtocolEncoder encoder = factory.getEncoder(session);
+ final ProtocolEncoderOutputImpl encoderOut = ENCODER_OUTPUT.get();
- /**
- * Dispose the encoder, removing its instance from the
- * session's attributes, and calling the associated
- * dispose method.
- */
- private void disposeEncoder(IoSession session) {
- ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
- if (encoder == null) {
- return;
- }
+ if (encoder == null) {
+ throw new ProtocolEncoderException("The encoder is null for the session " + session);
+ }
- try {
- encoder.dispose(session);
- } catch (Exception e) {
- LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
- }
- }
+ // Tracks whether the whole encode-and-forward sequence finished normally
+ boolean completed = false;
+
+ try {
+ // Now we can try to encode the response
+ encoder.encode(session, message, encoderOut);
- /**
- * Dispose the decoder, removing its instance from the
- * session's attributes, and calling the associated
- * dispose method.
- */
- private void disposeDecoder(IoSession session) {
- ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
- if (decoder == null) {
- return;
- }
+ final Queue<Object> queue = encoderOut.messageQueue;
- try {
- decoder.dispose(session);
- } catch (Exception e) {
- LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
- }
- }
+ if (queue.isEmpty()) {
+ // Write empty message to ensure that messageSent is fired later
+ writeRequest.setMessage(EMPTY_BUFFER);
+ nextFilter.filterWrite(session, writeRequest);
+ } else {
+ // Write all the encoded messages now
+ Object encodedMessage = null;
- /**
- * Return a reference to the decoder callback. If it's not already created
- * and stored into the session, we create a new instance.
- */
- private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
- ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
+ while ((encodedMessage = queue.poll()) != null) {
+ if (queue.isEmpty()) {
+ // Write last message using original WriteRequest to ensure that any Future and
+ // dependency on messageSent event is emitted correctly
+ writeRequest.setMessage(encodedMessage);
+ nextFilter.filterWrite(session, writeRequest);
+ } else {
+ SocketAddress destination = writeRequest.getDestination();
+ WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
+ nextFilter.filterWrite(session, encodedWriteRequest);
+ }
+ }
+ }
+
+ // Reached only when encoding and every downstream write succeeded
+ completed = true;
+ } catch (final ProtocolEncoderException e) {
+ throw e;
+ } catch (final Exception e) {
+ // Generate the correct exception
+ throw new ProtocolEncoderException(e);
+ } finally {
+ if (!completed) {
+ // The output is shared by every call on this thread: drop what is left
+ // so it is not written by a later, unrelated filterWrite()
+ encoderOut.messageQueue.clear();
+ }
+ }
+ }
- if (out == null) {
- // Create a new instance, and stores it into the session
- out = new ProtocolDecoderOutputImpl();
- session.setAttribute(DECODER_OUT, out);
- }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+ // Call finishDecode() first when a connection is closed.
+ ProtocolDecoder decoder = factory.getDecoder(session);
+ ProtocolDecoderOutput decoderOut = DECODER_OUTPUT.get();
- return out;
- }
+ try {
+ decoder.finishDecode(session, decoderOut);
+ } catch (Exception e) {
+ ProtocolDecoderException pde;
+ if (e instanceof ProtocolDecoderException) {
+ pde = (ProtocolDecoderException) e;
+ } else {
+ pde = new ProtocolDecoderException(e);
+ }
+ throw pde;
+ } finally {
+ // Dispose everything
+ disposeCodec(session);
+ decoderOut.flush(nextFilter, session);
+ }
- private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
- ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
+ // Call the next filter
+ nextFilter.sessionClosed(session);
+ }
- if (out == null) {
- // Create a new instance, and stores it into the session
- out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
- session.setAttribute(ENCODER_OUT, out);
- }
+ private static class EncodedWriteRequest extends DefaultWriteRequest {
+ public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
+ super(encodedMessage, future, destination);
+ }
- return out;
- }
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isEncoded() {
+ return true;
+ }
+ }
- /**
- * Remove the decoder callback from the session's attributes.
- */
- private void disposeDecoderOut(IoSession session) {
- session.removeAttribute(DECODER_OUT);
- }
+ /**
+ * Concrete decoder output used by this filter; it adds nothing to
+ * {@link AbstractProtocolDecoderOutput}.
+ */
+ private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
+ /** Creates a new decoder output. */
+ public ProtocolDecoderOutputImpl() {
+ super();
+ }
+ }
+
+ /**
+ * Concrete encoder output used by this filter; it adds nothing to
+ * {@link AbstractProtocolEncoderOutput}.
+ */
+ private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
+ /** Creates a new encoder output. */
+ public ProtocolEncoderOutputImpl() {
+ super();
+ }
+ }
+
+ // ----------- Helper methods ---------------------------------------------
+ /**
+ * Dispose the encoder and then the decoder, by calling
+ * {@code disposeEncoder(session)} followed by {@code disposeDecoder(session)}.
+ */
+ private void disposeCodec(IoSession session) {
+ // We just remove the two instances of encoder/decoder to release resources
+ // from the session
+ disposeEncoder(session);
+ disposeDecoder(session);
+ }
+
+ /**
+ * Dispose the encoder, removing its instance from the session's attributes, and
+ * calling the associated dispose method. Nothing happens when the session holds
+ * no encoder attribute. A failure of the dispose call is logged as a warning,
+ * together with its cause, and is not propagated.
+ *
+ * @param session The session the encoder is removed from
+ */
+ private void disposeEncoder(IoSession session) {
+ ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
+ if (encoder == null) {
+ return;
+ }
+
+ try {
+ encoder.dispose(session);
+ } catch (Exception e) {
+ // Hand the exception to the logger so the cause is not lost
+ LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')', e);
+ }
+ }
+
+ /**
+ * Dispose the decoder, removing its instance from the session's attributes, and
+ * calling the associated dispose method. Nothing happens when the session holds
+ * no decoder attribute. A failure of the dispose call is logged as a warning,
+ * together with its cause, and is not propagated.
+ *
+ * @param session The session the decoder is removed from
+ */
+ private void disposeDecoder(IoSession session) {
+ ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
+ if (decoder == null) {
+ return;
+ }
+
+ try {
+ decoder.dispose(session);
+ } catch (Exception e) {
+ // Hand the exception to the logger so the cause is not lost
+ LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')', e);
+ }
+ }
+
+ /**
+ * A {@link ThreadLocal} whose initial value is a new
+ * {@code ProtocolDecoderOutputImpl}, so each thread gets its own decoder output.
+ */
+ private static class ProtocolDecoderOutputLocal extends ThreadLocal<ProtocolDecoderOutputImpl> {
+ /**
+ * @return a new decoder output for the calling thread
+ */
+ @Override
+ protected ProtocolDecoderOutputImpl initialValue() {
+ final ProtocolDecoderOutputImpl output = new ProtocolDecoderOutputImpl();
+ return output;
+ }
+ }
+
+ /**
+ * A {@link ThreadLocal} whose initial value is a new
+ * {@code ProtocolEncoderOutputImpl}, so each thread gets its own encoder output.
+ */
+ private static class ProtocolEncoderOutputLocal extends ThreadLocal<ProtocolEncoderOutputImpl> {
+ /**
+ * @return a new encoder output for the calling thread
+ */
+ @Override
+ protected ProtocolEncoderOutputImpl initialValue() {
+ final ProtocolEncoderOutputImpl output = new ProtocolEncoderOutputImpl();
+ return output;
+ }
+ }
}
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
index 2b5f89c..1638491 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
@@ -59,17 +59,8 @@
*/
public class ProtocolCodecSession extends DummySession {
- private final WriteFuture notWrittenFuture = DefaultWriteFuture.newNotWrittenFuture(this,
- new UnsupportedOperationException());
-
private final AbstractProtocolEncoderOutput encoderOutput = new AbstractProtocolEncoderOutput() {
- /**
- * {@inheritDoc}
- */
- @Override
- public WriteFuture flush() {
- return notWrittenFuture;
- }
+
};
private final AbstractProtocolDecoderOutput decoderOutput = new AbstractProtocolDecoderOutput() {
@@ -101,7 +92,7 @@
* @return the {@link Queue} of the buffered encoder output.
*/
public Queue<Object> getEncoderOutputQueue() {
- return encoderOutput.getMessageQueue();
+ return encoderOutput.messageQueue;
}
/**
@@ -116,6 +107,6 @@
* @return the {@link Queue} of the buffered decoder output.
*/
public Queue<Object> getDecoderOutputQueue() {
- return decoderOutput.getMessageQueue();
+ return decoderOutput.messageQueue;
}
}
diff --git a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
index 0fc847c..508ee23 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
@@ -21,44 +21,22 @@
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
-import org.apache.mina.core.future.WriteFuture;
/**
* Callback for {@link ProtocolEncoder} to generate encoded messages such as
- * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)}
+ * {@link IoBuffer}s. {@link ProtocolEncoder} must call {@link #write(Object)}
* for each encoded message.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface ProtocolEncoderOutput {
- /**
- * Callback for {@link ProtocolEncoder} to generate an encoded message such
- * as an {@link IoBuffer}. {@link ProtocolEncoder} must call
- * {@link #write(Object)} for each encoded message.
- *
- * @param encodedMessage the encoded message, typically an {@link IoBuffer}
- * or a {@link FileRegion}.
- */
- void write(Object encodedMessage);
-
- /**
- * Merges all buffers you wrote via {@link #write(Object)} into
- * one {@link IoBuffer} and replaces the old fragmented ones with it.
- * This method is useful when you want to control the way MINA generates
- * network packets. Please note that this method only works when you
- * called {@link #write(Object)} method with only {@link IoBuffer}s.
- *
- * @throws IllegalStateException if you wrote something else than {@link IoBuffer}
- */
- void mergeAll();
-
- /**
- * Flushes all buffers you wrote via {@link #write(Object)} to
- * the session. This operation is asynchronous; please wait for
- * the returned {@link WriteFuture} if you want to wait for
- * the buffers flushed.
- *
- * @return <tt>null</tt> if there is nothing to flush at all.
- */
- WriteFuture flush();
+ /**
+ * Receives one encoded message, such as an {@link IoBuffer}, produced by a
+ * {@link ProtocolEncoder}. The encoder must call this method once for each
+ * encoded message.
+ *
+ * @param message the encoded message, typically an {@link IoBuffer} or a
+ * {@link FileRegion}.
+ */
+ void write(Object message);
}
\ No newline at end of file
diff --git a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
index 87b886d..f8497b8 100644
--- a/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
+++ b/mina-http/src/test/java/org/apache/mina/http/HttpServerDecoderTest.java
@@ -25,9 +25,9 @@
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
+import java.util.Queue;
import org.apache.mina.core.buffer.IoBuffer;
-import org.apache.mina.core.filterchain.IoFilter.NextFilter;
import org.apache.mina.core.session.DummySession;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.AbstractProtocolDecoderOutput;
@@ -37,262 +37,250 @@
import org.junit.Test;
public class HttpServerDecoderTest {
- private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
+ private static final CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder(); //$NON-NLS-1$
- private static final ProtocolDecoder decoder = new HttpServerDecoder();
+ private static final ProtocolDecoder decoder = new HttpServerDecoder();
- /*
- * Use a single session for all requests in order to test state management better
- */
- private static IoSession session = new DummySession();
+ /*
+ * Use a single session for all requests in order to test state management
+ * better
+ */
+ private static IoSession session = new DummySession();
- /**
- * Build an IO buffer containing a simple minimal HTTP request.
- *
- * @param method the HTTP method
- * @param body the option body
- * @return the built IO buffer
- * @throws CharacterCodingException if encoding fails
- */
- protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException {
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
-
- if (body != null) {
- buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder);
- buffer.putString(body, encoder);
- } else {
- buffer.putString("\r\n", encoder);
- }
-
- buffer.rewind();
-
- return buffer;
- }
+ /**
+ * Build an IO buffer containing a simple minimal HTTP request.
+ *
+ * @param method the HTTP method
+ * @param body the optional body
+ * @return the built IO buffer
+ * @throws CharacterCodingException if encoding fails
+ */
+ protected static IoBuffer getRequestBuffer(String method, String body) throws CharacterCodingException {
+ IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
+ buffer.putString(method + " / HTTP/1.1\r\nHost: dummy\r\n", encoder);
- protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException {
- return getRequestBuffer(method, null);
- }
+ if (body != null) {
+ buffer.putString("Content-Length: " + body.length() + "\r\n\r\n", encoder);
+ buffer.putString(body, encoder);
+ } else {
+ buffer.putString("\r\n", encoder);
+ }
- /**
- * Execute an HTPP request and return the queue of messages.
- *
- * @param method the HTTP method
- * @param body the optional body
- * @return the protocol output and its queue of messages
- * @throws Exception if error occurs (encoding,...)
- */
- protected static AbstractProtocolDecoderOutput executeRequest(String method, String body) throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
+ buffer.rewind();
- IoBuffer buffer = getRequestBuffer(method, body); //$NON-NLS-1$
-
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
-
- return out;
- }
+ return buffer;
+ }
- @Test
- public void testGetRequestWithoutBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("GET", null);
- assertEquals(2, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+ protected static IoBuffer getRequestBuffer(String method) throws CharacterCodingException {
+ return getRequestBuffer(method, null); // null body: no Content-Length header, no payload
+ }
- @Test
- public void testGetRequestBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("GET", "body");
- assertEquals(3, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+    protected static class ProtocolDecoderQueue extends AbstractProtocolDecoderOutput {
+        public Queue<Object> getQueue() {
+            return messageQueue; // protected queue inherited from the base class
+        }
+    }
- @Test
- public void testPutRequestWithoutBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("PUT", null);
- assertEquals(2, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+ /**
+ * Execute an HTTP request and return the queue of messages.
+ *
+ * @param method the HTTP method
+ * @param body the optional body
+ * @return the protocol output and its queue of messages
+ * @throws Exception if error occurs (encoding,...)
+ */
+ protected static ProtocolDecoderQueue executeRequest(String method, String body) throws Exception {
+ ProtocolDecoderQueue out = new ProtocolDecoderQueue();
- @Test
- public void testPutRequestBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("PUT", "body");
- assertEquals(3, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+ IoBuffer buffer = getRequestBuffer(method, body); // $NON-NLS-1$
- @Test
- public void testPostRequestWithoutBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("POST", null);
- assertEquals(2, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+ while (buffer.hasRemaining()) {
+ decoder.decode(session, buffer, out);
+ }
- @Test
- public void testPostRequestBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("POST", "body");
- assertEquals(3, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+ return out;
+ }
- @Test
- public void testDeleteRequestWithoutBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("DELETE", null);
- assertEquals(2, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+    @Test
+    public void testGetRequestWithoutBody() throws Exception {
+        Queue<Object> messages = executeRequest("GET", null).getQueue();
+        assertEquals(2, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
- @Test
- public void testDeleteRequestBody() throws Exception {
- AbstractProtocolDecoderOutput out = executeRequest("DELETE", "body");
- assertEquals(3, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
-
- @Test
- public void testDIRMINA965NoContent() throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("dummy\r\n\r\n", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- assertEquals(2, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+    @Test
+    public void testGetRequestBody() throws Exception {
+        Queue<Object> messages = executeRequest("GET", "body").getQueue();
+        assertEquals(3, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof IoBuffer);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
- @Test
- public void testDIRMINA965WithContent() throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- assertEquals(3, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
- @Test
- public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("GET / HTTP/1.1\r\nHost: ", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("B", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- assertEquals(4, out.getMessageQueue().size());
- assertTrue(out.getMessageQueue().poll() instanceof HttpRequest);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof IoBuffer);
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
-
- @Test
- public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- assertEquals(2, out.getMessageQueue().size());
- HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
- assertEquals("localhost", request.getHeader("host"));
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+    @Test
+    public void testPutRequestWithoutBody() throws Exception {
+        Queue<Object> messages = executeRequest("PUT", null).getQueue();
+        assertEquals(2, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
- @Test
- public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("GET / HTTP/1.0\r\nHost: localhost\r\n\r\n", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- assertEquals(2, out.getMessageQueue().size());
- HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
- assertEquals("localhost", request.getHeader("host"));
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+    @Test
+    public void testPutRequestBody() throws Exception {
+        Queue<Object> messages = executeRequest("PUT", "body").getQueue();
+        assertEquals(3, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof IoBuffer);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
- @Test
- public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
- AbstractProtocolDecoderOutput out = new AbstractProtocolDecoderOutput() {
- public void flush(NextFilter nextFilter, IoSession session) {
- }
- };
- IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true);
- buffer.putString("GET / HTTP/1.0\r\nHost:localhost \r\n\r\n", encoder);
- buffer.rewind();
- while (buffer.hasRemaining()) {
- decoder.decode(session, buffer, out);
- }
- assertEquals(2, out.getMessageQueue().size());
- HttpRequest request = (HttpRequest) out.getMessageQueue().poll();
- assertEquals("localhost", request.getHeader("host"));
- assertTrue(out.getMessageQueue().poll() instanceof HttpEndOfContent);
- }
+    @Test
+    public void testPostRequestWithoutBody() throws Exception {
+        Queue<Object> messages = executeRequest("POST", null).getQueue();
+        assertEquals(2, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void testPostRequestBody() throws Exception {
+        Queue<Object> messages = executeRequest("POST", "body").getQueue();
+        assertEquals(3, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof IoBuffer);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void testDeleteRequestWithoutBody() throws Exception {
+        Queue<Object> messages = executeRequest("DELETE", null).getQueue();
+        assertEquals(2, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void testDeleteRequestBody() throws Exception {
+        Queue<Object> messages = executeRequest("DELETE", "body").getQueue();
+        assertEquals(3, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof IoBuffer);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void testDIRMINA965NoContent() throws Exception {
+        ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+        IoBuffer firstPart = IoBuffer.allocate(0).setAutoExpand(true);
+        firstPart.putString("GET / HTTP/1.1\r\nHost: ", encoder); // cut inside the Host header
+        firstPart.rewind();
+        while (firstPart.hasRemaining()) {
+            decoder.decode(session, firstPart, decoded);
+        }
+        IoBuffer secondPart = IoBuffer.allocate(0).setAutoExpand(true);
+        secondPart.putString("dummy\r\n\r\n", encoder); // rest of the header, no body
+        secondPart.rewind();
+        while (secondPart.hasRemaining()) {
+            decoder.decode(session, secondPart, decoded);
+        }
+        assertEquals(2, decoded.getQueue().size());
+        assertTrue(decoded.getQueue().poll() instanceof HttpRequest);
+        assertTrue(decoded.getQueue().poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void testDIRMINA965WithContent() throws Exception {
+        ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+        IoBuffer firstPart = IoBuffer.allocate(0).setAutoExpand(true);
+        firstPart.putString("GET / HTTP/1.1\r\nHost: ", encoder); // cut inside the Host header
+        firstPart.rewind();
+        while (firstPart.hasRemaining()) {
+            decoder.decode(session, firstPart, decoded);
+        }
+        IoBuffer secondPart = IoBuffer.allocate(0).setAutoExpand(true);
+        secondPart.putString("dummy\r\nContent-Length: 1\r\n\r\nA", encoder); // rest of the headers plus the body
+        secondPart.rewind();
+        while (secondPart.hasRemaining()) {
+            decoder.decode(session, secondPart, decoded);
+        }
+        Queue<Object> messages = decoded.getQueue();
+        assertEquals(3, messages.size());
+        assertTrue(messages.poll() instanceof HttpRequest);
+        assertTrue(messages.poll() instanceof IoBuffer);
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void testDIRMINA965WithContentOnTwoChunks() throws Exception {
+        ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+        IoBuffer firstPart = IoBuffer.allocate(0).setAutoExpand(true);
+        firstPart.putString("GET / HTTP/1.1\r\nHost: ", encoder); // cut inside the Host header
+        firstPart.rewind();
+        while (firstPart.hasRemaining()) {
+            decoder.decode(session, firstPart, decoded);
+        }
+        IoBuffer secondPart = IoBuffer.allocate(0).setAutoExpand(true);
+        secondPart.putString("dummy\r\nContent-Length: 2\r\n\r\nA", encoder); // first of the two body bytes
+        secondPart.rewind();
+        while (secondPart.hasRemaining()) {
+            decoder.decode(session, secondPart, decoded);
+        }
+        IoBuffer thirdPart = IoBuffer.allocate(0).setAutoExpand(true);
+        thirdPart.putString("B", encoder); // second body byte
+        thirdPart.rewind();
+        while (thirdPart.hasRemaining()) {
+            decoder.decode(session, thirdPart, decoded);
+        }
+        assertEquals(4, decoded.getQueue().size());
+        assertTrue(decoded.getQueue().poll() instanceof HttpRequest);
+        assertTrue(decoded.getQueue().poll() instanceof IoBuffer);
+        assertTrue(decoded.getQueue().poll() instanceof IoBuffer);
+        assertTrue(decoded.getQueue().poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void verifyThatHeaderWithoutLeadingSpaceIsSupported() throws Exception {
+        ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+        IoBuffer rawRequest = IoBuffer.allocate(0).setAutoExpand(true);
+        rawRequest.putString("GET / HTTP/1.0\r\nHost:localhost\r\n\r\n", encoder);
+        rawRequest.rewind();
+        while (rawRequest.hasRemaining()) {
+            decoder.decode(session, rawRequest, decoded);
+        }
+        Queue<Object> messages = decoded.getQueue();
+        assertEquals(2, messages.size());
+        assertEquals("localhost", ((HttpRequest) messages.poll()).getHeader("host"));
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void verifyThatLeadingSpacesAreRemovedFromHeader() throws Exception {
+        ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+        IoBuffer rawRequest = IoBuffer.allocate(0).setAutoExpand(true);
+        rawRequest.putString("GET / HTTP/1.0\r\nHost: localhost\r\n\r\n", encoder);
+        rawRequest.rewind();
+        while (rawRequest.hasRemaining()) {
+            decoder.decode(session, rawRequest, decoded);
+        }
+        Queue<Object> messages = decoded.getQueue();
+        assertEquals(2, messages.size());
+        assertEquals("localhost", ((HttpRequest) messages.poll()).getHeader("host"));
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
+
+    @Test
+    public void verifyThatTrailingSpacesAreRemovedFromHeader() throws Exception {
+        ProtocolDecoderQueue decoded = new ProtocolDecoderQueue();
+        IoBuffer rawRequest = IoBuffer.allocate(0).setAutoExpand(true);
+        rawRequest.putString("GET / HTTP/1.0\r\nHost:localhost \r\n\r\n", encoder);
+        rawRequest.rewind();
+        while (rawRequest.hasRemaining()) {
+            decoder.decode(session, rawRequest, decoded);
+        }
+        Queue<Object> messages = decoded.getQueue();
+        assertEquals(2, messages.size());
+        assertEquals("localhost", ((HttpRequest) messages.poll()).getHeader("host"));
+        assertTrue(messages.poll() instanceof HttpEndOfContent);
+    }
}