QPID-7633: Extract interfaces AMQPConnection_0_10/AMQPConnection_1_0 (analogue of AMQPConnection_0_8)

* Mechanical refactoring - extract interfaces AMQPConnection_0_10/AMQPConnection_1_0
* Fix up unit tests
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 13a3077..5323a50 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -32,6 +32,8 @@
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Deletable;
 
@@ -45,6 +47,8 @@
 
     Subject getSubject();
 
+    int getMessageCompressionThreshold();
+
     Principal getAuthorizedPrincipal();
 
     String getRemoteAddressString();
@@ -104,4 +108,7 @@
     void notifyWork(AMQSessionModel<?,?> sessionModel);
 
     boolean isTransportBlockedForWriting();
+
+    long getMaxMessageSize();
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 983e1b4..ea43930 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -372,6 +372,7 @@
         return maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE;
     }
 
+    @Override
     public long getMaxMessageSize()
     {
         return _maxMessageSize.get();
@@ -818,6 +819,7 @@
         logConnectionOpen();
     }
 
+    @Override
     public int getMessageCompressionThreshold()
     {
         return _messageCompressionThreshold;
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
old mode 100755
new mode 100644
index 81dd598..5ec8c7b
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,345 +17,44 @@
  * under the License.
  *
  */
+
 package org.apache.qpid.server.protocol.v0_10;
 
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.security.AccessControlContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.ContextProvider;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.Constant;
 
-
-public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
+@ManagedObject(category = false, creatable = false, type="AMQP_0_10")
+public interface AMQPConnection_0_10<C extends AMQPConnection_0_10<C>> extends AMQPConnection<C>,
+                                                                             ProtocolEngine,
+                                                                             EventLoggerProvider
 {
-    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
-    private final ServerInputHandler _inputHandler;
+    // 0-10's current implementation (ServerConnection etc) means we have to break the encapsulation
 
-    private final ServerConnection _connection;
+    void initialiseHeartbeating(long writerIdle, long readerIdle);
 
-    private volatile boolean _transportBlockedForWriting;
+    void setClientId(String clientId);
 
-    private final AtomicBoolean _stateChanged = new AtomicBoolean();
-    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
-    private ServerDisassembler _disassembler;
+    void setClientProduct(String clientProduct);
 
-    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
+    void setClientVersion(String clientVersion);
 
+    void setRemoteProcessPid(String remoteProcessPid);
 
-    public AMQPConnection_0_10(final Broker<?> broker,
-                               ServerNetworkConnection network,
-                               final AmqpPort<?> port,
-                               final Transport transport,
-                               final long id,
-                               final AggregateTicker aggregateTicker)
-    {
-        super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
+    void setSubject(Subject authorizedSubject);
 
-        _connection = new ServerConnection(id, broker, port, transport, this);
+    void setAddressSpace(NamedAddressSpace addressSpace);
 
-        SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
-        ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
+    ContextProvider getContextProvider();
 
-        _connection.setConnectionDelegate(connDelegate);
-        _connection.setRemoteAddress(network.getRemoteAddress());
-        _connection.setLocalAddress(network.getLocalAddress());
+    AccessControlContext getAccessControllerContext();
 
-        _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
-        _connection.addFrameSizeObserver(_inputHandler);
-
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                _connection.setNetworkConnection(getNetwork());
-                _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
-                _connection.setSender(_disassembler);
-                _connection.addFrameSizeObserver(_disassembler);
-                return null;
-            }
-        }, getAccessControllerContext());
-    }
-
-    private ByteBufferSender wrapSender(final ByteBufferSender sender)
-    {
-        return new ByteBufferSender()
-        {
-            @Override
-            public boolean isDirectBufferPreferred()
-            {
-                return sender.isDirectBufferPreferred();
-            }
-
-            @Override
-            public void send(final QpidByteBuffer msg)
-            {
-                updateLastWriteTime();
-                sender.send(msg);
-            }
-
-            @Override
-            public void flush()
-            {
-                sender.flush();
-
-            }
-
-            @Override
-            public void close()
-            {
-                sender.close();
-            }
-        };
-    }
-
-    public void received(final QpidByteBuffer buf)
-    {
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                updateLastReadTime();
-                try
-                {
-                    _inputHandler.received(buf);
-                    _connection.receivedComplete();
-                }
-                catch (IllegalArgumentException | IllegalStateException e)
-                {
-                    throw new ConnectionScopedRuntimeException(e);
-                }
-                catch (StoreException e)
-                {
-                    if (getAddressSpace().isActive())
-                    {
-                        throw new ServerScopedRuntimeException(e);
-                    }
-                    else
-                    {
-                        throw new ConnectionScopedRuntimeException(e);
-                    }
-                }
-                return null;
-            }
-        }, getAccessControllerContext());
-    }
-
-    @Override
-    public void encryptedTransport()
-    {
-    }
-
-    public void writerIdle()
-    {
-        _connection.doHeartBeat();
-    }
-
-    public void readerIdle()
-    {
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
-                getNetwork().close();
-                return null;
-            }
-        }, getAccessControllerContext());
-
-    }
-
-    public String getAddress()
-    {
-        return getNetwork().getRemoteAddress().toString();
-    }
-
-    @Override
-    public void closed()
-    {
-        try
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Void>()
-            {
-                @Override
-                public Void run()
-                {
-                    _inputHandler.closed();
-                    if(_disassembler != null)
-                    {
-                        _disassembler.closed();
-                    }
-                    return null;
-                }
-            }, getAccessControllerContext());
-        }
-        finally
-        {
-            markTransportClosed();
-        }
-    }
-
-    @Override
-    public boolean isTransportBlockedForWriting()
-    {
-        return _transportBlockedForWriting;
-    }
-
-    @Override
-    public void setTransportBlockedForWriting(final boolean blocked)
-    {
-        if(_transportBlockedForWriting != blocked)
-        {
-            _transportBlockedForWriting = blocked;
-            _connection.transportStateChanged();
-        }
-    }
-
-    @Override
-    public Iterator<Runnable> processPendingIterator()
-    {
-        if (isIOThread())
-        {
-            return _connection.processPendingIterator(_sessionsWithWork);
-        }
-        else
-        {
-            return Collections.emptyIterator();
-        }
-    }
-
-    @Override
-    public boolean hasWork()
-    {
-        return _stateChanged.get();
-    }
-
-    @Override
-    public void notifyWork()
-    {
-        _stateChanged.set(true);
-
-        final Action<ProtocolEngine> listener = _workListener.get();
-        if(listener != null)
-        {
-            listener.performAction(this);
-        }
-    }
-
-    @Override
-    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
-    {
-        _sessionsWithWork.add(sessionModel);
-        notifyWork();
-    }
-
-    public void clearWork()
-    {
-        _stateChanged.set(false);
-    }
-
-    public void setWorkListener(final Action<ProtocolEngine> listener)
-    {
-        _workListener.set(listener);
-    }
-
-    public boolean hasSessionWithName(final byte[] name)
-    {
-        return _connection.hasSessionWithName(name);
-    }
-
-    @Override
-    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
-    {
-        stopConnection();
-        // Best mapping for all reasons is "forced"
-        _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
-
-    }
-
-    @Override
-    public void closeSessionAsync(final AMQSessionModel<?,?> session,
-                                  final CloseReason reason, final String message)
-    {
-        ServerSession s = ((Session_0_10)session).getServerSession();
-        _connection.closeSessionAsync(s, reason, message);
-    }
-
-    @Override
-    protected void addAsyncTask(final Action<? super ServerConnection> action)
-    {
-        _connection.addAsyncTask(action);
-    }
-
-    public void block()
-    {
-        _connection.block();
-    }
-
-    public String getRemoteContainerName()
-    {
-        return _connection.getRemoteContainerName();
-    }
-
-    public Collection<? extends Session_0_10> getSessionModels()
-    {
-        final Collection<org.apache.qpid.server.model.Session> sessions =
-                getChildren(org.apache.qpid.server.model.Session.class);
-        final Collection<? extends Session_0_10> session_0_10s = new ArrayList<>((Collection)sessions);
-        return session_0_10s;
-    }
-
-    public void unblock()
-    {
-        _connection.unblock();
-    }
-
-    public long getSessionCountLimit()
-    {
-        return _connection.getSessionCountLimit();
-    }
-
-    @Override
-    protected boolean isOrderlyClose()
-    {
-        return !_connection.isConnectionLost();
-    }
-
-    @Override
-    public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
-    {
-        super.initialiseHeartbeating(writerDelay, readerDelay);
-    }
+    void performDeleteTasks();
 }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
new file mode 100755
index 0000000..581c50e
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -0,0 +1,364 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Constant;
+
+
+public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnection_0_10Impl, ServerConnection>
+        implements
+        AMQPConnection_0_10<AMQPConnection_0_10Impl>
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10Impl.class);
+    private final ServerInputHandler _inputHandler;
+
+    private final ServerConnection _connection;
+
+    private volatile boolean _transportBlockedForWriting;
+
+    private final AtomicBoolean _stateChanged = new AtomicBoolean();
+    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+    private ServerDisassembler _disassembler;
+
+    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
+
+
+    public AMQPConnection_0_10Impl(final Broker<?> broker,
+                                   ServerNetworkConnection network,
+                                   final AmqpPort<?> port,
+                                   final Transport transport,
+                                   final long id,
+                                   final AggregateTicker aggregateTicker)
+    {
+        super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
+
+        _connection = new ServerConnection(id, broker, port, transport, this);
+
+        SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+        ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, subjectCreator);
+
+        _connection.setConnectionDelegate(connDelegate);
+        _connection.setRemoteAddress(network.getRemoteAddress());
+        _connection.setLocalAddress(network.getLocalAddress());
+
+        _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
+        _connection.addFrameSizeObserver(_inputHandler);
+
+        AccessController.doPrivileged(new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                _connection.setNetworkConnection(getNetwork());
+                _disassembler = new ServerDisassembler(wrapSender(getNetwork().getSender()), Constant.MIN_MAX_FRAME_SIZE);
+                _connection.setSender(_disassembler);
+                _connection.addFrameSizeObserver(_disassembler);
+                return null;
+            }
+        }, getAccessControllerContext());
+    }
+
+    private ByteBufferSender wrapSender(final ByteBufferSender sender)
+    {
+        return new ByteBufferSender()
+        {
+            @Override
+            public boolean isDirectBufferPreferred()
+            {
+                return sender.isDirectBufferPreferred();
+            }
+
+            @Override
+            public void send(final QpidByteBuffer msg)
+            {
+                updateLastWriteTime();
+                sender.send(msg);
+            }
+
+            @Override
+            public void flush()
+            {
+                sender.flush();
+
+            }
+
+            @Override
+            public void close()
+            {
+                sender.close();
+            }
+        };
+    }
+
+    public void received(final QpidByteBuffer buf)
+    {
+        AccessController.doPrivileged(new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                updateLastReadTime();
+                try
+                {
+                    _inputHandler.received(buf);
+                    _connection.receivedComplete();
+                }
+                catch (IllegalArgumentException | IllegalStateException e)
+                {
+                    throw new ConnectionScopedRuntimeException(e);
+                }
+                catch (StoreException e)
+                {
+                    if (getAddressSpace().isActive())
+                    {
+                        throw new ServerScopedRuntimeException(e);
+                    }
+                    else
+                    {
+                        throw new ConnectionScopedRuntimeException(e);
+                    }
+                }
+                return null;
+            }
+        }, getAccessControllerContext());
+    }
+
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
+    public void writerIdle()
+    {
+        _connection.doHeartBeat();
+    }
+
+    public void readerIdle()
+    {
+        AccessController.doPrivileged(new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection state: " + _connection.getConnectionDelegate().getState(), true));
+                getNetwork().close();
+                return null;
+            }
+        }, getAccessControllerContext());
+
+    }
+
+    public String getAddress()
+    {
+        return getNetwork().getRemoteAddress().toString();
+    }
+
+    @Override
+    public void closed()
+    {
+        try
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Void>()
+            {
+                @Override
+                public Void run()
+                {
+                    _inputHandler.closed();
+                    if(_disassembler != null)
+                    {
+                        _disassembler.closed();
+                    }
+                    return null;
+                }
+            }, getAccessControllerContext());
+        }
+        finally
+        {
+            markTransportClosed();
+        }
+    }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        if(_transportBlockedForWriting != blocked)
+        {
+            _transportBlockedForWriting = blocked;
+            _connection.transportStateChanged();
+        }
+    }
+
+    @Override
+    public Iterator<Runnable> processPendingIterator()
+    {
+        if (isIOThread())
+        {
+            return _connection.processPendingIterator(_sessionsWithWork);
+        }
+        else
+        {
+            return Collections.emptyIterator();
+        }
+    }
+
+    @Override
+    public boolean hasWork()
+    {
+        return _stateChanged.get();
+    }
+
+    @Override
+    public void notifyWork()
+    {
+        _stateChanged.set(true);
+
+        final Action<ProtocolEngine> listener = _workListener.get();
+        if(listener != null)
+        {
+            listener.performAction(this);
+        }
+    }
+
+    @Override
+    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
+    {
+        _sessionsWithWork.add(sessionModel);
+        notifyWork();
+    }
+
+    public void clearWork()
+    {
+        _stateChanged.set(false);
+    }
+
+    public void setWorkListener(final Action<ProtocolEngine> listener)
+    {
+        _workListener.set(listener);
+    }
+
+    public boolean hasSessionWithName(final byte[] name)
+    {
+        return _connection.hasSessionWithName(name);
+    }
+
+    @Override
+    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
+    {
+        stopConnection();
+        // Best mapping for all reasons is "forced"
+        _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
+
+    }
+
+    @Override
+    public void closeSessionAsync(final AMQSessionModel<?,?> session,
+                                  final CloseReason reason, final String message)
+    {
+        ServerSession s = ((Session_0_10)session).getServerSession();
+        _connection.closeSessionAsync(s, reason, message);
+    }
+
+    @Override
+    protected void addAsyncTask(final Action<? super ServerConnection> action)
+    {
+        _connection.addAsyncTask(action);
+    }
+
+    public void block()
+    {
+        _connection.block();
+    }
+
+    public String getRemoteContainerName()
+    {
+        return _connection.getRemoteContainerName();
+    }
+
+    public Collection<? extends Session_0_10> getSessionModels()
+    {
+        final Collection<org.apache.qpid.server.model.Session> sessions =
+                getChildren(org.apache.qpid.server.model.Session.class);
+        final Collection<? extends Session_0_10> session_0_10s = new ArrayList<>((Collection)sessions);
+        return session_0_10s;
+    }
+
+    public void unblock()
+    {
+        _connection.unblock();
+    }
+
+    public long getSessionCountLimit()
+    {
+        return _connection.getSessionCountLimit();
+    }
+
+    @Override
+    protected boolean isOrderlyClose()
+    {
+        return !_connection.isConnectionLost();
+    }
+
+    @Override
+    public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
+    {
+        super.initialiseHeartbeating(writerDelay, readerDelay);
+    }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index a1cae9d..9818e48 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -68,8 +68,8 @@
                                             long id, final AggregateTicker aggregateTicker)
     {
 
-        final AMQPConnection_0_10 protocolEngine_0_10 =
-                new AMQPConnection_0_10(broker, network, port, transport, id, aggregateTicker);
+        final AMQPConnection_0_10Impl protocolEngine_0_10 =
+                new AMQPConnection_0_10Impl(broker, network, port, transport, id, aggregateTicker);
         protocolEngine_0_10.create();
         return protocolEngine_0_10;
     }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index a3bc5d9..0736826 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -45,6 +45,7 @@
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
+import org.apache.qpid.server.security.auth.sasl.SaslSettings;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -479,7 +480,8 @@
             return;
         }
 
-        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, serverConnection.getAmqpConnection());
+        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism,
+                                                               (SaslSettings) serverConnection.getAmqpConnection());
         if (_saslNegotiator == null)
         {
             serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 3717211..db70dd8 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -66,7 +66,6 @@
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 76d2fe1..5c68d58 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -27,18 +27,19 @@
 
 import javax.security.auth.Subject;
 
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.model.BrokerTestHelper;
-import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.ExecutionErrorCode;
@@ -50,12 +51,15 @@
 {
 
     private VirtualHost<?> _virtualHost;
+    private CurrentThreadTaskExecutor _taskExecutor;
 
     @Override
     public void setUp() throws Exception
     {
         super.setUp();
         BrokerTestHelper.setUp();
+        _taskExecutor = new CurrentThreadTaskExecutor();
+        _taskExecutor.start();
         _virtualHost = BrokerTestHelper.createVirtualHost(getName());
     }
 
@@ -71,30 +75,37 @@
         }
         finally
         {
-            BrokerTestHelper.tearDown();
-            super.tearDown();
+            try
+            {
+                if (_taskExecutor != null)
+                {
+                    _taskExecutor.stop();
+                }
+            }
+            finally
+            {
+                BrokerTestHelper.tearDown();
+                super.tearDown();
+            }
         }
     }
 
     public void testOverlargeMessageTest() throws Exception
     {
-        if (true) return;
-
-        TaskExecutor taskExecutor = mock(TaskExecutor.class);
-
         final Broker<?> broker = mock(Broker.class);
         when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
 
         AmqpPort port = createMockPort();
 
-        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class); // TODO needs to be an interface
+        final AMQPConnection_0_10 modelConnection = mock(AMQPConnection_0_10.class);
         when(modelConnection.getAddressSpace()).thenReturn(_virtualHost);
         when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
         when(modelConnection.getBroker()).thenReturn((Broker)broker);
         when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
         when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
-        when(modelConnection.getChildExecutor()).thenReturn(taskExecutor);
+        when(modelConnection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        when(modelConnection.getChildExecutor()).thenReturn(_taskExecutor);
         when(modelConnection.getModel()).thenReturn(BrokerModel.getInstance());
 
         Subject subject = new Subject();
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
index 54f7333..37f53ad 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
@@ -69,8 +69,6 @@
 
     int getBinaryDataLimit();
 
-    long getMaxMessageSize();
-
     boolean ignoreAllButCloseOk();
 
     boolean channelAwaitingClosure(int channelId);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index d109ce4..109886d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,1707 +17,43 @@
  * under the License.
  *
  */
+
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.Principal;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.AuthenticationProvider;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
-import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
-import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.util.Functions;
 
-public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0, ConnectionHandler>
-        implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
-                   ValueWriter.Registry.Source,
-                   SASLEndpoint,
-                   ConnectionHandler
+@ManagedObject(category = false, creatable = false, type="AMQP_1_0")
+public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQPConnection<C>,
+                                                                             ProtocolEngine, EventLoggerProvider
 {
+    Object getReference();
 
-    private static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
-    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.protocol.frame");
+    String getRemoteContainerId();
 
-    private final AtomicBoolean _stateChanged = new AtomicBoolean();
-    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+    SectionDecoderRegistry getSectionDecoderRegistry();
 
+    AMQPDescribedTypeRegistry getDescribedTypeRegistry();
 
-    private static final byte[] SASL_HEADER = new byte[]
-            {
-                    (byte) 'A',
-                    (byte) 'M',
-                    (byte) 'Q',
-                    (byte) 'P',
-                    (byte) 3,
-                    (byte) 1,
-                    (byte) 0,
-                    (byte) 0
-            };
+    int sendFrame(short channel, FrameBody body, List<QpidByteBuffer> payload);
 
-    private static final byte[] AMQP_HEADER = new byte[]
-            {
-                    (byte) 'A',
-                    (byte) 'M',
-                    (byte) 'Q',
-                    (byte) 'P',
-                    (byte) 0,
-                    (byte) 1,
-                    (byte) 0,
-                    (byte) 0
-            };
+    void sendFrame(short channel, FrameBody body);
 
-    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
-    public static final Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
+    void sendEnd(short sendChannel, End end, boolean b);
 
-    private FrameWriter _frameWriter;
-    private ProtocolHandler _frameHandler;
-    private volatile boolean _transportBlockedForWriting;
-    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
-    private boolean _blocking;
-    private final Object _blockingLock = new Object();
-    private List<Symbol> _offeredCapabilities;
+    void sessionEnded(Session_1_0 session_1_0);
 
-    private enum FrameReceivingState
-    {
-        AMQP_OR_SASL_HEADER,
-        SASL_INIT_ONLY,
-        SASL_RESPONSE_ONLY,
-        AMQP_HEADER,
-        OPEN_ONLY,
-        ANY_FRAME,
-        CLOSED
-    }
+    boolean isClosed();
 
-    private volatile FrameReceivingState _frameReceivingState = FrameReceivingState.AMQP_OR_SASL_HEADER;
-
-    private static final short CONNECTION_CONTROL_CHANNEL = (short) 0;
-    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
-
-    private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
-
-    private AmqpPort<?> _port;
-    private SubjectCreator _subjectCreator;
-
-    private int _channelMax = DEFAULT_CHANNEL_MAX;
-    private int _maxFrameSize = 4096;
-    private String _remoteContainerId;
-
-    private SocketAddress _remoteAddress;
-
-    // positioned by the *outgoing* channel
-    private Session_1_0[] _sendingSessions;
-
-    // positioned by the *incoming* channel
-    private Session_1_0[] _receivingSessions;
-    private boolean _closedForInput;
-    private boolean _closedForOutput;
-
-    private long _idleTimeout;
-
-    private ConnectionState _connectionState = ConnectionState.UNOPENED;
-
-    private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
-                                                                                        .registerTransportLayer()
-                                                                                        .registerMessagingLayer()
-                                                                                        .registerTransactionLayer()
-                                                                                        .registerSecurityLayer();
-
-
-    private Map _properties;
-    private boolean _saslComplete;
-
-    private SaslNegotiator _saslNegotiator;
-    private String _localHostname;
-    private long _desiredIdleTimeout;
-
-    private Error _remoteError;
-
-    private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
-
-    private Map _remoteProperties;
-
-    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
-
-    private final Collection<Session_1_0>
-            _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
-
-    private final Object _reference = new Object();
-
-    private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
-            new ConcurrentLinkedQueue<>();
-
-    private boolean _closedOnOpen;
-
-    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
-
-    AMQPConnection_1_0(final Broker<?> broker,
-                       final ServerNetworkConnection network,
-                       AmqpPort<?> port,
-                       Transport transport,
-                       long id,
-                       final AggregateTicker aggregateTicker)
-    {
-        super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
-
-        _subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
-
-        _port = port;
-
-        Map<Symbol, Object> serverProperties = new LinkedHashMap<>();
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
-
-        setProperties(serverProperties);
-
-        List<Symbol> offeredCapabilities = new ArrayList<>();
-        offeredCapabilities.add(ANONYMOUS_RELAY);
-        offeredCapabilities.add(SHARED_SUBSCRIPTIONS);
-
-        setOfferedCapabilities(offeredCapabilities);
-
-        setRemoteAddress(network.getRemoteAddress());
-
-        setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
-
-        _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
-    }
-
-
-    private void setUserPrincipal(final Principal user)
-    {
-        setSubject(_subjectCreator.createSubjectWithGroups(user));
-    }
-
-    private long getDesiredIdleTimeout()
-    {
-        return _desiredIdleTimeout;
-    }
-
-    public void receiveAttach(final short channel, final Attach attach)
-    {
-        assertState(FrameReceivingState.ANY_FRAME);
-        final Session_1_0 session = getSession(channel);
-        if (session != null)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveAttach(attach);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-        else
-        {
-            closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
-        }
-    }
-
-    public void receive(final short channel, final Object frame)
-    {
-        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
-        if (frame instanceof FrameBody)
-        {
-            ((FrameBody) frame).invoke(channel, this);
-        }
-        else if (frame instanceof SaslFrameBody)
-        {
-            ((SaslFrameBody) frame).invoke(channel, this);
-        }
-    }
-
-    private void closeSaslWithFailure()
-    {
-        _saslComplete = true;
-        disposeSaslNegotiator();
-        _frameReceivingState = FrameReceivingState.CLOSED;
-        setClosedForInput(true);
-        addCloseTicker();
-    }
-
-    private void disposeSaslNegotiator()
-    {
-        _saslNegotiator.dispose();
-        _saslNegotiator = null;
-    }
-
-    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
-    {
-        LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
-        closeSaslWithFailure();
-    }
-
-    @Override
-    public void receiveClose(final short channel, final Close close)
-    {
-        assertState(FrameReceivingState.ANY_FRAME);
-        _frameReceivingState = FrameReceivingState.CLOSED;
-        setClosedForInput(true);
-        closeReceived();
-        switch (_connectionState)
-        {
-            case UNOPENED:
-            case AWAITING_OPEN:
-                closeConnection(ConnectionError.CONNECTION_FORCED,
-                                "Connection close sent before connection was opened");
-                break;
-            case OPEN:
-                _connectionState = ConnectionState.CLOSE_RECEIVED;
-                if(close.getError() != null)
-                {
-                    final Error error = close.getError();
-                    ErrorCondition condition = error.getCondition();
-                    Symbol errorCondition = condition == null ? null : condition.getValue();
-                    LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
-                                errorCondition, close.getError().getDescription());
-                }
-                sendClose(new Close());
-                _connectionState = ConnectionState.CLOSED;
-                _orderlyClose.set(true);
-                addCloseTicker();
-                break;
-            case CLOSE_SENT:
-                _connectionState = ConnectionState.CLOSED;
-                _orderlyClose.set(true);
-
-            default:
-        }
-        _remoteError = close.getError();
-    }
-
-    private void closeReceived()
-    {
-        Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
-
-        for (final Session_1_0 session : sessions)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.remoteEnd(new End());
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-    }
-
-    private void setClosedForInput(final boolean closed)
-    {
-        _closedForInput = closed;
-    }
-
-    public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
-    {
-        LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
-        closeSaslWithFailure();
-    }
-
-    public void receiveSaslResponse(final SaslResponse saslResponse)
-    {
-        final Binary responseBinary = saslResponse.getResponse();
-        byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
-
-        assertState(FrameReceivingState.SASL_RESPONSE_ONLY);
-
-        processSaslResponse(response);
-    }
-
-    public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
-    {
-        return _describedTypeRegistry;
-    }
-
-    public SectionDecoderRegistry getSectionDecoderRegistry()
-    {
-        return _describedTypeRegistry.getSectionDecoderRegistry();
-    }
-
-
-
-
-    private boolean closedForOutput()
-    {
-        return _closedForOutput;
-    }
-
-    public boolean isClosed()
-    {
-        return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED;
-    }
-
-    public boolean closedForInput()
-    {
-        return _closedForInput;
-    }
-
-    void sessionEnded(final Session_1_0 session)
-    {
-        if (!_closedOnOpen)
-        {
-            _sessions.remove(session);
-        }
-    }
-
-    private void inputClosed()
-    {
-        if (!_closedForInput)
-        {
-            _closedForInput = true;
-            FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
-            switch (_connectionState)
-            {
-                case UNOPENED:
-                case AWAITING_OPEN:
-                case CLOSE_SENT:
-                    _connectionState = ConnectionState.CLOSED;
-                    closeSender();
-                    break;
-                case OPEN:
-                    _connectionState = ConnectionState.CLOSE_RECEIVED;
-                case CLOSED:
-                    // already sent our close - too late to do anything more
-                    break;
-                default:
-            }
-            closeReceived();
-        }
-    }
-
-    private void closeSender()
-    {
-        setClosedForOutput(true);
-    }
-
-    String getRemoteContainerId()
-    {
-        return _remoteContainerId;
-    }
-
-    private void setDesiredIdleTimeout(final long desiredIdleTimeout)
-    {
-        _desiredIdleTimeout = desiredIdleTimeout;
-    }
-
-    public boolean isOpen()
-    {
-        return _connectionState == ConnectionState.OPEN;
-    }
-
-    void sendEnd(final short channel, final End end, final boolean remove)
-    {
-        sendFrame(channel, end);
-        if (remove)
-        {
-            _sendingSessions[channel] = null;
-        }
-    }
-
-    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
-    {
-        LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
-        closeSaslWithFailure();
-    }
-
-    public void receiveEnd(final short channel, final End end)
-    {
-
-        assertState(FrameReceivingState.ANY_FRAME);
-        final Session_1_0 session = getSession(channel);
-        if (session != null)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    _receivingSessions[channel] = null;
-
-                    session.receiveEnd(end);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-        else
-        {
-            closeConnectionWithInvalidChannel(channel, end);
-        }
-    }
-
-    private void closeConnectionWithInvalidChannel(final short channel, final FrameBody frame)
-    {
-        closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frame.getClass().getSimpleName().toLowerCase(), channel));
-    }
-
-    public void receiveDisposition(final short channel,
-                                   final Disposition disposition)
-    {
-        assertState(FrameReceivingState.ANY_FRAME);
-        final Session_1_0 session = getSession(channel);
-        if (session != null)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveDisposition(disposition);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-        else
-        {
-            closeConnectionWithInvalidChannel(channel, disposition);
-        }
-
-    }
-
-    public void receiveBegin(final short channel, final Begin begin)
-    {
-
-        assertState(FrameReceivingState.ANY_FRAME);
-        short myChannelId;
-        if (begin.getRemoteChannel() != null)
-        {
-            closeConnection(ConnectionError.FRAMING_ERROR,
-                            "BEGIN received on channel "
-                            + channel
-                            + " with given remote-channel "
-                            + begin.getRemoteChannel()
-                            + ". Since the broker does not spontaneously start channels, this must be an error.");
-
-        }
-        else // Peer requesting session creation
-        {
-
-            if (_receivingSessions[channel] == null)
-            {
-                myChannelId = getFirstFreeChannel();
-                if (myChannelId == -1)
-                {
-
-                    closeConnection(ConnectionError.FRAMING_ERROR,
-                                    "BEGIN received on channel "
-                                    + channel
-                                    + ". There are no free channels for the broker to respond on.");
-
-                }
-                Session_1_0 session = new Session_1_0(this, begin, myChannelId);
-                session.create();
-
-                _receivingSessions[channel] = session;
-                _sendingSessions[myChannelId] = session;
-
-                Begin beginToSend = new Begin();
-
-                session.setReceivingChannel(channel);
-                session.setSendingChannel(myChannelId);
-                beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
-                beginToSend.setNextOutgoingId(session.getNextOutgoingId());
-                beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
-                beginToSend.setIncomingWindow(session.getIncomingWindowSize());
-                sendFrame(myChannelId, beginToSend);
-
-                _sessions.add(session);
-            }
-            else
-            {
-                closeConnection(ConnectionError.FRAMING_ERROR,
-                                "BEGIN received on channel " + channel + " which is already in use.");
-            }
-
-        }
-
-    }
-
-    private short getFirstFreeChannel()
-    {
-        for (int i = 0; i <= _channelMax; i++)
-        {
-            if (_sendingSessions[i] == null)
-            {
-                return (short) i;
-            }
-        }
-        return -1;
-    }
-
-    public void handleError(final Error error)
-    {
-        if (!closedForOutput())
-        {
-            Close close = new Close();
-            close.setError(error);
-            sendFrame((short) 0, close);
-
-            setClosedForOutput(true);
-        }
-
-    }
-
-    public void receiveTransfer(final short channel, final Transfer transfer)
-    {
-        assertState(FrameReceivingState.ANY_FRAME);
-        final Session_1_0 session = getSession(channel);
-        if (session != null)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveTransfer(transfer);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-        else
-        {
-            closeConnectionWithInvalidChannel(channel, transfer);
-        }
-    }
-
-    public void receiveFlow(final short channel, final Flow flow)
-    {
-        assertState(FrameReceivingState.ANY_FRAME);
-        final Session_1_0 session = getSession(channel);
-        if (session != null)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveFlow(flow);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-        else
-        {
-            closeConnectionWithInvalidChannel(channel, flow);
-        }
-
-    }
-
-    public void receiveOpen(final short channel, final Open open)
-    {
-        assertState(FrameReceivingState.OPEN_ONLY);
-        _frameReceivingState = FrameReceivingState.ANY_FRAME;
-        _channelMax = open.getChannelMax() == null ? _channelMax
-                : open.getChannelMax().intValue() < _channelMax
-                        ? open.getChannelMax().intValue()
-                        : _channelMax;
-        if (_receivingSessions == null)
-        {
-            _receivingSessions = new Session_1_0[_channelMax + 1];
-            _sendingSessions = new Session_1_0[_channelMax + 1];
-        }
-        _maxFrameSize = open.getMaxFrameSize() == null
-                ? getBroker().getNetworkBufferSize()
-                : Math.min(open.getMaxFrameSize().intValue(), getBroker().getNetworkBufferSize());
-        _remoteContainerId = open.getContainerId();
-        _localHostname = open.getHostname();
-        if (open.getIdleTimeOut() != null)
-        {
-            _idleTimeout = open.getIdleTimeOut().longValue();
-        }
-        _remoteProperties = open.getProperties();
-        if (_remoteProperties != null)
-        {
-            if (_remoteProperties.containsKey(Symbol.valueOf("product")))
-            {
-                setClientProduct(_remoteProperties.get(Symbol.valueOf("product")).toString());
-            }
-            if (_remoteProperties.containsKey(Symbol.valueOf("version")))
-            {
-                setClientVersion(_remoteProperties.get(Symbol.valueOf("version")).toString());
-            }
-            setClientId(_remoteContainerId);
-        }
-        if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
-        {
-            closeConnection(ConnectionError.CONNECTION_FORCED,
-                            "Requested idle timeout of "
-                            + _idleTimeout
-                            + " is too low. The minimum supported timeout is"
-                            + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
-            _closedOnOpen = true;
-        }
-        else
-        {
-            long desiredIdleTimeout = getDesiredIdleTimeout();
-            initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout);
-            final NamedAddressSpace addressSpace = ((AmqpPort) _port).getAddressSpace(_localHostname);
-            if (addressSpace == null)
-            {
-                closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
-            }
-            else
-            {
-                if (!addressSpace.isActive())
-                {
-                    _closedOnOpen = true;
-
-                    final Error err = new Error();
-                    err.setCondition(AmqpError.NOT_FOUND);
-                    populateConnectionRedirect(addressSpace, err);
-
-                    closeConnection(err);
-
-                    _closedOnOpen = true;
-
-                }
-                else
-                {
-                    if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
-                    {
-                        closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
-                    }
-                    else
-                    {
-                        try
-                        {
-                            setAddressSpace(addressSpace);
-                        }
-                        catch (VirtualHostUnavailableException e)
-                        {
-                            closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
-                        }
-                    }
-                }
-            }
-        }
-        switch (_connectionState)
-        {
-            case UNOPENED:
-                sendOpen(_channelMax, _maxFrameSize);
-            case AWAITING_OPEN:
-                _connectionState = ConnectionState.OPEN;
-            default:
-                // TODO bad stuff (connection already open)
-
-        }
-
-    }
-
-    private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err)
-    {
-        final String redirectHost = addressSpace.getRedirectHost(((AmqpPort) _port));
-
-        if(redirectHost == null)
-        {
-            err.setDescription("Virtual host '" + _localHostname + "' is not active");
-        }
-        else
-        {
-            String networkHost;
-            int port;
-            if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
-            {
-                // IPv6 case
-                networkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
-                if(redirectHost.contains("]:"))
-                {
-                    port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
-                }
-                else
-                {
-                    port = -1;
-                }
-            }
-            else
-            {
-                if(redirectHost.contains(":"))
-                {
-                    networkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
-                    try
-                    {
-                        String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
-                        port = Integer.parseInt(portString);
-                    }
-                    catch (NumberFormatException e)
-                    {
-                        port = -1;
-                    }
-                }
-                else
-                {
-                    networkHost = redirectHost;
-                    port = -1;
-                }
-            }
-            final Map<Symbol, Object> infoMap = new HashMap<>();
-            infoMap.put(Symbol.valueOf("network-host"), networkHost);
-            if(port > 0)
-            {
-                infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
-            }
-            err.setInfo(infoMap);
-        }
-    }
-
-    public void receiveDetach(final short channel, final Detach detach)
-    {
-        assertState(FrameReceivingState.ANY_FRAME);
-        final Session_1_0 session = getSession(channel);
-        if (session != null)
-        {
-            AccessController.doPrivileged(new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    session.receiveDetach(detach);
-                    return null;
-                }
-            }, session.getAccessControllerContext());
-        }
-        else
-        {
-            closeConnectionWithInvalidChannel(channel, detach);
-        }
-    }
-
-    private void transportStateChanged()
-    {
-        for (Session_1_0 session : _sessions)
-        {
-            session.transportStateChanged();
-        }
-    }
-
-    public void close(final Error error)
-    {
-        closeConnection(error);
-    }
-
-    private void setRemoteAddress(final SocketAddress remoteAddress)
-    {
-        _remoteAddress = remoteAddress;
-    }
-
-    public void setProperties(final Map<Symbol, Object> properties)
-    {
-        _properties = properties;
-    }
-
-    public void setOfferedCapabilities(final List<Symbol> offeredCapabilities)
-    {
-        _offeredCapabilities = offeredCapabilities;
-    }
-
-
-    private void setClosedForOutput(final boolean closed)
-    {
-        _closedForOutput = closed;
-    }
-
-    public void receiveSaslInit(final SaslInit saslInit)
-    {
-        assertState(FrameReceivingState.SASL_INIT_ONLY);
-        String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
-        final Binary initialResponse = saslInit.getInitialResponse();
-        byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
-
-        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
-        processSaslResponse(response);
-    }
-
-    private void processSaslResponse(final byte[] response)
-    {
-        byte[] challenge = null;
-        SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
-        if (authenticationResult == null)
-        {
-            authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
-            challenge = authenticationResult.getChallenge();
-        }
-
-        if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
-        {
-            _successfulAuthenticationResult = authenticationResult;
-            if (challenge == null || challenge.length == 0)
-            {
-                setSubject(_successfulAuthenticationResult.getSubject());
-                SaslOutcome outcome = new SaslOutcome();
-                outcome.setCode(SaslCode.OK);
-                send(new SASLFrame(outcome), null);
-                _saslComplete = true;
-                _frameReceivingState = FrameReceivingState.AMQP_HEADER;
-                disposeSaslNegotiator();
-            }
-            else
-            {
-                continueSaslNegotiation(challenge);
-            }
-        }
-        else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
-        {
-            continueSaslNegotiation(challenge);
-        }
-        else
-        {
-            handleSaslError();
-        }
-    }
-
-    private void continueSaslNegotiation(final byte[] challenge)
-    {
-        SaslChallenge challengeBody = new SaslChallenge();
-        challengeBody.setChallenge(new Binary(challenge));
-        send(new SASLFrame(challengeBody), null);
-
-        _frameReceivingState = FrameReceivingState.SASL_RESPONSE_ONLY;
-    }
-
-    private void handleSaslError()
-    {
-        SaslOutcome outcome = new SaslOutcome();
-        outcome.setCode(SaslCode.AUTH);
-        send(new SASLFrame(outcome), null);
-        _saslComplete = true;
-        closeSaslWithFailure();
-    }
-
-    public int getMaxFrameSize()
-    {
-        return _maxFrameSize;
-    }
-
-    Object getReference()
-    {
-        return _reference;
-    }
-
-    private void endpointClosed()
-    {
-        try
-        {
-            performDeleteTasks();
-            closeReceived();
-        }
-        finally
-        {
-            NamedAddressSpace virtualHost = getAddressSpace();
-            if (virtualHost != null)
-            {
-                virtualHost.deregisterConnection(this);
-            }
-        }
-    }
-
-    private void closeConnection(ErrorCondition errorCondition, String description)
-    {
-        closeConnection(new Error(errorCondition, description));
-    }
-
-    private void closeConnection(final Error error)
-    {
-        Close close = new Close();
-        close.setError(error);
-        switch (_connectionState)
-        {
-            case UNOPENED:
-                sendOpen(0, 0);
-                sendClose(close);
-                _connectionState = ConnectionState.CLOSED;
-                break;
-            case AWAITING_OPEN:
-            case OPEN:
-                sendClose(close);
-                _connectionState = ConnectionState.CLOSE_SENT;
-                addCloseTicker();
-            case CLOSE_SENT:
-            case CLOSED:
-                // already sent our close - too late to do anything more
-                break;
-            default:
-                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
-        }
-    }
-
-    int sendFrame(final short channel, final FrameBody body, final List<QpidByteBuffer> payload)
-    {
-        if (!_closedForOutput)
-        {
-            ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
-            if (payload == null)
-            {
-                send(AMQFrame.createAMQFrame(channel, body));
-                return 0;
-            }
-            else
-            {
-                int size = writer.getEncodedSize();
-                int maxPayloadSize = _maxFrameSize - (size + 9);
-                long payloadLength = QpidByteBufferUtils.remaining(payload);
-                if(payloadLength <= maxPayloadSize)
-                {
-                    send(AMQFrame.createAMQFrame(channel, body, payload));
-                    return (int)payloadLength;
-                }
-                else
-                {
-                    ((Transfer) body).setMore(Boolean.TRUE);
-
-                    writer = _describedTypeRegistry.getValueWriter(body);
-                    size = writer.getEncodedSize();
-                    maxPayloadSize = _maxFrameSize - (size + 9);
-
-                    List<QpidByteBuffer> payloadDup = new ArrayList<>(payload.size());
-                    int payloadSize = 0;
-                    for(QpidByteBuffer buf : payload)
-                    {
-                        if(payloadSize + buf.remaining() < maxPayloadSize)
-                        {
-                            payloadSize += buf.remaining();
-                            payloadDup.add(buf.duplicate());
-                        }
-                        else
-                        {
-                            QpidByteBuffer dup = buf.slice();
-                            dup.limit(maxPayloadSize-payloadSize);
-                            payloadDup.add(dup);
-                            break;
-                        }
-                    }
-
-                    QpidByteBufferUtils.skip(payload, maxPayloadSize);
-                    send(AMQFrame.createAMQFrame(channel, body, payloadDup));
-                    for(QpidByteBuffer buf : payloadDup)
-                    {
-                        buf.dispose();
-                    }
-
-                    return maxPayloadSize;
-                }
-            }
-        }
-        else
-        {
-            return -1;
-        }
-    }
-
-    void sendFrame(final short channel, final FrameBody body)
-    {
-        sendFrame(channel, body, null);
-    }
-
-    public ByteBufferSender getSender()
-    {
-        return getNetwork().getSender();
-    }
-
-    @Override
-    public void writerIdle()
-    {
-        send(TransportFrame.createAMQFrame((short)0,null));
-    }
-
-    @Override
-    public void readerIdle()
-    {
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
-                getNetwork().close();
-                return null;
-            }
-        }, getAccessControllerContext());
-    }
-
-    @Override
-    public void encryptedTransport()
-    {
-    }
-
-    public String getAddress()
-    {
-        return getNetwork().getRemoteAddress().toString();
-    }
-
-
-
-    public void received(final QpidByteBuffer msg)
-    {
-
-        AccessController.doPrivileged(new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                updateLastReadTime();
-                try
-                {
-                    int remaining;
-
-                    do
-                    {
-                        remaining = msg.remaining();
-
-                        switch (_frameReceivingState)
-                        {
-                            case AMQP_OR_SASL_HEADER:
-                            case AMQP_HEADER:
-                                if (remaining >= 8)
-                                {
-                                    processProtocolHeader(msg);
-                                }
-                                break;
-                            case OPEN_ONLY:
-                            case ANY_FRAME:
-                            case SASL_INIT_ONLY:
-                            case SASL_RESPONSE_ONLY:
-                                _frameHandler.parse(msg);
-                                break;
-                            case CLOSED:
-                                // ignore;
-                                break;
-                        }
-
-
-                    }
-                    while (msg.remaining() != remaining);
-                }
-                catch (IllegalArgumentException | IllegalStateException e)
-                {
-                    throw new ConnectionScopedRuntimeException(e);
-                }
-                catch (StoreException e)
-                {
-                    if (getAddressSpace().isActive())
-                    {
-                        throw new ServerScopedRuntimeException(e);
-                    }
-                    else
-                    {
-                        throw new ConnectionScopedRuntimeException(e);
-                    }
-                }
-                return null;
-            }
-        }, getAccessControllerContext());
-
-    }
-
-    private void processProtocolHeader(final QpidByteBuffer msg)
-    {
-        if(msg.remaining() >= 8)
-        {
-            byte[] header = new byte[8];
-            msg.get(header);
-
-            final AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider();
-            final SubjectCreator subjectCreator = authenticationProvider.getSubjectCreator(getTransport().isSecure());
-
-            if(Arrays.equals(header, SASL_HEADER))
-            {
-                if(_saslComplete)
-                {
-                    throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
-                }
-
-                getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
-
-                SaslMechanisms mechanisms = new SaslMechanisms();
-                ArrayList<Symbol> mechanismsList = new ArrayList<>();
-                for (String name :  subjectCreator.getMechanisms())
-                {
-                    mechanismsList.add(Symbol.valueOf(name));
-                }
-                mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
-                send(new SASLFrame(mechanisms), null);
-
-                _frameReceivingState = FrameReceivingState.SASL_INIT_ONLY;
-                _frameHandler = new FrameHandler(this, true);
-            }
-            else if(Arrays.equals(header, AMQP_HEADER))
-            {
-                if(!_saslComplete)
-                {
-                    final List<String> mechanisms = subjectCreator.getMechanisms();
-
-                    if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
-                    {
-                        setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
-                    }
-                    else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
-                    {
-                        setUserPrincipal(new AuthenticatedPrincipal(((AnonymousAuthenticationManager) authenticationProvider).getAnonymousPrincipal()));
-                    }
-                    else
-                    {
-                        LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", getLogSubject());
-                        _connectionState = ConnectionState.CLOSED;
-                        getNetwork().close();
-                    }
-
-                }
-                getSender().send(QpidByteBuffer.wrap(AMQP_HEADER));
-                _frameReceivingState = FrameReceivingState.OPEN_ONLY;
-                _frameHandler = new FrameHandler(this, false);
-
-            }
-            else
-            {
-                LOGGER.warn("{} : unknown AMQP header {}", getLogSubject(), Functions.str(header));
-                _connectionState = ConnectionState.CLOSED;
-                getNetwork().close();
-            }
-
-        }
-
-    }
-
-
-    public void closed()
-    {
-        try
-        {
-            inputClosed();
-        }
-        catch(RuntimeException e)
-        {
-            LOGGER.error("Exception while closing", e);
-        }
-        finally
-        {
-            try
-            {
-                endpointClosed();
-            }
-            finally
-            {
-                markTransportClosed();
-            }
-        }
-    }
-
-    public void send(final AMQFrame amqFrame)
-    {
-        send(amqFrame, null);
-    }
-
-
-
-    public void send(final AMQFrame amqFrame, ByteBuffer buf)
-    {
-        updateLastWriteTime();
-        FRAME_LOGGER.debug("SEND[{}|{}] : {}",
-                           getNetwork().getRemoteAddress(),
-                           amqFrame.getChannel(),
-                           amqFrame.getFrameBody());
-
-        int size = _frameWriter.send(amqFrame);
-        if (size > getMaxFrameSize())
-        {
-            throw new OversizeFrameException(amqFrame, size);
-        }
-    }
-
-    public void send(short channel, FrameBody body)
-    {
-        AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
-        send(frame);
-
-    }
-
-    private void addCloseTicker()
-    {
-        long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
-
-        getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, getNetwork()));
-
-        // trigger a wakeup to ensure the ticker will be taken into account
-        notifyWork();
-    }
-
-    @Override
-    public boolean isTransportBlockedForWriting()
-    {
-        return _transportBlockedForWriting;
-    }
-    @Override
-    public void setTransportBlockedForWriting(final boolean blocked)
-    {
-        if(_transportBlockedForWriting != blocked)
-        {
-            _transportBlockedForWriting = blocked;
-            transportStateChanged();
-        }
-
-    }
-
-    @Override
-    public Iterator<Runnable> processPendingIterator()
-    {
-        if (isIOThread())
-        {
-            return new ProcessPendingIterator();
-        }
-        else
-        {
-            return Collections.emptyIterator();
-        }
-    }
-
-    @Override
-    public boolean hasWork()
-    {
-        return _stateChanged.get();
-    }
-
-    @Override
-    public void notifyWork()
-    {
-        _stateChanged.set(true);
-
-        final Action<ProtocolEngine> listener = _workListener.get();
-        if(listener != null)
-        {
-            listener.performAction(this);
-        }
-    }
-
-    @Override
-    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
-    {
-        _sessionsWithWork.add(sessionModel);
-        notifyWork();
-    }
-
-    @Override
-    public void clearWork()
-    {
-        _stateChanged.set(false);
-    }
-
-    @Override
-    public void setWorkListener(final Action<ProtocolEngine> listener)
-    {
-        _workListener.set(listener);
-    }
-
-    public boolean hasSessionWithName(final byte[] name)
-    {
-        return false;
-    }
-
-    @Override
-    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
-    {
-
-        stopConnection();
-        final ErrorCondition cause;
-        switch(reason)
-        {
-            case MANAGEMENT:
-                cause = ConnectionError.CONNECTION_FORCED;
-                break;
-            case TRANSACTION_TIMEOUT:
-                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
-                break;
-            default:
-                cause = AmqpError.INTERNAL_ERROR;
-        }
-        Action<ConnectionHandler> action = new Action<ConnectionHandler>()
-        {
-            @Override
-            public void performAction(final ConnectionHandler object)
-            {
-                closeConnection(cause, description);
-
-            }
-        };
-        addAsyncTask(action);
-    }
-
-    public void closeSessionAsync(final AMQSessionModel<?,?> session,
-                                  final CloseReason reason, final String message)
-    {
-        final ErrorCondition cause;
-        switch(reason)
-        {
-            case MANAGEMENT:
-                cause = ConnectionError.CONNECTION_FORCED;
-                break;
-            case TRANSACTION_TIMEOUT:
-                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
-                break;
-            default:
-                cause = AmqpError.INTERNAL_ERROR;
-        }
-        addAsyncTask(new Action<ConnectionHandler>()
-        {
-            @Override
-            public void performAction(final ConnectionHandler object)
-            {
-                AccessController.doPrivileged(new PrivilegedAction<Void>() {
-                    @Override
-                    public Void run()
-                    {
-                        ((Session_1_0)session).close(cause, message);
-                        return null;
-                    }
-                }, ((Session_1_0)session).getAccessControllerContext());
-            }
-        });
-
-    }
-
-    public void block()
-    {
-        synchronized (_blockingLock)
-        {
-            if (!_blocking)
-            {
-                _blocking = true;
-                doOnIOThreadAsync(
-                        new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-                                doBlock();
-                            }
-                        });
-            }
-        }
-    }
-
-    private void doBlock()
-    {
-        for(Session_1_0 session : _sessions)
-        {
-            session.block();
-        }
-    }
-
-    public String getRemoteContainerName()
-    {
-        return _remoteContainerId;
-    }
-
-    public Collection<? extends Session_1_0> getSessionModels()
-    {
-        return Collections.unmodifiableCollection(_sessions);
-    }
-
-    public void unblock()
-    {
-        synchronized (_blockingLock)
-        {
-            if(_blocking)
-            {
-                _blocking = false;
-                doOnIOThreadAsync(
-                        new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-                                doUnblock();
-                            }
-                        });
-            }
-        }
-    }
-
-    private void doUnblock()
-    {
-        for(Session_1_0 session : _sessions)
-        {
-            session.unblock();
-        }
-    }
-
-    @Override
-    public long getSessionCountLimit()
-    {
-        return _channelMax+1;
-    }
-
-    @Override
-    public boolean isOrderlyClose()
-    {
-        return _orderlyClose.get();
-    }
-
-    @Override
-    protected void addAsyncTask(final Action<? super ConnectionHandler> action)
-    {
-        _asyncTaskList.add(action);
-        notifyWork();
-    }
-
-
-    private void sendOpen(final int channelMax, final int maxFrameSize)
-    {
-        Open open = new Open();
-
-        if (_receivingSessions == null)
-        {
-            _receivingSessions = new Session_1_0[channelMax + 1];
-            _sendingSessions = new Session_1_0[channelMax + 1];
-        }
-        if (channelMax < _channelMax)
-        {
-            _channelMax = channelMax;
-        }
-        open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
-        open.setContainerId(getAddressSpace() == null ? UUID.randomUUID().toString() : getAddressSpace().getId().toString());
-        open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
-        // TODO - should we try to set the hostname based on the connection information?
-        // open.setHostname();
-        open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
-
-        // set the offered capabilities
-        if(_offeredCapabilities != null && !_offeredCapabilities.isEmpty())
-        {
-            open.setOfferedCapabilities(_offeredCapabilities.toArray(new Symbol[_offeredCapabilities.size()]));
-        }
-
-        if (_properties != null)
-        {
-            open.setProperties(_properties);
-        }
-
-        sendFrame(CONNECTION_CONTROL_CHANNEL, open);
-    }
-
-    private void closeWithError(final AmqpError amqpError, final String errorDescription)
-    {
-        final Error err = new Error();
-        err.setCondition(amqpError);
-        err.setDescription(errorDescription);
-        closeConnection(err);
-        _closedOnOpen = true;
-    }
-
-    private Session_1_0 getSession(final short channel)
-    {
-        Session_1_0 session = _receivingSessions[channel];
-        if (session == null)
-        {
-            Error error = new Error();
-            error.setCondition(ConnectionError.FRAMING_ERROR);
-            error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
-            handleError(error);
-        }
-
-        return session;
-    }
-
-    private void sendClose(Close closeToSend)
-    {
-        sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
-        closeSender();
-    }
-
-
-    private void assertState(final FrameReceivingState state)
-    {
-        if(_frameReceivingState != state)
-        {
-            throw new ConnectionScopedRuntimeException("Unexpected state, client has sent frame in an illegal order.  Required state: " + state + ", actual state: " + _frameReceivingState);
-        }
-    }
-
-
-    private class ProcessPendingIterator implements Iterator<Runnable>
-    {
-        private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
-        private ProcessPendingIterator()
-        {
-            _sessionIterator = _sessionsWithWork.iterator();
-        }
-
-        @Override
-        public boolean hasNext()
-        {
-            return (!_sessionsWithWork.isEmpty() && !isClosed() && !isConnectionStopped()) || !_asyncTaskList.isEmpty();
-        }
-
-        @Override
-        public Runnable next()
-        {
-            if(!_sessionsWithWork.isEmpty())
-            {
-                if(isClosed() || isConnectionStopped())
-                {
-
-                    final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
-                    if(asyncAction != null)
-                    {
-                        return new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-                                asyncAction.performAction(AMQPConnection_1_0.this);
-                            }
-                        };
-                    }
-                    else
-                    {
-                        return new Runnable()
-                        {
-                            @Override
-                            public void run()
-                            {
-
-                            }
-                        };
-                    }
-                }
-                else
-                {
-                    if (!_sessionIterator.hasNext())
-                    {
-                        _sessionIterator = _sessionsWithWork.iterator();
-                    }
-                    final AMQSessionModel<?,?> session = _sessionIterator.next();
-                    return new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            _sessionIterator.remove();
-                            if (session.processPending())
-                            {
-                                _sessionsWithWork.add(session);
-                            }
-                        }
-                    };
-                }
-            }
-            else if(!_asyncTaskList.isEmpty())
-            {
-                final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
-                return new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        asyncAction.performAction(AMQPConnection_1_0.this);
-                    }
-                };
-            }
-            else
-            {
-                throw new NoSuchElementException();
-            }
-        }
-
-        @Override
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    @Override
-    public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
-    {
-        super.initialiseHeartbeating(writerDelay, readerDelay);
-    }
+    void close(Error error);
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
new file mode 100644
index 0000000..0b8572c
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -0,0 +1,1731 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
+import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.util.Functions;
+
+public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
+        implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
+                   ValueWriter.Registry.Source,
+                   SASLEndpoint,
+                   ConnectionHandler,
+                   AMQPConnection_1_0<AMQPConnection_1_0Impl>
+{
+
+    private static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
+    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.protocol.frame");
+
+    private final AtomicBoolean _stateChanged = new AtomicBoolean();
+    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+
+
+    private static final byte[] SASL_HEADER = new byte[]
+            {
+                    (byte) 'A',
+                    (byte) 'M',
+                    (byte) 'Q',
+                    (byte) 'P',
+                    (byte) 3,
+                    (byte) 1,
+                    (byte) 0,
+                    (byte) 0
+            };
+
+    private static final byte[] AMQP_HEADER = new byte[]
+            {
+                    (byte) 'A',
+                    (byte) 'M',
+                    (byte) 'Q',
+                    (byte) 'P',
+                    (byte) 0,
+                    (byte) 1,
+                    (byte) 0,
+                    (byte) 0
+            };
+
+    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    public static final Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
+
+    private FrameWriter _frameWriter;
+    private ProtocolHandler _frameHandler;
+    private volatile boolean _transportBlockedForWriting;
+    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
+    private boolean _blocking;
+    private final Object _blockingLock = new Object();
+    private List<Symbol> _offeredCapabilities;
+
+    private enum FrameReceivingState
+    {
+        AMQP_OR_SASL_HEADER,
+        SASL_INIT_ONLY,
+        SASL_RESPONSE_ONLY,
+        AMQP_HEADER,
+        OPEN_ONLY,
+        ANY_FRAME,
+        CLOSED
+    }
+
+    private volatile FrameReceivingState _frameReceivingState = FrameReceivingState.AMQP_OR_SASL_HEADER;
+
+    private static final short CONNECTION_CONTROL_CHANNEL = (short) 0;
+    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
+
+    private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
+
+    private AmqpPort<?> _port;
+    private SubjectCreator _subjectCreator;
+
+    private int _channelMax = DEFAULT_CHANNEL_MAX;
+    private int _maxFrameSize = 4096;
+    private String _remoteContainerId;
+
+    private SocketAddress _remoteAddress;
+
+    // positioned by the *outgoing* channel
+    private Session_1_0[] _sendingSessions;
+
+    // positioned by the *incoming* channel
+    private Session_1_0[] _receivingSessions;
+    private boolean _closedForInput;
+    private boolean _closedForOutput;
+
+    private long _idleTimeout;
+
+    private ConnectionState _connectionState = ConnectionState.UNOPENED;
+
+    private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
+                                                                                        .registerTransportLayer()
+                                                                                        .registerMessagingLayer()
+                                                                                        .registerTransactionLayer()
+                                                                                        .registerSecurityLayer();
+
+
+    private Map _properties;
+    private boolean _saslComplete;
+
+    private SaslNegotiator _saslNegotiator;
+    private String _localHostname;
+    private long _desiredIdleTimeout;
+
+    private Error _remoteError;
+
+    private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
+
+    private Map _remoteProperties;
+
+    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
+
+    private final Collection<Session_1_0>
+            _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+
+    private final Object _reference = new Object();
+
+    private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
+            new ConcurrentLinkedQueue<>();
+
+    private boolean _closedOnOpen;
+
+    private final Set<AMQSessionModel<?,?>> _sessionsWithWork =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQSessionModel<?,?>, Boolean>());
+
+    AMQPConnection_1_0Impl(final Broker<?> broker,
+                           final ServerNetworkConnection network,
+                           AmqpPort<?> port,
+                           Transport transport,
+                           long id,
+                           final AggregateTicker aggregateTicker)
+    {
+        super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
+
+        _subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+
+        _port = port;
+
+        Map<Symbol, Object> serverProperties = new LinkedHashMap<>();
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+
+        setProperties(serverProperties);
+
+        List<Symbol> offeredCapabilities = new ArrayList<>();
+        offeredCapabilities.add(ANONYMOUS_RELAY);
+        offeredCapabilities.add(SHARED_SUBSCRIPTIONS);
+
+        setOfferedCapabilities(offeredCapabilities);
+
+        setRemoteAddress(network.getRemoteAddress());
+
+        setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
+
+        _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
+    }
+
+
+    private void setUserPrincipal(final Principal user)
+    {
+        setSubject(_subjectCreator.createSubjectWithGroups(user));
+    }
+
+    private long getDesiredIdleTimeout()
+    {
+        return _desiredIdleTimeout;
+    }
+
+    public void receiveAttach(final short channel, final Attach attach)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveAttach(attach);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
+        }
+    }
+
+    public void receive(final short channel, final Object frame)
+    {
+        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
+        if (frame instanceof FrameBody)
+        {
+            ((FrameBody) frame).invoke(channel, this);
+        }
+        else if (frame instanceof SaslFrameBody)
+        {
+            ((SaslFrameBody) frame).invoke(channel, this);
+        }
+    }
+
+    private void closeSaslWithFailure()
+    {
+        _saslComplete = true;
+        disposeSaslNegotiator();
+        _frameReceivingState = FrameReceivingState.CLOSED;
+        setClosedForInput(true);
+        addCloseTicker();
+    }
+
+    private void disposeSaslNegotiator()
+    {
+        _saslNegotiator.dispose();
+        _saslNegotiator = null;
+    }
+
+    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+    {
+        LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
+        closeSaslWithFailure();
+    }
+
+    @Override
+    public void receiveClose(final short channel, final Close close)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        _frameReceivingState = FrameReceivingState.CLOSED;
+        setClosedForInput(true);
+        closeReceived();
+        switch (_connectionState)
+        {
+            case UNOPENED:
+            case AWAITING_OPEN:
+                closeConnection(ConnectionError.CONNECTION_FORCED,
+                                "Connection close sent before connection was opened");
+                break;
+            case OPEN:
+                _connectionState = ConnectionState.CLOSE_RECEIVED;
+                if(close.getError() != null)
+                {
+                    final Error error = close.getError();
+                    ErrorCondition condition = error.getCondition();
+                    Symbol errorCondition = condition == null ? null : condition.getValue();
+                    LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
+                                errorCondition, close.getError().getDescription());
+                }
+                sendClose(new Close());
+                _connectionState = ConnectionState.CLOSED;
+                _orderlyClose.set(true);
+                addCloseTicker();
+                break;
+            case CLOSE_SENT:
+                _connectionState = ConnectionState.CLOSED;
+                _orderlyClose.set(true);
+
+            default:
+        }
+        _remoteError = close.getError();
+    }
+
+    private void closeReceived()
+    {
+        Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
+
+        for (final Session_1_0 session : sessions)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.remoteEnd(new End());
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+    }
+
+    private void setClosedForInput(final boolean closed)
+    {
+        _closedForInput = closed;
+    }
+
+    public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+    {
+        LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
+        closeSaslWithFailure();
+    }
+
+    public void receiveSaslResponse(final SaslResponse saslResponse)
+    {
+        final Binary responseBinary = saslResponse.getResponse();
+        byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
+
+        assertState(FrameReceivingState.SASL_RESPONSE_ONLY);
+
+        processSaslResponse(response);
+    }
+
+    @Override
+    public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
+    {
+        return _describedTypeRegistry;
+    }
+
+    @Override
+    public SectionDecoderRegistry getSectionDecoderRegistry()
+    {
+        return _describedTypeRegistry.getSectionDecoderRegistry();
+    }
+
+    private boolean closedForOutput()
+    {
+        return _closedForOutput;
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+        return _connectionState == ConnectionState.CLOSED
+               || _connectionState == ConnectionState.CLOSE_RECEIVED;
+    }
+
+    public boolean closedForInput()
+    {
+        return _closedForInput;
+    }
+
+    @Override
+    public void sessionEnded(final Session_1_0 session)
+    {
+        if (!_closedOnOpen)
+        {
+            _sessions.remove(session);
+        }
+    }
+
+    private void inputClosed()
+    {
+        if (!_closedForInput)
+        {
+            _closedForInput = true;
+            FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
+            switch (_connectionState)
+            {
+                case UNOPENED:
+                case AWAITING_OPEN:
+                case CLOSE_SENT:
+                    _connectionState = ConnectionState.CLOSED;
+                    closeSender();
+                    break;
+                case OPEN:
+                    _connectionState = ConnectionState.CLOSE_RECEIVED;
+                case CLOSED:
+                    // already sent our close - too late to do anything more
+                    break;
+                default:
+            }
+            closeReceived();
+        }
+    }
+
+    private void closeSender()
+    {
+        setClosedForOutput(true);
+    }
+
+    public String getRemoteContainerId()
+    {
+        return _remoteContainerId;
+    }
+
+    private void setDesiredIdleTimeout(final long desiredIdleTimeout)
+    {
+        _desiredIdleTimeout = desiredIdleTimeout;
+    }
+
+    public boolean isOpen()
+    {
+        return _connectionState == ConnectionState.OPEN;
+    }
+
+    @Override
+    public void sendEnd(final short channel, final End end, final boolean remove)
+    {
+        sendFrame(channel, end);
+        if (remove)
+        {
+            _sendingSessions[channel] = null;
+        }
+    }
+
+    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+    {
+        LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
+        closeSaslWithFailure();
+    }
+
+    public void receiveEnd(final short channel, final End end)
+    {
+
+        assertState(FrameReceivingState.ANY_FRAME);
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    _receivingSessions[channel] = null;
+
+                    session.receiveEnd(end);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            closeConnectionWithInvalidChannel(channel, end);
+        }
+    }
+
+    private void closeConnectionWithInvalidChannel(final short channel, final FrameBody frame)
+    {
+        closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frame.getClass().getSimpleName().toLowerCase(), channel));
+    }
+
+    public void receiveDisposition(final short channel,
+                                   final Disposition disposition)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveDisposition(disposition);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            closeConnectionWithInvalidChannel(channel, disposition);
+        }
+
+    }
+
+    public void receiveBegin(final short channel, final Begin begin)
+    {
+
+        assertState(FrameReceivingState.ANY_FRAME);
+        short myChannelId;
+        if (begin.getRemoteChannel() != null)
+        {
+            closeConnection(ConnectionError.FRAMING_ERROR,
+                            "BEGIN received on channel "
+                            + channel
+                            + " with given remote-channel "
+                            + begin.getRemoteChannel()
+                            + ". Since the broker does not spontaneously start channels, this must be an error.");
+
+        }
+        else // Peer requesting session creation
+        {
+
+            if (_receivingSessions[channel] == null)
+            {
+                myChannelId = getFirstFreeChannel();
+                if (myChannelId == -1)
+                {
+
+                    closeConnection(ConnectionError.FRAMING_ERROR,
+                                    "BEGIN received on channel "
+                                    + channel
+                                    + ". There are no free channels for the broker to respond on.");
+
+                }
+                Session_1_0 session = new Session_1_0(this, begin, myChannelId);
+                session.create();
+
+                _receivingSessions[channel] = session;
+                _sendingSessions[myChannelId] = session;
+
+                Begin beginToSend = new Begin();
+
+                session.setReceivingChannel(channel);
+                session.setSendingChannel(myChannelId);
+                beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
+                beginToSend.setNextOutgoingId(session.getNextOutgoingId());
+                beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
+                beginToSend.setIncomingWindow(session.getIncomingWindowSize());
+                sendFrame(myChannelId, beginToSend);
+
+                _sessions.add(session);
+            }
+            else
+            {
+                closeConnection(ConnectionError.FRAMING_ERROR,
+                                "BEGIN received on channel " + channel + " which is already in use.");
+            }
+
+        }
+
+    }
+
+    private short getFirstFreeChannel()
+    {
+        for (int i = 0; i <= _channelMax; i++)
+        {
+            if (_sendingSessions[i] == null)
+            {
+                return (short) i;
+            }
+        }
+        return -1;
+    }
+
+    public void handleError(final Error error)
+    {
+        if (!closedForOutput())
+        {
+            Close close = new Close();
+            close.setError(error);
+            sendFrame((short) 0, close);
+
+            setClosedForOutput(true);
+        }
+
+    }
+
+    public void receiveTransfer(final short channel, final Transfer transfer)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveTransfer(transfer);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            closeConnectionWithInvalidChannel(channel, transfer);
+        }
+    }
+
+    public void receiveFlow(final short channel, final Flow flow)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveFlow(flow);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            closeConnectionWithInvalidChannel(channel, flow);
+        }
+
+    }
+
+    public void receiveOpen(final short channel, final Open open)
+    {
+        assertState(FrameReceivingState.OPEN_ONLY);
+        _frameReceivingState = FrameReceivingState.ANY_FRAME;
+        _channelMax = open.getChannelMax() == null ? _channelMax
+                : open.getChannelMax().intValue() < _channelMax
+                        ? open.getChannelMax().intValue()
+                        : _channelMax;
+        if (_receivingSessions == null)
+        {
+            _receivingSessions = new Session_1_0[_channelMax + 1];
+            _sendingSessions = new Session_1_0[_channelMax + 1];
+        }
+        _maxFrameSize = open.getMaxFrameSize() == null
+                ? getBroker().getNetworkBufferSize()
+                : Math.min(open.getMaxFrameSize().intValue(), getBroker().getNetworkBufferSize());
+        _remoteContainerId = open.getContainerId();
+        _localHostname = open.getHostname();
+        if (open.getIdleTimeOut() != null)
+        {
+            _idleTimeout = open.getIdleTimeOut().longValue();
+        }
+        _remoteProperties = open.getProperties();
+        if (_remoteProperties != null)
+        {
+            if (_remoteProperties.containsKey(Symbol.valueOf("product")))
+            {
+                setClientProduct(_remoteProperties.get(Symbol.valueOf("product")).toString());
+            }
+            if (_remoteProperties.containsKey(Symbol.valueOf("version")))
+            {
+                setClientVersion(_remoteProperties.get(Symbol.valueOf("version")).toString());
+            }
+            setClientId(_remoteContainerId);
+        }
+        if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
+        {
+            closeConnection(ConnectionError.CONNECTION_FORCED,
+                            "Requested idle timeout of "
+                            + _idleTimeout
+                            + " is too low. The minimum supported timeout is"
+                            + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
+            _closedOnOpen = true;
+        }
+        else
+        {
+            long desiredIdleTimeout = getDesiredIdleTimeout();
+            initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout);
+            final NamedAddressSpace addressSpace = ((AmqpPort) _port).getAddressSpace(_localHostname);
+            if (addressSpace == null)
+            {
+                closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
+            }
+            else
+            {
+                if (!addressSpace.isActive())
+                {
+                    _closedOnOpen = true;
+
+                    final Error err = new Error();
+                    err.setCondition(AmqpError.NOT_FOUND);
+                    populateConnectionRedirect(addressSpace, err);
+
+                    closeConnection(err);
+
+                    _closedOnOpen = true;
+
+                }
+                else
+                {
+                    if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
+                    {
+                        closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
+                    }
+                    else
+                    {
+                        try
+                        {
+                            setAddressSpace(addressSpace);
+                        }
+                        catch (VirtualHostUnavailableException e)
+                        {
+                            closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
+                        }
+                    }
+                }
+            }
+        }
+        switch (_connectionState)
+        {
+            case UNOPENED:
+                sendOpen(_channelMax, _maxFrameSize);
+            case AWAITING_OPEN:
+                _connectionState = ConnectionState.OPEN;
+            default:
+                // TODO bad stuff (connection already open)
+
+        }
+
+    }
+
+    private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err)
+    {
+        final String redirectHost = addressSpace.getRedirectHost(((AmqpPort) _port));
+
+        if(redirectHost == null)
+        {
+            err.setDescription("Virtual host '" + _localHostname + "' is not active");
+        }
+        else
+        {
+            String networkHost;
+            int port;
+            if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
+            {
+                // IPv6 case
+                networkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
+                if(redirectHost.contains("]:"))
+                {
+                    port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
+                }
+                else
+                {
+                    port = -1;
+                }
+            }
+            else
+            {
+                if(redirectHost.contains(":"))
+                {
+                    networkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
+                    try
+                    {
+                        String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
+                        port = Integer.parseInt(portString);
+                    }
+                    catch (NumberFormatException e)
+                    {
+                        port = -1;
+                    }
+                }
+                else
+                {
+                    networkHost = redirectHost;
+                    port = -1;
+                }
+            }
+            final Map<Symbol, Object> infoMap = new HashMap<>();
+            infoMap.put(Symbol.valueOf("network-host"), networkHost);
+            if(port > 0)
+            {
+                infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
+            }
+            err.setInfo(infoMap);
+        }
+    }
+
+    public void receiveDetach(final short channel, final Detach detach)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final Session_1_0 session = getSession(channel);
+        if (session != null)
+        {
+            AccessController.doPrivileged(new PrivilegedAction<Object>()
+            {
+                @Override
+                public Object run()
+                {
+                    session.receiveDetach(detach);
+                    return null;
+                }
+            }, session.getAccessControllerContext());
+        }
+        else
+        {
+            closeConnectionWithInvalidChannel(channel, detach);
+        }
+    }
+
+    private void transportStateChanged()
+    {
+        for (Session_1_0 session : _sessions)
+        {
+            session.transportStateChanged();
+        }
+    }
+
+    @Override
+    public void close(final Error error)
+    {
+        closeConnection(error);
+    }
+
+    private void setRemoteAddress(final SocketAddress remoteAddress)
+    {
+        _remoteAddress = remoteAddress;
+    }
+
+    public void setProperties(final Map<Symbol, Object> properties)
+    {
+        _properties = properties;
+    }
+
+    public void setOfferedCapabilities(final List<Symbol> offeredCapabilities)
+    {
+        _offeredCapabilities = offeredCapabilities;
+    }
+
+
+    private void setClosedForOutput(final boolean closed)
+    {
+        _closedForOutput = closed;
+    }
+
+    public void receiveSaslInit(final SaslInit saslInit)
+    {
+        assertState(FrameReceivingState.SASL_INIT_ONLY);
+        String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
+        final Binary initialResponse = saslInit.getInitialResponse();
+        byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
+
+        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
+        processSaslResponse(response);
+    }
+
+    private void processSaslResponse(final byte[] response)
+    {
+        byte[] challenge = null;
+        SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
+        if (authenticationResult == null)
+        {
+            authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
+            challenge = authenticationResult.getChallenge();
+        }
+
+        if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
+        {
+            _successfulAuthenticationResult = authenticationResult;
+            if (challenge == null || challenge.length == 0)
+            {
+                setSubject(_successfulAuthenticationResult.getSubject());
+                SaslOutcome outcome = new SaslOutcome();
+                outcome.setCode(SaslCode.OK);
+                send(new SASLFrame(outcome), null);
+                _saslComplete = true;
+                _frameReceivingState = FrameReceivingState.AMQP_HEADER;
+                disposeSaslNegotiator();
+            }
+            else
+            {
+                continueSaslNegotiation(challenge);
+            }
+        }
+        else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
+        {
+            continueSaslNegotiation(challenge);
+        }
+        else
+        {
+            handleSaslError();
+        }
+    }
+
+    private void continueSaslNegotiation(final byte[] challenge)
+    {
+        SaslChallenge challengeBody = new SaslChallenge();
+        challengeBody.setChallenge(new Binary(challenge));
+        send(new SASLFrame(challengeBody), null);
+
+        _frameReceivingState = FrameReceivingState.SASL_RESPONSE_ONLY;
+    }
+
+    private void handleSaslError()
+    {
+        SaslOutcome outcome = new SaslOutcome();
+        outcome.setCode(SaslCode.AUTH);
+        send(new SASLFrame(outcome), null);
+        _saslComplete = true;
+        closeSaslWithFailure();
+    }
+
+    public int getMaxFrameSize()
+    {
+        return _maxFrameSize;
+    }
+
+    @Override
+    public Object getReference()
+    {
+        return _reference;
+    }
+
+    private void endpointClosed()
+    {
+        try
+        {
+            performDeleteTasks();
+            closeReceived();
+        }
+        finally
+        {
+            NamedAddressSpace virtualHost = getAddressSpace();
+            if (virtualHost != null)
+            {
+                virtualHost.deregisterConnection(this);
+            }
+        }
+    }
+
+    private void closeConnection(ErrorCondition errorCondition, String description)
+    {
+        closeConnection(new Error(errorCondition, description));
+    }
+
+    private void closeConnection(final Error error)
+    {
+        Close close = new Close();
+        close.setError(error);
+        switch (_connectionState)
+        {
+            case UNOPENED:
+                sendOpen(0, 0);
+                sendClose(close);
+                _connectionState = ConnectionState.CLOSED;
+                break;
+            case AWAITING_OPEN:
+            case OPEN:
+                sendClose(close);
+                _connectionState = ConnectionState.CLOSE_SENT;
+                addCloseTicker();
+            case CLOSE_SENT:
+            case CLOSED:
+                // already sent our close - too late to do anything more
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
+        }
+    }
+
+    @Override
+    public int sendFrame(final short channel, final FrameBody body, final List<QpidByteBuffer> payload)
+    {
+        if (!_closedForOutput)
+        {
+            ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
+            if (payload == null)
+            {
+                send(AMQFrame.createAMQFrame(channel, body));
+                return 0;
+            }
+            else
+            {
+                int size = writer.getEncodedSize();
+                int maxPayloadSize = _maxFrameSize - (size + 9);
+                long payloadLength = QpidByteBufferUtils.remaining(payload);
+                if(payloadLength <= maxPayloadSize)
+                {
+                    send(AMQFrame.createAMQFrame(channel, body, payload));
+                    return (int)payloadLength;
+                }
+                else
+                {
+                    ((Transfer) body).setMore(Boolean.TRUE);
+
+                    writer = _describedTypeRegistry.getValueWriter(body);
+                    size = writer.getEncodedSize();
+                    maxPayloadSize = _maxFrameSize - (size + 9);
+
+                    List<QpidByteBuffer> payloadDup = new ArrayList<>(payload.size());
+                    int payloadSize = 0;
+                    for(QpidByteBuffer buf : payload)
+                    {
+                        if(payloadSize + buf.remaining() < maxPayloadSize)
+                        {
+                            payloadSize += buf.remaining();
+                            payloadDup.add(buf.duplicate());
+                        }
+                        else
+                        {
+                            QpidByteBuffer dup = buf.slice();
+                            dup.limit(maxPayloadSize-payloadSize);
+                            payloadDup.add(dup);
+                            break;
+                        }
+                    }
+
+                    QpidByteBufferUtils.skip(payload, maxPayloadSize);
+                    send(AMQFrame.createAMQFrame(channel, body, payloadDup));
+                    for(QpidByteBuffer buf : payloadDup)
+                    {
+                        buf.dispose();
+                    }
+
+                    return maxPayloadSize;
+                }
+            }
+        }
+        else
+        {
+            return -1;
+        }
+    }
+
+    @Override
+    public void sendFrame(final short channel, final FrameBody body)
+    {
+        sendFrame(channel, body, null);
+    }
+
+    public ByteBufferSender getSender()
+    {
+        return getNetwork().getSender();
+    }
+
+    @Override
+    public void writerIdle()
+    {
+        send(TransportFrame.createAMQFrame((short)0,null));
+    }
+
+    @Override
+    public void readerIdle()
+    {
+        AccessController.doPrivileged(new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
+                getNetwork().close();
+                return null;
+            }
+        }, getAccessControllerContext());
+    }
+
+    @Override
+    public void encryptedTransport()
+    {
+    }
+
+    public String getAddress()
+    {
+        return getNetwork().getRemoteAddress().toString();
+    }
+
+
+
+    public void received(final QpidByteBuffer msg)
+    {
+
+        AccessController.doPrivileged(new PrivilegedAction<Object>()
+        {
+            @Override
+            public Object run()
+            {
+                updateLastReadTime();
+                try
+                {
+                    int remaining;
+
+                    do
+                    {
+                        remaining = msg.remaining();
+
+                        switch (_frameReceivingState)
+                        {
+                            case AMQP_OR_SASL_HEADER:
+                            case AMQP_HEADER:
+                                if (remaining >= 8)
+                                {
+                                    processProtocolHeader(msg);
+                                }
+                                break;
+                            case OPEN_ONLY:
+                            case ANY_FRAME:
+                            case SASL_INIT_ONLY:
+                            case SASL_RESPONSE_ONLY:
+                                _frameHandler.parse(msg);
+                                break;
+                            case CLOSED:
+                                // ignore;
+                                break;
+                        }
+
+
+                    }
+                    while (msg.remaining() != remaining);
+                }
+                catch (IllegalArgumentException | IllegalStateException e)
+                {
+                    throw new ConnectionScopedRuntimeException(e);
+                }
+                catch (StoreException e)
+                {
+                    if (getAddressSpace().isActive())
+                    {
+                        throw new ServerScopedRuntimeException(e);
+                    }
+                    else
+                    {
+                        throw new ConnectionScopedRuntimeException(e);
+                    }
+                }
+                return null;
+            }
+        }, getAccessControllerContext());
+
+    }
+
+    private void processProtocolHeader(final QpidByteBuffer msg)
+    {
+        if(msg.remaining() >= 8)
+        {
+            byte[] header = new byte[8];
+            msg.get(header);
+
+            final AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider();
+            final SubjectCreator subjectCreator = authenticationProvider.getSubjectCreator(getTransport().isSecure());
+
+            if(Arrays.equals(header, SASL_HEADER))
+            {
+                if(_saslComplete)
+                {
+                    throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
+                }
+
+                getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
+
+                SaslMechanisms mechanisms = new SaslMechanisms();
+                ArrayList<Symbol> mechanismsList = new ArrayList<>();
+                for (String name :  subjectCreator.getMechanisms())
+                {
+                    mechanismsList.add(Symbol.valueOf(name));
+                }
+                mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
+                send(new SASLFrame(mechanisms), null);
+
+                _frameReceivingState = FrameReceivingState.SASL_INIT_ONLY;
+                _frameHandler = new FrameHandler(this, true);
+            }
+            else if(Arrays.equals(header, AMQP_HEADER))
+            {
+                if(!_saslComplete)
+                {
+                    final List<String> mechanisms = subjectCreator.getMechanisms();
+
+                    if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
+                    {
+                        setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
+                    }
+                    else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+                    {
+                        setUserPrincipal(new AuthenticatedPrincipal(((AnonymousAuthenticationManager) authenticationProvider).getAnonymousPrincipal()));
+                    }
+                    else
+                    {
+                        LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", getLogSubject());
+                        _connectionState = ConnectionState.CLOSED;
+                        getNetwork().close();
+                    }
+
+                }
+                getSender().send(QpidByteBuffer.wrap(AMQP_HEADER));
+                _frameReceivingState = FrameReceivingState.OPEN_ONLY;
+                _frameHandler = new FrameHandler(this, false);
+
+            }
+            else
+            {
+                LOGGER.warn("{} : unknown AMQP header {}", getLogSubject(), Functions.str(header));
+                _connectionState = ConnectionState.CLOSED;
+                getNetwork().close();
+            }
+
+        }
+
+    }
+
+
+    public void closed()
+    {
+        try
+        {
+            inputClosed();
+        }
+        catch(RuntimeException e)
+        {
+            LOGGER.error("Exception while closing", e);
+        }
+        finally
+        {
+            try
+            {
+                endpointClosed();
+            }
+            finally
+            {
+                markTransportClosed();
+            }
+        }
+    }
+
+    public void send(final AMQFrame amqFrame)
+    {
+        send(amqFrame, null);
+    }
+
+
+
+    public void send(final AMQFrame amqFrame, ByteBuffer buf)
+    {
+        updateLastWriteTime();
+        FRAME_LOGGER.debug("SEND[{}|{}] : {}",
+                           getNetwork().getRemoteAddress(),
+                           amqFrame.getChannel(),
+                           amqFrame.getFrameBody());
+
+        int size = _frameWriter.send(amqFrame);
+        if (size > getMaxFrameSize())
+        {
+            throw new OversizeFrameException(amqFrame, size);
+        }
+    }
+
+    public void send(short channel, FrameBody body)
+    {
+        AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+        send(frame);
+
+    }
+
+    private void addCloseTicker()
+    {
+        long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
+
+        getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, getNetwork()));
+
+        // trigger a wakeup to ensure the ticker will be taken into account
+        notifyWork();
+    }
+
+    @Override
+    public boolean isTransportBlockedForWriting()
+    {
+        return _transportBlockedForWriting;
+    }
+    @Override
+    public void setTransportBlockedForWriting(final boolean blocked)
+    {
+        if(_transportBlockedForWriting != blocked)
+        {
+            _transportBlockedForWriting = blocked;
+            transportStateChanged();
+        }
+
+    }
+
+    @Override
+    public Iterator<Runnable> processPendingIterator()
+    {
+        if (isIOThread())
+        {
+            return new ProcessPendingIterator();
+        }
+        else
+        {
+            return Collections.emptyIterator();
+        }
+    }
+
+    @Override
+    public boolean hasWork()
+    {
+        return _stateChanged.get();
+    }
+
+    @Override
+    public void notifyWork()
+    {
+        _stateChanged.set(true);
+
+        final Action<ProtocolEngine> listener = _workListener.get();
+        if(listener != null)
+        {
+            listener.performAction(this);
+        }
+    }
+
+    @Override
+    public void notifyWork(final AMQSessionModel<?,?> sessionModel)
+    {
+        _sessionsWithWork.add(sessionModel);
+        notifyWork();
+    }
+
+    @Override
+    public void clearWork()
+    {
+        _stateChanged.set(false);
+    }
+
+    @Override
+    public void setWorkListener(final Action<ProtocolEngine> listener)
+    {
+        _workListener.set(listener);
+    }
+
+    public boolean hasSessionWithName(final byte[] name)
+    {
+        return false;
+    }
+
+    @Override
+    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
+    {
+
+        stopConnection();
+        final ErrorCondition cause;
+        switch(reason)
+        {
+            case MANAGEMENT:
+                cause = ConnectionError.CONNECTION_FORCED;
+                break;
+            case TRANSACTION_TIMEOUT:
+                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+                break;
+            default:
+                cause = AmqpError.INTERNAL_ERROR;
+        }
+        Action<ConnectionHandler> action = new Action<ConnectionHandler>()
+        {
+            @Override
+            public void performAction(final ConnectionHandler object)
+            {
+                closeConnection(cause, description);
+
+            }
+        };
+        addAsyncTask(action);
+    }
+
+    public void closeSessionAsync(final AMQSessionModel<?,?> session,
+                                  final CloseReason reason, final String message)
+    {
+        final ErrorCondition cause;
+        switch(reason)
+        {
+            case MANAGEMENT:
+                cause = ConnectionError.CONNECTION_FORCED;
+                break;
+            case TRANSACTION_TIMEOUT:
+                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+                break;
+            default:
+                cause = AmqpError.INTERNAL_ERROR;
+        }
+        addAsyncTask(new Action<ConnectionHandler>()
+        {
+            @Override
+            public void performAction(final ConnectionHandler object)
+            {
+                AccessController.doPrivileged(new PrivilegedAction<Void>() {
+                    @Override
+                    public Void run()
+                    {
+                        ((Session_1_0)session).close(cause, message);
+                        return null;
+                    }
+                }, ((Session_1_0)session).getAccessControllerContext());
+            }
+        });
+
+    }
+
+    public void block()
+    {
+        synchronized (_blockingLock)
+        {
+            if (!_blocking)
+            {
+                _blocking = true;
+                doOnIOThreadAsync(
+                        new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                doBlock();
+                            }
+                        });
+            }
+        }
+    }
+
+    private void doBlock()
+    {
+        for(Session_1_0 session : _sessions)
+        {
+            session.block();
+        }
+    }
+
+    public String getRemoteContainerName()
+    {
+        return _remoteContainerId;
+    }
+
+    public Collection<? extends Session_1_0> getSessionModels()
+    {
+        return Collections.unmodifiableCollection(_sessions);
+    }
+
+    public void unblock()
+    {
+        synchronized (_blockingLock)
+        {
+            if(_blocking)
+            {
+                _blocking = false;
+                doOnIOThreadAsync(
+                        new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                doUnblock();
+                            }
+                        });
+            }
+        }
+    }
+
+    private void doUnblock()
+    {
+        for(Session_1_0 session : _sessions)
+        {
+            session.unblock();
+        }
+    }
+
+    @Override
+    public long getSessionCountLimit()
+    {
+        return _channelMax+1;
+    }
+
+    @Override
+    public boolean isOrderlyClose()
+    {
+        return _orderlyClose.get();
+    }
+
+    @Override
+    protected void addAsyncTask(final Action<? super ConnectionHandler> action)
+    {
+        _asyncTaskList.add(action);
+        notifyWork();
+    }
+
+
+    private void sendOpen(final int channelMax, final int maxFrameSize)
+    {
+        Open open = new Open();
+
+        if (_receivingSessions == null)
+        {
+            _receivingSessions = new Session_1_0[channelMax + 1];
+            _sendingSessions = new Session_1_0[channelMax + 1];
+        }
+        if (channelMax < _channelMax)
+        {
+            _channelMax = channelMax;
+        }
+        open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
+        open.setContainerId(getAddressSpace() == null ? UUID.randomUUID().toString() : getAddressSpace().getId().toString());
+        open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+        // TODO - should we try to set the hostname based on the connection information?
+        // open.setHostname();
+        open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
+
+        // set the offered capabilities
+        if(_offeredCapabilities != null && !_offeredCapabilities.isEmpty())
+        {
+            open.setOfferedCapabilities(_offeredCapabilities.toArray(new Symbol[_offeredCapabilities.size()]));
+        }
+
+        if (_properties != null)
+        {
+            open.setProperties(_properties);
+        }
+
+        sendFrame(CONNECTION_CONTROL_CHANNEL, open);
+    }
+
+    private void closeWithError(final AmqpError amqpError, final String errorDescription)
+    {
+        final Error err = new Error();
+        err.setCondition(amqpError);
+        err.setDescription(errorDescription);
+        closeConnection(err);
+        _closedOnOpen = true;
+    }
+
+    private Session_1_0 getSession(final short channel)
+    {
+        Session_1_0 session = _receivingSessions[channel];
+        if (session == null)
+        {
+            Error error = new Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
+            handleError(error);
+        }
+
+        return session;
+    }
+
+    private void sendClose(Close closeToSend)
+    {
+        sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
+        closeSender();
+    }
+
+
+    private void assertState(final FrameReceivingState state)
+    {
+        if(_frameReceivingState != state)
+        {
+            throw new ConnectionScopedRuntimeException("Unexpected state, client has sent frame in an illegal order.  Required state: " + state + ", actual state: " + _frameReceivingState);
+        }
+    }
+
+
+    private class ProcessPendingIterator implements Iterator<Runnable>
+    {
+        private Iterator<? extends AMQSessionModel<?,?>> _sessionIterator;
+        private ProcessPendingIterator()
+        {
+            _sessionIterator = _sessionsWithWork.iterator();
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return (!_sessionsWithWork.isEmpty() && !isClosed() && !isConnectionStopped()) || !_asyncTaskList.isEmpty();
+        }
+
+        @Override
+        public Runnable next()
+        {
+            if(!_sessionsWithWork.isEmpty())
+            {
+                if(isClosed() || isConnectionStopped())
+                {
+
+                    final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
+                    if(asyncAction != null)
+                    {
+                        return new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                asyncAction.performAction(AMQPConnection_1_0Impl.this);
+                            }
+                        };
+                    }
+                    else
+                    {
+                        return new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+
+                            }
+                        };
+                    }
+                }
+                else
+                {
+                    if (!_sessionIterator.hasNext())
+                    {
+                        _sessionIterator = _sessionsWithWork.iterator();
+                    }
+                    final AMQSessionModel<?,?> session = _sessionIterator.next();
+                    return new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            _sessionIterator.remove();
+                            if (session.processPending())
+                            {
+                                _sessionsWithWork.add(session);
+                            }
+                        }
+                    };
+                }
+            }
+            else if(!_asyncTaskList.isEmpty())
+            {
+                final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        asyncAction.performAction(AMQPConnection_1_0Impl.this);
+                    }
+                };
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
+    public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
+    {
+        super.initialiseHeartbeating(writerDelay, readerDelay);
+    }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
index ce791e5..24ecac5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
@@ -89,8 +89,8 @@
         if(supportedMechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME)
                 || (supportedMechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && network.getPeerPrincipal() != null))
         {
-            final AMQPConnection_1_0 amqpConnection_1_0 =
-                    new AMQPConnection_1_0(broker, network, port, transport, id, aggregateTicker);
+            final AMQPConnection_1_0Impl amqpConnection_1_0 =
+                    new AMQPConnection_1_0Impl(broker, network, port, transport, id, aggregateTicker);
             amqpConnection_1_0.create();
             return amqpConnection_1_0;
         }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index 297a54e..221f073 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -65,8 +65,8 @@
                                             Transport transport,
                                             long id, final AggregateTicker aggregateTicker)
     {
-        final AMQPConnection_1_0 amqpConnection_1_0 =
-                new AMQPConnection_1_0(broker, network, port, transport, id, aggregateTicker);
+        final AMQPConnection_1_0Impl amqpConnection_1_0 =
+                new AMQPConnection_1_0Impl(broker, network, port, transport, id, aggregateTicker);
         amqpConnection_1_0.create();
         return amqpConnection_1_0;
     }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index f9f5616..3af28fc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -186,10 +186,10 @@
     private volatile long _rolledBackTransactions;
     private volatile int _unacknowledgedMessages;
 
-    public Session_1_0(final AMQPConnection_1_0 connection, Begin begin, short channelId)
+    public Session_1_0(final AMQPConnection_1_0 connection, Begin begin, short sendingChannelId)
     {
-        super(connection, channelId);
-        _sendingChannel = channelId;
+        super(connection, sendingChannelId);
+        _sendingChannel = sendingChannelId;
         _sessionState = SessionState.BEGIN_RECVD;
         _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
@@ -758,7 +758,7 @@
                 link = createSendingLink(endpoint, attach);
                 if (link != null)
                 {
-                    capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS);
+                    capabilities.add(AMQPConnection_1_0Impl.SHARED_SUBSCRIPTIONS);
                 }
             }
             else if (endpoint.getTarget() instanceof Coordinator)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index 7406dcd..5217c24 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -26,7 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0Impl;
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
@@ -53,7 +53,7 @@
         _isSasl = isSasl;
     }
 
-    public FrameHandler(final AMQPConnection_1_0 connection, final boolean sasl)
+    public FrameHandler(final AMQPConnection_1_0Impl connection, final boolean sasl)
     {
         this(new ValueHandler(connection.getDescribedTypeRegistry()), connection, sasl);
 
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index f810687..e111cdd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -79,7 +79,7 @@
 
 public class ProtocolEngine_1_0_0Test extends QpidTestCase
 {
-    private AMQPConnection_1_0 _protocolEngine_1_0_0;
+    private AMQPConnection_1_0Impl _protocolEngine_1_0_0;
     private ServerNetworkConnection _networkConnection;
     private Broker<?> _broker;
     private AmqpPort _port;
@@ -293,7 +293,7 @@
     private void createEngine(Transport transport)
     {
         _protocolEngine_1_0_0 =
-                new AMQPConnection_1_0(_broker, _networkConnection, _port, transport, 1, new AggregateTicker());
+                new AMQPConnection_1_0Impl(_broker, _networkConnection, _port, transport, 1, new AggregateTicker());
     }
 
     private void allowMechanisms(String... mechanisms)
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 89ca00f..23905c8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -41,13 +41,17 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -559,6 +563,11 @@
         when(connection.getAddressSpace()).thenReturn(_virtualHost);
         when(connection.getEventLogger()).thenReturn(mock(EventLogger.class));
         when(connection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD)).thenReturn(1L);
+        when(connection.getChildExecutor()).thenReturn(mock(TaskExecutor.class));
+        when(connection.getModel()).thenReturn(BrokerModel.getInstance());
+        when(connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
+        when(connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+        when(connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
         final ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
         when(connection.doOnIOThreadAsync(runnableCaptor.capture())).thenAnswer(new Answer<ListenableFuture<Void>>()
         {
@@ -587,10 +596,10 @@
     {
         Begin begin = mock(Begin.class);
         when(begin.getNextOutgoingId()).thenReturn(new UnsignedInteger(channelId));
-        Session_1_0 _session = new Session_1_0(connection, begin);
-        _session.setReceivingChannel((short)channelId);
-        _session.setSendingChannel((short)channelId);
-        return _session;
+        Session_1_0 session = new Session_1_0(connection, begin, (short) channelId);
+        session.setReceivingChannel((short)channelId);
+        session.setSendingChannel((short)channelId);
+        return session;
     }
 
     private void sendDetach(final Session_1_0 session,