QPID-7633: [Java Broker] address review comments

* remove redundant declaration of methods from AbstractAMQPSession
* pass the receiving channels to the Session_1_0 ctor and remove Session_1_0#setReceivingChannel and Session_1_0#setSendingChannel
* get rid of uses of AMQPConnection_1_0Impl in favour of AMQPConnection_1_0
* rename sessionModel to session in various places
* rename channel to session in AbstractQueue
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index b913508..915e1d9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -37,7 +37,6 @@
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
-import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.transport.AMQPConnection;
 
 public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T>
@@ -97,7 +96,7 @@
     {
         @SuppressWarnings("unchecked")
         final T target = (T) this;
-        getSessionModel().notifyWork(target);
+        getSession().notifyWork(target);
     }
 
     protected final void setNotifyWorkDesired(final boolean desired)
@@ -108,12 +107,12 @@
             {
                 if (desired)
                 {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                    getSession().removeTicker(_suspendedConsumerLoggingTicker);
                 }
                 else
                 {
                     _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+                    getSession().addTicker(_suspendedConsumerLoggingTicker);
                 }
             }
 
@@ -134,7 +133,7 @@
     @Override
     public boolean processPending()
     {
-        if (!getSessionModel().getAMQPConnection().isIOThread())
+        if (!getSession().getAMQPConnection().isIOThread())
         {
             return false;
         }
@@ -172,7 +171,7 @@
 
     private ListenableFuture<Void> doOnIoThreadAsync(final Runnable task)
     {
-        return getSessionModel().getAMQPConnection().doOnIOThreadAsync(task);
+        return getSession().getAMQPConnection().doOnIOThreadAsync(task);
     }
 
     private void consumerRemovedInternal(final MessageInstanceConsumer sub)
@@ -279,7 +278,7 @@
             }
             if (_suspendedConsumerLoggingTicker != null)
             {
-                getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                getSession().removeTicker(_suspendedConsumerLoggingTicker);
             }
 
             return true;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index b2b6320..1fa7fb5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -58,7 +58,7 @@
 
     long getUnacknowledgedMessages();
 
-    AMQPSession<?,T> getSessionModel();
+    AMQPSession<?,T> getSession();
 
     void send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index 45fb744..5ef7c37 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -138,7 +138,7 @@
             final Collection<QueueConsumer<?,?>> consumers = _queue.getConsumers();
             for(QueueConsumer<?,?> c : consumers)
             {
-                if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
+                if(c.getSession().getConnectionReference() == message.getConnectionReference())
                 {
                     return false;
                 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
index 3f43f01..2302665 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -80,7 +80,7 @@
     long getUnacknowledgedMessages();
 
 
-    AMQPSession<?,?> getSessionModel();
+    AMQPSession<?,?> getSession();
 
     long getConsumerNumber();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6b04aac..39e309e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -208,7 +208,8 @@
 
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final Set<AMQPSession<?,?>> _blockedChannels = Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
+    private final Set<AMQPSession<?, ?>> _blockedSessions =
+            Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final SettableFuture<Integer> _deleteFuture = SettableFuture.create();
@@ -357,36 +358,36 @@
         _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
         Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
-        AMQPSession<?, ?> sessionModel;
+        AMQPSession<?, ?> session;
         if(sessionPrincipals.isEmpty())
         {
-            sessionModel = null;
+            session = null;
         }
         else
         {
             final SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next();
-            sessionModel = sessionPrincipal.getSession();
+            session = sessionPrincipal.getSession();
         }
 
-        if(sessionModel != null)
+        if(session != null)
         {
 
             switch(_exclusive)
             {
 
                 case PRINCIPAL:
-                    _exclusiveOwner = sessionModel.getAMQPConnection().getAuthorizedPrincipal();
+                    _exclusiveOwner = session.getAMQPConnection().getAuthorizedPrincipal();
                     break;
                 case CONTAINER:
-                    _exclusiveOwner = sessionModel.getAMQPConnection().getRemoteContainerName();
+                    _exclusiveOwner = session.getAMQPConnection().getRemoteContainerName();
                     break;
                 case CONNECTION:
-                    _exclusiveOwner = sessionModel.getAMQPConnection();
-                    addExclusivityConstraint(sessionModel.getAMQPConnection());
+                    _exclusiveOwner = session.getAMQPConnection();
+                    addExclusivityConstraint(session.getAMQPConnection());
                     break;
                 case SESSION:
-                    _exclusiveOwner = sessionModel;
-                    addExclusivityConstraint(sessionModel);
+                    _exclusiveOwner = session;
+                    addExclusivityConstraint(session);
                     break;
                 case NONE:
                 case LINK:
@@ -427,9 +428,9 @@
 
         if(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
         {
-            if(sessionModel != null)
+            if(session != null)
             {
-                addLifetimeConstraint(sessionModel.getAMQPConnection());
+                addLifetimeConstraint(session.getAMQPConnection());
             }
             else
             {
@@ -440,9 +441,9 @@
         }
         else if(getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)
         {
-            if(sessionModel != null)
+            if(session != null)
             {
-                addLifetimeConstraint(sessionModel);
+                addLifetimeConstraint(session);
             }
             else
             {
@@ -790,12 +791,12 @@
             case CONNECTION:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSessionModel().getAMQPConnection();
-                    addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
+                    exclusiveOwner = target.getSession().getAMQPConnection();
+                    addExclusivityConstraint(target.getSession().getAMQPConnection());
                 }
                 else
                 {
-                    if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
+                    if(exclusiveOwner != target.getSession().getAMQPConnection())
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -804,12 +805,12 @@
             case SESSION:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSessionModel();
-                    addExclusivityConstraint(target.getSessionModel());
+                    exclusiveOwner = target.getSession();
+                    addExclusivityConstraint(target.getSession());
                 }
                 else
                 {
-                    if(exclusiveOwner != target.getSessionModel())
+                    if(exclusiveOwner != target.getSession())
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -822,7 +823,7 @@
                 }
                 break;
             case PRINCIPAL:
-                Principal currentAuthorizedPrincipal = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+                Principal currentAuthorizedPrincipal = target.getSession().getAMQPConnection().getAuthorizedPrincipal();
                 if(exclusiveOwner == null)
                 {
                     exclusiveOwner = currentAuthorizedPrincipal;
@@ -838,11 +839,11 @@
             case CONTAINER:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
+                    exclusiveOwner = target.getSession().getAMQPConnection().getRemoteContainerName();
                 }
                 else
                 {
-                    if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
+                    if(!exclusiveOwner.equals(target.getSession().getAMQPConnection().getRemoteContainerName()))
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -1734,7 +1735,7 @@
     }
 
     @Override
-    public void checkCapacity(AMQPSession<?,?> channel)
+    public void checkCapacity(AMQPSession<?,?> session)
     {
         if(_queueFlowControlSizeBytes != 0L)
         {
@@ -1745,9 +1746,9 @@
                 getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
                                                                              _queueFlowControlSizeBytes));
 
-                _blockedChannels.add(channel);
+                _blockedSessions.add(session);
 
-                channel.block(this);
+                session.block(this);
 
                 if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
                 {
@@ -1756,8 +1757,8 @@
                     getEventLogger().message(_logSubject,
                                              QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
 
-                   channel.unblock(this);
-                   _blockedChannels.remove(channel);
+                   session.unblock(this);
+                   _blockedSessions.remove(session);
 
                 }
             }
@@ -1784,10 +1785,10 @@
                                                                          _queueFlowResumeSizeBytes));
                     }
 
-                    for (final AMQPSession<?,?> blockedChannel : _blockedChannels)
+                    for (final AMQPSession<?,?> blockedSession : _blockedSessions)
                     {
-                        blockedChannel.unblock(this);
-                        _blockedChannels.remove(blockedChannel);
+                        blockedSession.unblock(this);
+                        _blockedSessions.remove(blockedSession);
                     }
                 }
             }
@@ -2610,7 +2611,7 @@
                 allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getAMQPConnection().getRemoteContainerName());
                 break;
             case LINK:
-                allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
+                allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSession() == session;
                 break;
             default:
                 throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
@@ -2691,9 +2692,9 @@
 
                     if(session == null)
                     {
-                        session = c.getSessionModel();
+                        session = c.getSession();
                     }
-                    else if(!session.equals(c.getSessionModel()))
+                    else if(!session.equals(c.getSession()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2701,7 +2702,7 @@
                 _exclusiveOwner = session;
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection();
         }
     }
 
@@ -2719,9 +2720,9 @@
                     QueueConsumer<?,?> c = queueConsumerIterator.next();
                     if(con == null)
                     {
-                        con = c.getSessionModel().getAMQPConnection();
+                        con = c.getSession().getAMQPConnection();
                     }
-                    else if(!con.equals(c.getSessionModel().getAMQPConnection()))
+                    else if(!con.equals(c.getSession().getAMQPConnection()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2732,7 +2733,7 @@
                 _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPSession<?,?>)_exclusiveOwner).getAMQPConnection();
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection();
         }
     }
 
@@ -2749,9 +2750,9 @@
                     QueueConsumer<?,?> c = queueConsumerIterator.next();
                     if(containerID == null)
                     {
-                        containerID = c.getSessionModel().getAMQPConnection().getRemoteContainerName();
+                        containerID = c.getSession().getAMQPConnection().getRemoteContainerName();
                     }
-                    else if(!containerID.equals(c.getSessionModel().getAMQPConnection().getRemoteContainerName()))
+                    else if(!containerID.equals(c.getSession().getAMQPConnection().getRemoteContainerName()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2765,7 +2766,7 @@
                 _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPSession<?,?>)_exclusiveOwner).getAMQPConnection().getRemoteContainerName();
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getRemoteContainerName();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection().getRemoteContainerName();
         }
     }
 
@@ -2782,10 +2783,10 @@
                     QueueConsumer<?,?> c = queueConsumerIterator.next();
                     if(principal == null)
                     {
-                        principal = c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+                        principal = c.getSession().getAMQPConnection().getAuthorizedPrincipal();
                     }
                     else if(!Objects.equals(principal.getName(),
-                                            c.getSessionModel().getAMQPConnection().getAuthorizedPrincipal().getName()))
+                                            c.getSession().getAMQPConnection().getAuthorizedPrincipal().getName()))
                     {
                         throw new ExistingConsumerPreventsExclusive();
                     }
@@ -2799,7 +2800,7 @@
                 _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQPSession<?,?>)_exclusiveOwner).getAMQPConnection().getAuthorizedPrincipal();
                 break;
             case LINK:
-                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSession().getAMQPConnection().getAuthorizedPrincipal();
         }
     }
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 316f661..a5ed736 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -112,9 +112,9 @@
                       final Integer priority)
     {
         super(queue,
-              createAttributeMap(target.getSessionModel(), consumerName, filters, optionSet, priority));
+              createAttributeMap(target.getSession(), consumerName, filters, optionSet, priority));
         _messageClass = messageClass;
-        _sessionReference = target.getSessionModel().getConnectionReference();
+        _sessionReference = target.getSession().getConnectionReference();
         _consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
         _filters = filters;
         _acquires = optionSet.contains(ConsumerOption.ACQUIRES);
@@ -132,7 +132,7 @@
         setupLogging();
     }
 
-    private static Map<String, Object> createAttributeMap(final AMQPSession<?,?> sessionModel,
+    private static Map<String, Object> createAttributeMap(final AMQPSession<?,?> session,
                                                           String linkName,
                                                           FilterManager filters,
                                                           EnumSet<ConsumerOption> optionSet,
@@ -140,9 +140,9 @@
     {
         Map<String,Object> attributes = new HashMap<String, Object>();
         attributes.put(ID, UUID.randomUUID());
-        String name = sessionModel.getAMQPConnection().getConnectionId()
+        String name = session.getAMQPConnection().getConnectionId()
                       + "|"
-                      + sessionModel.getChannelId()
+                      + session.getChannelId()
                       + "|"
                       + linkName;
         attributes.put(NAME, name);
@@ -215,9 +215,9 @@
     }
 
     @Override
-    public AMQPSession<?,?> getSessionModel()
+    public AMQPSession<?,?> getSession()
     {
-        return _target.getSessionModel();
+        return _target.getSession();
     }
 
     @Override
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 39e497e..3b3d76c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -22,7 +22,6 @@
 
 import java.security.AccessControlContext;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -50,7 +49,6 @@
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Session;
@@ -172,13 +170,6 @@
         return getBlocking();
     }
 
-    public abstract boolean getBlocking();
-
-    public abstract Collection<Consumer<?,X>> getConsumers();
-
-    @Override
-    public abstract long getConsumerCount();
-
     @Override
     public int getLocalTransactionOpen()
     {
@@ -192,12 +183,6 @@
         return getTxnStart();
     }
 
-    public abstract long getTxnRejects();
-
-    public abstract long getTxnCommits();
-
-    public abstract long getTxnStart();
-
     public long getLocalTransactionRollbacks()
     {
         return getTxnRejects();
@@ -221,8 +206,6 @@
         _taskList.remove(task);
     }
 
-    public abstract int getUnacknowledgedMessageCount();
-
     @Override
     public Date getTransactionStartTime()
     {
@@ -375,17 +358,11 @@
         _connection.getAggregateTicker().removeTicker(ticker);
     }
 
-    public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
-
     public LogSubject getLogSubject()
     {
         return _logSubject;
     }
 
-    public abstract long getTransactionUpdateTimeLong();
-
-    public abstract long getTransactionStartTimeLong();
-
     @Override
     protected void logOperation(final String operation)
     {
@@ -431,8 +408,6 @@
         }
     }
 
-    public abstract void transportStateChanged();
-
     protected abstract void updateBlockedStateIfNecessary();
 
     public abstract boolean isClosing();
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 6319a99..bf86051 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -85,7 +85,7 @@
         return queue;
     }
 
-    public AMQPSession getSessionModel()
+    public AMQPSession getSession()
     {
         return _sessionModel;
     }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 57c2a7d..47feaf5 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -411,6 +411,11 @@
                            });
    }
 
+    void acknowledge(final MessageInstanceConsumer consumer, final MessageInstance entry)
+    {
+        _session.acknowledge(consumer, this, entry);
+    }
+
     void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
         entry.setRedelivered();
@@ -429,7 +434,7 @@
             entry.setRedelivered();
         }
 
-        if (getSessionModel().isClosing() || !setRedelivered)
+        if (getSession().isClosing() || !setRedelivered)
         {
             entry.decrementDeliveryCount();
         }
@@ -487,7 +492,7 @@
 
     protected EventLogger getEventLogger()
     {
-        return getSessionModel().getAMQPConnection().getEventLogger();
+        return getSession().getAMQPConnection().getEventLogger();
     }
 
     private boolean isMaxDeliveryLimitReached(MessageInstance entry)
@@ -573,16 +578,11 @@
         stop();
     }
 
-    public Session_0_10 getSessionModel()
+    public Session_0_10 getSession()
     {
         return _session.getModelObject();
     }
 
-    public ServerSession getSession()
-    {
-        return _session;
-    }
-
     public boolean isDurable()
     {
         return false;
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 17a0b6b..5617833 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -47,7 +47,7 @@
 
     public void onAccept()
     {
-        _target.getSession().acknowledge(_consumer, _target, _entry);
+        _target.acknowledge(_consumer, _entry);
     }
 
     public void onRelease(boolean setRedelivered)
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index d0941bf..8320bd7 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -328,7 +328,7 @@
         return _targetAddress;
     }
 
-    public AMQChannel getSessionModel()
+    public AMQChannel getSession()
     {
         return _channel;
     }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 109886d..d149783 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.model.ManagedObject;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -35,8 +36,13 @@
 
 @ManagedObject(category = false, creatable = false, type="AMQP_1_0")
 public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQPConnection<C>,
-                                                                             ProtocolEngine, EventLoggerProvider
+                                                                             ProtocolEngine,
+                                                                             ConnectionHandler,
+                                                                             EventLoggerProvider
 {
+    Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
+
     Object getReference();
 
     String getRemoteContainerId();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index f9b85e1..d64ac26 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -116,10 +116,10 @@
 import org.apache.qpid.transport.util.Functions;
 
 public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
-        implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
+        implements FrameOutputHandler,
+                   DescribedTypeConstructorRegistry.Source,
                    ValueWriter.Registry.Source,
                    SASLEndpoint,
-                   ConnectionHandler,
                    AMQPConnection_1_0<AMQPConnection_1_0Impl>
 {
 
@@ -154,9 +154,6 @@
                     (byte) 0
             };
 
-    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
-    public static final Symbol SHARED_SUBSCRIPTIONS = Symbol.valueOf("SHARED-SUBS");
-
     private FrameWriter _frameWriter;
     private ProtocolHandler _frameHandler;
     private volatile boolean _transportBlockedForWriting;
@@ -570,16 +567,16 @@
 
     }
 
-    public void receiveBegin(final short channel, final Begin begin)
+    public void receiveBegin(final short receivingChannelId, final Begin begin)
     {
 
         assertState(FrameReceivingState.ANY_FRAME);
-        short myChannelId;
+        short sendingChannelId;
         if (begin.getRemoteChannel() != null)
         {
             closeConnection(ConnectionError.FRAMING_ERROR,
                             "BEGIN received on channel "
-                            + channel
+                            + receivingChannelId
                             + " with given remote-channel "
                             + begin.getRemoteChannel()
                             + ". Since the broker does not spontaneously start channels, this must be an error.");
@@ -588,40 +585,37 @@
         else // Peer requesting session creation
         {
 
-            if (_receivingSessions[channel] == null)
+            if (_receivingSessions[receivingChannelId] == null)
             {
-                myChannelId = getFirstFreeChannel();
-                if (myChannelId == -1)
+                sendingChannelId = getFirstFreeChannel();
+                if (sendingChannelId == -1)
                 {
 
                     closeConnection(ConnectionError.FRAMING_ERROR,
                                     "BEGIN received on channel "
-                                    + channel
+                                    + receivingChannelId
                                     + ". There are no free channels for the broker to respond on.");
 
                 }
-                Session_1_0 session = new Session_1_0(this, begin, myChannelId);
+                Session_1_0 session = new Session_1_0(this, begin, sendingChannelId, receivingChannelId);
                 session.create();
 
-                _receivingSessions[channel] = session;
-                _sendingSessions[myChannelId] = session;
+                _receivingSessions[receivingChannelId] = session;
+                _sendingSessions[sendingChannelId] = session;
 
                 Begin beginToSend = new Begin();
-
-                session.setReceivingChannel(channel);
-                session.setSendingChannel(myChannelId);
-                beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
+                beginToSend.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
                 beginToSend.setNextOutgoingId(session.getNextOutgoingId());
                 beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
                 beginToSend.setIncomingWindow(session.getIncomingWindowSize());
-                sendFrame(myChannelId, beginToSend);
+                sendFrame(sendingChannelId, beginToSend);
 
                 _sessions.add(session);
             }
             else
             {
                 closeConnection(ConnectionError.FRAMING_ERROR,
-                                "BEGIN received on channel " + channel + " which is already in use.");
+                                "BEGIN received on channel " + receivingChannelId + " which is already in use.");
             }
 
         }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index abd55bb..4da1af4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -315,6 +315,7 @@
         }
     }
 
+    @Override
     public Session_1_0 getSession()
     {
         return _link.getSession();
@@ -494,12 +495,6 @@
     }
 
     @Override
-    public Session_1_0 getSessionModel()
-    {
-        return getSession();
-    }
-
-    @Override
     public void acquisitionRemoved(final MessageInstance node)
     {
     }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c85c582..f7f8ab6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -122,7 +122,6 @@
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-import org.apache.qpid.transport.network.Ticker;
 
 public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
         implements LogSubject, org.apache.qpid.server.util.Deletable<Session_1_0>
@@ -150,7 +149,6 @@
     private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
     private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
     private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
-    private long _lastAttachedTime;
 
     private short _receivingChannel;
     private final short _sendingChannel;
@@ -189,34 +187,28 @@
     private volatile long _rolledBackTransactions;
     private volatile int _unacknowledgedMessages;
 
-    public Session_1_0(final AMQPConnection_1_0 connection, Begin begin, short sendingChannelId)
+    public Session_1_0(final AMQPConnection_1_0 connection,
+                       Begin begin,
+                       short sendingChannelId,
+                       short receivingChannelId)
     {
         super(connection, sendingChannelId);
         _sendingChannel = sendingChannelId;
-        _sessionState = SessionState.BEGIN_RECVD;
+        _receivingChannel = receivingChannelId;
+        _sessionState = SessionState.ACTIVE;
         _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
         _primaryDomain = getPrimaryDomain();
-    }
 
-    public void setReceivingChannel(final short receivingChannel)
-    {
-        _receivingChannel = receivingChannel;
-        switch(_sessionState)
+        AccessController.doPrivileged((new PrivilegedAction<Object>()
         {
-            case INACTIVE:
-                _sessionState = SessionState.BEGIN_RECVD;
-                break;
-            case BEGIN_SENT:
-                _sessionState = SessionState.ACTIVE;
-                break;
-            case END_PIPE:
-                _sessionState = SessionState.END_SENT;
-                break;
-            default:
-                // TODO error
-
-        }
+            @Override
+            public Object run()
+            {
+                _connection.getEventLogger().message(ChannelMessages.CREATE());
+                return null;
+            }
+        }), _accessControllerContext);
     }
 
     public void sendDetach(final Detach detach)
@@ -545,33 +537,6 @@
         sendFlow(new Flow());
     }
 
-    public void setSendingChannel(final short sendingChannel)
-    {
-        switch(_sessionState)
-        {
-            case INACTIVE:
-                _sessionState = SessionState.BEGIN_SENT;
-                break;
-            case BEGIN_RECVD:
-                _sessionState = SessionState.ACTIVE;
-                break;
-            default:
-                // TODO error
-
-        }
-
-        AccessController.doPrivileged((new PrivilegedAction<Object>()
-        {
-            @Override
-            public Object run()
-            {
-                _connection.getEventLogger().message(ChannelMessages.CREATE());
-
-                return null;
-            }
-        }), _accessControllerContext);
-    }
-
     public void sendFlow(final Flow flow)
     {
         if(_nextIncomingTransferId != null)
@@ -761,7 +726,7 @@
                 link = createSendingLink(endpoint, attach);
                 if (link != null)
                 {
-                    capabilities.add(AMQPConnection_1_0Impl.SHARED_SUBSCRIPTIONS);
+                    capabilities.add(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS);
                 }
             }
             else if (endpoint.getTarget() instanceof Coordinator)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index 5217c24..7406dcd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -26,7 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0Impl;
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
@@ -53,7 +53,7 @@
         _isSasl = isSasl;
     }
 
-    public FrameHandler(final AMQPConnection_1_0Impl connection, final boolean sasl)
+    public FrameHandler(final AMQPConnection_1_0 connection, final boolean sasl)
     {
         this(new ValueHandler(connection.getDescribedTypeRegistry()), connection, sasl);
 
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 81c0cd8..a1fa32b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -684,9 +684,7 @@
     {
         Begin begin = mock(Begin.class);
         when(begin.getNextOutgoingId()).thenReturn(new UnsignedInteger(channelId));
-        Session_1_0 session = new Session_1_0(connection, begin, (short) channelId);
-        session.setReceivingChannel((short)channelId);
-        session.setSendingChannel((short)channelId);
+        Session_1_0 session = new Session_1_0(connection, begin, (short) channelId, (short) channelId);
         return session;
     }
 
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 38f01d7..c4f5515 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1007,7 +1007,7 @@
             AMQPSession<?,?> publishingSession = sessionPrincipals.iterator().next().getSession();
             for (ManagementNodeConsumer candidate : _consumers)
             {
-                if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSessionModel().getConnectionReference() == publishingSession.getConnectionReference())
+                if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSession().getConnectionReference() == publishingSession.getConnectionReference())
                 {
                     consumer = candidate;
                     break;
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 1119dbc..b6bcb64 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -106,9 +106,9 @@
         }
     }
 
-    AMQPSession<?,?> getSessionModel()
+    AMQPSession<?,?> getSession()
     {
-        return _target.getSessionModel();
+        return _target.getSession();
     }
 
     @Override
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index a0151d2..7df8887 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -273,9 +273,9 @@
         }
 
         @Override
-        public AMQPSession getSessionModel()
+        public AMQPSession getSession()
         {
-            return _underlying.getSessionModel();
+            return _underlying.getSession();
         }
 
         @Override