| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.tests.protocol.v1_0.extensions.anonymousterminus; |
| |
| |
| import static org.hamcrest.CoreMatchers.hasItem; |
| import static org.hamcrest.CoreMatchers.instanceOf; |
| import static org.hamcrest.CoreMatchers.notNullValue; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.is; |
| import static org.junit.Assume.assumeThat; |
| |
| import java.net.InetSocketAddress; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.apache.qpid.server.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.protocol.v1_0.SequenceNumber; |
| import org.apache.qpid.server.protocol.v1_0.type.Binary; |
| import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; |
| import org.apache.qpid.server.protocol.v1_0.type.Symbol; |
| import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Error; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Open; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Role; |
| import org.apache.qpid.tests.protocol.Response; |
| import org.apache.qpid.tests.protocol.SpecificationTest; |
| import org.apache.qpid.tests.protocol.v1_0.FrameTransport; |
| import org.apache.qpid.tests.protocol.v1_0.Interaction; |
| import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; |
| import org.apache.qpid.tests.protocol.v1_0.MessageEncoder; |
| import org.apache.qpid.tests.protocol.v1_0.Utils; |
| import org.apache.qpid.tests.utils.BrokerAdmin; |
| import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; |
| |
| public class AnonymousTerminusTest extends BrokerAdminUsingTestBase |
| { |
| private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); |
| private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag"); |
| private static final String TEST_MESSAGE_CONTENT = "test"; |
| private InetSocketAddress _brokerAddress; |
| private Binary _deliveryTag; |
| |
| @Before |
| public void setUp() |
| { |
| final BrokerAdmin brokerAdmin = getBrokerAdmin(); |
| brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME); |
| _brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); |
| _deliveryTag = new Binary("testTag".getBytes(StandardCharsets.UTF_8)); |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message", |
| description = "Messages sent over links into a routing node will be" |
| + " forwarded to the node referenced in the to field of properties of the message" |
| + " just as if a direct link has been established to that node.") |
| @Test |
| public void transferPreSettledToKnownDestination() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .attachRole(Role.SENDER) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) |
| .transferSettled(Boolean.TRUE) |
| .transferDeliveryTag(_deliveryTag) |
| .transfer() |
| .sync(); |
| |
| Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); |
| assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); |
| } |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors", |
| description = "It is possible that a message sent to a routing node has an address in the to field" |
| + " of properties which, if used in the address field of target of an attach," |
| + " would result in an unsuccessful link establishment (for example," |
| + " if the address cannot be resolved to a node). In this case the routing node" |
| + " MUST communicate the error back to the sender of the message." |
| + " [...] the message has already been settled by the sender," |
| + " then the routing node MUST detach the link with an error." |
| + " [...] the info field of error MUST contain an entry with symbolic key delivery-tag" |
| + " and binary value of the delivery-tag of the message which caused the failure.") |
| @Test |
| public void transferPreSettledToUnknownDestination() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .attachRole(Role.SENDER) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferSettled(Boolean.TRUE) |
| .transferDeliveryTag(_deliveryTag) |
| .transfer(); |
| |
| Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class); |
| Error error = detach.getError(); |
| assertThat(error, is(notNullValue())); |
| assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); |
| assertThat(error.getInfo(), is(notNullValue())); |
| assertThat(error.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); |
| } |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors", |
| description = "It is possible that a message sent to a routing node has an address in the to field" |
| + " of properties which, if used in the address field of target of an attach," |
| + " would result in an unsuccessful link establishment (for example," |
| + " if the address cannot be resolved to a node). In this case the routing node" |
| + " MUST communicate the error back to the sender of the message." |
| + " If the source of the link supports the rejected outcome," |
| + " and the message has not already been settled by the sender, then the routing node" |
| + " MUST reject the message." |
| + " [...] the info field of error MUST contain an entry with symbolic key delivery-tag" |
| + " and binary value of the delivery-tag of the message which caused the failure.") |
| @Test |
| public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeSupportedBySource() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .attachRole(Role.SENDER) |
| .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferDeliveryTag(_deliveryTag) |
| .transfer() |
| .consumeResponse(); |
| |
| Disposition disposition = interaction.getLatestResponse(Disposition.class); |
| |
| assertThat(disposition.getSettled(), is(true)); |
| |
| DeliveryState dispositionState = disposition.getState(); |
| assertThat(dispositionState, is(instanceOf(Rejected.class))); |
| |
| Rejected rejected = (Rejected)dispositionState; |
| Error error = rejected.getError(); |
| assertThat(error, is(notNullValue())); |
| assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); |
| assertThat(error.getInfo(), is(notNullValue())); |
| assertThat(error.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); |
| } |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors", |
| description = "It is possible that a message sent to a routing node has an address in the to field" |
| + " of properties which, if used in the address field of target of an attach," |
| + " would result in an unsuccessful link establishment (for example," |
| + " if the address cannot be resolved to a node). In this case the routing node" |
| + " MUST communicate the error back to the sender of the message." |
| + " [...]" |
| + " If the source of the link does not support the rejected outcome," |
| + " [...] then the routing node MUST detach the link with an error." |
| + " [...] the info field of error MUST contain an entry with symbolic key delivery-tag" |
| + " and binary value of the delivery-tag of the message which caused the failure.") |
| @Test |
| public void transferUnsettledToUnknownDestinationWhenRejectedOutcomeNotSupportedBySource() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .attachRole(Role.SENDER) |
| .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferDeliveryTag(_deliveryTag) |
| .transfer(); |
| |
| Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class); |
| Error error = detach.getError(); |
| assertThat(error, is(notNullValue())); |
| assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); |
| assertThat(error.getInfo(), is(notNullValue())); |
| assertThat(error.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); |
| } |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2. Sending A Message", |
| description = "Messages sent over links into a routing node will be" |
| + " forwarded to the node referenced in the to field of properties of the message" |
| + " just as if a direct link has been established to that node.") |
| @Test |
| public void transferPreSettledInTransactionToKnownDestination() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| final UnsignedInteger linkHandle = UnsignedInteger.ONE; |
| final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .txnAttachCoordinatorLink(txnState) |
| .txnDeclare(txnState) |
| |
| .attachRole(Role.SENDER) |
| .attachHandle(linkHandle) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferHandle(linkHandle) |
| .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) |
| .transferDeliveryTag(_deliveryTag) |
| .transferTransactionalState(txnState.getCurrentTransactionId()) |
| .transferSettled(Boolean.TRUE) |
| .transfer() |
| |
| .txnDischarge(txnState, false); |
| |
| assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); |
| |
| Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); |
| assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); |
| } |
| } |
| |
| @Test |
| public void transferUnsettledInTransactionToKnownDestination() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| final UnsignedInteger linkHandle = UnsignedInteger.ONE; |
| final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .txnAttachCoordinatorLink(txnState) |
| .txnDeclare(txnState) |
| |
| .attachRole(Role.SENDER) |
| .attachHandle(linkHandle) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferHandle(linkHandle) |
| .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) |
| .transferDeliveryTag(_deliveryTag) |
| .transferTransactionalState(txnState.getCurrentTransactionId()) |
| .transferSettled(Boolean.FALSE) |
| .transfer(); |
| |
| Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class); |
| |
| assertThat(disposition.getSettled(), is(true)); |
| |
| DeliveryState dispositionState = disposition.getState(); |
| assertThat(dispositionState, is(instanceOf(TransactionalState.class))); |
| |
| final TransactionalState receivedTxnState = (TransactionalState) dispositionState; |
| assertThat(receivedTxnState.getOutcome(), is(instanceOf(Accepted.class))); |
| |
| interaction.txnDischarge(txnState, false); |
| |
| assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); |
| |
| Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); |
| assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); |
| } |
| } |
| |
| @Test |
| public void transferUnsettledInTransactionToUnknownDestinationWhenRejectedOutcomeSupportedBySource() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| final UnsignedInteger linkHandle = UnsignedInteger.ONE; |
| final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .txnAttachCoordinatorLink(txnState) |
| .txnDeclare(txnState) |
| |
| .attachRole(Role.SENDER) |
| .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL) |
| .attachHandle(linkHandle) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferHandle(linkHandle) |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferDeliveryTag(_deliveryTag) |
| .transferTransactionalState(txnState.getCurrentTransactionId()) |
| .transferSettled(Boolean.FALSE) |
| .transfer(); |
| |
| Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class); |
| |
| assertThat(disposition.getSettled(), is(true)); |
| |
| DeliveryState dispositionState = disposition.getState(); |
| assertThat(dispositionState, is(instanceOf(TransactionalState.class))); |
| |
| final TransactionalState receivedTxnState = (TransactionalState) dispositionState; |
| assertThat(receivedTxnState.getOutcome(), is(instanceOf(Rejected.class))); |
| |
| final Error rejectedError = ((Rejected) receivedTxnState.getOutcome()).getError(); |
| assertThat(rejectedError.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); |
| assertThat(rejectedError.getInfo(), is(notNullValue())); |
| assertThat(rejectedError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); |
| |
| interaction.txnDischarge(txnState, false); |
| |
| assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); |
| } |
| } |
| |
| @Test |
| public void transferUnsettledInTransactionToUnknownDestinationWhenRejectedOutcomeNotSupportedBySource() throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); |
| final UnsignedInteger linkHandle = UnsignedInteger.ONE; |
| final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .txnAttachCoordinatorLink(txnState) |
| .txnDeclare(txnState) |
| |
| .attachRole(Role.SENDER) |
| .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) |
| .attachHandle(linkHandle) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferHandle(linkHandle) |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferDeliveryId(UnsignedInteger.valueOf(1)) |
| .transferDeliveryTag(_deliveryTag) |
| .transferTransactionalState(txnState.getCurrentTransactionId()) |
| .transferSettled(Boolean.FALSE) |
| .transfer(); |
| |
| Detach senderLinkDetach = interaction.consumeResponse().getLatestResponse(Detach.class); |
| Error senderLinkDetachError = senderLinkDetach.getError(); |
| assertThat(senderLinkDetachError, is(notNullValue())); |
| assertThat(senderLinkDetachError.getCondition(), is(equalTo(AmqpError.NOT_FOUND))); |
| assertThat(senderLinkDetachError.getInfo(), is(notNullValue())); |
| assertThat(senderLinkDetachError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); |
| |
| interaction.txnDischarge(txnState, false); |
| |
| DeliveryState txnDischargeDeliveryState = txnState.getDeliveryState(); |
| assertThat(txnDischargeDeliveryState, is(instanceOf(Rejected.class))); |
| Rejected rejected = (Rejected) txnDischargeDeliveryState; |
| Error error = rejected.getError(); |
| |
| assertThat(error, is(notNullValue())); |
| assertThat(error.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK))); |
| } |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors", |
| description = "It is possible that a message sent to a routing node has an address in the to field" |
| + " of properties which, if used in the address field of target of an attach," |
| + " would result in an unsuccessful link establishment (for example," |
| + " if the address cannot be resolved to a node). In this case the routing node" |
| + " MUST communicate the error back to the sender of the message." |
| + " [...]" |
| + " <Not in spec yet>" |
| + " AMQP-140" |
| + " If a message cannot be routed to the destination implied in the \"to:\" field," |
| + " and the source does not allow for the rejected outcome" |
| + " [...] when messages are being sent within a transaction and have been sent pre-settled." |
| + " In this case the behaviour defined for transactions (of essentially marking" |
| + " the transaction as rollback only) should take precedence. " |
| + "" |
| + " AMQP spec 4.3 Discharging a Transaction" |
| + " If the coordinator is unable to complete the discharge, the coordinator MUST convey" |
| + " the error to the controller as a transaction-error. If the source for the link to" |
| + " the coordinator supports the rejected outcome, then the message MUST be rejected" |
| + " with this outcome carrying the transaction-error.") |
| @Test |
| public void transferPreSettledInTransactionToUnknownDestinationWhenRejectOutcomeSupportedByTxController() |
| throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final UnsignedInteger linkHandle = UnsignedInteger.ONE; |
| final Interaction interaction = |
| openInteractionWithAnonymousRelayCapability(transport); |
| |
| final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| // attaching coordinator link with supported outcomes Accepted and Rejected |
| .txnAttachCoordinatorLink(txnState) |
| .txnDeclare(txnState) |
| |
| .attachRole(Role.SENDER) |
| .attachHandle(linkHandle) |
| .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferHandle(linkHandle) |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferDeliveryTag(_deliveryTag) |
| .transferTransactionalState(txnState.getCurrentTransactionId()) |
| .transferSettled(Boolean.TRUE) |
| .transfer(); |
| |
| interaction.txnDischarge(txnState, false); |
| |
| DeliveryState txDischargeDeliveryState = txnState.getDeliveryState(); |
| assertThat(txDischargeDeliveryState, is(instanceOf(Rejected.class))); |
| |
| Rejected rejected = (Rejected) txDischargeDeliveryState; |
| Error error = rejected.getError(); |
| |
| assertThat(error, is(notNullValue())); |
| assertThat(error.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK))); |
| } |
| } |
| |
| @SpecificationTest(section = "Using the Anonymous Terminus for Message Routing. 2.2.2 Routing Errors", |
| description = "It is possible that a message sent to a routing node has an address in the to field" |
| + " of properties which, if used in the address field of target of an attach," |
| + " would result in an unsuccessful link establishment (for example," |
| + " if the address cannot be resolved to a node). In this case the routing node" |
| + " MUST communicate the error back to the sender of the message." |
| + " [...]" |
| + " <Not in spec yet>" |
| + " AMQP-140" |
| + " If a message cannot be routed to the destination implied in the \"to:\" field," |
| + " and the source does not allow for the rejected outcome" |
| + " [...] when messages are being sent within a transaction and have been sent pre-settled." |
| + " In this case the behaviour defined for transactions (of essentially marking" |
| + " the transaction as rollback only) should take precedence. " |
| + "" |
| + " AMQP spec 4.3 Discharging a Transaction" |
| + " If the coordinator is unable to complete the discharge, the coordinator MUST convey" |
| + " the error to the controller as a transaction-error." |
| + " [...]" |
| + " If the source does not support the rejected outcome, the transactional resource MUST" |
| + " detach the link to the coordinator, with the detach performative carrying" |
| + " the transaction-error") |
| @Test |
| public void transferPreSettledInTransactionToUnknownDestinationWhenRejectOutcomeNotSupportedByTxController() |
| throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final UnsignedInteger linkHandle = UnsignedInteger.ONE; |
| final Interaction interaction = |
| openInteractionWithAnonymousRelayCapability(transport); |
| |
| final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); |
| |
| interaction.begin() |
| .consumeResponse(Begin.class) |
| |
| .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL) |
| .txnDeclare(txnState) |
| |
| .attachRole(Role.SENDER) |
| .attachHandle(linkHandle) |
| .attachTarget(new Target()) |
| .attachName("link-" + linkHandle) |
| .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL) |
| .attach().consumeResponse(Attach.class) |
| .consumeResponse(Flow.class) |
| |
| .transferDeliveryId() |
| .transferHandle(linkHandle) |
| .transferPayload(generateMessagePayloadToDestination("Unknown")) |
| .transferDeliveryTag(_deliveryTag) |
| .transferTransactionalState(txnState.getCurrentTransactionId()) |
| .transferSettled(Boolean.TRUE) |
| .transfer() |
| .txnSendDischarge(txnState, false); |
| |
| Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class); |
| Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError(); |
| assertThat(transactionCoordinatorDetachError, is(notNullValue())); |
| assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK))); |
| } |
| } |
| |
| private Disposition getDispositionForDeliveryId(final Interaction interaction, |
| final UnsignedInteger deliveryId) throws Exception |
| { |
| Disposition dischargeTransactionDisposition = null; |
| |
| SequenceNumber id = new SequenceNumber(deliveryId.intValue()); |
| do |
| { |
| Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse(); |
| if (response.getBody() instanceof Disposition) |
| { |
| Disposition disposition = (Disposition) response.getBody(); |
| UnsignedInteger first = disposition.getFirst(); |
| UnsignedInteger last = disposition.getLast() == null ? disposition.getFirst() : disposition.getLast(); |
| if (new SequenceNumber(first.intValue()).compareTo(id) >= 0 && new SequenceNumber(last.intValue()).compareTo(id) <=0) |
| { |
| dischargeTransactionDisposition = disposition; |
| } |
| } |
| } while (dischargeTransactionDisposition == null); |
| return dischargeTransactionDisposition; |
| } |
| |
| private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception |
| { |
| final Interaction interaction = transport.newInteraction(); |
| interaction.negotiateProtocol().consumeResponse() |
| .openDesiredCapabilities(ANONYMOUS_RELAY) |
| .open().consumeResponse(Open.class); |
| |
| Open open = interaction.getLatestResponse(Open.class); |
| assumeThat(Arrays.asList(open.getOfferedCapabilities()), hasItem(ANONYMOUS_RELAY)); |
| return interaction; |
| } |
| |
| private QpidByteBuffer generateMessagePayloadToDestination(final String destinationName) |
| { |
| MessageEncoder messageEncoder = new MessageEncoder(); |
| final Properties properties = new Properties(); |
| properties.setTo(destinationName); |
| messageEncoder.setProperties(properties); |
| messageEncoder.addData(TEST_MESSAGE_CONTENT); |
| return messageEncoder.getPayload(); |
| } |
| } |