/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.queue;

import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;

import java.text.MessageFormat;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.StateChangeListener;

class QueueConsumerImpl
    extends AbstractConfiguredObject<QueueConsumerImpl>
        implements QueueConsumer<QueueConsumerImpl>, LogSubject
{
    // SLF4J logger; used in this class to report filters that fail to evaluate.
    private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
    // Flipped (via compareAndSet) the first time the target is seen entering CLOSED, so the
    // CLOSE operational log message is emitted at most once.
    private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
    // Flipped (via compareAndSet) in onClose() so the teardown sequence runs at most once.
    private final AtomicBoolean _closed = new AtomicBoolean(false);
    // Assigned in the constructor from CONSUMER_NUMBER_GENERATOR; also returned by getIdentifier().
    private final long _consumerNumber;
    // System.currentTimeMillis() captured when this object is instantiated.
    private final long _createTime = System.currentTimeMillis();
    // Acquired-state object bound to this consumer; exposed through getOwningState().
    private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl>
            _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
    // Listener attached to the entry passed to awaitCredit(); detached again in onClose().
    private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
    // The three flags below mirror the presence of ConsumerOption.ACQUIRES, SEES_REQUEUES and
    // TRANSIENT in the option set handed to the constructor.
    private final boolean _acquires;
    private final boolean _seesRequeues;
    private final boolean _isTransient;
    // Delivery statistics, updated in send() and reported by getMessagesOut()/getBytesOut().
    private final AtomicLong _deliveredCount = new AtomicLong(0);
    private final AtomicLong _deliveredBytes = new AtomicLong(0);
    // May be null; hasInterest() treats a null filter manager as "accept".
    private final FilterManager _filters;
    // Message class supplied at construction; hasInterest() compares entry message classes
    // against it and, on mismatch, looks for a converter to this class.
    private final Class<? extends ServerMessage> _messageClass;
    // Connection reference of the target's session; compared by identity in the no-local check.
    private final Object _sessionReference;
    // The queue this consumer is attached to.
    private final AbstractQueue _queue;

    // Null when the target reports isMultiQueue(); otherwise registered with the session while
    // the target is SUSPENDED (see targetStateChanged()).
    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;

    /**
     * Translation table from the state reported by a {@code ConsumerTarget} to the
     * model {@code State} exposed by this consumer.
     */
    static final EnumMap<ConsumerTarget.State, State> STATE_MAP = new EnumMap<>(ConsumerTarget.State.class);

    static
    {
        // CLOSED -> DELETED, SUSPENDED -> QUIESCED, ACTIVE -> ACTIVE
        STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
        STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
        STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
    }

    // The consumer target that most operations of this class delegate to.
    private final ConsumerTarget _target;
    // Listener registered on _target in the constructor; forwards to targetStateChanged()
    // and is deregistered in onClose().
    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener;
    // Opaque per-queue context; only stored and returned by this class.
    private volatile QueueContext _queueContext;
    // Listener notified from targetStateChanged() with the mapped model states.
    // Defaults to a no-op implementation; may be replaced (including with null) via setStateListener().
    private volatile StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
    {
        public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
        {
            // no-op
        }
    };
    // Managed attribute fields. They are not assigned anywhere in this class; presumably they are
    // populated by the configured-object framework from the attribute map built in
    // createAttributeMap() -- TODO confirm.
    @ManagedAttributeField
    private boolean _exclusive;
    @ManagedAttributeField
    private boolean _noLocal;
    @ManagedAttributeField
    private String _distributionMode;
    // NOTE(review): createAttributeMap() in this file does not put a settlement-mode entry.
    @ManagedAttributeField
    private String _settlementMode;
    @ManagedAttributeField
    private String _selector;
    @ManagedAttributeField
    private int _priority;

    /**
     * Creates a consumer of {@code queue} that delivers through {@code target}.
     * <p>
     * After the fields are initialised the constructor performs the {@code CREATE}
     * authorisation check, registers a state listener on the target and, unless the
     * target is multi-queue, prepares the ticker that logs prolonged suspension.
     *
     * @param queue        the queue being consumed from
     * @param target       the consumer target messages are sent to
     * @param consumerName value stored as the NAME attribute
     * @param filters      filter manager, or {@code null} for no filtering
     * @param messageClass message class this consumer is created for
     * @param optionSet    consumer options (ACQUIRES, SEES_REQUEUES, TRANSIENT, ...)
     * @param priority     value for the PRIORITY attribute, or {@code null} to leave it unset
     */
    QueueConsumerImpl(final AbstractQueue<?> queue,
                      ConsumerTarget target,
                      final String consumerName,
                      final FilterManager filters,
                      final Class<? extends ServerMessage> messageClass,
                      EnumSet<ConsumerOption> optionSet,
                      final Integer priority)
    {
        super(parentsMap(queue, target.getSessionModel().getModelObject()),
              createAttributeMap(consumerName, filters, optionSet, priority));

        // Plain state captured from the arguments.
        _messageClass = messageClass;
        _sessionReference = target.getSessionModel().getConnectionReference();
        _consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
        _queue = queue;
        _target = target;
        _filters = filters;
        _isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
        _seesRequeues = optionSet.contains(ConsumerOption.SEES_REQUEUES);
        _acquires = optionSet.contains(ConsumerOption.ACQUIRES);

        // Access control
        authorise(Operation.CREATE);

        // Relay every target state transition into targetStateChanged().
        _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
        {
            @Override
            public void stateChanged(final ConsumerTarget source,
                                     final ConsumerTarget.State previous,
                                     final ConsumerTarget.State current)
            {
                targetStateChanged(previous, current);
            }
        };
        _target.addStateListener(_listener);

        if (target.isMultiQueue())
        {
            // No suspension logging for multi-queue targets.
            _suspendedConsumerLoggingTicker = null;
        }
        else
        {
            _suspendedConsumerLoggingTicker =
                    new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
                    {
                        @Override
                        protected void log(final long period)
                        {
                            getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
                        }
                    };
        }
    }

    /**
     * Runs the superclass open handling and then emits the consumer-creation
     * operational log message.
     */
    @Override
    protected void onOpen()
    {
        super.onOpen();
        // Announce the new subscription once the object is open.
        this.setupLogging();
    }

    /**
     * Builds the attribute map passed to the superclass constructor.
     *
     * @param name      value stored under NAME
     * @param filters   filter manager searched for a JMS selector; may be {@code null}
     * @param optionSet options that determine EXCLUSIVE, NO_LOCAL, DISTRIBUTION_MODE and DURABLE
     * @param priority  stored under PRIORITY when not {@code null}
     * @return a new mutable map with a freshly generated random ID
     */
    private static Map<String, Object> createAttributeMap(String name,
                                                          FilterManager filters,
                                                          EnumSet<ConsumerOption> optionSet,
                                                          Integer priority)
    {
        final Map<String, Object> attrs = new HashMap<>();
        attrs.put(ID, UUID.randomUUID());
        attrs.put(NAME, name);
        attrs.put(EXCLUSIVE, optionSet.contains(ConsumerOption.EXCLUSIVE));
        attrs.put(NO_LOCAL, optionSet.contains(ConsumerOption.NO_LOCAL));

        // Acquiring consumers move messages, non-acquiring ones copy them.
        final String distributionMode;
        if (optionSet.contains(ConsumerOption.ACQUIRES))
        {
            distributionMode = "MOVE";
        }
        else
        {
            distributionMode = "COPY";
        }
        attrs.put(DISTRIBUTION_MODE, distributionMode);

        attrs.put(DURABLE, optionSet.contains(ConsumerOption.DURABLE));
        attrs.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);

        if (priority != null)
        {
            attrs.put(PRIORITY, priority);
        }

        if (filters != null)
        {
            // Only the first JMS selector filter contributes the SELECTOR attribute.
            for (Iterator<MessageFilter> it = filters.filters(); it.hasNext(); )
            {
                final MessageFilter candidate = it.next();
                if (candidate instanceof JMSSelectorFilter)
                {
                    attrs.put(SELECTOR, ((JMSSelectorFilter) candidate).getSelector());
                    break;
                }
            }
        }

        return attrs;
    }

    /**
     * Reacts to a state notification from the consumer target.
     * <p>
     * On a real transition it logs the close once, registers or deregisters the
     * suspension ticker, and starts an asynchronous close if the target closed while
     * this consumer has not been closed yet. The registered state listener (if any)
     * is always notified with the mapped model states, even when the state is unchanged.
     *
     * @param oldState state the target reports having left
     * @param newState state the target reports having entered
     */
    private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
    {
        final boolean transitioned = oldState != newState;
        final boolean nowClosed = newState == ConsumerTarget.State.CLOSED;

        if (transitioned)
        {
            if (nowClosed && _targetClosed.compareAndSet(false, true))
            {
                getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
            }

            if (_suspendedConsumerLoggingTicker != null)
            {
                if (newState != ConsumerTarget.State.SUSPENDED)
                {
                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
                }
                else
                {
                    // Restart the suspension clock before the ticker is scheduled.
                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
                }
            }

            if (nowClosed && !_closed.get())
            {
                closeAsync();
            }
        }

        final StateChangeListener<? super QueueConsumerImpl, State> observer = getStateListener();
        if (observer != null)
        {
            observer.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
        }
    }

    /**
     * @return the consumer target supplied at construction
     */
    @Override
    public ConsumerTarget getTarget()
    {
        return this._target;
    }

    /**
     * Points the waiting-on-credit listener at the given entry.
     *
     * @param node the queue entry to watch
     */
    @Override
    public void awaitCredit(final QueueEntry node)
    {
        final WaitingOnCreditMessageListener creditListener = _waitingOnCreditMessageListener;
        creditListener.update(node);
    }

    /**
     * @return whatever the consumer target reports from {@code hasCredit()}
     */
    @Override
    public boolean hasCredit()
    {
        final boolean creditAvailable = _target.hasCredit();
        return creditAvailable;
    }

    /**
     * Triggers delivery after an external change: pull-only consumers notify their
     * connection that there is work, all others request asynchronous queue delivery.
     */
    @Override
    public void externalStateChange()
    {
        if (!isPullOnly())
        {
            _queue.deliverAsync();
            return;
        }

        getSessionModel().getAMQPConnection().notifyWork();
    }

    /**
     * @return {@code true} if the queue is non-empty and reports available messages
     *         for this consumer
     */
    @Override
    public boolean hasAvailableMessages()
    {
        if (_queue.isEmpty())
        {
            return false;
        }
        return _queue.hasAvailableMessages(this);
    }

    /**
     * @return the unacknowledged byte count reported by the consumer target
     */
    @Override
    public long getUnacknowledgedBytes()
    {
        final long unackedBytes = _target.getUnacknowledgedBytes();
        return unackedBytes;
    }

    /**
     * @return the unacknowledged message count reported by the consumer target
     */
    @Override
    public long getUnacknowledgedMessages()
    {
        final long unackedMessages = _target.getUnacknowledgedMessages();
        return unackedMessages;
    }

    /**
     * @return the session model of the consumer target
     */
    @Override
    public AMQSessionModel getSessionModel()
    {
        final AMQSessionModel session = _target.getSessionModel();
        return session;
    }

    /**
     * @return the consumer number, boxed as a {@code Long}
     */
    @Override
    public Object getIdentifier()
    {
        return Long.valueOf(getConsumerNumber());
    }

    /**
     * @return whatever the consumer target reports from {@code isSuspended()}
     */
    @Override
    public boolean isSuspended()
    {
        final boolean suspended = _target.isSuspended();
        return suspended;
    }

    /**
     * Tears the consumer down exactly once: while holding the target's send lock it
     * detaches the credit listener, informs the target, removes the target state
     * listener, unregisters from the queue, removes the suspension ticker (if any)
     * and finally marks the object deleted.
     */
    @Override
    protected void onClose()
    {
        // Only the first caller performs the teardown.
        if (!_closed.compareAndSet(false, true))
        {
            return;
        }

        _target.getSendLock();
        try
        {
            _waitingOnCreditMessageListener.remove();
            _target.consumerRemoved(this);
            _target.removeStateChangeListener(_listener);
            _queue.unregisterConsumer(this);

            final SuspendedConsumerLoggingTicker ticker = _suspendedConsumerLoggingTicker;
            if (ticker != null)
            {
                getSessionModel().removeTicker(ticker);
            }

            deleted();
        }
        finally
        {
            _target.releaseSendLock();
        }
    }

    /**
     * @return the value of the managed {@code priority} attribute field
     */
    @Override
    public int getPriority()
    {
        return this._priority;
    }

    /**
     * Forwards to the consumer target's {@code flushBatched()}.
     */
    public void flushBatched()
    {
        final ConsumerTarget consumerTarget = _target;
        consumerTarget.flushBatched();
    }

    /**
     * @return whatever the consumer target reports from {@code isPullOnly()}
     */
    @Override
    public boolean isPullOnly()
    {
        final boolean pullOnly = _target.isPullOnly();
        return pullOnly;
    }

    /**
     * Forwards the queue-deleted notification to the consumer target.
     */
    public void queueDeleted()
    {
        final ConsumerTarget consumerTarget = _target;
        consumerTarget.queueDeleted();
    }

    /**
     * Asks the target to allocate credit for the entry's message.
     *
     * @param msg the entry whose message needs credit
     * @return {@code true} when the target did not allocate credit
     */
    public boolean wouldSuspend(final QueueEntry msg)
    {
        final boolean creditAllocated = _target.allocateCredit(msg.getMessage());
        return !creditAllocated;
    }

    /**
     * Asks the target to restore credit for the entry's message.
     *
     * @param queueEntry the entry whose message credit is restored
     */
    public void restoreCredit(final QueueEntry queueEntry)
    {
        final ConsumerTarget consumerTarget = _target;
        consumerTarget.restoreCredit(queueEntry.getMessage());
    }

    /**
     * Forwards the queue-empty notification to the consumer target.
     */
    public void queueEmpty()
    {
        final ConsumerTarget consumerTarget = _target;
        consumerTarget.queueEmpty();
    }

    /**
     * @return the model state that {@code STATE_MAP} associates with the target's
     *         current state
     */
    @Override
    public State getState()
    {
        final ConsumerTarget.State targetState = _target.getState();
        return STATE_MAP.get(targetState);
    }

    /**
     * @return the queue this consumer was created on
     */
    @Override
    public final Queue<?> getQueue()
    {
        return this._queue;
    }

    /**
     * Emits the subscription CREATE operational log message, passing the filter
     * description, whether the queue is durable and this consumer exclusive, and
     * whether the filter description is non-empty.
     */
    private void setupLogging()
    {
        final String filterDescription = getFilterLogString();
        final EventLogger eventLogger = getEventLogger();

        final boolean durableAndExclusive = _queue.isDurable() && _exclusive;
        final boolean hasDescription = filterDescription.length() > 0;

        eventLogger.message(this,
                            SubscriptionMessages.CREATE(filterDescription, durableAndExclusive, hasDescription));
    }

    /**
     * @return this consumer, which acts as its own log subject
     */
    protected final LogSubject getLogSubject()
    {
        final LogSubject self = this;
        return self;
    }

    /**
     * Flushes this consumer on the queue and then lets the target process pending
     * work. Message assignment on the current thread is switched on for the duration
     * of the call and switched off again afterwards, even on failure.
     */
    @Override
    public final void flush()
    {
        final AMQPConnection<?> amqpConnection = _target.getSessionModel().getAMQPConnection();
        try
        {
            amqpConnection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
            _queue.flushConsumer(this);
            _target.processPending();
        }
        finally
        {
            // Always revert the per-thread override.
            amqpConnection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
        }
    }

    /**
     * Asks the queue to flush this consumer with a limit of {@code 1}. Message
     * assignment on the current thread is switched on for the duration of the call
     * and switched off again afterwards, even on failure.
     */
    @Override
    public void pullMessage()
    {
        final AMQPConnection<?> amqpConnection = _target.getSessionModel().getAMQPConnection();
        try
        {
            amqpConnection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(true);
            _queue.flushConsumer(this, 1);
        }
        finally
        {
            // Always revert the per-thread override.
            amqpConnection.alwaysAllowMessageAssignmentInThisThreadIfItIsIOThread(false);
        }
    }

    /**
     * Asks the queue to resend the entry to this consumer and, if it did, lets the
     * target process pending work.
     *
     * @param entry the entry to resend
     * @return the result reported by the queue's {@code resend}
     */
    public boolean resend(final QueueEntry entry)
    {
        if (!getQueue().resend(entry, this))
        {
            return false;
        }

        _target.processPending();
        return true;
    }

    /**
     * @return the number assigned to this consumer at construction
     */
    public final long getConsumerNumber()
    {
        return this._consumerNumber;
    }

    /**
     * @return the currently registered state listener; may be {@code null} if one
     *         was set explicitly to {@code null}
     */
    public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
    {
        return this._stateListener;
    }

    /**
     * Replaces the listener that is notified of this consumer's state changes.
     *
     * @param listener the new listener
     */
    public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
    {
        this._stateListener = listener;
    }

    /**
     * @return the queue context last stored with {@code setQueueContext}
     */
    public final QueueContext getQueueContext()
    {
        return this._queueContext;
    }

    /**
     * Stores the queue context for this consumer.
     *
     * @param queueContext the context to store
     */
    final void setQueueContext(QueueContext queueContext)
    {
        this._queueContext = queueContext;
    }

    /**
     * @return {@code true} when {@code getState()} is {@code State.ACTIVE}
     */
    public final boolean isActive()
    {
        final State current = getState();
        return State.ACTIVE == current;
    }

    /**
     * @return {@code true} when {@code getState()} is {@code State.DELETED}
     */
    public final boolean isClosed()
    {
        final State current = getState();
        return State.DELETED == current;
    }

    /**
     * Decides whether this consumer is interested in the given queue entry.
     * <p>
     * The entry is refused when any of the following holds:
     * <ul>
     *   <li>it has been rejected by this consumer, or is currently held;</li>
     *   <li>its message is of this consumer's message class, the consumer is no-local
     *       and the message carries the same (non-null) connection reference as this
     *       consumer's session;</li>
     *   <li>its message is of a different class, a message class is configured and no
     *       converter to that class is registered;</li>
     *   <li>filters are configured and a message reference cannot be obtained, the
     *       filters do not all allow the message, or a filter fails to evaluate.</li>
     * </ul>
     *
     * @param entry the queue entry to test
     * @return {@code true} if the entry may be delivered to this consumer
     */
    public final boolean hasInterest(QueueEntry entry)
    {
        // check that the message hasn't been rejected
        if (entry.isRejectedBy(this) || entry.checkHeld(System.currentTimeMillis()))
        {
            return false;
        }

        if (entry.getMessage().getClass() == _messageClass)
        {
            if (_noLocal)
            {
                // Identity comparison: same connection reference means "local".
                Object connectionRef = entry.getMessage().getConnectionReference();
                if (connectionRef != null && connectionRef == _sessionReference)
                {
                    return false;
                }
            }
        }
        else if (_messageClass != null
                 && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass) == null)
        {
            // no interest in messages we can't convert
            return false;
        }

        if (_filters == null)
        {
            return true;
        }

        // Hold a reference to the message while the filters inspect it.
        MessageReference ref = entry.newMessageReference();
        if (ref == null)
        {
            return false;
        }

        try
        {
            Filterable msg = entry.asFilterable();
            try
            {
                return _filters.allAllow(msg);
            }
            catch (SelectorParsingException e)
            {
                // Parameterized form: the message is only rendered when INFO is enabled.
                LOGGER.info("{} could not evaluate filter [{}]  against message {}. Error was : {}",
                            this, _filters, msg, e.getMessage());
                return false;
            }
        }
        finally
        {
            ref.release();
        }
    }

    /**
     * Builds the description used in the CREATE log message: the filter manager's
     * string form when filters are present, followed by {@code "Browser"} (comma
     * separated) when this consumer does not acquire messages.
     *
     * @return the description; empty when there are no filters and the consumer acquires
     */
    protected String getFilterLogString()
    {
        final boolean filtersPresent = _filters != null && _filters.hasFilters();
        final StringBuilder description = new StringBuilder();

        if (filtersPresent)
        {
            description.append(_filters.toString());
        }

        if (!acquires())
        {
            if (filtersPresent)
            {
                description.append(", ");
            }
            description.append("Browser");
        }

        return description.toString();
    }

    /**
     * @return the result of {@code trySendLock()} on the consumer target
     */
    public final boolean trySendLock()
    {
        final ConsumerTarget consumerTarget = getTarget();
        return consumerTarget.trySendLock();
    }

    /**
     * Calls {@code getSendLock()} on the consumer target.
     */
    public final void getSendLock()
    {
        final ConsumerTarget consumerTarget = getTarget();
        consumerTarget.getSendLock();
    }

    /**
     * Calls {@code releaseSendLock()} on the consumer target.
     */
    public final void releaseSendLock()
    {
        final ConsumerTarget consumerTarget = getTarget();
        consumerTarget.releaseSendLock();
    }

    /**
     * @return the {@code System.currentTimeMillis()} value captured when this
     *         consumer was instantiated
     */
    public final long getCreateTime()
    {
        return this._createTime;
    }

    /**
     * @return the acquired-state object bound to this consumer
     */
    public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl> getOwningState()
    {
        return this._owningState;
    }

    /**
     * @return {@code true} if {@code ConsumerOption.ACQUIRES} was set at construction
     */
    public final boolean acquires()
    {
        return this._acquires;
    }

    /**
     * @return {@code true} if {@code ConsumerOption.SEES_REQUEUES} was set at construction
     */
    public final boolean seesRequeues()
    {
        return this._seesRequeues;
    }

    /**
     * @return {@code true} if {@code ConsumerOption.TRANSIENT} was set at construction
     */
    public final boolean isTransient()
    {
        return this._isTransient;
    }

    /**
     * @return the running total of sizes returned by the target for messages sent
     *         through {@code send}
     */
    public final long getBytesOut()
    {
        return _deliveredBytes.get();
    }

    /**
     * @return the number of times {@code send} has been invoked on this consumer
     */
    public final long getMessagesOut()
    {
        return _deliveredCount.get();
    }

    /**
     * Sends the entry through the consumer target and updates the delivery
     * statistics: the message counter is bumped before the send, the byte counter
     * is increased by the size the target returns.
     *
     * @param entry the entry to deliver
     * @param batch passed through to the target's {@code send}
     */
    public final void send(final QueueEntry entry, final boolean batch)
    {
        _deliveredCount.incrementAndGet();
        _deliveredBytes.addAndGet(_target.send(this, entry, batch));
    }

    /**
     * Notifies the target that the acquisition of the entry was removed and then
     * decrements the queue's unacknowledged message count for it.
     *
     * @param node the entry whose acquisition was removed
     */
    @Override
    public void acquisitionRemoved(final QueueEntry node)
    {
        // Target first, queue accounting second.
        this._target.acquisitionRemoved(node);
        this._queue.decrementUnackedMsgCount(node);
    }

    /**
     * @return the value of the managed {@code distributionMode} attribute field
     */
    @Override
    public String getDistributionMode()
    {
        return this._distributionMode;
    }

    /**
     * @return the value of the managed {@code settlementMode} attribute field
     */
    @Override
    public String getSettlementMode()
    {
        return this._settlementMode;
    }

    /**
     * @return the value of the managed {@code exclusive} attribute field
     */
    @Override
    public boolean isExclusive()
    {
        return this._exclusive;
    }

    /**
     * @return the value of the managed {@code noLocal} attribute field
     */
    @Override
    public boolean isNoLocal()
    {
        return this._noLocal;
    }

    /**
     * @return the value of the managed {@code selector} attribute field
     */
    @Override
    public String getSelector()
    {
        return this._selector;
    }


    /**
     * Renders the log-subject prefix for this consumer: the formatted consumer
     * number followed, in parentheses, by the queue's log subject (stripped of its
     * surrounding brackets) or {@code UNKNOWN} when there is no queue.
     *
     * @return the bracketed subject string, ending with a space
     */
    @Override
    public String toLogString()
    {
        final String queuePart;
        if (_queue != null)
        {
            final String queueString = new QueueLogSubject(_queue).toLogString();
            // queueString is [vh(/{0})/qu({1}) ] so need to trim
            //                ^                ^^
            queuePart = "("
                        + queueString.substring(1, queueString.length() - 3)
                        + ")";
        }
        else
        {
            queuePart = "(UNKNOWN)";
        }

        return "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
               + queuePart
               + "] ";
    }

    /**
     * @return the event logger obtained from the queue
     */
    private EventLogger getEventLogger()
    {
        final EventLogger queueEventLogger = _queue.getEventLogger();
        return queueEventLogger;
    }

    /**
     * Listener attached to at most one message instance at a time. When that
     * instance changes state (or is already unavailable when registered) the
     * listener detaches itself and prompts the enclosing consumer to look for work.
     */
    public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, MessageInstance.EntryState>
    {
        /** The instance currently being watched, or {@code null} when idle. */
        private final AtomicReference<MessageInstance> _entry = new AtomicReference<>();

        public WaitingOnCreditMessageListener()
        {
        }

        /**
         * Stops watching any previous instance and starts watching {@code entry}.
         * If the entry is not available, work is signalled immediately and the
         * listener detaches again.
         *
         * @param entry the message instance to watch
         */
        public void update(final MessageInstance entry)
        {
            remove();
            // this only happens under send lock so only one thread can be setting to a non null value at any time
            _entry.set(entry);
            entry.addStateChangeListener(this);

            if (entry.isAvailable())
            {
                return;
            }

            signalWork();
            remove();
        }

        /**
         * Detaches from the watched instance, if there is one.
         */
        public void remove()
        {
            final MessageInstance watched = _entry.getAndSet(null);
            if (watched != null)
            {
                watched.removeStateChangeListener(this);
            }
        }

        /**
         * Detaches from {@code entry}, clears the watched reference if it still
         * points at {@code entry}, and signals that there may be work to do.
         */
        @Override
        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
        {
            entry.removeStateChangeListener(this);
            _entry.compareAndSet(entry, null);
            signalWork();
        }

        /**
         * Pull-only consumers notify their connection; all others ask the queue
         * to deliver asynchronously.
         */
        private void signalWork()
        {
            if (!isPullOnly())
            {
                _queue.deliverAsync();
                return;
            }
            getSessionModel().getAMQPConnection().notifyWork();
        }

    }
}
