PROTON-1458: add ability to set/get max-message-size details on links
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index 248c687..bdbf534 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Source;
@@ -298,4 +299,32 @@
* @return the desired capabilities array conveyed by the peer, or null if there was none.
*/
Symbol[] getRemoteDesiredCapabilities();
+
+ /**
+ * Sets the local link max message size, to be conveyed to the peer via the Attach frame
+ * when attaching the link to the session. Null or 0 means no limit.
+ *
+ * Must be called during link setup, i.e. before calling the {@link #open()} method.
+ *
+ * @param maxMessageSize
+ * the local max message size value, or null to clear. 0 also means no limit.
+ */
+ void setMaxMessageSize(UnsignedLong maxMessageSize);
+
+ /**
+ * Gets the local link max message size.
+ *
+ * @return the local max message size, or null if none was set. 0 also means no limit.
+ *
+ * @see #setMaxMessageSize(UnsignedLong)
+ */
+ UnsignedLong getMaxMessageSize();
+
+ /**
+ * Gets the remote link max message size, as conveyed from the peer via the Attach frame
+ * when attaching the link to the session.
+ *
+ * @return the remote max message size conveyed by the peer, or null if none was set. 0 also means no limit.
+ */
+ UnsignedLong getRemoteMaxMessageSize();
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 0d030cb..23a1f42 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Source;
@@ -49,6 +50,8 @@
private int _credit;
private int _unsettled;
private int _drained;
+ private UnsignedLong _maxMessageSize;
+ private UnsignedLong _remoteMaxMessageSize;
private SenderSettleMode _senderSettleMode;
private SenderSettleMode _remoteSenderSettleMode;
@@ -451,6 +454,29 @@
}
@Override
+ public UnsignedLong getMaxMessageSize()
+ {
+ return _maxMessageSize;
+ }
+
+ @Override
+ public void setMaxMessageSize(UnsignedLong maxMessageSize)
+ {
+ _maxMessageSize = maxMessageSize;
+ }
+
+ @Override
+ public UnsignedLong getRemoteMaxMessageSize()
+ {
+ return _remoteMaxMessageSize;
+ }
+
+ void setRemoteMaxMessageSize(UnsignedLong remoteMaxMessageSize)
+ {
+ _remoteMaxMessageSize = remoteMaxMessageSize;
+ }
+
+ @Override
public int drained()
{
int drained = 0;
@@ -511,5 +537,4 @@
{
return _detached;
}
-
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index e07b6f9..a7908f2 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -790,6 +790,11 @@
attach.setDesiredCapabilities(link.getDesiredCapabilities());
}
+ if(link.getMaxMessageSize() != null)
+ {
+ attach.setMaxMessageSize(link.getMaxMessageSize());
+ }
+
attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER);
if(link instanceof SenderImpl)
@@ -1213,6 +1218,8 @@
link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities());
link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities());
+ link.setRemoteMaxMessageSize(attach.getMaxMessageSize());
+
transportLink.setName(attach.getName());
transportLink.setRemoteHandle(handle);
transportSession.addLinkRemoteHandle(transportLink, handle);
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
index 518960c..7fcd084 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
@@ -36,14 +36,19 @@
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
@@ -57,6 +62,8 @@
private static final Integer RCV_PROP_VAL = 1234;
private static final Symbol SND_PROP = Symbol.valueOf("SenderPropName");
private static final Integer SND_PROP_VAL = 5678;
+ private static final UnsignedLong CLIENT_MAX_MSG_SIZE = UnsignedLong.valueOf(54321);
+ private static final UnsignedLong SERVER_MAX_MSG_SIZE = UnsignedLong.valueOf(12345);
private final String _sourceAddress = getServer().containerId + "-link1-source";
@@ -430,6 +437,103 @@
assertLinkCreditState(getClient().receiver, 2, 0, 2);
}
+ //TODO
+ @Test
+ public void testMaxMessageSizeValue() throws Exception
+ {
+ LOGGER.fine(bold("======== About to create transports"));
+
+ Transport clientTransport = Proton.transport();
+ getClient().setTransport(clientTransport);
+ ProtocolTracerEnabler.setProtocolTracer(clientTransport, TestLoggingHelper.CLIENT_PREFIX);
+
+ Transport serverTransport = Proton.transport();
+ getServer().setTransport(serverTransport);
+ ProtocolTracerEnabler.setProtocolTracer(serverTransport, " " + TestLoggingHelper.SERVER_PREFIX);
+
+ doOutputInputCycle();
+
+ Connection clientConnection = Proton.connection();
+ getClient().setConnection(clientConnection);
+ clientTransport.bind(clientConnection);
+
+ Connection serverConnection = Proton.connection();
+ getServer().setConnection(serverConnection);
+ serverTransport.bind(serverConnection);
+
+ LOGGER.fine(bold("======== About to open connections"));
+ clientConnection.open();
+ serverConnection.open();
+
+ doOutputInputCycle();
+
+ LOGGER.fine(bold("======== About to open sessions"));
+ Session clientSession = clientConnection.session();
+ getClient().setSession(clientSession);
+ clientSession.open();
+
+ pumpClientToServer();
+
+ Session serverSession = serverConnection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ getServer().setSession(serverSession);
+
+ serverSession.open();
+
+ pumpServerToClient();
+
+ LOGGER.fine(bold("======== About to create receiver"));
+
+ Source clientSource = new Source();
+ getClient().setSource(clientSource);
+ clientSource.setAddress(_sourceAddress);
+
+ Target clientTarget = new Target();
+ getClient().setTarget(clientTarget);
+ clientTarget.setAddress(null);
+
+ Receiver clientReceiver = clientSession.receiver("link1");
+ getClient().setReceiver(clientReceiver);
+ clientReceiver.setTarget(clientTarget);
+ clientReceiver.setSource(clientSource);
+
+ clientReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ clientReceiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+ // Set the local link max-message-size
+ assertNull("Expected no value to be set", clientReceiver.getMaxMessageSize());
+ clientReceiver.setMaxMessageSize(CLIENT_MAX_MSG_SIZE);
+ assertEquals("Expected value to be set", CLIENT_MAX_MSG_SIZE, clientReceiver.getMaxMessageSize());
+
+ clientReceiver.open();
+ pumpClientToServer();
+
+ LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+ Sender serverSender = (Sender) getServer().getConnection().linkHead(of(UNINITIALIZED), of(ACTIVE));
+ getServer().setSender(serverSender);
+
+ serverSender.setReceiverSettleMode(serverSender.getRemoteReceiverSettleMode());
+ serverSender.setSenderSettleMode(serverSender.getRemoteSenderSettleMode());
+
+ org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = serverSender.getRemoteSource();
+ serverSender.setSource(serverRemoteSource);
+
+ assertEquals("Expected value to be set", CLIENT_MAX_MSG_SIZE, serverSender.getRemoteMaxMessageSize());
+
+ // Set the local link max-message-size
+ assertNull("Expected no value to be set", serverSender.getMaxMessageSize());
+ serverSender.setMaxMessageSize(SERVER_MAX_MSG_SIZE);
+ assertEquals("Expected value to be set", SERVER_MAX_MSG_SIZE, serverSender.getMaxMessageSize());
+
+ serverSender.open();
+
+ assertNull("Expected no value to be present yet", clientReceiver.getRemoteMaxMessageSize());
+
+ pumpServerToClient();
+
+ assertEquals("Expected value to be set", SERVER_MAX_MSG_SIZE, clientReceiver.getRemoteMaxMessageSize());
+ }
+
void assertLinkCreditState(Link link, int credit, int queued, int remoteCredit)
{
assertEquals("Unexpected credit", credit, link.getCredit());
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index dd38dc9..8b8425d 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -728,17 +728,6 @@
assert self.snd.remote_rcv_settle_mode == Link.RCV_SECOND
assert self.rcv.remote_snd_settle_mode == Link.SND_UNSETTLED
- def test_max_message_size(self):
- if "java" in sys.platform:
- raise Skipped()
- assert self.snd.max_message_size == 0
- assert self.rcv.remote_max_message_size == 0
- self.snd.max_message_size = 13579
- self.snd.open()
- self.rcv.open()
- self.pump()
- assert self.rcv.remote_max_message_size == 13579
-
def test_cleanup(self):
snd, rcv = self.link("test-link")
snd.open()