Adds unit test for DIRMINA-1142
diff --git a/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java b/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java
new file mode 100644
index 0000000..905dcf1
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/filter/codec/ParallelProtocolEncoderTest.java
@@ -0,0 +1,184 @@
+package org.apache.mina.filter.codec;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.future.WriteFuture;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
+import org.junit.Test;
+
+public class ParallelProtocolEncoderTest {
+ private NioSocketConnector connector = null;
+ private NioSocketAcceptor acceptor = null;
+ private static int LOOP = 1000;
+ private static int THREAD = 3;
+
+ private static Logger logger = LogManager.getLogger(ParallelProtocolEncoderTest.class);
+ private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD);
+
+ @Test
+ public void missingMessageTest() throws Exception {
+ String host = "localhost";
+ int port = 28_000;
+
+ // server
+ acceptor = new NioSocketAcceptor();
+ acceptor.getFilterChain().addFirst("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+ ServerHandler serverHandler = new ServerHandler();
+ acceptor.setHandler(serverHandler);
+ acceptor.bind(new InetSocketAddress(host, port));
+
+ // client
+ connector = new NioSocketConnector(1);
+ connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
+ ClientHandler clientHandler = new ClientHandler();
+ connector.setHandler(clientHandler);
+ ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, 28_000));
+ connectFuture.awaitUninterruptibly();
+
+ final IoSession ioSession = connectFuture.getSession();
+
+ logger.info("missingMessageTest.begin with " + LOOP + " messages and " + THREAD + " threads");
+
+ for (int i = 1; i <= LOOP; i++) {
+ final String message = "Message:" + i;
+ executorService.submit(new Runnable() {
+
+ @Override
+ public void run() {
+
+ logger.info("missingMessageTest.client.write "+message);
+
+ final WriteFuture future = ioSession.write(message);
+ if (future != null) {
+ future.addListener(new IoFutureListener<WriteFuture>() {
+ @Override
+ public void operationComplete(WriteFuture writeFuture) {
+ if (!future.isWritten()) {
+ logger.error("writeFuture: " + writeFuture.getException());
+ }
+ }
+ });
+ }
+ }
+ });
+
+ }
+ logger.info("missingMessageTest.end");
+
+ int maxSleep = 5_000;
+ int time = 1000;
+ int sleep = 0;
+ while ((!clientHandler.isFinished() || !serverHandler.isFinished()) && maxSleep > sleep) {
+ sleep += time;
+ logger.info("missingMessageTest.sleep... " + sleep);
+ Thread.sleep(time);
+ }
+
+ logger.info("missingMessageTest.close");
+
+ ioSession.closeNow();
+ connector.dispose();
+ acceptor.dispose();
+
+ if (!serverHandler.isFinished()) {
+ Set<String> missingMessages = clientHandler.getMessages();
+ missingMessages.removeAll(serverHandler.getMessages());
+ logger.error("missing <" + missingMessages.size() + "> messages : " + missingMessages);
+ }
+
+ assertTrue(serverHandler.isFinished());
+ assertTrue(clientHandler.isFinished());
+ }
+
+ private static class ServerHandler extends IoHandlerAdapter {
+ private Set<String> messages = new HashSet<>(LOOP);
+ private AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public void messageReceived(IoSession session, Object message) throws Exception {
+
+ String messageString = (String) message;
+ count.incrementAndGet();
+
+ if (messages.contains(messageString)) {
+ logger.error("messageReceived: message <" + messageString + "> already received");
+ }
+ messages.add(messageString);
+
+ // logger.info("messageReceived: <"+message+">, count="+count);
+
+ if (isFinished()) {
+ logger.info("messageReceived: finish");
+ }
+
+ super.messageReceived(session, message);
+ }
+
+ public boolean isFinished() {
+ return count.get() == LOOP;
+ }
+
+ /**
+ * Get the messages.
+ *
+ * @return the messages
+ */
+ public Set<String> getMessages() {
+ return messages;
+ }
+ }
+
+ private static class ClientHandler extends IoHandlerAdapter {
+ private Set<String> messages = new HashSet<>(LOOP);
+ private AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public void messageSent(IoSession session, Object message) throws Exception {
+
+ logger.info("messageSent " + message);
+
+ count.incrementAndGet();
+ String messageString = (String) message;
+ if (messages.contains(messageString)) {
+ logger.error("messageSent: message <" + messageString + "> already sent");
+ }
+ messages.add(messageString);
+
+ // logger.info("messageSent: <"+message+">, count="+count);
+
+ if (isFinished()) {
+ logger.info("messageSent: finish");
+ }
+ super.messageSent(session, message);
+ }
+
+ public boolean isFinished() {
+ return count.get() == LOOP;
+ }
+
+ /**
+ * Get the messages.
+ *
+ * @return the messages
+ */
+ public Set<String> getMessages() {
+ return messages;
+ }
+ }
+}