QPID-8240 : Detect idle connections
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index c590eca..7b5429e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -208,7 +208,7 @@
public final void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
doSend(consumer, entry, batch);
-
+ getSession().getAMQPConnection().updateLastMessageOutboundTime();
if (consumer.acquires())
{
entry.makeAcquisitionStealable();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 3bc6c84..f7b77f1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -3433,7 +3433,11 @@
{
if(allStats || statistics.contains(stat.getName()))
{
- map.put(stat.getName(), stat.getValue(this));
+ Object value = stat.getValue(this);
+ if(value != null)
+ {
+ map.put(stat.getName(), value);
+ }
}
}
return map;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 14ee17a..62cefc1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -120,7 +120,7 @@
description = "Total size of all messages received by this connection.")
long getBytesIn();
- // currently this reports outbound message content size without header.
+ // currently this reports outbound message content size without header.
// See also QPID-7689: https://issues.apache.org/jira/browse/QPID-7689?focusedCommentId=16022923#comment-16022923
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Outbound",
@@ -143,6 +143,24 @@
Date getLastIoTime();
@SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Inbound Message",
+ description = "Time of last message received by the broker on this connection. "
+ + "If no message has been received the connection creation time will be used.")
+ Date getLastInboundMessageTime();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Outbound Message",
+ description = "Time of last message sent by the broker on this connection. "
+ + "If no message has been snt the connection creation time will be used.")
+ Date getLastOutboundMessageTime();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.ABSOLUTE_TIME, label = "Last Message",
+ description = "Time of last message sent or received by the broker on this connection. "
+ + "If no message has been sent or received the connection creation time will be used.")
+ Date getLastMessageTime();
+
+ @SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Sessions",
description = "Current number of sessions belonging to this connection.")
int getSessionCount();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 3b609f9..e99c254 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -85,6 +85,10 @@
void unblock();
+ void updateLastMessageInboundTime();
+
+ void updateLastMessageOutboundTime();
+
void pushScheduler(NetworkConnectionScheduler networkConnectionScheduler);
NetworkConnectionScheduler popScheduler();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 5887a70..351d3b0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -48,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
@@ -71,6 +72,7 @@
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.network.NetworkConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
@@ -78,7 +80,9 @@
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.FixedKeyMapCreator;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
@@ -127,6 +131,11 @@
private volatile NamedAddressSpace _addressSpace;
private volatile long _lastReadTime;
private volatile long _lastWriteTime;
+ private volatile long _lastMessageInboundTime;
+ private volatile long _lastMessageOutboundTime;
+ private volatile boolean _messagesWritten;
+
+
private volatile AccessControlContext _accessControllerContext;
private volatile Thread _ioThread;
private volatile StatisticsGatherer _statisticsGatherer;
@@ -187,18 +196,13 @@
{
final AccessControlContext acc = AccessController.getContext();
return AccessController.doPrivileged(
- new PrivilegedAction<AccessControlContext>()
- {
- @Override
- public AccessControlContext run()
- {
- if (subject == null)
- return new AccessControlContext(acc, null);
- else
- return new AccessControlContext
- (acc,
- new SubjectDomainCombiner(subject));
- }
+ (PrivilegedAction<AccessControlContext>) () -> {
+ if (subject == null)
+ return new AccessControlContext(acc, null);
+ else
+ return new AccessControlContext
+ (acc,
+ new SubjectDomainCombiner(subject));
});
}
@@ -209,7 +213,7 @@
long maxAuthDelay = _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
_aggregateTicker.addTicker(slowConnectionOpenTicker);
- _lastReadTime = _lastWriteTime = getCreatedTime().getTime();
+ _lastReadTime = _lastWriteTime = _lastMessageInboundTime = _lastMessageOutboundTime = getCreatedTime().getTime();
_maxUncommittedInMemorySize = getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_transactionObserver = _maxUncommittedInMemorySize < 0 ? FlowToDiskTransactionObserver.NOOP_TRANSACTION_OBSERVER : new FlowToDiskTransactionObserver(_maxUncommittedInMemorySize, _logSubject, _eventLoggerProvider.getEventLogger());
logConnectionOpen();
@@ -268,7 +272,7 @@
return _lastReadTime;
}
- public final void updateLastReadTime()
+ private void updateLastReadTime()
{
_lastReadTime = System.currentTimeMillis();
}
@@ -281,7 +285,43 @@
public final void updateLastWriteTime()
{
- _lastWriteTime = System.currentTimeMillis();
+ final long currentTime = System.currentTimeMillis();
+ _lastWriteTime = currentTime;
+ if(_messagesWritten)
+ {
+ _messagesWritten = false;
+ _lastMessageOutboundTime = currentTime;
+ }
+ }
+
+ @Override
+ public void updateLastMessageInboundTime()
+ {
+ _lastMessageInboundTime = _lastReadTime;
+ }
+
+ @Override
+ public void updateLastMessageOutboundTime()
+ {
+ _messagesWritten = true;
+ }
+
+ @Override
+ public Date getLastInboundMessageTime()
+ {
+ return new Date(_lastMessageInboundTime);
+ }
+
+ @Override
+ public Date getLastOutboundMessageTime()
+ {
+ return new Date(_lastMessageOutboundTime);
+ }
+
+ @Override
+ public Date getLastMessageTime()
+ {
+ return new Date(Math.max(_lastMessageInboundTime, _lastMessageOutboundTime));
}
@Override
@@ -438,6 +478,7 @@
@Override
public void registerMessageReceived(long messageSize)
{
+ updateLastMessageInboundTime();
_messagesIn.incrementAndGet();
_bytesIn.addAndGet(messageSize);
_statisticsGatherer.registerMessageReceived(messageSize);
@@ -522,6 +563,33 @@
}
}
+ @Override
+ public final void received(final QpidByteBuffer buf)
+ {
+ AccessController.doPrivileged((PrivilegedAction<Object>) () ->
+ {
+ updateLastReadTime();
+ try
+ {
+ onReceive(buf);
+ }
+ catch (StoreException e)
+ {
+ if (getAddressSpace().isActive())
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ }
+ return null;
+ }, getAccessControllerContext());
+ }
+
+ protected abstract void onReceive(final QpidByteBuffer msg);
+
protected abstract void addAsyncTask(final Action<? super T> action);
protected abstract boolean isOpeningInProgress();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 055f935..ce7e7f0 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -135,33 +135,17 @@
}
@Override
- public void received(final QpidByteBuffer buf)
+ protected void onReceive(final QpidByteBuffer buf)
{
- AccessController.doPrivileged((PrivilegedAction<Object>) () ->
+ try
{
- updateLastReadTime();
- try
- {
- _inputHandler.received(buf);
- _connection.receivedComplete();
- }
- catch (IllegalArgumentException | IllegalStateException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- catch (StoreException e)
- {
- if (getAddressSpace().isActive())
- {
- throw new ServerScopedRuntimeException(e);
- }
- else
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- }
- return null;
- }, getAccessControllerContext());
+ _inputHandler.received(buf);
+ _connection.receivedComplete();
+ }
+ catch (IllegalArgumentException | IllegalStateException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
@Override
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index eca6d8a..e888d7f 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -243,40 +243,18 @@
}
@Override
- public void received(final QpidByteBuffer msg)
+ protected void onReceive(final QpidByteBuffer msg)
{
- AccessController.doPrivileged(new PrivilegedAction<Void>()
+ try
{
- @Override
- public Void run()
- {
- updateLastReadTime();
-
- try
- {
- _decoder.decodeBuffer(msg);
- receivedCompleteAllChannels();
- }
- catch (AMQFrameDecodingException | IOException e)
- {
- LOGGER.error("Unexpected exception", e);
- throw new ConnectionScopedRuntimeException(e);
- }
- catch (StoreException e)
- {
- if (getAddressSpace().isActive())
- {
- throw new ServerScopedRuntimeException(e);
- }
- else
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- }
- return null;
- }
- }, getAccessControllerContext());
-
+ _decoder.decodeBuffer(msg);
+ receivedCompleteAllChannels();
+ }
+ catch (AMQFrameDecodingException | IOException e)
+ {
+ LOGGER.error("Unexpected exception", e);
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
private void receivedCompleteAllChannels()
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 02465a2..bcb273d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -24,7 +24,6 @@
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
@@ -117,7 +116,6 @@
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
@@ -133,8 +131,7 @@
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
- implements FrameOutputHandler,
- DescribedTypeConstructorRegistry.Source,
+ implements DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
SASLEndpoint,
AMQPConnection_1_0<AMQPConnection_1_0Impl>
@@ -357,7 +354,7 @@
{
outcome.setAdditionalData(new Binary(challenge));
}
- send(new SASLFrame(outcome), null);
+ send(new SASLFrame(outcome));
_saslComplete = true;
_connectionState = ConnectionState.AWAIT_AMQP_HEADER;
disposeSaslNegotiator();
@@ -377,7 +374,7 @@
{
SaslChallenge challengeBody = new SaslChallenge();
challengeBody.setChallenge(new Binary(challenge));
- send(new SASLFrame(challengeBody), null);
+ send(new SASLFrame(challengeBody));
_connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
}
@@ -386,7 +383,7 @@
{
SaslOutcome outcome = new SaslOutcome();
outcome.setCode(SaslCode.AUTH);
- send(new SASLFrame(outcome), null);
+ send(new SASLFrame(outcome));
_saslComplete = true;
closeSaslWithFailure();
}
@@ -1210,7 +1207,7 @@
ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
if (payload == null)
{
- send(AMQFrame.createAMQFrame(channel, body));
+ send(new TransportFrame(channel, body));
return 0;
}
else
@@ -1220,7 +1217,7 @@
long payloadLength = (long) payload.remaining();
if (payloadLength <= maxPayloadSize)
{
- send(AMQFrame.createAMQFrame(channel, body, payload));
+ send(new TransportFrame(channel, body, payload));
return (int)payloadLength;
}
else
@@ -1234,7 +1231,7 @@
try (QpidByteBuffer payloadDup = payload.view(0, maxPayloadSize))
{
payload.position(payload.position() + maxPayloadSize);
- send(AMQFrame.createAMQFrame(channel, body, payloadDup));
+ send(new TransportFrame(channel, body, payloadDup));
}
return maxPayloadSize;
@@ -1261,7 +1258,7 @@
@Override
public void writerIdle()
{
- send(TransportFrame.createAMQFrame((short)0,null));
+ send(TransportFrame.HEARTBEAT);
}
@Override
@@ -1285,74 +1282,54 @@
return getNetwork().getRemoteAddress().toString();
}
-
-
@Override
- public void received(final QpidByteBuffer msg)
+ protected void onReceive(final QpidByteBuffer msg)
{
-
- AccessController.doPrivileged((PrivilegedAction<Object>) () ->
+ try
{
- updateLastReadTime();
+ int remaining;
+
try
{
- int remaining;
-
- try
+ do
{
- do
+ remaining = msg.remaining();
+
+ switch (_connectionState)
{
- remaining = msg.remaining();
-
- switch (_connectionState)
- {
- case AWAIT_AMQP_OR_SASL_HEADER:
- case AWAIT_AMQP_HEADER:
- if (remaining >= 8)
- {
- processProtocolHeader(msg);
- }
- break;
- case AWAIT_SASL_INIT:
- case AWAIT_SASL_RESPONSE:
- case AWAIT_OPEN:
- case OPENED:
- case CLOSE_SENT:
- _frameHandler.parse(msg);
- break;
- case CLOSE_RECEIVED:
- case CLOSED:
- // ignore;
- break;
- }
-
-
+ case AWAIT_AMQP_OR_SASL_HEADER:
+ case AWAIT_AMQP_HEADER:
+ if (remaining >= 8)
+ {
+ processProtocolHeader(msg);
+ }
+ break;
+ case AWAIT_SASL_INIT:
+ case AWAIT_SASL_RESPONSE:
+ case AWAIT_OPEN:
+ case OPENED:
+ case CLOSE_SENT:
+ _frameHandler.parse(msg);
+ break;
+ case CLOSE_RECEIVED:
+ case CLOSED:
+ // ignore;
+ break;
}
- while (msg.remaining() != remaining);
- }
- finally
- {
- receivedComplete();
- }
- }
- catch (IllegalArgumentException | IllegalStateException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- catch (StoreException e)
- {
- if (getAddressSpace().isActive())
- {
- throw new ServerScopedRuntimeException(e);
- }
- else
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- }
- return null;
- }, getAccessControllerContext());
+
+ }
+ while (msg.remaining() != remaining);
+ }
+ finally
+ {
+ receivedComplete();
+ }
+ }
+ catch (IllegalArgumentException | IllegalStateException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
@Override
@@ -1402,7 +1379,7 @@
mechanismsList.add(Symbol.valueOf(name));
}
mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
- send(new SASLFrame(mechanisms), null);
+ send(new SASLFrame(mechanisms));
_connectionState = ConnectionState.AWAIT_SASL_INIT;
_frameHandler = getFrameHandler(true);
@@ -1478,15 +1455,7 @@
}
}
- @Override
- public void send(final AMQFrame amqFrame)
- {
- send(amqFrame, null);
- }
-
-
- @Override
- public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ private void send(final AMQFrame amqFrame)
{
updateLastWriteTime();
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
@@ -1501,13 +1470,6 @@
}
}
- public void send(short channel, FrameBody body)
- {
- AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
- send(frame);
-
- }
-
private void addCloseTicker()
{
long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
deleted file mode 100644
index 878d195..0000000
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-
-public interface FrameOutputHandler<T>
-{
-
- void send(AMQFrame<T> frame);
- void send(AMQFrame<T> frame, ByteBuffer payload);
-
-}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
index ca42eee..d8e0481 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
@@ -45,16 +45,6 @@
return _payload;
}
- public static TransportFrame createAMQFrame(int channel, FrameBody frameBody)
- {
- return createAMQFrame(channel, frameBody, null);
- }
-
- public static TransportFrame createAMQFrame(int channel, FrameBody frameBody, QpidByteBuffer payload)
- {
- return new TransportFrame(channel, frameBody, payload);
- }
-
abstract public int getChannel();
abstract public byte getFrameType();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
index 44f72dd..3156cbd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
@@ -23,6 +23,7 @@
public final class TransportFrame extends AMQFrame<FrameBody>
{
+ public static final TransportFrame HEARTBEAT = new TransportFrame(0, null);
private final int _channel;
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index 262838a..5cd74f5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -60,8 +60,8 @@
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -199,7 +199,7 @@
Open open = new Open();
open.setContainerId("testContainerId");
- _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+ _frameWriter.send(new TransportFrame((int) (short) 0, open));
verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -221,7 +221,7 @@
Open open = new Open();
open.setContainerId("testContainerId");
- _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+ _frameWriter.send(new TransportFrame((int) (short) 0, open));
verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
verify(_networkConnection).close();
@@ -241,7 +241,7 @@
Open open = new Open();
open.setContainerId("testContainerId");
- _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+ _frameWriter.send(new TransportFrame((int) (short) 0, open));
verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
@@ -274,7 +274,7 @@
Open open = new Open();
open.setContainerId("testContainerId");
- _frameWriter.send(AMQFrame.createAMQFrame((short)0,open));
+ _frameWriter.send(new TransportFrame((int) (short) 0, open));
verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class));
AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();