/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;

/**
 * Cache gateway.
 */
@GridToStringExclude
public class GridCacheGateway<K, V> {
    /** Context. */
    private final GridCacheContext<K, V> ctx;

    /** Stopped flag for dynamic caches. */
    private final AtomicReference<State> state = new AtomicReference<>(State.STARTED);

    /** */
    private IgniteFuture<?> reconnectFut;

    /** */
    private StripedCompositeReadWriteLock rwLock =
        new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors());

    /**
     * @param ctx Cache context.
     */
    public GridCacheGateway(GridCacheContext<K, V> ctx) {
        assert ctx != null;

        this.ctx = ctx;
    }

    /**
     * Enter a cache call.
     */
    public void enter() {
        if (ctx.deploymentEnabled())
            ctx.deploy().onEnter();

        rwLock.readLock().lock();

        checkState(true, true);
    }

    /**
     * @param lock {@code True} if lock is held.
     * @param stopErr {@code True} if throw exception if stopped.
     * @return {@code True} if cache is in started state.
     */
    private boolean checkState(boolean lock, boolean stopErr) {
        State state = this.state.get();

        if (state != State.STARTED) {
            if (lock)
                rwLock.readLock().unlock();

            if (state == State.STOPPED) {
                if (stopErr)
                    throw new IllegalStateException(new CacheStoppedException(ctx.name()));
                else
                    return false;
            }
            else {
                assert reconnectFut != null;

                throw new CacheException(
                    new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " +
                        ctx.igniteInstanceName()));
            }
        }

        return true;
    }

    /**
     * Enter a cache call.
     *
     * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
     */
    public boolean enterIfNotStopped() {
        onEnter(null);

        // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop.
        rwLock.readLock().lock();

        return checkState(true, false);
    }

    /**
     * Enter a cache call without lock.
     *
     * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped.
     */
    public boolean enterIfNotStoppedNoLock() {
        onEnter(null);

        return checkState(false, false);
    }

    /**
     * Leave a cache call entered by {@link #enterNoLock} method.
     */
    public void leaveNoLock() {
        ctx.tm().resetContext();
        ctx.mvcc().contextReset();

        // Unwind eviction notifications.
        if (!ctx.shared().closed(ctx))
            CU.unwindEvicts(ctx);
    }

    /**
     * Leave a cache call entered by {@link #enter()} method.
     */
    public void leave() {
        try {
            leaveNoLock();
        }
        finally {
            rwLock.readLock().unlock();
        }
    }

    /**
     * @param opCtx Cache operation context to guard.
     * @return Previous operation context set on this thread.
     */
    @Nullable public CacheOperationContext enter(@Nullable CacheOperationContext opCtx) {
        try {
            GridCacheAdapter<K, V> cache = ctx.cache();

            GridCachePreloader preldr = cache != null ? cache.preloader() : null;

            if (preldr == null)
                throw new IllegalStateException(new CacheStoppedException(ctx.name()));

            preldr.startFuture().get();
        }
        catch (IgniteCheckedException e) {
            throw new IgniteException("Failed to wait for cache preloader start [cacheName=" +
                ctx.name() + "]", e);
        }

        onEnter(opCtx);

        Lock lock = rwLock.readLock();

        lock.lock();

        checkState(true, true);

        // Must unlock in case of unexpected errors to avoid
        // deadlocks during kernal stop.
        try {
            return setOperationContextPerCall(opCtx);
        }
        catch (Throwable e) {
            lock.unlock();

            throw e;
        }
    }

    /**
     * @param opCtx Operation context to guard.
     * @return Previous operation context set on this thread.
     */
    @Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) {
        onEnter(opCtx);

        checkState(false, false);

        return setOperationContextPerCall(opCtx);
    }

    /**
     * Set thread local operation context per call.
     *
     * @param opCtx Operation context to guard.
     * @return Previous operation context set on this thread.
     */
    private CacheOperationContext setOperationContextPerCall(@Nullable CacheOperationContext opCtx) {
        CacheOperationContext prev = ctx.operationContextPerCall();

        if (prev != null || opCtx != null)
            ctx.operationContextPerCall(opCtx);

        return prev;
    }

    /**
     * @param prev Previous.
     */
    public void leave(CacheOperationContext prev) {
        try {
            leaveNoLock(prev);
        }
        finally {
            rwLock.readLock().unlock();
        }
    }

    /**
     * @param prev Previous.
     */
    public void leaveNoLock(CacheOperationContext prev) {
        ctx.tm().resetContext();
        ctx.mvcc().contextReset();

        // Unwind eviction notifications.
        CU.unwindEvicts(ctx);

        // Return back previous thread local operation context per call.
        ctx.operationContextPerCall(prev);
    }

    /**
     * @param opCtx Cache operation context.
     */
    private void onEnter(CacheOperationContext opCtx) {
        ctx.itHolder().checkWeakQueue();

        if (ctx.deploymentEnabled())
            ctx.deploy().onEnter();

        if (opCtx != null)
            checkAtomicOpsInTx(opCtx);
    }

    /**
     *
     */
    public boolean isStopped() {
        return !checkState(false, false);
    }

    /**
     *
     */
    public void stopped() {
        state.set(State.STOPPED);
    }

    /**
     * @param reconnectFut Reconnect future.
     */
    public void onDisconnected(IgniteFuture<?> reconnectFut) {
        assert reconnectFut != null;

        this.reconnectFut = reconnectFut;

        state.compareAndSet(State.STARTED, State.DISCONNECTED);
    }

    /**
     *
     */
    public void writeLock(){
        rwLock.writeLock().lock();
    }

    /**
     *
     */
    public void writeUnlock() {
        rwLock.writeLock().unlock();
    }

    /**
     * @param stopped Cache stopped flag.
     */
    public void reconnected(boolean stopped) {
        State newState = stopped ? State.STOPPED : State.STARTED;

        state.compareAndSet(State.DISCONNECTED, newState);
    }

    /**
     *
     */
    public void onStopped() {
        boolean interrupted = false;

        while (true) {
            try {
                if (rwLock.writeLock().tryLock(200, TimeUnit.MILLISECONDS))
                    break;
                else
                    U.sleep(200);
            }
            catch (IgniteInterruptedCheckedException | InterruptedException ignore) {
                interrupted = true;
            }
        }

        if (interrupted)
            Thread.currentThread().interrupt();

        try {
            state.set(State.STOPPED);
        }
        finally {
            rwLock.writeLock().unlock();
        }
    }

    /**
     *
     */
    private enum State {
        /** */
        STARTED,

        /** */
        DISCONNECTED,

        /** */
        STOPPED
    }

    /**
     * Checks if this operation is available to be used in transaction.
     *
     * @throws IgniteException - in case of atomic operation inside transaction without permission.
     */
    private void checkAtomicOpsInTx(CacheOperationContext opCtx) throws IgniteException {
        if (ctx.atomic() && !opCtx.allowedAtomicOpsInTx()) {
            if (ctx.grid().transactions().tx() != null) {
                throw new IgniteException("Transaction spans operations on atomic cache " +
                    "(don't use atomic cache inside transaction or set up flag by cache.allowedAtomicOpsInTx()).");
            }
        }
    }
}
