QPID-8349: [Tests][AMQP 1.0] Simplify transaction tests by moving InteractionTransactionalState from tests into Interaction
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index ab2c959..727b501 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -112,6 +112,7 @@
private Map<Class, FrameBody> _latestResponses = new HashMap<>();
private AtomicLong _receivedDeliveryCount = new AtomicLong();
private AtomicLong _coordinatorCredits = new AtomicLong();
+ private InteractionTransactionalState _transactionalState;
Interaction(final FrameTransport frameTransport)
{
@@ -811,6 +812,11 @@
return transferState(transactionalState);
}
+ public Interaction transferTransactionalStateFromCurrentTransaction()
+ {
+ return transferTransactionalState(getCurrentTransactionId());
+ }
+
public Interaction transferResume(final Boolean resume)
{
_transfer.setResume(resume);
@@ -901,6 +907,11 @@
return dispositionState(state);
}
+ public Interaction dispositionTransactionalStateFromCurrentTransaction(final Outcome outcome)
+ {
+ return dispositionTransactionalState(getCurrentTransactionId(), outcome);
+ }
+
public Interaction dispositionRole(final Role role)
{
_disposition.setRole(role);
@@ -948,23 +959,40 @@
// transaction //
////////////////
- public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception
+
+ public UnsignedInteger getCoordinatorHandle()
{
- return txnAttachCoordinatorLink(transactionalState, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+ return _transactionalState == null ? null : _transactionalState.getHandle();
}
- public Interaction txnAttachCoordinatorLink(final InteractionTransactionalState transactionalState,
+ public Binary getCurrentTransactionId()
+ {
+ return _transactionalState == null ? null : _transactionalState.getCurrentTransactionId();
+ }
+
+ public DeliveryState getCoordinatorLatestDeliveryState()
+ {
+ return _transactionalState == null ? null : _transactionalState.getDeliveryState();
+ }
+
+ public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle) throws Exception
+ {
+ return txnAttachCoordinatorLink(handle, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+ }
+
+ public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle,
final Symbol... outcomes) throws Exception
{
Attach attach = new Attach();
- attach.setName("testTransactionCoordinator-" + transactionalState.getHandle());
- attach.setHandle(transactionalState.getHandle());
+ attach.setName("testTransactionCoordinator-" + handle);
+ attach.setHandle(handle);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
attach.setTarget(new Coordinator());
attach.setRole(Role.SENDER);
Source source = new Source();
attach.setSource(source);
source.setOutcomes(outcomes);
+ _transactionalState = new InteractionTransactionalState(handle);
sendPerformativeAndChainFuture(attach, _sessionChannel);
consumeResponse(Attach.class);
final Flow flow = consumeResponse(Flow.class).getLatestResponse(Flow.class);
@@ -972,32 +1000,42 @@
return this;
}
- public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
+ public Interaction txnDeclare() throws Exception
{
- sendPayloadToCoordinator(new Declare(), txnState.getHandle());
+ sendPayloadToCoordinator(new Declare(), _transactionalState.getHandle());
final DeliveryState state = handleCoordinatorResponse();
- txnState.setDeliveryState(state);
+ _transactionalState.setDeliveryState(state);
final Binary transactionId = ((Declared) state).getTxnId();
- txnState.setLastTransactionId(transactionId);
+ _transactionalState.setLastTransactionId(transactionId);
return this;
}
- public Interaction txnSendDischarge(final InteractionTransactionalState txnState, final boolean failed)
+ public Interaction txnSendDischarge(final boolean failed)
+ throws Exception
+ {
+ return txnSendDischarge(_transactionalState.getCurrentTransactionId(), failed);
+ }
+
+ public Interaction txnSendDischarge(Binary transactionId, final boolean failed)
throws Exception
{
final Discharge discharge = new Discharge();
- discharge.setTxnId(txnState.getCurrentTransactionId());
+ discharge.setTxnId(transactionId);
discharge.setFail(failed);
- sendPayloadToCoordinator(discharge, txnState.getHandle());
+ sendPayloadToCoordinator(discharge, _transactionalState.getHandle());
return this;
}
- public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+ public Interaction txnDischarge(boolean failed) throws Exception
{
- txnSendDischarge(txnState, failed);
+ return txnDischarge(_transactionalState.getCurrentTransactionId(), failed);
+ }
+
+ public Interaction txnDischarge(Binary transactionId, boolean failed) throws Exception
+ {
+ txnSendDischarge(transactionId, failed);
final DeliveryState state = handleCoordinatorResponse();
- txnState.setDeliveryState(state);
- txnState.setLastTransactionId(null);
+ _transactionalState.setDeliveryState(state);
return this;
}
@@ -1186,11 +1224,6 @@
return transfers;
}
- public InteractionTransactionalState createTransactionalState(final UnsignedInteger handle)
- {
- return new InteractionTransactionalState(handle);
- }
-
///////////
// Empty //
///////////
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index a7ef076..582e7de 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -61,7 +61,6 @@
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -252,12 +251,11 @@
{
final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachHandle(linkHandle)
@@ -268,13 +266,11 @@
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
.transferDeliveryTag(_deliveryTag)
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferSettled(Boolean.TRUE)
- .transfer()
+ .transfer().txnDischarge(false);
- .txnDischarge(txnState, false);
-
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
@@ -288,12 +284,11 @@
{
final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachHandle(linkHandle)
@@ -304,7 +299,7 @@
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME))
.transferDeliveryTag(_deliveryTag)
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferSettled(Boolean.FALSE)
.transfer();
@@ -318,9 +313,9 @@
final TransactionalState receivedTxnState = (TransactionalState) dispositionState;
assertThat(receivedTxnState.getOutcome(), is(instanceOf(Accepted.class)));
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
@@ -334,12 +329,11 @@
{
final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
@@ -351,7 +345,7 @@
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferSettled(Boolean.FALSE)
.transfer();
@@ -370,9 +364,9 @@
assertThat(rejectedError.getInfo(), is(notNullValue()));
assertThat(rejectedError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
}
@@ -383,12 +377,11 @@
{
final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport);
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
@@ -401,7 +394,7 @@
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryId(UnsignedInteger.valueOf(1))
.transferDeliveryTag(_deliveryTag)
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferSettled(Boolean.FALSE)
.transfer();
@@ -412,9 +405,9 @@
assertThat(senderLinkDetachError.getInfo(), is(notNullValue()));
assertThat(senderLinkDetachError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag)));
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- DeliveryState txnDischargeDeliveryState = txnState.getDeliveryState();
+ DeliveryState txnDischargeDeliveryState = interaction.getCoordinatorLatestDeliveryState();
assertThat(txnDischargeDeliveryState, is(instanceOf(Rejected.class)));
Rejected rejected = (Rejected) txnDischargeDeliveryState;
Error error = rejected.getError();
@@ -454,13 +447,12 @@
final Interaction interaction =
openInteractionWithAnonymousRelayCapability(transport);
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.begin()
.consumeResponse(Begin.class)
// attaching coordinator link with supported outcomes Accepted and Rejected
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachHandle(linkHandle)
@@ -472,13 +464,13 @@
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferSettled(Boolean.TRUE)
.transfer();
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- DeliveryState txDischargeDeliveryState = txnState.getDeliveryState();
+ DeliveryState txDischargeDeliveryState = interaction.getCoordinatorLatestDeliveryState();
assertThat(txDischargeDeliveryState, is(instanceOf(Rejected.class)));
Rejected rejected = (Rejected) txDischargeDeliveryState;
@@ -521,13 +513,11 @@
final Interaction interaction =
openInteractionWithAnonymousRelayCapability(transport);
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
-
interaction.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO, Accepted.ACCEPTED_SYMBOL)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachHandle(linkHandle)
@@ -541,10 +531,9 @@
.transferHandle(linkHandle)
.transferPayload(generateMessagePayloadToDestination("Unknown"))
.transferDeliveryTag(_deliveryTag)
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferSettled(Boolean.TRUE)
- .transfer()
- .txnSendDischarge(txnState, false);
+ .transfer().txnSendDischarge(false);
Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
index 39a8a9b..8cd435b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java
@@ -51,7 +51,6 @@
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -132,7 +131,6 @@
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Attach attach = interaction.negotiateProtocol()
.consumeResponse()
@@ -141,8 +139,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
@@ -153,7 +151,7 @@
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transfer()
.consumeResponse(Disposition.class)
.getLatestResponse(Disposition.class);
@@ -171,9 +169,9 @@
assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
- interaction.txnSendDischarge(txnState, false);
+ interaction.txnSendDischarge(false);
- assertTransactionRollbackOnly(interaction, txnState);
+ assertTransactionRollbackOnly(interaction);
}
}
@@ -187,7 +185,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Attach attach = interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -195,8 +192,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -221,7 +218,7 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.disposition();
interaction.flowIncomingWindow(UnsignedInteger.valueOf(2))
@@ -244,14 +241,13 @@
assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED));
assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle())));
- interaction.txnSendDischarge(txnState, false);
+ interaction.txnSendDischarge(false);
- assertTransactionRollbackOnly(interaction, txnState);
+ assertTransactionRollbackOnly(interaction);
}
}
- private void assertTransactionRollbackOnly(final Interaction interaction,
- final InteractionTransactionalState txnState) throws Exception
+ private void assertTransactionRollbackOnly(final Interaction interaction) throws Exception
{
Disposition declareTransactionDisposition = null;
Flow coordinatorFlow = null;
@@ -266,7 +262,7 @@
if (response.getBody() instanceof Flow)
{
final Flow flowResponse = (Flow) response.getBody();
- if (flowResponse.getHandle().equals(txnState.getHandle()))
+ if (flowResponse.getHandle().equals(interaction.getCoordinatorHandle()))
{
coordinatorFlow = flowResponse;
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
index cd4f896..85dff52 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
@@ -46,7 +46,6 @@
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -74,7 +73,6 @@
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Disposition responseDisposition = interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -82,8 +80,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
@@ -93,7 +91,7 @@
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transfer()
.consumeResponse(Disposition.class)
.getLatestResponse(Disposition.class);
@@ -116,7 +114,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -124,8 +121,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -150,7 +147,7 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.disposition()
.sync();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 42f5dfc..dab344e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -83,7 +83,6 @@
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
import org.apache.qpid.tests.protocol.v1_0.Utils;
@@ -1056,9 +1055,8 @@
Flow flow = interaction.getLatestResponse(Flow.class);
assumeThat("insufficient credit for the test", flow.getLinkCredit().intValue(), is(greaterThan(2)));
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
- interaction.txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState);
+ interaction.txnAttachCoordinatorLink(UnsignedInteger.ONE)
+ .txnDeclare();
interaction.transferDeliveryId()
.transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8)))
@@ -1070,11 +1068,11 @@
.transfer()
.transferDeliveryId()
.transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8)))
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferPayloadData(contents[2])
.transfer();
- interaction.txnSendDischarge(txnState, false);
+ interaction.txnSendDischarge(false);
assertDeliveries(interaction, Sets.newTreeSet(Arrays.asList(UnsignedInteger.ONE,
UnsignedInteger.valueOf(2),
@@ -1174,9 +1172,8 @@
deliveryIds.add(interaction.getLatestDeliveryId());
}
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE);
- interaction.txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState);
+ interaction.txnAttachCoordinatorLink(UnsignedInteger.ONE)
+ .txnDeclare();
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
@@ -1188,10 +1185,10 @@
.dispositionRole(Role.RECEIVER)
.dispositionFirst(deliveryIds.get(2))
.dispositionLast(deliveryIds.get(3))
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.disposition();
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
}
String messageText = getTestName() + "_" + 4;
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 85040c8..369bd9b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -55,7 +55,6 @@
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -81,24 +80,22 @@
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO);
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState, Rejected.REJECTED_SYMBOL)
- .txnDeclare(txnState);
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO, Rejected.REJECTED_SYMBOL)
+ .txnDeclare();
- assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class)));
- assertThat(txnState.getCurrentTransactionId(), is(notNullValue()));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Declared.class)));
+ assertThat(interaction.getCurrentTransactionId(), is(notNullValue()));
- txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(new Binary("nonExistingTransaction".getBytes(UTF_8)), false);
interaction.doCloseConnection();
- assertThat(txnState.getDeliveryState(), is(instanceOf(Rejected.class)));
- final Error error = ((Rejected) txnState.getDeliveryState()).getError();
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Rejected.class)));
+ final Error error = ((Rejected) interaction.getCoordinatorLatestDeliveryState()).getError();
assertThat(error, is(notNullValue()));
if (KIND_BROKER_J.equals(getBrokerAdmin().getKind()))
@@ -119,19 +116,17 @@
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO);
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL)
- .txnDeclare(txnState);
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO, Accepted.ACCEPTED_SYMBOL)
+ .txnDeclare();
- assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class)));
- assertThat(txnState.getCurrentTransactionId(), is(notNullValue()));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Declared.class)));
+ assertThat(interaction.getCurrentTransactionId(), is(notNullValue()));
- txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8)));
- interaction.txnSendDischarge(txnState, false);
+ interaction.txnSendDischarge(new Binary("nonExistingTransaction".getBytes(UTF_8)), false);
final Detach detachResponse = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
interaction.doCloseConnection();
@@ -157,13 +152,12 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -187,12 +181,11 @@
final UnsignedInteger deliveryId = transfers.get(0).getDeliveryId();
interaction.detach().consumeResponse(Detach.class)
.dispositionFirst(deliveryId)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.dispositionRole(Role.RECEIVER)
- .disposition()
- .txnDischarge(txnState, false);
+ .disposition().txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
interaction.doCloseConnection();
}
@@ -217,13 +210,12 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachHandle(UnsignedInteger.ONE)
@@ -232,16 +224,16 @@
.consumeResponse(Flow.class)
.transferDeliveryId()
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferPayloadData(getTestName())
.transferHandle(UnsignedInteger.ONE)
.transfer().consumeResponse(Disposition.class)
.detachHandle(UnsignedInteger.ONE)
.detach().consumeResponse(Detach.class);
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
@@ -259,13 +251,12 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachHandle(UnsignedInteger.ONE)
@@ -275,7 +266,7 @@
.consumeResponse(Flow.class)
.transferDeliveryId()
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transferPayloadData(getTestName())
.transferHandle(UnsignedInteger.ONE)
.transfer().consumeResponse(Disposition.class)
@@ -283,9 +274,9 @@
.detachHandle(UnsignedInteger.ONE)
.detachClose(true)
.detach().consumeResponse(Detach.class);
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index f44790f..02d28a7 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -58,7 +58,6 @@
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -86,7 +85,6 @@
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Disposition responseDisposition = interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -94,8 +92,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
@@ -106,7 +104,7 @@
.transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transfer()
.consumeResponse(Disposition.class)
.getLatestResponse(Disposition.class);
@@ -116,9 +114,9 @@
assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(getTestName())));
@@ -136,7 +134,6 @@
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Disposition responseDisposition = interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -144,8 +141,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
@@ -156,7 +153,7 @@
.transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transfer()
.consumeResponse(Disposition.class)
.getLatestResponse(Disposition.class);
@@ -166,9 +163,9 @@
assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
- interaction.txnDischarge(txnState, true);
+ interaction.txnDischarge(true);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
final String content = getTestName() + "_2";
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
@@ -188,7 +185,6 @@
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Disposition responseDisposition = interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -196,8 +192,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
@@ -210,7 +206,7 @@
.transferDeliveryId()
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
- .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transferTransactionalStateFromCurrentTransaction()
.transfer()
.consumeResponse(Disposition.class)
.getLatestResponse(Disposition.class);
@@ -222,12 +218,12 @@
interaction.dispositionRole(Role.SENDER)
.dispositionSettled(true)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.disposition();
- interaction.txnDischarge(txnState, false);
+ interaction.txnDischarge(false);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
}
@@ -244,13 +240,13 @@
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+
Response<?> response = interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.SENDER)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
@@ -280,7 +276,7 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+
interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -288,8 +284,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -314,12 +310,10 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
- .disposition()
- .txnDischarge(txnState, false);
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
+ .disposition().txnDischarge(false);
-
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
}
@@ -332,7 +326,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -340,8 +333,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -366,11 +359,10 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
- .disposition()
- .txnDischarge(txnState, true);
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
+ .disposition().txnDischarge(true);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
assertThat(receivedMessage, is(equalTo(getTestName())));
@@ -390,13 +382,12 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -446,7 +437,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -454,8 +444,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -480,8 +470,7 @@
Disposition settledDisposition = interaction.dispositionSettled(false)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(),
- new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.disposition()
.consumeResponse(Disposition.class)
.getLatestResponse(Disposition.class);
@@ -490,9 +479,8 @@
assertThat(settledDisposition.getState(), is(instanceOf(TransactionalState.class)));
assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
- interaction.txnDischarge(txnState, false);
-
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ interaction.txnDischarge(false);
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
}
@@ -509,7 +497,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -517,8 +504,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -533,7 +520,8 @@
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
.flowHandleFromLinkHandle()
- .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId()))
+ .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"),
+ interaction.getCurrentTransactionId()))
.flow()
.receiveDelivery();
@@ -546,17 +534,15 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .dispositionTransactionalStateFromCurrentTransaction(new Accepted())
.dispositionFirstFromLatestDelivery()
- .disposition()
- .txnDischarge(txnState, false);
+ .disposition().txnDischarge(false);
-
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
Transfer transfer = transfers.get(0);
assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
- assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
+ assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(interaction.getCurrentTransactionId())));
final String content = getTestName() + "_2";
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content);
@@ -577,7 +563,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -585,8 +570,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)
@@ -601,7 +586,8 @@
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
.flowHandleFromLinkHandle()
- .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId()))
+ .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"),
+ interaction.getCurrentTransactionId()))
.flow()
.receiveDelivery();
@@ -614,17 +600,16 @@
interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
- .disposition()
- .txnDischarge(txnState, true);
+ .dispositionTransactionalState(interaction.getCurrentTransactionId(), new Accepted())
+ .disposition().txnDischarge(true);
- assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class)));
+ assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
Transfer transfer = transfers.get(0);
assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
- assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
+ assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(interaction.getCurrentTransactionId())));
}
}
@@ -643,7 +628,6 @@
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
Response<?> response = interaction.negotiateProtocol()
.consumeResponse()
.open()
@@ -651,8 +635,8 @@
.begin()
.consumeResponse(Begin.class)
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
+ .txnAttachCoordinatorLink(UnsignedInteger.ZERO)
+ .txnDeclare()
.attachRole(Role.RECEIVER)
.attachHandle(UnsignedInteger.ONE)