blob: 5e073ab30325345b89243e70a0c0da38f1819369 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.protocol.v0_8;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.qpid.server.security.auth.manager.AbstractScramAuthenticationManager.PLAIN;
import java.util.Arrays;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.tests.protocol.AbstractInteraction;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
/**
 * Fluent test helper that drives one protocol conversation with a broker over a
 * {@link FrameTransport}.
 * <p>
 * The class offers three kinds of operations:
 * <ul>
 * <li>low-level sending of performatives ({@link #sendPerformative(AMQBody)} and overloads);</li>
 * <li>connection establishment helpers ({@link #authenticateConnection()}, {@link #negotiateOpen()});</li>
 * <li>accessors for the per-class sub-interactions ({@link #connection()}, {@link #channel()},
 * {@link #queue()}, {@link #basic()}, {@link #tx()}, {@link #exchange()}).</li>
 * </ul>
 * Instances hold mutable state (current channel id, protocol header) and are not written with
 * concurrent use in mind.
 */
public class Interaction extends AbstractInteraction<Interaction>
{
    private final BrokerAdmin _brokerAdmin;
    private final BrokerAdmin.PortType _portType;
    private final ConnectionInteraction _connectionInteraction;
    private final ChannelInteraction _channelInteraction;
    private final QueueInteraction _queueInteraction;
    private final BasicInteraction _basicInteraction;
    private final TxInteraction _txInteraction;
    private final ExchangeInteraction _exchangeInteraction;
    private final int _maximumPayloadSize = 512;

    private byte[] _protocolHeader;
    private int _channelId;

    /**
     * Creates an interaction bound to the given transport.
     *
     * @param transport   transport used to exchange frames; also supplies the initial protocol header
     * @param brokerAdmin source of the credentials used for PLAIN authentication
     * @param portType    type of the port the transport is connected to; anonymous port types
     *                    make {@link #authenticateConnection()} go straight to ANONYMOUS
     */
    Interaction(final FrameTransport transport, final BrokerAdmin brokerAdmin, BrokerAdmin.PortType portType)
    {
        super(transport);
        // Statement order is kept as in the original: the sub-interactions receive 'this'
        // before the remaining fields are assigned.
        _connectionInteraction = new ConnectionInteraction(this);
        _channelInteraction = new ChannelInteraction(this);
        _queueInteraction = new QueueInteraction(this);
        _basicInteraction = new BasicInteraction(this);
        _txInteraction = new TxInteraction(this);
        _exchangeInteraction = new ExchangeInteraction(this);
        _protocolHeader = getTransport().getProtocolHeader();
        _brokerAdmin = brokerAdmin;
        _portType = portType;
    }

    /**
     * Overrides the protocol header that this interaction reports via {@link #getProtocolHeader()}.
     *
     * @param header the header bytes; the array is stored as-is (no copy is taken)
     * @return this interaction, for chaining
     */
    @Override
    public Interaction protocolHeader(final byte[] header)
    {
        _protocolHeader = header;
        return this;
    }

    /**
     * @return the protocol header currently configured for this interaction
     */
    @Override
    protected byte[] getProtocolHeader()
    {
        return _protocolHeader;
    }

    /**
     * @return the protocol version reported by the underlying {@link FrameTransport}
     */
    public ProtocolVersion getProtocolVersion()
    {
        return ((FrameTransport) getTransport()).getProtocolVersion();
    }

    /**
     * Sends the body in a frame on the current channel (see {@link #channelId(int)}).
     *
     * @param amqBody body to send
     * @return this interaction, for chaining
     * @throws Exception if sending fails
     */
    public Interaction sendPerformative(final AMQBody amqBody) throws Exception
    {
        return sendPerformative(_channelId, amqBody);
    }

    /**
     * Sends the body in a frame on an explicitly given channel.
     *
     * @param channel channel number placed in the frame
     * @param amqBody body to send
     * @return this interaction, for chaining
     * @throws Exception if sending fails
     */
    public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception
    {
        final AMQFrame frameBody = new AMQFrame(channel, amqBody);
        sendPerformativeAndChainFuture(frameBody);
        return this;
    }

    /**
     * Sends an already assembled data block without wrapping it in a frame.
     *
     * @param dataBlock block to send
     * @return this interaction, for chaining
     * @throws Exception if sending fails
     */
    public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception
    {
        sendPerformativeAndChainFuture(dataBlock);
        return this;
    }

    /**
     * Authenticates the connection, then sends tune-ok and open and waits for
     * {@link ConnectionOpenOkBody}.
     *
     * @return this interaction, for chaining
     * @throws Exception if any step of the exchange fails
     */
    public Interaction negotiateOpen() throws Exception
    {
        authenticateConnection().connection().tuneOk()
                                .connection().open().consumeResponse(ConnectionOpenOkBody.class);
        return this;
    }

    /**
     * Negotiates the protocol and performs SASL authentication up to the receipt of
     * {@link ConnectionTuneBody}.
     * <p>
     * On anonymous port types the ANONYMOUS mechanism is used unconditionally. Otherwise the
     * mechanisms advertised in the connection start are inspected: PLAIN is preferred (using the
     * valid credentials supplied by the {@link BrokerAdmin}), with ANONYMOUS as the fallback.
     *
     * @return this interaction, for chaining
     * @throws IllegalStateException if the broker advertises neither PLAIN nor ANONYMOUS
     * @throws Exception             if any step of the exchange fails
     */
    public Interaction authenticateConnection() throws Exception
    {
        if (_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS)
        {
            authenticateAnonymous();
            return this;
        }

        final ConnectionStartBody start = negotiateProtocol().consumeResponse()
                                                             .getLatestResponse(ConnectionStartBody.class);
        final String mechanisms = start.getMechanisms() == null ? "" : new String(start.getMechanisms(), US_ASCII);
        final List<String> supportedMechanisms = Arrays.asList(mechanisms.split(" "));

        if (isSupported(supportedMechanisms, PLAIN))
        {
            replyStartOkPlain();
        }
        else if (isSupported(supportedMechanisms, AnonymousAuthenticationManager.MECHANISM_NAME))
        {
            // The connection start has already been consumed above, so answer it directly
            // instead of negotiating the protocol a second time.
            replyStartOkAnonymous();
        }
        else
        {
            throw new IllegalStateException(String.format(
                    "PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", mechanisms));
        }
        return this;
    }

    /** Case-insensitively checks whether the named mechanism is in the advertised list. */
    private static boolean isSupported(final List<String> mechanisms, final String name)
    {
        return mechanisms.stream().anyMatch(m -> m.equalsIgnoreCase(name));
    }

    /** Negotiates the protocol, waits for the connection start and authenticates with ANONYMOUS. */
    private void authenticateAnonymous() throws Exception
    {
        this.negotiateProtocol().consumeResponse(ConnectionStartBody.class);
        replyStartOkAnonymous();
    }

    /** Answers an already received connection start with ANONYMOUS and waits for the tune. */
    private void replyStartOkAnonymous() throws Exception
    {
        this.connection().startOkMechanism(AnonymousAuthenticationManager.MECHANISM_NAME).startOk()
            .consumeResponse(ConnectionTuneBody.class);
    }

    /** Answers an already received connection start with PLAIN and waits for the tune. */
    private void replyStartOkPlain() throws Exception
    {
        // RFC 4616 defines the PLAIN message as UTF-8; for pure ASCII credentials the bytes
        // are identical to the former US-ASCII encoding.
        final byte[] initialResponse = String.format("\0%s\0%s",
                                                     _brokerAdmin.getValidUsername(),
                                                     _brokerAdmin.getValidPassword())
                                             .getBytes(UTF_8);
        this.connection().startOkMechanism(PLAIN).startOk().consumeResponse(ConnectionSecureBody.class)
            .connection().secureOk(initialResponse).consumeResponse(ConnectionTuneBody.class);
    }

    /**
     * @return the sub-interaction for connection-class methods
     */
    public ConnectionInteraction connection()
    {
        return _connectionInteraction;
    }

    /**
     * @return the sub-interaction for channel-class methods
     */
    public ChannelInteraction channel()
    {
        return _channelInteraction;
    }

    /**
     * @return the sub-interaction for queue-class methods
     */
    public QueueInteraction queue()
    {
        return _queueInteraction;
    }

    /**
     * @return the channel id used by {@link #sendPerformative(AMQBody)}
     */
    public int getChannelId()
    {
        return _channelId;
    }

    /**
     * Sets the channel id used by {@link #sendPerformative(AMQBody)}.
     *
     * @param channelId the channel id to use from now on
     * @return this interaction, for chaining
     */
    public Interaction channelId(final int channelId)
    {
        _channelId = channelId;
        return this;
    }

    /**
     * Returns the fixed size limit held by this interaction (512).
     * <p>
     * NOTE(review): the accessor is named "frame size" while the backing field is named
     * "payload size"; confirm which of the two meanings callers rely on.
     *
     * @return the configured size limit
     */
    public int getMaximumFrameSize()
    {
        return _maximumPayloadSize;
    }

    /**
     * @return the sub-interaction for basic-class methods
     */
    public BasicInteraction basic()
    {
        return _basicInteraction;
    }

    /**
     * @return the sub-interaction for tx-class methods
     */
    public TxInteraction tx()
    {
        return _txInteraction;
    }

    /**
     * @return the sub-interaction for exchange-class methods
     */
    public ExchangeInteraction exchange()
    {
        return _exchangeInteraction;
    }

    /**
     * Consumes responses until one whose body is of the expected type arrives.
     * <p>
     * Each response is consumed with the expected type and all {@code ignore} types passed as the
     * acceptable response types; bodies that are not instances of {@code expected} are skipped.
     *
     * @param expected type of the body to wait for
     * @param ignore   additional body types passed along as acceptable while waiting
     * @param <T>      expected body type
     * @return the first consumed body that is an instance of {@code expected}
     * @throws Exception if consuming a response fails
     */
    @SafeVarargs
    public final <T extends AMQBody> T consume(final Class<T> expected,
                                               final Class<? extends AMQBody>... ignore)
            throws Exception
    {
        // Copy rather than mutate the caller's varargs array; the extra slot holds 'expected'.
        final Class<? extends AMQBody>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
        expectedResponses[ignore.length] = expected;

        while (true)
        {
            final Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
            final Object body = response.getBody();
            if (expected.isInstance(body))
            {
                // Checked cast; avoids the unchecked (T) conversion.
                return expected.cast(body);
            }
        }
    }

    /**
     * Decodes the payload of the latest response, which must be a {@link ContentBody}, as UTF-8.
     * <p>
     * The payload buffer is read and then disposed, also when reading fails.
     * NOTE(review): presumably this makes the payload unusable for a second call on the same
     * response; confirm against {@link QpidByteBuffer#dispose()}.
     *
     * @return the payload bytes remaining in the buffer, decoded as a UTF-8 string
     * @throws Exception if the latest response cannot be obtained as a content body
     */
    public String getLatestResponseContentBodyAsString() throws Exception
    {
        final ContentBody content = getLatestResponse(ContentBody.class);
        final QpidByteBuffer payload = content.getPayload();
        try
        {
            final byte[] contentData = new byte[payload.remaining()];
            payload.get(contentData);
            return new String(contentData, UTF_8);
        }
        finally
        {
            payload.dispose();
        }
    }
}