QPID-7633: [Java Broker] Remove SessionAdapter
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 1c2fba8..dbc87be 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -56,7 +56,7 @@
     boolean isProducerFlowBlocked();
 
 
-    Collection<Consumer> getConsumers();
+    Collection<? extends Consumer> getConsumers();
 
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Consumers")
     long getConsumerCount();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index 298eece..a9a0bbd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -70,7 +70,7 @@
 
     int getChannelId();
 
-    int getConsumerCount();
+    long getConsumerCount();
 
     Collection<Consumer<?,X>> getConsumers();
 
@@ -87,14 +87,14 @@
      *
      * @return the time this transaction started or 0 if not in a transaction
      */
-    long getTransactionStartTime();
+    long getTransactionStartTimeLong();
 
     /**
      * Return the time of the last activity on the current transaction.
      *
      * @return the time of the last activity or 0 if not in a transaction
      */
-    long getTransactionUpdateTime();
+    long getTransactionUpdateTimeLong();
 
     void transportStateChanged();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
new file mode 100644
index 0000000..e0f3a52
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.session;
+
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Deletable;
+
+public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSession<S, X>,
+                             X extends ConsumerTarget<X>> extends Session<S>,
+                                             Deletable<S>,
+                                             EventLoggerProvider,
+                                             AMQSessionModel<S, X>
+{
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
similarity index 66%
rename from broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
rename to broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 94acef6..5f74109 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.model.adapter;
+package org.apache.qpid.server.session;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,80 +26,68 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.EventLoggerProvider;
+import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.TransactionTimeoutTicker;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.transport.network.Ticker;
 
-public final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
+public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, X extends ConsumerTarget<X>>
+        extends AbstractConfiguredObject<S>
+        implements AMQPSession<S, X>, EventLoggerProvider
 {
     private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
     private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
 
-    // Attributes
-    private final AMQSessionModel _session;
     private final Action _deleteModelTask;
-    private final AbstractAMQPConnection<?,?> _amqpConnection;
+    private final Connection<?> _amqpConnection;
 
-    public SessionAdapter(final AbstractAMQPConnection<?,?> amqpConnection,
-                          final AMQSessionModel<?,?> session)
+    protected AbstractAMQPSession(final Connection<?> parent,
+                                  final int sessionId)
     {
-        super(amqpConnection, createAttributes(session));
-        _amqpConnection = amqpConnection;
-        _session = session;
-        _session.addConsumerListener(new ConsumerListener()
-        {
-            @Override
-            public void consumerAdded(final Consumer<?,?> consumer)
-            {
-                childAdded(consumer);
-            }
-
-            @Override
-            public void consumerRemoved(final Consumer<?,?> consumer)
-            {
-                childRemoved(consumer);
-
-            }
-        });
-        session.setModelObject(this);
+        super(parent, createAttributes(sessionId));
+        _amqpConnection = parent;
 
         _deleteModelTask = new Action()
         {
             @Override
             public void performAction(final Object object)
             {
-                session.removeDeleteTask(this);
+                removeDeleteTask(this);
                 deleteAsync();
             }
         };
-        session.addDeleteTask(_deleteModelTask);
         setState(State.ACTIVE);
     }
 
-    private static Map<String, Object> createAttributes(final AMQSessionModel session)
+    @Override
+    protected void onCreate()
+    {
+        super.onCreate();
+        addDeleteTask(_deleteModelTask);
+    }
+
+    private static Map<String, Object> createAttributes(final long sessionId)
     {
         Map<String, Object> attributes = new HashMap<>();
-        attributes.put(ID, UUID.randomUUID());
-        attributes.put(NAME, String.valueOf(session.getChannelId()));
+        attributes.put(NAME, sessionId);
         attributes.put(DURABLE, false);
         attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
         return attributes;
@@ -109,67 +97,67 @@
     protected void postResolveChildren()
     {
         super.postResolveChildren();
-        registerTransactionTimeoutTickers(_amqpConnection, _session);
+        registerTransactionTimeoutTickers(_amqpConnection);
     }
 
     @Override
-    public int getChannelId()
-    {
-        return _session.getChannelId();
-    }
+    public abstract int getChannelId();
 
     @Override
     public boolean isProducerFlowBlocked()
     {
-        return _session.getBlocking();
+        return getBlocking();
     }
 
-    public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
-    {
-        return (Collection<Consumer>) _session.getConsumers();
-    }
+    public abstract boolean getBlocking();
+
+    public abstract Collection<Consumer<?,X>> getConsumers();
 
     @Override
-    public long getConsumerCount()
+    public abstract long getConsumerCount();
+
+    @Override
+    public int getLocalTransactionOpen()
     {
-        return _session.getConsumerCount();
+        long open = getTxnStart() - (getTxnCommits() + getTxnRejects());
+        return (open > 0L) ? 1 : 0;
     }
 
     @Override
     public long getLocalTransactionBegins()
     {
-        return _session.getTxnStart();
+        return getTxnStart();
     }
 
-    @Override
-    public int getLocalTransactionOpen()
-    {
-        long open = _session.getTxnStart() - (_session.getTxnCommits() + _session.getTxnRejects());
-        return (open > 0l) ? 1 : 0;
-    }
+    public abstract long getTxnRejects();
 
-    @Override
+    public abstract long getTxnCommits();
+
+    public abstract long getTxnStart();
+
     public long getLocalTransactionRollbacks()
     {
-        return _session.getTxnRejects();
+        return getTxnRejects();
     }
 
     @Override
     public long getUnacknowledgedMessages()
     {
-        return _session.getUnacknowledgedMessageCount();
+        return getUnacknowledgedMessageCount();
     }
 
+    public abstract int getUnacknowledgedMessageCount();
+
     @Override
     public Date getTransactionStartTime()
     {
-        return new Date(_session.getTransactionStartTime());
+        return new Date(getTransactionStartTimeLong());
     }
 
     @Override
     public Date getTransactionUpdateTime()
     {
-        return new Date(_session.getTransactionUpdateTime());
+        return new Date(getTransactionUpdateTimeLong());
     }
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
@@ -177,17 +165,19 @@
     {
         deleted();
         setState(State.DELETED);
-        _session.removeDeleteTask(_deleteModelTask);
+        removeDeleteTask(_deleteModelTask);
         return Futures.immediateFuture(null);
     }
 
-    private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?,?> amqpConnection,
-                                                   final AMQSessionModel session)
+    @Override
+    public abstract EventLogger getEventLogger();
+
+    private void registerTransactionTimeoutTickers(Connection<?> amqpConnection)
     {
         NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
         if (addressSpace instanceof QueueManagingVirtualHost)
         {
-            final EventLogger eventLogger = amqpConnection.getEventLogger();
+            final EventLogger eventLogger = getEventLogger();
             final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace;
             final List<Ticker> tickers = new ArrayList<>(4);
 
@@ -196,7 +186,7 @@
                 @Override
                 public Long get()
                 {
-                    return SessionAdapter.this._session.getTransactionStartTime();
+                    return getTransactionStartTimeLong();
                 }
             };
             final Supplier<Long> transactionUpdateTimeSupplier = new Supplier<Long>()
@@ -204,7 +194,7 @@
                 @Override
                 public Long get()
                 {
-                    return SessionAdapter.this._session.getTransactionUpdateTime();
+                    return getTransactionUpdateTimeLong();
                 }
             };
 
@@ -221,7 +211,7 @@
                             @Override
                             public void performAction(Long age)
                             {
-                                eventLogger.message(_session.getLogSubject(), ChannelMessages.OPEN_TXN(age));
+                                eventLogger.message(getLogSubject(), ChannelMessages.OPEN_TXN(age));
                             }
                         }
                 ));
@@ -236,7 +226,7 @@
                             @Override
                             public void performAction(Long age)
                             {
-                                _session.doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
+                                doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
                             }
                         }
                 ));
@@ -251,7 +241,7 @@
                             @Override
                             public void performAction(Long age)
                             {
-                                eventLogger.message(_session.getLogSubject(), ChannelMessages.IDLE_TXN(age));
+                                eventLogger.message(getLogSubject(), ChannelMessages.IDLE_TXN(age));
                             }
                         }
                 ));
@@ -266,7 +256,7 @@
                             @Override
                             public void performAction(Long age)
                             {
-                                _session.doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
+                                doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
                             }
                         }
                 ));
@@ -274,7 +264,7 @@
 
             for (Ticker ticker : tickers)
             {
-                session.addTicker(ticker);
+                addTicker(ticker);
             }
 
             Action deleteTickerTask = new Action()
@@ -282,21 +272,34 @@
                 @Override
                 public void performAction(Object o)
                 {
-                    session.removeDeleteTask(this);
+                    removeDeleteTask(this);
                     for (Ticker ticker : tickers)
                     {
-                        session.removeTicker(ticker);
+                        removeTicker(ticker);
                     }
                 }
             };
-            session.addDeleteTask(deleteTickerTask);
+            addDeleteTask(deleteTickerTask);
         }
     }
 
+    public abstract void addTicker(final Ticker ticker);
+
+    public abstract void removeTicker(final Ticker ticker);
+
+    public abstract void doTimeoutAction(final String idleTransactionTimeoutError);
+
+    public abstract LogSubject getLogSubject();
+
+    public abstract long getTransactionUpdateTimeLong();
+
+    public abstract long getTransactionStartTimeLong();
+
 
     @Override
     protected void logOperation(final String operation)
     {
-        _amqpConnection.getEventLogger().message(ChannelMessages.OPERATION(operation));
+        getEventLogger().message(ChannelMessages.OPERATION(operation));
     }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 4d7a0c6..fd16010 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -64,9 +64,7 @@
 import org.apache.qpid.server.model.StateTransition;
 import org.apache.qpid.server.model.TaskExecutorProvider;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.adapter.SessionAdapter;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.sasl.SaslSettings;
 import org.apache.qpid.server.stats.StatisticsCounter;
@@ -550,18 +548,6 @@
         return _subject;
     }
 
-    public void sessionAdded(final AMQSessionModel<?,?> session)
-    {
-        SessionAdapter adapter = new SessionAdapter(this, session);
-        adapter.create();
-        childAdded(adapter);
-
-    }
-
-    public void sessionRemoved(final AMQSessionModel<?,?> session)
-    {
-    }
-
     @Override
     public TaskExecutor getChildExecutor()
     {
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 93b6c87..c64cd20 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -347,7 +347,7 @@
         }
 
         @Override
-        public int getConsumerCount()
+        public long getConsumerCount()
         {
             return 0;
         }
@@ -377,13 +377,13 @@
         }
 
         @Override
-        public long getTransactionStartTime()
+        public long getTransactionStartTimeLong()
         {
             return 0;
         }
 
         @Override
-        public long getTransactionUpdateTime()
+        public long getTransactionUpdateTimeLong()
         {
             return 0;
         }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index a5d82d4..88d6814 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -39,7 +39,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.protocol.ErrorCodes;
 import org.apache.qpid.server.logging.EventLogger;
@@ -326,20 +325,12 @@
     public synchronized void registerSession(final Session ssn)
     {
         super.registerSession(ssn);
-        _amqpConnection.sessionAdded((ServerSession) ssn);
         if(_blocking)
         {
             ((ServerSession)ssn).block();
         }
     }
 
-    @Override
-    public synchronized void removeSession(final Session ssn)
-    {
-        _amqpConnection.sessionRemoved((ServerSession) ssn);
-        super.removeSession(ssn);
-    }
-
     public Collection<? extends ServerSession> getSessionModels()
     {
         return Collections.unmodifiableCollection(getChannels());
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 497a8ee..f2b5e30 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -184,9 +184,10 @@
     {
         SessionDelegate serverSessionDelegate = new ServerSessionDelegate();
 
-        ServerSession ssn = new ServerSession(conn, serverSessionDelegate,  new Binary(atc.getName()), 0);
+        final ServerSession serverSession =
+                new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
 
-        return ssn;
+        return serverSession;
     }
 
     @Override
@@ -413,9 +414,10 @@
 
         if(isSessionNameUnique(atc.getName(), conn))
         {
-
             serverConnection.map(ssn, atc.getChannel());
             serverConnection.registerSession(ssn);
+            final Session_0_10 session = new Session_0_10(((ServerConnection) conn).getAmqpConnection(), ssn.getChannelId(), ssn);
+            session.create();
             ssn.sendSessionAttached(atc.getName());
             ssn.setState(Session.State.OPEN);
         }
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index b7e017a..c02a461 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1076,7 +1076,7 @@
     }
 
     @Override
-    public int getConsumerCount()
+    public long getConsumerCount()
     {
         return _subscriptions.values().size();
     }
@@ -1113,7 +1113,7 @@
     }
 
     @Override
-    public long getTransactionStartTime()
+    public long getTransactionStartTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;
         if (serverTransaction.isTransactional())
@@ -1127,7 +1127,7 @@
     }
 
     @Override
-    public long getTransactionUpdateTime()
+    public long getTransactionUpdateTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;
         if (serverTransaction.isTransactional())
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
new file mode 100644
index 0000000..791805e
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.util.Collection;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.session.AbstractAMQPSession;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.network.Ticker;
+
+public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
+        implements AMQSessionModel<Session_0_10, ConsumerTarget_0_10>, LogSubject
+{
+    private final AMQPConnection_0_10 _connection;
+    private final ServerSession _serverSession;
+
+    protected Session_0_10(final Connection<?> parent, final int sessionId, final ServerSession serverSession)
+    {
+        super(parent, sessionId);
+        _connection = (AMQPConnection_0_10) parent;
+        _serverSession = serverSession;
+    }
+
+    @Override
+    public EventLogger getEventLogger()
+    {
+        return getConnection().getEventLogger();
+    }
+
+    @Override
+    public String toLogString()
+    {
+        return _serverSession.toLogString();
+    }
+
+    @Override
+    public AMQPConnection<?> getAMQPConnection()
+    {
+        return _connection;
+    }
+
+    @Override
+    public void block(final Queue<?> queue)
+    {
+        _serverSession.block(queue);
+    }
+
+    @Override
+    public void unblock(final Queue<?> queue)
+    {
+        _serverSession.unblock();
+    }
+
+    @Override
+    public void block()
+    {
+        _serverSession.block();
+    }
+
+    @Override
+    public void unblock()
+    {
+        _serverSession.unblock();
+    }
+
+    @Override
+    public Object getConnectionReference()
+    {
+        return _serverSession.getConnectionReference();
+    }
+
+    @Override
+    public void addConsumerListener(final ConsumerListener listener)
+    {
+        _serverSession.addConsumerListener(listener);
+    }
+
+    @Override
+    public void removeConsumerListener(final ConsumerListener listener)
+    {
+        _serverSession.removeConsumerListener(listener);
+    }
+
+    @Override
+    public void setModelObject(final Session<?> session)
+    {
+        _serverSession.setModelObject(this);
+    }
+
+    @Override
+    public Session<?> getModelObject()
+    {
+        return this;
+    }
+
+    @Override
+    public void transportStateChanged()
+    {
+        _serverSession.transportStateChanged();
+    }
+
+    @Override
+    public boolean processPending()
+    {
+        return _serverSession.processPending();
+    }
+
+    @Override
+    public void notifyWork(final ConsumerTarget_0_10 target)
+    {
+        _serverSession.notifyWork(target);
+    }
+
+    @Override
+    public int compareTo(final AMQSessionModel o)
+    {
+        return getId().compareTo(o.getId());
+    }
+
+    @Override
+    public void addDeleteTask(final Action<? super Session_0_10> task)
+    {
+        _serverSession.addDeleteTask((Action<? super ServerSession>) task);
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super Session_0_10> task)
+    {
+        _serverSession.removeDeleteTask((Action<? super ServerSession>) task);
+
+    }
+
+    @Override
+    public int getChannelId()
+    {
+        return _serverSession.getChannelId();
+    }
+
+    @Override
+    public boolean getBlocking()
+    {
+        return _serverSession.getBlocking();
+    }
+
+    @Override
+    public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
+    {
+        return _serverSession.getConsumers();
+    }
+
+    @Override
+    public long getConsumerCount()
+    {
+        return _serverSession.getConsumerCount();
+    }
+
+    @Override
+    public long getTxnRejects()
+    {
+        return _serverSession.getTxnRejects();
+    }
+
+    @Override
+    public long getTxnCommits()
+    {
+        return _serverSession.getTxnCommits();
+    }
+
+    @Override
+    public long getTxnStart()
+    {
+        return _serverSession.getTxnStart();
+    }
+
+    @Override
+    public int getUnacknowledgedMessageCount()
+    {
+        return _serverSession.getUnacknowledgedMessageCount();
+    }
+
+    @Override
+    public void addTicker(final Ticker ticker)
+    {
+        _serverSession.addTicker(ticker);
+    }
+
+    @Override
+    public void removeTicker(final Ticker ticker)
+    {
+        _serverSession.removeTicker(ticker);
+    }
+
+    @Override
+    public void doTimeoutAction(final String idleTransactionTimeoutError)
+    {
+        _serverSession.doTimeoutAction(idleTransactionTimeoutError);
+    }
+
+    @Override
+    public LogSubject getLogSubject()
+    {
+        return _serverSession.getLogSubject();
+    }
+
+    @Override
+    public long getTransactionUpdateTimeLong()
+    {
+        return _serverSession.getTransactionUpdateTimeLong();
+    }
+
+    @Override
+    public long getTransactionStartTimeLong()
+    {
+        return _serverSession.getTransactionStartTimeLong();
+    }
+
+    public AMQPConnection_0_10 getConnection()
+    {
+        return _connection;
+    }
+}
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e23a463..44f3014 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -91,6 +91,7 @@
 import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.session.AbstractAMQPSession;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
@@ -107,7 +108,7 @@
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.transport.network.Ticker;
 
-public class AMQChannel
+public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0_8>
         implements AMQSessionModel<AMQChannel, ConsumerTarget_0_8>,
                    AsyncAutoCommitTransaction.FutureRecorder,
                    ServerChannelMethodProcessor,
@@ -235,6 +236,7 @@
 
     public AMQChannel(AMQPConnection_0_8 connection, int channelId, final MessageStore messageStore)
     {
+        super(connection, channelId);
         _creditManager = new Pre0_10CreditManager(0L, 0L,
                                                   connection.getContextValue(Long.class, AMQPConnection_0_8.HIGH_PREFETCH_LIMIT),
                                                   connection.getContextValue(Long.class, AMQPConnection_0_8.BATCH_LIMIT));
@@ -1369,12 +1371,6 @@
     }
 
     @Override
-    public UUID getId()
-    {
-        return _id;
-    }
-
-    @Override
     public AMQPConnection_0_8<?> getAMQPConnection()
     {
         return _connection;
@@ -1915,7 +1911,7 @@
     }
 
     @Override
-    public int getConsumerCount()
+    public long getConsumerCount()
     {
         return _tag2SubscriptionTargetMap.size();
     }
@@ -1979,7 +1975,7 @@
     }
 
     @Override
-    public long getTransactionStartTime()
+    public long getTransactionStartTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;
         if (serverTransaction.isTransactional())
@@ -1993,7 +1989,7 @@
     }
 
     @Override
-    public long getTransactionUpdateTime()
+    public long getTransactionUpdateTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;
         if (serverTransaction.isTransactional())
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 3e19ade..61401e8 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -409,7 +409,6 @@
         synchronized (_channelAddRemoveLock)
         {
             _channelMap.put(channel.getChannelId(), channel);
-            sessionAdded(channel);
             if(_blocking)
             {
                 channel.block();
@@ -424,7 +423,6 @@
         {
             session = _channelMap.remove(channelId);
         }
-        sessionRemoved(session);
         session.dispose();
     }
 
@@ -901,6 +899,7 @@
             _logger.debug("Connecting to: {}", virtualHost.getName());
 
             final AMQChannel channel = new AMQChannel(this, channelId, virtualHost.getMessageStore());
+            channel.create();
 
             addChannel(channel);
 
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index dcb58ec..2c7267b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -83,7 +83,7 @@
     private MessageDestination _messageDestination;
 
     @Override
-    protected void setUp() throws Exception
+    public void setUp() throws Exception
     {
         super.setUp();
 
@@ -123,6 +123,9 @@
         when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
         when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
+        when(_amqConnection.getTaskExecutor()).thenReturn(taskExecutor);
+        when(_amqConnection.getChildExecutor()).thenReturn(taskExecutor);
+        when(_amqConnection.getModel()).thenReturn(BrokerModel.getInstance());
 
         when(_amqConnection.getContextValue(Long.class, AMQPConnection_0_8.BATCH_LIMIT)).thenReturn(AMQPConnection_0_8.DEFAULT_BATCH_LIMIT);
         when(_amqConnection.getContextValue(Long.class, AMQPConnection_0_8.HIGH_PREFETCH_LIMIT)).thenReturn(AMQPConnection_0_8.DEFAULT_BATCH_LIMIT);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index fbe16ee..d109ce4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -450,7 +450,6 @@
         if (!_closedOnOpen)
         {
             _sessions.remove(session);
-            sessionRemoved(session);
         }
     }
 
@@ -595,10 +594,11 @@
                     closeConnection(ConnectionError.FRAMING_ERROR,
                                     "BEGIN received on channel "
                                     + channel
-                                    + ". There are no free channels for the broker to responsd on.");
+                                    + ". There are no free channels for the broker to respond on.");
 
                 }
-                Session_1_0 session = new Session_1_0(this, begin);
+                Session_1_0 session = new Session_1_0(this, begin, myChannelId);
+                session.create();
 
                 _receivingSessions[channel] = session;
                 _sendingSessions[myChannelId] = session;
@@ -614,8 +614,6 @@
                 sendFrame(myChannelId, beginToSend);
 
                 _sessions.add(session);
-                sessionAdded(session);
-
             }
             else
             {
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index b0b20e9..b856bce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -366,7 +366,7 @@
                 return;
         }
 
-        if (!(getSession().getState() == SessionState.END_RECVD || getSession().isEnded()))
+        if (!(getSession().getSessionState() == SessionState.END_RECVD || getSession().isEnded()))
         {
             Detach detach = new Detach();
             detach.setHandle(getLocalHandle());
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c4f51b6..cf52a69 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -120,6 +120,7 @@
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.session.AbstractAMQPSession;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -129,7 +130,8 @@
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.transport.network.Ticker;
 
-public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
+public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget_1_0>
+        implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
 {
     public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
     private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
@@ -148,7 +150,6 @@
             new CopyOnWriteArrayList<Action<? super Session_1_0>>();
 
     private final AMQPConnection_1_0 _connection;
-    private UUID _id = UUID.randomUUID();
     private AtomicBoolean _closed = new AtomicBoolean();
     private final Subject _subject = new Subject();
 
@@ -156,11 +157,11 @@
 
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
-    private Session<?> _modelObject;
+    private Session<?> _modelObject = this;
     private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
     private Iterator<ConsumerTarget_1_0> _processPendingIterator;
 
-    private SessionState _state;
+    private SessionState _sessionState;
 
     private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
     private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
@@ -169,7 +170,7 @@
     private long _lastAttachedTime;
 
     private short _receivingChannel;
-    private short _sendingChannel = -1;
+    private final short _sendingChannel;
 
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
 
@@ -205,23 +206,12 @@
     private volatile long _rolledBackTransactions;
     private volatile int _unacknowledgedMessages;
 
-
-    public Session_1_0(final AMQPConnection_1_0 connection)
+    public Session_1_0(final AMQPConnection_1_0 connection, Begin begin, short channelId)
     {
-        this(connection, SessionState.INACTIVE, null);
-    }
-
-    public Session_1_0(final AMQPConnection_1_0 connection, Begin begin)
-    {
-        this(connection, SessionState.BEGIN_RECVD, new SequenceNumber(begin.getNextOutgoingId().intValue()));
-    }
-
-
-    private Session_1_0(final AMQPConnection_1_0 connection, SessionState state, SequenceNumber nextIncomingId)
-    {
-
-        _state = state;
-        _nextIncomingTransferId = nextIncomingId;
+        super(connection, channelId);
+        _sendingChannel = channelId;
+        _sessionState = SessionState.BEGIN_RECVD;
+        _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
         _subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
         _subject.getPrincipals().add(new SessionPrincipal(this));
@@ -237,16 +227,16 @@
     {
         _receivingChannel = receivingChannel;
         _logSubject.updateSessionDetails();
-        switch(_state)
+        switch(_sessionState)
         {
             case INACTIVE:
-                _state = SessionState.BEGIN_RECVD;
+                _sessionState = SessionState.BEGIN_RECVD;
                 break;
             case BEGIN_SENT:
-                _state = SessionState.ACTIVE;
+                _sessionState = SessionState.ACTIVE;
                 break;
             case END_PIPE:
-                _state = SessionState.END_SENT;
+                _sessionState = SessionState.END_SENT;
                 break;
             default:
                 // TODO error
@@ -261,7 +251,7 @@
 
     public void receiveAttach(final Attach attach)
     {
-        if(_state == SessionState.ACTIVE)
+        if(_sessionState == SessionState.ACTIVE)
         {
             UnsignedInteger handle = attach.getHandle();
             if(_remoteLinkEndpoints.containsKey(handle))
@@ -439,22 +429,22 @@
 
     public boolean isActive()
     {
-        return _state == SessionState.ACTIVE;
+        return _sessionState == SessionState.ACTIVE;
     }
 
     public void receiveEnd(final End end)
     {
-        switch (_state)
+        switch (_sessionState)
         {
             case END_SENT:
-                _state = SessionState.ENDED;
+                _sessionState = SessionState.ENDED;
                 break;
             case ACTIVE:
                 detachLinks();
                 remoteEnd(end);
                 short sendChannel = _sendingChannel;
                 _connection.sendEnd(sendChannel, new End(), true);
-                _state = SessionState.ENDED;
+                _sessionState = SessionState.ENDED;
                 break;
             default:
                 sendChannel = _sendingChannel;
@@ -570,9 +560,9 @@
 
     }
 
-    public SessionState getState()
+    public SessionState getSessionState()
     {
-        return _state;
+        return _sessionState;
     }
 
     public void sendFlow()
@@ -582,15 +572,14 @@
 
     public void setSendingChannel(final short sendingChannel)
     {
-        _sendingChannel = sendingChannel;
         _logSubject.updateSessionDetails();
-        switch(_state)
+        switch(_sessionState)
         {
             case INACTIVE:
-                _state = SessionState.BEGIN_SENT;
+                _sessionState = SessionState.BEGIN_SENT;
                 break;
             case BEGIN_RECVD:
-                _state = SessionState.ACTIVE;
+                _sessionState = SessionState.ACTIVE;
                 break;
             default:
                 // TODO error
@@ -652,17 +641,17 @@
 
     public void end(final End end)
     {
-        switch (_state)
+        switch (_sessionState)
         {
             case BEGIN_SENT:
                 _connection.sendEnd(_sendingChannel, end, false);
-                _state = SessionState.END_PIPE;
+                _sessionState = SessionState.END_PIPE;
                 break;
             case ACTIVE:
                 detachLinks();
                 short sendChannel = _sendingChannel;
                 _connection.sendEnd(sendChannel, end, true);
-                _state = SessionState.END_SENT;
+                _sessionState = SessionState.END_SENT;
                 break;
             default:
                 sendChannel = _sendingChannel;
@@ -773,7 +762,7 @@
 
     boolean isEnded()
     {
-        return _state == SessionState.ENDED || _connection.isClosed();
+        return _sessionState == SessionState.ENDED || _connection.isClosed();
     }
 
     UnsignedInteger getIncomingWindowSize()
@@ -1617,12 +1606,6 @@
     }
 
     @Override
-    public UUID getId()
-    {
-        return _id;
-    }
-
-    @Override
     public AMQPConnection<?> getAMQPConnection()
     {
         return _connection;
@@ -1877,7 +1860,7 @@
     }
 
     @Override
-    public int getConsumerCount()
+    public long getConsumerCount()
     {
         return getConsumers().size();
     }
@@ -1971,13 +1954,13 @@
     }
 
     @Override
-    public long getTransactionStartTime()
+    public long getTransactionStartTimeLong()
     {
         return 0L;
     }
 
     @Override
-    public long getTransactionUpdateTime()
+    public long getTransactionUpdateTimeLong()
     {
         return 0L;
     }
@@ -1985,7 +1968,7 @@
     @Override
     public boolean processPending()
     {
-        if (!getAMQPConnection().isIOThread() || END_STATES.contains(getState()))
+        if (!getAMQPConnection().isIOThread() || END_STATES.contains(getSessionState()))
         {
             return false;
         }