/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.protocol.v1_0;

import static com.google.common.util.concurrent.Futures.allAsList;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;

import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy;
import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.transport.util.Functions;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;

public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
        implements DescribedTypeConstructorRegistry.Source,
        ValueWriter.Registry.Source,
        SASLEndpoint,
        AMQPConnection_1_0<AMQPConnection_1_0Impl>
{

    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("org.apache.qpid.server.protocol.frame");

    private final AtomicBoolean _stateChanged = new AtomicBoolean();
    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();


    private static final byte[] SASL_HEADER = new byte[]
            {
                    (byte) 'A',
                    (byte) 'M',
                    (byte) 'Q',
                    (byte) 'P',
                    (byte) 3,
                    (byte) 1,
                    (byte) 0,
                    (byte) 0
            };

    private static final byte[] AMQP_HEADER = new byte[]
            {
                    (byte) 'A',
                    (byte) 'M',
                    (byte) 'Q',
                    (byte) 'P',
                    (byte) 0,
                    (byte) 1,
                    (byte) 0,
                    (byte) 0
            };

    private final FrameWriter _frameWriter;
    private ProtocolHandler _frameHandler;
    private volatile boolean _transportBlockedForWriting;
    private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
    private boolean _blocking;
    private final Object _blockingLock = new Object();
    private List<Symbol> _offeredCapabilities;
    private SoleConnectionEnforcementPolicy _soleConnectionEnforcementPolicy;

    private static final int CONNECTION_CONTROL_CHANNEL = 0;
    private final SubjectCreator _subjectCreator;

    private int _channelMax = 0;
    private int _maxFrameSize = 4096;
    private String _remoteContainerId;

    private SocketAddress _remoteAddress;

    // positioned by the *outgoing* channel
    private Session_1_0[] _sendingSessions;

    // positioned by the *incoming* channel
    private Session_1_0[] _receivingSessions;
    private volatile boolean _closedForOutput;

    private final long _incomingIdleTimeout;

    private volatile long _outgoingIdleTimeout;

    private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;

    private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
            .registerTransportLayer()
            .registerMessagingLayer()
            .registerTransactionLayer()
            .registerSecurityLayer()
            .registerExtensionSoleconnLayer();

    private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
    private volatile boolean _saslComplete;

    private volatile SaslNegotiator _saslNegotiator;
    private String _localHostname;

    private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;

    private Set<Symbol> _remoteDesiredCapabilities;

    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);

    private final Collection<Session_1_0>
            _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());

    private final Object _reference = new Object();

    private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
            new ConcurrentLinkedQueue<>();

    private final Set<AMQPSession<?,?>> _sessionsWithWork =
            Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());


    // Multi session transactions
    private final Map<Integer, ServerTransaction> _openTransactions = new ConcurrentSkipListMap<>();
    private volatile boolean _sendSaslFinalChallengeAsChallenge;
    private volatile String _closeCause;

    AMQPConnection_1_0Impl(final Broker<?> broker,
                           final ServerNetworkConnection network,
                           AmqpPort<?> port,
                           Transport transport,
                           long id,
                           final AggregateTicker aggregateTicker)
    {
        super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);

        _subjectCreator = port.getSubjectCreator(transport.isSecure(), network.getSelectedHost());

        List<Symbol> offeredCapabilities = new ArrayList<>();
        offeredCapabilities.add(ANONYMOUS_RELAY);
        offeredCapabilities.add(SHARED_SUBSCRIPTIONS);
        offeredCapabilities.add(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER);

        setOfferedCapabilities(offeredCapabilities);

        setRemoteAddress(network.getRemoteAddress());

        _incomingIdleTimeout = 1000L * port.getHeartbeatDelay();

        _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
    }

    @Override
    protected void onOpen()
    {
        super.onOpen();
        _sendSaslFinalChallengeAsChallenge = getContextValue(Boolean.class, AMQPConnection_1_0.SEND_SASL_FINAL_CHALLENGE_AS_CHALLENGE);
    }

    @Override
    public void receiveSaslInit(final SaslInit saslInit)
    {
        assertState(ConnectionState.AWAIT_SASL_INIT);
        if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim()))
        {
            _localHostname = saslInit.getHostname();
        }
        else if(getNetwork().getSelectedHost() != null)
        {
            _localHostname = getNetwork().getSelectedHost();
        }
        String mechanism = saslInit.getMechanism().toString();
        final Binary initialResponse = saslInit.getInitialResponse();
        byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();

        List<String> availableMechanisms =
                _subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
        if (!availableMechanisms.contains(mechanism))
        {
            handleSaslError();
        }
        else
        {
            _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
            processSaslResponse(response);
        }
    }

    @Override
    public void receiveSaslResponse(final SaslResponse saslResponse)
    {
        assertState(ConnectionState.AWAIT_SASL_RESPONSE);
        final Binary responseBinary = saslResponse.getResponse();
        byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();

        processSaslResponse(response);
    }

    @Override
    public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
    {
        LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
        closeSaslWithFailure();
    }

    @Override
    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
    {
        LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
        closeSaslWithFailure();
    }

    @Override
    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
    {
        LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
        closeSaslWithFailure();
    }

    private void processSaslResponse(final byte[] response)
    {
        byte[] challenge = null;
        SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
        if (authenticationResult == null)
        {
            authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
            challenge = authenticationResult.getChallenge();
        }

        if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
        {
            final boolean finalChallenge = challenge != null && challenge.length != 0;
            _successfulAuthenticationResult = authenticationResult;
            if (_sendSaslFinalChallengeAsChallenge && finalChallenge)
            {
                continueSaslNegotiation(challenge);
            }
            else
            {
                setSubject(_successfulAuthenticationResult.getSubject());
                SaslOutcome outcome = new SaslOutcome();
                outcome.setCode(SaslCode.OK);
                if (finalChallenge)
                {
                    outcome.setAdditionalData(new Binary(challenge));
                }
                send(new SASLFrame(outcome));
                _saslComplete = true;
                _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
                disposeSaslNegotiator();
            }
        }
        else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
        {
            continueSaslNegotiation(challenge);
        }
        else
        {
            handleSaslError();
        }
    }

    private void continueSaslNegotiation(final byte[] challenge)
    {
        SaslChallenge challengeBody = new SaslChallenge();
        challengeBody.setChallenge(new Binary(challenge));
        send(new SASLFrame(challengeBody));

        _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
    }

    private void handleSaslError()
    {
        SaslOutcome outcome = new SaslOutcome();
        outcome.setCode(SaslCode.AUTH);
        send(new SASLFrame(outcome));
        _saslComplete = true;
        closeSaslWithFailure();
    }

    private void closeSaslWithFailure()
    {
        _saslComplete = true;
        disposeSaslNegotiator();
        _connectionState = ConnectionState.CLOSED;
        addCloseTicker();
    }

    private void disposeSaslNegotiator()
    {
        if (_saslNegotiator != null)
        {
            _saslNegotiator.dispose();
        }
        _saslNegotiator = null;
    }

    private void setUserPrincipal(final Principal user)
    {
        setSubject(_subjectCreator.createSubjectWithGroups(user));
    }

    @Override
    public long getIncomingIdleTimeout()
    {
        return _incomingIdleTimeout;
    }

    @Override
    public long getOutgoingIdleTimeout()
    {
        return _outgoingIdleTimeout;
    }


    @Override
    public void receiveAttach(final int channel, final Attach attach)
    {
        assertState(ConnectionState.OPENED);
        final Session_1_0 session = getSession(channel);
        if (session != null)
        {
            session.receiveAttach(attach);
        }
        else
        {
            closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
        }
    }

    @Override
    public void receive(final List<ChannelFrameBody> channelFrameBodies)
    {
        if (!channelFrameBodies.isEmpty())
        {
            PeekingIterator<ChannelFrameBody> itr = Iterators.peekingIterator(channelFrameBodies.iterator());
            boolean cleanExit = false;
            try
            {
                while (itr.hasNext())
                {
                    final ChannelFrameBody channelFrameBody = itr.next();
                    final int frameChannel = channelFrameBody.getChannel();

                    Session_1_0 session = _receivingSessions == null || frameChannel >= _receivingSessions.length
                            ? null
                            : _receivingSessions[frameChannel];
                    if (session != null)
                    {
                        final AccessControlContext context = session.getAccessControllerContext();
                        AccessController.doPrivileged((PrivilegedAction<Void>) () ->
                        {
                            ChannelFrameBody channelFrame = channelFrameBody;
                            boolean nextIsSameChannel;
                            do
                            {
                                received(frameChannel, channelFrame.getFrameBody());
                                nextIsSameChannel = itr.hasNext() && frameChannel == itr.peek().getChannel();
                                if (nextIsSameChannel)
                                {
                                    channelFrame = itr.next();
                                }
                            }
                            while (nextIsSameChannel);
                            return null;
                        }, context);
                    }
                    else
                    {
                        received(frameChannel, channelFrameBody.getFrameBody());
                    }
                }
                cleanExit = true;
            }
            finally
            {
                if (!cleanExit)
                {
                    while (itr.hasNext())
                    {
                        final Object frameBody = itr.next().getFrameBody();
                        if (frameBody instanceof Transfer)
                        {
                            ((Transfer) frameBody).dispose();
                        }
                    }
                }
            }
        }
    }

    private void received(int channel, Object val)
    {
        if (channel > getChannelMax())
        {
            Error error = new Error(ConnectionError.FRAMING_ERROR,
                    String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax()));
            handleError(error);
            return;
        }

        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, val);
        if (val instanceof FrameBody)
        {
            ((FrameBody) val).invoke(channel, this);
        }
        else if (val instanceof SaslFrameBody)
        {
            ((SaslFrameBody) val).invoke(channel, this);
        }
    }


    @Override
    public void receiveClose(final int channel, final Close close)
    {
        switch (_connectionState)
        {
            case AWAIT_AMQP_OR_SASL_HEADER:
            case AWAIT_SASL_INIT:
            case AWAIT_SASL_RESPONSE:
            case AWAIT_AMQP_HEADER:
                throw new ConnectionScopedRuntimeException("Received unexpected close when AMQP connection has not been established.");
            case AWAIT_OPEN:
                closeReceived();
                closeConnection(ConnectionError.CONNECTION_FORCED,
                        "Connection close sent before connection was opened");
                break;
            case OPENED:
                _connectionState = ConnectionState.CLOSE_RECEIVED;
                closeReceived();
                if(close.getError() != null)
                {
                    final Error error = close.getError();
                    ErrorCondition condition = error.getCondition();
                    Symbol errorCondition = condition == null ? null : condition.getValue();
                    LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
                            errorCondition, close.getError().getDescription());
                }
                sendClose(new Close());
                _connectionState = ConnectionState.CLOSED;
                _orderlyClose.set(true);
                addCloseTicker();
                break;
            case CLOSE_SENT:
                closeReceived();
                _connectionState = ConnectionState.CLOSED;
                _orderlyClose.set(true);
                break;
            case CLOSE_RECEIVED:
            case CLOSED:
                break;
            default:
                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
        }
    }

    private void closeReceived()
    {
        Collection<Session_1_0> sessions = new ArrayList<>(_sessions);

        for (final Session_1_0 session : sessions)
        {
            AccessController.doPrivileged((PrivilegedAction<Object>) () ->
            {
                session.remoteEnd(new End());
                return null;
            }, session.getAccessControllerContext());
        }
    }


    @Override
    public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
    {
        return _describedTypeRegistry;
    }

    @Override
    public SectionDecoderRegistry getSectionDecoderRegistry()
    {
        return _describedTypeRegistry.getSectionDecoderRegistry();
    }

    @Override
    public boolean isClosed()
    {
        return _connectionState == ConnectionState.CLOSED
                || _connectionState == ConnectionState.CLOSE_RECEIVED;
    }

    @Override
    public boolean isClosing()
    {
        return _connectionState == ConnectionState.CLOSED
                || _connectionState == ConnectionState.CLOSE_RECEIVED
                || _connectionState == ConnectionState.CLOSE_SENT;
    }

    @Override
    public boolean closedForInput()
    {
        return _connectionState == ConnectionState.CLOSE_RECEIVED || _connectionState == ConnectionState.CLOSED ;
    }

    @Override
    public void sessionEnded(final Session_1_0 session)
    {
        _sessions.remove(session);
    }

    private void inputClosed()
    {
        if (!closedForInput())
        {
            FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
            _connectionState = ConnectionState.CLOSED;
            closeSender();
            closeReceived();
        }
    }

    private void closeSender()
    {
        setClosedForOutput(true);
    }

    @Override
    public String getRemoteContainerId()
    {
        return _remoteContainerId;
    }

    public boolean isOpen()
    {
        return _connectionState == ConnectionState.OPENED;
    }

    @Override
    public void sendEnd(final int channel, final End end, final boolean remove)
    {
        sendFrame(channel, end);
        if (remove)
        {
            _sendingSessions[channel] = null;
        }
    }

    @Override
    public void receiveEnd(final int channel, final End end)
    {
        assertState(ConnectionState.OPENED);
        final Session_1_0 session = getSession(channel);
        if (session != null)
        {
            _receivingSessions[channel] = null;
            session.receiveEnd(end);
        }
        else
        {
            closeConnectionWithInvalidChannel(channel, end);
        }
    }

    private void closeConnectionWithInvalidChannel(final int channel, final FrameBody frame)
    {
        closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frame.getClass().getSimpleName().toLowerCase(), channel));
    }

    @Override
    public void receiveDisposition(final int channel,
                                   final Disposition disposition)
    {
        assertState(ConnectionState.OPENED);
        final Session_1_0 session = getSession(channel);
        if (session != null)
        {
            session.receiveDisposition(disposition);
        }
        else
        {
            closeConnectionWithInvalidChannel(channel, disposition);
        }

    }

    @Override
    public void receiveBegin(final int receivingChannelId, final Begin begin)
    {

        assertState(ConnectionState.OPENED);
        if (begin.getRemoteChannel() != null)
        {
            closeConnection(ConnectionError.FRAMING_ERROR,
                    "BEGIN received on channel "
                            + receivingChannelId
                            + " with given remote-channel "
                            + begin.getRemoteChannel()
                            + ". Since the broker does not spontaneously start channels, this must be an error.");

        }
        else // Peer requesting session creation
        {

            if (_receivingSessions[receivingChannelId] == null)
            {
                int sendingChannelId = getFirstFreeChannel();
                if (sendingChannelId == -1)
                {
                    closeConnection(ConnectionError.FRAMING_ERROR,
                            "BEGIN received on channel "
                                    + receivingChannelId
                                    + ". There are no free channels for the broker to respond on.");
                }
                else
                {
                    Session_1_0 session = new Session_1_0(this,
                            begin,
                            sendingChannelId,
                            receivingChannelId,
                            getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
                    session.create();

                    _receivingSessions[receivingChannelId] = session;
                    _sendingSessions[sendingChannelId] = session;

                    Begin beginToSend = new Begin();
                    beginToSend.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
                    beginToSend.setNextOutgoingId(session.getNextOutgoingId());
                    beginToSend.setOutgoingWindow(session.getOutgoingWindow());
                    beginToSend.setIncomingWindow(session.getIncomingWindow());
                    sendFrame(sendingChannelId, beginToSend);

                    synchronized (_blockingLock)
                    {
                        _sessions.add(session);
                        if (_blocking)
                        {
                            session.block();
                        }
                    }
                }
            }
            else
            {
                closeConnection(ConnectionError.FRAMING_ERROR,
                        "BEGIN received on channel " + receivingChannelId + " which is already in use.");
            }

        }

    }

    private int getFirstFreeChannel()
    {
        for (int i = 0; i <= _channelMax; i++)
        {
            if (_sendingSessions[i] == null)
            {
                return i;
            }
        }
        return -1;
    }

    @Override
    public void handleError(final Error error)
    {
        if (!_closedForOutput)
        {
            closeConnection(error);
        }
    }

    @Override
    public void receiveTransfer(final int channel, final Transfer transfer)
    {
        assertState(ConnectionState.OPENED);
        final Session_1_0 session = getSession(channel);
        if (session != null)
        {
            session.receiveTransfer(transfer);
        }
        else
        {
            closeConnectionWithInvalidChannel(channel, transfer);
        }
    }

    @Override
    public void receiveFlow(final int channel, final Flow flow)
    {
        assertState(ConnectionState.OPENED);
        final Session_1_0 session = getSession(channel);
        if (session != null)
        {
            session.receiveFlow(flow);
        }
        else
        {
            closeConnectionWithInvalidChannel(channel, flow);
        }

    }

    @Override
    public void receiveOpen(final int channel, final Open open)
    {
        assertState(ConnectionState.AWAIT_OPEN);

        int channelMax = getPort().getSessionCountLimit() - 1;
        _channelMax = open.getChannelMax() == null ? channelMax
                : open.getChannelMax().intValue() < channelMax
                ? open.getChannelMax().intValue()
                : channelMax;
        if (_receivingSessions == null)
        {
            _receivingSessions = new Session_1_0[_channelMax + 1];
            _sendingSessions = new Session_1_0[_channelMax + 1];
        }
        _maxFrameSize = open.getMaxFrameSize() == null
                || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize()
                ? getBroker().getNetworkBufferSize()
                : open.getMaxFrameSize().intValue();
        _remoteContainerId = open.getContainerId();

        if(open.getHostname() != null && !"".equals(open.getHostname().trim()))
        {
            _localHostname = open.getHostname();
        }

        if(_localHostname == null || "".equals(_localHostname.trim()) && getNetwork().getSelectedHost() != null)
        {
            _localHostname = getNetwork().getSelectedHost();
        }
        if (open.getIdleTimeOut() != null)
        {
            _outgoingIdleTimeout = open.getIdleTimeOut().longValue();
        }
        final Map<Symbol, Object> remoteProperties = open.getProperties() == null
                ? Collections.emptyMap()
                : Collections.unmodifiableMap(new LinkedHashMap<>(open.getProperties()));
        _remoteDesiredCapabilities = open.getDesiredCapabilities() == null ? Collections.emptySet() : Sets.newHashSet(open.getDesiredCapabilities());
        if (remoteProperties.containsKey(Symbol.valueOf("product")))
        {
            setClientProduct(remoteProperties.get(Symbol.valueOf("product")).toString());
        }
        if (remoteProperties.containsKey(Symbol.valueOf("version")))
        {
            setClientVersion(remoteProperties.get(Symbol.valueOf("version")).toString());
        }
        setClientId(_remoteContainerId);
        if (_remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
        {
            if (remoteProperties != null && remoteProperties.containsKey(SOLE_CONNECTION_ENFORCEMENT_POLICY))
            {
                try
                {
                    _soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.valueOf(remoteProperties.get(
                            SOLE_CONNECTION_ENFORCEMENT_POLICY));
                }
                catch (IllegalArgumentException e)
                {
                    closeConnection(AmqpError.INVALID_FIELD, e.getMessage());
                    return;
                }
            }
            else
            {
                _soleConnectionEnforcementPolicy = SoleConnectionEnforcementPolicy.REFUSE_CONNECTION;
            }
        }

        if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
        {
            closeConnection(ConnectionError.CONNECTION_FORCED,
                    "Requested idle timeout of "
                            + _outgoingIdleTimeout
                            + " is too low. The minimum supported timeout is"
                            + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
        }
        else
        {
            initialiseHeartbeating(_outgoingIdleTimeout / 2L, _incomingIdleTimeout);
            final NamedAddressSpace addressSpace = getPort().getAddressSpace(_localHostname);
            if (addressSpace == null)
            {
                closeConnection(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
            }
            else
            {
                receiveOpenInternal(addressSpace);
            }
        }
    }

    private void receiveOpenInternal(final NamedAddressSpace addressSpace)
    {
        if (!addressSpace.isActive())
        {
            final Error err = new Error();
            populateConnectionRedirect(addressSpace, err);
            closeConnection(err);
        }
        else
        {
            if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
            {
                closeConnection(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
            }
            else
            {
                try
                {
                    boolean registerSucceeded = addressSpace.registerConnection(this, (existingConnections, newConnection) ->
                    {
                        boolean proceedWithRegistration = true;
                        if (newConnection instanceof AMQPConnection_1_0Impl && !newConnection.isClosing())
                        {
                            List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>();
                            for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false)
                                    .filter(con -> con instanceof AMQPConnection_1_0)
                                    .filter(con -> !con.isClosing())
                                    .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
                                    .collect(Collectors.toList()))
                            {
                                SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null;
                                if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy
                                        != null)
                                {
                                    soleConnectionEnforcementPolicy =
                                            ((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy;
                                }
                                else if (((AMQPConnection_1_0Impl) newConnection)._soleConnectionEnforcementPolicy != null)
                                {
                                    soleConnectionEnforcementPolicy =
                                            ((AMQPConnection_1_0Impl) newConnection)._soleConnectionEnforcementPolicy;
                                }
                                if (SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy))
                                {
                                    _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
                                    Error error = new Error(AmqpError.INVALID_FIELD,
                                            String.format(
                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
                                                    soleConnectionEnforcementPolicy.toString()));
                                    error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id")));
                                    newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
                                    proceedWithRegistration = false;
                                    break;
                                }
                                else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
                                {
                                    final Error error = new Error(AmqpError.RESOURCE_LOCKED,
                                            String.format(
                                                    "Connection closed due to sole-connection-enforcement-policy '%s'",
                                                    soleConnectionEnforcementPolicy.toString()));
                                    error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true));
                                    rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
                                            () -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error)));
                                    proceedWithRegistration = false;
                                }
                            }
                            if (!rescheduleFutures.isEmpty())
                            {
                                doAfter(allAsList(rescheduleFutures), () -> newConnection.doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace)));
                            }
                        }
                        return proceedWithRegistration;
                    });

                    if (registerSucceeded)
                    {
                        setAddressSpace(addressSpace);

                        if (!addressSpace.authoriseCreateConnection(this))
                        {
                            closeConnection(AmqpError.NOT_ALLOWED, "Connection refused");
                        }
                        else
                        {
                            switch (_connectionState)
                            {
                                case AWAIT_OPEN:
                                    sendOpen(_channelMax, _maxFrameSize);
                                    _connectionState = ConnectionState.OPENED;
                                    break;
                                case CLOSE_SENT:
                                case CLOSED:
                                    // already sent our close - probably due to an error
                                    break;
                                default:
                                    throw new ConnectionScopedRuntimeException(String.format(
                                            "Unexpected state %s during connection open.", _connectionState));
                            }
                        }
                    }
                }
                catch (VirtualHostUnavailableException | AccessControlException e)
                {
                    closeConnection(AmqpError.NOT_ALLOWED, e.getMessage());
                }
            }
        }
    }

    private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err)
    {
        final String redirectHost = addressSpace.getRedirectHost(getPort());

        if(redirectHost == null)
        {
            err.setCondition(ConnectionError.CONNECTION_FORCED);
            err.setDescription("Virtual host '" + _localHostname + "' is not active");
        }
        else
        {
            err.setCondition(ConnectionError.REDIRECT);
            String networkHost;
            int port;
            if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
            {
                // IPv6 case
                networkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
                if(redirectHost.contains("]:"))
                {
                    port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
                }
                else
                {
                    port = -1;
                }
            }
            else
            {
                if(redirectHost.contains(":"))
                {
                    networkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
                    try
                    {
                        String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
                        port = Integer.parseInt(portString);
                    }
                    catch (NumberFormatException e)
                    {
                        port = -1;
                    }
                }
                else
                {
                    networkHost = redirectHost;
                    port = -1;
                }
            }
            final Map<Symbol, Object> infoMap = new HashMap<>();
            infoMap.put(Symbol.valueOf("network-host"), networkHost);
            if(port > 0)
            {
                infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
            }
            err.setInfo(infoMap);
        }
    }

    @Override
    public void receiveDetach(final int channel, final Detach detach)
    {
        assertState(ConnectionState.OPENED);
        final Session_1_0 session = getSession(channel);
        if (session != null)
        {
            session.receiveDetach(detach);
        }
        else
        {
            closeConnectionWithInvalidChannel(channel, detach);
        }
    }

    private void transportStateChanged()
    {
        for (Session_1_0 session : _sessions)
        {
            session.transportStateChanged();
        }
    }

    @Override
    public void close(final Error error)
    {
        closeConnection(error);
    }

    private void setRemoteAddress(final SocketAddress remoteAddress)
    {
        _remoteAddress = remoteAddress;
    }

    public void setOfferedCapabilities(final List<Symbol> offeredCapabilities)
    {
        _offeredCapabilities = offeredCapabilities;
    }


    private void setClosedForOutput(final boolean closed)
    {
        _closedForOutput = closed;
    }

    @Override
    public String getLocalFQDN()
    {
        return _localHostname != null ? _localHostname : super.getLocalFQDN();
    }

    @Override
    public int getMaxFrameSize()
    {
        return _maxFrameSize;
    }

    @Override
    public int getChannelMax()
    {
        return _channelMax;
    }

    @Override
    public Object getReference()
    {
        return _reference;
    }

    private void endpointClosed()
    {
        try
        {
            performDeleteTasks();
            closeReceived();
        }
        finally
        {
            NamedAddressSpace virtualHost = getAddressSpace();
            if (virtualHost != null)
            {
                virtualHost.deregisterConnection(this);
            }
        }
    }

    private void closeConnection(ErrorCondition errorCondition, String description)
    {
        closeConnection(new Error(errorCondition, description));
    }

    private void closeConnection(final Error error)
    {
        LOGGER.debug("Closing connection {} (state={}) due to {}", this, _connectionState, error);
        _closeCause = error.getDescription();
        Close close = new Close();
        close.setError(error);
        switch (_connectionState)
        {
            case AWAIT_AMQP_OR_SASL_HEADER:
            case AWAIT_SASL_INIT:
            case AWAIT_SASL_RESPONSE:
            case AWAIT_AMQP_HEADER:
                throw new ConnectionScopedRuntimeException("Connection is closed before being fully established: " + error.getDescription());

            case AWAIT_OPEN:
                sendOpen(0, 0);
                sendClose(close);
                _connectionState = ConnectionState.CLOSED;
                getSender().close();
                break;
            case OPENED:
                sendClose(close);
                _connectionState = ConnectionState.CLOSE_SENT;
                addCloseTicker();
                break;
            case CLOSE_RECEIVED:
                sendClose(close);
                _connectionState = ConnectionState.CLOSED;
                addCloseTicker();
                break;
            case CLOSE_SENT:
            case CLOSED:
                // already sent our close - too late to do anything more
                break;
            default:
                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
        }
    }

    @Override
    public int sendFrame(final int channel, final FrameBody body, final QpidByteBuffer payload)
    {
        if (!_closedForOutput)
        {
            ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
            if (payload == null)
            {
                send(new TransportFrame(channel, body));
                return 0;
            }
            else
            {
                int size = writer.getEncodedSize();
                int maxPayloadSize = _maxFrameSize - (size + 9);
                long payloadLength = (long) payload.remaining();
                if (payloadLength <= maxPayloadSize)
                {
                    send(new TransportFrame(channel, body, payload));
                    return (int)payloadLength;
                }
                else
                {
                    ((Transfer) body).setMore(Boolean.TRUE);

                    writer = _describedTypeRegistry.getValueWriter(body);
                    size = writer.getEncodedSize();
                    maxPayloadSize = _maxFrameSize - (size + 9);

                    try (QpidByteBuffer payloadDup = payload.view(0, maxPayloadSize))
                    {
                        payload.position(payload.position() + maxPayloadSize);
                        send(new TransportFrame(channel, body, payloadDup));
                    }

                    return maxPayloadSize;
                }
            }
        }
        else
        {
            return -1;
        }
    }

    @Override
    public void sendFrame(final int channel, final FrameBody body)
    {
        sendFrame(channel, body, null);
    }

    public ByteBufferSender getSender()
    {
        return getNetwork().getSender();
    }

    @Override
    public void writerIdle()
    {
        send(TransportFrame.HEARTBEAT);
    }

    @Override
    public void readerIdle()
    {
        AccessController.doPrivileged((PrivilegedAction<Object>) () ->
        {
            getEventLogger().message(ConnectionMessages.IDLE_CLOSE("", false));
            getNetwork().close();
            return null;
        }, getAccessControllerContext());
    }

    @Override
    public void encryptedTransport()
    {
    }

    public String getAddress()
    {
        return getNetwork().getRemoteAddress().toString();
    }

    @Override
    protected void onReceive(final QpidByteBuffer msg)
    {
        try
        {
            int remaining;

            try
            {
                do
                {
                    remaining = msg.remaining();

                    switch (_connectionState)
                    {
                        case AWAIT_AMQP_OR_SASL_HEADER:
                        case AWAIT_AMQP_HEADER:
                            if (remaining >= 8)
                            {
                                processProtocolHeader(msg);
                            }
                            break;
                        case AWAIT_SASL_INIT:
                        case AWAIT_SASL_RESPONSE:
                        case AWAIT_OPEN:
                        case OPENED:
                        case CLOSE_SENT:
                            _frameHandler.parse(msg);
                            break;
                        case CLOSE_RECEIVED:
                        case CLOSED:
                            // ignore;
                            break;
                    }


                }
                while (msg.remaining() != remaining);
            }
            finally
            {
                receivedComplete();
            }
        }
        catch (IllegalArgumentException | IllegalStateException e)
        {
            throw new ConnectionScopedRuntimeException(e);
        }
    }

    @Override
    public void receivedComplete()
    {
        if (_receivingSessions != null)
        {
            for (final Session_1_0 session : _receivingSessions)
            {
                if (session != null)
                {
                    final AccessControlContext context = session.getAccessControllerContext();
                    AccessController.doPrivileged((PrivilegedAction<Void>) () ->
                    {
                        session.receivedComplete();
                        return null;
                    }, context);
                }
            }
        }
    }

    private void processProtocolHeader(final QpidByteBuffer msg)
    {
        if(msg.remaining() >= 8)
        {
            byte[] header = new byte[8];
            msg.get(header);

            final AuthenticationProvider<?> authenticationProvider = getPort().getAuthenticationProvider();

            if(Arrays.equals(header, SASL_HEADER))
            {
                if(_saslComplete)
                {
                    throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
                }

                try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(SASL_HEADER))
                {
                    getSender().send(protocolHeader);
                }
                SaslMechanisms mechanisms = new SaslMechanisms();
                ArrayList<Symbol> mechanismsList = new ArrayList<>();
                for (String name :  authenticationProvider.getAvailableMechanisms(getTransport().isSecure()))
                {
                    mechanismsList.add(Symbol.valueOf(name));
                }
                mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
                send(new SASLFrame(mechanisms));

                _connectionState = ConnectionState.AWAIT_SASL_INIT;
                _frameHandler = getFrameHandler(true);
            }
            else if(Arrays.equals(header, AMQP_HEADER))
            {
                if(!_saslComplete)
                {
                    final List<String> mechanisms = authenticationProvider.getAvailableMechanisms(getTransport().isSecure());

                    if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
                    {
                        setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
                    }
                    else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
                    {
                        setUserPrincipal(new AuthenticatedPrincipal(((AnonymousAuthenticationManager) authenticationProvider).getAnonymousPrincipal()));
                    }
                    else
                    {
                        LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", getLogSubject());
                        _connectionState = ConnectionState.CLOSED;
                        getNetwork().close();
                    }

                }
                try (QpidByteBuffer protocolHeader = QpidByteBuffer.wrap(AMQP_HEADER))
                {
                    getSender().send(protocolHeader);
                }
                _connectionState = ConnectionState.AWAIT_OPEN;
                _frameHandler = getFrameHandler(false);

            }
            else
            {
                LOGGER.warn("{} : unknown AMQP header {}", getLogSubject(), Functions.str(header));
                _connectionState = ConnectionState.CLOSED;
                getNetwork().close();
            }

        }

    }

    private FrameHandler getFrameHandler(final boolean sasl)
    {
        return new FrameHandler(new ValueHandler(this.getDescribedTypeRegistry()), this, sasl);
    }


    @Override
    public void closed()
    {
        try
        {
            inputClosed();
        }
        catch(RuntimeException e)
        {
            LOGGER.error("Exception while closing", e);
        }
        finally
        {
            try
            {
                endpointClosed();
            }
            finally
            {
                markTransportClosed();
            }
        }
    }

    private void send(final AMQFrame amqFrame)
    {
        updateLastWriteTime();
        FRAME_LOGGER.debug("SEND[{}|{}] : {}",
                getNetwork().getRemoteAddress(),
                amqFrame.getChannel(),
                amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody());

        int size = _frameWriter.send(amqFrame);
        if (size > getMaxFrameSize())
        {
            throw new OversizeFrameException(amqFrame, size);
        }
    }

    private void addCloseTicker()
    {
        long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);

        getAggregateTicker().addTicker(new ConnectionClosingTicker(timeoutTime, getNetwork()));

        // trigger a wakeup to ensure the ticker will be taken into account
        notifyWork();
    }

    @Override
    public boolean isTransportBlockedForWriting()
    {
        return _transportBlockedForWriting;
    }

    @Override
    public void setTransportBlockedForWriting(final boolean blocked)
    {
        if(_transportBlockedForWriting != blocked)
        {
            _transportBlockedForWriting = blocked;
            transportStateChanged();
        }

    }

    @Override
    public Iterator<Runnable> processPendingIterator()
    {
        if (isIOThread())
        {
            return new ProcessPendingIterator();
        }
        else
        {
            return Collections.emptyIterator();
        }
    }

    @Override
    public boolean hasWork()
    {
        return _stateChanged.get();
    }

    @Override
    public void notifyWork()
    {
        _stateChanged.set(true);

        final Action<ProtocolEngine> listener = _workListener.get();
        if(listener != null)
        {
            listener.performAction(this);
        }
    }

    @Override
    public void notifyWork(final AMQPSession<?,?> sessionModel)
    {
        _sessionsWithWork.add(sessionModel);
        notifyWork();
    }

    @Override
    public void clearWork()
    {
        _stateChanged.set(false);
    }

    @Override
    public void setWorkListener(final Action<ProtocolEngine> listener)
    {
        _workListener.set(listener);
    }

    @Override
    public boolean hasSessionWithName(final byte[] name)
    {
        return false;
    }

    @Override
    public void sendConnectionCloseAsync(final CloseReason reason, final String description)
    {

        stopConnection();
        final ErrorCondition cause;
        switch(reason)
        {
            case MANAGEMENT:
                cause = ConnectionError.CONNECTION_FORCED;
                break;
            case TRANSACTION_TIMEOUT:
                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
                break;
            default:
                cause = AmqpError.INTERNAL_ERROR;
        }
        Action<ConnectionHandler> action = object -> closeConnection(cause, description);
        addAsyncTask(action);
    }

    @Override
    public void closeSessionAsync(final AMQPSession<?,?> session,
                                  final CloseReason reason, final String message)
    {
        final ErrorCondition cause;
        switch(reason)
        {
            case MANAGEMENT:
                cause = ConnectionError.CONNECTION_FORCED;
                break;
            case TRANSACTION_TIMEOUT:
                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
                break;
            default:
                cause = AmqpError.INTERNAL_ERROR;
        }
        addAsyncTask(object -> AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run()
            {
                ((Session_1_0)session).close(cause, message);
                return null;
            }
        }, ((Session_1_0)session).getAccessControllerContext()));

    }

    @Override
    public void block()
    {
        synchronized (_blockingLock)
        {
            if (!_blocking)
            {
                _blocking = true;
                doOnIOThreadAsync(this::doBlock);
            }
        }
    }

    private void doBlock()
    {
        for(Session_1_0 session : _sessions)
        {
            session.block();
        }
    }

    @Override
    public String getRemoteContainerName()
    {
        return _remoteContainerId;
    }

    @Override
    public Collection<? extends Session_1_0> getSessionModels()
    {
        return Collections.unmodifiableCollection(_sessions);
    }

    @Override
    public void unblock()
    {
        synchronized (_blockingLock)
        {
            if(_blocking)
            {
                _blocking = false;
                doOnIOThreadAsync(this::doUnblock);
            }
        }
    }

    private void doUnblock()
    {
        for(Session_1_0 session : _sessions)
        {
            session.unblock();
        }
    }

    @Override
    public int getSessionCountLimit()
    {
        return _channelMax + 1;
    }

    @Override
    public boolean isOrderlyClose()
    {
        return _orderlyClose.get();
    }

    @Override
    protected String getCloseCause()
    {
        return _closeCause;
    }

    @Override
    public boolean getSendSaslFinalChallengeAsChallenge()
    {
        return _sendSaslFinalChallengeAsChallenge;
    }

    @Override
    protected void addAsyncTask(final Action<? super ConnectionHandler> action)
    {
        _asyncTaskList.add(action);
        notifyWork();
    }


    private void sendOpen(final int channelMax, final int maxFrameSize)
    {
        Open open = new Open();

        Map<String,Object> props = Collections.emptyMap();
        for(ConnectionPropertyEnricher enricher : getPort().getConnectionPropertyEnrichers())
        {
            props = enricher.addConnectionProperties(this, props);
        }
        for(Map.Entry<String,Object> entry : props.entrySet())
        {
            _properties.put(Symbol.valueOf(entry.getKey()), entry.getValue());
        }

        if (_receivingSessions == null)
        {
            _receivingSessions = new Session_1_0[channelMax + 1];
            _sendingSessions = new Session_1_0[channelMax + 1];
        }
        if (channelMax < _channelMax)
        {
            _channelMax = channelMax;
        }
        open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
        open.setContainerId(getAddressSpace() == null ? UUID.randomUUID().toString() : getAddressSpace().getId().toString());
        open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
        // TODO - should we try to set the hostname based on the connection information?
        // open.setHostname();
        open.setIdleTimeOut(UnsignedInteger.valueOf(_incomingIdleTimeout));

        // set the offered capabilities
        if(_offeredCapabilities != null && !_offeredCapabilities.isEmpty())
        {
            open.setOfferedCapabilities(_offeredCapabilities.toArray(new Symbol[_offeredCapabilities.size()]));
        }

        if (_remoteDesiredCapabilities != null
                && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
        {
            _properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
                    SoleConnectionDetectionPolicy.STRONG);
        }

        if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
        {
            _properties.put(SOLE_CONNECTION_ENFORCEMENT_POLICY, SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
        }

        open.setProperties(_properties);

        sendFrame(CONNECTION_CONTROL_CHANNEL, open);
    }

    private Session_1_0 getSession(final int channel)
    {
        Session_1_0 session = _receivingSessions[channel];
        if (session == null)
        {
            Error error = new Error();
            error.setCondition(ConnectionError.FRAMING_ERROR);
            error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
            handleError(error);
        }

        return session;
    }

    private void sendClose(Close closeToSend)
    {
        sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
        closeSender();
    }


    private void assertState(final ConnectionState state)
    {
        if (_connectionState != state)
        {
            throw new ConnectionScopedRuntimeException(String.format(
                    "Unexpected state, client has sent frame in an illegal order.  Required state: %s, actual state: %s",
                    state,
                    _connectionState));
        }
    }

    private class ProcessPendingIterator implements Iterator<Runnable>
    {
        private Iterator<? extends AMQPSession<?,?>> _sessionIterator;
        private ProcessPendingIterator()
        {
            _sessionIterator = _sessionsWithWork.iterator();
        }

        @Override
        public boolean hasNext()
        {
            return (!_sessionsWithWork.isEmpty() && !isClosed() && !isConnectionStopped()) || !_asyncTaskList.isEmpty();
        }

        @Override
        public Runnable next()
        {
            if(!_sessionsWithWork.isEmpty())
            {
                if(isClosed() || isConnectionStopped())
                {

                    final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
                    if(asyncAction != null)
                    {
                        return () -> asyncAction.performAction(AMQPConnection_1_0Impl.this);
                    }
                    else
                    {
                        return () -> { };
                    }
                }
                else
                {
                    if (!_sessionIterator.hasNext())
                    {
                        _sessionIterator = _sessionsWithWork.iterator();
                    }
                    final AMQPSession<?,?> session = _sessionIterator.next();
                    return () ->
                    {
                        _sessionIterator.remove();
                        if (session.processPending())
                        {
                            _sessionsWithWork.add(session);
                        }
                    };
                }
            }
            else if(!_asyncTaskList.isEmpty())
            {
                final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
                return () -> asyncAction.performAction(AMQPConnection_1_0Impl.this);
            }
            else
            {
                throw new NoSuchElementException();
            }
        }

        @Override
        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }

    @Override
    public Iterator<ServerTransaction> getOpenTransactions()
    {
        return _openTransactions.values().iterator();
    }

    @Override
    public IdentifiedTransaction createIdentifiedTransaction()
    {
        final LocalTransaction serverTransaction = createLocalTransaction();
        int id = 0;
        while (id >= 0 && _openTransactions.putIfAbsent(id, serverTransaction) != null)
        {
            id++;
        }
        if (id < 0)
        {
            throw new IllegalStateException("Unsupported state, too many opened transactions");
        }
        return new IdentifiedTransaction(id, serverTransaction);
    }

    @Override
    public ServerTransaction getTransaction(final int txnId)
    {
        return Optional.ofNullable(_openTransactions.get(txnId))
                .orElseThrow(() -> new UnknownTransactionException(txnId));
    }

    @Override
    public void removeTransaction(final int txnId)
    {
        Optional.ofNullable(_openTransactions.remove(txnId))
                .orElseThrow(() -> new UnknownTransactionException(txnId));
    }

    @Override
    protected boolean isOpeningInProgress()
    {
        switch (_connectionState)
        {
            case AWAIT_AMQP_OR_SASL_HEADER:
            case AWAIT_SASL_INIT:
            case AWAIT_SASL_RESPONSE:
            case AWAIT_AMQP_HEADER:
            case AWAIT_OPEN:
                return true;
            case OPENED:
            case CLOSE_RECEIVED:
            case CLOSE_SENT:
            case CLOSED:
                return false;
            default:
                throw new IllegalStateException("Unsupported state " + _connectionState);
        }
    }
}
