blob: ef5f8ced614bbf5590e65fe76d85d8a987a38671 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
import org.apache.qpid.server.properties.ConnectionStartProperties;
import org.apache.qpid.server.protocol.v0_10.transport.*;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
static final String MESSAGE_DIGEST_SHA1 = "SHA-1";
static final int BASE64_LIMIT = 64;
private final AmqpPort<?> _port;
private List<Object> _locales;
private List<Object> _mechanisms;
private final Broker<?> _broker;
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
private final SubjectCreator _subjectCreator;
private int _maximumFrameSize;
private boolean _compressionSupported;
private volatile SaslNegotiator _saslNegotiator;
enum ConnectionState
{
INIT,
AWAIT_START_OK,
AWAIT_SECURE_OK,
AWAIT_TUNE_OK,
AWAIT_OPEN,
OPEN
}
private volatile ConnectionState _state = ConnectionState.INIT;
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
public ServerConnectionDelegate(AmqpPort<?> port, boolean secure, final String selectedHost)
{
_port = port;
_broker = (Broker<?>) port.getParent();
_mechanisms = new ArrayList<>(port.getAuthenticationProvider().getAvailableMechanisms(secure));
_maxNoOfChannels = port.getSessionCountLimit();
_subjectCreator = port.getSubjectCreator(secure, selectedHost);
_maximumFrameSize = Math.min(0xffff, _broker.getNetworkBufferSize());
}
@Override
public void control(ServerConnection conn, Method method)
{
method.dispatch(conn, this);
}
@Override
public void command(ServerConnection conn, Method method)
{
method.dispatch(conn, this);
}
@Override
public void error(ServerConnection conn, ProtocolError error)
{
conn.exception(new ConnectionException(error.getMessage()));
}
@Override
public void handle(ServerConnection conn, Method method)
{
conn.dispatch(method);
}
@Override public void connectionHeartbeat(ServerConnection conn, ConnectionHeartbeat hearbeat)
{
// do nothing
}
protected void sendConnectionCloseOkAndCloseSender(ServerConnection conn)
{
conn.connectionCloseOk();
conn.getSender().close();
}
@Override public void connectionCloseOk(ServerConnection conn, ConnectionCloseOk ok)
{
conn.getSender().close();
}
@Override public void sessionDetached(ServerConnection conn, SessionDetached dtc)
{
ServerSession ssn = conn.getSession(dtc.getChannel());
if (ssn != null)
{
ssn.setDetachCode(dtc.getCode());
conn.unmap(ssn);
ssn.closed();
}
}
public final ConnectionState getState()
{
return _state;
}
private void assertState(final ServerConnection conn, final ConnectionState requiredState)
{
if(_state != requiredState)
{
String replyText = "Command Invalid, expected " + requiredState + " but was " + _state;
conn.sendConnectionClose(ConnectionCloseCode.FRAMING_ERROR, replyText);
conn.closeAndIgnoreFutureInput();
throw new ConnectionScopedRuntimeException(replyText);
}
}
@Override
public void init(final ServerConnection serverConnection, final ProtocolHeader hdr)
{
assertState(serverConnection, ConnectionState.INIT);
serverConnection.send(new ProtocolHeader(1, 0, 10));
Map<String,Object> props = Collections.emptyMap();
for(ConnectionPropertyEnricher enricher : _port.getConnectionPropertyEnrichers())
{
props = enricher.addConnectionProperties(serverConnection.getAmqpConnection(), props);
}
serverConnection.sendConnectionStart(props, _mechanisms, Collections.singletonList((Object)"en_US"));
_state = ConnectionState.AWAIT_START_OK;
}
@Override
public void connectionSecureOk(final ServerConnection serverConnection, final ConnectionSecureOk ok)
{
assertState(serverConnection, ConnectionState.AWAIT_SECURE_OK);
secure(serverConnection, ok.getResponse());
}
protected void secure(final ServerConnection sconn, final byte[] response)
{
SubjectAuthenticationResult authResult = _successfulAuthenticationResult;
byte[] challenge = null;
if (authResult == null)
{
authResult = _subjectCreator.authenticate(_saslNegotiator, response);
challenge = authResult.getChallenge();
}
if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
{
_successfulAuthenticationResult = authResult;
if (challenge == null || challenge.length == 0)
{
sconn.sendConnectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax());
sconn.setAuthorizedSubject(authResult.getSubject());
_state = ConnectionState.AWAIT_TUNE_OK;
disposeSaslNegotiator();
}
else
{
sconn.sendConnectionSecure(authResult.getChallenge());
_state = ConnectionState.AWAIT_SECURE_OK;
}
}
else if (AuthenticationStatus.CONTINUE.equals(authResult.getStatus()))
{
sconn.sendConnectionSecure(authResult.getChallenge());
_state = ConnectionState.AWAIT_SECURE_OK;
}
else
{
connectionAuthFailed(sconn, authResult.getCause());
}
}
@Override
public void connectionClose(ServerConnection sconn, ConnectionClose close)
{
sconn.closeCode(close);
sconn.setState(CLOSE_RCVD);
sendConnectionCloseOkAndCloseSender(sconn);
}
@Override
public void connectionOpen(ServerConnection sconn, ConnectionOpen open)
{
assertState(sconn, ConnectionState.AWAIT_OPEN);
NamedAddressSpace addressSpace;
String vhostName;
if(open.hasVirtualHost())
{
vhostName = open.getVirtualHost();
}
else
{
vhostName = "";
}
AmqpPort port = sconn.getPort();
addressSpace = port.getAddressSpace(vhostName);
if(addressSpace != null)
{
if (!addressSpace.isActive())
{
sconn.setState(ServerConnection.State.CLOSING);
final String redirectHost = addressSpace.getRedirectHost(port);
if(redirectHost == null)
{
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
"Virtual host '" + vhostName + "' is not active");
}
else
{
sconn.invoke(new ConnectionRedirect(redirectHost, new ArrayList<Object>()));
}
return;
}
try
{
sconn.setVirtualHost(addressSpace);
if(!addressSpace.authoriseCreateConnection(sconn.getAmqpConnection()))
{
sconn.setState(ServerConnection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
return;
}
}
catch (AccessControlException | VirtualHostUnavailableException e)
{
sconn.setState(ServerConnection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
return;
}
sconn.setState(ServerConnection.State.OPEN);
_state = ConnectionState.OPEN;
sconn.invoke(new ConnectionOpenOk(Collections.emptyList()));
}
else
{
sconn.setState(ServerConnection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.INVALID_PATH,
"Unknown virtualhost '" + vhostName + "'");
}
}
@Override
public void connectionTuneOk(final ServerConnection sconn, final ConnectionTuneOk ok)
{
assertState(sconn, ConnectionState.AWAIT_TUNE_OK);
int okChannelMax = ok.getChannelMax();
int okMaxFrameSize = ok.getMaxFrameSize();
if (okChannelMax > getChannelMax())
{
LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
"client connectionTuneOk returned a channelMax (" + okChannelMax +
") above the server's offered limit (" + getChannelMax() +")");
//Due to the error we must forcefully close the connection without negotiation
sconn.closeAndIgnoreFutureInput();
return;
}
if(okMaxFrameSize > getFrameMax())
{
LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
"client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
") above the server's offered limit (" + getFrameMax() +")");
//Due to the error we must forcefully close the connection without negotiation
sconn.closeAndIgnoreFutureInput();
return;
}
else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
{
LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
"client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");
//Due to the error we must forcefully close the connection without negotiation
sconn.closeAndIgnoreFutureInput();
return;
}
else if(okMaxFrameSize == 0)
{
okMaxFrameSize = getFrameMax();
}
if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
{
int heartbeat = ok.getHeartbeat();
sconn.setHeartBeatDelay(heartbeat);
long readerIdle = 2000L * heartbeat;
long writerIdle = 1000L * heartbeat;
sconn.getAmqpConnection().initialiseHeartbeating(writerIdle, readerIdle);
}
//0 means no implied limit, except available server resources
//(or that forced by protocol limitations [0xFFFF])
sconn.setChannelMax(okChannelMax == 0 ? getChannelMax() : okChannelMax);
sconn.setMaxFrameSize(okMaxFrameSize);
_state = ConnectionState.AWAIT_OPEN;
}
private int getChannelMax()
{
return _maxNoOfChannels;
}
private int getFrameMax()
{
return _maximumFrameSize;
}
@Override
public void sessionDetach(ServerConnection conn, SessionDetach dtc)
{
int channel = dtc.getChannel();
ServerSession ssn = conn.getSession(channel);
if (ssn != null)
{
stopAllSubscriptions(ssn);
ssn.setClose(true);
ssn.sessionDetached(dtc.getName(),
ssn.getDetachCode() == null ? SessionDetachCode.NORMAL : ssn.getDetachCode());
conn.unmap(ssn);
ssn.closed();
}
else
{
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("SessionDetach received on unattached channel : {}", channel);
}
SessionDetached sessionDetached = new SessionDetached(dtc.getName(), SessionDetachCode.NOT_ATTACHED);
sessionDetached.setChannel(channel);
conn.invoke(sessionDetached);
}
}
private void stopAllSubscriptions(final ServerSession ssn)
{
final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
}
@Override
public void sessionAttach(final ServerConnection serverConnection, final SessionAttach atc)
{
assertState(serverConnection, ConnectionState.OPEN);
// We ignore the force flag
if(isSessionNameUnique(atc.getName(), serverConnection))
{
ServerSessionDelegate serverSessionDelegate = new ServerSessionDelegate();
final ServerSession serverSession =
new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
serverSession, getPeerSessionName(atc.getName()));
session.create();
serverSession.setModelObject(session);
serverConnection.map(serverSession, atc.getChannel());
serverConnection.registerSession(serverSession);
serverSession.sendSessionAttached(atc.getName());
serverSession.setState(ServerSession.State.OPEN);
}
else
{
final SessionDetached detached = new SessionDetached(atc.getName(), SessionDetachCode.SESSION_BUSY);
detached.setChannel(atc.getChannel());
serverConnection.invoke(detached);
}
}
private String getPeerSessionName(final byte[] attachName)
{
try
{
return UUID.fromString(new String(attachName, StandardCharsets.UTF_8)).toString();
}
catch (RuntimeException e)
{
return createBase64OrSha1(attachName);
}
}
private String createBase64OrSha1(final byte[] attachName)
{
if (attachName.length <= BASE64_LIMIT)
{
return Base64.getEncoder().encodeToString(attachName);
}
else
{
return createSha1(attachName);
}
}
private String createSha1(final byte[] attachName)
{
try
{
final MessageDigest digest = MessageDigest.getInstance(MESSAGE_DIGEST_SHA1);
return Base64.getEncoder().encodeToString(digest.digest(attachName));
}
catch (NoSuchAlgorithmException e)
{
return Base64.getEncoder().encodeToString(attachName);
}
}
// for test purposes only
void setState(ConnectionState state)
{
_state = state;
}
private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
{
final Principal authorizedPrincipal = conn.getAuthorizedPrincipal();
final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections =
conn.getAddressSpace().getConnections().iterator();
while(connections.hasNext())
{
final AMQPConnection<?> amqConnectionModel = (AMQPConnection<?>) connections.next();
final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
? ""
: amqConnectionModel.getAuthorizedPrincipal().getName();
if (userId.equals(userName) && amqConnectionModel.hasSessionWithName(name))
{
return false;
}
}
return true;
}
@Override
public void connectionStartOk(ServerConnection serverConnection, ConnectionStartOk ok)
{
assertState(serverConnection, ConnectionState.AWAIT_START_OK);
_clientProperties = ok.getClientProperties();
if(_clientProperties != null)
{
Object compressionSupported =
_clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
if (compressionSupported != null)
{
_compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));
}
final AMQPConnection_0_10 protocolEngine = serverConnection.getAmqpConnection();
protocolEngine.setClientId(getStringClientProperty(ConnectionStartProperties.CLIENT_ID_0_10));
protocolEngine.setClientProduct(getStringClientProperty(ConnectionStartProperties.PRODUCT));
protocolEngine.setClientVersion(getStringClientProperty(ConnectionStartProperties.VERSION_0_10));
protocolEngine.setRemoteProcessPid(getStringClientProperty(ConnectionStartProperties.PID));
}
serverConnection.setLocale(ok.getLocale());
String mechanism = ok.getMechanism();
if (mechanism == null || mechanism.length() == 0)
{
serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
"No Sasl mechanism was specified");
return;
}
_saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism,
(SaslSettings) serverConnection.getAmqpConnection());
if (_saslNegotiator == null)
{
serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
"No SaslServer could be created for mechanism: " + mechanism);
}
else
{
secure(serverConnection, ok.getResponse());
}
}
private String getStringClientProperty(final String name)
{
return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name));
}
protected int getHeartbeatMax()
{
int delay = _port.getHeartbeatDelay();
return delay == 0 ? 0xFFFF : delay;
}
public boolean isCompressionSupported()
{
return _compressionSupported && _broker.isMessageCompressionEnabled();
}
private void connectionAuthFailed(final ServerConnection serverConnection, Exception e)
{
if (e != null)
{
serverConnection.exception(e);
}
serverConnection.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e == null ? "Authentication failed" : e.getMessage());
disposeSaslNegotiator();
}
private void disposeSaslNegotiator()
{
_saslNegotiator.dispose();
_saslNegotiator = null;
}
}