blob: 51f886540276040f3f49bbe0d448ab629f9f6c49 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v0_10;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.qpid.server.filter.AMQPFilterTypes.JMS_SELECTOR;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
import java.util.Collections;
import org.hamcrest.Matchers;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.protocol.v0_10.transport.ExchangeBoundResult;
import org.apache.qpid.server.protocol.v0_10.transport.ExchangeQueryResult;
import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class ExchangeTest extends BrokerAdminUsingTestBase
{
private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
@Test
@SpecificationTest(section = "10.exchange.declare", description = "verify exchange exists, create if needed.")
public void exchangeDeclare() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
SessionCompleted completed = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange("myexch")
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareId(0)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse()
.getLatestResponse(SessionCompleted.class);
assertThat(completed.getCommands().includes(0), is(equalTo(true)));
}
}
@Test
@SpecificationTest(section = "10.exchange.declare",
description = "In the event that a message cannot be routed, this is the name of the exchange to which the"
+ " message will be sent.")
public void exchangeDeclareWithAlternateExchange() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange("myexch")
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareAlternateExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME)
.declareId(0)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
}
}
@Test
@SpecificationTest(section = "10.exchange.declare",
description = "if the alternate-exchange does not match the name of any existing exchange on the server, "
+ "then an exception must be raised.")
public void exchangeDeclareAlternateExchangeNotFound() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange("myexch")
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareAlternateExchange("unknownExchange")
.declareId(0)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
}
}
@Test
@SpecificationTest(section = "10.exchange.declare",
description = "If set [durable] when creating a new exchange, the exchange will be marked as durable. "
+ "Durable exchanges remain active when a server restarts. ")
public void exchangeDeclareDurable() throws Exception
{
String exchangeName = "myexch";
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange(exchangeName)
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareId(0)
.declareDurable(true)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
}
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
getBrokerAdmin().restart();
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange(exchangeName)
.declarePassive(true)
.declareId(0)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse()
.getLatestResponse(SessionCompleted.class);
}
}
@Test
@SpecificationTest(section = "10.exchange.delete", description = "delete an exchange")
public void exchangeDelete() throws Exception
{
String exchangeName = "myexch";
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange(exchangeName)
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareId(0)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
interaction.exchange()
.deleteExchange(exchangeName)
.deleteId(1)
.delete()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
ExecutionResult result = interaction.exchange()
.queryExchange(exchangeName)
.queryId(2)
.query()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse().getLatestResponse(ExecutionResult.class);
ExchangeQueryResult queryResult = (ExchangeQueryResult) result.getValue();
assertThat(queryResult.getNotFound(), is(equalTo(true)));
}
}
@Test
@SpecificationTest(section = "10.exchange.delete",
description = "An exchange MUST NOT be deleted if it is in use as an alternate-exchange by a queue or by "
+ "another exchange.")
public void exchangeDeleteInUseAsAlternate() throws Exception
{
String exchangeName1 = "myexch1";
String exchangeName2 = "myexch2";
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareExchange(exchangeName1)
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareId(0)
.declare()
.exchange()
.declareExchange(exchangeName2)
.declareAlternateExchange(exchangeName1)
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareId(1)
.declare()
.exchange()
.deleteExchange(exchangeName1)
.deleteId(2)
.delete()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_ALLOWED)));
}
}
@Test
@SpecificationTest(section = "10.exchange.query", description = "request information about an exchange")
public void exchangeQuery() throws Exception
{
String exchangeName = "myexch";
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionResult result = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.declareId(0)
.declareExchange(exchangeName)
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declare()
.exchange()
.queryId(1)
.queryExchange(exchangeName)
.query()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse().getLatestResponse(ExecutionResult.class);
ExchangeQueryResult queryResult = (ExchangeQueryResult) result.getValue();
assertThat(queryResult.getNotFound(), is(equalTo(false)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description = "This command binds a queue to an exchange."
+ " Until a queue is bound it will not receive any messages.")
public void exchangeBind() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
ExecutionResult execResult = interaction.exchange()
.boundId(2)
.boundExchange("amq.direct")
.boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
.boundBindingKey("bk")
.bound()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionResult.class);
assertThat(execResult.getValue(), is(instanceOf(ExchangeBoundResult.class)));
ExchangeBoundResult result = (ExchangeBoundResult) execResult.getValue();
assertThat(result.getExchangeNotFound(), is(equalTo(false)));
assertThat(result.getQueueNotFound(), is(equalTo(false)));
assertThat(result.getKeyNotMatched(), is(equalTo(false)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description =
"A client MUST NOT be allowed to bind a non-existent and unnamed queue (i.e. empty queue name)"
+ " to an exchange.")
@Ignore("spec requires INVALID_ARGUMENT error but NOT_FOUND is returned")
public void exchangeBindEmptyQueue() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindBindingKey("bk")
.bindQueue("")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.INVALID_ARGUMENT)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description =
"A client MUST NOT be allowed to bind a non-existent and unnamed queue (i.e. empty queue name)"
+ " to an exchange.")
@Ignore("spec requires INVALID_ARGUMENT error but NOT_FOUND is returned")
public void exchangeBindNonExistingQueue() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindBindingKey("bk")
.bindQueue("non-existing")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description = "The name of the exchange MUST NOT be a blank or empty string.")
public void exchangeBindEmptyExchange() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("")
.bindBindingKey("bk")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.INVALID_ARGUMENT)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description = "The name of the exchange MUST NOT be a blank or empty string.")
public void exchangeBindNonExistingExchange() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("non-existing")
.bindBindingKey("bk")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description = "A server MUST ignore duplicate bindings - that is, two or more bind commands with the"
+ " same exchange, queue, and binding-key - without treating these as an error."
+ " The value of the arguments used for the binding MUST NOT be altered"
+ " by subsequent binding requests.")
public void exchangeBindIgnoreDuplicates() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class)
.exchange()
.bindId(1)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bindArguments(Collections.singletonMap(JMS_SELECTOR.name(), "name='a'"))
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
ExecutionResult execResult = interaction.exchange()
.boundId(2)
.boundExchange("amq.direct")
.boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
.boundBindingKey("bk")
.boundArguments(Collections.singletonMap(JMS_SELECTOR.name(),
"name='a'"))
.bound()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionResult.class);
assertThat(execResult.getValue(), is(instanceOf(ExchangeBoundResult.class)));
ExchangeBoundResult result = (ExchangeBoundResult) execResult.getValue();
assertThat(result.getExchangeNotFound(), is(equalTo(false)));
assertThat(result.getQueueNotFound(), is(equalTo(false)));
assertThat(result.getKeyNotMatched(), is(equalTo(false)));
assertThat(result.getArgsNotMatched(), is(equalTo(true)));
}
}
@Test
@SpecificationTest(section = "10.exchange.bind",
description = " Bindings between durable queues and durable exchanges are automatically durable and"
+ " the server MUST restore such bindings after a server restart.")
public void exchangeBindMultipleBindings() throws Exception
{
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
.declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
.declareId(0)
.declareDurable(true)
.declare()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class)
.exchange()
.bindId(1)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
}
getBrokerAdmin().restart();
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ExecutionResult execResult = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.boundId(0)
.boundExchange("amq.direct")
.boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
.boundBindingKey("bk")
.bound()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionResult.class);
assertThat(execResult.getValue(), is(instanceOf(ExchangeBoundResult.class)));
ExchangeBoundResult result = (ExchangeBoundResult) execResult.getValue();
assertThat(result.getExchangeNotFound(), is(equalTo(false)));
assertThat(result.getQueueNotFound(), is(equalTo(false)));
assertThat(result.getKeyNotMatched(), is(equalTo(false)));
}
}
@Test
@SpecificationTest(section = "11.exchange.unbind",
description = "This command unbinds a queue from an exchange.")
public void exchangeUnbind() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class)
.exchange()
.unbindId(1)
.unbindExchange("amq.direct")
.unbindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.unbindBindingKey("bk")
.unbind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class);
ExecutionResult execResult = interaction.exchange()
.boundId(2)
.boundExchange("amq.direct")
.boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
.boundBindingKey("bk")
.bound()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionResult.class);
assertThat(execResult.getValue(), is(instanceOf(ExchangeBoundResult.class)));
ExchangeBoundResult result = (ExchangeBoundResult) execResult.getValue();
assertThat(result.getExchangeNotFound(), is(equalTo(false)));
assertThat(result.getQueueNotFound(), is(equalTo(false)));
assertThat(result.getKeyNotMatched(), is(equalTo(true)));
}
}
@Test
@SpecificationTest(section = "11.exchange.unbind",
description = "This command unbinds a queue from an exchange.")
public void exchangeUnbindWithoutBindingKey() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
final ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class)
.exchange()
.unbindId(1)
.unbindExchange("amq.direct")
.unbindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.unbind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.ILLEGAL_ARGUMENT)));
}
}
@Test
@SpecificationTest(section = "11.exchange.unbind",
description = "If the queue does not exist the server MUST raise an exception.")
public void exchangeUnbindNonExistingQueue() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
final ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.bindId(0)
.bindExchange("amq.direct")
.bindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.bindBindingKey("bk")
.bind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCompleted.class)
.exchange()
.unbindId(1)
.unbindExchange("amq.direct")
.unbindQueue("not-existing")
.unbindBindingKey("bk")
.unbind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
}
}
@Test
@SpecificationTest(section = "11.exchange.unbind",
description = "If the exchange does not exist the server MUST raise an exception.")
public void exchangeUnbindNonExistingExchange() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
final ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
.unbindId(0)
.unbindExchange("not-existing")
.unbindQueue(BrokerAdmin.TEST_QUEUE_NAME)
.unbindBindingKey(BrokerAdmin.TEST_QUEUE_NAME)
.unbind()
.session()
.flushCompleted()
.flush()
.consumeResponse(SessionCommandPoint.class)
.consumeResponse()
.getLatestResponse(ExecutionException.class);
assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
}
}
}