blob: 2c0d0ef7698b583c3621fec83ac488a5b627c292 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.server.transport.AggregateTicker;
public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0>
implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
ErrorHandler,
SASLEndpoint,
ConnectionHandler
{
private static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
private static final long CLOSE_RESPONSE_TIMEOUT = 10000L;
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private static final byte[] SASL_HEADER = new byte[]
{
(byte) 'A',
(byte) 'M',
(byte) 'Q',
(byte) 'P',
(byte) 3,
(byte) 1,
(byte) 0,
(byte) 0
};
private static final byte[] AMQP_HEADER = new byte[]
{
(byte) 'A',
(byte) 'M',
(byte) 'Q',
(byte) 'P',
(byte) 0,
(byte) 1,
(byte) 0,
(byte) 0
};
private FrameWriter _frameWriter;
private ProtocolHandler _frameHandler;
private volatile boolean _transportBlockedForWriting;
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
private enum FrameReceivingState
{
AMQP_OR_SASL_HEADER,
SASL_INIT_ONLY,
SASL_RESPONSE_ONLY,
AMQP_HEADER,
OPEN_ONLY,
ANY_FRAME,
CLOSED
}
private volatile FrameReceivingState _frameReceivingState = FrameReceivingState.AMQP_OR_SASL_HEADER;
private static final short CONNECTION_CONTROL_CHANNEL = (short) 0;
private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
private AmqpPort<?> _port;
private SubjectCreator _subjectCreator;
private Transport _transport;
private long _connectionId;
private Container _container;
private int _channelMax = DEFAULT_CHANNEL_MAX;
private int _maxFrameSize = 4096;
private String _remoteContainerId;
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
private Session_1_0[] _sendingSessions;
// positioned by the *incoming* channel
private Session_1_0[] _receivingSessions;
private boolean _closedForInput;
private boolean _closedForOutput;
private long _idleTimeout;
private ConnectionState _connectionState = ConnectionState.UNOPENED;
private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
.registerMessagingLayer()
.registerTransactionLayer()
.registerSecurityLayer();
private Map _properties;
private SaslServerProvider _saslServerProvider;
private boolean _saslComplete;
private SaslServer _saslServer;
private String _localHostname;
private long _desiredIdleTimeout;
private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
private Error _remoteError;
private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
private Map _remoteProperties;
private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
private final Collection<Session_1_0>
_sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
private boolean _closedOnOpen;
AMQPConnection_1_0(final Broker<?> broker,
final ServerNetworkConnection network,
AmqpPort<?> port, Transport transport, long id,
final AggregateTicker aggregateTicker,
final boolean useSASL)
{
super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
_container = new Container(broker.getId().toString());
_subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
_saslServerProvider = useSASL ? asSaslServerProvider(_subjectCreator, network) : null;
_port = port;
_transport = transport;
_connectionId = id;
Map<Symbol,Object> serverProperties = new LinkedHashMap<>();
serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
setProperties(serverProperties);
setRemoteAddress(network.getRemoteAddress());
setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
_frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
}
private void setUserPrincipal(final Principal user)
{
setSubject(_subjectCreator.createSubjectWithGroups(user));
}
private long getDesiredIdleTimeout()
{
return _desiredIdleTimeout;
}
public void receiveAttach(final short channel, final Attach attach)
{
assertState(FrameReceivingState.ANY_FRAME);
final Session_1_0 session = getSession(channel);
if (session != null)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
session.receiveAttach(attach);
return null;
}
}, session.getAccessControllerContext());
}
else
{
// TODO - error
}
}
public void receive(final short channel, final Object frame)
{
FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
if (frame instanceof FrameBody)
{
((FrameBody) frame).invoke(channel, this);
}
else if (frame instanceof SaslFrameBody)
{
((SaslFrameBody) frame).invoke(channel, this);
}
}
private void closeSaslWithFailure()
{
_saslComplete = true;
_frameReceivingState = FrameReceivingState.CLOSED;
setClosedForInput(true);
close();
}
public void receiveSaslChallenge(final SaslChallenge saslChallenge)
{
// TODO - log unexpected frame
closeSaslWithFailure();
}
public void receiveClose(final short channel, final Close close)
{
assertState(FrameReceivingState.ANY_FRAME);
_frameReceivingState = FrameReceivingState.CLOSED;
setClosedForInput(true);
closeReceived();
switch (_connectionState)
{
case UNOPENED:
case AWAITING_OPEN:
Error error = new Error();
error.setCondition(ConnectionError.CONNECTION_FORCED);
error.setDescription("Connection close sent before connection was opened");
closeConnection(error);
break;
case OPEN:
_connectionState = ConnectionState.CLOSE_RECEIVED;
// TODO - we should log the error we received from the client if present
sendClose(new Close());
_connectionState = ConnectionState.CLOSED;
_orderlyClose.set(true);
break;
case CLOSE_SENT:
_connectionState = ConnectionState.CLOSED;
_orderlyClose.set(true);
default:
}
_remoteError = close.getError();
}
private void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
for(final Session_1_0 session : sessions)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
session.remoteEnd(new End());
return null;
}
}, session.getAccessControllerContext());
}
}
private void setClosedForInput(final boolean closed)
{
_closedForInput = closed;
}
public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
{
// TODO - log unexpected frame
closeSaslWithFailure();
}
public void receiveSaslResponse(final SaslResponse saslResponse)
{
final Binary responseBinary = saslResponse.getResponse();
byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
assertState(FrameReceivingState.SASL_RESPONSE_ONLY);
processSaslResponse(response);
}
public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
{
return _describedTypeRegistry;
}
private void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
{
addAsyncTask(new Action<ConnectionHandler>()
{
@Override
public void performAction(final ConnectionHandler object)
{
session.close(cause, message);
}
});
}
private boolean closedForOutput()
{
return _closedForOutput;
}
public boolean isClosed()
{
return _connectionState == ConnectionState.CLOSED
|| _connectionState == ConnectionState.CLOSE_RECEIVED;
}
public boolean closedForInput()
{
return _closedForInput;
}
void sessionEnded(final Session_1_0 session)
{
if(!_closedOnOpen)
{
_sessions.remove(session);
sessionRemoved(session);
}
}
public int send(final short channel, final FrameBody body, final QpidByteBuffer payload)
{
return sendFrame(channel, body, payload);
}
private void inputClosed()
{
List<Runnable> postLockActions;
if (!_closedForInput)
{
_closedForInput = true;
FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
switch (_connectionState)
{
case UNOPENED:
case AWAITING_OPEN:
case CLOSE_SENT:
_connectionState = ConnectionState.CLOSED;
closeSender();
break;
case OPEN:
_connectionState = ConnectionState.CLOSE_RECEIVED;
case CLOSED:
// already sent our close - too late to do anything more
break;
default:
}
closeReceived();
}
}
private void closeSender()
{
setClosedForOutput(true);
close();
}
String getRemoteContainerId()
{
return _remoteContainerId;
}
private void setDesiredIdleTimeout(final long desiredIdleTimeout)
{
_desiredIdleTimeout = desiredIdleTimeout;
}
public boolean isOpen()
{
return _connectionState == ConnectionState.OPEN;
}
void sendEnd(final short channel, final End end, final boolean remove)
{
sendFrame(channel, end);
if (remove)
{
_sendingSessions[channel] = null;
}
}
public void receiveSaslOutcome(final SaslOutcome saslOutcome)
{
// TODO - log unexpected frame
closeSaslWithFailure();
}
public void receiveEnd(final short channel, final End end)
{
assertState(FrameReceivingState.ANY_FRAME);
Session_1_0 endpoint = _receivingSessions[channel];
if (endpoint != null)
{
_receivingSessions[channel] = null;
endpoint.receiveEnd(end);
}
else
{
// TODO error
}
}
public void receiveDisposition(final short channel,
final Disposition disposition)
{
assertState(FrameReceivingState.ANY_FRAME);
final Session_1_0 session = getSession(channel);
if (session != null)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
session.receiveDisposition(disposition);
return null;
}
}, session.getAccessControllerContext());
}
else
{
// TODO - error
}
}
public void receiveBegin(final short channel, final Begin begin)
{
assertState(FrameReceivingState.ANY_FRAME);
short myChannelId;
if (begin.getRemoteChannel() != null)
{
final Error error = new Error();
error.setCondition(ConnectionError.FRAMING_ERROR);
error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
+ begin.getRemoteChannel() + ". Since the broker does not spontaneously start channels, this must be an error.");
closeConnection(error);
}
else // Peer requesting session creation
{
if (_receivingSessions[channel] == null)
{
myChannelId = getFirstFreeChannel();
if (myChannelId == -1)
{
final Error error = new Error();
error.setCondition(ConnectionError.FRAMING_ERROR);
error.setDescription("BEGIN received on channel " + channel + ". There are no free channels for the broker to responsd on.");
closeConnection(error);
}
Session_1_0 session = new Session_1_0(this, begin);
_receivingSessions[channel] = session;
_sendingSessions[myChannelId] = session;
Begin beginToSend = new Begin();
session.setReceivingChannel(channel);
session.setSendingChannel(myChannelId);
beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
beginToSend.setNextOutgoingId(session.getNextOutgoingId());
beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
beginToSend.setIncomingWindow(session.getIncomingWindowSize());
sendFrame(myChannelId, beginToSend);
_sessions.add(session);
sessionAdded(session);
}
else
{
final Error error = new Error();
error.setCondition(ConnectionError.FRAMING_ERROR);
error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
closeConnection(error);
}
}
}
private short getFirstFreeChannel()
{
for (int i = 0; i <= _channelMax; i++)
{
if (_sendingSessions[i] == null)
{
return (short) i;
}
}
return -1;
}
public void handleError(final Error error)
{
if (!closedForOutput())
{
Close close = new Close();
close.setError(error);
sendFrame((short) 0, close);
setClosedForOutput(true);
}
}
public void receiveTransfer(final short channel, final Transfer transfer)
{
assertState(FrameReceivingState.ANY_FRAME);
final Session_1_0 session = getSession(channel);
if (session != null)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
session.receiveTransfer(transfer);
return null;
}
}, session.getAccessControllerContext());
}
else
{
// TODO - error
}
}
public void receiveFlow(final short channel, final Flow flow)
{
assertState(FrameReceivingState.ANY_FRAME);
final Session_1_0 session = getSession(channel);
if (session != null)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
session.receiveFlow(flow);
return null;
}
}, session.getAccessControllerContext());
}
else
{
// TODO - error
}
}
public void receiveOpen(final short channel, final Open open)
{
assertState(FrameReceivingState.OPEN_ONLY);
_frameReceivingState = FrameReceivingState.ANY_FRAME;
_channelMax = open.getChannelMax() == null ? _channelMax
: open.getChannelMax().intValue() < _channelMax
? open.getChannelMax().intValue()
: _channelMax;
if (_receivingSessions == null)
{
_receivingSessions = new Session_1_0[_channelMax + 1];
_sendingSessions = new Session_1_0[_channelMax + 1];
}
_maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue();
_remoteContainerId = open.getContainerId();
_localHostname = open.getHostname();
if (open.getIdleTimeOut() != null)
{
_idleTimeout = open.getIdleTimeOut().longValue();
}
_remoteProperties = open.getProperties();
if (_remoteProperties != null)
{
if (_remoteProperties.containsKey(Symbol.valueOf("product")))
{
setClientProduct(_remoteProperties.get(Symbol.valueOf("product")).toString());
}
if (_remoteProperties.containsKey(Symbol.valueOf("version")))
{
setClientVersion(_remoteProperties.get(Symbol.valueOf("version")).toString());
}
setClientId(_remoteContainerId);
}
if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
closeConnection(new Error(ConnectionError.CONNECTION_FORCED,
"Requested idle timeout of "
+ _idleTimeout
+ " is too low. The minimum supported timeout is"
+ MINIMUM_SUPPORTED_IDLE_TIMEOUT));
close();
_closedOnOpen = true;
}
else
{
long desiredIdleTimeout = getDesiredIdleTimeout();
initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout);
final NamedAddressSpace addressSpace = ((AmqpPort) _port).getAddressSpace(_localHostname);
if (addressSpace == null)
{
closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
}
else
{
if (!addressSpace.isActive())
{
final Error err = new Error();
err.setCondition(AmqpError.NOT_FOUND);
closeConnection(err);
_closedOnOpen = true;
populateConnectionRedirect(addressSpace, err);
closeConnection(err);
close();
_closedOnOpen = true;
}
else
{
if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
{
closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
}
else
{
try
{
setAddressSpace(addressSpace);
}
catch (VirtualHostUnavailableException e)
{
closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
}
}
}
}
}
switch (_connectionState)
{
case UNOPENED:
sendOpen(_channelMax, _maxFrameSize);
case AWAITING_OPEN:
_connectionState = ConnectionState.OPEN;
default:
// TODO bad stuff (connection already open)
}
}
private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err)
{
final String redirectHost = addressSpace.getRedirectHost(((AmqpPort) _port));
if(redirectHost == null)
{
err.setDescription("Virtual host '" + _localHostname + "' is not active");
}
else
{
String networkHost;
int port;
if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
{
// IPv6 case
networkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
if(redirectHost.contains("]:"))
{
port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
}
else
{
port = -1;
}
}
else
{
if(redirectHost.contains(":"))
{
networkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
try
{
String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
port = Integer.parseInt(portString);
}
catch (NumberFormatException e)
{
port = -1;
}
}
else
{
networkHost = redirectHost;
port = -1;
}
}
final Map<Symbol, Object> infoMap = new HashMap<>();
infoMap.put(Symbol.valueOf("network-host"), networkHost);
if(port > 0)
{
infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
}
err.setInfo(infoMap);
}
}
public void receiveDetach(final short channel, final Detach detach)
{
assertState(FrameReceivingState.ANY_FRAME);
final Session_1_0 session = getSession(channel);
if (session != null)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
session.receiveDetach(detach);
return null;
}
}, session.getAccessControllerContext());
}
else
{
// TODO - error
}
}
private void transportStateChanged()
{
for (Session_1_0 session : _sessions)
{
session.transportStateChanged();
}
}
public void close(final Error error)
{
closeConnection(error);
}
private void setRemoteAddress(final SocketAddress remoteAddress)
{
_remoteAddress = remoteAddress;
}
public void setProperties(final Map<Symbol, Object> properties)
{
_properties = properties;
}
private void setClosedForOutput(final boolean closed)
{
_closedForOutput = closed;
}
public void receiveSaslInit(final SaslInit saslInit)
{
assertState(FrameReceivingState.SASL_INIT_ONLY);
String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
final Binary initialResponse = saslInit.getInitialResponse();
byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
try
{
_saslServer = _saslServerProvider.getSaslServer(mechanism, "localhost");
processSaslResponse(response);
}
catch (SaslException e)
{
handleSaslError();
}
}
private void processSaslResponse(final byte[] response)
{
byte[] challenge = null;
SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
if (authenticationResult == null)
{
authenticationResult = _subjectCreator.authenticate(_saslServer, response != null ? response : new byte[0]);
challenge = authenticationResult.getChallenge();
}
if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
{
_successfulAuthenticationResult = authenticationResult;
if (challenge == null || challenge.length == 0)
{
setSubject(_successfulAuthenticationResult.getSubject());
SaslOutcome outcome = new SaslOutcome();
outcome.setCode(SaslCode.OK);
send(new SASLFrame(outcome), null);
_saslComplete = true;
_frameReceivingState = FrameReceivingState.AMQP_HEADER;
}
else
{
continueSaslNegotiation(challenge);
}
}
else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
{
continueSaslNegotiation(challenge);
}
else
{
handleSaslError();
}
}
private void continueSaslNegotiation(final byte[] challenge)
{
SaslChallenge challengeBody = new SaslChallenge();
challengeBody.setChallenge(new Binary(challenge));
send(new SASLFrame(challengeBody), null);
_frameReceivingState = FrameReceivingState.SASL_RESPONSE_ONLY;
}
private void handleSaslError()
{
SaslOutcome outcome = new SaslOutcome();
outcome.setCode(SaslCode.AUTH);
send(new SASLFrame(outcome), null);
_saslComplete = true;
closeSaslWithFailure();
}
public int getMaxFrameSize()
{
return _maxFrameSize;
}
Object getReference()
{
return _reference;
}
private void endpointClosed()
{
try
{
performDeleteTasks();
closeReceived();
}
finally
{
NamedAddressSpace virtualHost = getAddressSpace();
if (virtualHost != null)
{
virtualHost.deregisterConnection(this);
}
}
}
private void closeConnection()
{
switch (_connectionState)
{
case AWAITING_OPEN:
case OPEN:
Close closeToSend = new Close();
sendClose(closeToSend);
_connectionState = ConnectionState.CLOSE_SENT;
break;
case CLOSE_SENT:
default:
}
}
private void closeConnection(final Error error)
{
Close close = new Close();
close.setError(error);
switch (_connectionState)
{
case UNOPENED:
sendOpen(0, 0);
sendClose(close);
_connectionState = ConnectionState.CLOSED;
break;
case AWAITING_OPEN:
case OPEN:
sendClose(close);
_connectionState = ConnectionState.CLOSE_SENT;
case CLOSE_SENT:
case CLOSED:
// already sent our close - too late to do anything more
break;
default:
// TODO Unknown state
}
}
int sendFrame(final short channel, final FrameBody body, final QpidByteBuffer payload)
{
if (!_closedForOutput)
{
ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate();
int payloadSent = _maxFrameSize - (size + 9);
try
{
if (payloadSent < (payload == null ? 0 : payload.remaining()))
{
if (body instanceof Transfer)
{
((Transfer) body).setMore(Boolean.TRUE);
}
writer = _describedTypeRegistry.getValueWriter(body);
size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
payloadSent = _maxFrameSize - (size + 9);
payloadDup.limit(payloadDup.position() + payloadSent);
}
else
{
payloadSent = payload == null ? 0 : payload.remaining();
}
send(AMQFrame.createAMQFrame(channel, body, payloadDup));
}
finally
{
if (payloadDup != null)
{
payloadDup.dispose();
}
}
return payloadSent;
}
else
{
return -1;
}
}
void sendFrame(final short channel, final FrameBody body)
{
sendFrame(channel, body, null);
}
public ByteBufferSender getSender()
{
return getNetwork().getSender();
}
@Override
public void writerIdle()
{
send(TransportFrame.createAMQFrame((short)0,null));
}
@Override
public void readerIdle()
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
getNetwork().close();
return null;
}
}, getAccessControllerContext());
}
@Override
public void encryptedTransport()
{
}
private static SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator,
final ServerNetworkConnection network)
{
return new SaslServerProvider()
{
@Override
public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
{
return subjectCreator.createSaslServer(mechanism, fqdn, network.getPeerPrincipal());
}
};
}
public String getAddress()
{
return getNetwork().getRemoteAddress().toString();
}
public void received(final QpidByteBuffer msg)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
updateLastReadTime();
try
{
int remaining;
do
{
remaining = msg.remaining();
switch (_frameReceivingState)
{
case AMQP_OR_SASL_HEADER:
case AMQP_HEADER:
if (remaining >= 8)
{
processProtocolHeader(msg);
}
break;
case OPEN_ONLY:
case ANY_FRAME:
case SASL_INIT_ONLY:
case SASL_RESPONSE_ONLY:
_frameHandler.parse(msg);
break;
case CLOSED:
// ignore;
break;
}
}
while (msg.remaining() != remaining);
}
catch (IllegalArgumentException | IllegalStateException e)
{
throw new ConnectionScopedRuntimeException(e);
}
catch (StoreException e)
{
if (getAddressSpace().isActive())
{
throw new ServerScopedRuntimeException(e);
}
else
{
throw new ConnectionScopedRuntimeException(e);
}
}
return null;
}
}, getAccessControllerContext());
}
private void processProtocolHeader(final QpidByteBuffer msg)
{
if(msg.remaining() >= 8)
{
byte[] header = new byte[8];
msg.get(header);
final AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider();
final SubjectCreator subjectCreator = authenticationProvider.getSubjectCreator(getTransport().isSecure());
if(Arrays.equals(header, SASL_HEADER))
{
if(_saslComplete)
{
throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
}
getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
SaslMechanisms mechanisms = new SaslMechanisms();
ArrayList<Symbol> mechanismsList = new ArrayList<Symbol>();
for (String name : subjectCreator.getMechanisms())
{
mechanismsList.add(Symbol.valueOf(name));
}
mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
send(new SASLFrame(mechanisms), null);
_frameReceivingState = FrameReceivingState.SASL_INIT_ONLY;
_frameHandler = new FrameHandler(this, true);
}
else if(Arrays.equals(header, AMQP_HEADER))
{
if(!_saslComplete)
{
final List<String> mechanisms = subjectCreator.getMechanisms();
if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
{
setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
}
else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
{
setUserPrincipal(new AuthenticatedPrincipal(((AnonymousAuthenticationManager) authenticationProvider).getAnonymousPrincipal()));
}
else
{
// TODO - log auth failure / close
getNetwork().close();
}
}
getSender().send(QpidByteBuffer.wrap(AMQP_HEADER));
_frameReceivingState = FrameReceivingState.OPEN_ONLY;
_frameHandler = new FrameHandler(this, false);
}
else
{
throw new ConnectionScopedRuntimeException("Unknown protocol header");
}
}
}
public void closed()
{
try
{
inputClosed();
}
catch(RuntimeException e)
{
LOGGER.error("Exception while closing", e);
}
finally
{
try
{
endpointClosed();
}
finally
{
markTransportClosed();
}
}
}
public boolean canSend()
{
return true;
}
public void send(final AMQFrame amqFrame)
{
send(amqFrame, null);
}
public void send(final AMQFrame amqFrame, ByteBuffer buf)
{
updateLastWriteTime();
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
getNetwork().getRemoteAddress(),
amqFrame.getChannel(),
amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
int size = _frameWriter.send(amqFrame);
if (size > getMaxFrameSize())
{
throw new OversizeFrameException(amqFrame, size);
}
}
public void send(short channel, FrameBody body)
{
AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
send(frame);
}
public void close()
{
getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_RESPONSE_TIMEOUT,
getNetwork()));
// trigger a wakeup to ensure the ticker will be taken into account
notifyWork();
}
@Override
public boolean isTransportBlockedForWriting()
{
return _transportBlockedForWriting;
}
@Override
public void setTransportBlockedForWriting(final boolean blocked)
{
if(_transportBlockedForWriting != blocked)
{
_transportBlockedForWriting = blocked;
transportStateChanged();
}
}
@Override
public Iterator<Runnable> processPendingIterator()
{
if (isIOThread())
{
return new ProcessPendingIterator();
}
else
{
return Collections.emptyIterator();
}
}
@Override
public boolean hasWork()
{
return _stateChanged.get();
}
@Override
public void notifyWork()
{
_stateChanged.set(true);
final Action<ProtocolEngine> listener = _workListener.get();
if(listener != null)
{
listener.performAction(this);
}
}
@Override
public void clearWork()
{
_stateChanged.set(false);
}
@Override
public void setWorkListener(final Action<ProtocolEngine> listener)
{
_workListener.set(listener);
}
public boolean hasSessionWithName(final byte[] name)
{
return false;
}
public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
{
Action<ConnectionHandler> action = new Action<ConnectionHandler>()
{
@Override
public void performAction(final ConnectionHandler object)
{
closeConnection();
}
};
addAsyncTask(action);
}
public void closeSessionAsync(final AMQSessionModel<?> session,
final AMQConstant cause, final String message)
{
closeSessionAsync((Session_1_0) session, cause, message);
}
public void block()
{
// TODO
}
public String getRemoteContainerName()
{
return _remoteContainerId;
}
public Collection<? extends Session_1_0> getSessionModels()
{
return Collections.unmodifiableCollection(_sessions);
}
public void unblock()
{
// TODO
}
public long getSessionCountLimit()
{
return 0; // TODO
}
@Override
public boolean isOrderlyClose()
{
return _orderlyClose.get();
}
private void addAsyncTask(final Action<ConnectionHandler> action)
{
_asyncTaskList.add(action);
notifyWork();
}
private void sendOpen(final int channelMax, final int maxFrameSize)
{
Open open = new Open();
if (_receivingSessions == null)
{
_receivingSessions = new Session_1_0[channelMax + 1];
_sendingSessions = new Session_1_0[channelMax + 1];
}
if (channelMax < _channelMax)
{
_channelMax = channelMax;
}
open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
open.setContainerId(_container.getId());
open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
// TODO - should we try to set the hostname based on the connection information?
// open.setHostname();
open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
if (_properties != null)
{
open.setProperties(_properties);
}
sendFrame(CONNECTION_CONTROL_CHANNEL, open);
}
private void closeWithError(final AmqpError amqpError, final String errorDescription)
{
final Error err = new Error();
err.setCondition(amqpError);
err.setDescription(errorDescription);
closeConnection(err);
close();
_closedOnOpen = true;
}
private Session_1_0 getSession(final short channel)
{
Session_1_0 session = _receivingSessions[channel];
if (session == null)
{
Error error = new Error();
error.setCondition(ConnectionError.FRAMING_ERROR);
error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
handleError(error);
}
return session;
}
private void sendClose(Close closeToSend)
{
sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
closeSender();
}
@Override
public String toString()
{
NamedAddressSpace virtualHost = getAddressSpace();
return "Connection_1_0["
+ _connectionId
+ " "
+ getAddress()
+ (virtualHost == null ? "" : (" vh : " + virtualHost.getName()))
+ ']';
}
private void assertState(final FrameReceivingState state)
{
if(_frameReceivingState != state)
{
throw new ConnectionScopedRuntimeException("Unexpected state, client has sent frame in an illegal order. Required state: " + state + ", actual state: " + _frameReceivingState);
}
}
private class ProcessPendingIterator implements Iterator<Runnable>
{
private final Collection<? extends AMQSessionModel<?>> _sessionsWithPending;
private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
private ProcessPendingIterator()
{
_sessionsWithPending = new ArrayList<>(getSessionModels());
_sessionIterator = _sessionsWithPending.iterator();
}
@Override
public boolean hasNext()
{
return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
}
@Override
public Runnable next()
{
if(!_sessionsWithPending.isEmpty())
{
if(!_sessionIterator.hasNext())
{
_sessionIterator = _sessionsWithPending.iterator();
}
final AMQSessionModel<?> session = _sessionIterator.next();
return new Runnable()
{
@Override
public void run()
{
if(!session.processPending())
{
_sessionIterator.remove();
}
}
};
}
else if(!_asyncTaskList.isEmpty())
{
final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
return new Runnable()
{
@Override
public void run()
{
asyncAction.performAction(AMQPConnection_1_0.this);
}
};
}
else
{
throw new NoSuchElementException();
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
@Override
public void initialiseHeartbeating(final long writerDelay, final long readerDelay)
{
super.initialiseHeartbeating(writerDelay, readerDelay);
}
}