| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.transport; |
| |
| |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.security.cert.Certificate; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import javax.security.auth.Subject; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.logging.messages.ConnectionMessages; |
| import org.apache.qpid.server.logging.messages.PortMessages; |
| import org.apache.qpid.server.logging.subjects.PortLogSubject; |
| import org.apache.qpid.server.model.Broker; |
| import org.apache.qpid.server.model.Protocol; |
| import org.apache.qpid.server.model.Transport; |
| import org.apache.qpid.server.model.port.AmqpPort; |
| import org.apache.qpid.server.plugin.ProtocolEngineCreator; |
| import org.apache.qpid.server.security.ManagedPeerCertificateTrustStore; |
| import org.apache.qpid.server.util.Action; |
| import org.apache.qpid.transport.ByteBufferSender; |
| import org.apache.qpid.transport.network.Ticker; |
| |
| public class MultiVersionProtocolEngine implements ProtocolEngine |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(MultiVersionProtocolEngine.class); |
| |
| private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; |
| |
| private final long _id; |
| private final AmqpPort<?> _port; |
| private Transport _transport; |
| private final ProtocolEngineCreator[] _creators; |
| private final Runnable _onCloseTask; |
| |
| private Set<Protocol> _supported; |
| private String _fqdn; |
| private final Broker<?> _broker; |
| private ServerNetworkConnection _network; |
| private ByteBufferSender _sender; |
| private final Protocol _defaultSupportedReply; |
| |
| private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); |
| private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>(); |
| private final AggregateTicker _aggregateTicker = new AggregateTicker(); |
| |
| public MultiVersionProtocolEngine(final Broker<?> broker, |
| final Set<Protocol> supported, |
| Protocol defaultSupportedReply, |
| AmqpPort<?> port, |
| Transport transport, |
| final long id, |
| ProtocolEngineCreator[] creators, |
| final Runnable onCloseTask) |
| { |
| _id = id; |
| _broker = broker; |
| _supported = supported; |
| _defaultSupportedReply = defaultSupportedReply; |
| _port = port; |
| _transport = transport; |
| _creators = creators; |
| _onCloseTask = onCloseTask; |
| } |
| |
| @Override |
| public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers) |
| { |
| _delegate.setMessageAssignmentSuspended(value, notifyConsumers); |
| } |
| |
| @Override |
| public boolean isMessageAssignmentSuspended() |
| { |
| return _delegate.isMessageAssignmentSuspended(); |
| } |
| |
| public void closed() |
| { |
| _logger.debug("Closed"); |
| |
| try |
| { |
| _delegate.closed(); |
| } |
| finally |
| { |
| if(_onCloseTask != null) |
| { |
| _onCloseTask.run(); |
| } |
| } |
| } |
| |
| public void writerIdle() |
| { |
| _delegate.writerIdle(); |
| } |
| |
| public void readerIdle() |
| { |
| _delegate.readerIdle(); |
| } |
| |
| @Override |
| public void encryptedTransport() |
| { |
| _delegate.encryptedTransport(); |
| } |
| |
| |
| public void received(QpidByteBuffer msg) |
| { |
| _delegate.received(msg); |
| } |
| |
| @Override |
| public void setIOThread(final Thread ioThread) |
| { |
| _delegate.setIOThread(ioThread); |
| } |
| |
| public long getConnectionId() |
| { |
| return _id; |
| } |
| |
| @Override |
| public Subject getSubject() |
| { |
| return _delegate.getSubject(); |
| } |
| |
| @Override |
| public boolean isTransportBlockedForWriting() |
| { |
| return _delegate.isTransportBlockedForWriting(); |
| } |
| |
| @Override |
| public void setTransportBlockedForWriting(final boolean blocked) |
| { |
| _delegate.setTransportBlockedForWriting(blocked); |
| } |
| |
| public void setNetworkConnection(ServerNetworkConnection network) |
| { |
| _network = network; |
| SocketAddress address = _network.getLocalAddress(); |
| if (address instanceof InetSocketAddress) |
| { |
| _fqdn = ((InetSocketAddress) address).getHostName(); |
| } |
| else |
| { |
| throw new IllegalArgumentException("Unsupported socket address class: " + address); |
| } |
| _sender = network.getSender(); |
| |
| SlowProtocolHeaderTicker ticker = new SlowProtocolHeaderTicker(_port.getProtocolHandshakeTimeout(), |
| System.currentTimeMillis()); |
| _aggregateTicker.addTicker(ticker); |
| _network.addSchedulingDelayNotificationListeners(_aggregateTicker); |
| } |
| |
| @Override |
| public long getLastReadTime() |
| { |
| return _delegate.getLastReadTime(); |
| } |
| |
| @Override |
| public long getLastWriteTime() |
| { |
| return _delegate.getLastWriteTime(); |
| } |
| |
| @Override |
| public Iterator<Runnable> processPendingIterator() |
| { |
| return _delegate.processPendingIterator(); |
| } |
| |
| @Override |
| public boolean hasWork() |
| { |
| return _delegate.hasWork(); |
| } |
| |
| @Override |
| public void notifyWork() |
| { |
| _delegate.notifyWork(); |
| } |
| |
| @Override |
| public void setWorkListener(final Action<ProtocolEngine> listener) |
| { |
| _workListener.set(listener); |
| _delegate.setWorkListener(listener); |
| } |
| |
| @Override |
| public void clearWork() |
| { |
| _delegate.clearWork(); |
| } |
| |
| @Override |
| public AggregateTicker getAggregateTicker() |
| { |
| return _aggregateTicker; |
| } |
| |
| private class ClosedDelegateProtocolEngine implements ProtocolEngine |
| { |
| |
| @Override |
| public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers) |
| { |
| |
| } |
| |
| @Override |
| public boolean isMessageAssignmentSuspended() |
| { |
| return false; |
| } |
| |
| @Override |
| public Iterator<Runnable> processPendingIterator() |
| { |
| return Collections.emptyIterator(); |
| } |
| |
| @Override |
| public boolean hasWork() |
| { |
| return false; |
| } |
| |
| @Override |
| public void notifyWork() |
| { |
| |
| } |
| |
| @Override |
| public void setWorkListener(final Action<ProtocolEngine> listener) |
| { |
| |
| } |
| |
| @Override |
| public void clearWork() |
| { |
| |
| } |
| |
| @Override |
| public void received(QpidByteBuffer msg) |
| { |
| _logger.debug("Error processing incoming data, could not negotiate a common protocol"); |
| msg.position(msg.limit()); |
| } |
| |
| @Override |
| public void setIOThread(final Thread ioThread) |
| { |
| |
| } |
| |
| @Override |
| public void closed() |
| { |
| |
| } |
| |
| @Override |
| public void writerIdle() |
| { |
| |
| } |
| |
| @Override |
| public void readerIdle() |
| { |
| |
| } |
| |
| @Override |
| public void encryptedTransport() |
| { |
| |
| } |
| |
| @Override |
| public long getLastReadTime() |
| { |
| return 0; |
| } |
| |
| @Override |
| public long getLastWriteTime() |
| { |
| return 0; |
| } |
| |
| @Override |
| public Subject getSubject() |
| { |
| return new Subject(); |
| } |
| |
| @Override |
| public boolean isTransportBlockedForWriting() |
| { |
| return false; |
| } |
| |
| @Override |
| public void setTransportBlockedForWriting(final boolean blocked) |
| { |
| } |
| |
| @Override |
| public AggregateTicker getAggregateTicker() |
| { |
| return _aggregateTicker; |
| } |
| |
| } |
| |
| private class SelfDelegateProtocolEngine implements ProtocolEngine |
| { |
| private final QpidByteBuffer _header = QpidByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); |
| private long _lastReadTime = System.currentTimeMillis(); |
| private final AtomicBoolean _hasWork = new AtomicBoolean(); |
| |
| @Override |
| public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers) |
| { |
| } |
| |
| @Override |
| public boolean isMessageAssignmentSuspended() |
| { |
| return false; |
| } |
| |
| @Override |
| public Iterator<Runnable> processPendingIterator() |
| { |
| return Collections.emptyIterator(); |
| } |
| |
| @Override |
| public boolean hasWork() |
| { |
| return _hasWork.get(); |
| } |
| |
| @Override |
| public void notifyWork() |
| { |
| _hasWork.set(true); |
| } |
| |
| @Override |
| public void setWorkListener(final Action<ProtocolEngine> listener) |
| { |
| |
| } |
| |
| @Override |
| public AggregateTicker getAggregateTicker() |
| { |
| return _aggregateTicker; |
| } |
| |
| @Override |
| public void clearWork() |
| { |
| _hasWork.set(false); |
| } |
| |
| @Override |
| public void received(QpidByteBuffer msg) |
| { |
| _lastReadTime = System.currentTimeMillis(); |
| QpidByteBuffer msgheader = msg.slice(); |
| |
| if(_header.remaining() > msgheader.limit()) |
| { |
| msgheader.dispose(); |
| return; |
| } |
| else |
| { |
| msgheader.limit(_header.remaining()); |
| msg.position(msg.position()+_header.remaining()); |
| } |
| |
| _header.put(msgheader); |
| |
| msgheader.dispose(); |
| |
| if(!_header.hasRemaining()) |
| { |
| _header.flip(); |
| byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES]; |
| _header.get(headerBytes); |
| |
| |
| ProtocolEngine newDelegate = null; |
| byte[] supportedReplyBytes = null; |
| byte[] defaultSupportedReplyBytes = null; |
| Protocol supportedReplyVersion = null; |
| |
| //Check the supported versions for a header match, and if there is one save the |
| //delegate. Also save most recent supported version and associated reply header bytes |
| for(int i = 0; newDelegate == null && i < _creators.length; i++) |
| { |
| final ProtocolEngineCreator creator = _creators[i]; |
| if(_supported.contains(creator.getVersion())) |
| { |
| supportedReplyBytes = creator.getHeaderIdentifier(); |
| supportedReplyVersion = creator.getVersion(); |
| byte[] compareBytes = creator.getHeaderIdentifier(); |
| boolean equal = true; |
| for(int j = 0; equal && j<compareBytes.length; j++) |
| { |
| equal = headerBytes[j] == compareBytes[j]; |
| } |
| if(equal) |
| { |
| newDelegate = creator.newProtocolEngine(_broker, |
| _network, _port, _transport, _id, |
| _aggregateTicker); |
| if(newDelegate == null && creator.getSuggestedAlternativeHeader() != null) |
| { |
| defaultSupportedReplyBytes = creator.getSuggestedAlternativeHeader(); |
| } |
| } |
| } |
| |
| //If there is a configured default reply to an unsupported version initiation, |
| //then save the associated reply header bytes when we encounter them |
| if(defaultSupportedReplyBytes == null && _defaultSupportedReply != null && creator.getVersion() == _defaultSupportedReply) |
| { |
| defaultSupportedReplyBytes = creator.getHeaderIdentifier(); |
| } |
| } |
| |
| // If no delegate is found then send back a supported protocol version id |
| if(newDelegate == null) |
| { |
| //if a default reply was specified use its reply header instead of the most recent supported version |
| if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion)) |
| { |
| _logger.debug("Default reply to unsupported protocol version was configured, changing reply from {} to {}", |
| supportedReplyVersion, _defaultSupportedReply); |
| |
| supportedReplyBytes = defaultSupportedReplyBytes; |
| supportedReplyVersion = _defaultSupportedReply; |
| } |
| |
| _broker.getEventLogger().message(new PortLogSubject(_port), |
| PortMessages.UNSUPPORTED_PROTOCOL_HEADER(supportedReplyVersion.toString())); |
| |
| final QpidByteBuffer supportedReplyBuf = QpidByteBuffer.allocateDirect(supportedReplyBytes.length); |
| supportedReplyBuf.put(supportedReplyBytes); |
| supportedReplyBuf.flip(); |
| _sender.send(supportedReplyBuf); |
| _sender.flush(); |
| |
| _delegate = new ClosedDelegateProtocolEngine(); |
| |
| _network.close(); |
| |
| } |
| else |
| { |
| boolean hasWork = _delegate.hasWork(); |
| if (hasWork) |
| { |
| newDelegate.notifyWork(); |
| } |
| _delegate = newDelegate; |
| _delegate.setWorkListener(_workListener.get()); |
| _header.flip(); |
| _delegate.received(_header); |
| |
| Certificate peerCertificate = _network.getPeerCertificate(); |
| if(peerCertificate != null && _port.getClientCertRecorder() != null) |
| { |
| ((ManagedPeerCertificateTrustStore)(_port.getClientCertRecorder())).addCertificate(peerCertificate); |
| } |
| |
| |
| if(msg.hasRemaining()) |
| { |
| _delegate.received(msg); |
| } |
| } |
| |
| } |
| |
| |
| |
| } |
| |
| @Override |
| public void setIOThread(final Thread ioThread) |
| { |
| |
| } |
| |
| @Override |
| public Subject getSubject() |
| { |
| return _delegate.getSubject(); |
| } |
| |
| @Override |
| public boolean isTransportBlockedForWriting() |
| { |
| return false; |
| } |
| |
| @Override |
| public void setTransportBlockedForWriting(final boolean blocked) |
| { |
| } |
| |
| @Override |
| public void closed() |
| { |
| try |
| { |
| _delegate = new ClosedDelegateProtocolEngine(); |
| _logger.debug("Connection from {} was closed before any protocol version was established.", |
| _network.getRemoteAddress()); |
| } |
| catch(Exception e) |
| { |
| //ignore |
| } |
| finally |
| { |
| try |
| { |
| _network.close(); |
| } |
| catch(Exception e) |
| { |
| //ignore |
| } |
| } |
| } |
| |
| @Override |
| public void writerIdle() |
| { |
| |
| } |
| |
| @Override |
| public void readerIdle() |
| { |
| } |
| |
| @Override |
| public void encryptedTransport() |
| { |
| if(_transport == Transport.TCP) |
| { |
| _transport = Transport.SSL; |
| } |
| } |
| |
| @Override |
| public long getLastReadTime() |
| { |
| return _lastReadTime; |
| } |
| |
| @Override |
| public long getLastWriteTime() |
| { |
| return 0; |
| } |
| } |
| |
| class SlowProtocolHeaderTicker implements Ticker, SchedulingDelayNotificationListener |
| { |
| private final long _allowedTime; |
| private final long _createdTime; |
| private volatile long _accumulatedSchedulingDelay; |
| |
| public SlowProtocolHeaderTicker(long allowedTime, long createdTime) |
| { |
| _allowedTime = allowedTime; |
| _createdTime = createdTime; |
| } |
| |
| @Override |
| public int getTimeToNextTick(final long currentTime) |
| { |
| return (int) (_createdTime + _allowedTime + _accumulatedSchedulingDelay - currentTime); } |
| |
| @Override |
| public int tick(final long currentTime) |
| { |
| int nextTick = getTimeToNextTick(currentTime); |
| if(nextTick <= 0) |
| { |
| if (isProtocolEstablished()) |
| { |
| _aggregateTicker.removeTicker(this); |
| _network.removeSchedulingDelayNotificationListeners(this); |
| } |
| else |
| { |
| _logger.warn("Connection has taken more than {} ms to send complete protocol header. Closing as possible DoS.", |
| _allowedTime); |
| _broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header not received within timeout period", true)); |
| _network.close(); |
| } |
| } |
| return nextTick; |
| } |
| |
| @Override |
| public void notifySchedulingDelay(final long schedulingDelay) |
| { |
| if (schedulingDelay > 0) |
| { |
| _accumulatedSchedulingDelay += schedulingDelay; |
| } |
| } |
| } |
| |
| public boolean isProtocolEstablished() |
| { |
| return _delegate instanceof AbstractAMQPConnection; |
| } |
| |
| |
| } |