/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.qpid.server.queue;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;

public class LastValueQueueList extends OrderedQueueEntryList
{
    private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueList.class);

    private static final HeadCreator HEAD_CREATOR = new HeadCreator()
    {

        @Override
        public ConflationQueueEntry createHead(final QueueEntryList list)
        {
            return ((LastValueQueueList)list).createHead();
        }
    };

    private final String _conflationKey;
    private final ConcurrentMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap =
        new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>();

    private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
    private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);

    public LastValueQueueList(LastValueQueueImpl queue)
    {
        super(queue, HEAD_CREATOR);
        _conflationKey = queue.getLvqKey();
    }

    private ConflationQueueEntry createHead()
    {
        return new ConflationQueueEntry(this);
    }

    @Override
    protected ConflationQueueEntry createQueueEntry(ServerMessage message,
                                                    final MessageEnqueueRecord enqueueRecord)
    {
        return new ConflationQueueEntry(this, message, enqueueRecord);
    }



    /**
     * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
     */
    @Override
    public ConflationQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
    {
        final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message, enqueueRecord);

        final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
        if (keyValue != null)
        {
            if(LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
            }

            final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
            AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
            ConflationQueueEntry entryFromMap;

            // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
            // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
            // indicating that the reference object is no longer valid (it is being removed from the map).
            boolean keepTryingToUpdateEntryReference;
            do
            {
                do
                {
                    entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);

                    // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)  
                    entryFromMap = entryReferenceFromMap.get();
                }
                while(entryFromMap == _deleteInProgress);

                boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;

                keepTryingToUpdateEntryReference = entryFromMapIsOlder
                        && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
            }
            while(keepTryingToUpdateEntryReference);

            if (entryFromMap == _newerEntryAlreadyBeenAndGone)
            {
                discardEntry(addedEntry);
            }
            else if (entryFromMap.compareTo(addedEntry) > 0)
            {
                if(LOGGER.isDebugEnabled())
                {
                    LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
                }
                discardEntry(addedEntry);
            }
            else if (entryFromMap.compareTo(addedEntry) < 0)
            {
                if(LOGGER.isDebugEnabled())
                {
                    LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
                }
                discardEntry(entryFromMap);
            }

            addedEntry.setLatestValueReference(entryReferenceFromMap);
        }

        return addedEntry;
    }

    /**
     * Returns:
     *
     * <ul>
     * <li>the existing entry reference if the value already exists in the map, or</li>
     * <li>referenceToValue if none exists, or</li>
     * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently
     * adds and removes during execution of this method.</li>
     * </ul>
     */
    private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue)
    {
        AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);

        if(latestValueReference == null)
        {
            latestValueReference = _latestValuesMap.get(key);
            if(latestValueReference == null)
            {
                return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone);
            }
        }
        return latestValueReference;
    }

    private void discardEntry(final QueueEntry entry)
    {
        if(entry.acquire())
        {
            ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
            txn.dequeue(entry.getEnqueueRecord(),
                                    new ServerTransaction.Action()
                                {
                                    @Override
                                    public void postCommit()
                                    {
                                        entry.delete();
                                    }

                                    @Override
                                    public void onRollback()
                                    {

                                    }
                                });
        }
    }

    final class ConflationQueueEntry extends OrderedQueueEntry
    {

        private AtomicReference<ConflationQueueEntry> _latestValueReference;

        private ConflationQueueEntry(final LastValueQueueList queueEntryList)
        {
            super(queueEntryList);
        }

        public ConflationQueueEntry(LastValueQueueList queueEntryList,
                                    ServerMessage message,
                                    final MessageEnqueueRecord messageEnqueueRecord)
        {
            super(queueEntryList, message, messageEnqueueRecord);
        }

        @Override
        public void release()
        {
            super.release();

            discardIfReleasedEntryIsNoLongerLatest();
        }

        @Override
        public void release(MessageInstanceConsumer consumer)
        {
            super.release(consumer);

            discardIfReleasedEntryIsNoLongerLatest();
        }

        @Override
        protected void onDelete()
        {
            if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
            {
                Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
                _latestValuesMap.remove(key,_latestValueReference);
            }

        }

        public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference)
        {
            _latestValueReference = latestValueReference;
        }

        private void discardIfReleasedEntryIsNoLongerLatest()
        {
            if(_latestValueReference != null)
            {
                if(_latestValueReference.get() != this)
                {
                    discardEntry(this);
                }
            }
        }

    }

    /**
     * Exposed purposes of unit test only.
     */
    Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap()
    {
        return Collections.unmodifiableMap(_latestValuesMap);
    }
}
