| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.tests.protocol.v0_8; |
| |
| import static java.nio.charset.StandardCharsets.US_ASCII; |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.qpid.server.security.auth.manager.AbstractScramAuthenticationManager.PLAIN; |
| |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.qpid.server.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.protocol.ProtocolVersion; |
| import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; |
| import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; |
| import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; |
| import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody; |
| import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody; |
| import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody; |
| import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody; |
| import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; |
| import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager; |
| import org.apache.qpid.tests.protocol.AbstractInteraction; |
| import org.apache.qpid.tests.protocol.Response; |
| import org.apache.qpid.tests.utils.BrokerAdmin; |
| |
| public class Interaction extends AbstractInteraction<Interaction> |
| { |
| private final BrokerAdmin _brokerAdmin; |
| private final BrokerAdmin.PortType _portType; |
| private byte[] _protocolHeader; |
| private int _channelId; |
| private int _maximumPayloadSize = 512; |
| private ConnectionInteraction _connectionInteraction; |
| private ChannelInteraction _channelInteraction; |
| private QueueInteraction _queueInteraction; |
| private BasicInteraction _basicInteraction; |
| private TxInteraction _txInteraction; |
| private ExchangeInteraction _exchangeInteraction; |
| |
| Interaction(final FrameTransport transport, final BrokerAdmin brokerAdmin, BrokerAdmin.PortType portType) |
| { |
| super(transport); |
| _connectionInteraction = new ConnectionInteraction(this); |
| _channelInteraction = new ChannelInteraction(this); |
| _queueInteraction = new QueueInteraction(this); |
| _basicInteraction = new BasicInteraction(this); |
| _txInteraction = new TxInteraction(this); |
| _exchangeInteraction = new ExchangeInteraction(this); |
| _protocolHeader = getTransport().getProtocolHeader(); |
| _brokerAdmin = brokerAdmin; |
| _portType = portType; |
| } |
| |
| @Override |
| public Interaction protocolHeader(final byte[] header) |
| { |
| _protocolHeader = header; |
| return this; |
| } |
| |
| @Override |
| protected byte[] getProtocolHeader() |
| { |
| return _protocolHeader; |
| } |
| |
| public ProtocolVersion getProtocolVersion() |
| { |
| return ((FrameTransport) getTransport()).getProtocolVersion(); |
| } |
| |
| public Interaction sendPerformative(final AMQBody amqBody) throws Exception |
| { |
| return sendPerformative(_channelId, amqBody); |
| } |
| |
| public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception |
| { |
| final AMQFrame frameBody = new AMQFrame(channel, amqBody); |
| sendPerformativeAndChainFuture(frameBody); |
| return this; |
| } |
| |
| public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception |
| { |
| sendPerformativeAndChainFuture(dataBlock); |
| return this; |
| } |
| |
| public Interaction negotiateOpen() throws Exception |
| { |
| authenticateConnection().connection().tuneOk() |
| .connection().open().consumeResponse(ConnectionOpenOkBody.class); |
| return this; |
| } |
| |
| public Interaction authenticateConnection() throws Exception |
| { |
| if (_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS) |
| { |
| authenticateAnonymous(); |
| } |
| else |
| { |
| final ConnectionStartBody start = negotiateProtocol().consumeResponse() |
| .getLatestResponse(ConnectionStartBody.class); |
| final String mechanisms = start.getMechanisms() == null ? "" : new String(start.getMechanisms(), US_ASCII); |
| final List<String> supportedMechanisms = Arrays.asList(mechanisms.split(" ")); |
| |
| if (supportedMechanisms.stream().noneMatch(m -> m.equalsIgnoreCase(PLAIN))) |
| { |
| if (supportedMechanisms.stream() |
| .noneMatch(m -> m.equalsIgnoreCase(AnonymousAuthenticationManager.MECHANISM_NAME))) |
| { |
| throw new IllegalStateException(String.format( |
| "PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", mechanisms)); |
| } |
| else |
| { |
| authenticateAnonymous(); |
| } |
| } |
| else |
| { |
| final byte[] initialResponse = String.format("\0%s\0%s", |
| _brokerAdmin.getValidUsername(), |
| _brokerAdmin.getValidPassword()) |
| .getBytes(US_ASCII); |
| this.connection().startOkMechanism(PLAIN).startOk().consumeResponse(ConnectionSecureBody.class) |
| .connection().secureOk(initialResponse).consumeResponse(ConnectionTuneBody.class); |
| } |
| } |
| return this; |
| } |
| |
| private void authenticateAnonymous() throws Exception |
| { |
| this.negotiateProtocol().consumeResponse(ConnectionStartBody.class) |
| .connection().startOkMechanism(AnonymousAuthenticationManager.MECHANISM_NAME).startOk() |
| .consumeResponse(ConnectionTuneBody.class); |
| |
| } |
| |
| public ConnectionInteraction connection() |
| { |
| return _connectionInteraction; |
| } |
| |
| public ChannelInteraction channel() |
| { |
| return _channelInteraction; |
| } |
| |
| public QueueInteraction queue() |
| { |
| return _queueInteraction; |
| } |
| |
| public int getChannelId() |
| { |
| return _channelId; |
| } |
| |
| public Interaction channelId(final int channelId) |
| { |
| _channelId = channelId; |
| return this; |
| } |
| |
| public int getMaximumFrameSize() |
| { |
| return _maximumPayloadSize; |
| } |
| |
| public BasicInteraction basic() |
| { |
| return _basicInteraction; |
| } |
| |
| public TxInteraction tx() |
| { |
| return _txInteraction; |
| } |
| |
| public ExchangeInteraction exchange() |
| { |
| return _exchangeInteraction; |
| } |
| |
| @SafeVarargs |
| public final <T extends AMQBody> T consume(final Class<T> expected, |
| final Class<? extends AMQBody>... ignore) |
| throws Exception |
| { |
| final Class<? extends AMQBody>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1); |
| expectedResponses[ignore.length] = expected; |
| |
| T completed = null; |
| do |
| { |
| Response<?> response = consumeResponse(expectedResponses).getLatestResponse(); |
| if (expected.isAssignableFrom(response.getBody().getClass())) |
| { |
| completed = (T) response.getBody(); |
| } |
| } |
| while (completed == null); |
| return completed; |
| } |
| |
| public String getLatestResponseContentBodyAsString() throws Exception |
| { |
| ContentBody content = getLatestResponse(ContentBody.class); |
| QpidByteBuffer payload = content.getPayload(); |
| byte[] contentData = new byte[payload.remaining()]; |
| payload.get(contentData); |
| payload.dispose(); |
| return new String(contentData, UTF_8); |
| } |
| } |