QPID-8349: [Tests][AMQP 1.0] Introduce field to count incoming deliveries in order to set flow next-incoming-id based on received message count and begin next-outgoing-id
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index ac30068..1437e19 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -30,12 +30,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -113,6 +115,8 @@
private Object _decodedLatestDelivery;
private UnsignedInteger _latestDeliveryId;
private Map<String, Object> _latestDeliveryApplicationProperties;
+ private Map<Class, FrameBody> _latestResponses = new HashMap<>();
+ private AtomicLong _receivedDeliveryCount = new AtomicLong();
Interaction(final FrameTransport frameTransport)
{
@@ -644,6 +648,12 @@
return flowNextIncomingId(_latestDeliveryId.add(UnsignedInteger.ONE));
}
+ public Interaction flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
+ {
+ final Begin begin = (Begin) _latestResponses.get(Begin.class);
+ return flowNextIncomingId(begin.getNextOutgoingId().add(UnsignedInteger.valueOf(_receivedDeliveryCount.get())));
+ }
+
public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow)
{
_flow.setOutgoingWindow(outgoingWindow);
@@ -680,6 +690,12 @@
return this;
}
+ public Interaction flowDeliveryCount()
+ {
+ _flow.setDeliveryCount(UnsignedInteger.valueOf(_receivedDeliveryCount.get()));
+ return this;
+ }
+
public Interaction flowLinkCredit(final UnsignedInteger linkCredit)
{
_flow.setLinkCredit(linkCredit);
@@ -1071,6 +1087,7 @@
sync();
_latestDelivery = receiveAllTransfers(ignore);
_latestDeliveryId = _latestDelivery.size() > 0 ? _latestDelivery.get(0).getDeliveryId() : null;
+ _receivedDeliveryCount.incrementAndGet();
return this;
}
@@ -1163,4 +1180,16 @@
while (completed == null);
return completed;
}
+
+ @Override
+ protected Response<?> getNextResponse() throws Exception
+ {
+ Response<?> response = super.getNextResponse();
+ if (response != null && response.getBody() instanceof FrameBody)
+ {
+ _latestResponses.put(response.getBody().getClass(), (FrameBody)response.getBody());
+ }
+ return response;
+ }
+
}
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
index 2c977f3..f9640dd 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -84,12 +84,12 @@
return getInteraction();
}
- public Response<?> getLatestResponse() throws Exception
+ public Response<?> getLatestResponse()
{
return _latestResponse;
}
- public <T> T getLatestResponse(Class<T> type) throws Exception
+ public <T> T getLatestResponse(Class<T> type)
{
if (_latestResponse.getBody() == null)
{