QPID-8349: [Tests][AMQP 1.0] Make protocol tests for amqp 0-10 consistent with protocol tests for amqp 1.0
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
index 56d7498..57f38d4 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -22,6 +22,7 @@
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecureOk;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
@@ -31,6 +32,7 @@
public static final String SASL_MECHANISM_PLAIN = "PLAIN";
private final Interaction _interaction;
+ private final ConnectionSecureOk _secureOk;
private ConnectionStartOk _startOk;
private ConnectionTuneOk _tuneOk;
private ConnectionOpen _open;
@@ -39,6 +41,7 @@
public ConnectionInteraction(final Interaction interaction)
{
_interaction = interaction;
+ _secureOk = new ConnectionSecureOk();
_startOk = new ConnectionStartOk();
_tuneOk = new ConnectionTuneOk();
_open = new ConnectionOpen();
@@ -84,10 +87,10 @@
return this;
}
- public ConnectionInteraction startOkResponse(final byte[] response)
+ public Interaction secureOk(final byte[] response) throws Exception
{
- _startOk.setResponse(response);
- return this;
+ _secureOk.setResponse(response);
+ return _interaction.sendPerformative(_secureOk);
}
public Interaction heartbeat() throws Exception
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
index 3b7849c..0989af7 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
@@ -20,19 +20,29 @@
*/
package org.apache.qpid.tests.protocol.v0_10;
-import java.net.InetSocketAddress;
-
import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10;
import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+import org.apache.qpid.tests.utils.BrokerAdmin;
public class FrameTransport extends AbstractFrameTransport<Interaction>
{
private final byte[] _protocolHeader;
+ private final BrokerAdmin.PortType _portType;
+ private final BrokerAdmin _brokerAdmin;
- public FrameTransport(final InetSocketAddress brokerAddress)
+ public FrameTransport(final BrokerAdmin brokerAdmin)
{
- super(brokerAddress, new FrameDecoder(new ProtocolEngineCreator_0_10().getHeaderIdentifier()), new FrameEncoder());
+ this(brokerAdmin, getPortType(brokerAdmin));
+ }
+
+ public FrameTransport(final BrokerAdmin brokerAdmin, final BrokerAdmin.PortType portType)
+ {
+ super(brokerAdmin.getBrokerAddress(portType),
+ new FrameDecoder(new ProtocolEngineCreator_0_10().getHeaderIdentifier()),
+ new FrameEncoder());
+ _portType = portType;
+ _brokerAdmin = brokerAdmin;
_protocolHeader = new ProtocolEngineCreator_0_10().getHeaderIdentifier();
}
@@ -45,7 +55,7 @@
@Override
public Interaction newInteraction()
{
- return new Interaction(this);
+ return new Interaction(this, _brokerAdmin, _portType);
}
@Override
@@ -55,4 +65,8 @@
return this;
}
+ private static BrokerAdmin.PortType getPortType(final BrokerAdmin brokerAdmin)
+ {
+ return brokerAdmin.isAnonymousSupported() ? BrokerAdmin.PortType.ANONYMOUS_AMQP : BrokerAdmin.PortType.AMQP;
+ }
}
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index fd05901..12379ce 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -20,12 +20,18 @@
*/
package org.apache.qpid.tests.protocol.v0_10;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
@@ -36,9 +42,12 @@
import org.apache.qpid.tests.protocol.AbstractFrameTransport;
import org.apache.qpid.tests.protocol.AbstractInteraction;
import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.utils.BrokerAdmin;
public class Interaction extends AbstractInteraction<Interaction>
{
+ private final BrokerAdmin _brokerAdmin;
+ private final BrokerAdmin.PortType _portType;
private byte[] _protocolHeader;
private ConnectionInteraction _connectionInteraction;
private SessionInteraction _sessionInteraction;
@@ -49,7 +58,9 @@
private int _channelId;
private TxInteraction _txInteraction;
- public Interaction(final AbstractFrameTransport frameTransport)
+ public Interaction(final AbstractFrameTransport frameTransport,
+ final BrokerAdmin brokerAdmin,
+ final BrokerAdmin.PortType portType)
{
super(frameTransport);
_connectionInteraction = new ConnectionInteraction(this);
@@ -60,6 +71,8 @@
_queueInteraction = new QueueInteraction(this);
_exchangeInteraction = new ExchangeInteraction(this);
_protocolHeader = getTransport().getProtocolHeader();
+ _brokerAdmin = brokerAdmin;
+ _portType = portType;
}
@Override
@@ -151,16 +164,62 @@
return dst;
}
- public Interaction openAnonymousConnection() throws Exception
+ public Interaction negotiateOpen() throws Exception
+ {
+ authenticateConnection().connection().tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOk.class);
+ return this;
+ }
+
+ public Interaction authenticateConnection() throws Exception
+ {
+ if (_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS)
+ {
+ openAnonymous();
+ }
+ else
+ {
+ final ConnectionStart start = this.negotiateProtocol().consumeResponse()
+ .consumeResponse().getLatestResponse(ConnectionStart.class);
+ final List<Object> supportedMechanisms =
+ start.getMechanisms() == null ? Collections.emptyList() : start.getMechanisms();
+
+ if (supportedMechanisms.stream().noneMatch(m -> String.valueOf(m).equalsIgnoreCase(ConnectionInteraction.SASL_MECHANISM_PLAIN)))
+ {
+ if (supportedMechanisms.stream()
+ .noneMatch(m -> String.valueOf(m).equalsIgnoreCase(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)))
+ {
+ throw new IllegalStateException(String.format(
+ "PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", supportedMechanisms.stream().map(
+ String::valueOf).collect(
+ Collectors.joining(","))));
+ }
+ else
+ {
+ openAnonymous();
+ }
+ }
+ else
+ {
+ final byte[] initialResponse = String.format("\0%s\0%s",
+ _brokerAdmin.getValidUsername(),
+ _brokerAdmin.getValidPassword())
+ .getBytes(UTF_8);
+ this.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk()
+ .consumeResponse(ConnectionSecure.class)
+ .connection().secureOk(initialResponse).consumeResponse(ConnectionTune.class);
+ }
+ }
+ return this;
+ }
+
+ private void openAnonymous() throws Exception
{
this.negotiateProtocol().consumeResponse()
.consumeResponse(ConnectionStart.class)
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
- .consumeResponse(ConnectionTune.class)
- .connection().tuneOk()
- .connection().open()
- .consumeResponse(ConnectionOpenOk.class);
- return this;
+ .consumeResponse(ConnectionTune.class);
}
public SessionInteraction session()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
index 18cab0d..ba49fab 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -21,19 +21,15 @@
package org.apache.qpid.tests.protocol.v0_10;
import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
-
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
@@ -50,21 +46,14 @@
public class ConnectionTest extends BrokerAdminUsingTestBase
{
- private static final String DEFAULT_LOCALE = "en_US";
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "9.connection.start-ok",
description = "An AMQP client MUST handle incoming connection.start controls.")
public void startOk() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ assumeThat(getBrokerAdmin().isAnonymousSupported(), is(equalTo(true)));
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.ANONYMOUS_AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse()
@@ -80,7 +69,8 @@
+ " Certain fields are negotiated, others provide capability information.")
public void tuneOkAndOpen() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ assumeThat(getBrokerAdmin().isAnonymousSupported(), is(equalTo(true)));
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.ANONYMOUS_AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse()
@@ -98,8 +88,7 @@
description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
public void authenticationBypassBySendingTuneOk() throws Exception
{
- InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse()
@@ -115,8 +104,7 @@
description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
public void authenticationBypassBySendingOpen() throws Exception
{
- InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse().consumeResponse(ConnectionStart.class)
@@ -130,8 +118,8 @@
description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
public void authenticationBypassAfterSendingStartOk() throws Exception
{
- InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ assumeThat(getBrokerAdmin().isSASLMechanismSupported(ConnectionInteraction.SASL_MECHANISM_PLAIN), is(equalTo(true)));
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse()
@@ -149,13 +137,10 @@
description = "[...] the minimum negotiated value for max-frame-size is also MIN-MAX-FRAME-SIZE [4096]")
public void tooSmallFrameSize() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
- .consumeResponse(ConnectionStart.class)
- .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
- .consumeResponse().getLatestResponse(ConnectionTune.class);
+ ConnectionTune response = interaction.authenticateConnection().getLatestResponse(ConnectionTune.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkMaxFrameSize(1024)
@@ -172,13 +157,10 @@
+ " The server may report the error in some fashion to assist implementers.")
public void tooLargeFrameSize() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
- .consumeResponse(ConnectionStart.class)
- .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
- .consumeResponse().getLatestResponse(ConnectionTune.class);
+ ConnectionTune response = interaction.authenticateConnection().getLatestResponse(ConnectionTune.class);
assumeThat(response.hasMaxFrameSize(), is(true));
assumeThat(response.getMaxFrameSize(), is(lessThan(0xFFFF)));
@@ -196,13 +178,10 @@
+ "is idle.")
public void heartbeating() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
- .consumeResponse(ConnectionStart.class)
- .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
- .consumeResponse().getLatestResponse(ConnectionTune.class);
+ ConnectionTune response = interaction.authenticateConnection().getLatestResponse(ConnectionTune.class);
assumeThat(response.hasHeartbeatMin(), is(true));
assumeThat(response.hasHeartbeatMax(), is(true));
@@ -241,13 +220,10 @@
+ "be considered disconnected.")
public void heartbeatingIncomingIdle() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
- .consumeResponse(ConnectionStart.class)
- .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
- .consumeResponse().getLatestResponse(ConnectionTune.class);
+ ConnectionTune response = interaction.authenticateConnection().getLatestResponse(ConnectionTune.class);
assumeThat(response.hasHeartbeatMin(), is(true));
assumeThat(response.hasHeartbeatMax(), is(true));
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ExchangeTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ExchangeTest.java
index b0d9639..51f8865 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ExchangeTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ExchangeTest.java
@@ -28,11 +28,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
import org.hamcrest.Matchers;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -50,23 +48,16 @@
public class ExchangeTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
-
@Test
@SpecificationTest(section = "10.exchange.declare", description = "verify exchange exists, create if needed.")
public void exchangeDeclare() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- SessionCompleted completed = interaction.openAnonymousConnection()
+ SessionCompleted completed = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -90,10 +81,10 @@
+ " message will be sent.")
public void exchangeDeclareWithAlternateExchange() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -115,10 +106,10 @@
+ "then an exception must be raised.")
public void exchangeDeclareAlternateExchangeNotFound() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -145,10 +136,10 @@
public void exchangeDeclareDurable() throws Exception
{
String exchangeName = "myexch";
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -166,10 +157,10 @@
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
getBrokerAdmin().restart();
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -190,10 +181,10 @@
public void exchangeDelete() throws Exception
{
String exchangeName = "myexch";
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -237,10 +228,10 @@
{
String exchangeName1 = "myexch1";
String exchangeName2 = "myexch2";
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -274,10 +265,10 @@
public void exchangeQuery() throws Exception
{
String exchangeName = "myexch";
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionResult result = interaction.openAnonymousConnection()
+ ExecutionResult result = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -307,10 +298,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -355,10 +346,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -388,10 +379,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -419,10 +410,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -449,10 +440,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -482,10 +473,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -542,10 +533,10 @@
{
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -571,10 +562,10 @@
getBrokerAdmin().restart();
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionResult execResult = interaction.openAnonymousConnection()
+ ExecutionResult execResult = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -605,10 +596,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -660,10 +651,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- final ExecutionException response = interaction.openAnonymousConnection()
+ final ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -699,10 +690,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- final ExecutionException response = interaction.openAnonymousConnection()
+ final ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -739,10 +730,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- final ExecutionException response = interaction.openAnonymousConnection()
+ final ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
index 51a376a..86f0691 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeApplicationHeadersTest.java
@@ -26,7 +26,6 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -34,38 +33,37 @@
import org.junit.Before;
import org.junit.Test;
-import org.apache.qpid.server.protocol.v0_10.transport.*;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class LargeApplicationHeadersTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void applicationHeadersSentOverManyFrames() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
final String subscriberName = "testSubscriber";
byte[] sessionName = "test".getBytes(UTF_8);
- ConnectionTune tune = interaction.negotiateProtocol()
- .consumeResponse()
- .consumeResponse(ConnectionStart.class)
- .connection()
- .startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)
- .startOk()
- .consumeResponse()
- .getLatestResponse(ConnectionTune.class);
+ final ConnectionTune tune = interaction.authenticateConnection().getLatestResponse(ConnectionTune.class);
int headerPropertySize = ((1<<16) - 1);
Map<String, Object> applicationHeaders = createApplicationHeadersThatExceedSingleFrame(headerPropertySize,
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
index 6d829d5..396966e 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/LargeMessageBodyTest.java
@@ -25,7 +25,6 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.stream.IntStream;
import org.junit.Before;
@@ -46,32 +45,23 @@
public class LargeMessageBodyTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void messageBodyOverManyFrames() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
final String subscriberName = "testSubscriber";
byte[] sessionName = "test".getBytes(UTF_8);
- ConnectionTune tune = interaction.negotiateProtocol()
- .consumeResponse()
- .consumeResponse(ConnectionStart.class)
- .connection()
- .startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)
- .startOk()
- .consumeResponse()
- .getLatestResponse(ConnectionTune.class);
+ final ConnectionTune tune = interaction.authenticateConnection().getLatestResponse(ConnectionTune.class);
final byte[] messageContent = new byte[tune.getMaxFrameSize() * 2];
IntStream.range(0, messageContent.length).forEach(i -> {messageContent[i] = (byte) (i & 0xFF);});
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 08baf22..50d7423 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -26,8 +26,6 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
-
import org.junit.Before;
import org.junit.Test;
@@ -50,12 +48,10 @@
public class MessageTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -64,11 +60,11 @@
description = "This command transfers a message between two peers.")
public void sendTransfer() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(UTF_8);
- SessionCompleted completed = interaction.openAnonymousConnection()
+ SessionCompleted completed = interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.message()
@@ -93,12 +89,12 @@
+ " which is a request for messages from a specific queue.")
public void subscribe() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "testSession".getBytes(UTF_8);
final String subscriberName = "testSubscriber";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.message()
@@ -127,12 +123,12 @@
{
String testMessageBody = "testMessage";
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "testSession".getBytes(UTF_8);
final String subscriberName = "testSubscriber";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.message()
@@ -174,12 +170,12 @@
{
String testMessageBody = "testMessage";
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "testSession".getBytes(UTF_8);
final String subscriberName = "testSubscriber";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.message()
@@ -235,12 +231,12 @@
{
String testMessageBody = "testMessage";
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "testSession".getBytes(UTF_8);
final String subscriberName = "testSubscriber";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.message()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
index 35675e1..c1ac532 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ProtocolTest.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.tests.protocol.v0_10;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assume.assumeThat;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.hamcrest.core.IsEqual;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
@@ -46,13 +46,6 @@
public class ProtocolTest extends BrokerAdminUsingTestBase
{
private static final String DEFAULT_LOCALE = "en_US";
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "4.3. Version Negotiation",
@@ -62,7 +55,8 @@
+ " header with the requested version to the socket, and then implement the protocol accordingly")
public void versionNegotiation() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ assumeThat(getBrokerAdmin().isAnonymousSupported(), is(equalTo(true)));
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.ANONYMOUS_AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse();
@@ -83,7 +77,7 @@
+ "header with a supported protocol version and then close the socket.")
public void unrecognisedProtocolHeader() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -104,7 +98,7 @@
+ "header with a supported protocol version and then close the socket.")
public void unrecognisedProtocolVersion() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
@@ -124,7 +118,7 @@
@SpecificationTest(section = "8. Domains", description = "valid values for the frame type indicator.")
public void invalidSegmentType() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java
index 32ac9b9..09f23d8 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java
@@ -22,15 +22,12 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
-
import org.hamcrest.Matchers;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
@@ -47,23 +44,16 @@
public class QueueTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
-
@Test
@SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.")
public void queueDeclare() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- SessionCompleted completed = interaction.openAnonymousConnection()
+ SessionCompleted completed = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -87,10 +77,10 @@
+ "they are rejected by a subscriber, or when they are orphaned by queue deletion.")
public void queueDeclareWithAlternateExchange() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -111,10 +101,10 @@
+ "then an exception must be raised.")
public void queueDeclareAlternateExchangeNotFound() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -140,10 +130,10 @@
public void queueDeclarePassive() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -164,10 +154,10 @@
description = "[...] If the queue does not exist, the server treats this as a failure.")
public void queueDeclarePassiveQueueNotFound() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -192,10 +182,10 @@
+ "remain active when a server restarts.")
public void queueDeclareDurable() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -212,10 +202,10 @@
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
getBrokerAdmin().restart();
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -237,10 +227,10 @@
+ "declared as exclusive by an existing client session, it MUST raise an exception.")
public void queueDeclareAttemptedConsumeOfExclusivelyDeclaredQueue() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -253,10 +243,10 @@
.flush()
.consumeResponse(SessionCompleted.class);
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
- ExecutionException response = interaction2.openAnonymousConnection()
+ ExecutionException response = interaction2.negotiateOpen()
.channelId(1)
.attachSession("test2".getBytes(UTF_8))
.message()
@@ -275,10 +265,10 @@
}
}
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
- interaction2.openAnonymousConnection()
+ interaction2.negotiateOpen()
.channelId(1)
.attachSession("test2".getBytes(UTF_8))
.message()
@@ -299,10 +289,10 @@
+ "declared as exclusive by an existing client session, it MUST raise an exception.")
public void queueDeclareRedeclareOfExclusivelyDeclaredQueue() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -315,10 +305,10 @@
.flush()
.consumeResponse(SessionCompleted.class);
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
- ExecutionException response = interaction2.openAnonymousConnection()
+ ExecutionException response = interaction2.negotiateOpen()
.channelId(1)
.attachSession("test2".getBytes(UTF_8))
.queue()
@@ -337,10 +327,10 @@
}
}
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
- interaction2.openAnonymousConnection()
+ interaction2.negotiateOpen()
.channelId(1)
.attachSession("test2".getBytes(UTF_8))
.queue()
@@ -361,10 +351,10 @@
+ "MUST be deleted when the session closes.")
public void queueDeclareAutoDeleteAndExclusiveDeletedBySessionDetach() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -407,10 +397,10 @@
+ "because its session is closed.")
public void queueDeclareAutoDeleteDeletedByLastConsumerCancelled() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -424,11 +414,11 @@
.consumeResponse(SessionCompleted.class);
}
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
String subscriberName = "mysub";
- interaction2.openAnonymousConnection()
+ interaction2.negotiateOpen()
.channelId(1)
.attachSession("test2".getBytes(UTF_8))
.queue()
@@ -476,10 +466,10 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -513,10 +503,10 @@
+ "MUST raise an exception.")
public void queueDeleteQueueNotFound() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -541,12 +531,12 @@
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try (FrameTransport consumerTransport = new FrameTransport(_brokerAddress).connect();
- FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport consumerTransport = new FrameTransport(getBrokerAdmin()).connect();
+ FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction consumerInteraction = consumerTransport.newInteraction();
String subscriberName = "mysub";
- consumerInteraction.openAnonymousConnection()
+ consumerInteraction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.message()
@@ -560,7 +550,7 @@
.consumeResponse(SessionCompleted.class);
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession("test2".getBytes(UTF_8))
.queue()
@@ -605,10 +595,10 @@
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionResult result = interaction.openAnonymousConnection()
+ ExecutionResult result = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -651,10 +641,10 @@
+ "MUST raise an exception.")
public void queuePurgeQueueNotFound() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionException response = interaction.openAnonymousConnection()
+ ExecutionException response = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -680,10 +670,10 @@
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExecutionResult result = interaction.openAnonymousConnection()
+ ExecutionResult result = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
index a5cbbb1..d4c412a 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
@@ -22,42 +22,31 @@
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.hamcrest.core.IsEqual;
-import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
import org.apache.qpid.server.protocol.v0_10.transport.SessionDetachCode;
import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class SessionTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "9.session.attach",
description = "Requests that the current transport be attached to the named session.")
public void attach() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
final int channelId = 1;
- SessionAttached sessionAttached = interaction.openAnonymousConnection()
+ SessionAttached sessionAttached = interaction.negotiateOpen()
.channelId(channelId)
.session()
.attachName(sessionName)
@@ -75,12 +64,12 @@
description = "Detaches the current transport from the named session.")
public void detach() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
final int channelId = 1;
- SessionDetached sessionDetached = interaction.openAnonymousConnection()
+ SessionDetached sessionDetached = interaction.negotiateOpen()
.channelId(channelId)
.session()
.attachName(sessionName)
@@ -105,12 +94,12 @@
+ " session.detached with the \"not-attached\" reason code.")
public void detachUnknownSession() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
final int channelId = 1;
- SessionDetached sessionDetached = interaction.openAnonymousConnection()
+ SessionDetached sessionDetached = interaction.negotiateOpen()
.channelId(channelId)
.session()
.detachName(sessionName)
@@ -128,12 +117,12 @@
description = "A session MUST NOT be attached to more than one transport at a time.")
public void attachSameSessionTwiceDisallowed() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport1 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction1 = transport1.newInteraction();
byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
final int channelId1 = 1;
- SessionAttached sessionAttached = interaction1.openAnonymousConnection()
+ SessionAttached sessionAttached = interaction1.negotiateOpen()
.channelId(channelId1)
.session()
.attachName(sessionName)
@@ -145,11 +134,11 @@
assertThat(sessionAttached.getChannel(), IsEqual.equalTo(channelId1));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
final int channelId2 = 2;
- SessionDetached sessionDetached = interaction2.openAnonymousConnection()
+ SessionDetached sessionDetached = interaction2.negotiateOpen()
.channelId(channelId2)
.session()
.attachName(sessionName)
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
index 443aede..ae376c0 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -25,8 +25,6 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
-
import org.junit.Before;
import org.junit.Test;
@@ -37,12 +35,10 @@
public class TransactionTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -51,11 +47,11 @@
description = "This command commits all messages published and accepted in the current transaction.")
public void messageSendCommit() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(UTF_8);
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.tx().selectId(0).select()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/authtimeout/AuthenticationTimeoutTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/authtimeout/AuthenticationTimeoutTest.java
index cc9d8d6..3353f14 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/authtimeout/AuthenticationTimeoutTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/authtimeout/AuthenticationTimeoutTest.java
@@ -19,15 +19,17 @@
*/
package org.apache.qpid.tests.protocol.v0_10.extensions.authtimeout;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
-
-import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.tests.protocol.v0_10.ConnectionInteraction;
import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
import org.apache.qpid.tests.protocol.v0_10.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -40,15 +42,16 @@
@Test
public void authenticationTimeout() throws Exception
{
- InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ assumeThat(getBrokerAdmin().isSASLMechanismSupported(ConnectionInteraction.SASL_MECHANISM_PLAIN),
+ is(equalTo(true)));
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
final ConnectionStart start = interaction.negotiateProtocol()
- .consumeResponse()
- .consumeResponse()
- .getLatestResponse(ConnectionStart.class);
- assertThat(start.getMechanisms(), CoreMatchers.hasItem("PLAIN"));
+ .consumeResponse()
+ .consumeResponse()
+ .getLatestResponse(ConnectionStart.class);
+ assertThat(start.getMechanisms(), hasItem(ConnectionInteraction.SASL_MECHANISM_PLAIN));
transport.assertNoMoreResponsesAndChannelClosed();
}
}
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
index a7a67d1..2331619 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/exchange/ExchangeTest.java
@@ -26,10 +26,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.exchange.ExchangeDefaults;
@@ -39,29 +37,21 @@
import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
import org.apache.qpid.tests.protocol.v0_10.Interaction;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.BrokerSpecific;
@BrokerSpecific(kind = KIND_BROKER_J)
public class ExchangeTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
-
@Test
public void exchangeDeclareValidWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- SessionCompleted completed = interaction.openAnonymousConnection()
+ SessionCompleted completed = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
@@ -83,10 +73,10 @@
@Test
public void exchangeDeclareInvalidWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.exchange()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/maxsize/MaximumMessageSizeTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/maxsize/MaximumMessageSizeTest.java
index 546c40e..517cfb1 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/maxsize/MaximumMessageSizeTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/maxsize/MaximumMessageSizeTest.java
@@ -24,7 +24,6 @@
import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.stream.IntStream;
import org.hamcrest.core.IsEqual;
@@ -46,19 +45,17 @@
@ConfigItem(name = "qpid.max_message_size", value = "1000")
public class MaximumMessageSizeTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void limitExceeded() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(UTF_8);
@@ -69,7 +66,7 @@
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentLength(messageContent.length);
- ExecutionException executionException = interaction.openAnonymousConnection()
+ ExecutionException executionException = interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.message()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
index 19592de..ba166d5 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
@@ -23,7 +23,6 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -57,20 +56,18 @@
@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
public class MalformedMessage extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final String CONTENT_TEXT = "Test";
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void malformedMessage() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(UTF_8);
@@ -102,7 +99,7 @@
}
};
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.sendPerformativeWithoutCopying(malformedTransfer)
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
index 4365812..8335e21 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/queue/QueueTest.java
@@ -26,11 +26,9 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -51,22 +49,15 @@
@BrokerSpecific(kind = KIND_BROKER_J)
public class QueueTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
-
@Test
public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- SessionCompleted completed = interaction.openAnonymousConnection()
+ SessionCompleted completed = interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
@@ -149,10 +140,10 @@
@Test
public void queueDeclareInvalidWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(SESSION_NAME)
.queue()
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/transactiontimeout/TransactionTimeoutTest.java
index ab4ce40..565f3e8 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/transactiontimeout/TransactionTimeoutTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/transactiontimeout/TransactionTimeoutTest.java
@@ -27,8 +27,6 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
-
import org.junit.Before;
import org.junit.Test;
@@ -53,23 +51,21 @@
@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void publishTransactionTimeout() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "test".getBytes(UTF_8);
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.tx().selectId(0).select()
@@ -104,12 +100,12 @@
{
String testMessageBody = "testMessage";
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
byte[] sessionName = "testSession".getBytes(UTF_8);
final String subscriberName = "testSubscriber";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.attachSession(sessionName)
.tx().selectId(0).select()