/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
 * Eagerly removes expired entries from cache when
 * {@link CacheConfiguration#isEagerTtl()} flag is set.
 */
public class GridCacheTtlManager extends GridCacheManagerAdapter {
    /**
     * Throttling timeout in millis which avoid excessive PendingTree access on unwind
     * if there is nothing to clean yet.
     */
    private final long unwindThrottlingTimeout = Long.getLong(
        IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, 500L);

    /** Entries pending removal. This collection tracks entries for near cache only. */
    private GridConcurrentSkipListSetEx pendingEntries;

    /** Indicates that  */
    protected volatile boolean hasPendingEntries;

    /** Timestamp when next clean try will be allowed. Used for throttling on per-cache basis. */
    protected volatile long nextCleanTime;

    /** See {@link CacheConfiguration#isEagerTtl()}. */
    private volatile boolean eagerTtlEnabled;

    /** */
    private GridCacheContext dhtCtx;

    /** */
    private final IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> expireC =
        new IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion>() {
            @Override public void applyx(GridCacheEntryEx entry, GridCacheVersion obsoleteVer) {
                boolean touch = !entry.isNear();

                while (true) {
                    try {
                        if (log.isTraceEnabled())
                            log.trace("Trying to remove expired entry from cache: " + entry);

                        if (entry.onTtlExpired(obsoleteVer))
                            touch = false;

                        break;
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        entry = entry.context().cache().entryEx(entry.key());

                        touch = true;
                    }
                }

                if (touch)
                    entry.touch();
            }
        };

    /** {@inheritDoc} */
    @Override protected void start0() throws IgniteCheckedException {
        dhtCtx = cctx.isNear() ? cctx.near().dht().context() : cctx;

        boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
            !cctx.config().isEagerTtl() ||
            CU.isUtilityCache(cctx.name()) ||
            cctx.dataStructuresCache() ||
            (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);

        if (cleanupDisabled)
            return;

        eagerTtlEnabled = true;

        cctx.shared().ttl().register(this);

        pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? new GridConcurrentSkipListSetEx() : null;
    }

    /**
     * @return {@code True} if eager ttl is enabled for cache.
     */
    public boolean eagerTtlEnabled() {
        return eagerTtlEnabled;
    }

    /** {@inheritDoc} */
    @Override protected void onKernalStop0(boolean cancel) {
        if (pendingEntries != null)
            pendingEntries.clear();
    }

    /**
     * Unregister this TTL manager of cache from periodical check on expired entries.
     */
    public void unregister() {
        // Ignoring attempt to unregister manager that has never been started.
        if (!starting.get())
            return;

        cctx.shared().ttl().unregister(this);
    }

    /**
     * Adds tracked entry to ttl processor.
     *
     * @param entry Entry to add.
     */
    void addTrackedEntry(GridNearCacheEntry entry) {
        assert entry.lockedByCurrentThread();

        EntryWrapper e = new EntryWrapper(entry);

        pendingEntries.add(e);
    }

    /**
     * @param entry Entry to remove.
     */
    void removeTrackedEntry(GridNearCacheEntry entry) {
        assert entry.lockedByCurrentThread();

        pendingEntries.remove(new EntryWrapper(entry));
    }

    /**
     * @return The size of pending entries.
     * @throws IgniteCheckedException If failed.
     */
    public long pendingSize() throws IgniteCheckedException {
        return (pendingEntries != null ? pendingEntries.sizex() : 0) + cctx.offheap().expiredSize();
    }

    /**
     * Updates the flag {@code hasPendingEntries} with the given value.
     *
     * @param update {@code true} if the underlying pending tree has entries with expire policy enabled.
     */
    public void hasPendingEntries(boolean update) {
        hasPendingEntries = update;
    }

    /**
     * @return {@code true} if the underlying pending tree has entries with expire policy enabled.
     */
    public boolean hasPendingEntries() {
        return hasPendingEntries;
    }

    /** {@inheritDoc} */
    @Override public void printMemoryStats() {
        try {
            X.println(">>>");
            X.println(">>> TTL processor memory stats [igniteInstanceName=" + cctx.igniteInstanceName() +
                ", cache=" + cctx.name() + ']');
            X.println(">>>   pendingEntriesSize: " + pendingSize());
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to print statistics: " + e, e);
        }
    }

    /**
     * Processes specified amount of expired entries.
     *
     * @param amount Limit of processed entries by single call, {@code -1} for no limit.
     * @return {@code True} if unprocessed expired entries remains.
     */
    public boolean expire(int amount) {
        // TTL manager is not initialized or eagerTtl disabled for cache.
        if (!eagerTtlEnabled)
            return false;

        assert cctx != null;

        long now = U.currentTimeMillis();

        try {
            if (pendingEntries != null) {
                GridNearCacheAdapter nearCache = cctx.near();

                GridCacheVersion obsoleteVer = null;

                int limit = (-1 != amount) ? amount : pendingEntries.sizex();

                for (int cnt = limit; cnt > 0; cnt--) {
                    EntryWrapper e = pendingEntries.firstx();

                    if (e == null || e.expireTime > now)
                        break; // All expired entries are processed.

                    if (pendingEntries.remove(e)) {
                        if (obsoleteVer == null)
                            obsoleteVer = cctx.versions().next();

                        GridNearCacheEntry nearEntry = nearCache.peekExx(e.key);

                        if (nearEntry != null)
                            expireC.apply(nearEntry, obsoleteVer);
                    }
                }
            }

            if (!cctx.affinityNode())
                return false;  /* Pending tree never contains entries for that cache */

            if (!hasPendingEntries || nextCleanTime > U.currentTimeMillis())
                return false;

            boolean more = cctx.offheap().expire(dhtCtx, expireC, amount);

            if (more)
                return true;

            // There is nothing to clean, so the next clean up can be postponed.
            nextCleanTime = U.currentTimeMillis() + unwindThrottlingTimeout;

            if (amount != -1 && pendingEntries != null) {
                EntryWrapper e = pendingEntries.firstx();

                return e != null && e.expireTime <= now;
            }
        }
        catch (GridDhtInvalidPartitionException e) {
            if (log.isDebugEnabled())
                log.debug("Partition became invalid during rebalancing (will ignore): " + e.partition());

            return false;
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to process entry expiration: " + e, e);
        }
        catch (IgniteException e) {
            if (e.hasCause(NodeStoppingException.class)) {
                if (log.isDebugEnabled())
                    log.debug("Failed to expire because node is stopped: " + e);
            }
            else
                throw e;
        }

        return false;
    }

    /**
     * @param cctx1 First cache context.
     * @param key1 Left key to compare.
     * @param cctx2 Second cache context.
     * @param key2 Right key to compare.
     * @return Comparison result.
     */
    private static int compareKeys(GridCacheContext cctx1, CacheObject key1, GridCacheContext cctx2, CacheObject key2) {
        int key1Hash = key1.hashCode();
        int key2Hash = key2.hashCode();

        int res = Integer.compare(key1Hash, key2Hash);

        if (res == 0) {
            key1 = (CacheObject)cctx1.unwrapTemporary(key1);
            key2 = (CacheObject)cctx2.unwrapTemporary(key2);

            try {
                byte[] key1ValBytes = key1.valueBytes(cctx1.cacheObjectContext());
                byte[] key2ValBytes = key2.valueBytes(cctx2.cacheObjectContext());

                // Must not do fair array comparison.
                res = Integer.compare(key1ValBytes.length, key2ValBytes.length);

                if (res == 0) {
                    for (int i = 0; i < key1ValBytes.length; i++) {
                        res = Byte.compare(key1ValBytes[i], key2ValBytes[i]);

                        if (res != 0)
                            break;
                    }
                }

                if (res == 0)
                    res = Boolean.compare(cctx1.isNear(), cctx2.isNear());
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }

        return res;
    }

    /**
     * Entry wrapper.
     */
    private static class EntryWrapper implements Comparable<EntryWrapper> {
        /** Entry expire time. */
        private final long expireTime;

        /** Cache Object Context */
        private final GridCacheContext ctx;

        /** Cache Object Key */
        private final KeyCacheObject key;

        /**
         * @param entry Cache entry to create wrapper for.
         */
        private EntryWrapper(GridCacheMapEntry entry) {
            expireTime = entry.expireTimeUnlocked();

            assert expireTime != 0;

            this.ctx = entry.context();
            this.key = entry.key();
        }

        /** {@inheritDoc} */
        @Override public int compareTo(EntryWrapper o) {
            int res = Long.compare(expireTime, o.expireTime);

            if (res == 0)
                res = compareKeys(ctx, key, o.ctx, o.key);

            return res;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (!(o instanceof EntryWrapper))
                return false;

            EntryWrapper that = (EntryWrapper)o;

            return expireTime == that.expireTime && compareKeys(ctx, key, that.ctx, that.key) == 0;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = (int)(expireTime ^ (expireTime >>> 32));

            res = 31 * res + key.hashCode();

            return res;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(EntryWrapper.class, this);
        }
    }

    /**
     * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition:
     * <ul>
     *     <li>{@code #add()}</li>
     *     <li>{@code #remove()}</li>
     *     <li>{@code #pollFirst()}</li>
     * <ul/>
     */
    private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> {
        /** */
        private static final long serialVersionUID = 0L;

        /** Size. */
        private final LongAdder size = new LongAdder();

        /**
         * @return Size based on performed operations.
         */
        public int sizex() {
            return size.intValue();
        }

        /** {@inheritDoc} */
        @Override public boolean add(EntryWrapper e) {
            boolean res = super.add(e);

            if (res)
                size.increment();

            return res;
        }

        /** {@inheritDoc} */
        @Override public boolean remove(Object o) {
            boolean res = super.remove(o);

            if (res)
                size.decrement();

            return res;
        }

        /** {@inheritDoc} */
        @Nullable @Override public EntryWrapper pollFirst() {
            EntryWrapper e = super.pollFirst();

            if (e != null)
                size.decrement();

            return e;
        }
    }
}
