| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.transport; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketTimeoutException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.server.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody; |
| import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation; |
| import org.apache.qpid.server.protocol.ProtocolVersion; |
| import org.apache.qpid.server.model.Broker; |
| import org.apache.qpid.server.model.Port; |
| import org.apache.qpid.server.model.Protocol; |
| import org.apache.qpid.server.model.port.AmqpPort; |
| import org.apache.qpid.server.protocol.v0_10.ServerDisassembler; |
| import org.apache.qpid.server.util.SystemUtils; |
| import org.apache.qpid.test.utils.QpidBrokerTestCase; |
| import org.apache.qpid.test.utils.TestBrokerConfiguration; |
| import org.apache.qpid.server.protocol.v0_10.transport.Frame; |
| import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat; |
| import org.apache.qpid.server.transport.ByteBufferSender; |
| |
| public class ProtocolNegotiationTest extends QpidBrokerTestCase |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolNegotiationTest.class); |
| private static final int SO_TIMEOUT = 5000; |
| public static final int AMQP_HEADER_LEN = 8; |
| private Protocol _expectedProtocolInit; |
| |
| private static final Map<Protocol, List<List<Byte>>> HEADERS = new HashMap<>(); |
| |
| static |
| { |
| HEADERS.put(Protocol.AMQP_0_8, Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)8, (byte)0))); |
| HEADERS.put(Protocol.AMQP_0_9, Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)0, (byte)9))); |
| HEADERS.put(Protocol.AMQP_0_9_1, Collections.singletonList(Arrays.asList((byte)0, (byte)0, (byte)9, (byte)1))); |
| HEADERS.put(Protocol.AMQP_0_10, Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)0, (byte)10))); |
| HEADERS.put(Protocol.AMQP_1_0, Arrays.asList(Arrays.asList((byte)3, (byte)1, (byte)0, (byte)0), |
| Arrays.asList((byte)0, (byte)1, (byte)0, (byte)0))); |
| |
| } |
| |
| @Override |
| public void setUp() throws Exception |
| { |
| // restrict broker to support only single protocol |
| TestBrokerConfiguration config = getDefaultBrokerConfiguration(); |
| config.setObjectAttribute(Port.class, |
| TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT, |
| Port.PROTOCOLS, |
| Arrays.asList(getBrokerProtocol())); |
| Map<String,String> overriddenPortContext = new HashMap<>(); |
| overriddenPortContext.put(AmqpPort.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null); |
| overriddenPortContext.put(AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT, String.valueOf(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT)); |
| config.setObjectAttribute(Port.class, |
| TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT, |
| Port.CONTEXT, |
| overriddenPortContext); |
| config.setBrokerAttribute(Broker.CONTEXT, |
| Collections.singletonMap(AmqpPort.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null)); |
| |
| super.setUp(); |
| _expectedProtocolInit = getBrokerProtocol(); |
| } |
| |
| public void testWrongProtocolHeaderSent_BrokerRespondsWithSupportedProtocol() throws Exception |
| { |
| try(Socket socket = new Socket()) |
| { |
| socket.setSoTimeout(SO_TIMEOUT); |
| |
| final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort()); |
| LOGGER.debug("Making connection to {}", inetSocketAddress); |
| |
| socket.connect(inetSocketAddress); |
| |
| assertTrue("Expected socket to be connected", socket.isConnected()); |
| |
| socket.getOutputStream().write("NOTANAMQPHEADER".getBytes()); |
| byte[] receivedHeader = new byte[AMQP_HEADER_LEN]; |
| int len = socket.getInputStream().read(receivedHeader); |
| assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len); |
| assertEquals("Expected end-of-stream from socket signifying socket closed)", |
| -1, |
| socket.getInputStream().read()); |
| |
| ProtocolInitiation protocolInitiation = new ProtocolInitiation(QpidByteBuffer.wrap(receivedHeader)); |
| |
| assertTrue("Unexpected protocol initialisation", matchedExpectedVersion(receivedHeader)); |
| } |
| } |
| |
| private boolean matchedExpectedVersion(byte[] header) |
| { |
| if(header[0] != 'A' || header[1] != 'M' || header[2] != 'Q' || header[3] != 'P') |
| { |
| return false; |
| } |
| List<Byte> version = new ArrayList<>(); |
| for(int i = 4; i<8; i++) |
| { |
| version.add(header[i]); |
| } |
| return HEADERS.get(_expectedProtocolInit).contains(version); |
| } |
| |
| public void testNoProtocolHeaderSent_BrokerClosesConnection() throws Exception |
| { |
| try(Socket socket = new Socket()) |
| { |
| socket.setSoTimeout(SO_TIMEOUT); |
| |
| final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort()); |
| LOGGER.debug("Making connection to {}", inetSocketAddress); |
| |
| socket.connect(inetSocketAddress); |
| |
| assertTrue("Expected socket to be connected", socket.isConnected()); |
| |
| int c = 0; |
| try |
| { |
| c = socket.getInputStream().read(); |
| LOGGER.debug("Read {}", c); |
| |
| } |
| catch(SocketTimeoutException ste) |
| { |
| fail("Broker did not close connection with no activity within expected timeout"); |
| } |
| |
| assertEquals("Expected end-of-stream from socket signifying socket closed)", -1, c); |
| } |
| } |
| |
| public void testNoConnectionOpenSent_BrokerClosesConnection() throws Exception |
| { |
| setSystemProperty(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY, "1000"); |
| |
| try(Socket socket = new Socket()) |
| { |
| socket.setSoTimeout(5000); |
| |
| byte[] header = getHeaderBytesForBrokerVersion(); |
| |
| final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort()); |
| LOGGER.debug("Making connection to {}", inetSocketAddress); |
| |
| socket.connect(inetSocketAddress); |
| |
| assertTrue("Expected socket to be connected", socket.isConnected()); |
| |
| OutputStream outputStream = socket.getOutputStream(); |
| final TestSender sender = new TestSender(outputStream); |
| final InputStream inputStream = socket.getInputStream(); |
| |
| // write header |
| sender.send(QpidByteBuffer.wrap(header)); |
| sender.flush(); |
| |
| // reader header |
| byte[] receivedHeader = new byte[AMQP_HEADER_LEN]; |
| int len = inputStream.read(receivedHeader); |
| assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len); |
| |
| // read the server start / sasl mechanisms |
| inputStream.read(new byte[1024]); |
| |
| // Send heartbeat frames to simulate a client that, although active, fails to |
| // authenticate within the allowed period |
| |
| long timeout = System.currentTimeMillis() + 3000; |
| boolean brokenPipe = false; |
| while (timeout > System.currentTimeMillis()) |
| { |
| if (!writeHeartbeat(sender)) |
| { |
| brokenPipe = true; |
| break; |
| } |
| Thread.sleep(100); |
| } |
| // If AMQP 1.0 we won't have sent anything (heartbeats not valid until after SASL) |
| // Windows also seems to allow writes to a socket which has actually been closed. |
| if(!brokenPipe && (SystemUtils.isWindows() || isBroker10())) |
| { |
| final int read = inputStream.read(new byte[10]); |
| brokenPipe = -1 == read; |
| } |
| assertTrue("Expected pipe to become broken within " |
| + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " timeout", brokenPipe); |
| } |
| } |
| |
| private byte[] getHeaderBytesForBrokerVersion() |
| { |
| byte[] header = new byte[8]; |
| header[0] = 'A'; |
| header[1] = 'M'; |
| header[2] = 'Q'; |
| header[3] = 'P'; |
| List<Byte> version = HEADERS.get(getBrokerProtocol()).iterator().next(); |
| int i = 4; |
| for(byte b : version) |
| { |
| header[i++] = b; |
| } |
| return header; |
| } |
| |
| public void testIllegalFrameSent_BrokerClosesConnection() throws Exception |
| { |
| try(Socket socket = new Socket()) |
| { |
| socket.setSoTimeout(5000); |
| |
| |
| final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort()); |
| LOGGER.debug("Making connection to {}", inetSocketAddress); |
| |
| socket.connect(inetSocketAddress); |
| |
| assertTrue("Expected socket to be connected", socket.isConnected()); |
| |
| final InputStream inputStream = socket.getInputStream(); |
| |
| // write header |
| TestSender sender = new TestSender(socket.getOutputStream()); |
| sender.send(QpidByteBuffer.wrap(getHeaderBytesForBrokerVersion())); |
| sender.flush(); |
| |
| // reader header |
| byte[] receivedHeader = new byte[AMQP_HEADER_LEN]; |
| int len = inputStream.read(receivedHeader); |
| assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len); |
| |
| sender.send(QpidByteBuffer.wrap("NOTANAMPQFRAME".getBytes())); |
| |
| } |
| } |
| |
| public void testProtocolNegotiationFromUnsupportedVersion() throws Exception |
| { |
| Protocol testProtocol = getBrokerProtocol(); |
| String testSupportedProtocols = System.getProperty("test.amqp_port_protocols"); |
| if (testSupportedProtocols!= null) |
| { |
| Set<Protocol> availableProtocols = new HashSet<>(); |
| List<Object> protocols = new ObjectMapper().readValue(testSupportedProtocols, List.class); |
| for (Object protocol : protocols) |
| { |
| availableProtocols.add(Protocol.valueOf(String.valueOf(protocol))); |
| } |
| availableProtocols.remove(testProtocol); |
| |
| for (Protocol protocol: availableProtocols) |
| { |
| String version = protocol.name().substring(5).replace('_', '-'); |
| LOGGER.debug("Negotiation version {} represented as {}", protocol.name(), version); |
| setTestSystemProperty(ClientProperties.AMQP_VERSION, version); |
| AMQConnection connection = (AMQConnection)getConnection(); |
| LOGGER.debug("Negotiated version {}", connection.getProtocolVersion()); |
| assertEquals("Unexpected version negotiated: " + connection.getProtocolVersion(), convertProtocolToProtocolVersion(_expectedProtocolInit).toString(), connection.getProtocolVersion().toString()); |
| connection.close(); |
| } |
| } |
| } |
| |
| private boolean writeHeartbeat(final TestSender sender) |
| throws IOException |
| { |
| if (isBroker010()) |
| { |
| ConnectionHeartbeat heartbeat = new ConnectionHeartbeat(); |
| ServerDisassembler serverDisassembler = new ServerDisassembler(sender, Frame.HEADER_SIZE + 1); |
| serverDisassembler.command(null, heartbeat); |
| serverDisassembler.closed(); |
| } |
| else if(isBrokerPre010()) |
| { |
| HeartbeatBody.FRAME.writePayload(sender); |
| } |
| |
| return sender.hasSuccess(); |
| } |
| |
| private ProtocolVersion convertProtocolToProtocolVersion(final Protocol p) |
| { |
| final ProtocolVersion protocolVersion; |
| switch(p) |
| { |
| case AMQP_1_0: |
| protocolVersion = null; |
| break; |
| case AMQP_0_10: |
| protocolVersion = ProtocolVersion.v0_10; |
| break; |
| case AMQP_0_9_1: |
| protocolVersion = ProtocolVersion.v0_91; |
| break; |
| case AMQP_0_9: |
| protocolVersion = ProtocolVersion.v0_9; |
| break; |
| case AMQP_0_8: |
| protocolVersion = ProtocolVersion.v0_8; |
| break; |
| default: |
| throw new IllegalArgumentException("Unexpected " + p.name()); |
| } |
| return protocolVersion; |
| } |
| |
| private static class TestSender implements ByteBufferSender |
| { |
| private final OutputStream _output; |
| private boolean _success = true; |
| |
| |
| private TestSender(final OutputStream output) |
| { |
| _output = output; |
| } |
| |
| @Override |
| public boolean isDirectBufferPreferred() |
| { |
| return false; |
| } |
| |
| @Override |
| public void send(final QpidByteBuffer msg) |
| { |
| byte[] data = new byte[msg.remaining()]; |
| msg.get(data); |
| try |
| { |
| _output.write(data); |
| } |
| catch (IOException e) |
| { |
| _success = false; |
| } |
| |
| } |
| |
| public boolean hasSuccess() |
| { |
| return _success; |
| } |
| |
| @Override |
| public void flush() |
| { |
| |
| } |
| |
| @Override |
| public void close() |
| { |
| |
| } |
| |
| } |
| |
| } |