| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms; |
| |
| import static org.hamcrest.CoreMatchers.notNullValue; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.is; |
| |
| import java.net.InetSocketAddress; |
| import java.util.Collections; |
| |
| import org.hamcrest.Matchers; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import org.apache.qpid.server.protocol.v1_0.Session_1_0; |
| import org.apache.qpid.server.protocol.v1_0.type.Symbol; |
| import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Open; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Role; |
| import org.apache.qpid.tests.protocol.SpecificationTest; |
| import org.apache.qpid.tests.protocol.v1_0.FrameTransport; |
| import org.apache.qpid.tests.protocol.v1_0.Interaction; |
| import org.apache.qpid.tests.protocol.v1_0.Utils; |
| import org.apache.qpid.tests.utils.BrokerAdmin; |
| import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; |
| |
| public class TemporaryDestinationTest extends BrokerAdminUsingTestBase |
| { |
| private static final Symbol TEMPORARY_QUEUE = Symbol.valueOf("temporary-queue"); |
| private static final Symbol TEMPORARY_TOPIC = Symbol.valueOf("temporary-topic"); |
| private InetSocketAddress _brokerAddress; |
| |
| @Before |
| public void setUp() |
| { |
| _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); |
| } |
| |
| @Test |
| @SpecificationTest(section = "5.3", |
| description = "To create a node with the required lifecycle properties, establish a uniquely named sending link with " |
| + "the dynamic field of target set true, the expiry-policy field of target set to symbol “link-detach”, and the " |
| + "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.") |
| public void deleteOnCloseWithConnectionCloseForQueue() throws Exception |
| { |
| deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_QUEUE}); |
| } |
| |
| @Test |
| @SpecificationTest(section = "5.3", |
| description = "To create a node with the required lifecycle properties, establish a uniquely named sending link with " |
| + "the dynamic field of target set true, the expiry-policy field of target set to symbol “link-detach”, and the " |
| + "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.") |
| public void deleteOnCloseWithConnectionCloseForTopic() throws Exception |
| { |
| deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_TOPIC}); |
| } |
| |
| private void deleteOnCloseWithConnectionClose(final Symbol[] targetCapabilities) throws Exception |
| { |
| String newTemporaryNodeAddress; |
| |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| Target target = createTarget(targetCapabilities); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final Attach attachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.SENDER) |
| .attachTarget(target) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(attachResponse.getSource(), is(notNullValue())); |
| assertThat(attachResponse.getTarget(), is(notNullValue())); |
| |
| newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| interaction.consumeResponse().getLatestResponse(Flow.class); |
| |
| interaction.doCloseConnection(); |
| } |
| |
| assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false)); |
| } |
| |
| |
| @Test |
| @SpecificationTest(section = "N/A", |
| description = "JMS 2.0." |
| + " 6.2.2. Creating temporary destinations" |
| + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" |
| + " that are system - generated uniquely for their connection. Only their own connection" |
| + " is allowed to create consumer objects for them." |
| + "" |
| + "4.1.5. TemporaryQueue" |
| + "A TemporaryQueue is a unique Queue object created for the duration of a connection." |
| + " It is a system-defined queue that can only be consumed by the connection that created it." |
| + "" |
| + "AMQP JMS Mapping." |
| + " 5.2. Destinations And Producers/Consumers" |
| + "[...] type information SHOULD be conveyed when creating producer or consumer links" |
| + " for the application by supplying a terminus capability for the particular Destination" |
| + " type to which the client expects to attach [...]" |
| + "TemporaryQueue Terminus capability : 'temporary-queue'") |
| public void createTemporaryQueueReceivingLink() throws Exception |
| { |
| final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE}; |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| Target target = createTarget(capabilities); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final UnsignedInteger senderHandle = UnsignedInteger.ONE; |
| final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.SENDER) |
| .attachHandle(senderHandle) |
| .attachTarget(target) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(senderAttachResponse.getSource(), is(notNullValue())); |
| assertThat(senderAttachResponse.getTarget(), is(notNullValue())); |
| |
| String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| interaction.consumeResponse().getLatestResponse(Flow.class); |
| |
| final Attach receiverAttachResponse = interaction.attachRole(Role.RECEIVER) |
| .attachSource(createSource(newTemporaryNodeAddress, |
| capabilities)) |
| .attachHandle(UnsignedInteger.valueOf(2)) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(receiverAttachResponse.getSource(), is(notNullValue())); |
| assertThat(receiverAttachResponse.getSource(), is(notNullValue())); |
| assertThat(((Source) receiverAttachResponse.getSource()).getAddress(), |
| is(equalTo(newTemporaryNodeAddress))); |
| |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| @Test |
| @SpecificationTest(section = "N/A", |
| description = "JMS 2.0." |
| + " 6.2.2. Creating temporary destinations" |
| + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" |
| + " that are system - generated uniquely for their connection. Only their own connection" |
| + " is allowed to create consumer objects for them." |
| + "" |
| + "4.1.5. TemporaryQueue" |
| + "A TemporaryQueue is a unique Queue object created for the duration of a connection." |
| + " It is a system-defined queue that can only be consumed by the connection that created it." |
| + "" |
| + "AMQP JMS Mapping." |
| + " 5.2. Destinations And Producers/Consumers" |
| + "[...] type information SHOULD be conveyed when creating producer or consumer links" |
| + " for the application by supplying a terminus capability for the particular Destination" |
| + " type to which the client expects to attach [...]" |
| + "TemporaryQueue Terminus capability : 'temporary-queue'") |
| public void createTemporaryQueueReceivingLinkFromOtherConnectionDisallowed() throws Exception |
| { |
| final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE}; |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| Target target = createTarget(capabilities); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final UnsignedInteger senderHandle = UnsignedInteger.ONE; |
| final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.SENDER) |
| .attachHandle(senderHandle) |
| .attachTarget(target) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(senderAttachResponse.getSource(), is(notNullValue())); |
| assertThat(senderAttachResponse.getTarget(), is(notNullValue())); |
| |
| String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| interaction.consumeResponse().getLatestResponse(Flow.class); |
| |
| assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED); |
| |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| @Test |
| @SpecificationTest(section = "N/A", |
| description = "JMS 2.0." |
| + " 6.2.2. Creating temporary destinations" |
| + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" |
| + " that are system - generated uniquely for their connection. Only their own connection" |
| + " is allowed to create consumer objects for them.") |
| public void createTemporaryQueueSendingLinkFromOtherConnectionAllowed() throws Exception |
| { |
| final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE}; |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| Target target = createTarget(capabilities); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final UnsignedInteger senderHandle = UnsignedInteger.ONE; |
| final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.SENDER) |
| .attachHandle(senderHandle) |
| .attachTarget(target) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(senderAttachResponse.getSource(), is(notNullValue())); |
| assertThat(senderAttachResponse.getTarget(), is(notNullValue())); |
| |
| String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| interaction.consumeResponse().getLatestResponse(Flow.class); |
| |
| assertSendingLinkSucceeds(newTemporaryNodeAddress); |
| |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| @Test |
| @SpecificationTest(section = "N/A", |
| description = "JMS 2.0." |
| + " 6.2.2. Creating temporary destinations" |
| + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" |
| + " that are system - generated uniquely for their connection. Only their own connection" |
| + " is allowed to create consumer objects for them." |
| + "" |
| + "4.2.7. Temporary topics" |
| + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext," |
| + " Connection or TopicConnection . It is a system defined Topic whose messages may be" |
| + " consumed only by the connection that created it." |
| + "" |
| + "AMQP JMS Mapping." |
| + " 5.2. Destinations And Producers/Consumers" |
| + "[...] type information SHOULD be conveyed when creating producer or consumer links" |
| + " for the application by supplying a terminus capability for the particular Destination" |
| + " type to which the client expects to attach" |
| + "TemporaryTopic Terminus capability : 'temporary-topic'") |
| public void createTemporaryTopicSubscriptionReceivingLink() throws Exception |
| { |
| final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC}; |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Source source = new Source(); |
| source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); |
| source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| source.setCapabilities(capabilities); |
| source.setDynamic(true); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final UnsignedInteger receiverHandle = UnsignedInteger.ONE; |
| final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.RECEIVER) |
| .attachSource(source) |
| .attachHandle(receiverHandle) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(receiverAttachResponse.getSource(), is(notNullValue())); |
| |
| String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| Target target = new Target(); |
| target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); |
| target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| target.setCapabilities(capabilities); |
| target.setAddress(newTemporaryNodeAddress); |
| |
| final UnsignedInteger senderHandle = UnsignedInteger.valueOf(2); |
| interaction.attachRole(Role.SENDER) |
| .attachHandle(senderHandle) |
| .attachTarget(target) |
| .attach() |
| .consumeResponse(Attach.class) |
| .consumeResponse(Flow.class); |
| |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| @Test |
| @SpecificationTest(section = "N/A", |
| description = "JMS 2.0." |
| + " 6.2.2. Creating temporary destinations" |
| + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" |
| + " that are system - generated uniquely for their connection. Only their own connection" |
| + " is allowed to create consumer objects for them." |
| + "" |
| + "4.2.7. Temporary topics" |
| + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext," |
| + " Connection or TopicConnection . It is a system defined Topic whose messages may be" |
| + " consumed only by the connection that created it." |
| + "" |
| + "AMQP JMS Mapping." |
| + " 5.2. Destinations And Producers/Consumers" |
| + "[...] type information SHOULD be conveyed when creating producer or consumer links" |
| + " for the application by supplying a terminus capability for the particular Destination" |
| + " type to which the client expects to attach" |
| + "TemporaryTopic Terminus capability : 'temporary-topic'") |
| public void createTemporaryTopicSubscriptionReceivingLinkFromOtherConnectionDisallowed() throws Exception |
| { |
| final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC}; |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Source source = new Source(); |
| source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); |
| source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| source.setCapabilities(capabilities); |
| source.setDynamic(true); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final UnsignedInteger receiverHandle = UnsignedInteger.ONE; |
| final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.RECEIVER) |
| .attachSource(source) |
| .attachHandle(receiverHandle) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(receiverAttachResponse.getSource(), is(notNullValue())); |
| assertThat(receiverAttachResponse.getSource(), is(notNullValue())); |
| |
| String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED); |
| |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| @Test |
| @SpecificationTest(section = "N/A", |
| description = "JMS 2.0." |
| + " 6.2.2. Creating temporary destinations" |
| + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations" |
| + " that are system - generated uniquely for their connection. Only their own connection" |
| + " is allowed to create consumer objects for them.") |
| public void createTemporaryTopicSendingLinkFromOtherConnectionAllowed() throws Exception |
| { |
| final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC}; |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Source source = new Source(); |
| source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); |
| source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| source.setCapabilities(capabilities); |
| source.setDynamic(true); |
| |
| final Interaction interaction = transport.newInteraction(); |
| final UnsignedInteger receiverHandle = UnsignedInteger.ONE; |
| final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse() |
| .open().consumeResponse(Open.class) |
| .begin().consumeResponse(Begin.class) |
| .attachRole(Role.RECEIVER) |
| .attachSource(source) |
| .attachHandle(receiverHandle) |
| .attach().consumeResponse() |
| .getLatestResponse(Attach.class); |
| |
| assertThat(receiverAttachResponse.getSource(), is(notNullValue())); |
| |
| String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress(); |
| assertThat(newTemporaryNodeAddress, is(notNullValue())); |
| |
| assertSendingLinkSucceeds(newTemporaryNodeAddress); |
| |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| private void assertReceivingLinkFails(final Source source, final AmqpError expectedError) throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| final Interaction interaction = transport.newInteraction(); |
| final Detach responseDetach = interaction.negotiateProtocol() |
| .consumeResponse() |
| .open() |
| .consumeResponse(Open.class) |
| .begin() |
| .consumeResponse(Begin.class) |
| .attachRole(Role.RECEIVER) |
| .attachSource(source) |
| .attach() |
| .consumeResponse(Attach.class) |
| .consumeResponse(Detach.class) |
| .getLatestResponse(Detach.class); |
| assertThat(responseDetach.getClosed(), is(true)); |
| assertThat(responseDetach.getError(), is(Matchers.notNullValue())); |
| assertThat(responseDetach.getError().getCondition(), is(equalTo(expectedError))); |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| private void assertSendingLinkSucceeds(final String address) throws Exception |
| { |
| try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) |
| { |
| Target target = new Target(); |
| target.setAddress(address); |
| |
| final Interaction interaction = transport.newInteraction(); |
| interaction.negotiateProtocol() |
| .consumeResponse() |
| .open() |
| .consumeResponse(Open.class) |
| .begin() |
| .consumeResponse(Begin.class) |
| .attachRole(Role.SENDER) |
| .attachTarget(target) |
| .attach() |
| .consumeResponse(Attach.class) |
| .consumeResponse(Flow.class); |
| interaction.doCloseConnection(); |
| } |
| } |
| |
| private Target createTarget(final Symbol[] capabilities) |
| { |
| Target target = new Target(); |
| target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); |
| target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| target.setDynamic(true); |
| target.setCapabilities(capabilities); |
| return target; |
| } |
| |
| private Source createSource(final String name, final Symbol[] capabilities) |
| { |
| final Source source = new Source(); |
| source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); |
| source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| source.setCapabilities(capabilities); |
| source.setAddress(name); |
| return source; |
| } |
| } |