Merge from trunk

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1655057 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 2e7f3ee..6587bc7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -64,12 +64,12 @@
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public abstract class AbstractExchange<T extends AbstractExchange<T>>
         extends AbstractConfiguredObject<T>
@@ -510,7 +510,7 @@
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+            throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
         List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
@@ -910,4 +910,5 @@
             this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
         }
     }
+
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
deleted file mode 100644
index be3a13d..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class BytesOnlyCreditManager extends AbstractFlowCreditManager
-{
-    private final AtomicLong _bytesCredit;
-
-    public BytesOnlyCreditManager(long initialCredit)
-    {
-        _bytesCredit = new AtomicLong(initialCredit);
-    }
-
-    public long getMessageCredit()
-    {
-        return -1L;
-    }
-
-    public long getBytesCredit()
-    {
-        return _bytesCredit.get();
-    }
-
-    public void restoreCredit(long messageCredit, long bytesCredit)
-    {
-        _bytesCredit.addAndGet(bytesCredit);
-        setSuspended(false);
-    }
-
-    public void removeAllCredit()
-    {
-        _bytesCredit.set(0L);
-    }
-
-    public boolean hasCredit()
-    {
-        return _bytesCredit.get() > 0L;
-    }
-
-    public boolean useCreditForMessage(long msgSize)
-    {
-        if(hasCredit())
-        {
-            if(_bytesCredit.addAndGet(-msgSize) >= 0)
-            {
-                return true;
-            }
-            else
-            {
-                _bytesCredit.addAndGet(msgSize);
-                setSuspended(true);
-                return false;
-            }
-        }
-        else
-        {
-            return false;
-        }
-
-    }
-
-    public void setBytesCredit(long bytesCredit)
-    {
-        _bytesCredit.set( bytesCredit );
-    }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index 280f285..08aac0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -24,10 +24,6 @@
 
 public interface FlowCreditManager
 {
-    long getMessageCredit();
-
-    long getBytesCredit();
-
     public static interface FlowCreditManagerListener
     {
         void creditStateChanged(boolean hasCredit);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
deleted file mode 100644
index 31c1fda..0000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.flow;
-
-
-public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
-{
-    private long _messageCredit;
-    private long _bytesCredit;
-
-    public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
-    {
-        _messageCredit = messageCredit;
-        _bytesCredit = bytesCredit;
-    }
-
-    public synchronized long getMessageCredit()
-    {
-        return _messageCredit;
-    }
-
-    public synchronized long getBytesCredit()
-    {
-        return _bytesCredit;
-    }
-
-    public synchronized void restoreCredit(long messageCredit, long bytesCredit)
-    {        
-        _messageCredit += messageCredit;
-        _bytesCredit += bytesCredit;
-        setSuspended(hasCredit());
-    }
-
-    public synchronized void removeAllCredit()
-    {
-        _messageCredit = 0L;
-        _bytesCredit = 0L;
-        setSuspended(true);
-    }
-
-    public synchronized boolean hasCredit()
-    {
-        return (_messageCredit > 0L) && ( _bytesCredit > 0L );
-    }
-
-    public synchronized boolean useCreditForMessage(final long msgSize)
-    {
-        if(_messageCredit == 0L)
-        {
-            setSuspended(true);
-            return false;
-        }
-        else
-        {
-            if(msgSize > _bytesCredit)
-            {
-                setSuspended(true);
-                return false;
-            }
-            _messageCredit--;
-            _bytesCredit -= msgSize;
-            setSuspended(false);
-            return true;
-        }
-        
-    }
-
-    public synchronized void setBytesCredit(long bytesCredit)
-    {
-        _bytesCredit = bytesCredit;
-    }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index f13af47..40aa1bb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -113,4 +113,6 @@
      * @return the time of the last activity or 0 if not in a transaction
      */
     long getTransactionUpdateTime();
+
+    void transportStateChanged();
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index dd5e01e..3c25e09 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,12 +24,8 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.security.Principal;
 import java.util.Set;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.security.auth.Subject;
 
 import org.apache.log4j.Logger;
@@ -43,21 +39,14 @@
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
 public class MultiVersionProtocolEngine implements ServerProtocolEngine
 {
     private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
 
     private final long _id;
-    private final SSLContext _sslContext;
-    private final boolean _wantClientAuth;
-    private final boolean _needClientAuth;
     private final AmqpPort<?> _port;
-    private final Transport _transport;
+    private Transport _transport;
     private final ProtocolEngineCreator[] _creators;
     private final Runnable _onCloseTask;
 
@@ -71,9 +60,6 @@
     private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
 
     public MultiVersionProtocolEngine(final Broker<?> broker,
-                                      SSLContext sslContext,
-                                      boolean wantClientAuth,
-                                      boolean needClientAuth,
                                       final Set<Protocol> supported,
                                       final Protocol defaultSupportedReply,
                                       AmqpPort<?> port,
@@ -92,16 +78,12 @@
         _broker = broker;
         _supported = supported;
         _defaultSupportedReply = defaultSupportedReply;
-        _sslContext = sslContext;
-        _wantClientAuth = wantClientAuth;
-        _needClientAuth = needClientAuth;
         _port = port;
         _transport = transport;
         _creators = creators;
         _onCloseTask = onCloseTask;
     }
 
-
     public SocketAddress getRemoteAddress()
     {
         return _delegate.getRemoteAddress();
@@ -147,6 +129,12 @@
         _delegate.readerIdle();
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+        _delegate.encryptedTransport();
+    }
+
 
     public void received(ByteBuffer msg)
     {
@@ -169,6 +157,18 @@
         return _delegate.getSubject();
     }
 
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _delegate.isTransportBlockedForWriting();
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _delegate.setTransportBlockedForWriting(blocked);
+    }
+
     private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
 
     public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
@@ -247,6 +247,12 @@
 
         }
 
+        @Override
+        public void encryptedTransport()
+        {
+
+        }
+
         public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
         {
 
@@ -274,12 +280,23 @@
         {
             return new Subject();
         }
+
+        @Override
+        public boolean isTransportBlockedForWriting()
+        {
+            return false;
+        }
+
+        @Override
+        public void setTransportBlockedForWriting(final boolean blocked)
+        {
+        }
     }
 
     private class SelfDelegateProtocolEngine implements ServerProtocolEngine
     {
         private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
-        private long _lastReadTime;
+        private long _lastReadTime = System.currentTimeMillis();
 
         public SocketAddress getRemoteAddress()
         {
@@ -360,15 +377,6 @@
                     }
                 }
 
-
-                if(newDelegate == null && looksLikeSSL(headerBytes))
-                {
-                    if(_sslContext !=  null)
-                    {
-                        newDelegate = new SslDelegateProtocolEngine();
-                    }
-                }
-
                 // If no delegate is found then send back a supported protocol version id
                 if(newDelegate == null)
                 {
@@ -423,6 +431,17 @@
             return _delegate.getSubject();
         }
 
+        @Override
+        public boolean isTransportBlockedForWriting()
+        {
+            return false;
+        }
+
+        @Override
+        public void setTransportBlockedForWriting(final boolean blocked)
+        {
+        }
+
         public void exception(Throwable t)
         {
             _logger.error("Error establishing session", t);
@@ -466,6 +485,15 @@
             _network.close();
         }
 
+        @Override
+        public void encryptedTransport()
+        {
+            if(_transport == Transport.TCP)
+            {
+                _transport = Transport.SSL;
+            }
+        }
+
         public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
         {
 
@@ -484,250 +512,5 @@
         }
     }
 
-    private class SslDelegateProtocolEngine implements ServerProtocolEngine
-    {
-        private final MultiVersionProtocolEngine _decryptEngine;
-        private final SSLEngine _engine;
-        private final SSLReceiver _sslReceiver;
-        private final SSLBufferingSender _sslSender;
-        private long _lastReadTime;
 
-        private SslDelegateProtocolEngine()
-        {
-
-            _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
-                                                            _defaultSupportedReply, _port, Transport.SSL, _id, _creators,
-                                                            null);
-
-            _engine = _sslContext.createSSLEngine();
-            _engine.setUseClientMode(false);
-            SSLUtil.removeSSLv3Support(_engine);
-
-            if(_needClientAuth)
-            {
-                _engine.setNeedClientAuth(true);
-            }
-            else if(_wantClientAuth)
-            {
-                _engine.setWantClientAuth(true);
-            }
-
-            SSLStatus sslStatus = new SSLStatus();
-            _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
-            _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
-            _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender), _sslSender);
-        }
-
-        @Override
-        public void received(ByteBuffer msg)
-        {
-            _lastReadTime = System.currentTimeMillis();
-            _sslReceiver.received(msg);
-            _sslSender.send();
-            _sslSender.flush();
-        }
-
-        @Override
-        public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
-        {
-            //TODO - Implement
-        }
-
-        @Override
-        public SocketAddress getRemoteAddress()
-        {
-            return _decryptEngine.getRemoteAddress();
-        }
-
-        @Override
-        public SocketAddress getLocalAddress()
-        {
-            return _decryptEngine.getLocalAddress();
-        }
-
-        @Override
-        public long getWrittenBytes()
-        {
-            return _decryptEngine.getWrittenBytes();
-        }
-
-        @Override
-        public long getReadBytes()
-        {
-            return _decryptEngine.getReadBytes();
-        }
-
-        @Override
-        public void closed()
-        {
-            _decryptEngine.closed();
-        }
-
-        @Override
-        public void writerIdle()
-        {
-            _decryptEngine.writerIdle();
-        }
-
-        @Override
-        public void readerIdle()
-        {
-            _decryptEngine.readerIdle();
-        }
-
-        @Override
-        public void exception(Throwable t)
-        {
-            _decryptEngine.exception(t);
-        }
-
-        @Override
-        public long getConnectionId()
-        {
-            return _decryptEngine.getConnectionId();
-        }
-
-        @Override
-        public Subject getSubject()
-        {
-            return _decryptEngine.getSubject();
-        }
-
-        @Override
-        public long getLastReadTime()
-        {
-            return _lastReadTime;
-        }
-
-        @Override
-        public long getLastWriteTime()
-        {
-            return _decryptEngine.getLastWriteTime();
-        }
-    }
-
-    private boolean looksLikeSSL(byte[] headerBytes)
-    {
-        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
-    }
-
-    private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
-    {
-        return headerBytes[0] == 22 && // SSL Handshake
-               (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
-                (headerBytes[2] == 0 || // SSL 3.0
-                 headerBytes[2] == 1 || // TLS 1.0
-                 headerBytes[2] == 2 || // TLS 1.1
-                 headerBytes[2] == 3)) && // TLS1.2
-               (headerBytes[5] == 1); // client_hello
-    }
-
-    private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
-    {
-        return headerBytes[0] == -128 &&
-               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
-                (headerBytes[4] == 0 || // SSL 3.0
-                 headerBytes[4] == 1 || // TLS 1.0
-                 headerBytes[4] == 2 || // TLS 1.1
-                 headerBytes[4] == 3);
-    }
-
-
-    private static class SSLNetworkConnection implements NetworkConnection
-    {
-        private final NetworkConnection _network;
-        private final SSLBufferingSender _sslSender;
-        private final SSLEngine _engine;
-        private Principal _principal;
-        private boolean _principalChecked;
-        private final Object _lock = new Object();
-
-        public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
-                                    SSLBufferingSender sslSender)
-        {
-            _engine = engine;
-            _network = network;
-            _sslSender = sslSender;
-
-        }
-
-        @Override
-        public Sender<ByteBuffer> getSender()
-        {
-            return _sslSender;
-        }
-
-        @Override
-        public void start()
-        {
-            _network.start();
-        }
-
-        @Override
-        public void close()
-        {
-            _sslSender.close();
-
-            _network.close();
-        }
-
-        @Override
-        public SocketAddress getRemoteAddress()
-        {
-            return _network.getRemoteAddress();
-        }
-
-        @Override
-        public SocketAddress getLocalAddress()
-        {
-            return _network.getLocalAddress();
-        }
-
-        @Override
-        public void setMaxWriteIdle(int sec)
-        {
-            _network.setMaxWriteIdle(sec);
-        }
-
-        @Override
-        public void setMaxReadIdle(int sec)
-        {
-            _network.setMaxReadIdle(sec);
-        }
-
-        @Override
-        public Principal getPeerPrincipal()
-        {
-            synchronized (_lock)
-            {
-                if(!_principalChecked)
-                {
-                    try
-                    {
-                        _principal =  _engine.getSession().getPeerPrincipal();
-                    }
-                    catch (SSLPeerUnverifiedException e)
-                    {
-                        _principal = null;
-                    }
-
-                    _principalChecked = true;
-                }
-
-                return _principal;
-            }
-        }
-
-        @Override
-        public int getMaxReadIdle()
-        {
-            return _network.getMaxReadIdle();
-        }
-
-        @Override
-        public int getMaxWriteIdle()
-        {
-            return _network.getMaxWriteIdle();
-        }
-    }
 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 5c704c5..a51717e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -27,10 +27,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.net.ssl.SSLContext;
-
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.logging.messages.PortMessages;
 import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
@@ -48,9 +45,6 @@
     private final Broker<?> _broker;
     private final Set<Protocol> _supported;
     private final Protocol _defaultSupportedReply;
-    private final SSLContext _sslContext;
-    private final boolean _wantClientAuth;
-    private final boolean _needClientAuth;
     private final AmqpPort<?> _port;
     private final Transport _transport;
     private final ProtocolEngineCreator[] _creators;
@@ -58,9 +52,6 @@
             _connectionCountDecrementingTask = new ConnectionCountDecrementingTask();
 
     public MultiVersionProtocolEngineFactory(Broker<?> broker,
-                                             SSLContext sslContext,
-                                             boolean wantClientAuth,
-                                             boolean needClientAuth,
                                              final Set<Protocol> supportedVersions,
                                              final Protocol defaultSupportedReply,
                                              AmqpPort<?> port,
@@ -73,7 +64,6 @@
         }
 
         _broker = broker;
-        _sslContext = sslContext;
         _supported = supportedVersions;
         _defaultSupportedReply = defaultSupportedReply;
         final List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
@@ -83,18 +73,16 @@
         }
         Collections.sort(creators, new ProtocolEngineCreatorComparator());
         _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
-        _wantClientAuth = wantClientAuth;
-        _needClientAuth = needClientAuth;
         _port = port;
         _transport = transport;
     }
 
-    public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
+    public MultiVersionProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress)
     {
         if(_port.canAcceptNewConnection(remoteSocketAddress))
         {
             _port.incrementConnectionCount();
-            return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+            return new MultiVersionProtocolEngine(_broker,
                                                   _supported, _defaultSupportedReply, _port, _transport,
                                                   ID_GENERATOR.getAndIncrement(),
                                                   _creators, _connectionCountDecrementingTask);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index f905558..26cbf37 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -50,6 +50,7 @@
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogMessage;
@@ -2522,7 +2523,7 @@
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+            throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
         if(!message.isReferenced(this))
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index b1f6b84..2fd10e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -23,18 +23,20 @@
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
 
 class TCPandSSLTransport implements AcceptingTransport
 {
@@ -78,17 +80,25 @@
         }
 
         final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
-        _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+        _networkTransport = new NonBlockingNetworkTransport();
         final MultiVersionProtocolEngineFactory protocolEngineFactory =
                 new MultiVersionProtocolEngineFactory(
-                _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
-                settings.wantClientAuth(), settings.needClientAuth(),
+                _port.getParent(Broker.class),
                 _supported,
                 _defaultSupportedProtocolReply,
                 _port,
                 _transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
 
-        _networkTransport.accept(settings, protocolEngineFactory, _transports.contains(Transport.TCP) ? null : _sslContext);
+        EnumSet<TransportEncryption> encryptionSet = EnumSet.noneOf(TransportEncryption.class);
+        if(_transports.contains(Transport.TCP))
+        {
+            encryptionSet.add(TransportEncryption.NONE);
+        }
+        if(_transports.contains(Transport.SSL))
+        {
+            encryptionSet.add(TransportEncryption.TLS);
+        }
+        _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
     }
 
     public int getAcceptingPort()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
new file mode 100644
index 0000000..a0bab0b
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
+{
+    public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host)
+    {
+        super("Virtualhost state "
+              + host.getState()
+              + " prevents the message from being sent");
+    }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 1c42d9b..47ed224 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -456,6 +456,12 @@
         {
             return 0;
         }
+
+        @Override
+        public void transportStateChanged()
+        {
+
+        }
     }
 
     private static class MockConnectionModel implements AMQConnectionModel
@@ -663,5 +669,7 @@
         {
 
         }
+
+
     }
 }
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 89d6811..afa4fb8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -158,6 +158,10 @@
         return _name;
     }
 
+    public void transportStateChanged()
+    {
+        _creditManager.restoreCredit(0, 0);
+    }
 
     public static class AddMessageDispositionListenerAction implements Runnable
     {
@@ -555,10 +559,10 @@
         switch(flowMode)
         {
             case CREDIT:
-                _creditManager = new CreditCreditManager(0l,0l);
+                _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
                 break;
             case WINDOW:
-                _creditManager = new WindowCreditManager(0l,0l);
+                _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
                 break;
             default:
                 // this should never happen, as 0-10 is finalised and so the enum should never change
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 8dddac9..e670c1f 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,48 +21,27 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
 public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
+    private final ServerProtocolEngine _serverProtocolEngine;
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
 
-    public CreditCreditManager(long bytesCredit, long messageCredit)
+    public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine)
     {
+        _serverProtocolEngine = serverProtocolEngine;
         _bytesCredit = bytesCredit;
         _messageCredit = messageCredit;
         setSuspended(!hasCredit());
 
     }
 
-
-    public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit)
-    {
-        _bytesCredit = bytesCredit;
-        _messageCredit = messageCredit;
-
-        setSuspended(!hasCredit());
-
-    }
-
-
-    public long getMessageCredit()
-    {
-         return _messageCredit == -1L
-                    ? Long.MAX_VALUE
-                    : _messageCredit;
-    }
-
-    public long getBytesCredit()
-    {
-        return _bytesCredit == -1L
-                    ? Long.MAX_VALUE
-                    : _bytesCredit;
-    }
-
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
+        setSuspended(!hasCredit());
     }
 
 
@@ -107,12 +86,17 @@
     public synchronized boolean hasCredit()
     {
         // Note !=, if credit is < 0 that indicates infinite credit
-        return (_bytesCredit != 0L  && _messageCredit != 0L);
+        return (_bytesCredit != 0L  && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting());
     }
 
     public synchronized boolean useCreditForMessage(long msgSize)
     {
-        if(_messageCredit >= 0L)
+        if (_serverProtocolEngine.isTransportBlockedForWriting())
+        {
+            setSuspended(true);
+            return false;
+        }
+        else if(_messageCredit >= 0L)
         {
             if(_messageCredit > 0)
             {
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 30aecdb..5c91925 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -86,7 +86,10 @@
         conn.setRemoteAddress(network.getRemoteAddress());
         conn.setLocalAddress(network.getLocalAddress());
 
-        return new ProtocolEngine_0_10( conn, network);
+        ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network);
+        conn.setProtocolEngine(protocolEngine);
+
+        return protocolEngine;
     }
 
 
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 854cd38..58b6a3a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -52,8 +52,9 @@
     private ServerConnection _connection;
 
     private long _createTime = System.currentTimeMillis();
-    private long _lastReadTime;
-    private long _lastWriteTime;
+    private long _lastReadTime = _createTime;
+    private long _lastWriteTime = _createTime;
+    private volatile boolean _transportBlockedForWriting;
 
     public ProtocolEngine_0_10(ServerConnection conn,
                                NetworkConnection network)
@@ -101,17 +102,13 @@
         return new Sender<ByteBuffer>()
         {
             @Override
-            public void setIdleTimeout(int i)
-            {
-                sender.setIdleTimeout(i);
-
-            }
-
-            @Override
             public void send(ByteBuffer msg)
             {
                 _lastWriteTime = System.currentTimeMillis();
-                sender.send(msg);
+                ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
+                copy.put(msg);
+                copy.flip();
+                sender.send(copy);
 
             }
 
@@ -190,6 +187,11 @@
         return _writtenBytes;
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
     public void writerIdle()
     {
         _connection.doHeartBeat();
@@ -215,11 +217,6 @@
         return getRemoteAddress().toString();
     }
 
-    public String getAuthId()
-    {
-        return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
-    }
-
     public boolean isDurable()
     {
         return false;
@@ -246,4 +243,18 @@
     {
         return _connection.getAuthorizedSubject();
     }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        _connection.transportStateChanged();
+    }
+
 }
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8567be3..cbd569d 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -37,6 +37,7 @@
 import javax.security.auth.Subject;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.LogSubject;
@@ -90,6 +91,8 @@
     private int _messageCompressionThreshold;
     private int _maxMessageSize;
 
+    private ServerProtocolEngine _serverProtocolEngine;
+
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
                             final AmqpPort<?> port,
@@ -189,6 +192,16 @@
         super.setConnectionDelegate(delegate);
     }
 
+    public ServerProtocolEngine getProtocolEngine()
+    {
+        return _serverProtocolEngine;
+    }
+
+    public void setProtocolEngine(final ServerProtocolEngine serverProtocolEngine)
+    {
+        _serverProtocolEngine = serverProtocolEngine;
+    }
+
     public VirtualHostImpl<?,?,?> getVirtualHost()
     {
         return _virtualHost;
@@ -664,4 +677,12 @@
     {
         return _maxMessageSize;
     }
+
+    public void transportStateChanged()
+    {
+        for (AMQSessionModel ssn : getSessionModels())
+        {
+            ssn.transportStateChanged();
+        }
+    }
 }
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 223de4f..1d8676e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,6 +56,7 @@
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -874,6 +875,15 @@
     }
 
     @Override
+    public void transportStateChanged()
+    {
+        for(ConsumerTarget_0_10 consumerTarget : getSubscriptions())
+        {
+            consumerTarget.transportStateChanged();
+        }
+    }
+
+    @Override
     public Object getConnectionReference()
     {
         return getConnection().getReference();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 288a4f9..8fdee7a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -35,8 +35,10 @@
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -243,8 +245,8 @@
                 }
                 else
                 {
-
-                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
+                    ServerProtocolEngine serverProtocolEngine = getServerConnection(session).getProtocolEngine();
+                    FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, serverProtocolEngine);
 
                     FilterManager filterManager = null;
                     try
@@ -381,58 +383,69 @@
                     new MessageTransferMessage(storeMessage, serverSession.getReference());
             MessageReference<MessageTransferMessage> reference = message.newReference();
 
-            final InstanceProperties instanceProperties = new InstanceProperties()
+            try
             {
-                @Override
-                public Object getProperty(final Property prop)
+                final InstanceProperties instanceProperties = new InstanceProperties()
                 {
-                    switch (prop)
+                    @Override
+                    public Object getProperty(final Property prop)
                     {
-                        case EXPIRATION:
-                            return message.getExpiration();
-                        case IMMEDIATE:
-                            return message.isImmediate();
-                        case MANDATORY:
-                            return (delvProps == null || !delvProps.getDiscardUnroutable())
-                                   && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
-                        case PERSISTENT:
-                            return message.isPersistent();
-                        case REDELIVERED:
-                            return delvProps.getRedelivered();
+                        switch (prop)
+                        {
+                            case EXPIRATION:
+                                return message.getExpiration();
+                            case IMMEDIATE:
+                                return message.isImmediate();
+                            case MANDATORY:
+                                return (delvProps == null || !delvProps.getDiscardUnroutable())
+                                       && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+                            case PERSISTENT:
+                                return message.isPersistent();
+                            case REDELIVERED:
+                                return delvProps.getRedelivered();
+                        }
+                        return null;
                     }
-                    return null;
-                }
-            };
+                };
 
-            int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+                int enqueues = serverSession.enqueue(message, instanceProperties, destination);
 
-            if (enqueues == 0)
-            {
-                if ((delvProps == null || !delvProps.getDiscardUnroutable())
-                    && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                if (enqueues == 0)
                 {
-                    RangeSet rejects = RangeSetFactory.createRangeSet();
-                    rejects.add(xfr.getId());
-                    MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
-                    ssn.invoke(reject);
+                    if ((delvProps == null || !delvProps.getDiscardUnroutable())
+                        && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                    {
+                        RangeSet rejects = RangeSetFactory.createRangeSet();
+                        rejects.add(xfr.getId());
+                        MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+                        ssn.invoke(reject);
+                    }
+                    else
+                    {
+                        virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
+                                                                                         messageMetaData.getRoutingKey()));
+                    }
+                }
+
+                if (serverSession.isTransactional())
+                {
+                    serverSession.processed(xfr);
                 }
                 else
                 {
-                    virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
-                                                                                     messageMetaData.getRoutingKey()));
+                    serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+                                               new CommandProcessedAction(serverSession, xfr));
                 }
             }
+            catch (VirtualHostUnavailableException e)
+            {
+                getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage());
+            }
+            finally
+            {
+                reference.release();
+            }
 
-            if (serverSession.isTransactional())
-            {
-                serverSession.processed(xfr);
-            }
-            else
-            {
-                serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
-                                           new CommandProcessedAction(serverSession, xfr));
-            }
-            reference.release();
         }
     }
 
@@ -549,7 +562,7 @@
         {
             try
             {
-                ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+                ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
             }
             catch (TimeoutDtxException e)
             {
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index 8e48741..e11d2ce 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -21,11 +21,14 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.log4j.Logger;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.flow.AbstractFlowCreditManager;
 
 public class WindowCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
 {
     private static final Logger LOGGER = Logger.getLogger(WindowCreditManager.class);
+    private final ServerProtocolEngine _serverProtocolEngine;
 
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
@@ -33,39 +36,22 @@
     private volatile long _bytesUsed;
     private volatile long _messageUsed;
 
-     public WindowCreditManager()
-     {
-         this(0L, 0L);
-     }
-
-    public WindowCreditManager(long bytesCreditLimit, long messageCreditLimit)
+    public WindowCreditManager(long bytesCreditLimit,
+                               long messageCreditLimit,
+                               ServerProtocolEngine serverProtocolEngine)
     {
+        _serverProtocolEngine = serverProtocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
         setSuspended(!hasCredit());
 
     }
 
-    public long getBytesCreditLimit()
-    {
-        return _bytesCreditLimit;
-    }
-
     public long getMessageCreditLimit()
     {
         return _messageCreditLimit;
     }
 
-    public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
-    {
-        _bytesCreditLimit = bytesCreditLimit;
-        _messageCreditLimit = messageCreditLimit;
-
-        setSuspended(!hasCredit());
-
-    }
-
-
     public long getMessageCredit()
     {
          return _messageCreditLimit == -1L
@@ -121,12 +107,18 @@
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit < 0L || _bytesCreditLimit > _bytesUsed)
-                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed);
+                && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed)
+                && !_serverProtocolEngine.isTransportBlockedForWriting();
     }
 
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
-        if(_messageCreditLimit >= 0L)
+        if (_serverProtocolEngine.isTransportBlockedForWriting())
+        {
+            setSuspended(true);
+            return false;
+        }
+        else if(_messageCreditLimit >= 0L)
         {
             if(_messageUsed < _messageCreditLimit)
             {
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index 1c4a694..b05edc5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -20,17 +20,25 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
-import org.apache.qpid.server.protocol.v0_10.WindowCreditManager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class WindowCreditManagerTest extends QpidTestCase
 {
     private WindowCreditManager _creditManager;
+    private ServerProtocolEngine _protocolEngine;
 
     protected void setUp() throws Exception
     {
         super.setUp();
-        _creditManager = new WindowCreditManager();
+
+        _protocolEngine = mock(ServerProtocolEngine.class);
+        when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false);
+
+        _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine);
     }
 
     /**
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d52fb73..f7f65e2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -66,8 +66,6 @@
 import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -133,7 +131,8 @@
     private final int _channelId;
 
 
-    private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
+    private final Pre0_10CreditManager _creditManager;
+    private final FlowCreditManager _noAckCreditManager;
 
     /**
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -213,6 +212,9 @@
 
     public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
     {
+        _creditManager = new Pre0_10CreditManager(0l,0l, connection);
+        _noAckCreditManager = new NoAckCreditManager(connection);
+
         _connection = connection;
         _channelId = channelId;
 
@@ -699,7 +701,7 @@
 
         if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
-            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager);
         }
         else if(acks)
         {
@@ -709,7 +711,7 @@
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager);
             options.add(ConsumerImpl.Option.ACQUIRES);
             options.add(ConsumerImpl.Option.SEES_REQUEUES);
         }
@@ -1633,6 +1635,7 @@
         }
     }
 
+
     public synchronized void block(AMQQueue queue)
     {
         if(_blockingEntities.add(queue))
@@ -1661,6 +1664,13 @@
     }
 
     @Override
+    public void transportStateChanged()
+    {
+        _creditManager.restoreCredit(0, 0);
+        _noAckCreditManager.restoreCredit(0, 0);
+    }
+
+    @Override
     public Object getConnectionReference()
     {
         return getConnection().getReference();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 4212505..cea9b09 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -170,7 +170,7 @@
     private Sender<ByteBuffer> _sender;
 
     private volatile boolean _deferFlush;
-    private long _lastReceivedTime;
+    private long _lastReceivedTime = System.currentTimeMillis();  // TODO consider if this is what we want?
     private boolean _blocking;
 
     private final ReentrantLock _receivedLock;
@@ -188,6 +188,7 @@
     private int _currentMethodId;
     private int _binaryDataLimit;
     private long _maxMessageSize;
+    private volatile boolean _transportBlockedForWriting;
 
     public AMQProtocolEngine(Broker<?> broker,
                              final NetworkConnection network,
@@ -250,6 +251,22 @@
         return _authorizedSubject;
     }
 
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        for(AMQChannel channel : _channelMap.values())
+        {
+            channel.transportStateChanged();
+        }
+    }
+
     public void setNetworkConnection(NetworkConnection network)
     {
         setNetworkConnection(network, network.getSender());
@@ -514,20 +531,10 @@
             throw new ServerScopedRuntimeException(e);
         }
 
-        final ByteBuffer buf;
-
-        if(size <= REUSABLE_BYTE_BUFFER_CAPACITY)
-        {
-            buf = _reusableByteBuffer;
-            buf.position(0);
-        }
-        else
-        {
-            buf = ByteBuffer.wrap(data);
-        }
-        buf.limit(_reusableDataOutput.length());
-
-        return buf;
+        final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length());
+        copy.put(data, 0, _reusableDataOutput.length());
+        copy.flip();
+        return copy;
     }
 
 
@@ -1160,6 +1167,11 @@
         }
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
     public void readerIdle()
     {
         Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 43982db..d6642ae 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -136,12 +136,6 @@
 
         }
 
-        @Override
-        public boolean allocateCredit(ServerMessage msg)
-        {
-            return true;
-        }
-
     }
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -215,12 +209,6 @@
 
         }
 
-        @Override
-        public boolean allocateCredit(ServerMessage msg)
-        {
-            return true;
-        }
-
         private static final ServerTransaction.Action NOOP =
                 new ServerTransaction.Action()
                 {
@@ -250,11 +238,6 @@
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
         }
 
-        public boolean allocateCredit(ServerMessage msg)
-        {
-            return getCreditManager().useCreditForMessage(msg.getSize());
-        }
-
     }
 
 
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
similarity index 86%
rename from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
rename to qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
index 1817e8a..af54c91 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
@@ -18,10 +18,13 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
 public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
     private final AtomicLong _messageCredit;
@@ -31,16 +34,6 @@
         _messageCredit = new AtomicLong(initialCredit);
     }
 
-    public long getMessageCredit()
-    {
-        return _messageCredit.get();
-    }
-
-    public long getBytesCredit()
-    {
-        return -1L;
-    }
-
     public void restoreCredit(long messageCredit, long bytesCredit)
     {
         _messageCredit.addAndGet(messageCredit);
@@ -48,12 +41,6 @@
 
     }
 
-    public void removeAllCredit()
-    {
-        setSuspended(true);
-        _messageCredit.set(0L);
-    }
-
     public boolean hasCredit()
     {
         return _messageCredit.get() > 0L;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
new file mode 100644
index 0000000..2d32617
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+
+public class NoAckCreditManager extends AbstractFlowCreditManager
+{
+    private final ServerProtocolEngine _serverProtocolEngine;
+
+    public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine)
+    {
+        _serverProtocolEngine = serverProtocolEngine;
+    }
+
+    @Override
+    public void restoreCredit(final long messageCredit, final long bytesCredit)
+    {
+        setSuspended(!hasCredit());
+    }
+
+    @Override
+    public boolean hasCredit()
+    {
+        return !_serverProtocolEngine.isTransportBlockedForWriting();
+    }
+
+    @Override
+    public boolean useCreditForMessage(final long msgSize)
+    {
+        if (!hasCredit())
+        {
+            setSuspended(true);
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
similarity index 85%
rename from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
rename to qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
index fc2d4bf..e63645e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -18,20 +18,28 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
 
 
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
 
+    private final ServerProtocolEngine _protocolEngine;
     private volatile long _bytesCreditLimit;
     private volatile long _messageCreditLimit;
 
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
 
-    public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
+    public Pre0_10CreditManager(long bytesCreditLimit,
+                                long messageCreditLimit,
+                                ServerProtocolEngine protocolEngine)
     {
+        _protocolEngine = protocolEngine;
         _bytesCreditLimit = bytesCreditLimit;
         _messageCreditLimit = messageCreditLimit;
         _bytesCredit = bytesCreditLimit;
@@ -39,6 +47,7 @@
     }
 
 
+
     public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
     {
         long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
@@ -80,16 +89,6 @@
     }
 
 
-    public long getMessageCredit()
-    {
-        return _messageCredit;
-    }
-
-    public long getBytesCredit()
-    {
-        return _bytesCredit;
-    }
-
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
         final long messageCreditLimit = _messageCreditLimit;
@@ -119,22 +118,21 @@
 
     }
 
-    public synchronized void removeAllCredit()
-    {
-        _bytesCredit = 0L;
-        _messageCredit = 0L;
-        setSuspended(!hasCredit());
-    }
-
     public synchronized boolean hasCredit()
     {
         return (_bytesCreditLimit == 0L || _bytesCredit > 0)
-                && (_messageCreditLimit == 0L || _messageCredit > 0);
+                && (_messageCreditLimit == 0L || _messageCredit > 0)
+                && !_protocolEngine.isTransportBlockedForWriting();
     }
 
     public synchronized boolean useCreditForMessage(final long msgSize)
     {
-        if(_messageCreditLimit != 0L)
+        if (_protocolEngine.isTransportBlockedForWriting())
+        {
+            setSuspended(true);
+            return false;
+        }
+        else if(_messageCreditLimit != 0L)
         {
             if(_messageCredit != 0L)
             {
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 9326f16..55fc865 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -31,8 +31,6 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoredMessage;
@@ -328,7 +326,7 @@
     public void testMessageDequeueRestoresCreditTest() throws Exception
     {
         // Send 10 messages
-        Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+        Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine);
 
 
         _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 7407890..622cf32 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -288,10 +288,6 @@
         {
             _sender = new Sender<ByteBuffer>()
             {
-                public void setIdleTimeout(int i)
-                {
-                }
-
                 public void send(ByteBuffer msg)
                 {
                 }
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
similarity index 87%
rename from qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
rename to qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
index 89fc606..c4c89ac 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
@@ -18,20 +18,14 @@
 * under the License.
 *
 */
-package org.apache.qpid.server.flow;
+package org.apache.qpid.server.protocol.v0_8;
 
 
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+import org.apache.qpid.server.flow.FlowCreditManager;
+
 public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
-    public long getMessageCredit()
-    {
-        return -1L;
-    }
-
-    public long getBytesCredit()
-    {
-        return -1L;
-    }
 
     public void restoreCredit(long messageCredit, long bytesCredit)
     {
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 8e24d55..b55bd03 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -44,6 +44,7 @@
 import org.apache.qpid.amqp_1_0.type.transport.End;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Broker;
@@ -64,6 +65,7 @@
     private final AmqpPort<?> _port;
     private final Broker<?> _broker;
     private final SubjectCreator _subjectCreator;
+    private final ServerProtocolEngine _protocolEngine;
     private VirtualHostImpl _vhost;
     private final Transport _transport;
     private final ConnectionEndpoint _conn;
@@ -101,12 +103,16 @@
     private boolean _closedOnOpen;
 
 
+
     public Connection_1_0(Broker<?> broker,
                           ConnectionEndpoint conn,
                           long connectionId,
                           AmqpPort<?> port,
-                          Transport transport, final SubjectCreator subjectCreator)
+                          Transport transport,
+                          final SubjectCreator subjectCreator,
+                          final ServerProtocolEngine protocolEngine)
     {
+        _protocolEngine = protocolEngine;
         _broker = broker;
         _port = port;
         _transport = transport;
@@ -363,6 +369,11 @@
         return _port;
     }
 
+    public ServerProtocolEngine getProtocolEngine()
+    {
+        return _protocolEngine;
+    }
+
     @Override
     public Transport getTransport()
     {
@@ -480,4 +491,11 @@
     }
 
 
+    public void transportStateChanged()
+    {
+        for (Session_1_0 session : _sessions)
+        {
+            session.transportStateChanged();
+        }
+    }
 }
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index c5d9a5e..b5e1bda 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,6 +40,7 @@
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
@@ -84,7 +85,7 @@
 
     public boolean isSuspended()
     {
-        return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+        return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
 
     }
 
@@ -290,7 +291,9 @@
     {
         synchronized (_link.getLock())
         {
-            final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+            ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+            final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
             if(!hasCredit && getState() == State.ACTIVE)
             {
                 suspend();
@@ -330,7 +333,8 @@
     {
         synchronized(_link.getLock())
         {
-            if(isSuspended() && getEndpoint() != null)
+            ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+            if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
             {
                 updateState(State.SUSPENDED, State.ACTIVE);
                 _transactionId = _link.getTransactionId();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 740b01e..b2783a2 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -118,6 +118,7 @@
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
     private Connection_1_0 _connection;
+    private volatile boolean _transportBlockedForWriting;
 
 
     static enum State {
@@ -179,6 +180,11 @@
         //Todo
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
     public void setNetworkConnection(final NetworkConnection network, final Sender<ByteBuffer> sender)
     {
         _network = network;
@@ -211,7 +217,7 @@
         _endpoint.setProperties(serverProperties);
 
         _endpoint.setRemoteAddress(getRemoteAddress());
-        _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator);
+        _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this);
 
         _endpoint.setConnectionEventListener(_connection);
         _endpoint.setFrameOutputHandler(this);
@@ -524,6 +530,8 @@
 
     }
 
+
+
     public void close()
     {
         _sender.close();
@@ -554,4 +562,18 @@
     {
         return _lastWriteTime;
     }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        _transportBlockedForWriting = blocked;
+        _connection.transportStateChanged();
+
+    }
+
 }
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 2cfe431..f8e4853 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -728,4 +728,9 @@
     {
         return _consumer;
     }
+
+    public ConsumerTarget_1_0 getConsumerTarget()
+    {
+        return _target;
+    }
 }
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1820de9..01c11b9 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -109,6 +109,7 @@
     private final Subject _subject = new Subject();
 
     private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
+    private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
@@ -211,7 +212,7 @@
                         );
 
                         sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink));
-                        registerConsumer(sendingLink.getConsumer());
+                        registerConsumer(sendingLink);
 
                         link = sendingLink;
                         if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
@@ -411,12 +412,14 @@
         }
     }
 
-    private void registerConsumer(final ConsumerImpl consumer)
+    private void registerConsumer(final SendingLink_1_0 link)
     {
+        ConsumerImpl consumer = link.getConsumer();
         if(consumer instanceof Consumer<?>)
         {
             Consumer<?> modelConsumer = (Consumer<?>) consumer;
             _consumers.add(modelConsumer);
+            _sendingLinks.add(link);
             modelConsumer.addChangeListener(_consumerClosedListener);
             consumerAdded(modelConsumer);
         }
@@ -614,6 +617,20 @@
     }
 
     @Override
+    public void transportStateChanged()
+    {
+        for(SendingLink_1_0 link : _sendingLinks)
+        {
+            ConsumerTarget_1_0 target = link.getConsumerTarget();
+            target.flowStateChanged();
+
+
+        }
+
+
+    }
+
+    @Override
     public LogSubject getLogSubject()
     {
         return this;
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index a194ac7..940e24d 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -81,9 +81,7 @@
         _supported = supported;
         _defaultSupportedProtocolReply = defaultSupportedProtocolReply;
         _factory = new MultiVersionProtocolEngineFactory(
-                        _port.getParent(Broker.class), null,
-                        _port.getWantClientAuth(),
-                        _port.getNeedClientAuth(),
+                        _port.getParent(Broker.class),
                         _supported,
                         _defaultSupportedProtocolReply,
                         _port,
@@ -273,12 +271,6 @@
         }
 
         @Override
-        public void setIdleTimeout(final int i)
-        {
-
-        }
-
-        @Override
         public void send(final ByteBuffer msg)
         {
             try
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c2582ac..d5e3027 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -316,6 +316,11 @@
         }
     }
 
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
     public void readerIdle()
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 8a7e6ab..c7dee5b 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -670,7 +670,7 @@
     {
         private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>();
 
-        public void setIdleTimeout(int i)
+        private void setIdleTimeout(int i)
         {
         }
 
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
index 7c3988c1..11b34d3 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
@@ -27,11 +27,6 @@
 public class MockSender implements Sender<ByteBuffer>
 {
 
-    public void setIdleTimeout(int i)
-    {
-
-    }
-
     public void send(ByteBuffer msg)
     {
 
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 6774d0a..cad5461 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -20,14 +20,14 @@
  */
 package org.apache.qpid.protocol;
 
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.TransportActivity;
 
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
 /**
  * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
  * decodes it and then process the result.
@@ -56,7 +56,8 @@
    // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
    void readerIdle();
 
+   void encryptedTransport();
 
    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
 
-}
\ No newline at end of file
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
index 5c6918e..35d262c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -30,4 +30,8 @@
     long getConnectionId();
 
     Subject getSubject();
+
+    boolean isTransportBlockedForWriting();
+
+    void setTransportBlockedForWriting(boolean blocked);
 }
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 1866e1f..c2a0911 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -153,11 +153,9 @@
                               maxFrameSize,
                               actualHeartbeatInterval);
 
-        int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
         conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
         conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
         conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
-        conn.setIdleTimeout(idleTimeout);
 
         int channelMax = tune.getChannelMax();
         //0 means no implied limit, except available server resources
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index e33e007..92ccdb8 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -159,7 +159,6 @@
     public void setSender(Sender<ProtocolEvent> sender)
     {
         this.sender = sender;
-        sender.setIdleTimeout(idleTimeout);
     }
 
     protected void setState(State state)
@@ -675,20 +674,6 @@
         }
     }
 
-    public void setIdleTimeout(int i)
-    {
-        idleTimeout = i;
-        if (sender != null)
-        {
-            sender.setIdleTimeout(i);
-        }
-    }
-
-    public int getIdleTimeout()
-    {
-        return idleTimeout;
-    }
-
     public String getUserID()
     {
         return userID;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
index 6519702..9a6f675 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
@@ -28,7 +28,6 @@
 
 public interface Sender<T>
 {
-    void setIdleTimeout(int i);
 
     void send(T msg);
 
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 82a677b..f8fd286 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -126,8 +126,11 @@
 
     protected void connectionAuthFailed(final Connection conn, Exception e)
     {
-        conn.exception(e);
-        conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+        if (e != null)
+        {
+            conn.exception(e);
+        }
+        conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage());
     }
 
     protected void connectionAuthContinue(final Connection conn, byte[] challenge)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index a804cb2..81a4c78 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -251,11 +251,6 @@
         throw new IllegalArgumentException(String.valueOf(error));
     }
 
-    public void setIdleTimeout(int i)
-    {
-        sender.setIdleTimeout(i);
-    }
-
     @Override
     public void setMaxFrameSize(final int maxFrame)
     {
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
index e0cd9ca..f378c54 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.util.Set;
+
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.protocol.ProtocolEngineFactory;
@@ -29,7 +31,8 @@
 {
     public void accept(NetworkTransportConfiguration config,
                        ProtocolEngineFactory factory,
-                       SSLContext sslContext);
+                       SSLContext sslContext,
+                       final Set<TransportEncryption> encryptionSet);
 
     public int getAcceptingPort();
 }
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
new file mode 100644
index 0000000..b3f1f1c
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+public enum TransportEncryption
+{
+    NONE, TLS
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
new file mode 100644
index 0000000..9d0fe5d
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+// TODO we are no longer using the IncomingNetworkTransport
+public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+                                                              CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+    private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+                                                                  CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+    private Socket _socket;
+    private NetworkConnection _connection;
+    private AcceptingThread _acceptor;
+
+    public NetworkConnection connect(ConnectionSettings settings,
+                                     Receiver<ByteBuffer> delegate,
+                                     TransportActivity transportActivity)
+    {
+        int sendBufferSize = settings.getWriteBufferSize();
+        int receiveBufferSize = settings.getReadBufferSize();
+
+        try
+        {
+            _socket = new Socket();
+            _socket.setReuseAddress(true);
+            _socket.setTcpNoDelay(settings.isTcpNodelay());
+            _socket.setSendBufferSize(sendBufferSize);
+            _socket.setReceiveBufferSize(receiveBufferSize);
+
+            if(LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
+                LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
+                LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+            }
+
+            InetAddress address = InetAddress.getByName(settings.getHost());
+
+            _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
+        }
+        catch (SocketException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+
+        try
+        {
+            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+            _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+            ticker.setConnection(_connection);
+            _connection.start();
+        }
+        catch(Exception e)
+        {
+            try
+            {
+                _socket.close();
+            }
+            catch(IOException ioe)
+            {
+                //ignored, throw based on original exception
+            }
+
+            throw new TransportException("Error creating network connection", e);
+        }
+
+        return _connection;
+    }
+
+    public void close()
+    {
+        if(_connection != null)
+        {
+            _connection.close();
+        }
+        if(_acceptor != null)
+        {
+            _acceptor.close();
+        }
+    }
+
+    public NetworkConnection getConnection()
+    {
+        return _connection;
+    }
+
+    public void accept(NetworkTransportConfiguration config,
+                       ProtocolEngineFactory factory,
+                       SSLContext sslContext, final Set<TransportEncryption> encryptionSet)
+    {
+        try
+        {
+            _acceptor = new AcceptingThread(config, factory, sslContext);
+            _acceptor.setDaemon(false);
+            _acceptor.start();
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Failed to start AMQP on port : " + config, e);
+        }
+    }
+
+    public int getAcceptingPort()
+    {
+        return _acceptor == null ? -1 : _acceptor.getPort();
+    }
+
+    protected abstract NetworkConnection createNetworkConnection(Socket socket,
+                                                                 Receiver<ByteBuffer> engine,
+                                                                 Integer sendBufferSize,
+                                                                 Integer receiveBufferSize,
+                                                                 int timeout,
+                                                                 IdleTimeoutTicker ticker);
+
+    private class AcceptingThread extends Thread
+    {
+        private volatile boolean _closed = false;
+        private NetworkTransportConfiguration _config;
+        private ProtocolEngineFactory _factory;
+        private SSLContext _sslContext;
+        private ServerSocket _serverSocket;
+        private int _timeout;
+
+        private AcceptingThread(NetworkTransportConfiguration config,
+                                ProtocolEngineFactory factory,
+                                SSLContext sslContext) throws IOException
+        {
+            _config = config;
+            _factory = factory;
+            _sslContext = sslContext;
+            _timeout = TIMEOUT;
+
+            InetSocketAddress address = config.getAddress();
+
+            if(sslContext == null)
+            {
+                _serverSocket = new ServerSocket();
+            }
+            else
+            {
+                SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
+                _serverSocket = socketFactory.createServerSocket();
+
+                SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
+
+                SSLUtil.removeSSLv3Support(sslServerSocket);
+
+                if(config.needClientAuth())
+                {
+                    sslServerSocket.setNeedClientAuth(true);
+                }
+                else if(config.wantClientAuth())
+                {
+                    sslServerSocket.setWantClientAuth(true);
+                }
+
+            }
+
+            _serverSocket.setReuseAddress(true);
+            _serverSocket.bind(address);
+        }
+
+
+        /**
+            Close the underlying ServerSocket if it has not already been closed.
+         */
+        public void close()
+        {
+            LOGGER.debug("Shutting down the Acceptor");
+            _closed = true;
+
+            if (!_serverSocket.isClosed())
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    throw new TransportException(e);
+                }
+            }
+        }
+
+        private int getPort()
+        {
+            return _serverSocket.getLocalPort();
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (!_closed)
+                {
+                    Socket socket = null;
+                    try
+                    {
+                        socket = _serverSocket.accept();
+
+                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
+
+                        if(engine != null)
+                        {
+                            socket.setTcpNoDelay(_config.getTcpNoDelay());
+                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+                            final Integer sendBufferSize = _config.getSendBufferSize();
+                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                            socket.setSendBufferSize(sendBufferSize);
+                            socket.setReceiveBufferSize(receiveBufferSize);
+
+
+                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+                            NetworkConnection connection =
+                                    createNetworkConnection(socket,
+                                                            engine,
+                                                            sendBufferSize,
+                                                            receiveBufferSize,
+                                                            _timeout,
+                                                            ticker);
+
+                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+
+                            ticker.setConnection(connection);
+
+                            engine.setNetworkConnection(connection, connection.getSender());
+
+                            connection.start();
+                        }
+                        else
+                        {
+                            socket.close();
+                        }
+                    }
+                    catch(RuntimeException e)
+                    {
+                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                        closeSocketIfNecessary(socket);
+                    }
+                    catch(IOException e)
+                    {
+                        if(!_closed)
+                        {
+                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                            closeSocketIfNecessary(socket);
+                            try
+                            {
+                                //Delay to avoid tight spinning the loop during issues such as too many open files
+                                Thread.sleep(1000);
+                            }
+                            catch (InterruptedException ie)
+                            {
+                                LOGGER.debug("Stopping acceptor due to interrupt request");
+                                _closed = true;
+                            }
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+                                 + _config.getAddress());
+                }
+            }
+        }
+
+        private void closeSocketIfNecessary(final Socket socket)
+        {
+            if(socket != null)
+            {
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.debug("Exception while closing socket", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index e5bc9fa..f33f626 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -20,312 +20,25 @@
  */
 package org.apache.qpid.transport.network.io;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketException;
 import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport extends AbstractNetworkTransport
 {
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
-    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
-                                                              CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
-    private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-                                                                  CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
 
 
-    private Socket _socket;
-    private IoNetworkConnection _connection;
-    private AcceptingThread _acceptor;
-
-    public NetworkConnection connect(ConnectionSettings settings,
-                                     Receiver<ByteBuffer> delegate,
-                                     TransportActivity transportActivity)
+    @Override
+    protected IoNetworkConnection createNetworkConnection(final Socket socket,
+                                                       final Receiver<ByteBuffer> engine,
+                                                       final Integer sendBufferSize,
+                                                       final Integer receiveBufferSize,
+                                                       final int timeout,
+                                                       final IdleTimeoutTicker ticker)
     {
-        int sendBufferSize = settings.getWriteBufferSize();
-        int receiveBufferSize = settings.getReadBufferSize();
-
-        try
-        {
-            _socket = new Socket();
-            _socket.setReuseAddress(true);
-            _socket.setTcpNoDelay(settings.isTcpNodelay());
-            _socket.setSendBufferSize(sendBufferSize);
-            _socket.setReceiveBufferSize(receiveBufferSize);
-
-            if(LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
-                LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
-                LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
-            }
-
-            InetAddress address = InetAddress.getByName(settings.getHost());
-
-            _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
-        }
-        catch (SocketException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
-        }
-
-        try
-        {
-            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
-            ticker.setConnection(_connection);
-            _connection.start();
-        }
-        catch(Exception e)
-        {
-            try
-            {
-                _socket.close();
-            }
-            catch(IOException ioe)
-            {
-                //ignored, throw based on original exception
-            }
-
-            throw new TransportException("Error creating network connection", e);
-        }
-
-        return _connection;
-    }
-
-    public void close()
-    {
-        if(_connection != null)
-        {
-            _connection.close();
-        }
-        if(_acceptor != null)
-        {
-            _acceptor.close();
-        }
-    }
-
-    public NetworkConnection getConnection()
-    {
-        return _connection;
-    }
-
-    public void accept(NetworkTransportConfiguration config,
-                       ProtocolEngineFactory factory,
-                       SSLContext sslContext)
-    {
-        try
-        {
-            _acceptor = new AcceptingThread(config, factory, sslContext);
-            _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
-            _acceptor.setDaemon(false);
-            _acceptor.start();
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Failed to start AMQP on port : " + config, e);
-        }
-    }
-
-    public int getAcceptingPort()
-    {
-        return _acceptor == null ? -1 : _acceptor.getPort();
-    }
-
-    private class AcceptingThread extends Thread
-    {
-        private volatile boolean _closed = false;
-        private NetworkTransportConfiguration _config;
-        private ProtocolEngineFactory _factory;
-        private SSLContext _sslContext;
-        private ServerSocket _serverSocket;
-        private int _timeout;
-
-        private AcceptingThread(NetworkTransportConfiguration config,
-                                ProtocolEngineFactory factory,
-                                SSLContext sslContext) throws IOException
-        {
-            _config = config;
-            _factory = factory;
-            _sslContext = sslContext;
-            _timeout = TIMEOUT;
-
-            InetSocketAddress address = config.getAddress();
-
-            if(sslContext == null)
-            {
-                _serverSocket = new ServerSocket();
-            }
-            else
-            {
-                SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
-                _serverSocket = socketFactory.createServerSocket();
-
-                SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
-                SSLUtil.removeSSLv3Support(sslServerSocket);
-
-                if(config.needClientAuth())
-                {
-                    sslServerSocket.setNeedClientAuth(true);
-                }
-                else if(config.wantClientAuth())
-                {
-                    sslServerSocket.setWantClientAuth(true);
-                }
-
-            }
-
-            _serverSocket.setReuseAddress(true);
-            _serverSocket.bind(address);
-        }
-
-
-        /**
-            Close the underlying ServerSocket if it has not already been closed.
-         */
-        public void close()
-        {
-            LOGGER.debug("Shutting down the Acceptor");
-            _closed = true;
-
-            if (!_serverSocket.isClosed())
-            {
-                try
-                {
-                    _serverSocket.close();
-                }
-                catch (IOException e)
-                {
-                    throw new TransportException(e);
-                }
-            }
-        }
-
-        private int getPort()
-        {
-            return _serverSocket.getLocalPort();
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                while (!_closed)
-                {
-                    Socket socket = null;
-                    try
-                    {
-                        socket = _serverSocket.accept();
-
-                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
-                        if(engine != null)
-                        {
-                            socket.setTcpNoDelay(_config.getTcpNoDelay());
-                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
-                            final Integer sendBufferSize = _config.getSendBufferSize();
-                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
-                            socket.setSendBufferSize(sendBufferSize);
-                            socket.setReceiveBufferSize(receiveBufferSize);
-
-
-                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-                            NetworkConnection connection =
-                                    new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
-                                                            ticker);
-
-                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
-                            ticker.setConnection(connection);
-
-                            engine.setNetworkConnection(connection, connection.getSender());
-
-                            connection.start();
-                        }
-                        else
-                        {
-                            socket.close();
-                        }
-                    }
-                    catch(RuntimeException e)
-                    {
-                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                        closeSocketIfNecessary(socket);
-                    }
-                    catch(IOException e)
-                    {
-                        if(!_closed)
-                        {
-                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                            closeSocketIfNecessary(socket);
-                            try
-                            {
-                                //Delay to avoid tight spinning the loop during issues such as too many open files
-                                Thread.sleep(1000);
-                            }
-                            catch (InterruptedException ie)
-                            {
-                                LOGGER.debug("Stopping acceptor due to interrupt request");
-                                _closed = true;
-                            }
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                if(LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
-                }
-            }
-        }
-
-        private void closeSocketIfNecessary(final Socket socket)
-        {
-            if(socket != null)
-            {
-                try
-                {
-                    socket.close();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.debug("Exception while closing socket", e);
-                }
-            }
-        }
-
+        return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout,
+                                ticker);
     }
 
 }
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index b52b59a..467115c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -78,7 +78,7 @@
             throw new RuntimeException("Error creating IOReceiver thread",e);
         }
         receiverThread.setDaemon(true);
-        receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+        receiverThread.setName(String.format("IoReceiver-%s", socket.getRemoteSocketAddress()));
     }
 
     public void initiate()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index e06782c..79e9928 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -88,7 +88,7 @@
         }
 
         senderThread.setDaemon(true);
-        senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+        senderThread.setName(String.format("IoSender-%s", _remoteSocketAddress));
     }
 
     public void initiate()
@@ -316,18 +316,6 @@
         }
     }
 
-    public void setIdleTimeout(int i)
-    {
-        try
-        {
-            socket.setSoTimeout(i);
-        }
-        catch (Exception e)
-        {
-            throw new SenderException(e);
-        }
-    }
-
     public void setReceiver(IoReceiver receiver)
     {
         _receiver = receiver;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
new file mode 100644
index 0000000..92cea34
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -0,0 +1,134 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+
+public class NonBlockingConnection implements NetworkConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+    private final SocketChannel _socket;
+    private final long _timeout;
+    private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
+    private int _maxReadIdle;
+    private int _maxWriteIdle;
+    private Principal _principal;
+    private boolean _principalChecked;
+    private final Object _lock = new Object();
+
+    public NonBlockingConnection(SocketChannel socket,
+                                 ServerProtocolEngine delegate,
+                                 int sendBufferSize,
+                                 int receiveBufferSize,
+                                 long timeout,
+                                 Ticker ticker,
+                                 final Set<TransportEncryption> encryptionSet,
+                                 final SSLContext sslContext,
+                                 final boolean wantClientAuth,
+                                 final boolean needClientAuth,
+                                 final Runnable onTransportEncryptionAction)
+    {
+        _socket = socket;
+        _timeout = timeout;
+
+        _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+
+    }
+
+    public void start()
+    {
+        _nonBlockingSenderReceiver.initiate();
+    }
+
+    public Sender<ByteBuffer> getSender()
+    {
+        return _nonBlockingSenderReceiver;
+    }
+
+    public void close()
+    {
+        _nonBlockingSenderReceiver.close();
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return _socket.socket().getRemoteSocketAddress();
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return _socket.socket().getLocalSocketAddress();
+    }
+
+    public void setMaxWriteIdle(int sec)
+    {
+        _maxWriteIdle = sec;
+    }
+
+    public void setMaxReadIdle(int sec)
+    {
+        _maxReadIdle = sec;
+    }
+
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        synchronized (_lock)
+        {
+            if(!_principalChecked)
+            {
+
+                _principal =  _nonBlockingSenderReceiver.getPeerPrincipal();
+
+                _principalChecked = true;
+            }
+
+            return _principal;
+        }
+    }
+
+    @Override
+    public int getMaxReadIdle()
+    {
+        return _maxReadIdle;
+    }
+
+    @Override
+    public int getMaxWriteIdle()
+    {
+        return _maxWriteIdle;
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
new file mode 100644
index 0000000..80ba7a0
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.StandardSocketOptions;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportEncryption;
+
+public class NonBlockingNetworkTransport implements IncomingNetworkTransport
+{
+
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+                                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+    private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+                                                                   CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+    private AcceptingThread _acceptor;
+
+    protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
+                                                            final ServerProtocolEngine engine,
+                                                            final Integer sendBufferSize,
+                                                            final Integer receiveBufferSize,
+                                                            final int timeout,
+                                                            final IdleTimeoutTicker ticker,
+                                                            final Set<TransportEncryption> encryptionSet,
+                                                            final SSLContext sslContext,
+                                                            final boolean wantClientAuth,
+                                                            final boolean needClientAuth,
+                                                            final Runnable onTransportEncryptionAction)
+    {
+        return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker, encryptionSet, sslContext, wantClientAuth, needClientAuth, onTransportEncryptionAction);
+    }
+
+    public void close()
+    {
+        if(_acceptor != null)
+        {
+            _acceptor.close();
+        }
+    }
+
+    public void accept(NetworkTransportConfiguration config,
+                       ProtocolEngineFactory factory,
+                       SSLContext sslContext,
+                       final Set<TransportEncryption> encryptionSet)
+    {
+        try
+        {
+            _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet);
+            _acceptor.setDaemon(false);
+            _acceptor.start();
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Failed to start AMQP on port : " + config, e);
+        }
+    }
+
+    public int getAcceptingPort()
+    {
+        return _acceptor == null ? -1 : _acceptor.getPort();
+    }
+
+    private class AcceptingThread extends Thread
+    {
+        private final Set<TransportEncryption> _encryptionSet;
+        private volatile boolean _closed = false;
+        private final NetworkTransportConfiguration _config;
+        private final ProtocolEngineFactory _factory;
+        private final SSLContext _sslContext;
+        private final ServerSocketChannel _serverSocket;
+        private int _timeout;
+
+        private AcceptingThread(NetworkTransportConfiguration config,
+                                ProtocolEngineFactory factory,
+                                SSLContext sslContext,
+                                final Set<TransportEncryption> encryptionSet) throws IOException
+        {
+            _config = config;
+            _factory = factory;
+            _sslContext = sslContext;
+            _timeout = TIMEOUT;
+
+            InetSocketAddress address = config.getAddress();
+
+            _serverSocket =  ServerSocketChannel.open();
+
+            _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+            _serverSocket.bind(address);
+            _encryptionSet = encryptionSet;
+        }
+
+
+        /**
+         Close the underlying ServerSocket if it has not already been closed.
+         */
+        public void close()
+        {
+            LOGGER.debug("Shutting down the Acceptor");
+            _closed = true;
+
+            if (!_serverSocket.socket().isClosed())
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    throw new TransportException(e);
+                }
+            }
+        }
+
+        private int getPort()
+        {
+            return _serverSocket.socket().getLocalPort();
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (!_closed)
+                {
+                    SocketChannel socket = null;
+                    try
+                    {
+                        socket = _serverSocket.accept();
+
+                        final ServerProtocolEngine engine =
+                                (ServerProtocolEngine) _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+
+                        if(engine != null)
+                        {
+                            socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+                            socket.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+                            final Integer sendBufferSize = _config.getSendBufferSize();
+                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                            socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+                            socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+                            NetworkConnection connection =
+                                    createNetworkConnection(socket,
+                                                            engine,
+                                                            sendBufferSize,
+                                                            receiveBufferSize,
+                                                            _timeout,
+                                                            ticker,
+                                                            _encryptionSet,
+                                                            _sslContext,
+                                                            _config.wantClientAuth(),
+                                                            _config.needClientAuth(),
+                                                            new Runnable()
+                                                            {
+
+                                                                @Override
+                                                                public void run()
+                                                                {
+                                                                    engine.encryptedTransport();
+                                                                }
+                                                            });
+
+                            engine.setNetworkConnection(connection, connection.getSender());
+                            connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+
+                            ticker.setConnection(connection);
+
+                            connection.start();
+                        }
+                        else
+                        {
+                            socket.close();
+                        }
+                    }
+                    catch(RuntimeException e)
+                    {
+                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                        closeSocketIfNecessary(socket.socket());
+                    }
+                    catch(IOException e)
+                    {
+                        if(!_closed)
+                        {
+                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+                            closeSocketIfNecessary(socket.socket());
+                            try
+                            {
+                                //Delay to avoid tight spinning the loop during issues such as too many open files
+                                Thread.sleep(1000);
+                            }
+                            catch (InterruptedException ie)
+                            {
+                                LOGGER.debug("Stopping acceptor due to interrupt request");
+                                _closed = true;
+                            }
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+                                 + _config.getAddress());
+                }
+            }
+        }
+
+        private void closeSocketIfNecessary(final Socket socket)
+        {
+            if(socket != null)
+            {
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException e)
+                {
+                    LOGGER.debug("Exception while closing socket", e);
+                }
+            }
+        }
+
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
new file mode 100644
index 0000000..acc2b89
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportEncryption;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public class NonBlockingSenderReceiver  implements Runnable, Sender<ByteBuffer>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
+    public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
+    private final SocketChannel _socketChannel;
+    private final Selector _selector;
+
+    private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+
+    private final Thread _ioThread;
+    private final String _remoteSocketAddress;
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+    private final ServerProtocolEngine _receiver;
+    private final int _receiveBufSize;
+    private final Ticker _ticker;
+    private final Set<TransportEncryption> _encryptionSet;
+    private final SSLContext _sslContext;
+    private final Runnable _onTransportEncryptionAction;
+    private ByteBuffer _netInputBuffer;
+    private SSLEngine _sslEngine;
+
+    private ByteBuffer _currentBuffer;
+
+    private TransportEncryption _transportEncryption;
+    private SSLEngineResult _status;
+
+
+    public NonBlockingSenderReceiver(final SocketChannel socketChannel,
+                                     ServerProtocolEngine receiver,
+                                     int receiveBufSize,
+                                     Ticker ticker,
+                                     final Set<TransportEncryption> encryptionSet,
+                                     final SSLContext sslContext,
+                                     final boolean wantClientAuth,
+                                     final boolean needClientAuth,
+                                     final Runnable onTransportEncryptionAction)
+    {
+        _socketChannel = socketChannel;
+        _receiver = receiver;
+        _receiveBufSize = receiveBufSize;
+        _ticker = ticker;
+        _encryptionSet = encryptionSet;
+        _sslContext = sslContext;
+        _onTransportEncryptionAction = onTransportEncryptionAction;
+
+        if(encryptionSet.size() == 1)
+        {
+            _transportEncryption = _encryptionSet.iterator().next();
+            if (_transportEncryption == TransportEncryption.TLS)
+            {
+                onTransportEncryptionAction.run();
+            }
+        }
+
+        if(encryptionSet.contains(TransportEncryption.TLS))
+        {
+            _sslEngine = _sslContext.createSSLEngine();
+            _sslEngine.setUseClientMode(false);
+            SSLUtil.removeSSLv3Support(_sslEngine);
+            if(needClientAuth)
+            {
+                _sslEngine.setNeedClientAuth(true);
+            }
+            else if(wantClientAuth)
+            {
+                _sslEngine.setWantClientAuth(true);
+            }
+            _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
+        }
+
+        try
+        {
+            _remoteSocketAddress = socketChannel.getRemoteAddress().toString();
+            _socketChannel.configureBlocking(false);
+            _selector = Selector.open();
+            _socketChannel.register(_selector, SelectionKey.OP_READ);
+        }
+        catch (IOException e)
+        {
+            throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+        }
+        try
+        {
+            //Create but deliberately don't start the thread.
+            _ioThread = Threading.getThreadFactory().createThread(this);
+        }
+        catch(Exception e)
+        {
+            throw new SenderException("Error creating NonBlockingSenderReceiver thread for " + _remoteSocketAddress, e);
+        }
+
+        _ioThread.setDaemon(true);
+        _ioThread.setName(String.format("NonBlockingSenderReceiver-%s", _remoteSocketAddress));
+
+    }
+
+    public void initiate()
+    {
+        _ioThread.start();
+    }
+
+    @Override
+    public void send(final ByteBuffer msg)
+    {
+        if (_closed.get())
+        {
+            throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed");
+        }
+        // append to list and do selector wakeup
+        _buffers.add(msg);
+        _selector.wakeup();
+    }
+
+    @Override
+    public void run()
+    {
+        LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
+
+
+        while (!_closed.get())
+        {
+
+            try
+            {
+                long currentTime = System.currentTimeMillis();
+                int tick = _ticker.getTimeToNextTick(currentTime);
+                if(tick <= 0)
+                {
+                    tick = _ticker.tick(currentTime);
+                }
+
+                _selector.select(tick <= 0 ? 1 : tick);
+                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+                selectionKeys.clear();
+
+                _receiver.setTransportBlockedForWriting(!doWrite());
+                doRead();
+                boolean fullyWritten = doWrite();
+                _receiver.setTransportBlockedForWriting(!fullyWritten);
+
+                _socketChannel.register(_selector,
+                                        fullyWritten
+                                                ? SelectionKey.OP_READ
+                                                : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+
+            }
+            catch (IOException e)
+            {
+                LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+                close();
+            }
+        }
+
+        try(Selector selector = _selector; SocketChannel channel = _socketChannel)
+        {
+            while(!doWrite())
+            {
+            }
+
+            _receiver.closed();
+        }
+        catch (IOException e)
+        {
+            LOGGER.info("Exception performing final write/close for thread '" + _remoteSocketAddress + "': " + e);
+        }
+        finally
+        {
+            LOGGER.debug("Shutting down IO thread for " + _remoteSocketAddress);
+        }
+    }
+
+    @Override
+    public void flush()
+    {
+        _selector.wakeup();
+    }
+
+    @Override
+    public void close()
+    {
+        LOGGER.debug("Closing " +  _remoteSocketAddress);
+
+        _closed.set(true);
+        _selector.wakeup();
+
+    }
+
+    private boolean doWrite() throws IOException
+    {
+
+        ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+        Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+        for (int i = 0; i < bufArray.length; i++)
+        {
+            bufArray[i] = bufferIterator.next();
+        }
+
+        int byteBuffersWritten = 0;
+
+        if(_transportEncryption == TransportEncryption.NONE)
+        {
+
+
+            long written = _socketChannel.write(bufArray);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Written " + written + " bytes");
+            }
+
+            for (ByteBuffer buf : bufArray)
+            {
+                if (buf.remaining() == 0)
+                {
+                    byteBuffersWritten++;
+                    _buffers.poll();
+                }
+            }
+
+
+            return bufArray.length == byteBuffersWritten;
+        }
+        else if(_transportEncryption == TransportEncryption.TLS)
+        {
+            int remaining = 0;
+
+            do
+            {
+                if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
+                {
+                    final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+                    _status = _sslEngine.wrap(bufArray, netBuffer);
+                    runSSLEngineTasks(_status);
+
+                    netBuffer.flip();
+                    remaining = netBuffer.remaining();
+                    if (remaining != 0)
+                    {
+                        _encryptedOutput.add(netBuffer);
+                    }
+                    for (ByteBuffer buf : bufArray)
+                    {
+                        if (buf.remaining() == 0)
+                        {
+                            byteBuffersWritten++;
+                            _buffers.poll();
+                        }
+                    }
+                }
+
+            }
+            while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+
+            ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
+            long written  = _socketChannel.write(encryptedBuffers);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Written " + written + " encrypted bytes");
+            }
+
+            ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+            while(iter.hasNext())
+            {
+                ByteBuffer buf = iter.next();
+                if(buf.remaining() == 0)
+                {
+                    iter.remove();
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            return bufArray.length == byteBuffersWritten;
+
+        }
+        else
+        {
+            return true;
+        }
+    }
+
+    private void doRead() throws IOException
+    {
+
+        if(_transportEncryption == TransportEncryption.NONE)
+        {
+            int remaining = 0;
+            while (remaining == 0 && !_closed.get())
+            {
+                if (_currentBuffer == null || _currentBuffer.remaining() == 0)
+                {
+                    _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
+                }
+                int read = _socketChannel.read(_currentBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+                remaining = _currentBuffer.remaining();
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + read + " byte(s)");
+                }
+                ByteBuffer dup = _currentBuffer.duplicate();
+                dup.flip();
+                _currentBuffer = _currentBuffer.slice();
+                _receiver.received(dup);
+            }
+        }
+        else if(_transportEncryption == TransportEncryption.TLS)
+        {
+            int read = 1;
+            while(!_closed.get() && read > 0  && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_WRAP && (_status == null || _status.getStatus() != SSLEngineResult.Status.CLOSED))
+            {
+                read = _socketChannel.read(_netInputBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+                }
+                _netInputBuffer.flip();
+
+
+                int unwrapped = 0;
+                do
+                {
+                    ByteBuffer appInputBuffer =
+                            ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
+                    _status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
+                    runSSLEngineTasks(_status);
+
+                    appInputBuffer.flip();
+                    unwrapped = appInputBuffer.remaining();
+                    _receiver.received(appInputBuffer);
+                }
+                while(unwrapped > 0);
+
+                _netInputBuffer.compact();
+
+            }
+        }
+        else
+        {
+            int read = 1;
+            while (!_closed.get() && read > 0)
+            {
+
+                read = _socketChannel.read(_netInputBuffer);
+                if (read == -1)
+                {
+                    _closed.set(true);
+                }
+
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Read " + read + " possibly encrypted bytes " + _netInputBuffer);
+                }
+
+                if (_netInputBuffer.position() >= NUMBER_OF_BYTES_FOR_TLS_CHECK)
+                {
+                    _netInputBuffer.flip();
+                    final byte[] headerBytes = new byte[NUMBER_OF_BYTES_FOR_TLS_CHECK];
+                    ByteBuffer dup = _netInputBuffer.duplicate();
+                    dup.get(headerBytes);
+
+                    _transportEncryption =  looksLikeSSL(headerBytes) ? TransportEncryption.TLS : TransportEncryption.NONE;
+                    LOGGER.debug("Identified transport encryption as " + _transportEncryption);
+
+                    if (_transportEncryption == TransportEncryption.NONE)
+                    {
+                        _receiver.received(_netInputBuffer);
+                    }
+                    else
+                    {
+                        _onTransportEncryptionAction.run();
+                        _netInputBuffer.compact();
+                        doRead();
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    private void runSSLEngineTasks(final SSLEngineResult status)
+    {
+        if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
+        {
+            Runnable task;
+            while((task = _sslEngine.getDelegatedTask()) != null)
+            {
+                task.run();
+            }
+        }
+    }
+
+    private boolean looksLikeSSL(byte[] headerBytes)
+    {
+        return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+    }
+
+    private boolean looksLikeSSLv3ClientHello(byte[] headerBytes)
+    {
+        return headerBytes[0] == 22 && // SSL Handshake
+               (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+                (headerBytes[2] == 0 || // SSL 3.0
+                 headerBytes[2] == 1 || // TLS 1.0
+                 headerBytes[2] == 2 || // TLS 1.1
+                 headerBytes[2] == 3)) && // TLS1.2
+               (headerBytes[5] == 1); // client_hello
+    }
+
+    private boolean looksLikeSSLv2ClientHello(byte[] headerBytes)
+    {
+        return headerBytes[0] == -128 &&
+               headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+               (headerBytes[4] == 0 || // SSL 3.0
+                headerBytes[4] == 1 || // TLS 1.0
+                headerBytes[4] == 2 || // TLS 1.1
+                headerBytes[4] == 3);
+    }
+
+    public Principal getPeerPrincipal()
+    {
+
+        if (_sslEngine != null)
+        {
+            try
+            {
+                return _sslEngine.getSession().getPeerPrincipal();
+            }
+            catch (SSLPeerUnverifiedException e)
+            {
+                return null;
+            }
+        }
+
+        return null;
+    }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 098f2fb..fa1801c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -103,11 +103,6 @@
         }        
     }
 
-    public void setIdleTimeout(int i) 
-    {
-        delegate.setIdleTimeout(i);
-    }
-    
     public void securityLayerEstablished()
     {
         appData = new byte[getSendBuffSize()];
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
index 24f95d7..e69de29 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.security.ssl;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
-
-public class SSLBufferingSender implements Sender<ByteBuffer>
-{
-    private static final Logger log = Logger.get(SSLBufferingSender.class);
-    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
-
-    private final Sender<ByteBuffer> delegate;
-    private final SSLEngine engine;
-    private final int sslBufSize;
-    private final ByteBuffer netData;
-    private final SSLStatus _sslStatus;
-
-    private String _hostname;
-
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
-
-
-    public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
-    {
-        this.engine = engine;
-        this.delegate = delegate;
-        sslBufSize = engine.getSession().getPacketBufferSize();
-        netData = ByteBuffer.allocate(sslBufSize);
-        _sslStatus = sslStatus;
-    }
-
-    public void setHostname(String hostname)
-    {
-        _hostname = hostname;
-    }
-
-    public void close()
-    {
-        if (!closed.getAndSet(true))
-        {
-            if (engine.isOutboundDone())
-            {
-                return;
-            }
-            log.debug("Closing SSL connection");
-            doSend();
-            engine.closeOutbound();
-            try
-            {
-                tearDownSSLConnection();
-            }
-            catch(Exception e)
-            {
-                throw new SenderException("Error closing SSL connection",e);
-            }
-
-
-            synchronized(_sslStatus.getSslLock())
-            {
-                while (!engine.isOutboundDone())
-                {
-                    try
-                    {
-                        _sslStatus.getSslLock().wait();
-                    }
-                    catch(InterruptedException e)
-                    {
-                        // pass
-                    }
-
-                }
-            }
-            delegate.close();
-        }
-    }
-
-    private void tearDownSSLConnection() throws Exception
-    {
-        SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
-        Status status = result.getStatus();
-        int read   = result.bytesProduced();
-        while (status != Status.CLOSED)
-        {
-            if (status == Status.BUFFER_OVERFLOW)
-            {
-                netData.clear();
-            }
-            if(read > 0)
-            {
-                int limit = netData.limit();
-                netData.limit(netData.position());
-                netData.position(netData.position() - read);
-
-                ByteBuffer data = netData.slice();
-
-                netData.limit(limit);
-                netData.position(netData.position() + read);
-
-                delegate.send(data);
-                flush();
-            }
-            result = engine.wrap(ByteBuffer.allocate(0), netData);
-            status = result.getStatus();
-            read   = result.bytesProduced();
-        }
-    }
-
-    public void flush()
-    {
-        delegate.flush();
-    }
-
-    public void send()
-    {
-        if(!closed.get())
-        {
-            doSend();
-        }
-    }
-
-    public synchronized void send(ByteBuffer appData)
-    {
-        boolean buffered;
-        if(buffered = _appData.hasRemaining())
-        {
-            ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
-            newBuf.put(_appData);
-            newBuf.put(appData);
-            newBuf.flip();
-            _appData = newBuf;
-        }
-        if (closed.get())
-        {
-            throw new SenderException("SSL Sender is closed");
-        }
-        doSend();
-        if(!appData.hasRemaining())
-        {
-            _appData = EMPTY_BYTE_BUFFER;
-        }
-        else if(!buffered)
-        {
-            _appData = ByteBuffer.allocate(appData.remaining());
-            _appData.put(appData);
-            _appData.flip();
-        }
-    }
-
-    private synchronized void doSend()
-    {
-
-        HandshakeStatus handshakeStatus;
-        Status status;
-
-        while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
-              && !_sslStatus.getSslErrorFlag())
-        {
-            int read = 0;
-            try
-            {
-                SSLEngineResult result = engine.wrap(_appData, netData);
-                read   = result.bytesProduced();
-                status = result.getStatus();
-                handshakeStatus = result.getHandshakeStatus();
-            }
-            catch(SSLException e)
-            {
-                // Should this set _sslError??
-                throw new SenderException("SSL, Error occurred while encrypting data",e);
-            }
-
-            if(read > 0)
-            {
-                int limit = netData.limit();
-                netData.limit(netData.position());
-                netData.position(netData.position() - read);
-
-                ByteBuffer data = netData.slice();
-
-                netData.limit(limit);
-                netData.position(netData.position() + read);
-
-                delegate.send(data);
-            }
-
-            switch(status)
-            {
-                case CLOSED:
-                    throw new SenderException("SSLEngine is closed");
-
-                case BUFFER_OVERFLOW:
-                    netData.clear();
-                    continue;
-
-                case OK:
-                    break; // do nothing
-
-                default:
-                    throw new IllegalStateException("SSLReceiver: Invalid State " + status);
-            }
-
-            switch (handshakeStatus)
-            {
-                case NEED_WRAP:
-                    if (netData.hasRemaining())
-                    {
-                        continue;
-                    }
-
-                case NEED_TASK:
-                    doTasks();
-                    break;
-
-                case NEED_UNWRAP:
-                    flush();
-                    return;
-
-                case FINISHED:
-                    if (_hostname != null)
-                    {
-                        SSLUtil.verifyHostname(engine, _hostname);
-                    }
-
-                case NOT_HANDSHAKING:
-                    break; //do  nothing
-
-                default:
-                    throw new IllegalStateException("SSLSender: Invalid State " + status);
-            }
-
-        }
-    }
-
-    private void doTasks()
-    {
-        Runnable runnable;
-        while ((runnable = engine.getDelegatedTask()) != null) {
-            runnable.run();
-        }
-    }
-
-    public void setIdleTimeout(int i)
-    {
-        delegate.setIdleTimeout(i);
-    }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 7c61136..7d64012 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -264,8 +264,4 @@
         }
     }
 
-    public void setIdleTimeout(int i)
-    {
-        delegate.setIdleTimeout(i);
-    }
 }
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
index 3071594..865a360 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -22,6 +22,7 @@
 
 
 import java.nio.ByteBuffer;
+import java.util.Set;
 
 import javax.net.ssl.SSLContext;
 
@@ -150,7 +151,9 @@
         }
 
         public void accept(NetworkTransportConfiguration config,
-                           ProtocolEngineFactory factory, SSLContext sslContext)
+                           ProtocolEngineFactory factory,
+                           SSLContext sslContext,
+                           final Set<TransportEncryption> encryptionSet)
         {
             throw new UnsupportedOperationException();
         }
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
index 3025414..cdd5e13 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
@@ -39,7 +39,7 @@
     private Session session;
     private AMQQueue queue;
     private MessageConsumer consumer;
-    private int numMessages;
+    private int _numMessages;
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
 
@@ -86,7 +86,7 @@
     {
         super.setUp();
 
-        numMessages = isBrokerStorePersistent() ? 300 : 1000;
+        _numMessages = isBrokerStorePersistent() ? 300 : 1000;
 
         _logger.info("Create Connection");
         con = getConnection();
@@ -106,30 +106,33 @@
 
         // Setup initial messages
         _logger.info("Creating first producer thread");
-        producerThread = new ASyncProducer(queue, 0, numMessages / 2);
+        producerThread = new ASyncProducer(queue, 0, _numMessages / 2);
         producerThread.start();
         // Wait for them to be done
         producerThread.join();
 
         // Setup second set of messages to produce while we consume
         _logger.info("Creating second producer thread");
-        producerThread = new ASyncProducer(queue, numMessages / 2, numMessages);
+        producerThread = new ASyncProducer(queue, _numMessages / 2, _numMessages);
         producerThread.start();
 
         // Start consuming and checking they're in order
         _logger.info("Consuming messages");
-        for (int i = 0; i < numMessages; i++)
+        for (int i = 0; i < _numMessages; i++)
         {
             Message msg = consumer.receive(3000);
+
+            _logger.debug("KWDEBUG got " + msg);
+
             assertNotNull("Message " + i + " should not be null", msg);
             assertTrue("Message " + i + " should be a text message", msg instanceof TextMessage);
-            assertEquals("Message content " + i + "does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
+            assertEquals("Message content " + i + " does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
         }
     }
 
     protected void tearDown() throws Exception
     {
-        _logger.info("Interuptting producer thread");
+        _logger.info("Interrupting producer thread");
         producerThread.interrupt();
         _logger.info("Closing connection");
         con.close();
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 007772e..9a8d021 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -161,7 +161,7 @@
 
         when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
         MultiVersionProtocolEngineFactory factory =
-            new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
+            new MultiVersionProtocolEngineFactory(_broker, protocols, null, port,
                     org.apache.qpid.server.model.Transport.TCP);
 
         //create a dummy to retrieve the 'current' ID number
@@ -215,7 +215,7 @@
 
         try
         {
-            new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, Protocol.AMQP_0_9, null,
+            new MultiVersionProtocolEngineFactory(_broker, versions, Protocol.AMQP_0_9, null,
                     org.apache.qpid.server.model.Transport.TCP);
             fail("should not have been allowed to create the factory");
         }
@@ -236,10 +236,6 @@
         {
             _sender = new Sender<ByteBuffer>()
             {
-                public void setIdleTimeout(int i)
-                {
-                }
-
                 public void send(ByteBuffer msg)
                 {
                 }
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 3ffa73b..347bf93 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -114,11 +114,6 @@
             _sender = new Sender<ByteBuffer>()
             {
 
-                public void setIdleTimeout(int i)
-                {
-
-                }
-
                 public void send(ByteBuffer msg)
                 {
 
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
index f762038..5928a41 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -313,14 +313,10 @@
                                   _maxFrameSize,
                                   actualHeartbeatInterval);
 
-            int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
             conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
             conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
             conn.setMaxFrameSize(_maxFrameSize);
 
-
-            conn.setIdleTimeout(idleTimeout);
-
             int channelMax = tune.getChannelMax();
             conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
 
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index cafb1d5..b52987f 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -26,3 +26,4 @@
 
 //QPID-4153 Messages causing a runtime selector error should be dead-lettered (or something similar)
 org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+