blob: 1d5626b02cefe20da3789f84081421031ea1e167 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v1_0.transport.connection;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import org.junit.jupiter.api.Test;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.EmptyResponse;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.BrokerSpecific;
public class OpenTest extends BrokerAdminUsingTestBase
{
@Test
@SpecificationTest(section = "1.3.4",
description = "Open without mandatory fields should result in a decoding error.")
public void emptyOpen() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
Open responseOpen = interaction.openContainerId(null)
.negotiateOpen()
.getLatestResponse(Open.class);
assertThat(responseOpen.getContainerId(), is(notNullValue()));
Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
// 2.7.9: If set, this field indicates that the connection is being closed due to an error condition.
// The value of the field SHOULD contain details on the cause of the error.
Error error = responseClose.getError();
if (error != null)
{
assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
}
}
}
@Test
@SpecificationTest(section = "2.4.1",
description = "Each AMQP connection begins with an exchange of capabilities and limitations, "
+ "including the maximum frame size.")
public void successfulOpen() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
Interaction interaction = transport.newInteraction();
final Open responseOpen = interaction.openContainerId("testContainerId")
.negotiateOpen()
.getLatestResponse(Open.class);
assertThat(responseOpen.getContainerId(), is(notNullValue()));
assertThat(responseOpen.getMaxFrameSize(),
is(anyOf(nullValue(),
both(greaterThan(UnsignedInteger.ZERO)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE)))));
assertThat(responseOpen.getChannelMax(),
is(anyOf(nullValue(),
both(greaterThanOrEqualTo(UnsignedShort.ZERO)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE)))));
interaction.doCloseConnection();
}
}
@Test
@SpecificationTest(section = "2.4.5",
description = "Implementations MUST be prepared to handle empty frames arriving on any valid channel")
public void emptyFrame() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
Interaction interaction = transport.newInteraction();
interaction.openContainerId("testContainerId")
.negotiateOpen()
.emptyFrame()
.doCloseConnection();
}
}
@Test
@SpecificationTest(section = "2.4.5",
description = "Connections are subject to an idle timeout threshold.")
public void idleTimeout() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
Interaction interaction = transport.newInteraction();
final int idleTimeOut = 1000;
Open responseOpen = interaction.openContainerId("testContainerId")
.openIdleTimeOut(idleTimeOut)
.negotiateOpen()
.getLatestResponse(Open.class);
final UnsignedInteger peerIdleTimeOut = responseOpen.getIdleTimeOut();
assertThat(peerIdleTimeOut, is(anyOf(nullValue(), greaterThanOrEqualTo(UnsignedInteger.ZERO))));
final int timeout = peerIdleTimeOut == null || peerIdleTimeOut.intValue() == 0
? idleTimeOut
: peerIdleTimeOut.intValue();
assumeTrue(lessThan(30000).matches(timeout));
Thread.sleep(timeout);
interaction.consumeResponse(EmptyResponse.class);
}
}
@Test
@SpecificationTest(section = "2.4.1",
description = "The open frame can only be sent on channel 0. ยง2.7.1: A peer that receives a channel number"
+ " outside the supported range MUST close the connection with the framing-error error-code.")
public void failOpenOnChannelNotZero() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
Open responseOpen = interaction.openContainerId("testContainerId")
.connectionChannel(UnsignedShort.valueOf((short) 1))
.negotiateOpen()
.getLatestResponse(Open.class);
assertThat(responseOpen.getContainerId(), is(notNullValue()));
Close responseClose = interaction.consumeResponse(Close.class).getLatestResponse(Close.class);
// 2.7.9: If set, this field indicates that the connection is being closed due to an error condition.
// The value of the field SHOULD contain details on the cause of the error.
Error error = responseClose.getError();
if (error != null)
{
assertThat(error.getCondition(), equalTo(ConnectionError.FRAMING_ERROR));
}
}
}
@Test
@SpecificationTest(section = "2.7.1", description = "The name of the host (either fully qualified or relative) to which the sending peer is connecting")
@BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
public void failOpenOnNonExistingHostname() throws Exception
{
try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
Open responseOpen = interaction.openContainerId("testContainerId")
.openHostname("non-existing-virtual-host-" + System.currentTimeMillis())
.negotiateOpen()
.getLatestResponse(Open.class);
assertThat(responseOpen.getContainerId(), is(notNullValue()));
Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
assertThat(responseClose.getError(), is(notNullValue()));
assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.NOT_FOUND));
}
}
}