blob: bdc68b1f2389207620b35f0c64150463e10a9614 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v0_8;
import static org.apache.qpid.tests.protocol.SaslUtils.generateCramMD5ClientResponse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody;
import org.apache.qpid.tests.protocol.ChannelClosedResponse;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class ConnectionTest extends BrokerAdminUsingTestBase
{
private static final String ANONYMOUS = "ANONYMOUS";
private static final String PLAIN = "PLAIN";
private static final String CRAM_MD5 = "CRAM-MD5";
@Test
@SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
public void connectionStart() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionStartBody response =
interaction.negotiateProtocol().consumeResponse().getLatestResponse(ConnectionStartBody.class);
assertThat(response.getVersionMajor(), is(equalTo((short)transport.getProtocolVersion().getMajorVersion())));
assertThat(response.getVersionMinor(), is(equalTo((short)transport.getProtocolVersion().getActualMinorVersion())));
}
}
@Test
@SpecificationTest(section = "1.4.2.2", description = "select security mechanism and locale")
public void connectionStartOk() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.authenticateConnection();
interaction.getLatestResponse(ConnectionTuneBody.class);
}
}
@Test
@SpecificationTest(section = "1.4.2.2",
description = "If the mechanism field does not contain one of the security mechanisms proposed by the "
+ "server in the Start method, the server MUST close the connection without sending any "
+ "further data.")
public void connectionStartOkUnsupportedMechanism() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol()
.consumeResponse(ConnectionStartBody.class)
.connection().startOkMechanism("NOT-A-MECHANISM")
.startOk();
final ConnectionCloseBody res = interaction.consumeResponse()
.getLatestResponse(ConnectionCloseBody.class);
assertThat(res.getReplyCode(), is(equalTo(ErrorCodes.CONNECTION_FORCED)));
}
}
@Test
@SpecificationTest(section = "1.4.2.6", description = "negotiate connection tuning parameters")
public void connectionTuneOkAndOpen() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(response.getFrameMax())
.tuneOkHeartbeat(response.getHeartbeat())
.tuneOk()
.connection().open()
.consumeResponse().getLatestResponse(ConnectionOpenOkBody.class);
}
}
@Test
@SpecificationTest(section = "1.4.2.3", description = "security challenge data")
public void connectionSecure() throws Exception
{
assumeTrue(getBrokerAdmin().isSASLMechanismSupported(PLAIN));
try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final byte[] initialResponse = String.format("\0%s\0%s",
getBrokerAdmin().getValidUsername(),
getBrokerAdmin().getValidPassword())
.getBytes(StandardCharsets.US_ASCII);
final Interaction interaction = transport.newInteraction();
final ConnectionStartBody start = interaction.negotiateProtocol()
.consumeResponse()
.getLatestResponse(ConnectionStartBody.class);
assertThat(Arrays.asList(new String(start.getMechanisms()).split(" ")), hasItem(PLAIN));
final ConnectionSecureBody secure = interaction.connection()
.startOkMechanism(PLAIN)
.startOk()
.consumeResponse().getLatestResponse(ConnectionSecureBody.class);
assertThat(secure.getChallenge(), is(anyOf(nullValue(), equalTo(new byte[0]))));
final ConnectionTuneBody tune = interaction.connection().secureOk(initialResponse)
.consumeResponse()
.getLatestResponse(ConnectionTuneBody.class);
interaction.connection()
.tuneOkChannelMax(tune.getChannelMax())
.tuneOkFrameMax(tune.getFrameMax())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionOpenOkBody.class);
}
}
@Test
@SpecificationTest(section = "1.4.2.3", description = "security challenge data")
public void connectionSecureWithChallengeResponse() throws Exception
{
assumeTrue(getBrokerAdmin().isSASLMechanismSupported(CRAM_MD5));
try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
final ConnectionStartBody start = interaction.negotiateProtocol()
.consumeResponse()
.getLatestResponse(ConnectionStartBody.class);
assertThat(Arrays.asList(new String(start.getMechanisms()).split(" ")), hasItem(CRAM_MD5));
final ConnectionSecureBody secure = interaction.connection()
.startOkMechanism(CRAM_MD5)
.startOk()
.consumeResponse()
.getLatestResponse(ConnectionSecureBody.class);
byte[] response = generateCramMD5ClientResponse(getBrokerAdmin().getValidUsername(),
getBrokerAdmin().getValidPassword(),
secure.getChallenge());
final ConnectionTuneBody tune = interaction.connection()
.secureOk(response)
.consumeResponse()
.getLatestResponse(ConnectionTuneBody.class);
interaction.connection()
.tuneOkChannelMax(tune.getChannelMax())
.tuneOkFrameMax(tune.getFrameMax())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionOpenOkBody.class);
}
}
@Test
@SpecificationTest(section = "1.4.2.3", description = "security challenge data")
public void connectionSecureUnsuccessfulAuthentication() throws Exception
{
assumeTrue(getBrokerAdmin().isSASLMechanismSupported(PLAIN));
try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final byte[] initialResponse = String.format("\0%s\0%s",
getBrokerAdmin().getValidUsername(),
"badpassword")
.getBytes(StandardCharsets.US_ASCII);
final Interaction interaction = transport.newInteraction();
final ConnectionStartBody start = interaction.negotiateProtocol()
.consumeResponse()
.getLatestResponse(ConnectionStartBody.class);
assertThat(Arrays.asList(new String(start.getMechanisms()).split(" ")), hasItem(PLAIN));
final ConnectionCloseBody close = interaction.connection()
.startOkMechanism(PLAIN)
.startOk()
.consumeResponse(ConnectionSecureBody.class)
.connection()
.secureOk(initialResponse)
.consumeResponse()
.getLatestResponse(ConnectionCloseBody.class);
assertThat(close.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
assertThat(String.valueOf(close.getReplyText()).toLowerCase(), containsString("authentication failed"));
}
}
@Test
@SpecificationTest(section = "1.4.2.6", description = "[...] the minimum negotiated value for frame-max is also"
+ " frame-min-size [4096].")
public void tooSmallFrameSize() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(1024)
.tuneOkHeartbeat(response.getHeartbeat())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
}
}
@Test
@SpecificationTest(section = "1.4.2.6.2.", description = "If the client specifies a frame max that is higher than"
+ " the value provided by the server, the server MUST"
+ " close the connection without attempting a negotiated"
+ " close. The server may report the error in some fashion"
+ " to assist implementors.")
public void tooLargeFrameSize() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(Long.MAX_VALUE)
.tuneOkHeartbeat(response.getHeartbeat())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
}
}
@Test
@SpecificationTest(section = "4.2.3",
description = "A peer MUST NOT send frames larger than the agreed-upon size. A peer that receives an "
+ "oversized frame MUST signal a connection exception with reply code 501 (frame error).")
public void overlySizedContentBodyFrame() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final long frameMax = response.getFrameMax();
// Older Qpid JMS Client 0-x had a defect that meant they could send content body frames that were too
// large. Rather then limiting the user content of each frame to frameSize - 8, it sent frameSize bytes
// of user content meaning the resultant frame was too big. The server accommodates this behaviour
// by reducing the frame-size advertised to the client.
final int overlyLargeFrameBodySize = (int) (frameMax + 1); // Should be frameMax - 8 + 1.
final byte[] bodyBytes = new byte[overlyLargeFrameBodySize];
interaction.connection()
.tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(frameMax)
.tuneOkHeartbeat(response.getHeartbeat())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionOpenOkBody.class)
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().publish()
.basic().contentHeader(bodyBytes.length)
.basic().contentBody(bodyBytes);
// Spec requires:
//assertThat(res.getReplyCode(), CoreMatchers.is(CoreMatchers.equalTo(ErrorCodes.COMMAND_INVALID)));
// Server actually abruptly closes the connection. We might see a graceful TCP/IP close or a broken pipe.
try
{
interaction.consumeResponse().getLatestResponse(ChannelClosedResponse.class);
}
catch (ExecutionException e)
{
Throwable original = e.getCause();
if (original instanceof IOException)
{
// PASS
}
else
{
throw new RuntimeException(original);
}
}
}
}
@Test
@SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK"
+ " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
public void authenticationBypassBySendingTuneOk() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
.connection().tuneOk()
.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
}
}
@Test
@SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK"
+ " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
public void authenticationBypassBySendingOpen() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
.connection().open()
.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
}
}
@Test
@SpecificationTest(section = "9",
description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
public void authenticationBypassAfterSendingStartOk() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol()
.consumeResponse(ConnectionStartBody.class)
.connection().startOkMechanism(PLAIN).startOk().consumeResponse(ConnectionSecureBody.class)
.connection().tuneOk()
.connection().open()
.consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
}
}
@Test
@SpecificationTest(section = "4.2.7",
description = "Heartbeat frames tell the recipient that the sender is still alive. The rate and timing of"
+ " heartbeat frames is negotiated during connection tuning.")
public void heartbeating() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final Long heartbeatPeriod = 1L;
interaction.connection()
.tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(response.getFrameMax())
.tuneOkHeartbeat(heartbeatPeriod.intValue())
.tuneOk();
final long startTime = System.currentTimeMillis();
interaction.connection().open()
.consumeResponse(ConnectionOpenOkBody.class)
.consumeResponse().getLatestResponse(HeartbeatBody.class);
final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
final long maximumExpectedHeartbeatDelay = heartbeatPeriod * 2 * 2; // Includes wiggle room to allow for slow boxes.
assertThat("Heartbeat not received within expected time frame",
actualHeartbeatDelay / 1000,
is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(maximumExpectedHeartbeatDelay))));
interaction.sendPerformative(new HeartbeatBody());
interaction.consumeResponse(HeartbeatBody.class)
.sendPerformative(new HeartbeatBody());
}
}
@Test
@SpecificationTest(section = "4.2.7", description = "Any sent octet is a valid substitute for a heartbeat")
public void heartbeatingIncomingTrafficIsNonHeartbeat() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final Long heartbeatPeriod = 1L;
interaction.connection()
.tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(response.getFrameMax())
.tuneOkHeartbeat(heartbeatPeriod.intValue())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionOpenOkBody.class)
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.consumeResponse(HeartbeatBody.class)
.exchange().declarePassive(true).declareNoWait(true).declare()
.consumeResponse(HeartbeatBody.class)
.sendPerformative(new HeartbeatBody())
.exchange().declarePassive(true).declareNoWait(true).declare();
interaction.connection()
.close()
.consumeResponse().getLatestResponse(ConnectionCloseOkBody.class);
}
}
@Test
@SpecificationTest(section = "4.2.7",
description = " If a peer detects no incoming traffic (i.e. received octets) for two heartbeat intervals "
+ "or longer, it should close the connection without following the Connection.Close/Close-Ok handshaking")
public void heartbeatingNoIncomingTraffic() throws Exception
{
try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final Long heartbeatPeriod = 1L;
interaction.connection()
.tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(response.getFrameMax())
.tuneOkHeartbeat(heartbeatPeriod.intValue())
.tuneOk()
.connection().open()
.consumeResponse(ConnectionOpenOkBody.class)
.consumeResponse(HeartbeatBody.class);
// Do not reflect a heartbeat so incoming line will be silent thus
// requiring the broker to close the connection.
Class[] classes = new Class[] {ChannelClosedResponse.class, HeartbeatBody.class};
do
{
Response<?> latestResponse = interaction.consumeResponse(classes)
.getLatestResponse();
if (latestResponse instanceof ChannelClosedResponse)
{
break;
}
classes = new Class[] {ChannelClosedResponse.class};
}
while (true);
}
}
@Test
@SpecificationTest(section = "1.4.2.7", description = "The client tried to work with an unknown virtual host.")
public void connectionOpenUnknownVirtualHost() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionTuneBody tune = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
ConnectionCloseBody close = interaction.connection()
.tuneOkChannelMax(tune.getChannelMax())
.tuneOkFrameMax(tune.getFrameMax())
.tuneOkHeartbeat(tune.getHeartbeat())
.tuneOk()
.connection()
.openVirtualHost("unknown-virtualhost")
.open()
.consumeResponse()
.getLatestResponse(ConnectionCloseBody.class);
// Spec requires INVALID_PATH, but implementation uses NOT_FOUND
assertThat(close.getReplyCode(), is(anyOf(equalTo(ErrorCodes.NOT_FOUND), equalTo(ErrorCodes.INVALID_PATH))));
assertThat(String.valueOf(close.getReplyText()).toLowerCase(), containsString("unknown virtual host"));
}
}
}