SSL enhancements:

- new SSL related events: start handshake, complete handshake, secure closed-
- enhance SSL state machine
- add unit tests
diff --git a/core/src/main/java/org/apache/mina/api/AbstractIoFilter.java b/core/src/main/java/org/apache/mina/api/AbstractIoFilter.java
index b0f36ee..70f4bc2 100644
--- a/core/src/main/java/org/apache/mina/api/AbstractIoFilter.java
+++ b/core/src/main/java/org/apache/mina/api/AbstractIoFilter.java
@@ -73,4 +73,26 @@
     @Override
     public void messageSent(final IoSession session, final Object message) {
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handshakeStarted(IoSession session) {
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handshakeCompleted(IoSession session) {
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void secureClosed(IoSession session) {
+    }
+    
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/mina/api/AbstractIoHandler.java b/core/src/main/java/org/apache/mina/api/AbstractIoHandler.java
index 05c61af..5d9421a 100644
--- a/core/src/main/java/org/apache/mina/api/AbstractIoHandler.java
+++ b/core/src/main/java/org/apache/mina/api/AbstractIoHandler.java
@@ -87,4 +87,25 @@
         LOG.error("Unexpected exception, we close the session : ", cause);
         session.close(true);
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handshakeStarted(IoSession abstractIoSession) {
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handshakeCompleted(IoSession session) {
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void secureClosed(IoSession session) {
+    }
 }
diff --git a/core/src/main/java/org/apache/mina/api/IoFilter.java b/core/src/main/java/org/apache/mina/api/IoFilter.java
index 7e53ab6..9dcb8fb 100644
--- a/core/src/main/java/org/apache/mina/api/IoFilter.java
+++ b/core/src/main/java/org/apache/mina/api/IoFilter.java
@@ -75,4 +75,26 @@
      * @param message the incoming message to process
      */
     void messageSent(IoSession session, Object message);
+    
+    /**
+     * Invoked when a secure handshake has been started.
+     * 
+     * @param session {@link IoSession} associated with the invocation
+     */
+    void handshakeStarted(IoSession session);
+    
+    /**
+     * Invoked when a secure handshake has been completed.
+     * 
+     * @param session {@link IoSession} associated with the invocation
+     */
+    void handshakeCompleted(IoSession session);
+
+    /**
+     * Invoked when a secure context has been closed.
+     * 
+     * @param session {@link IoSession} associated with the invocation
+     */
+    void secureClosed(IoSession session);
+
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/mina/api/IoHandler.java b/core/src/main/java/org/apache/mina/api/IoHandler.java
index b3a6acf..1c214fd 100644
--- a/core/src/main/java/org/apache/mina/api/IoHandler.java
+++ b/core/src/main/java/org/apache/mina/api/IoHandler.java
@@ -92,4 +92,25 @@
      * @param cause the caught exception
      */
     void exceptionCaught(IoSession session, Exception cause);
+    
+    /**
+     * Invoked for secured session when the handshake has been started. May be called
+     * several times for a single session in case of rehandshake.
+     * @param session {@link IoSession} associated with the invocation
+     */
+    void handshakeStarted(IoSession abstractIoSession);
+
+    /**
+     * Invoked for secured session when the handshake has been completed. May be called
+     * several times for a single session in case of rehandshake.
+     * @param session {@link IoSession} associated with the invocation
+     */
+    void handshakeCompleted(IoSession session);
+    
+    /**
+     * Invoked for secured session when underlying SSL/TLS session has been closed.
+     * @param session {@link IoSession} associated with the invocation
+     */
+    void secureClosed(IoSession session);
+
 }
diff --git a/core/src/main/java/org/apache/mina/service/executor/EventVisitor.java b/core/src/main/java/org/apache/mina/service/executor/EventVisitor.java
index 1e8ecc3..6ece82c 100644
--- a/core/src/main/java/org/apache/mina/service/executor/EventVisitor.java
+++ b/core/src/main/java/org/apache/mina/service/executor/EventVisitor.java
@@ -35,4 +35,10 @@
     void visit(SentEvent event);
 
     void visit(IdleEvent event);
+    
+    void visit(HandshakeStartedEvent event);
+
+    void visit(HandshakeCompletedEvent handshakeCompletedEvent);
+
+    void visit(SecureClosedEvent secureClosedEvent);
 }
diff --git a/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java b/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
index 47fdb42..f0f8095 100644
--- a/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
+++ b/core/src/main/java/org/apache/mina/service/executor/HandlerCaller.java
@@ -81,4 +81,34 @@
             session.getService().getIoHandler().exceptionCaught(session, e);
         }
     }
+
+    @Override
+    public void visit(HandshakeStartedEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().handshakeStarted(session);
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+
+    @Override
+    public void visit(HandshakeCompletedEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().handshakeCompleted(session);
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
+
+    @Override
+    public void visit(SecureClosedEvent event) {
+        IoSession session = event.getSession();
+        try {
+            session.getService().getIoHandler().secureClosed(session);
+        } catch (Exception e) {
+            session.getService().getIoHandler().exceptionCaught(session, e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/mina/service/executor/HandshakeCompletedEvent.java b/core/src/main/java/org/apache/mina/service/executor/HandshakeCompletedEvent.java
new file mode 100644
index 0000000..b9bdbf9
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/service/executor/HandshakeCompletedEvent.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Founimport org.apache.mina.api.IoSession;
+tributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+import org.apache.mina.api.IoSession;
+
+/**
+ * An {@link IoSession} handshake started {@link Event}
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class HandshakeCompletedEvent implements Event {
+    private final IoSession session;
+
+    public HandshakeCompletedEvent(final IoSession session) {
+        this.session = session;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IoSession getSession() {
+        return session;
+    }
+
+    @Override
+    public void visit(EventVisitor visitor) {
+        visitor.visit(this);
+    }
+}
diff --git a/core/src/main/java/org/apache/mina/service/executor/HandshakeStartedEvent.java b/core/src/main/java/org/apache/mina/service/executor/HandshakeStartedEvent.java
new file mode 100644
index 0000000..f5ca879
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/service/executor/HandshakeStartedEvent.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Founimport org.apache.mina.api.IoSession;
+tributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+import org.apache.mina.api.IoSession;
+
+/**
+ * An {@link IoSession} handshake started {@link Event}
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class HandshakeStartedEvent implements Event {
+    private final IoSession session;
+
+    public HandshakeStartedEvent(final IoSession session) {
+        this.session = session;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IoSession getSession() {
+        return session;
+    }
+
+    @Override
+    public void visit(EventVisitor visitor) {
+        visitor.visit(this);
+    }
+}
diff --git a/core/src/main/java/org/apache/mina/service/executor/SecureClosedEvent.java b/core/src/main/java/org/apache/mina/service/executor/SecureClosedEvent.java
new file mode 100644
index 0000000..166c6c6
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/service/executor/SecureClosedEvent.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Founimport org.apache.mina.api.IoSession;
+tributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.service.executor;
+
+import org.apache.mina.api.IoSession;
+
+/**
+ * An {@link IoSession} secure closed {@link Event}
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class SecureClosedEvent implements Event {
+    private final IoSession session;
+
+    public SecureClosedEvent(final IoSession session) {
+        this.session = session;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IoSession getSession() {
+        return session;
+    }
+
+    @Override
+    public void visit(EventVisitor visitor) {
+        visitor.visit(this);
+    }
+}
diff --git a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
index 1504c13..5ec8cc2 100644
--- a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
+++ b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
@@ -40,10 +40,13 @@
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
 import org.apache.mina.service.executor.CloseEvent;
+import org.apache.mina.service.executor.HandshakeCompletedEvent;
+import org.apache.mina.service.executor.HandshakeStartedEvent;
 import org.apache.mina.service.executor.IdleEvent;
 import org.apache.mina.service.executor.IoHandlerExecutor;
 import org.apache.mina.service.executor.OpenEvent;
 import org.apache.mina.service.executor.ReceiveEvent;
+import org.apache.mina.service.executor.SecureClosedEvent;
 import org.apache.mina.service.executor.SentEvent;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.transport.nio.SelectorLoop;
@@ -330,14 +333,8 @@
      */
     @Override
     public boolean isSecured() {
-        return secured;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void setSecured(boolean secured) {
-        this.secured = secured;
+        SslHelper helper = attributes.getAttribute(SSL_HELPER);
+        return helper != null && helper.isActive();
     }
 
     /**
@@ -347,9 +344,7 @@
     public void initSecure(SSLContext sslContext) {
         SslHelper sslHelper = new SslHelper(this, sslContext);
         sslHelper.init();
-
         attributes.setAttribute(SSL_HELPER, sslHelper);
-        setSecured(true);
     }
 
     /**
@@ -765,6 +760,98 @@
 
     }
 
+    public void processHandshakeStarted() {
+        if (IS_DEBUG) {
+            LOG.debug("processing handshake started event for session {}", this);
+        }
+
+        try {
+            int size = chain.length;
+
+            for (int i = size - 1; i >= 0; i--) {
+                chain[i].handshakeStarted(this);
+            }
+
+            IoHandler handler = getService().getIoHandler();
+
+            if (handler != null) {
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new HandshakeStartedEvent(this));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.handshakeStarted(this);
+                }
+            }
+        } catch (RuntimeException e) {
+            processException(e);
+        }
+
+    }
+
+    public void processHandshakeCompleted() {
+        if (IS_DEBUG) {
+            LOG.debug("processing handshake completed event for session {}", this);
+        }
+
+        try {
+            int size = chain.length;
+
+            for (int i = size - 1; i >= 0; i--) {
+                chain[i].handshakeCompleted(this);
+            }
+
+            IoHandler handler = getService().getIoHandler();
+
+            if (handler != null) {
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new HandshakeCompletedEvent(this));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.handshakeCompleted(this);
+                }
+            }
+        } catch (RuntimeException e) {
+            processException(e);
+        }
+
+    }
+
+    public void processSecureClosed() {
+        if (IS_DEBUG) {
+            LOG.debug("processing secure closed event for session {}", this);
+        }
+
+        try {
+            int size = chain.length;
+
+            for (int i = size - 1; i >= 0; i--) {
+                chain[i].secureClosed(this);
+            }
+
+            IoHandler handler = getService().getIoHandler();
+
+            if (handler != null) {
+                IoHandlerExecutor executor = getService().getIoHandlerExecutor();
+
+                if (executor != null) {
+                    // asynchronous event
+                    executor.execute(new SecureClosedEvent(this));
+                } else {
+                    // synchronous call (in the I/O loop)
+                    handler.secureClosed(this);
+                }
+            }
+        } catch (RuntimeException e) {
+            processException(e);
+        }
+    }
+
     /**
      * process session message received event using the filter chain. To be called by the session {@link SelectorLoop} .
      * 
diff --git a/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java b/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java
index 3269efd..2e0b5a8 100644
--- a/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java
+++ b/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java
@@ -38,6 +38,12 @@
 
     /** the future to complete when this message is written */
     private IoFuture<Void> future;
+    
+    /**
+     * The secure internal flag that tells if the message must be encrypted
+     * when sent (false) or not (true)
+     */
+    private boolean secureInternal = false;
 
     /**
      * Creates a new instance of a WriteRequest, storing the message as it was
@@ -142,4 +148,14 @@
 
         return sb.toString();
     }
+
+    @Override
+    public boolean isSecureInternal() {
+        return secureInternal;
+    }
+
+    @Override
+    public void setSecureInternal(boolean secureInternal) {
+        this.secureInternal = secureInternal;        
+    }
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/mina/session/WriteRequest.java b/core/src/main/java/org/apache/mina/session/WriteRequest.java
index b525892..cb43354 100644
--- a/core/src/main/java/org/apache/mina/session/WriteRequest.java
+++ b/core/src/main/java/org/apache/mina/session/WriteRequest.java
@@ -69,4 +69,19 @@
      * @param the future
      */
     void setFuture(IoFuture<Void> future);
+    
+    /**
+     * Get the flag that tells that the underlying message is an internal one,
+     * not needed to be encrypted
+     * @return the internal secure flag of the message
+     */
+    boolean isSecureInternal();
+    
+
+    /**
+     * Set the flag that tells that the underlying message is an internal one,
+     * not needed to be encrypted
+     * @param secureInternal the secure internal flag
+     */
+    void setSecureInternal(boolean secureInternal);
 }
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java b/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java
index c66fd1f..41a97b0 100644
--- a/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java
+++ b/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java
@@ -26,6 +26,9 @@
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.net.ssl.SSLException;
+
+import org.apache.mina.api.IoClient;
 import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSession;
@@ -40,7 +43,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Common ancestor for NIO based {@link IoSession} implmentation.
+ * Common ancestor for NIO based {@link IoSession} implementation.
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
@@ -99,6 +102,18 @@
         }
     };
 
+    @Override
+    public void processSessionOpen() {
+        super.processSessionOpen();
+        try {
+            if (isSecured() && getService() instanceof IoClient) {
+                getAttribute(SSL_HELPER).beginHandshake();
+            }
+        } catch (IOException e) {
+            processException(e);
+        }
+    }
+
     /**
      * {@inheritDoc}
      */
@@ -114,6 +129,16 @@
                 channelClose();
                 processSessionClosed();
             } else {
+                if (isSecured()) {
+                    SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+                    if (sslHelper != null) {
+                        try {
+                            sslHelper.close();
+                        } catch (IOException e) {
+                            processException(e);
+                        }
+                    }
+                }
                 // flush this session the flushing code will close the session
                 flushWriteQueue();
             }
@@ -146,7 +171,7 @@
             LOG.debug("enqueueWriteRequest {}", writeRequest);
         }
 
-        if (isConnectedSecured()) {
+        if (isSecured()) {
             // SSL/TLS : we have to encrypt the message
             SslHelper sslHelper = getAttribute(SSL_HELPER, null);
 
@@ -154,10 +179,12 @@
                 throw new IllegalStateException();
             }
 
-            writeRequest = sslHelper.processWrite(this, writeRequest.getMessage(), writeQueue);
+            if (!writeRequest.isSecureInternal()) {
+                writeRequest = sslHelper.processWrite(this, writeRequest.getMessage(), writeQueue);
+            }
         }
 
-        /*synchronized (writeQueue)*/{
+        if (writeRequest != null) {
             ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
 
             if (writeQueue.isEmpty()) {
@@ -214,6 +241,9 @@
 
                 // We have to push the request on the writeQueue
                 writeQueue.add(writeRequest);
+                if (!registeredForWrite.getAndSet(true)) {
+                    flushWriteQueue();
+                }
             }
         }
 
diff --git a/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java b/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java
index 889a65f..83cce1b 100644
--- a/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java
+++ b/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java
@@ -21,14 +21,17 @@
 
 import static org.apache.mina.session.AttributeKey.createKey;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLSession;
 
@@ -58,7 +61,31 @@
 
     /** The current session */
     private final IoSession session;
+    
+    /**
+     * The internal secure state of the session.
+     * CREDENTIALS_NOT_YET_AVAILABLE: the session is currently handskaking, application messages
+     * will be queued before being encrypted and sent.
+     * CREDENTAILS_AVAILABLE: the session has completed handshake, application messages
+     * can be encrypted and sent as they are submitted.
+     * NO_CREDENTIALS: secure credentials are removed from the session, application messages
+     * are not encrypted anymore.
+     * 
+     */
+    enum State {
+        CREDENTIALS_NOT_YET_AVAILABLE,
+        CREDENTAILS_AVAILABLE,
+        NO_CREDENTIALS
+    }
 
+    private State state = State.CREDENTIALS_NOT_YET_AVAILABLE;
+    
+    /**
+     * The list of applications messages queued because submitted while the initial handshake was
+     * not yet finished.
+     */
+    private ConcurrentLinkedQueue<WriteRequest> messages = new ConcurrentLinkedQueue<WriteRequest>();
+    
     /**
      * A session attribute key that should be set to an {@link InetSocketAddress}. Setting this attribute causes
      * {@link SSLContext#createSSLEngine(String, int)} to be called passing the hostname and port of the
@@ -105,9 +132,22 @@
         return sslEngine;
     }
 
+    /**
+     * Return the state (credentials state) of the session.
+     * 
+     * @return the credentials state
+     */
+    State getState() {
+        return state;
+    }
+    
     boolean isHanshaking() {
         return sslEngine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING;
     }
+    
+    public boolean isActive() {
+        return state != State.NO_CREDENTIALS;
+    }
 
     /**
      * Initialize the SSL handshake.
@@ -119,7 +159,7 @@
             return;
         }
 
-        LOGGER.debug("{} Initializing the SSL Helper", session);
+        LOGGER.debug("{} Initializing the SSLEngine", session);
 
         InetSocketAddress peer = session.getAttribute(PEER_ADDRESS, null);
 
@@ -217,7 +257,7 @@
             case NOT_HANDSHAKING:
             case FINISHED:
                 result = sslEngine.unwrap(tempBuffer, appBuffer);
-                handshakeStatus = result.getHandshakeStatus();
+                processResult(session, handshakeStatus, result);
 
                 switch (result.getStatus()) {
                 case BUFFER_UNDERFLOW:
@@ -233,6 +273,14 @@
                         appBuffer.flip();
                         session.processMessageReceived(appBuffer);
                     }
+                    break;
+                case CLOSED:
+                    break;
+                }
+                if (sslEngine != null) {
+                    handshakeStatus = sslEngine.getHandshakeStatus();
+                } else {
+                    done = true;
                 }
                 break;
             case NEED_TASK:
@@ -245,7 +293,7 @@
                 break;
             case NEED_WRAP:
                 result = sslEngine.wrap(EMPTY_BUFFER, appBuffer);
-                handshakeStatus = result.getHandshakeStatus();
+                processResult(session, handshakeStatus, result);
                 switch (result.getStatus()) {
                 case BUFFER_OVERFLOW:
                     appBuffer = ByteBuffer.allocateDirect(appBuffer.capacity() * 2);
@@ -256,11 +304,20 @@
                 case CLOSED:
                 case OK:
                     appBuffer.flip();
-                    WriteRequest writeRequest = new DefaultWriteRequest(readBuffer);
+                    WriteRequest writeRequest = new DefaultWriteRequest(appBuffer);
                     writeRequest.setMessage(appBuffer);
+                    writeRequest.setSecureInternal(true);
                     session.enqueueWriteRequest(writeRequest);
                     break;
                 }
+                if (sslEngine != null) {
+                    handshakeStatus = sslEngine.getHandshakeStatus();
+                } else {
+                    done = true;
+                }
+            }
+            if (handshakeStatus == HandshakeStatus.FINISHED) {
+                state = State.CREDENTAILS_AVAILABLE;
             }
         }
         if (tempBuffer.remaining() > 0) {
@@ -272,7 +329,61 @@
     }
 
     /**
-     * Process the application data encryption for a session.
+     * Process the close event from the SSL engine. If the closed event has not been
+     * processed, then send an event.
+     * 
+     * @param session the {@link AbstractIoSession} MINA internal IO session
+     */
+    void switchToNoSecure(AbstractIoSession session) {
+        if (state != State.NO_CREDENTIALS) {
+            session.processSecureClosed();
+            state = State.NO_CREDENTIALS;
+            sslEngine = null;
+        }
+    }
+    
+    /**
+     * Process the session handshake status and the last operation result in order to
+     * update the internal state and propagate handshake related events.
+     * 
+     * @param session the {@link AbstractIoSession} MINA internal IO session
+     * @param sessionStatus the last session handshake status
+     * @param operationStatus the returned operation status
+     */
+    private void processResult(AbstractIoSession session, HandshakeStatus sessionStatus, SSLEngineResult result) {
+        LOGGER.debug("handshake status:" + sessionStatus + " engine result:" + result);
+        switch (sessionStatus) {
+        case NEED_TASK:
+        case NEED_UNWRAP:
+        case NEED_WRAP:
+            if (result.getHandshakeStatus() == HandshakeStatus.FINISHED) {
+                state = State.CREDENTAILS_AVAILABLE;
+                session.processHandshakeCompleted();
+                for(WriteRequest request : messages) {
+                    session.enqueueWriteRequest(request);
+                }
+                messages.clear();
+            }
+            if (result.getStatus() == Status.CLOSED) {
+                switchToNoSecure(session);
+            }
+            break;
+        case FINISHED:
+        case NOT_HANDSHAKING:
+            if (result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                session.processHandshakeStarted();
+            }
+            break;
+        }
+    }
+
+    /**
+     * Process the application data encryption for a session. As the SSLEngine is record
+     * oriented, then depending on the message size, this may lead to several encrypted
+     * messages to be generated. So, if n messages are generated, the first n-1 will
+     * be queued and the last one will be returned. It will be automatically added
+     * to the end of the queue by the called because a non empty queue will be
+     * detected.
      * 
      * @param session The session sending encrypted data to the peer.
      * @param message The message to encrypt
@@ -280,35 +391,69 @@
      * @return The written WriteRequest
      */
     /** No qualifier */
-    WriteRequest processWrite(IoSession session, Object message, Queue<WriteRequest> writeQueue) {
-        ByteBuffer buf = (ByteBuffer) message;
-        ByteBuffer appBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+    WriteRequest processWrite(AbstractIoSession session, Object message, Queue<WriteRequest> writeQueue) {
+        WriteRequest request = null;
+        
+        switch (state) {
+        case CREDENTAILS_AVAILABLE:
+            ByteBuffer buf = (ByteBuffer) message;
+            ByteBuffer appBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+            try {
+                boolean done = false;
+                while (!done) {
+                    // Encrypt the message
+                    SSLEngineResult result = sslEngine.wrap(buf, appBuffer);
 
-        try {
-            while (true) {
-                // Encrypt the message
-                SSLEngineResult result = sslEngine.wrap(buf, appBuffer);
+                    switch (result.getStatus()) {
+                    case BUFFER_OVERFLOW:
+                        // Increase the buffer size as needed
+                        appBuffer = ByteBuffer.allocate(appBuffer.capacity() + 4096);
+                        break;
+                    case CLOSED:
+                        switchToNoSecure(session);
+                        done = true;
+                        break;
 
-                switch (result.getStatus()) {
-                case BUFFER_OVERFLOW:
-                    // Increase the buffer size as needed
-                    appBuffer = ByteBuffer.allocate(appBuffer.capacity() + 4096);
-                    break;
-
-                case BUFFER_UNDERFLOW:
-                case CLOSED:
-                    break;
-
-                case OK:
-                    // We are done. Flip the buffer and push it to the write queue.
-                    appBuffer.flip();
-                    WriteRequest request = new DefaultWriteRequest(appBuffer);
-
-                    return request;
+                    case BUFFER_UNDERFLOW:
+                    case OK:
+                        // We are done. Flip the buffer and push it to the write queue.
+                        appBuffer.flip();
+                        done = buf.remaining() == 0;
+                        if (done) {
+                            request = new DefaultWriteRequest(appBuffer);
+                        } else {
+                            writeQueue.offer(new DefaultWriteRequest(appBuffer));
+                            appBuffer = ByteBuffer.allocateDirect(appBuffer.capacity());
+                        }
+                        break;
+                    }
                 }
+            } catch (SSLException se) {
+                throw new IllegalStateException(se.getMessage());
             }
-        } catch (SSLException se) {
-            throw new IllegalStateException(se.getMessage());
+            break;
+        case CREDENTIALS_NOT_YET_AVAILABLE:
+            messages.add(new DefaultWriteRequest(message));
+            break;
+        case NO_CREDENTIALS:
+            request = new DefaultWriteRequest(message);
+            break;
+        }
+        return request;
+    }
+
+    public void beginHandshake() throws IOException {
+        if (sslEngine != null) {
+            ((AbstractIoSession)session).processHandshakeStarted();
+            sslEngine.beginHandshake();
+            processRead((AbstractIoSession) session, EMPTY_BUFFER);
+        }
+    }
+    
+    public void close() throws IOException {
+        if (sslEngine != null) {
+            sslEngine.closeOutbound();
+            processRead((AbstractIoSession) session, EMPTY_BUFFER);
         }
     }
 }
diff --git a/core/src/test/java/org/apache/mina/transport/nio/SslTest.java b/core/src/test/java/org/apache/mina/transport/nio/SslTest.java
index fad8428..ad4cb1c 100644
--- a/core/src/test/java/org/apache/mina/transport/nio/SslTest.java
+++ b/core/src/test/java/org/apache/mina/transport/nio/SslTest.java
@@ -19,24 +19,40 @@
  */
 package org.apache.mina.transport.nio;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.WriteAbortedException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.Security;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.net.SocketFactory;
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.net.ssl.HandshakeCompletedListener;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManagerFactory;
 
@@ -45,6 +61,7 @@
 import org.apache.mina.transport.nio.NioTcpServer;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test a SSL session where the connection is established and closed twice. It should be
@@ -72,36 +89,54 @@
     }
 
     private static class TestHandler extends AbstractIoHandler {
+        private String data = "";
+        private boolean second = false;
         public void messageReceived(IoSession session, Object message) {
             String line = Charset.defaultCharset().decode((ByteBuffer) message).toString();
-
-            if (line.startsWith("hello")) {
-                System.out.println("Server got: 'hello', waiting for 'send'");
-            } else if (line.startsWith("send")) {
-                System.out.println("Server got: 'send', sending 'data'");
+            data += line;
+            if (!second && data.startsWith("hello")) {
+                second = true;
+            } else if (second && data.contains("send")) {
                 session.write(Charset.defaultCharset().encode("data\n"));
+                data = "";
+                second = false;
             }
         }
     }
+    
+    private static enum Client {
+        JDK,
+        MINA_BEFORE_HANDSHAKE,
+        MINA_AFTER_HANDSHAKE;
+    }
 
     /**
      * Starts a Server with the SSL Filter and a simple text line 
      * protocol codec filter
      */
-    private static int startServer() throws Exception {
+    private static NioTcpServer startServer(AbstractIoHandler handler) throws Exception {
         NioTcpServer server = new NioTcpServer();
 
         server.setReuseAddress(true);
         server.getSessionConfig().setSslContext(createSSLContext());
-        server.setIoHandler(new TestHandler());
+        server.setIoHandler(handler);
         server.bind(new InetSocketAddress(0));
-        return server.getServerSocketChannel().socket().getLocalPort();
+        return server;
+    }
+    
+    private static NioTcpClient startClient(AbstractIoHandler handler, int port) throws Exception {
+        NioTcpClient client = new NioTcpClient();
+        
+        client.getSessionConfig().setSslContext(createSSLContext());
+        client.setIoHandler(handler);
+        client.connect(new InetSocketAddress("localhost", port));
+        return client;
     }
 
     /**
      * Starts a client which will connect twice using SSL
      */
-    private static void startClient(int port) throws Exception {
+    private static void startJDKClient(int port) throws Exception {
         address = InetAddress.getByName("localhost");
 
         SSLContext context = createSSLContext();
@@ -123,7 +158,7 @@
         socket.setSoTimeout(10000);
 
         System.out.println("Client sending: send");
-        socket.getOutputStream().write("send\n".getBytes());
+        socket.getOutputStream().write("send                       \n".getBytes());
         socket.getOutputStream().flush();
 
         BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
@@ -154,14 +189,14 @@
     }
 
     @Test
-    @Ignore("check for fragmentation")
+    //@Ignore("check for fragmentation")
     public void testSSL() throws Exception {
-        final int port = startServer();
+        final NioTcpServer server  = startServer(new TestHandler());
 
         Thread t = new Thread() {
             public void run() {
                 try {
-                    startClient(port);
+                    startJDKClient(server.getServerSocketChannel().socket().getLocalPort());
                 } catch (Exception e) {
                     clientError = e;
                 }
@@ -169,21 +204,147 @@
         };
         t.start();
         t.join();
+        server.unbind();
         if (clientError != null)
             throw clientError;
     }
 
     @Test
-    public void testBigMessage() throws IOException, GeneralSecurityException, InterruptedException {
-        final CountDownLatch counter = new CountDownLatch(1);
-        NioTcpServer server = new NioTcpServer();
-        final int messageSize = 1 * 1024 * 1024;
+    public void checkThatSecureEventsArePropagatedServerSide() throws Exception {
+        final AtomicInteger startHandshakeCount = new AtomicInteger();
+        final AtomicInteger completedHandshakeCount = new AtomicInteger();
+        final AtomicInteger secureClosedCount = new AtomicInteger();
+        final CountDownLatch closedCount = new CountDownLatch(1);
+        final NioTcpServer server = startServer(new AbstractIoHandler() {
 
-        /*
-         * Server
-         */
+            @Override
+            public void handshakeStarted(IoSession abstractIoSession) {
+                startHandshakeCount.incrementAndGet();
+            }
+
+            @Override
+            public void handshakeCompleted(IoSession session) {
+                completedHandshakeCount.incrementAndGet();
+            }
+
+            @Override
+            public void secureClosed(IoSession session) {
+                secureClosedCount.incrementAndGet();
+            }
+
+            @Override
+            public void sessionClosed(IoSession session) {
+                closedCount.countDown();
+            }
+        });
+        SSLSocketFactory factory = createSSLContext().getSocketFactory();
+        SSLSocket s = (SSLSocket) factory.createSocket("localhost", server.getServerSocketChannel().socket().getLocalPort());
+        s.startHandshake();
+        s.close();
+        assertTrue(closedCount.await(10, TimeUnit.SECONDS));
+        assertEquals(1, startHandshakeCount.get());
+        assertEquals(1, completedHandshakeCount.get());
+        assertEquals(1, secureClosedCount.get());
+    }
+    
+    @Test
+    public void checkThatSecureEventsArePropagatedServerSideWithSecondHandshake() throws Exception {
+        final CountDownLatch closeCount = new CountDownLatch(1);
+        final AtomicInteger startHandshakeCount = new AtomicInteger();
+        final AtomicInteger completedHandshakeCount = new AtomicInteger();
+        final AtomicInteger secureClosedCount = new AtomicInteger();
+        final NioTcpServer server = startServer(new AbstractIoHandler() {
+
+            @Override
+            public void handshakeStarted(IoSession abstractIoSession) {
+                startHandshakeCount.incrementAndGet();
+            }
+
+            @Override
+            public void handshakeCompleted(IoSession session) {
+                completedHandshakeCount.incrementAndGet();
+            }
+
+            @Override
+            public void secureClosed(IoSession session) {
+                secureClosedCount.incrementAndGet();
+            }
+
+            @Override
+            public void sessionClosed(IoSession session) {
+                closeCount.countDown();
+            }
+        });
+        SSLSocketFactory factory = createSSLContext().getSocketFactory();
+        SSLSocket s = (SSLSocket) factory.createSocket("localhost", server.getServerSocketChannel().socket().getLocalPort());
+        final AtomicInteger handskaheCounter = new AtomicInteger();
+        s.addHandshakeCompletedListener(new HandshakeCompletedListener() {
+            @Override
+            public void handshakeCompleted(HandshakeCompletedEvent event) {
+                final int count = handskaheCounter.getAndIncrement();
+                if (count == 0) {
+                    try {
+                        event.getSocket().startHandshake();
+                        event.getSocket().setSoTimeout(5000);
+                        event.getSocket().getInputStream().read();
+                    }
+                    catch (IOException e) {}
+                } else {
+                    try {
+                        event.getSocket().close();
+                    }
+                    catch (IOException e) {}
+                }
+            }
+        });
+        s.startHandshake();
+        assertTrue(closeCount.await(10, TimeUnit.SECONDS));
+        assertEquals(2, startHandshakeCount.get());
+        assertEquals(2, completedHandshakeCount.get());
+        assertEquals(1, secureClosedCount.get());
+        server.unbind();
+    }
+
+    @Test
+    public void checkThatSecureEventsArePropagatedClientSide() throws Exception {
+        final AtomicInteger startHandshakeCount = new AtomicInteger();
+        final AtomicInteger completedHandshakeCount = new AtomicInteger();
+        final AtomicInteger secureClosedCount = new AtomicInteger();
+        final CountDownLatch closeCount = new CountDownLatch(1);
+        final NioTcpServer server = startServer(new AbstractIoHandler() {});
+        final NioTcpClient client = startClient(new AbstractIoHandler() {
+            @Override
+            public void handshakeStarted(IoSession abstractIoSession) {
+                startHandshakeCount.incrementAndGet();
+            }
+
+            @Override
+            public void handshakeCompleted(IoSession session) {
+                completedHandshakeCount.incrementAndGet();
+                session.close(false);
+            }
+
+            @Override
+            public void secureClosed(IoSession session) {
+                secureClosedCount.incrementAndGet();
+            }
+
+            @Override
+            public void sessionClosed(IoSession session) {
+                closeCount.countDown();
+            }
+        }, server.getServerSocketChannel().socket().getLocalPort());
+        assertTrue(closeCount.await(10, TimeUnit.SECONDS));
+        assertEquals(1, startHandshakeCount.get());
+        assertEquals(1, completedHandshakeCount.get());
+        assertEquals(1, secureClosedCount.get());
+    }
+
+    private static NioTcpServer createReceivingServer(final int size, final CountDownLatch counter, final OutputStream stream) throws IOException, GeneralSecurityException {
+        NioTcpServer server = new NioTcpServer();
         server.setReuseAddress(true);
         server.getSessionConfig().setSslContext(createSSLContext());
+        final WritableByteChannel channel = (stream!=null)?Channels.newChannel(stream):null;
         server.setIoHandler(new AbstractIoHandler() {
             private int receivedSize = 0;
 
@@ -193,22 +354,130 @@
             @Override
             public void messageReceived(IoSession session, Object message) {
                 receivedSize += ((ByteBuffer) message).remaining();
-                if (receivedSize == messageSize) {
+                if (channel != null) {
+                    try {
+                        channel.write((ByteBuffer) message);
+                    } catch (IOException e) {
+                        exceptionCaught(session, e);
+                    }
+                }
+                if (receivedSize == size) {
                     counter.countDown();
+                    if (channel != null) {
+                        try {
+                            channel.close();
+                        } catch (IOException e) {
+                            exceptionCaught(session, e);
+                        }
+                    }
                 }
             }
         });
         server.bind(new InetSocketAddress(0));
-        int port = server.getServerSocketChannel().socket().getLocalPort();
+        return server;
+    }
+    
+    protected void testMessage(final int size, final Client clientType) throws IOException, GeneralSecurityException, InterruptedException {
+        final CountDownLatch counter = new CountDownLatch(1);
+        final byte[] message = new byte[size];
+        new Random().nextBytes(message);
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            /*
+                 * Server
+                 */
+            NioTcpServer server = createReceivingServer(size, counter, bos);
+            try {
+                int port = server.getServerSocketChannel().socket().getLocalPort();
+                /*
+                 * Client
+                 */
+                if (clientType == Client.JDK) {
+                    Socket socket = server.getSessionConfig().getSslContext().getSocketFactory()
+                            .createSocket("localhost", port);
+                    socket.getOutputStream().write(message);
+                    socket.getOutputStream().flush();
+                    socket.close();
+                } else {
+                    NioTcpClient client = new NioTcpClient();
+                    client.setIoHandler(new AbstractIoHandler() {
+                        int sendSize = 0;
+                        
+                        @Override
+                        public void sessionOpened(IoSession session) {
+                            if (clientType == Client.MINA_BEFORE_HANDSHAKE) {
+                                session.write(ByteBuffer.wrap(message));
+                            }
+                        }
 
-        /*
-         * Client
-         */
-        Socket socket = server.getSessionConfig().getSslContext().getSocketFactory().createSocket("localhost", port);
-        socket.getOutputStream().write(new byte[messageSize]);
-        socket.getOutputStream().flush();
-        socket.close();
-        assertTrue(counter.await(10, TimeUnit.SECONDS));
+                        @Override
+                        public void handshakeCompleted(IoSession session) {
+                            if (clientType == Client.MINA_AFTER_HANDSHAKE) {
+                                session.write(ByteBuffer.wrap(message));
+                            }
+                        }
 
+                        @Override
+                        public void messageSent(IoSession session, Object message) {
+                            sendSize += ((ByteBuffer)message).capacity();
+                        }
+                    });
+                    client.getSessionConfig().setSslContext(createSSLContext());
+                    client.connect(new InetSocketAddress(port));
+                    
+                }
+                assertTrue(counter.await(10, TimeUnit.MINUTES));
+                assertArrayEquals(message, bos.toByteArray());
+            } finally {
+                server.unbind();
+            }
+        } finally {
+            bos.close();
+        }
+    }
+    
+    @Test
+    public void testSingleByteMessageWithJDKClient() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1, Client.JDK);
+    }
+    
+    @Test
+    public void testSingleByteMessageWithMINAClientAfterHandkhake() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1, Client.MINA_AFTER_HANDSHAKE);
+    }
+    
+    @Test
+    public void testSingleByteMessageWithMINAClientBeforeHandkhake() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1, Client.MINA_BEFORE_HANDSHAKE);
+    }
+
+    @Test
+    public void test1KMessageWithJDKClient() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1024, Client.JDK);
+    }
+    
+    @Test
+    public void test1KMessageWithMINAClientAfterHandskahe() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1024, Client.MINA_AFTER_HANDSHAKE);
+    }
+    
+    @Test
+    public void test1KMessageWithMINAClientBeforeHandskahe() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1024, Client.MINA_BEFORE_HANDSHAKE);
+    }
+    
+    @Test
+    public void test1MMessageWithJDKClient() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1024 * 1024, Client.JDK);
+    }
+    
+    @Test
+    public void test1MMessageWithMINAClientAfterHandshake() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1024 * 1024, Client.MINA_AFTER_HANDSHAKE);
+    }
+    
+    @Test
+    public void test1MMessageWithMINAClientBeforeHandshake() throws IOException, GeneralSecurityException, InterruptedException {
+        testMessage(1024 * 1024, Client.MINA_BEFORE_HANDSHAKE);
     }
 }