QPID-8240 : Detect idle connections
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index c590eca..7b5429e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -208,7 +208,7 @@
     public final void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
     {
         doSend(consumer, entry, batch);
-
+        getSession().getAMQPConnection().updateLastMessageOutboundTime();
         if (consumer.acquires())
         {
             entry.makeAcquisitionStealable();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 3bc6c84..f7b77f1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -3433,7 +3433,11 @@
         {
             if(allStats || statistics.contains(stat.getName()))
             {
-                map.put(stat.getName(), stat.getValue(this));
+                Object value = stat.getValue(this);
+                if(value != null)
+                {
+                    map.put(stat.getName(), value);
+                }
             }
         }
         return map;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 14ee17a..62cefc1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -120,7 +120,7 @@
                       description = "Total size of all messages received by this connection.")
     long getBytesIn();
 
-    // currently this reports outbound  message content size without header.
+    // currently this reports outbound message content size without header.
     // See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
     @SuppressWarnings("unused")
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Outbound",
@@ -143,6 +143,24 @@
     Date getLastIoTime();
 
     @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Inbound Message",
+            description = "Time of last message received by the broker on this connection. "
+                          + "If no message has been received the connection creation time will be used.")
+    Date getLastInboundMessageTime();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Outbound Message",
+            description = "Time of last message sent by the broker on this connection. "
+                          + "If no message has been snt the connection creation time will be used.")
+    Date getLastOutboundMessageTime();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Message",
+            description = "Time of last message sent or received by the broker on this connection. "
+                          + "If no message has been sent or received the connection creation time will be used.")
+    Date getLastMessageTime();
+
+    @SuppressWarnings("unused")
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Sessions",
                       description = "Current number of sessions belonging to this connection.")
     int getSessionCount();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 3b609f9..e99c254 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -85,6 +85,10 @@
 
     void unblock();
 
+    void updateLastMessageInboundTime();
+
+    void updateLastMessageOutboundTime();
+
     void pushScheduler(NetworkConnectionScheduler networkConnectionScheduler);
 
     NetworkConnectionScheduler popScheduler();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 5887a70..351d3b0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -48,6 +48,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
@@ -71,6 +72,7 @@
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.sasl.SaslSettings;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.network.NetworkConnection;
 import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
@@ -78,7 +80,9 @@
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.TransactionObserver;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.FixedKeyMapCreator;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
@@ -127,6 +131,11 @@
     private volatile NamedAddressSpace _addressSpace;
     private volatile long _lastReadTime;
     private volatile long _lastWriteTime;
+    private volatile long _lastMessageInboundTime;
+    private volatile long _lastMessageOutboundTime;
+    private volatile boolean _messagesWritten;
+
+
     private volatile AccessControlContext _accessControllerContext;
     private volatile Thread _ioThread;
     private volatile StatisticsGatherer _statisticsGatherer;
@@ -187,18 +196,13 @@
     {
         final AccessControlContext acc = AccessController.getContext();
         return AccessController.doPrivileged(
-                new PrivilegedAction<AccessControlContext>()
-                {
-                    @Override
-                    public AccessControlContext run()
-                    {
-                        if (subject == null)
-                            return new AccessControlContext(acc, null);
-                        else
-                            return new AccessControlContext
-                                    (acc,
-                                     new SubjectDomainCombiner(subject));
-                    }
+                (PrivilegedAction<AccessControlContext>) () -> {
+                    if (subject == null)
+                        return new AccessControlContext(acc, null);
+                    else
+                        return new AccessControlContext
+                                (acc,
+                                 new SubjectDomainCombiner(subject));
                 });
     }
 
@@ -209,7 +213,7 @@
         long maxAuthDelay = _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
         SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
         _aggregateTicker.addTicker(slowConnectionOpenTicker);
-        _lastReadTime = _lastWriteTime = getCreatedTime().getTime();
+        _lastReadTime = _lastWriteTime = _lastMessageInboundTime = _lastMessageOutboundTime = getCreatedTime().getTime();
         _maxUncommittedInMemorySize = getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
         _transactionObserver = _maxUncommittedInMemorySize < 0 ? FlowToDiskTransactionObserver.NOOP_TRANSACTION_OBSERVER : new FlowToDiskTransactionObserver(_maxUncommittedInMemorySize, _logSubject, _eventLoggerProvider.getEventLogger());
         logConnectionOpen();
@@ -268,7 +272,7 @@
         return _lastReadTime;
     }
 
-    public final void updateLastReadTime()
+    private void updateLastReadTime()
     {
         _lastReadTime = System.currentTimeMillis();
     }
@@ -281,7 +285,43 @@
 
     public final void updateLastWriteTime()
     {
-        _lastWriteTime = System.currentTimeMillis();
+        final long currentTime = System.currentTimeMillis();
+        _lastWriteTime = currentTime;
+        if(_messagesWritten)
+        {
+            _messagesWritten = false;
+            _lastMessageOutboundTime = currentTime;
+        }
+    }
+
+    @Override
+    public void updateLastMessageInboundTime()
+    {
+        _lastMessageInboundTime = _lastReadTime;
+    }
+
+    @Override
+    public void updateLastMessageOutboundTime()
+    {
+        _messagesWritten = true;
+    }
+
+    @Override
+    public Date getLastInboundMessageTime()
+    {
+        return new Date(_lastMessageInboundTime);
+    }
+
+    @Override
+    public Date getLastOutboundMessageTime()
+    {
+        return new Date(_lastMessageOutboundTime);
+    }
+
+    @Override
+    public Date getLastMessageTime()
+    {
+        return new Date(Math.max(_lastMessageInboundTime, _lastMessageOutboundTime));
     }
 
     @Override
@@ -438,6 +478,7 @@
     @Override
     public void registerMessageReceived(long messageSize)
     {
+        updateLastMessageInboundTime();
         _messagesIn.incrementAndGet();
         _bytesIn.addAndGet(messageSize);
         _statisticsGatherer.registerMessageReceived(messageSize);
@@ -522,6 +563,33 @@
         }
     }
 
+    @Override
+    public final void received(final QpidByteBuffer buf)
+    {
+        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
+        {
+            updateLastReadTime();
+            try
+            {
+                onReceive(buf);
+            }
+            catch (StoreException e)
+            {
+                if (getAddressSpace().isActive())
+                {
+                    throw new ServerScopedRuntimeException(e);
+                }
+                else
+                {
+                    throw new ConnectionScopedRuntimeException(e);
+                }
+            }
+            return null;
+        }, getAccessControllerContext());
+    }
+
+    protected abstract void onReceive(final QpidByteBuffer msg);
+
     protected abstract void addAsyncTask(final Action<? super T> action);
 
     protected abstract boolean isOpeningInProgress();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 055f935..ce7e7f0 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -135,33 +135,17 @@
     }
 
     @Override
-    public void received(final QpidByteBuffer buf)
+    protected void onReceive(final QpidByteBuffer buf)
     {
-        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
+        try
         {
-            updateLastReadTime();
-            try
-            {
-                _inputHandler.received(buf);
-                _connection.receivedComplete();
-            }
-            catch (IllegalArgumentException | IllegalStateException e)
-            {
-                throw new ConnectionScopedRuntimeException(e);
-            }
-            catch (StoreException e)
-            {
-                if (getAddressSpace().isActive())
-                {
-                    throw new ServerScopedRuntimeException(e);
-                }
-                else
-                {
-                    throw new ConnectionScopedRuntimeException(e);
-                }
-            }
-            return null;
-        }, getAccessControllerContext());
+            _inputHandler.received(buf);
+            _connection.receivedComplete();
+        }
+        catch (IllegalArgumentException | IllegalStateException e)
+        {
+            throw new ConnectionScopedRuntimeException(e);
+        }
     }
 
     @Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index eca6d8a..e888d7f 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -243,40 +243,18 @@
     }
 
     @Override
-    public void received(final QpidByteBuffer msg)
+    protected void onReceive(final QpidByteBuffer msg)
     {
-        AccessController.doPrivileged(new PrivilegedAction<Void>()
+        try
         {
-            @Override
-            public Void run()
-            {
-                updateLastReadTime();
-
-                try
-                {
-                    _decoder.decodeBuffer(msg);
-                    receivedCompleteAllChannels();
-                }
-                catch (AMQFrameDecodingException | IOException e)
-                {
-                    LOGGER.error("Unexpected exception", e);
-                    throw new ConnectionScopedRuntimeException(e);
-                }
-                catch (StoreException e)
-                {
-                    if (getAddressSpace().isActive())
-                    {
-                        throw new ServerScopedRuntimeException(e);
-                    }
-                    else
-                    {
-                        throw new ConnectionScopedRuntimeException(e);
-                    }
-                }
-                return null;
-            }
-        }, getAccessControllerContext());
-
+            _decoder.decodeBuffer(msg);
+            receivedCompleteAllChannels();
+        }
+        catch (AMQFrameDecodingException | IOException e)
+        {
+            LOGGER.error("Unexpected exception", e);
+            throw new ConnectionScopedRuntimeException(e);
+        }
     }
 
     private void receivedCompleteAllChannels()
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 02465a2..bcb273d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -24,7 +24,6 @@
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
 
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.AccessControlContext;
 import java.security.AccessControlException;
 import java.security.AccessController;
@@ -117,7 +116,6 @@
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
 import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.AggregateTicker;
@@ -133,8 +131,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
-        implements FrameOutputHandler,
-                   DescribedTypeConstructorRegistry.Source,
+        implements DescribedTypeConstructorRegistry.Source,
                    ValueWriter.Registry.Source,
                    SASLEndpoint,
                    AMQPConnection_1_0<AMQPConnection_1_0Impl>
@@ -357,7 +354,7 @@
                 {
                     outcome.setAdditionalData(new Binary(challenge));
                 }
-                send(new SASLFrame(outcome), null);
+                send(new SASLFrame(outcome));
                 _saslComplete = true;
                 _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
                 disposeSaslNegotiator();
@@ -377,7 +374,7 @@
     {
         SaslChallenge challengeBody = new SaslChallenge();
         challengeBody.setChallenge(new Binary(challenge));
-        send(new SASLFrame(challengeBody), null);
+        send(new SASLFrame(challengeBody));
 
         _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
     }
@@ -386,7 +383,7 @@
     {
         SaslOutcome outcome = new SaslOutcome();
         outcome.setCode(SaslCode.AUTH);
-        send(new SASLFrame(outcome), null);
+        send(new SASLFrame(outcome));
         _saslComplete = true;
         closeSaslWithFailure();
     }
@@ -1210,7 +1207,7 @@
             ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
             if (payload == null)
             {
-                send(AMQFrame.createAMQFrame(channel, body));
+                send(new TransportFrame(channel, body));
                 return 0;
             }
             else
@@ -1220,7 +1217,7 @@
                 long payloadLength = (long) payload.remaining();
                 if (payloadLength <= maxPayloadSize)
                 {
-                    send(AMQFrame.createAMQFrame(channel, body, payload));
+                    send(new TransportFrame(channel, body, payload));
                     return (int)payloadLength;
                 }
                 else
@@ -1234,7 +1231,7 @@
                     try (QpidByteBuffer payloadDup = payload.view(0, maxPayloadSize))
                     {
                         payload.position(payload.position() + maxPayloadSize);
-                        send(AMQFrame.createAMQFrame(channel, body, payloadDup));
+                        send(new TransportFrame(channel, body, payloadDup));
                     }
 
                     return maxPayloadSize;
@@ -1261,7 +1258,7 @@
     @Override
     public void writerIdle()
     {
-        send(TransportFrame.createAMQFrame((short)0,null));
+        send(TransportFrame.HEARTBEAT);
     }
 
     @Override
@@ -1285,74 +1282,54 @@
         return getNetwork().getRemoteAddress().toString();
     }
 
-
-
     @Override
-    public void received(final QpidByteBuffer msg)
+    protected void onReceive(final QpidByteBuffer msg)
     {
-
-        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
+        try
         {
-            updateLastReadTime();
+            int remaining;
+
             try
             {
-                int remaining;
-
-                try
+                do
                 {
-                    do
+                    remaining = msg.remaining();
+
+                    switch (_connectionState)
                     {
-                        remaining = msg.remaining();
-
-                        switch (_connectionState)
-                        {
-                            case AWAIT_AMQP_OR_SASL_HEADER:
-                            case AWAIT_AMQP_HEADER:
-                                if (remaining >= 8)
-                                {
-                                    processProtocolHeader(msg);
-                                }
-                                break;
-                            case AWAIT_SASL_INIT:
-                            case AWAIT_SASL_RESPONSE:
-                            case AWAIT_OPEN:
-                            case OPENED:
-                            case CLOSE_SENT:
-                                _frameHandler.parse(msg);
-                                break;
-                            case CLOSE_RECEIVED:
-                            case CLOSED:
-                                // ignore;
-                                break;
-                        }
-
-
+                        case AWAIT_AMQP_OR_SASL_HEADER:
+                        case AWAIT_AMQP_HEADER:
+                            if (remaining >= 8)
+                            {
+                                processProtocolHeader(msg);
+                            }
+                            break;
+                        case AWAIT_SASL_INIT:
+                        case AWAIT_SASL_RESPONSE:
+                        case AWAIT_OPEN:
+                        case OPENED:
+                        case CLOSE_SENT:
+                            _frameHandler.parse(msg);
+                            break;
+                        case CLOSE_RECEIVED:
+                        case CLOSED:
+                            // ignore;
+                            break;
                     }
-                    while (msg.remaining() != remaining);
-                }
-                finally
-                {
-                    receivedComplete();
-                }
-            }
-            catch (IllegalArgumentException | IllegalStateException e)
-            {
-                throw new ConnectionScopedRuntimeException(e);
-            }
-            catch (StoreException e)
-            {
-                if (getAddressSpace().isActive())
-                {
-                    throw new ServerScopedRuntimeException(e);
-                }
-                else
-                {
-                    throw new ConnectionScopedRuntimeException(e);
-                }
-            }
-            return null;
-        }, getAccessControllerContext());
 
+
+                }
+                while (msg.remaining() != remaining);
+            }
+            finally
+            {
+                receivedComplete();
+            }
+        }
+        catch (IllegalArgumentException | IllegalStateException e)
+        {
+            throw new ConnectionScopedRuntimeException(e);
+        }
     }
 
     @Override
@@ -1402,7 +1379,7 @@
                     mechanismsList.add(Symbol.valueOf(name));
                 }
                 mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
-                send(new SASLFrame(mechanisms), null);
+                send(new SASLFrame(mechanisms));
 
                 _connectionState = ConnectionState.AWAIT_SASL_INIT;
                 _frameHandler = getFrameHandler(true);
@@ -1478,15 +1455,7 @@
         }
     }
 
-    @Override
-    public void send(final AMQFrame amqFrame)
-    {
-        send(amqFrame, null);
-    }
-
-
-    @Override
-    public void send(final AMQFrame amqFrame, ByteBuffer buf)
+    private void send(final AMQFrame amqFrame)
     {
         updateLastWriteTime();
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
@@ -1501,13 +1470,6 @@
         }
     }
 
-    public void send(short channel, FrameBody body)
-    {
-        AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
-        send(frame);
-
-    }
-
     private void addCloseTicker()
     {
         long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
deleted file mode 100644
index 878d195..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-
-public interface FrameOutputHandler<T>
-{
-
-    void send(AMQFrame<T> frame);
-    void send(AMQFrame<T> frame, ByteBuffer payload);
-
-}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
index ca42eee..d8e0481 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
@@ -45,16 +45,6 @@
         return _payload;
     }
 
-    public static TransportFrame createAMQFrame(int channel, FrameBody frameBody)
-    {
-        return createAMQFrame(channel, frameBody, null);
-    }
-
-    public static TransportFrame createAMQFrame(int channel, FrameBody frameBody, QpidByteBuffer payload)
-    {
-        return new TransportFrame(channel, frameBody, payload);
-    }
-
     abstract public int getChannel();
 
     abstract public byte getFrameType();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
index 44f72dd..3156cbd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
@@ -23,6 +23,7 @@
 
 public final class TransportFrame extends AMQFrame<FrameBody>
 {
+    public static final TransportFrame HEARTBEAT = new TransportFrame(0, null);
 
     private final int _channel;
 
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index 262838a..5cd74f5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -60,8 +60,8 @@
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -199,7 +199,7 @@
 
         Open open = new Open();
         open.setContainerId("testContainerId");
-        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+        _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
         verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -221,7 +221,7 @@
 
         Open open = new Open();
         open.setContainerId("testContainerId");
-        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+        _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
         verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
         verify(_networkConnection).close();
@@ -241,7 +241,7 @@
 
         Open open = new Open();
         open.setContainerId("testContainerId");
-        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+        _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
         verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
         AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -274,7 +274,7 @@
 
         Open open = new Open();
         open.setContainerId("testContainerId");
-        _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+        _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
         verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();