blob: 1ffd46dd2cdc6d06b4485920231780a761d8e3a5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0;
import static com.google.common.util.concurrent.Futures.allAsList;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
implements DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
SASLEndpoint,
AMQPConnection_1_0<AMQPConnection_1_0Impl>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.protocol.frame");
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
private static final byte[] SASL_HEADER = new byte[]
{
(byte) 'A',
(byte) 'M',
(byte) 'Q',
(byte) 'P',
(byte) 3,
(byte) 1,
(byte) 0,
(byte) 0
};
private static final byte[] AMQP_HEADER = new byte[]
{
(byte) 'A',
(byte) 'M',
(byte) 'Q',
(byte) 'P',
(byte) 0,
(byte) 1,
(byte) 0,
(byte) 0
};
private final FrameWriter _frameWriter;
private ProtocolHandler _frameHandler;
private volatile boolean _transportBlockedForWriting;
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
private boolean _blocking;
private final Object _blockingLock = new Object();
private List<Symbol> _offeredCapabilities;
private SoleConnectionEnforcementPolicy _soleConnectionEnforcementPolicy;
private static final int CONNECTION_CONTROL_CHANNEL = 0;
private final SubjectCreator _subjectCreator;
private int _channelMax = 0;
private int _maxFrameSize = 4096;
private String _remoteContainerId;
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
private Session_1_0[] _sendingSessions;
// positioned by the *incoming* channel
private Session_1_0[] _receivingSessions;
private volatile boolean _closedForOutput;
private final long _incomingIdleTimeout;
private volatile long _outgoingIdleTimeout;
private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
.registerMessagingLayer()
.registerTransactionLayer()
.registerSecurityLayer()
.registerExtensionSoleconnLayer();
private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
private volatile boolean _saslComplete;
private volatile SaslNegotiator _saslNegotiator;
private String _localHostname;
private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
private Set<Symbol> _remoteDesiredCapabilities;
private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
private final Collection<Session_1_0>
_sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
private final Set<AMQPSession<?,?>> _sessionsWithWork =
Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
// Multi session transactions
private final Map<Integer, ServerTransaction> _openTransactions = new ConcurrentSkipListMap<>();
private volatile boolean _sendSaslFinalChallengeAsChallenge;
private volatile String _closeCause;
AMQPConnection_1_0Impl(final Broker<?> broker,
final ServerNetworkConnection network,
AmqpPort<?> port,
Transport transport,
long id,
final AggregateTicker aggregateTicker)
{
super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
_subjectCreator = port.getSubjectCreator(transport.isSecure(), network.getSelectedHost());
List<Symbol> offeredCapabilities = new ArrayList<>();
offeredCapabilities.add(ANONYMOUS_RELAY);
offeredCapabilities.add(SHARED_SUBSCRIPTIONS);
offeredCapabilities.add(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER);
setOfferedCapabilities(offeredCapabilities);
setRemoteAddress(network.getRemoteAddress());
_incomingIdleTimeout = 1000L * port.getHeartbeatDelay();
_frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
}
@Override
protected void onOpen()
{
super.onOpen();
_sendSaslFinalChallengeAsChallenge = getContextValue(Boolean.class, AMQPConnection_1_0.SEND_SASL_FINAL_CHALLENGE_AS_CHALLENGE);
}
@Override
public void receiveSaslInit(final SaslInit saslInit)
{
assertState(ConnectionState.AWAIT_SASL_INIT);
if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim()))
{
_localHostname = saslInit.getHostname();
}
else if(getNetwork().getSelectedHost() != null)
{
_localHostname = getNetwork().getSelectedHost();
}
String mechanism = saslInit.getMechanism().toString();
final Binary initialResponse = saslInit.getInitialResponse();
byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
List<String> availableMechanisms =
_subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
if (!availableMechanisms.contains(mechanism))
{
handleSaslError();
}
else
{
_saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
processSaslResponse(response);
}
}
@Override
public void receiveSaslResponse(final SaslResponse saslResponse)
{
assertState(ConnectionState.AWAIT_SASL_RESPONSE);
final Binary responseBinary = saslResponse.getResponse();
byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
processSaslResponse(response);
}
@Override
public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
{
LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
closeSaslWithFailure();
}
@Override
public void receiveSaslChallenge(final SaslChallenge saslChallenge)
{
LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
closeSaslWithFailure();
}
@Override
public void receiveSaslOutcome(final SaslOutcome saslOutcome)
{
LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
closeSaslWithFailure();
}
private void processSaslResponse(final byte[] response)
{
byte[] challenge = null;
SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
if (authenticationResult == null)
{
authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
challenge = authenticationResult.getChallenge();
}
if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
{
final boolean finalChallenge = challenge != null && challenge.length != 0;
_successfulAuthenticationResult = authenticationResult;
if (_sendSaslFinalChallengeAsChallenge && finalChallenge)
{
continueSaslNegotiation(challenge);
}
else
{
setSubject(_successfulAuthenticationResult.getSubject());
SaslOutcome outcome = new SaslOutcome();
outcome.setCode(SaslCode.OK);
if (finalChallenge)
{
outcome.setAdditionalData(new Binary(challenge));
}
send(new SASLFrame(outcome));
_saslComplete = true;
_connectionState = ConnectionState.AWAIT_AMQP_HEADER;
disposeSaslNegotiator();
}
}
else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
{
continueSaslNegotiation(challenge);
}
else
{
handleSaslError();
}
}
private void continueSaslNegotiation(final byte[] challenge)
{
SaslChallenge challengeBody = new SaslChallenge();
challengeBody.setChallenge(new Binary(challenge));
send(new SASLFrame(challengeBody));
_connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
}
private void handleSaslError()
{
SaslOutcome outcome = new SaslOutcome();
outcome.setCode(SaslCode.AUTH);
send(new SASLFrame(outcome));
_saslComplete = true;
closeSaslWithFailure();
}
private void closeSaslWithFailure()
{
_saslComplete = true;
disposeSaslNegotiator();
_connectionState = ConnectionState.CLOSED;
addCloseTicker();
}
private void disposeSaslNegotiator()
{
if (_saslNegotiator != null)
{
_saslNegotiator.dispose();
}
_saslNegotiator = null;
}
private void setUserPrincipal(final Principal user)
{
setSubject(_subjectCreator.createSubjectWithGroups(user));
}
@Override
public long getIncomingIdleTimeout()
{
return _incomingIdleTimeout;
}
@Override
public long getOutgoingIdleTimeout()
{
return _outgoingIdleTimeout;
}
@Override
public void receiveAttach(final int channel, final Attach attach)
{
assertState(ConnectionState.OPENED);
final Session_1_0 session = getSession(channel);
if (session != null)
{
session.receiveAttach(attach);
}
else
{
closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
}
}
@Override
public void receive(final List<ChannelFrameBody> channelFrameBodies)
{
if (!channelFrameBodies.isEmpty())
{
PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator());
boolean cleanExit = false;
try
{
while (itr.hasNext())
{
final ChannelFrameBody channelFrameBody = itr.next();
final int frameChannel = channelFrameBody.getChannel();
Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length
? null
: _receivingSessions[frameChannel];
if (session != null)
{
final AccessControlContext context = session.getAccessControllerContext();
AccessController.doPrivileged((PrivilegedAction<Void>) () ->
{
ChannelFrameBody channelFrame = channelFrameBody;
boolean nextIsSameChannel;
do
{
received(frameChannel, channelFrame.getFrameBody());
nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel();
if (nextIsSameChannel)
{
channelFrame = itr.next();
}
}
while (nextIsSameChannel);
return null;
}, context);
}
else
{
received(frameChannel, channelFrameBody.getFrameBody());
}
}
cleanExit = true;
}
finally
{
if (!cleanExit)
{
while (itr.hasNext())
{
final Object frameBody = itr.next().getFrameBody();
if (frameBody instanceof Transfer)
{
((Transfer) frameBody).dispose();
}
}
}
}
}
}
private void received(int channel, Object val)
{
if (channel > getChannelMax())
{
Error error = new Error(ConnectionError.FRAMING_ERROR,
String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
handleError(error);
return;
}
FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, val);
if (val instanceof FrameBody)
{
((FrameBody) val).invoke(channel, this);
}
else if (val instanceof SaslFrameBody)
{
((SaslFrameBody) val).invoke(channel, this);
}
}
@Override
public void receiveClose(final int channel, final Close close)
{
switch (_connectionState)
{
case AWAIT_AMQP_OR_SASL_HEADER:
case AWAIT_SASL_INIT:
case AWAIT_SASL_RESPONSE:
case AWAIT_AMQP_HEADER:
throw new ConnectionScopedRuntimeException("Received unexpected close when AMQP connection has not been established.");
case AWAIT_OPEN:
closeReceived();
closeConnection(ConnectionError.CONNECTION_FORCED,
"Connection close sent before connection was opened");
break;
case OPENED:
_connectionState = ConnectionState.CLOSE_RECEIVED;
closeReceived();
if(close.getError() != null)
{
final Error error = close.getError();
ErrorCondition condition = error.getCondition();
Symbol errorCondition = condition == null ? null : condition.getValue();
LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
errorCondition, close.getError().getDescription());
}
sendClose(new Close());
_connectionState = ConnectionState.CLOSED;
_orderlyClose.set(true);
addCloseTicker();
break;
case CLOSE_SENT:
closeReceived();
_connectionState = ConnectionState.CLOSED;
_orderlyClose.set(true);
break;
case CLOSE_RECEIVED:
case CLOSED:
break;
default:
throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
}
}
private void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
for (final Session_1_0 session : sessions)
{
AccessController.doPrivileged((PrivilegedAction<Object>) () ->
{
session.remoteEnd(new End());
return null;
}, session.getAccessControllerContext());
}
}
@Override
public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
{
return _describedTypeRegistry;
}
@Override
public SectionDecoderRegistry getSectionDecoderRegistry()
{
return _describedTypeRegistry.getSectionDecoderRegistry();
}
@Override
public boolean isClosed()
{
return _connectionState == ConnectionState.CLOSED
|| _connectionState == ConnectionState.CLOSE_RECEIVED;
}
@Override
public boolean isClosing()
{
return _connectionState == ConnectionState.CLOSED
|| _connectionState == ConnectionState.CLOSE_RECEIVED
|| _connectionState == ConnectionState.CLOSE_SENT;
}
@Override
public boolean closedForInput()
{
return _connectionState == ConnectionState.CLOSE_RECEIVED || _connectionState == ConnectionState.CLOSED ;
}
@Override
public void sessionEnded(final Session_1_0 session)
{
_sessions.remove(session);
}
private void inputClosed()
{
if (!closedForInput())
{
FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
_connectionState = ConnectionState.CLOSED;
closeSender();
closeReceived();
}
}
private void closeSender()
{
setClosedForOutput(true);
}
@Override
public String getRemoteContainerId()
{
return _remoteContainerId;
}
public boolean isOpen()
{
return _connectionState == ConnectionState.OPENED;
}
@Override
public void sendEnd(final int channel, final End end, final boolean remove)
{
sendFrame(channel, end);
if (remove)
{
_sendingSessions[channel] = null;
}
}
@Override
public void receiveEnd(final int channel, final End end)
{
assertState(ConnectionState.OPENED);
final Session_1_0 session = getSession(channel);
if (session != null)
{
_receivingSessions[channel] = null;
session.receiveEnd(end);
}
else
{
closeConnectionWithInvalidChannel(channel, end);
}
}
private void closeConnectionWithInvalidChannel(final int channel, final FrameBody frame)
{
closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frame.getClass().getSimpleName().toLowerCase(), channel));
}
@Override
public void receiveDisposition(final int channel,
final Disposition disposition)
{
assertState(ConnectionState.OPENED);
final Session_1_0 session = getSession(channel);
if (session != null)
{
session.receiveDisposition(disposition);
}
else
{
closeConnectionWithInvalidChannel(channel, disposition);
}
}
@Override
public void receiveBegin(final int receivingChannelId, final Begin begin)
{
assertState(ConnectionState.OPENED);
if (begin.getRemoteChannel() != null)
{
closeConnection(ConnectionError.FRAMING_ERROR,
"BEGIN received on channel "
+ receivingChannelId
+ " with given remote-channel "
+ begin.getRemoteChannel()
+ ". Since the broker does not spontaneously start channels, this must be an error.");
}
else // Peer requesting session creation
{
if (_receivingSessions[receivingChannelId] == null)
{
int sendingChannelId = getFirstFreeChannel();
if (sendingChannelId == -1)
{
closeConnection(ConnectionError.FRAMING_ERROR,
"BEGIN received on channel "
+ receivingChannelId
+ ". There are no free channels for the broker to respond on.");
}
else
{
Session_1_0 session = new Session_1_0(this,
begin,
sendingChannelId,
receivingChannelId,
getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
session.create();
_receivingSessions[receivingChannelId] = session;
_sendingSessions[sendingChannelId] = session;
Begin beginToSend = new Begin();
beginToSend.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
beginToSend.setNextOutgoingId(session.getNextOutgoingId());
beginToSend.setOutgoingWindow(session.getOutgoingWindow());
beginToSend.setIncomingWindow(session.getIncomingWindow());
sendFrame(sendingChannelId, beginToSend);
synchronized (_blockingLock)
{
_sessions.add(session);
if (_blocking)
{
session.block();
}
}
}
}
else
{
closeConnection(ConnectionError.FRAMING_ERROR,
"BEGIN received on channel " + receivingChannelId + " which is already in use.");
}
}
}
private int getFirstFreeChannel()
{
for (int i = 0; i <= _channelMax; i++)
{
if (_sendingSessions[i] == null)
{
return i;
}
}
return -1;
}
@Override
public void handleError(final Error error)
{
if (!_closedForOutput)
{
closeConnection(error);
}
}
@Override
public void receiveTransfer(final int channel, final Transfer transfer)
{
assertState(ConnectionState.OPENED);
final Session_1_0 session = getSession(channel);
if (session != null)
{
session.receiveTransfer(transfer);
}
else
{
closeConnectionWithInvalidChannel(channel, transfer);
}
}
@Override
public void receiveFlow(final int channel, final Flow flow)
{
assertState(ConnectionState.OPENED);
final Session_1_0 session = getSession(channel);
if (session != null)
{
session.receiveFlow(flow);
}
else
{
closeConnectionWithInvalidChannel(channel, flow);
}
}
@Override
public void receiveOpen(final int channel, final Open open)
{
assertState(ConnectionState.AWAIT_OPEN);
int channelMax = getPort().getSessionCountLimit() - 1;
_channelMax = open.getChannelMax() == null ? channelMax
: open.getChannelMax().intValue() < channelMax
? open.getChannelMax().intValue()
: channelMax;
if (_receivingSessions == null)
{
_receivingSessions = new Session_1_0[_channelMax + 1];
_sendingSessions = new Session_1_0[_channelMax + 1];
}
_maxFrameSize = open.getMaxFrameSize() == null
|| open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
? getBroker().getNetworkBufferSize()
: open.getMaxFrameSize().intValue();
_remoteContainerId = open.getContainerId();
if(open.getHostname() != null && !"".equals(open.getHostname().trim()))
{
_localHostname = open.getHostname();
}
if(_localHostname == null || "".equals(_localHostname.trim()) && getNetwork().getSelectedHost() != null)
{
_localHostname = getNetwork().getSelectedHost();
}
if (open.getIdleTimeOut() != null)
{
_outgoingIdleTimeout = open.getIdleTimeOut().longValue();
}
final Map<Symbol, Object> remoteProperties = open.getProperties() == null
? Collections.emptyMap()
: Collections.unmodifiableMap(new LinkedHashMap<>(open.getProperties()));
_remoteDesiredCapabilities = open.getDesiredCapabilities() == null ? Collections.emptySet() : Sets.newHashSet(open.getDesiredCapabilities());
if (remoteProperties.containsKey(Symbol.valueOf("product")))
{
setClientProduct(remoteProperties.get(Symbol.valueOf("product")).toString());
}
if (remoteProperties.containsKey(Symbol.valueOf("version")))
{
setClientVersion(remoteProperties.get(Symbol.valueOf("version")).toString());
}
setClientId(_remoteContainerId);
if (_remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
{
if (remoteProperties != null && remoteProperties.containsKey(SOLE_CONNECTION_ENFORCEMENT_POLICY))
{
try
{
_soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.valueOf(remoteProperties.get(
SOLE_CONNECTION_ENFORCEMENT_POLICY));
}
catch (IllegalArgumentException e)
{
closeConnection(AmqpError.INVALID_FIELD, e.getMessage());
return;
}
}
else
{
_soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.REFUSE_CONNECTION;
}
}
if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
closeConnection(ConnectionError.CONNECTION_FORCED,
"Requested idle timeout of "
+ _outgoingIdleTimeout
+ " is too low. The minimum supported timeout is"
+ MINIMUM_SUPPORTED_IDLE_TIMEOUT);
}
else
{
initialiseHeartbeating(_outgoingIdleTimeout / 2L, _incomingIdleTimeout);
final NamedAddressSpace addressSpace = getPort().getAddressSpace(_localHostname);
if (addressSpace == null)
{
closeConnection(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
}
else
{
receiveOpenInternal(addressSpace);
}
}
}
private void receiveOpenInternal(final NamedAddressSpace addressSpace)
{
if (!addressSpace.isActive())
{
final Error err = new Error();
populateConnectionRedirect(addressSpace, err);
closeConnection(err);
}
else
{
if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
{
closeConnection(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
}
else
{
try
{
boolean registerSucceeded = addressSpace.registerConnection(this, (existingConnections, newConnection) ->
{
boolean proceedWithRegistration = true;
if (newConnection instanceof AMQPConnection_1_0Impl && !newConnection.isClosing())
{
List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>();
for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false)
.filter(con -> con instanceof AMQPConnection_1_0)
.filter(con -> !con.isClosing())
.filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
.collect(Collectors.toList()))
{
SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null;
if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy
!= null)
{
soleConnectionEnforcementPolicy =
((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy;
}
else if (((AMQPConnection_1_0Impl) newConnection)._soleConnectionEnforcementPolicy != null)
{
soleConnectionEnforcementPolicy =
((AMQPConnection_1_0Impl) newConnection)._soleConnectionEnforcementPolicy;
}
if (SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy))
{
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
Error error = new Error(AmqpError.INVALID_FIELD,
String.format(
"Connection closed due to sole-connection-enforcement-policy '%s'",
soleConnectionEnforcementPolicy.toString()));
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
proceedWithRegistration = false;
break;
}
else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
{
final Error error = new Error(AmqpError.RESOURCE_LOCKED,
String.format(
"Connection closed due to sole-connection-enforcement-policy '%s'",
soleConnectionEnforcementPolicy.toString()));
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true));
rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
() -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error)));
proceedWithRegistration = false;
}
}
if (!rescheduleFutures.isEmpty())
{
doAfter(allAsList(rescheduleFutures), () -> newConnection.doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace)));
}
}
return proceedWithRegistration;
});
if (registerSucceeded)
{
setAddressSpace(addressSpace);
if (!addressSpace.authoriseCreateConnection(this))
{
closeConnection(AmqpError.NOT_ALLOWED, "Connection refused");
}
else
{
switch (_connectionState)
{
case AWAIT_OPEN:
sendOpen(_channelMax, _maxFrameSize);
_connectionState = ConnectionState.OPENED;
break;
case CLOSE_SENT:
case CLOSED:
// already sent our close - probably due to an error
break;
default:
throw new ConnectionScopedRuntimeException(String.format(
"Unexpected state %s during connection open.", _connectionState));
}
}
}
}
catch (VirtualHostUnavailableException | AccessControlException e)
{
closeConnection(AmqpError.NOT_ALLOWED, e.getMessage());
}
}
}
}
private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err)
{
final String redirectHost = addressSpace.getRedirectHost(getPort());
if(redirectHost == null)
{
err.setCondition(ConnectionError.CONNECTION_FORCED);
err.setDescription("Virtual host '" + _localHostname + "' is not active");
}
else
{
err.setCondition(ConnectionError.REDIRECT);
String networkHost;
int port;
if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
{
// IPv6 case
networkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
if(redirectHost.contains("]:"))
{
port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
}
else
{
port = -1;
}
}
else
{
if(redirectHost.contains(":"))
{
networkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
try
{
String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
port = Integer.parseInt(portString);
}
catch (NumberFormatException e)
{
port = -1;
}
}
else
{
networkHost = redirectHost;
port = -1;
}
}
final Map<Symbol, Object> infoMap = new HashMap<>();
infoMap.put(Symbol.valueOf("network-host"), networkHost);
if(port > 0)
{
infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
}
err.setInfo(infoMap);
}
}
@Override
public void receiveDetach(final int channel, final Detach detach)
{
assertState(ConnectionState.OPENED);
final Session_1_0 session = getSession(channel);
if (session != null)
{
session.receiveDetach(detach);
}
else
{
closeConnectionWithInvalidChannel(channel, detach);
}
}
private void transportStateChanged()
{
for (Session_1_0 session : _sessions)
{
session.transportStateChanged();
}
}
@Override
public void close(final Error error)
{
closeConnection(error);
}
private void setRemoteAddress(final SocketAddress remoteAddress)
{
_remoteAddress = remoteAddress;
}
public void setOfferedCapabilities(final List<Symbol> offeredCapabilities)
{
_offeredCapabilities = offeredCapabilities;
}
private void setClosedForOutput(final boolean closed)
{
_closedForOutput = closed;
}
@Override
public String getLocalFQDN()
{
return _localHostname != null ? _localHostname : super.getLocalFQDN();
}
@Override
public int getMaxFrameSize()
{
return _maxFrameSize;
}
@Override
public int getChannelMax()
{
return _channelMax;
}
@Override
public Object getReference()
{
return _reference;
}
private void endpointClosed()
{
try
{
performDeleteTasks();
closeReceived();
}
finally
{
NamedAddressSpace virtualHost = getAddressSpace();
if (virtualHost != null)
{
virtualHost.deregisterConnection(this);
}
}
}
private void closeConnection(ErrorCondition errorCondition, String description)
{
closeConnection(new Error(errorCondition, description));
}
private void closeConnection(final Error error)
{
LOGGER.debug("Closing connection {} (state={}) due to {}", this, _connectionState, error);
_closeCause = error.getDescription();
Close close = new Close();
close.setError(error);
switch (_connectionState)
{
case AWAIT_AMQP_OR_SASL_HEADER:
case AWAIT_SASL_INIT:
case AWAIT_SASL_RESPONSE:
case AWAIT_AMQP_HEADER:
throw new ConnectionScopedRuntimeException("Connection is closed before being fully established: " + error.getDescription());
case AWAIT_OPEN:
sendOpen(0, 0);
sendClose(close);
_connectionState = ConnectionState.CLOSED;
getSender().close();
break;
case OPENED:
sendClose(close);
_connectionState = ConnectionState.CLOSE_SENT;
addCloseTicker();
break;
case CLOSE_RECEIVED:
sendClose(close);
_connectionState = ConnectionState.CLOSED;
addCloseTicker();
break;
case CLOSE_SENT:
case CLOSED:
// already sent our close - too late to do anything more
break;
default:
throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
}
}
@Override
public int sendFrame(final int channel, final FrameBody body, final QpidByteBuffer payload)
{
if (!_closedForOutput)
{
ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
if (payload == null)
{
send(new TransportFrame(channel, body));
return 0;
}
else
{
int size = writer.getEncodedSize();
int maxPayloadSize = _maxFrameSize - (size + 9);
long payloadLength = (long) payload.remaining();
if (payloadLength <= maxPayloadSize)
{
send(new TransportFrame(channel, body, payload));
return (int)payloadLength;
}
else
{
((Transfer) body).setMore(Boolean.TRUE);
writer = _describedTypeRegistry.getValueWriter(body);
size = writer.getEncodedSize();
maxPayloadSize = _maxFrameSize - (size + 9);
try (QpidByteBuffer payloadDup = payload.view(0, maxPayloadSize))
{
payload.position(payload.position() + maxPayloadSize);
send(new TransportFrame(channel, body, payloadDup));
}
return maxPayloadSize;
}
}
}
else
{
return -1;
}
}
@Override
public void sendFrame(final int channel, final FrameBody body)
{
sendFrame(channel, body, null);
}
public ByteBufferSender getSender()
{
return getNetwork().getSender();
}
@Override
public void writerIdle()
{
send(TransportFrame.HEARTBEAT);
}
@Override
public void readerIdle()
{
AccessController.doPrivileged((PrivilegedAction<Object>) () ->
{
getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
getNetwork().close();
return null;
}, getAccessControllerContext());
}
@Override
public void encryptedTransport()
{
}
public String getAddress()
{
return getNetwork().getRemoteAddress().toString();
}
@Override
protected void onReceive(final QpidByteBuffer msg)
{
try
{
int remaining;
try
{
do
{
remaining = msg.remaining();
switch (_connectionState)
{
case AWAIT_AMQP_OR_SASL_HEADER:
case AWAIT_AMQP_HEADER:
if (remaining >= 8)
{
processProtocolHeader(msg);
}
break;
case AWAIT_SASL_INIT:
case AWAIT_SASL_RESPONSE:
case AWAIT_OPEN:
case OPENED:
case CLOSE_SENT:
_frameHandler.parse(msg);
break;
case CLOSE_RECEIVED:
case CLOSED:
// ignore;
break;
}
}
while (msg.remaining() != remaining);
}
finally
{
receivedComplete();
}
}
catch (IllegalArgumentException | IllegalStateException e)
{
throw new ConnectionScopedRuntimeException(e);
}
}
@Override
public void receivedComplete()
{
if (_receivingSessions != null)
{
for (final Session_1_0 session : _receivingSessions)
{
if (session != null)
{
final AccessControlContext context = session.getAccessControllerContext();
AccessController.doPrivileged((PrivilegedAction<Void>) () ->
{
session.receivedComplete();
return null;
}, context);
}
}
}
}
private void processProtocolHeader(final QpidByteBuffer msg)
{
if(msg.remaining() >= 8)
{
byte[] header = new byte[8];
msg.get(header);
final AuthenticationProvider<?> authenticationProvider = getPort().getAuthenticationProvider();
if(Arrays.equals(header, SASL_HEADER))
{
if(_saslComplete)
{
throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
}
try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(SASL_HEADER))
{
getSender().send(protocolHeader);
}
SaslMechanisms mechanisms = new SaslMechanisms();
ArrayList<Symbol> mechanismsList = new ArrayList<>();
for (String name : authenticationProvider.getAvailableMechanisms(getTransport().isSecure()))
{
mechanismsList.add(Symbol.valueOf(name));
}
mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
send(new SASLFrame(mechanisms));
_connectionState = ConnectionState.AWAIT_SASL_INIT;
_frameHandler = getFrameHandler(true);
}
else if(Arrays.equals(header, AMQP_HEADER))
{
if(!_saslComplete)
{
final List<String> mechanisms = authenticationProvider.getAvailableMechanisms(getTransport().isSecure());
if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
{
setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
}
else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
{
setUserPrincipal(new AuthenticatedPrincipal(((AnonymousAuthenticationManager) authenticationProvider).getAnonymousPrincipal()));
}
else
{
LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", getLogSubject());
_connectionState = ConnectionState.CLOSED;
getNetwork().close();
}
}
try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(AMQP_HEADER))
{
getSender().send(protocolHeader);
}
_connectionState = ConnectionState.AWAIT_OPEN;
_frameHandler = getFrameHandler(false);
}
else
{
LOGGER.warn("{} : unknown AMQP header {}", getLogSubject(), Functions.str(header));
_connectionState = ConnectionState.CLOSED;
getNetwork().close();
}
}
}
private FrameHandler getFrameHandler(final boolean sasl)
{
return new FrameHandler(new ValueHandler(this.getDescribedTypeRegistry()), this, sasl);
}
@Override
public void closed()
{
try
{
inputClosed();
}
catch(RuntimeException e)
{
LOGGER.error("Exception while closing", e);
}
finally
{
try
{
endpointClosed();
}
finally
{
markTransportClosed();
}
}
}
private void send(final AMQFrame amqFrame)
{
updateLastWriteTime();
FRAME_LOGGER.debug("SEND[{}|{}] : {}",
getNetwork().getRemoteAddress(),
amqFrame.getChannel(),
amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());
int size = _frameWriter.send(amqFrame);
if (size > getMaxFrameSize())
{
throw new OversizeFrameException(amqFrame, size);
}
}
private void addCloseTicker()
{
long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, getNetwork()));
// trigger a wakeup to ensure the ticker will be taken into account
notifyWork();
}
@Override
public boolean isTransportBlockedForWriting()
{
return _transportBlockedForWriting;
}
@Override
public void setTransportBlockedForWriting(final boolean blocked)
{
if(_transportBlockedForWriting != blocked)
{
_transportBlockedForWriting = blocked;
transportStateChanged();
}
}
@Override
public Iterator<Runnable> processPendingIterator()
{
if (isIOThread())
{
return new ProcessPendingIterator();
}
else
{
return Collections.emptyIterator();
}
}
@Override
public boolean hasWork()
{
return _stateChanged.get();
}
@Override
public void notifyWork()
{
_stateChanged.set(true);
final Action<ProtocolEngine> listener = _workListener.get();
if(listener != null)
{
listener.performAction(this);
}
}
@Override
public void notifyWork(final AMQPSession<?,?> sessionModel)
{
_sessionsWithWork.add(sessionModel);
notifyWork();
}
@Override
public void clearWork()
{
_stateChanged.set(false);
}
@Override
public void setWorkListener(final Action<ProtocolEngine> listener)
{
_workListener.set(listener);
}
@Override
public boolean hasSessionWithName(final byte[] name)
{
return false;
}
@Override
public void sendConnectionCloseAsync(final CloseReason reason, final String description)
{
stopConnection();
final ErrorCondition cause;
switch(reason)
{
case MANAGEMENT:
cause = ConnectionError.CONNECTION_FORCED;
break;
case TRANSACTION_TIMEOUT:
cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
break;
default:
cause = AmqpError.INTERNAL_ERROR;
}
Action<ConnectionHandler> action = object -> closeConnection(cause, description);
addAsyncTask(action);
}
@Override
public void closeSessionAsync(final AMQPSession<?,?> session,
final CloseReason reason, final String message)
{
final ErrorCondition cause;
switch(reason)
{
case MANAGEMENT:
cause = ConnectionError.CONNECTION_FORCED;
break;
case TRANSACTION_TIMEOUT:
cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
break;
default:
cause = AmqpError.INTERNAL_ERROR;
}
addAsyncTask(object -> AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run()
{
((Session_1_0)session).close(cause, message);
return null;
}
}, ((Session_1_0)session).getAccessControllerContext()));
}
@Override
public void block()
{
synchronized (_blockingLock)
{
if (!_blocking)
{
_blocking = true;
doOnIOThreadAsync(this::doBlock);
}
}
}
private void doBlock()
{
for(Session_1_0 session : _sessions)
{
session.block();
}
}
@Override
public String getRemoteContainerName()
{
return _remoteContainerId;
}
@Override
public Collection<? extends Session_1_0> getSessionModels()
{
return Collections.unmodifiableCollection(_sessions);
}
@Override
public void unblock()
{
synchronized (_blockingLock)
{
if(_blocking)
{
_blocking = false;
doOnIOThreadAsync(this::doUnblock);
}
}
}
private void doUnblock()
{
for(Session_1_0 session : _sessions)
{
session.unblock();
}
}
@Override
public int getSessionCountLimit()
{
return _channelMax + 1;
}
@Override
public boolean isOrderlyClose()
{
return _orderlyClose.get();
}
@Override
protected String getCloseCause()
{
return _closeCause;
}
@Override
public boolean getSendSaslFinalChallengeAsChallenge()
{
return _sendSaslFinalChallengeAsChallenge;
}
@Override
protected void addAsyncTask(final Action<? super ConnectionHandler> action)
{
_asyncTaskList.add(action);
notifyWork();
}
private void sendOpen(final int channelMax, final int maxFrameSize)
{
Open open = new Open();
Map<String,Object> props = Collections.emptyMap();
for(ConnectionPropertyEnricher enricher : getPort().getConnectionPropertyEnrichers())
{
props = enricher.addConnectionProperties(this, props);
}
for(Map.Entry<String,Object> entry : props.entrySet())
{
_properties.put(Symbol.valueOf(entry.getKey()), entry.getValue());
}
if (_receivingSessions == null)
{
_receivingSessions = new Session_1_0[channelMax + 1];
_sendingSessions = new Session_1_0[channelMax + 1];
}
if (channelMax < _channelMax)
{
_channelMax = channelMax;
}
open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
open.setContainerId(getAddressSpace() == null ? UUID.randomUUID().toString() : getAddressSpace().getId().toString());
open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
// TODO - should we try to set the hostname based on the connection information?
// open.setHostname();
open.setIdleTimeOut(UnsignedInteger.valueOf(_incomingIdleTimeout));
// set the offered capabilities
if(_offeredCapabilities != null && !_offeredCapabilities.isEmpty())
{
open.setOfferedCapabilities(_offeredCapabilities.toArray(new Symbol[_offeredCapabilities.size()]));
}
if (_remoteDesiredCapabilities != null
&& _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
{
_properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
SoleConnectionDetectionPolicy.STRONG);
}
if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
{
_properties.put(SOLE_CONNECTION_ENFORCEMENT_POLICY, SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
}
open.setProperties(_properties);
sendFrame(CONNECTION_CONTROL_CHANNEL, open);
}
private Session_1_0 getSession(final int channel)
{
Session_1_0 session = _receivingSessions[channel];
if (session == null)
{
Error error = new Error();
error.setCondition(ConnectionError.FRAMING_ERROR);
error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
handleError(error);
}
return session;
}
private void sendClose(Close closeToSend)
{
sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
closeSender();
}
private void assertState(final ConnectionState state)
{
if (_connectionState != state)
{
throw new ConnectionScopedRuntimeException(String.format(
"Unexpected state, client has sent frame in an illegal order. Required state: %s, actual state: %s",
state,
_connectionState));
}
}
private class ProcessPendingIterator implements Iterator<Runnable>
{
private Iterator<? extends AMQPSession<?,?>> _sessionIterator;
private ProcessPendingIterator()
{
_sessionIterator = _sessionsWithWork.iterator();
}
@Override
public boolean hasNext()
{
return (!_sessionsWithWork.isEmpty() && !isClosed() && !isConnectionStopped()) || !_asyncTaskList.isEmpty();
}
@Override
public Runnable next()
{
if(!_sessionsWithWork.isEmpty())
{
if(isClosed() || isConnectionStopped())
{
final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
if(asyncAction != null)
{
return () -> asyncAction.performAction(AMQPConnection_1_0Impl.this);
}
else
{
return () -> { };
}
}
else
{
if (!_sessionIterator.hasNext())
{
_sessionIterator = _sessionsWithWork.iterator();
}
final AMQPSession<?,?> session = _sessionIterator.next();
return () ->
{
_sessionIterator.remove();
if (session.processPending())
{
_sessionsWithWork.add(session);
}
};
}
}
else if(!_asyncTaskList.isEmpty())
{
final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
return () -> asyncAction.performAction(AMQPConnection_1_0Impl.this);
}
else
{
throw new NoSuchElementException();
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
@Override
public Iterator<ServerTransaction> getOpenTransactions()
{
return _openTransactions.values().iterator();
}
@Override
public IdentifiedTransaction createIdentifiedTransaction()
{
final LocalTransaction serverTransaction = createLocalTransaction();
int id = 0;
while (id >= 0 && _openTransactions.putIfAbsent(id, serverTransaction) != null)
{
id++;
}
if (id < 0)
{
throw new IllegalStateException("Unsupported state, too many opened transactions");
}
return new IdentifiedTransaction(id, serverTransaction);
}
@Override
public ServerTransaction getTransaction(final int txnId)
{
return Optional.ofNullable(_openTransactions.get(txnId))
.orElseThrow(() -> new UnknownTransactionException(txnId));
}
@Override
public void removeTransaction(final int txnId)
{
Optional.ofNullable(_openTransactions.remove(txnId))
.orElseThrow(() -> new UnknownTransactionException(txnId));
}
@Override
protected boolean isOpeningInProgress()
{
switch (_connectionState)
{
case AWAIT_AMQP_OR_SASL_HEADER:
case AWAIT_SASL_INIT:
case AWAIT_SASL_RESPONSE:
case AWAIT_AMQP_HEADER:
case AWAIT_OPEN:
return true;
case OPENED:
case CLOSE_RECEIVED:
case CLOSE_SENT:
case CLOSED:
return false;
default:
throw new IllegalStateException("Unsupported state " + _connectionState);
}
}
}