/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.protocol.v0_10;

import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;

import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import javax.security.auth.Subject;

import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.NotAssociatedDtxException;
import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlow;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageSetFlowMode;
import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.network.Ticker;

public class ServerSession extends Session
        implements AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
                   Deletable<ServerSession>

{
    private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);

    // Map key used in place of a null destination when registering / looking up subscriptions.
    private static final String NULL_DESTINATION = UUID.randomUUID().toString();
    // Amount of producer credit re-issued by enqueue() once that much credit has been consumed.
    private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
    // Upper bound on queued async commands; completeAsyncCommands() completes the oldest ones beyond it.
    private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;

    private final UUID _id = UUID.randomUUID();
    // Populated in the constructor with the connection's principals plus a SessionPrincipal for this session.
    private final Subject _subject = new Subject();
    private final AccessControlContext _accessControllerContext;
    private final SecurityToken _token;
    private long _createTime = System.currentTimeMillis();

    // Objects (queues, or this session itself) currently blocking the session; see block()/unblock().
    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());

    // True while at least one blocking entity is registered.
    private final AtomicBoolean _blocking = new AtomicBoolean(false);
    private ChannelLogSubject _logSubject;
    private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
    private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
    private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
    private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
    private org.apache.qpid.server.model.Session<?> _modelObject;
    // Read by blockingTimeoutExceeded(); a value of 0 is treated there as "not blocked".
    private long _blockTime;
    // Initialised from Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT in the constructor.
    private long _blockingTimeout;
    private boolean _wireBlockingState;
    private final List<ConsumerTarget> _consumersWithPendingWork = new ArrayList<>();
    // NOTE(review): identifier is misspelled ("Cahe"); left unchanged because it is referenced elsewhere in the class.
    private final PublishAuthorisationCache _publishAuthCahe;


    /**
     * Callback registered per outgoing transfer (see
     * {@code onMessageDispositionChange}) and invoked when the peer accepts,
     * releases, rejects or acquires that transfer.
     */
    public interface MessageDispositionChangeListener
    {
        /** Invoked when the transfer is accepted. */
        void onAccept();

        /**
         * Invoked when the transfer is released.
         *
         * @param setRedelivered flag passed through from the release request
         */
        void onRelease(boolean setRedelivered);

        /** Invoked when the transfer is rejected. */
        void onReject();

        /**
         * Invoked when the peer asks to acquire the transfer.
         *
         * @return {@code true} if the transfer is to be reported as acquired
         */
        boolean acquire();
    }

    // Disposition listeners keyed by transfer id; entries are removed on accept/release/reject and on close.
    private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
            new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();

    // Starts as an AsyncAutoCommitTransaction; replaced by selectTx() / selectDtx().
    private ServerTransaction _transaction;

    // Transaction statistics exposed through getTxnStart/getTxnCommits/getTxnRejects/getTxnCount.
    private final AtomicLong _txnStarts = new AtomicLong(0);
    private final AtomicLong _txnCommits = new AtomicLong(0);
    private final AtomicLong _txnRejects = new AtomicLong(0);
    private final AtomicLong _txnCount = new AtomicLong(0);

    // Subscriptions keyed by destination (NULL_DESTINATION stands in for a null destination).
    private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
    private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();

    // Tasks run by onClose(); managed through addDeleteTask()/removeDeleteTask().
    private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();

    // Set once by close(int, String); when present it replaces the default CLOSE operational log message.
    private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();

    // Running content size of messages enqueued in the current (non-distributed) transaction.
    private volatile long _uncommittedMessageSize;
    // Uncommitted messages that have not yet been flowed to disk.
    private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new ArrayList<>();
    // Initialised from Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE in the constructor.
    private long _maxUncommittedInMemorySize;


    /**
     * Creates a session on the given connection. The session starts with an
     * auto-commit transaction, derives its {@link Subject} from the
     * connection's authorised subject plus a {@link SessionPrincipal}, and
     * reads its flow-control timeout, uncommitted-size limit and publish
     * authorisation cache settings from context values.
     *
     * @param connection the owning connection; must be a {@code ServerConnection}
     * @param delegate   session delegate passed to the superclass
     * @param name       session name passed to the superclass
     * @param expiry     session expiry passed to the superclass
     */
    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
    {
        super(connection, delegate, name, expiry);
        _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
        _logSubject = new ChannelLogSubject(this);

        final ServerConnection serverConnection = (ServerConnection) connection;
        final AMQPConnection_0_10 amqpConnection = serverConnection.getAmqpConnection();

        // Session subject = connection principals + a principal identifying this session.
        _subject.getPrincipals().addAll(serverConnection.getAuthorizedSubject().getPrincipals());
        _subject.getPrincipals().add(new SessionPrincipal(this));
        _accessControllerContext = amqpConnection.getAccessControlContextFromSubject(_subject);

        // The security token comes from the address space when it is a configured object,
        // otherwise from the broker.
        final NamedAddressSpace addressSpace = serverConnection.getAddressSpace();
        _token = (addressSpace instanceof ConfiguredObject)
                ? ((ConfiguredObject) addressSpace).newToken(_subject)
                : amqpConnection.getBroker().newToken(_subject);

        _blockingTimeout = serverConnection.getBroker()
                                           .getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
        _maxUncommittedInMemorySize = getConnection().getAmqpConnection()
                                                     .getContextProvider()
                                                     .getContextValue(Long.class,
                                                                      org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);

        final Long authCacheTimeout =
                amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT);
        final Integer authCacheSize =
                amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE);
        _publishAuthCahe = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);
    }

    /**
     * @return the access control context created in the constructor from this
     *         session's subject
     */
    public AccessControlContext getAccessControllerContext()
    {
        final AccessControlContext context = _accessControllerContext;
        return context;
    }

    /**
     * Sets the session state while running as this session's subject. If the
     * caller is not already running as that subject, the call is re-issued
     * inside a privileged action bound to the session's access control context.
     * A channel-create message is logged when the new state is {@code OPEN}.
     *
     * @param state the new state
     */
    protected void setState(final State state)
    {
        if (!runningAsSubject())
        {
            // Re-enter this method under the session's own subject.
            final PrivilegedAction<Void> reentry = new PrivilegedAction<Void>()
            {
                @Override
                public Void run()
                {
                    setState(state);
                    return null;
                }
            };
            runAsSubject(reentry);
            return;
        }

        super.setState(state);

        if (state == State.OPEN)
        {
            getConnection().getAmqpConnection().getEventLogger().message(ChannelMessages.CREATE());
        }
    }

    /**
     * Runs the action via {@link AccessController#doPrivileged} using this
     * session's access control context.
     *
     * @param privilegedAction the action to run
     * @return the action's result
     */
    private <T> T runAsSubject(final PrivilegedAction<T> privilegedAction)
    {
        final AccessControlContext sessionContext = getAccessControllerContext();
        return AccessController.doPrivileged(privilegedAction, sessionContext);
    }

    /**
     * @return {@code true} if the subject associated with the current access
     *         control context equals this session's subject
     */
    private boolean runningAsSubject()
    {
        final Subject sessionSubject = getAuthorizedSubject();
        final Subject callingSubject = Subject.getSubject(AccessController.getContext());
        return sessionSubject.equals(callingSubject);
    }

    /**
     * Sends a set-flow-mode (CREDIT) command followed by a stop command, both
     * for the empty destination.
     */
    private void invokeBlock()
    {
        final MessageSetFlowMode creditMode = new MessageSetFlowMode("", MessageFlowMode.CREDIT);
        invoke(creditMode);

        final MessageStop stop = new MessageStop("");
        invoke(stop);
    }

    /**
     * Resets the outstanding credit counter to {@code Integer.MAX_VALUE} and
     * sends a message-flow command granting that many message credits for the
     * empty destination.
     */
    private void invokeUnblock()
    {
        final int fullCredit = Integer.MAX_VALUE;
        _outstandingCredit.set(fullCredit);

        final MessageFlow flow = new MessageFlow();
        flow.setUnit(MessageCreditUnit.MESSAGE);
        flow.setDestination("");
        flow.setValue(fullCredit);
        invoke(flow);
    }

    /**
     * Delegates the publish authorisation check to this session's
     * {@code PublishAuthorisationCache}.
     *
     * @param destination destination being published to
     * @param routingKey  routing key of the message
     * @param immediate   immediate flag of the message
     * @param currentTime timestamp handed to the cache
     */
    void authorisePublish(final MessageDestination destination,
                          final String routingKey,
                          final boolean immediate,
                          final long currentTime)
    {
        final PublishAuthorisationCache cache = _publishAuthCahe;
        cache.authorisePublish(destination, routingKey, immediate, currentTime);
    }

    /**
     * @param id command id
     * @return the result of {@code isCommandsFull(id)}
     */
    @Override
    protected boolean isFull(int id)
    {
        final boolean commandsFull = isCommandsFull(id);
        return commandsFull;
    }

    /**
     * Routes a received message to the given destination under the session's
     * current transaction.
     * <p>
     * When producer credit is limited, one credit is consumed; once
     * {@code PRODUCER_CREDIT_TOPUP_THRESHOLD} credits have been used up that
     * amount is added back and a matching message-flow command is sent.
     * Afterwards the receipt is registered with the connection and the
     * session's transaction bookkeeping is updated.
     *
     * @param message            the message to route
     * @param instanceProperties per-instance properties handed to the destination
     * @param exchange           the destination to send to
     * @return the value returned by {@code exchange.send(...)}
     */
    public int enqueue(final MessageTransferMessage message,
                       final InstanceProperties instanceProperties,
                       final MessageDestination exchange)
    {
        if (_outstandingCredit.get() != UNLIMITED_CREDIT)
        {
            final int remainingCredit = _outstandingCredit.decrementAndGet();
            if (remainingCredit == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
            {
                _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
                invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
            }
        }

        final int enqueueCount = exchange.send(message,
                                               message.getInitialRoutingAddress(),
                                               instanceProperties,
                                               _transaction,
                                               _checkCapacityAction);

        getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
        incrementOutstandingTxnsIfNecessary();
        incrementUncommittedMessageSize(message.getStoredMessage());
        return enqueueCount;
    }

    /**
     * Forgets all uncommitted-message bookkeeping: empties the list of
     * in-memory uncommitted messages and zeroes the accumulated size.
     */
    private void resetUncommittedMessages()
    {
        _uncommittedMessages.clear();
        _uncommittedMessageSize = 0L;
    }

    /**
     * Accounts for a newly enqueued message in the current transaction. Only
     * applies when the session is transactional and the transaction is not a
     * {@link DistributedTransaction}.
     * <p>
     * While the accumulated content size stays within the configured maximum,
     * the message is remembered in memory. Once the maximum is exceeded, the
     * message and all previously remembered messages are flowed to disk.
     *
     * @param handle stored message that was just enqueued
     */
    private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle)
    {
        if (!isTransactional() || _transaction instanceof DistributedTransaction)
        {
            return;
        }

        _uncommittedMessageSize += handle.getMetaData().getContentSize();

        if (_uncommittedMessageSize <= getMaxUncommittedInMemorySize())
        {
            _uncommittedMessages.add(handle);
            return;
        }

        // Limit exceeded: push this message, and anything still held in memory, out to disk.
        handle.flowToDisk();

        final boolean earlierMessagesInMemory = !_uncommittedMessages.isEmpty();

        // Warn when the limit is first crossed: either earlier messages were still in memory,
        // or this single message is the whole transaction so far.
        if (earlierMessagesInMemory || _uncommittedMessageSize == handle.getMetaData().getContentSize())
        {
            getConnection().getAmqpConnection().getEventLogger()
                    .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
        }

        if (earlierMessagesInMemory)
        {
            for (StoredMessage<MessageMetaData_0_10> earlier : _uncommittedMessages)
            {
                earlier.flowToDisk();
            }
            _uncommittedMessages.clear();
        }
    }


    /**
     * Registers the delivery (by body size) with the connection and then
     * invokes the transfer on this session.
     *
     * @param xfr                 the transfer to send
     * @param postIdSettingAction action passed through to {@code invoke}
     */
    public void sendMessage(MessageTransfer xfr,
                            Runnable postIdSettingAction)
    {
        final AMQPConnection_0_10 amqpConnection = getAMQPConnection();
        amqpConnection.registerMessageDelivered(xfr.getBodySize());
        invoke(xfr, postIdSettingAction);
    }

    /**
     * Registers a disposition listener under the id of the given transfer,
     * replacing any listener already stored for that id.
     *
     * @param xfr            the transfer whose id is used as the key
     * @param acceptListener the listener to store
     */
    public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
    {
        final int transferId = xfr.getId();
        _messageDispositionListenerMap.put(transferId, acceptListener);
    }


    private static interface MessageDispositionAction
    {
        void performAction(MessageDispositionChangeListener  listener);
    }

    /**
     * Removes the disposition listeners for the given transfer ids and calls
     * {@code onAccept()} on each.
     *
     * @param ranges transfer ids being accepted
     */
    public void accept(RangeSet ranges)
    {
        final MessageDispositionAction acceptAction = new MessageDispositionAction()
        {
            public void performAction(MessageDispositionChangeListener listener)
            {
                listener.onAccept();
            }
        };
        dispositionChange(ranges, acceptAction);
    }


    /**
     * Removes the disposition listeners for the given transfer ids and calls
     * {@code onRelease(setRedelivered)} on each.
     *
     * @param ranges         transfer ids being released
     * @param setRedelivered flag forwarded to each listener
     */
    public void release(RangeSet ranges, final boolean setRedelivered)
    {
        final MessageDispositionAction releaseAction = new MessageDispositionAction()
        {
            public void performAction(MessageDispositionChangeListener listener)
            {
                listener.onRelease(setRedelivered);
            }
        };
        dispositionChange(ranges, releaseAction);
    }

    /**
     * Removes the disposition listeners for the given transfer ids and calls
     * {@code onReject()} on each.
     *
     * @param ranges transfer ids being rejected
     */
    public void reject(RangeSet ranges)
    {
        final MessageDispositionAction rejectAction = new MessageDispositionAction()
        {
            public void performAction(MessageDispositionChangeListener listener)
            {
                listener.onReject();
            }
        };
        dispositionChange(ranges, rejectAction);
    }

    /**
     * Attempts to acquire every outstanding transfer whose id falls inside the
     * given ranges. Listeners stay registered; only those whose
     * {@code acquire()} returns {@code true} are reported.
     * <p>
     * The registered ids and the requested ranges are walked in step, with ids
     * compared against range bounds using serial-number comparison.
     *
     * @param transfers ranges of transfer ids to acquire
     * @return the set of ids that were acquired (possibly empty)
     */
    public RangeSet acquire(RangeSet transfers)
    {
        final RangeSet acquired = RangeSetFactory.createRangeSet();

        if (_messageDispositionListenerMap.isEmpty())
        {
            return acquired;
        }

        final Iterator<Integer> outstandingIds = _messageDispositionListenerMap.keySet().iterator();
        final Iterator<Range> requested = transfers.iterator();

        if (!requested.hasNext())
        {
            return acquired;
        }

        Range current = requested.next();
        while (current != null && outstandingIds.hasNext())
        {
            final int id = outstandingIds.next();

            // Skip forward over ranges that end before this id.
            while (current != null && gt(id, current.getUpper()))
            {
                current = requested.hasNext() ? requested.next() : null;
            }

            if (current != null && current.includes(id))
            {
                final MessageDispositionChangeListener listener = _messageDispositionListenerMap.get(id);
                if (listener != null && listener.acquire())
                {
                    acquired.add(id);
                }
            }
        }

        return acquired;
    }

    /**
     * Removes the disposition listener of every outstanding transfer whose id
     * lies in {@code ranges} and applies {@code action} to it.
     * <p>
     * A single range is handled by probing the map for each id in the range.
     * Multiple ranges are handled by walking the registered ids and the ranges
     * in step. Both paths use serial-number comparison for range bounds, and
     * both skip ids whose listener is no longer present.
     *
     * @param ranges transfer ids affected; {@code null} is ignored
     * @param action operation to apply to each removed listener
     */
    public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
    {
        if (ranges == null)
        {
            return;
        }

        if (ranges.size() == 1)
        {
            final Range r = ranges.getFirst();
            final int upper = r.getUpper();
            // Serial-number comparison rather than "i <= upper": a plain int comparison never
            // terminates when upper == Integer.MAX_VALUE and ignores ranges that wrap around.
            for (int i = r.getLower(); !gt(i, upper); i++)
            {
                final MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
                if (changeListener != null)
                {
                    action.performAction(changeListener);
                }
            }
        }
        else if (!_messageDispositionListenerMap.isEmpty())
        {
            final Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
            final Iterator<Range> rangeIter = ranges.iterator();

            if (rangeIter.hasNext())
            {
                Range range = rangeIter.next();

                while (range != null && unacceptedMessages.hasNext())
                {
                    final int next = unacceptedMessages.next();

                    // Skip forward over ranges that end before this id.
                    while (range != null && gt(next, range.getUpper()))
                    {
                        range = rangeIter.hasNext() ? rangeIter.next() : null;
                    }

                    if (range != null && range.includes(next))
                    {
                        final MessageDispositionChangeListener changeListener =
                                _messageDispositionListenerMap.remove(next);
                        // The entry may already have been removed since the key was read from the iterator.
                        if (changeListener != null)
                        {
                            action.performAction(changeListener);
                        }
                    }
                }
            }
        }
    }

    /**
     * Drops the disposition listener stored under the id of the given method,
     * if any, without invoking it.
     *
     * @param method the method whose id identifies the listener
     */
    public void removeDispositionListener(Method method)
    {
        final int methodId = method.getId();
        _messageDispositionListenerMap.remove(methodId);
    }

    /**
     * Cleans up when the session closes, in this order: rolls back a local
     * transaction or ends DTX associations; releases (as redelivered) every
     * transfer still awaiting a disposition; runs the registered delete tasks;
     * and logs either the forced-close message, if one was recorded, or the
     * ordinary channel-close message.
     */
    public void onClose()
    {
        final ServerTransaction txn = _transaction;
        if (txn instanceof LocalTransaction)
        {
            txn.rollback();
        }
        else if (txn instanceof DistributedTransaction)
        {
            getAddressSpace().getDtxRegistry().endAssociations(this);
        }

        for (MessageDispositionChangeListener pending : _messageDispositionListenerMap.values())
        {
            pending.onRelease(true);
        }
        _messageDispositionListenerMap.clear();

        for (Action<? super ServerSession> deleteTask : _taskList)
        {
            deleteTask.performAction(this);
        }

        final LogMessage forcedClose = _forcedCloseLogMessage.get();
        final LogMessage closeMessage = (forcedClose != null) ? forcedClose : ChannelMessages.CLOSE();
        getConnection().getAmqpConnection().getEventLogger().message(getLogSubject(), closeMessage);
    }

    /**
     * Intentionally empty: the broker must not block waiting for the session
     * to close, so the superclass behaviour is replaced with a no-op.
     */
    @Override
    protected void awaitClose()
    {
        // no-op by design - see method comment
    }

    /**
     * Acknowledges a delivered message: if the consumer's acquisition of the
     * entry can be made unstealable, a dequeue is added to the current
     * transaction. The entry is deleted after commit; on rollback it is marked
     * redelivered and released.
     *
     * @param consumer the consumer holding the entry
     * @param target   the consumer target (not used by this method)
     * @param entry    the message instance being acknowledged
     */
    public void acknowledge(final ConsumerImpl consumer,
                            final ConsumerTarget_0_10 target,
                            final MessageInstance entry)
    {
        if (!entry.makeAcquisitionUnstealable(consumer))
        {
            return;
        }

        final ServerTransaction.Action dequeueOutcome = new ServerTransaction.Action()
        {
            public void postCommit()
            {
                entry.delete();
            }

            public void onRollback()
            {
                // The client acknowledged the message, so it has seen it: after a
                // rollback the message must be flagged as redelivered.
                entry.setRedelivered();
                entry.release(consumer);
            }
        };

        _transaction.dequeue(entry.getEnqueueRecord(), dequeueOutcome);
    }

    /**
     * @return the values view of the subscription map (not a copy)
     */
    Collection<ConsumerTarget_0_10> getSubscriptions()
    {
        final Collection<ConsumerTarget_0_10> targets = _subscriptions.values();
        return targets;
    }

    /**
     * Stores a subscription under its destination; a {@code null} destination
     * is stored under {@code NULL_DESTINATION}.
     *
     * @param destination subscription destination, may be {@code null}
     * @param sub         the subscription to store
     */
    public void register(String destination, ConsumerTarget_0_10 sub)
    {
        final String key;
        if (destination == null)
        {
            key = NULL_DESTINATION;
        }
        else
        {
            key = destination;
        }
        _subscriptions.put(key, sub);
    }


    /**
     * Tracks a consumer on this session. Only instances that are also model
     * {@code Consumer} objects are tracked: they are added to the consumer
     * list, given the consumer-closed listener, and announced via
     * {@code consumerAdded}. Anything else is ignored.
     *
     * @param consumerImpl the consumer to register
     */
    public void register(final ConsumerImpl consumerImpl)
    {
        if (!(consumerImpl instanceof Consumer<?>))
        {
            return;
        }

        final Consumer<?> modelConsumer = (Consumer<?>) consumerImpl;
        _consumers.add(modelConsumer);
        modelConsumer.addChangeListener(_consumerClosedListener);
        consumerAdded(modelConsumer);
    }

    /**
     * Looks up a subscription by destination; a {@code null} destination is
     * looked up under {@code NULL_DESTINATION}.
     *
     * @param destination subscription destination, may be {@code null}
     * @return the subscription, or {@code null} if none is stored
     */
    public ConsumerTarget_0_10 getSubscription(String destination)
    {
        if (destination == null)
        {
            return _subscriptions.get(NULL_DESTINATION);
        }
        return _subscriptions.get(destination);
    }

    /**
     * Removes the subscription from this session and closes it.
     * <p>
     * The map key is derived the same way as in {@code register} and
     * {@code getSubscription}: a {@code null} name maps to
     * {@code NULL_DESTINATION}. Passing {@code null} directly to the
     * concurrent map would throw a {@link NullPointerException} and leave the
     * entry in place.
     *
     * @param sub the subscription to remove and close
     */
    public void unregister(ConsumerTarget_0_10 sub)
    {
        final String name = sub.getName();
        _subscriptions.remove(name == null ? NULL_DESTINATION : name);
        sub.close();
    }

    /**
     * @return whether the session's current transaction reports itself as
     *         transactional
     */
    public boolean isTransactional()
    {
        final ServerTransaction current = _transaction;
        return current.isTransactional();
    }

    /**
     * Switches the session to a new {@link LocalTransaction} on the session's
     * message store and counts a transaction start.
     */
    public void selectTx()
    {
        final MessageStore store = this.getMessageStore();
        _transaction = new LocalTransaction(store);
        _txnStarts.incrementAndGet();
    }

    /**
     * Switches the session to a new {@link DistributedTransaction} bound to
     * the address space's DTX registry.
     */
    public void selectDtx()
    {
        final DistributedTransaction dtx =
                new DistributedTransaction(this, getAddressSpace().getDtxRegistry());
        _transaction = dtx;
    }


    /**
     * Starts work on the given DTX branch using the session's distributed
     * transaction.
     *
     * @param xid    branch identifier
     * @param join   join flag forwarded to the transaction
     * @param resume resume flag forwarded to the transaction
     * @throws DtxNotSelectedException if the session has not selected DTX
     */
    public void startDtx(Xid xid, boolean join, boolean resume)
            throws JoinAndResumeDtxException,
                   UnknownDtxBranchException,
                   AlreadyKnownDtxException,
                   DtxNotSelectedException
    {
        assertDtxTransaction().start(xid, join, resume);
    }


    /**
     * Ends work on the given DTX branch using the session's distributed
     * transaction.
     *
     * @param xid     branch identifier
     * @param fail    fail flag forwarded to the transaction
     * @param suspend suspend flag forwarded to the transaction
     * @throws DtxNotSelectedException if the session has not selected DTX
     */
    public void endDtx(Xid xid, boolean fail, boolean suspend)
            throws NotAssociatedDtxException,
            UnknownDtxBranchException,
            DtxNotSelectedException,
            SuspendAndFailDtxException, TimeoutDtxException
    {
        assertDtxTransaction().end(xid, fail, suspend);
    }


    /**
     * @param xid branch identifier
     * @return the timeout the address space's DTX registry reports for the branch
     */
    public long getTimeoutDtx(Xid xid)
            throws UnknownDtxBranchException
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        return addressSpace.getDtxRegistry().getTimeout(xid);
    }


    /**
     * Sets the timeout of a branch through the address space's DTX registry.
     *
     * @param xid     branch identifier
     * @param timeout the new timeout value
     */
    public void setTimeoutDtx(Xid xid, long timeout)
            throws UnknownDtxBranchException
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        addressSpace.getDtxRegistry().setTimeout(xid, timeout);
    }


    /**
     * Prepares a branch through the address space's DTX registry.
     *
     * @param xid branch identifier
     */
    public void prepareDtx(Xid xid)
            throws UnknownDtxBranchException,
            IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        addressSpace.getDtxRegistry().prepare(xid);
    }

    /**
     * Commits a branch through the address space's DTX registry.
     *
     * @param xid      branch identifier
     * @param onePhase one-phase flag forwarded to the registry
     */
    public void commitDtx(Xid xid, boolean onePhase)
            throws UnknownDtxBranchException,
            IncorrectDtxStateException, StoreException, RollbackOnlyDtxException, TimeoutDtxException
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        addressSpace.getDtxRegistry().commit(xid, onePhase);
    }


    /**
     * Rolls back a branch through the address space's DTX registry.
     *
     * @param xid branch identifier
     */
    public void rollbackDtx(Xid xid)
            throws UnknownDtxBranchException,
            IncorrectDtxStateException, StoreException, TimeoutDtxException
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        addressSpace.getDtxRegistry().rollback(xid);
    }


    /**
     * Forgets a branch through the address space's DTX registry.
     *
     * @param xid branch identifier
     */
    public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        addressSpace.getDtxRegistry().forget(xid);
    }

    /**
     * @return the branch identifiers returned by the DTX registry's
     *         {@code recover()}
     */
    public List<Xid> recoverDtx()
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        return addressSpace.getDtxRegistry().recover();
    }

    /**
     * @return the session's current transaction as a {@link DistributedTransaction}
     * @throws DtxNotSelectedException if the current transaction is of any other kind
     */
    private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
    {
        if (!(_transaction instanceof DistributedTransaction))
        {
            throw new DtxNotSelectedException();
        }
        return (DistributedTransaction) _transaction;
    }


    /**
     * Commits the current transaction, then updates the statistics (one more
     * commit, one more start), clears the outstanding-transaction count where
     * applicable and resets the uncommitted-message bookkeeping.
     */
    public void commit()
    {
        final ServerTransaction current = _transaction;
        current.commit();

        // statistics
        _txnCommits.incrementAndGet();
        _txnStarts.incrementAndGet();

        // per-transaction bookkeeping
        decrementOutstandingTxnsIfNecessary();
        resetUncommittedMessages();
    }

    /**
     * Rolls back the current transaction, then updates the statistics (one
     * more reject, one more start), clears the outstanding-transaction count
     * where applicable and resets the uncommitted-message bookkeeping.
     */
    public void rollback()
    {
        final ServerTransaction current = _transaction;
        current.rollback();

        // statistics
        _txnRejects.incrementAndGet();
        _txnStarts.incrementAndGet();

        // per-transaction bookkeeping
        decrementOutstandingTxnsIfNecessary();
        resetUncommittedMessages();
    }


    /**
     * For a transactional session, moves the outstanding-transaction counter
     * from 0 to 1; otherwise does nothing.
     */
    private void incrementOutstandingTxnsIfNecessary()
    {
        if (!isTransactional())
        {
            return;
        }
        // At most one transaction can be outstanding (only LocalTransaction is
        // supported), so the counter only ever goes 0 -> 1 here.
        _txnCount.compareAndSet(0, 1);
    }

    /**
     * For a transactional session, moves the outstanding-transaction counter
     * from 1 to 0; otherwise does nothing.
     */
    private void decrementOutstandingTxnsIfNecessary()
    {
        if (!isTransactional())
        {
            return;
        }
        // At most one transaction can be outstanding (only LocalTransaction is
        // supported), so the counter only ever goes 1 -> 0 here.
        _txnCount.compareAndSet(1, 0);
    }

    /**
     * @return the number of commits counted on this session
     */
    public Long getTxnCommits()
    {
        return Long.valueOf(_txnCommits.get());
    }

    /**
     * @return the number of rollbacks counted on this session
     */
    public Long getTxnRejects()
    {
        return Long.valueOf(_txnRejects.get());
    }

    /**
     * @return the channel number, as reported by {@code getChannel()}
     */
    public int getChannelId()
    {
        final int channel = getChannel();
        return channel;
    }

    /**
     * @return the outstanding-transaction counter (0 or 1 as maintained by this class)
     */
    public Long getTxnCount()
    {
        return Long.valueOf(_txnCount.get());
    }

    /**
     * @return the number of transaction starts counted on this session
     */
    public Long getTxnStart()
    {
        return Long.valueOf(_txnStarts.get());
    }

    /**
     * @return the authorised principal of the owning connection
     */
    public Principal getAuthorizedPrincipal()
    {
        final ServerConnection connection = getConnection();
        return connection.getAuthorizedPrincipal();
    }

    /**
     * @return this session's subject (connection principals plus the session principal)
     */
    public Subject getAuthorizedSubject()
    {
        final Subject sessionSubject = _subject;
        return sessionSubject;
    }

    /**
     * Adds a task to be run by {@code onClose()}.
     *
     * @param task the task to add
     */
    public void addDeleteTask(Action<? super ServerSession> task)
    {
        final List<Action<? super ServerSession>> tasks = _taskList;
        tasks.add(task);
    }

    /**
     * Removes a task previously added with {@code addDeleteTask}.
     *
     * @param task the task to remove
     */
    public void removeDeleteTask(Action<? super ServerSession> task)
    {
        final List<Action<? super ServerSession>> tasks = _taskList;
        tasks.remove(task);
    }

    /**
     * @return the reference object of the owning connection
     */
    public Object getReference()
    {
        final ServerConnection connection = getConnection();
        return connection.getReference();
    }

    /**
     * @return the message store of the connection's address space
     */
    public MessageStore getMessageStore()
    {
        final NamedAddressSpace addressSpace = getAddressSpace();
        return addressSpace.getMessageStore();
    }

    /**
     * @return the address space of the owning connection
     */
    public NamedAddressSpace getAddressSpace()
    {
        final ServerConnection connection = getConnection();
        return connection.getAddressSpace();
    }

    /**
     * @return always {@code false}
     */
    public boolean isDurable()
    {
        final boolean durable = false;
        return durable;
    }


    /**
     * @return the {@code System.currentTimeMillis()} value captured when this
     *         object was constructed
     */
    public long getCreateTime()
    {
        final long created = _createTime;
        return created;
    }

    /**
     * @return the random UUID generated for this session at construction
     */
    @Override
    public UUID getId()
    {
        final UUID sessionId = _id;
        return sessionId;
    }

    /**
     * @return the AMQP connection object of the owning server connection
     */
    @Override
    public AMQPConnection_0_10 getAMQPConnection()
    {
        final ServerConnection connection = getConnection();
        return connection.getAmqpConnection();
    }

    /**
     * @return the superclass's connection, cast to {@code ServerConnection}
     */
    @Override
    public ServerConnection getConnection()
    {
        final Connection transportConnection = super.getConnection();
        return (ServerConnection) transportConnection;
    }


    /**
     * @return this session, which acts as its own {@code LogSubject}
     */
    public LogSubject getLogSubject()
    {
        final LogSubject self = this;
        return self;
    }

    /**
     * Marks this session as blocked by the given queue.
     *
     * @param queue the blocking queue; its name is used in the log message
     */
    public void block(Queue<?> queue)
    {
        final String queueName = queue.getName();
        block(queue, queueName);
    }

    /**
     * Marks this session as blocked, using the session itself as the blocking
     * entity.
     */
    public void block()
    {
        final String allQueuesLabel = "** All Queues **";
        block(this, allQueuesLabel);
    }


    /**
     * Records a blocking entity. When the entity is newly added and the session
     * was not yet flagged as blocking, a flow-enforced message is logged and,
     * if the session is open, the connection is notified of work.
     *
     * @param queue the blocking entity
     * @param name  name reported in the log message
     */
    private void block(final Object queue, final String name)
    {
        synchronized (_blockingEntities)
        {
            final boolean becameBlocked = _blockingEntities.add(queue) && _blocking.compareAndSet(false, true);
            if (becameBlocked)
            {
                getConnection().getAmqpConnection()
                               .getEventLogger()
                               .message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));

                if (getState() == State.OPEN)
                {
                    getConnection().notifyWork();
                }
            }
        }
    }

    /**
     * Removes the given queue from the set of entities blocking this session.
     *
     * @param queue the queue that no longer blocks
     */
    public void unblock(Queue<?> queue)
    {
        // Typed as Object so the private unblock(Object) overload is selected.
        final Object blocker = queue;
        unblock(blocker);
    }

    /**
     * Removes the session itself from the set of entities blocking this
     * session (the counterpart of {@code block()}).
     */
    public void unblock()
    {
        final Object self = this;
        unblock(self);
    }

    /**
     * Removes a blocking entity. When that leaves no blocking entities and the
     * session was flagged as blocking, the flag is cleared; unless the session
     * is closing, a flow-removed message is logged and the connection is
     * notified of work.
     * <p>
     * The whole sequence runs while holding the {@code _blockingEntities}
     * monitor, the same lock {@code block(Object, String)} holds. Without it,
     * a concurrent {@code block} could add an entity between the emptiness
     * check and the flag update, leaving the flag cleared while an entity is
     * still blocking.
     *
     * @param queue the entity that no longer blocks
     */
    private void unblock(final Object queue)
    {
        synchronized (_blockingEntities)
        {
            if (_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
            {
                if (_blocking.compareAndSet(true, false) && !isClosing())
                {
                    getConnection().getAmqpConnection().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
                    getConnection().notifyWork();
                }
            }
        }
    }


    /**
     * @return {@code true} when the wire-blocking flag is set, a non-zero
     *         block time is recorded, and more than the configured blocking
     *         timeout (in milliseconds) has elapsed since that time
     */
    boolean blockingTimeoutExceeded()
    {
        final long blockedSince = _blockTime;
        if (!_wireBlockingState || blockedSince == 0)
        {
            return false;
        }
        final long elapsedMillis = System.currentTimeMillis() - blockedSince;
        return elapsedMillis > _blockingTimeout;
    }

    /**
     * Forwards the transport state change to every registered subscription.
     */
    @Override
    public void transportStateChanged()
    {
        final Iterator<ConsumerTarget_0_10> targets = getSubscriptions().iterator();
        while (targets.hasNext())
        {
            targets.next().transportStateChanged();
        }
    }

    /**
     * @return the reference object of the owning connection
     */
    @Override
    public Object getConnectionReference()
    {
        final ServerConnection connection = getConnection();
        return connection.getReference();
    }

    /**
     * Builds the bracketed log prefix for this channel from
     * {@code CHANNEL_FORMAT}: connection id, authorised principal name,
     * remote address, address space name and channel number.
     * <p>
     * The connection id is reported as -1 when the underlying connection is
     * not a {@code ServerConnection}; the principal name is reported as
     * {@code "?"} when no principal is available.
     *
     * @return the formatted prefix, enclosed in {@code "["} and {@code "] "}
     */
    public String toLogString()
    {
        final long connectionId;
        if (super.getConnection() instanceof ServerConnection)
        {
            connectionId = getConnection().getConnectionId();
        }
        else
        {
            connectionId = -1;
        }

        final String authorizedPrincipal;
        if (getAuthorizedPrincipal() == null)
        {
            authorizedPrincipal = "?";
        }
        else
        {
            authorizedPrincipal = getAuthorizedPrincipal().getName();
        }

        final String remoteAddress = String.valueOf(getConnection().getRemoteSocketAddress());

        final String formatted = MessageFormat.format(CHANNEL_FORMAT,
                                                      connectionId,
                                                      authorizedPrincipal,
                                                      remoteAddress,
                                                      getAddressSpace().getName(),
                                                      getChannel());
        return "[" + formatted + "] ";
    }

    /**
     * Closes the session. With a non-null cause the close is recorded as
     * forced, using the cause's code and the message; with a null cause it is
     * an ordinary close.
     *
     * @param cause   reason for the close, or {@code null}
     * @param message text passed along with a non-null cause
     */
    @Override
    public void close(AMQConstant cause, String message)
    {
        if (cause != null)
        {
            close(cause.getCode(), message);
        }
        else
        {
            close();
        }
    }

    /**
     * Records a forced-close log message (only if none was recorded before)
     * and then closes the session.
     *
     * @param cause   numeric cause code for the log message
     * @param message text for the log message
     */
    void close(int cause, String message)
    {
        final LogMessage forcedClose = ChannelMessages.CLOSE_FORCED(cause, message);
        _forcedCloseLogMessage.compareAndSet(null, forcedClose);
        close();
    }

    /**
     * Closes the session: subscriptions are unregistered first, then the model
     * object (if set) is deleted, and finally the superclass close runs.
     */
    @Override
    public void close()
    {
        // Unregister first so that no new messages are sent to subscriptions
        // of a session that is in the middle of closing.
        unregisterSubscriptions();

        final org.apache.qpid.server.model.Session<?> model = _modelObject;
        if (model != null)
        {
            model.delete();
        }

        super.close();
    }

    /**
     * Unregisters (and thereby closes) every subscription on this session.
     */
    void unregisterSubscriptions()
    {
        for (ConsumerTarget_0_10 target : getSubscriptions())
        {
            unregister(target);
        }
    }

    /**
     * Calls {@code stop()} on every consumer target returned by {@code getSubscriptions()}.
     */
    void stopSubscriptions()
    {
        for (final ConsumerTarget_0_10 target : getSubscriptions())
        {
            target.stop();
        }
    }


    /**
     * Flushes the credit state of every subscription (passing {@code false}) and then
     * completes all asynchronous commands that are still queued.
     */
    public void receivedComplete()
    {
        for (final ConsumerTarget_0_10 target : getSubscriptions())
        {
            target.flushCreditState(false);
        }

        awaitCommandCompletion();
    }

    /**
     * Returns the number of entries currently held in the message disposition listener map.
     *
     * @return the size of {@code _messageDispositionListenerMap}
     */
    public int getUnacknowledgedMessageCount()
    {
        final int pendingDispositions = _messageDispositionListenerMap.size();
        return pendingDispositions;
    }

    /**
     * Reports the current value of the session's blocking flag.
     *
     * @return the value read from {@code _blocking}
     */
    public boolean getBlocking()
    {
        final boolean blocking = _blocking.get();
        return blocking;
    }

    // Asynchronous commands added by recordFuture(...), kept in insertion order until they are
    // completed and removed from the head by completeAsyncCommands() / awaitCommandCompletion().
    private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();

    /**
     * Completes queued asynchronous commands.
     * <p>
     * Commands at the head of the queue are completed for as long as they report being ready.
     * After that, further commands are taken from the head and completed regardless of
     * readiness until the queue holds no more than
     * {@code UNFINISHED_COMMAND_QUEUE_THRESHOLD} entries.
     */
    public void completeAsyncCommands()
    {
        // Phase 1: drain the head while it is ready; a command is only removed after it completed.
        for (AsyncCommand head = _unfinishedCommandsQueue.peek();
             head != null && head.isReadyForCompletion();
             head = _unfinishedCommandsQueue.peek())
        {
            head.complete();
            _unfinishedCommandsQueue.poll();
        }

        // Phase 2: bring the backlog down to the threshold; here the command is removed first.
        while (_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
        {
            final AsyncCommand oldest = _unfinishedCommandsQueue.poll();
            oldest.complete();
        }
    }


    /**
     * Removes every queued asynchronous command, oldest first, and completes each one.
     */
    public void awaitCommandCompletion()
    {
        for (AsyncCommand next = _unfinishedCommandsQueue.poll();
             next != null;
             next = _unfinishedCommandsQueue.poll())
        {
            next.complete();
        }
    }


    /**
     * Returns the most recently queued asynchronous command.
     *
     * @return the last element of the unfinished command queue, or {@code null} when the queue is empty
     */
    public Object getAsyncCommandMark()
    {
        if (_unfinishedCommandsQueue.isEmpty())
        {
            return null;
        }
        return _unfinishedCommandsQueue.getLast();
    }

    /**
     * Queues an asynchronous command made of the given future and action.
     *
     * @param future future paired with the action in the queued command
     * @param action action paired with the future in the queued command
     */
    public void recordFuture(final ListenableFuture<Void> future, final ServerTransaction.Action action)
    {
        final AsyncCommand command = new AsyncCommand(future, action);
        _unfinishedCommandsQueue.add(command);
    }

    /**
     * Pairs a future with the transaction action whose {@code postCommit()} must run once
     * that future has finished successfully.
     */
    private static class AsyncCommand
    {
        private final ListenableFuture<Void> _future;
        // Cleared after a successful complete() so the action is no longer referenced.
        private ServerTransaction.Action _action;

        public AsyncCommand(final ListenableFuture<Void> future, final ServerTransaction.Action action)
        {
            _future = future;
            _action = action;
        }

        /**
         * Waits for the future to finish and then runs the action's {@code postCommit()}.
         * <p>
         * Interruptions received while waiting do not abort the wait; the thread's interrupt
         * status is restored once waiting ends, whether the future succeeded or failed.
         * If the future failed, {@code postCommit()} is not invoked and the failure is rethrown.
         *
         * @throws RuntimeException if the future failed with a runtime exception (rethrown as is)
         * @throws Error if the future failed with an error (rethrown as is)
         * @throws ServerScopedRuntimeException wrapping any other failure cause
         */
        void complete()
        {
            boolean interrupted = false;
            try
            {
                while (true)
                {
                    try
                    {
                        _future.get();
                        break;
                    }
                    catch (InterruptedException e)
                    {
                        // Remember the interruption and keep waiting for the result.
                        interrupted = true;
                    }
                }
            }
            catch (ExecutionException e)
            {
                final Throwable cause = e.getCause();
                if (cause instanceof RuntimeException)
                {
                    throw (RuntimeException) cause;
                }
                else if (cause instanceof Error)
                {
                    throw (Error) cause;
                }
                else
                {
                    throw new ServerScopedRuntimeException(cause);
                }
            }
            finally
            {
                // Restore the interrupt status on every exit path, including when the
                // future's failure is rethrown above.
                if (interrupted)
                {
                    Thread.currentThread().interrupt();
                }
            }
            _action.postCommit();
            _action = null;
        }

        /**
         * @return {@code true} when the future reports that it is done
         */
        boolean isReadyForCompletion()
        {
            return _future.isDone();
        }
    }

    /**
     * Passes the close flag straight through to the superclass implementation.
     *
     * @param close value handed to {@code super.setClose}
     */
    protected void setClose(boolean close)
    {
        // Pure delegation; no additional behaviour is added here.
        super.setClose(close);
    }

    /**
     * Returns how many values are currently held in {@code _subscriptions}.
     *
     * @return the number of subscription entries
     */
    @Override
    public int getConsumerCount()
    {
        final Collection<?> registered = _subscriptions.values();
        return registered.size();
    }

    /**
     * Returns a read-only view of this session's consumers.
     *
     * @return an unmodifiable collection wrapping {@code _consumers}
     */
    @Override
    public Collection<Consumer<?>> getConsumers()
    {
        final Collection<Consumer<?>> readOnlyView = Collections.unmodifiableCollection(_consumers);
        return readOnlyView;
    }

    /**
     * Registers a listener that is called back when consumers are added to or removed from
     * this session.
     *
     * @param listener the listener to add to {@code _consumerListeners}
     */
    @Override
    public void addConsumerListener(final ConsumerListener listener)
    {
        this._consumerListeners.add(listener);
    }

    /**
     * Deregisters a previously added consumer listener.
     *
     * @param listener the listener to remove from {@code _consumerListeners}
     */
    @Override
    public void removeConsumerListener(final ConsumerListener listener)
    {
        this._consumerListeners.remove(listener);
    }

    /**
     * Stores the model object associated with this session.
     *
     * @param session the model session to remember; it is deleted when this session is closed
     */
    @Override
    public void setModelObject(final org.apache.qpid.server.model.Session<?> session)
    {
        this._modelObject = session;
    }

    /**
     * Returns the model object previously stored with {@code setModelObject}.
     *
     * @return the current value of {@code _modelObject}
     */
    @Override
    public org.apache.qpid.server.model.Session<?> getModelObject()
    {
        return this._modelObject;
    }

    /**
     * Returns the start time of the session's current transaction.
     *
     * @return the transaction's start time, or {@code 0} when the current transaction object
     *         reports that it is not transactional
     */
    @Override
    public long getTransactionStartTime()
    {
        // Read the field once so the check and the query use the same transaction object.
        final ServerTransaction current = _transaction;
        return current.isTransactional() ? current.getTransactionStartTime() : 0L;
    }

    /**
     * Returns the update time of the session's current transaction.
     *
     * @return the transaction's update time, or {@code 0} when the current transaction object
     *         reports that it is not transactional
     */
    @Override
    public long getTransactionUpdateTime()
    {
        // Read the field once so the check and the query use the same transaction object.
        final ServerTransaction current = _transaction;
        return current.isTransactional() ? current.getTransactionUpdateTime() : 0L;
    }

    /**
     * Tells every registered consumer listener that a consumer has been added.
     *
     * @param consumer the consumer passed to each listener's {@code consumerAdded}
     */
    private void consumerAdded(Consumer<?> consumer)
    {
        for (final ConsumerListener listener : _consumerListeners)
        {
            listener.consumerAdded(consumer);
        }
    }

    /**
     * Tells every registered consumer listener that a consumer has been removed.
     *
     * @param consumer the consumer passed to each listener's {@code consumerRemoved}
     */
    private void consumerRemoved(Consumer<?> consumer)
    {
        for (final ConsumerListener listener : _consumerListeners)
        {
            listener.consumerRemoved(consumer);
        }
    }

    /**
     * Performs one round of pending work for this session.
     * <p>
     * Nothing is done unless the caller is the connection's IO thread. Otherwise the wire
     * blocking state is first brought in line with the blocking flag, and then consumer targets
     * are taken from the pending list until one with pending work has been found and processed.
     *
     * @return {@code false} when not called on the IO thread; otherwise {@code true} when a
     *         consumer target had work to process, or when the pending list already held
     *         targets on entry to this call
     */
    @Override
    public boolean processPending()
    {
        if (!getAMQPConnection().isIOThread())
        {
            return false;
        }

        // Reconcile what has been signalled on the wire with the currently desired state.
        final boolean shouldBlock = _blocking.get();
        if (_wireBlockingState != shouldBlock)
        {
            _wireBlockingState = shouldBlock;

            if (shouldBlock)
            {
                invokeBlock();
                _blockTime = System.currentTimeMillis();
            }
            else
            {
                invokeUnblock();
                _blockTime = 0L;
            }
        }

        // The pending list is only refilled from the subscriptions once it has been emptied;
        // if it still held targets on entry the caller is told to come back.
        final boolean consumerListNeedsRefreshing = !_consumersWithPendingWork.isEmpty();
        if (!consumerListNeedsRefreshing)
        {
            _consumersWithPendingWork.addAll(getSubscriptions());
        }

        // QPID-7447: prevent unnecessary allocation of empty iterator
        final Iterator<ConsumerTarget> pending = _consumersWithPendingWork.isEmpty() ? Collections.<ConsumerTarget>emptyIterator() : _consumersWithPendingWork.iterator();

        // Each inspected target is removed from the list; stop after the first one that had work.
        boolean consumerHasMoreWork = false;
        while (!consumerHasMoreWork && pending.hasNext())
        {
            final ConsumerTarget candidate = pending.next();
            pending.remove();
            if (candidate.hasPendingWork())
            {
                consumerHasMoreWork = true;
                candidate.processPending();
            }
        }

        return consumerHasMoreWork || consumerListNeedsRefreshing;
    }

    /**
     * Adds a ticker to the connection's aggregate ticker and signals the connection that
     * there is work to do.
     *
     * @param ticker the ticker to add
     */
    @Override
    public void addTicker(final Ticker ticker)
    {
        getConnection()
                .getAmqpConnection()
                .getAggregateTicker()
                .addTicker(ticker);

        // Wake the connection up so that the newly added ticker is taken into account.
        getAMQPConnection().notifyWork();
    }

    /**
     * Removes a ticker from the connection's aggregate ticker.
     *
     * @param ticker the ticker to remove
     */
    @Override
    public void removeTicker(final Ticker ticker)
    {
        getConnection()
                .getAmqpConnection()
                .getAggregateTicker()
                .removeTicker(ticker);
    }

    /**
     * Calls {@code notifyCurrentState()} on every subscription that is not pull-only.
     */
    @Override
    public void notifyConsumerTargetCurrentStates()
    {
        for (final ConsumerTarget_0_10 target : getSubscriptions())
        {
            if (target.isPullOnly())
            {
                // Pull-only targets are skipped.
                continue;
            }
            target.notifyCurrentState();
        }
    }

    /**
     * Acquires and immediately releases the send lock of every subscription, one after
     * another.
     * <p>
     * NOTE(review): presumably passing through each send lock guarantees that any send in
     * progress on that target has finished before this method returns - confirm against
     * {@code ConsumerTarget_0_10}.
     */
    @Override
    public void ensureConsumersNoticedStateChange()
    {
        final Collection<ConsumerTarget_0_10> consumerTargets = getSubscriptions();
        for (final ConsumerTarget_0_10 consumerTarget : consumerTargets)
        {
            // Acquire outside of any try/finally: the release must only run once the lock is
            // actually held. Releasing after a failed acquisition would act on a lock this
            // thread does not own and could mask the original failure.
            consumerTarget.getSendLock();
            consumerTarget.releaseSendLock();
        }
    }

    /**
     * Asks the AMQP connection to close this session asynchronously with
     * {@code AMQConstant.RESOURCE_ERROR}.
     *
     * @param reason text passed along with the close request
     */
    @Override
    public void doTimeoutAction(final String reason)
    {
        getAMQPConnection().closeSessionAsync(this, AMQConstant.RESOURCE_ERROR, reason);
    }

    /**
     * Returns the configured maximum uncommitted in-memory size.
     *
     * @return the value of {@code _maxUncommittedInMemorySize}
     */
    public final long getMaxUncommittedInMemorySize()
    {
        return this._maxUncommittedInMemorySize;
    }

    /**
     * Orders sessions by their ids.
     *
     * @param o the session to compare against
     * @return the result of comparing this session's id with the other session's id
     */
    @Override
    public int compareTo(AMQSessionModel o)
    {
        return this.getId().compareTo(o.getId());
    }

    /**
     * Action that asks the resource owning a message instance to check its capacity against
     * this session, provided that resource is a {@code CapacityChecker}.
     */
    private class CheckCapacityAction implements Action<MessageInstance>
    {
        /**
         * @param entry the message instance whose owning resource is examined
         */
        @Override
        public void performAction(final MessageInstance entry)
        {
            final TransactionLogResource owner = entry.getOwningResource();
            if (!(owner instanceof CapacityChecker))
            {
                // Resources that cannot check capacity are left alone.
                return;
            }
            final CapacityChecker checker = (CapacityChecker) owner;
            checker.checkCapacity(ServerSession.this);
        }
    }

    /**
     * Configuration change listener that reports a consumer as removed once the observed
     * object enters the {@code DELETED} state. All other callbacks are ignored.
     */
    private class ConsumerClosedListener implements ConfigurationChangeListener
    {
        /**
         * Calls {@code consumerRemoved} with the object when its new state is {@code DELETED}.
         */
        @Override
        public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState)
        {
            if (newState != org.apache.qpid.server.model.State.DELETED)
            {
                return;
            }
            consumerRemoved((Consumer<?>) object);
        }

        @Override
        public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
        {
            // intentionally empty
        }

        @Override
        public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
        {
            // intentionally empty
        }

        @Override
        public void attributeSet(final ConfiguredObject object,
                                 final String attributeName,
                                 final Object oldAttributeValue,
                                 final Object newAttributeValue)
        {
            // intentionally empty
        }

        @Override
        public void bulkChangeStart(final ConfiguredObject<?> object)
        {
            // intentionally empty
        }

        @Override
        public void bulkChangeEnd(final ConfiguredObject<?> object)
        {
            // intentionally empty
        }
    }
}
