blob: 12379ced02e9f2c66cad1fe7c6de78c5f8d08ffc [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v0_10;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Method;
import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
import org.apache.qpid.tests.protocol.AbstractFrameTransport;
import org.apache.qpid.tests.protocol.AbstractInteraction;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
public class Interaction extends AbstractInteraction<Interaction>
{
private final BrokerAdmin _brokerAdmin;
private final BrokerAdmin.PortType _portType;
private byte[] _protocolHeader;
private ConnectionInteraction _connectionInteraction;
private SessionInteraction _sessionInteraction;
private MessageInteraction _messageInteraction;
private ExecutionInteraction _executionInteraction;
private QueueInteraction _queueInteraction;
private ExchangeInteraction _exchangeInteraction;
private int _channelId;
private TxInteraction _txInteraction;
public Interaction(final AbstractFrameTransport frameTransport,
final BrokerAdmin brokerAdmin,
final BrokerAdmin.PortType portType)
{
super(frameTransport);
_connectionInteraction = new ConnectionInteraction(this);
_sessionInteraction = new SessionInteraction(this);
_messageInteraction = new MessageInteraction(this);
_executionInteraction = new ExecutionInteraction(this);
_txInteraction = new TxInteraction(this);
_queueInteraction = new QueueInteraction(this);
_exchangeInteraction = new ExchangeInteraction(this);
_protocolHeader = getTransport().getProtocolHeader();
_brokerAdmin = brokerAdmin;
_portType = portType;
}
@Override
public Interaction protocolHeader(final byte[] header)
{
_protocolHeader = header;
return this;
}
@Override
protected byte[] getProtocolHeader()
{
return _protocolHeader;
}
public <T extends Method> Interaction sendPerformative(final T performative) throws Exception
{
performative.setChannel(_channelId);
sendPerformativeAndChainFuture(copyPerformative(performative));
return this;
}
public <T extends Method> Interaction sendPerformativeWithoutCopying(final T performative) throws Exception
{
performative.setChannel(_channelId);
sendPerformativeAndChainFuture(performative);
return this;
}
public ConnectionInteraction connection()
{
return _connectionInteraction;
}
private <T extends Method> T copyPerformative(final T src)
{
T dst = (T) Method.create(src.getStructType());
final BBEncoder encoder = new BBEncoder(4096);
encoder.init();
src.write(encoder);
ByteBuffer buffer = encoder.buffer();
final BBDecoder decoder = new BBDecoder();
decoder.init(buffer);
dst.read(decoder);
dst.setChannel(src.getChannel());
if (src.getHeader() != null)
{
Header srcHeader = src.getHeader();
MessageProperties dstMessageProperties = null;
DeliveryProperties dstDeliveryProperties = null;
if (srcHeader.getMessageProperties() != null)
{
MessageProperties properties = srcHeader.getMessageProperties();
dstMessageProperties = new MessageProperties();
encoder.init();
properties.write(encoder);
decoder.init(encoder.buffer());
dstMessageProperties.read(decoder);
}
if (srcHeader.getDeliveryProperties() != null)
{
DeliveryProperties properties = srcHeader.getDeliveryProperties();
dstDeliveryProperties = new DeliveryProperties();
encoder.init();
properties.write(encoder);
decoder.init(encoder.buffer());
dstDeliveryProperties.read(decoder);
}
if (dstMessageProperties != null || dstDeliveryProperties != null)
{
dst.setHeader(new Header(dstDeliveryProperties, dstMessageProperties));
}
}
if (src.getBody() != null)
{
dst.setBody(src.getBody());
}
return dst;
}
public Interaction negotiateOpen() throws Exception
{
authenticateConnection().connection().tuneOk()
.connection().open()
.consumeResponse(ConnectionOpenOk.class);
return this;
}
public Interaction authenticateConnection() throws Exception
{
if (_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS)
{
openAnonymous();
}
else
{
final ConnectionStart start = this.negotiateProtocol().consumeResponse()
.consumeResponse().getLatestResponse(ConnectionStart.class);
final List<Object> supportedMechanisms =
start.getMechanisms() == null ? Collections.emptyList() : start.getMechanisms();
if (supportedMechanisms.stream().noneMatch(m -> String.valueOf(m).equalsIgnoreCase(ConnectionInteraction.SASL_MECHANISM_PLAIN)))
{
if (supportedMechanisms.stream()
.noneMatch(m -> String.valueOf(m).equalsIgnoreCase(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS)))
{
throw new IllegalStateException(String.format(
"PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", supportedMechanisms.stream().map(
String::valueOf).collect(
Collectors.joining(","))));
}
else
{
openAnonymous();
}
}
else
{
final byte[] initialResponse = String.format("\0%s\0%s",
_brokerAdmin.getValidUsername(),
_brokerAdmin.getValidPassword())
.getBytes(UTF_8);
this.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk()
.consumeResponse(ConnectionSecure.class)
.connection().secureOk(initialResponse).consumeResponse(ConnectionTune.class);
}
}
return this;
}
private void openAnonymous() throws Exception
{
this.negotiateProtocol().consumeResponse()
.consumeResponse(ConnectionStart.class)
.connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
.consumeResponse(ConnectionTune.class);
}
public SessionInteraction session()
{
return _sessionInteraction;
}
public int getChannelId()
{
return _channelId;
}
public Interaction channelId(final int channelId)
{
_channelId = channelId;
return this;
}
public Interaction attachSession(final byte[] sessionName) throws Exception
{
this.session()
.attachName(sessionName)
.attach()
.consumeResponse(SessionAttached.class)
.session().commandPointCommandId(0).commandPoint();
return this;
}
public MessageInteraction message()
{
return _messageInteraction;
}
public ExecutionInteraction execution()
{
return _executionInteraction;
}
public TxInteraction tx()
{
return _txInteraction;
}
public QueueInteraction queue()
{
return _queueInteraction;
}
public ExchangeInteraction exchange()
{
return _exchangeInteraction;
}
public <T extends Method> T consume(final Class<T> expected,
final Class<? extends Method>... ignore)
throws Exception
{
final Class<? extends Method>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
expectedResponses[ignore.length] = expected;
T completed = null;
do
{
Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
if (expected.isAssignableFrom(response.getBody().getClass()))
{
completed = (T) response.getBody();
}
}
while (completed == null);
return completed;
}
}