blob: ef30d47a13c2e24607822627b1caba110e244efb [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v0_8;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.util.Map;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class ExchangeTest extends BrokerAdminUsingTestBase
{
/** Name of the exchange that the tests in this class declare, query and delete. */
private static final String TEST_EXCHANGE = "testExchange";
/**
 * Declares {@code TEST_EXCHANGE} on a newly opened channel, then issues an exchange-bound query
 * for it and expects the {@code NO_BINDINGS} reply code.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.1", description = "verify exchange exists, create if needed")
public void exchangeDeclare() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        // Open the connection and a channel.
        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Declare the exchange under test.
        interaction.exchange()
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Query the exchange; the expected reply code reports that it has no bindings.
        final ExchangeBoundOkBody boundReply = interaction.exchange()
                                                          .boundExchangeName(TEST_EXCHANGE)
                                                          .bound()
                                                          .consumeResponse()
                                                          .getLatestResponse(ExchangeBoundOkBody.class);
        assertThat(boundReply.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
    }
}
/**
 * Exercises passive declaration: a passive declare of an existing exchange must be answered with
 * Declare-Ok, while a passive declare issued after the exchange has been deleted is expected to
 * close the channel with {@code NOT_FOUND}.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.1",
        description = "If [passive is] set, the server will reply with Declare-Ok if the exchange "
                      + "already exists with the same name, and raise an error if not. The client can "
                      + "use this to check whether an exchange exists without modifying the server state.")
public void exchangeDeclarePassive() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Create the exchange.
        interaction.exchange()
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Passive declare while the exchange exists.
        interaction.exchange()
                   .declarePassive(true)
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Remove the exchange again.
        interaction.exchange()
                   .deleteExchangeName(TEST_EXCHANGE)
                   .delete()
                   .consumeResponse(ExchangeDeleteOkBody.class);

        // Passive declare after deletion: a channel close is expected instead of Declare-Ok.
        final ChannelCloseBody channelClose = interaction.exchange()
                                                         .declarePassive(true)
                                                         .declareName(TEST_EXCHANGE)
                                                         .declare()
                                                         .consumeResponse()
                                                         .getLatestResponse(ChannelCloseBody.class);
        assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
    }
}
/**
 * Checks handling of the reserved {@code amq.} name prefix: declaring the pre-existing direct
 * exchange (both passively and non-passively) must succeed, whereas a non-passive declare of
 * {@code amq.illegal} is expected to close the connection.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.1",
        description = "Exchange names starting with \"amq.\" are reserved for pre-declared and standardised "
                      + "exchanges. The client MAY declare an exchange starting with \"amq.\" if the passive "
                      + "option is set, or the exchange already exists.")
public void exchangeDeclareAmqDisallowed() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Passive declare of the default direct exchange.
        interaction.exchange()
                   .declarePassive(true)
                   .declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Non-passive declare of the same, already existing, exchange.
        interaction.exchange()
                   .declarePassive(false)
                   .declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Non-passive declare of a new name carrying the reserved prefix.
        final ConnectionCloseBody connectionClose = interaction.exchange()
                                                               .declarePassive(false)
                                                               .declareName("amq.illegal")
                                                               .declare()
                                                               .consumeResponse()
                                                               .getLatestResponse(ConnectionCloseBody.class);

        /* TODO: 0-91 specification requires 'access-refused' (403) but server uses 'not-allowed' (530) */
        assertThat(connectionClose.getReplyCode(),
                   anyOf(equalTo(ErrorCodes.NOT_ALLOWED), equalTo(ErrorCodes.ACCESS_REFUSED)));
    }
}
/**
 * Declares {@code TEST_EXCHANGE} as a direct exchange and then redeclares it as a topic exchange,
 * expecting the connection to be closed with {@code NOT_ALLOWED}.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.1",
        description = "The client MUST not attempt to redeclare an existing exchange with a different type than "
                      + "used in the original Exchange.Declare method")
public void exchangeRedeclareDifferentType() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Initial declaration with the direct type.
        interaction.exchange()
                   .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Conflicting redeclaration with the topic type.
        final ConnectionCloseBody connectionClose = interaction.exchange()
                                                               .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
                                                               .declareName(TEST_EXCHANGE)
                                                               .declare()
                                                               .consumeResponse()
                                                               .getLatestResponse(ConnectionCloseBody.class);
        assertThat(connectionClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
    }
}
/**
 * Declares {@code TEST_EXCHANGE} as a direct exchange and then passively declares it with the
 * topic type, expecting Declare-Ok because the type should be ignored for passive declares.
 * Currently disabled (see the {@code @Disabled} reason).
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@Disabled("The server does not implement this rule.")
@SpecificationTest(section = "1.6.2.1",
        description = "When [passive] set, all other method fields [of declare] except name and no-wait are ignored.")
public void exchangeRedeclarePassiveDifferentType() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Create the exchange with the direct type.
        interaction.exchange()
                   .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Passive declare naming a different type; Declare-Ok is the expected answer.
        interaction.exchange()
                   .declarePassive(true)
                   .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);
    }
}
/**
 * Declares {@code TEST_EXCHANGE} with an exchange type name that is not one of the standard
 * types and expects the broker to close the connection.
 * <p>
 * The previous body declared a direct exchange and then passively redeclared it as a topic
 * exchange, so no unsupported type was ever sent and the rule quoted in the annotation was not
 * exercised.
 * <p>
 * NOTE(review): the AMQP specification names {@code command-invalid} as the error for this rule;
 * confirm that the broker under test reports it via a connection close with that code.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.1",
        description = "The client MUST NOT attempt to declare an exchange with a type that the server does not "
                      + "support.")
public void exchangeUnsupportedExchangeType() throws Exception
{
    try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = transport.newInteraction();

        // A single, non-passive declare using a type name no exchange implementation is registered for.
        final ConnectionCloseBody response = interaction.negotiateOpen()
                                                        .channel().open().consumeResponse(ChannelOpenOkBody.class)
                                                        .exchange()
                                                        .declareType("myUnsupportedExchangeType")
                                                        .declareName(TEST_EXCHANGE).declare()
                                                        .consumeResponse().getLatestResponse(ConnectionCloseBody.class);

        assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.COMMAND_INVALID)));
    }
}
/**
 * Declares {@code TEST_EXCHANGE} as durable, restarts the broker (when the broker admin supports
 * restart; otherwise the remainder is skipped via an assumption) and then queries the exchange on
 * a new connection, expecting the {@code NO_BINDINGS} reply code.
 *
 * @throws Exception if connecting to the broker, restarting it, or any interaction step fails
 */
@Test
@SpecificationTest(section = "1.6.2.1",
        description = "If [durable is] set when creating a new exchange, the exchange will be marked as durable. "
                      + "Durable exchanges remain active when a server restarts. Non-durable exchanges (transient "
                      + "exchanges) are purged if/when a server restarts.")
public void exchangeDeclareDurable() throws Exception
{
    // Phase 1: create the durable exchange on a first connection.
    try (FrameTransport beforeRestart = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = beforeRestart.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        interaction.exchange()
                   .declareDurable(true)
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);
    }

    // Phase 2: restart, if the broker admin is able to.
    assumeTrue(getBrokerAdmin().supportsRestart());
    getBrokerAdmin().restart();

    // Phase 3: query the exchange on a second connection.
    try (FrameTransport afterRestart = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = afterRestart.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        final ExchangeBoundOkBody boundReply = interaction.exchange()
                                                          .boundExchangeName(TEST_EXCHANGE)
                                                          .bound()
                                                          .consumeResponse()
                                                          .getLatestResponse(ExchangeBoundOkBody.class);
        assertThat(boundReply.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
    }
}
/**
 * Declares {@code TEST_EXCHANGE}, checks via an exchange-bound query that it is reported with
 * {@code NO_BINDINGS}, deletes it, and checks that a second query reports
 * {@code EXCHANGE_NOT_FOUND}.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.3",
        description = "This method deletes an exchange. When an exchange is deleted all queue bindings on the "
                      + "exchange are cancelled.")
public void exchangeDelete() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Create the exchange; the typed lookup checks the reply is a Declare-Ok.
        interaction.exchange()
                   .declareName(TEST_EXCHANGE)
                   .declare()
                   .consumeResponse()
                   .getLatestResponse(ExchangeDeclareOkBody.class);

        // Query before deletion.
        final ExchangeBoundOkBody beforeDelete = interaction.exchange()
                                                            .boundExchangeName(TEST_EXCHANGE)
                                                            .bound()
                                                            .consumeResponse()
                                                            .getLatestResponse(ExchangeBoundOkBody.class);
        assertThat(beforeDelete.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));

        // Delete the exchange.
        interaction.exchange()
                   .deleteExchangeName(TEST_EXCHANGE)
                   .delete()
                   .consumeResponse(ExchangeDeleteOkBody.class);

        // Query after deletion.
        final ExchangeBoundOkBody afterDelete = interaction.exchange()
                                                           .boundExchangeName(TEST_EXCHANGE)
                                                           .bound()
                                                           .consumeResponse()
                                                           .getLatestResponse(ExchangeBoundOkBody.class);
        assertThat(afterDelete.getReplyCode(), is(equalTo(ExchangeBoundOkBody.EXCHANGE_NOT_FOUND)));
    }
}
/**
 * Attempts to delete an exchange named {@code unknownExchange} and expects the channel to be
 * closed with {@code NOT_FOUND}; the close is then acknowledged with Close-Ok.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.3",
        description = "The client MUST NOT attempt to delete an exchange that does not exist.")
public void exchangeDeleteExchangeNotFound() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Delete an exchange that was never declared by this test.
        final ChannelCloseBody channelClose = interaction.exchange()
                                                         .deleteExchangeName("unknownExchange")
                                                         .delete()
                                                         .consumeResponse()
                                                         .getLatestResponse(ChannelCloseBody.class);
        assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));

        // Acknowledge the channel close.
        interaction.channel().closeOk();
    }
}
/**
 * Verifies the {@code if-unused} flag of exchange delete.
 * <p>
 * A queue is created through the broker admin and bound to {@code TEST_EXCHANGE}. Deleting the
 * exchange with {@code if-unused} set is then expected to close the channel with {@code IN_USE}.
 * After acknowledging the close and opening the channel again, the test checks that the exchange
 * is still reported (reply code {@code OK}), unbinds the queue, and finally expects the
 * {@code if-unused} delete to succeed.
 *
 * @throws Exception if queue creation, connecting to the broker, or any interaction step fails
 */
@Test
@SpecificationTest(section = "1.6.2.3",
        description = "If [if-unused is] set, the server will only delete the exchange if it has no queue "
                      + "bindings. If the exchange has queue bindings the server does not delete it but raises a "
                      + "channel exception instead.")
public void exchangeDeleteInUse() throws Exception
{
    getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);

    try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = transport.newInteraction();

        // Declare the exchange and bind the pre-created queue to it.
        interaction.negotiateOpen()
                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
                   .exchange().declareName(TEST_EXCHANGE).declare()
                   .consumeResponse(ExchangeDeclareOkBody.class)
                   .queue().bindName(TEST_EXCHANGE).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bind()
                   .consumeResponse(QueueBindOkBody.class);

        // Delete with if-unused while a binding exists: a channel close is expected.
        final ChannelCloseBody response = interaction.exchange()
                                                     .deleteExchangeName(TEST_EXCHANGE)
                                                     .deleteIfUnused(true)
                                                     .delete()
                                                     .consumeResponse()
                                                     .getLatestResponse(ChannelCloseBody.class);
        assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.IN_USE)));

        // Acknowledge the close, then open the channel again to continue the test.
        interaction.channel().closeOk();

        final ExchangeBoundOkBody boundResponse = interaction.channel().open()
                                                             .consumeResponse(ChannelOpenOkBody.class)
                                                             .exchange()
                                                             .boundExchangeName(TEST_EXCHANGE)
                                                             .bound()
                                                             .consumeResponse()
                                                             .getLatestResponse(ExchangeBoundOkBody.class);
        assertThat(boundResponse.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));

        // Remove the binding; the if-unused delete is now expected to succeed.
        interaction.queue().unbindName(TEST_EXCHANGE).unbindQueueName(BrokerAdmin.TEST_QUEUE_NAME).unbind()
                   .consumeResponse(QueueUnbindOkBody.class)
                   .exchange()
                   .deleteIfUnused(true)
                   .deleteExchangeName(TEST_EXCHANGE)
                   .delete()
                   .consumeResponse(ExchangeDeleteOkBody.class);
    }
}
/**
 * Attempts to delete the default direct exchange and expects the channel to be closed with
 * {@code NOT_ALLOWED}.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
@SpecificationTest(section = "1.6.2.3",
        description = "The server MUST, in each virtual host, pre-declare an exchange instance for each standard "
                      + "exchange type that it implements.")
public void exchangeDeleteAmqDisallowed() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Try to remove the exchange named by ExchangeDefaults.DIRECT_EXCHANGE_NAME.
        final ChannelCloseBody channelClose = interaction.exchange()
                                                         .deleteExchangeName(ExchangeDefaults.DIRECT_EXCHANGE_NAME)
                                                         .delete()
                                                         .consumeResponse()
                                                         .getLatestResponse(ChannelCloseBody.class);
        assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
    }
}
/**
 * Qpid specific extension.
 * <p>
 * Declares {@code TEST_EXCHANGE} with the auto-delete flag, binds a queue to it and deletes that
 * queue; a subsequent exchange-bound query is expected to report {@code EXCHANGE_NOT_FOUND}.
 *
 * @throws Exception if queue creation, connecting to the broker, or any interaction step fails
 */
@Test
public void exchangeDeclareAutoDelete() throws Exception
{
    getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);

    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Auto-delete exchange.
        interaction.exchange()
                   .declareName(TEST_EXCHANGE)
                   .declareAutoDelete(true)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Bind the pre-created queue to it.
        interaction.queue()
                   .bindName(TEST_EXCHANGE)
                   .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
                   .bind()
                   .consumeResponse(QueueBindOkBody.class);

        // Delete the queue that holds the only binding created by this test.
        interaction.queue()
                   .deleteName(BrokerAdmin.TEST_QUEUE_NAME)
                   .delete()
                   .consumeResponse(QueueDeleteOkBody.class);

        // The exchange is expected to be gone now.
        final ExchangeBoundOkBody boundReply = interaction.exchange()
                                                          .boundExchangeName(TEST_EXCHANGE)
                                                          .bound()
                                                          .consumeResponse()
                                                          .getLatestResponse(ExchangeBoundOkBody.class);
        assertThat(boundReply.getReplyCode(), is(equalTo(ExchangeBoundOkBody.EXCHANGE_NOT_FOUND)));
    }
}
/**
 * Qpid specific extension.
 * <p>
 * Declares an exchange {@code altExchange} and then {@code TEST_EXCHANGE} with the
 * {@code alternateExchange} declare argument pointing at it. Deleting {@code altExchange} while it
 * is referenced is expected to close the channel with {@code NOT_ALLOWED}. After the channel is
 * opened again, deleting {@code TEST_EXCHANGE} first and {@code altExchange} second must succeed.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
public void exchangeDeclareWithAlternateExchange() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();
        final String alternateName = "altExchange";

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Declare the exchange that will serve as the alternate.
        interaction.exchange()
                   .declareName(alternateName)
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Declare the exchange under test, referencing the alternate by name.
        interaction.exchange()
                   .declareName(TEST_EXCHANGE)
                   .declareArguments(Map.of("alternateExchange", alternateName))
                   .declare()
                   .consumeResponse(ExchangeDeclareOkBody.class);

        // Deleting the alternate while it is referenced: a channel close is expected.
        final ChannelCloseBody channelClose = interaction.exchange()
                                                         .deleteExchangeName(alternateName)
                                                         .delete()
                                                         .consumeResponse()
                                                         .getLatestResponse(ChannelCloseBody.class);
        assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));

        // Acknowledge the close and open the channel again.
        interaction.channel().closeOk();
        interaction.channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Delete the referencing exchange first, then the alternate.
        interaction.exchange()
                   .deleteExchangeName(TEST_EXCHANGE)
                   .delete()
                   .consumeResponse(ExchangeDeleteOkBody.class);

        interaction.exchange()
                   .deleteExchangeName(alternateName)
                   .delete()
                   .consumeResponse(ExchangeDeleteOkBody.class);
    }
}
/**
 * Qpid specific extension.
 * <p>
 * Declares {@code TEST_EXCHANGE} with the {@code alternateExchange} declare argument set to the
 * name {@code notKnown} and expects the connection to be closed with {@code NOT_FOUND}.
 *
 * @throws Exception if connecting to the broker or any step of the interaction fails
 */
@Test
public void exchangeDeclareWithUnknownAlternateExchange() throws Exception
{
    try (FrameTransport frameTransport = new FrameTransport(getBrokerAdmin()).connect())
    {
        final Interaction interaction = frameTransport.newInteraction();

        interaction.negotiateOpen()
                   .channel().open()
                   .consumeResponse(ChannelOpenOkBody.class);

        // Declare with an alternate exchange name that this test never declares.
        final ConnectionCloseBody connectionClose = interaction.exchange()
                                                               .declareName(TEST_EXCHANGE)
                                                               .declareArguments(Map.of("alternateExchange", "notKnown"))
                                                               .declare()
                                                               .consumeResponse()
                                                               .getLatestResponse(ConnectionCloseBody.class);

        // TODO server fails - jira required
        assertThat(connectionClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
    }
}
}