/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.datastructures;

import java.io.Externalizable;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;

import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;

/**
 * Cache count down latch implementation.
 */
public final class GridCacheCountDownLatchImpl extends AtomicDataStructureProxy<GridCacheCountDownLatchValue>
    implements GridCacheCountDownLatchEx, IgniteChangeGlobalStateSupport, Externalizable {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** {@code initGuard} state: the internal latch is uninitialized. */
    private static final int UNINITIALIZED_LATCH_STATE = 0;

    /** {@code initGuard} state: the internal latch is being created. */
    private static final int CREATING_LATCH_STATE = 1;

    /** {@code initGuard} state: the internal latch is ready for use. */
    private static final int READY_LATCH_STATE = 2;

    /**
     * Deserialization stash: a per-thread tuple that carries the kernal context and the latch name
     * from {@code readExternal} to {@code readResolve}.
     */
    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
                return new IgniteBiTuple<>();
            }
        };

    /** Initial count, as passed to the constructor. */
    private int initCnt;

    /** Auto delete flag, as passed to the constructor. */
    private boolean autoDel;

    /**
     * Internal (local) latch that the {@code await} methods block on. It is created lazily by
     * {@code initializeLatch()} and is not written by {@code writeExternal}.
     */
    private CountDownLatch internalLatch;

    /**
     * Initialization guard. Holds one of the {@code *_LATCH_STATE} constants and is also the monitor
     * under which {@code lastLatchVal} is accessed and the switch to the ready state is made.
     */
    private final AtomicInteger initGuard = new AtomicInteger();

    /** Initialization latch: released once the thread creating the internal latch has finished, successfully or not. */
    private final CountDownLatch initLatch = new CountDownLatch(1);

    /**
     * Latest latch value reported through {@code onUpdate} while the internal latch was not ready yet;
     * {@code null} if no such update has been recorded. Applied to the internal latch once it is created.
     */
    private Integer lastLatchVal = null;

    /**
     * Empty constructor required by {@link Externalizable}.
     * <p>
     * An instance created this way only receives the {@code readExternal} call; {@code readResolve}
     * then returns the latch obtained from the data structures processor in its place.
     */
    public GridCacheCountDownLatchImpl() {
        // No-op.
    }

    /**
     * Creates a latch proxy bound to the given cache entry.
     *
     * @param name Latch name.
     * @param initCnt Initial count.
     * @param autoDel Auto delete flag.
     * @param key Latch key.
     * @param latchView Latch projection.
     */
    public GridCacheCountDownLatchImpl(
        String name,
        int initCnt,
        boolean autoDel,
        GridCacheInternalKey key,
        IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> latchView
    ) {
        super(name, key, latchView);

        // Latch-specific settings; the name, key and projection are handed to the parent above.
        this.autoDel = autoDel;
        this.initCnt = initCnt;

        assert name != null;
        assert key != null;
        assert latchView != null;
    }

    /** {@inheritDoc} */
    @Override public int count() {
        try {
            GridCacheCountDownLatchValue cur = cacheView.get(key);

            // No cache entry for the key is reported as a count of zero.
            if (cur == null)
                return 0;

            return cur.get();
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public int initialCount() {
        // Value captured by the constructor; this class never reassigns it.
        return this.initCnt;
    }

    /** {@inheritDoc} */
    @Override public boolean autoDelete() {
        // Value captured by the constructor; this class never reassigns it.
        return this.autoDel;
    }

    /** {@inheritDoc} */
    @Override public void await() {
        try {
            // Make sure the local latch exists before blocking on it.
            initializeLatch();

            final CountDownLatch latch0 = internalLatch;

            U.await(latch0);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean await(long timeout, TimeUnit unit) {
        try {
            // Make sure the local latch exists before blocking on it.
            initializeLatch();

            final boolean res = U.await(internalLatch, timeout, unit);

            return res;
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean await(long timeout) {
        // The single-argument form measures the timeout in milliseconds.
        final TimeUnit unit = TimeUnit.MILLISECONDS;

        return await(timeout, unit);
    }

    /** {@inheritDoc} */
    @Override public int countDown() {
        // The no-argument form decrements by exactly one.
        final int step = 1;

        return countDown(step);
    }

    /** {@inheritDoc} */
    @Override public int countDown(int val) {
        A.ensure(val > 0, "val should be positive");

        final Callable<Integer> op = new CountDownCallable(val);

        try {
            Integer remaining = retryTopologySafe(op);

            return remaining;
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void countDownAll() {
        // Zero is the value CountDownCallable treats as "count the latch down to 0".
        final Callable<Integer> dropToZero = new CountDownCallable(0);

        try {
            retryTopologySafe(dropToZero);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void needCheckNotRemoved() {
        // No-op: this implementation takes no action on this callback.
    }

    /** {@inheritDoc} */
    @Override public void onUpdate(int cnt) {
        assert cnt >= 0;

        final CountDownLatch readyLatch;

        synchronized (initGuard) {
            if (initGuard.get() == READY_LATCH_STATE) {
                /* Reading under the monitor guarantees visibility of internalLatch. No need to make it volatile. */
                readyLatch = internalLatch;
            }
            else {
                /* Internal latch is not ready yet: remember only the latest value for initializeLatch() to apply. */
                lastLatchVal = cnt;

                return;
            }
        }

        /* From here on the internal latch is fully initialized. */
        assert readyLatch != null;

        /* The local latch can only be decremented one step at a time, so step down until it matches. */
        while (readyLatch.getCount() > cnt)
            readyLatch.countDown();
    }

    /**
     * Lazily creates the local {@link CountDownLatch} that the {@code await} methods block on.
     * <p>
     * The caller that moves {@code initGuard} from {@code UNINITIALIZED_LATCH_STATE} to
     * {@code CREATING_LATCH_STATE} reads the current latch value from the cache inside a
     * {@code PESSIMISTIC}/{@code REPEATABLE_READ} transaction and builds the internal latch from it.
     * Every other caller waits on {@code initLatch} until that attempt has finished.
     * <p>
     * NOTE(review): if the creating attempt throws, {@code initGuard} stays in
     * {@code CREATING_LATCH_STATE} while {@code initLatch} is still released, so every later call ends up
     * in the waiting branch and fails with "Internal latch has not been properly initialized." - confirm
     * that this non-retriable failure mode is intended.
     *
     * @throws IgniteCheckedException If operation failed.
     */
    private void initializeLatch() throws IgniteCheckedException {
        // Only one caller wins this CAS and performs the creation.
        if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
            try {
                internalLatch = retryTopologySafe(new Callable<CountDownLatch>() {
                    @Override public CountDownLatch call() throws Exception {
                        try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
                            GridCacheCountDownLatchValue val = cacheView.get(key);

                            // No cache entry: hand out an already-open latch so that waiters do not block.
                            if (val == null) {
                                if (log.isDebugEnabled())
                                    log.debug("Failed to find count down latch with given name: " + name);

                                return new CountDownLatch(0);
                            }

                            tx.commit();

                            return new CountDownLatch(val.get());
                        }
                    }
                });

                // Same monitor as onUpdate(): apply the value recorded while creating, then publish readiness.
                synchronized (initGuard) {
                    if (lastLatchVal != null) {
                        // The local latch can only be stepped down one count at a time.
                        while (internalLatch.getCount() > lastLatchVal)
                            internalLatch.countDown();
                    }

                    initGuard.set(READY_LATCH_STATE);
                }

                if (log.isDebugEnabled())
                    log.debug("Initialized internal latch: " + internalLatch);
            }
            finally {
                // Release waiting callers whether or not the creation succeeded.
                initLatch.countDown();
            }
        }
        else {
            // Another caller is creating (or has created) the latch: wait for that attempt to finish.
            U.await(initLatch);

            // Still null here means the creating attempt failed before assigning the latch.
            if (internalLatch == null)
                throw new IgniteCheckedException("Internal latch has not been properly initialized.");
        }
    }

    /** {@inheritDoc} */
    @Override public void close() {
        // Nothing to do when rmvd is already set (presumably the "removed" flag kept by the parent proxy).
        if (rmvd)
            return;

        try {
            ctx.kernalContext().dataStructures().removeCountDownLatch(name, ctx.group().name());
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // Only the kernal context and the latch name are written; readExternal reads them back in this order.
        final GridKernalContext kctx = ctx.kernalContext();

        out.writeObject(kctx);
        out.writeUTF(name);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the kernal context and the latch name, in the order {@code writeExternal} wrote them, into
     * the per-thread stash so that {@code readResolve} can look up the real latch. If reading fails
     * part-way, {@code readResolve} is never reached, so the stash is cleared here to avoid leaving a
     * kernal context reference attached to the deserializing thread.
     *
     * @param in Stream to read the state from.
     * @throws IOException If reading from the stream failed.
     * @throws ClassNotFoundException If the class of the stored kernal context cannot be found.
     */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        IgniteBiTuple<GridKernalContext, String> t = stash.get();

        boolean read = false;

        try {
            t.set1((GridKernalContext)in.readObject());
            t.set2(in.readUTF());

            read = true;
        }
        finally {
            // On success the stash is cleared later by readResolve(); on failure nobody else would do it.
            if (!read)
                stash.remove();
        }
    }

    /**
     * Replaces the deserialized placeholder with the latch looked up through the data structures
     * processor, using the kernal context and name stashed by {@code readExternal}.
     *
     * @return Reconstructed object.
     * @throws ObjectStreamException Thrown in case of unmarshalling error.
     */
    @SuppressWarnings({"ConstantConditions"})
    private Object readResolve() throws ObjectStreamException {
        try {
            IgniteBiTuple<GridKernalContext, String> stashed = stash.get();

            GridKernalContext kctx = stashed.get1();
            String latchName = stashed.get2();

            return kctx.dataStructures().countDownLatch(latchName, null, 0, false, false);
        }
        catch (IgniteCheckedException e) {
            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
        }
        finally {
            // The stash is per-thread; always clear it once this object has been resolved.
            stash.remove();
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        // String form is produced by the S.toString helper for this class.
        return S.toString(GridCacheCountDownLatchImpl.class, this);
    }

    /**
     * Transactional decrement of the latch value stored in the cache. Used by both
     * {@code countDown(int)} and {@code countDownAll()}.
     */
    private class CountDownCallable implements Callable<Integer> {
        /** Value to count down on (if 0 then latch is counted down to 0). */
        private final int val;

        /**
         * @param val Value to count down on (if 0 is passed latch is counted down to 0).
         */
        private CountDownCallable(int val) {
            assert val >= 0;

            this.val = val;
        }

        /**
         * Reads the latch value inside a transaction, lowers it (never below zero) and writes it back.
         *
         * @return New latch count, or {@code 0} if there is no cache entry for the latch.
         * @throws Exception If the cache operation or the transaction failed.
         */
        @Override public Integer call() throws Exception {
            try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
                GridCacheCountDownLatchValue cur = cacheView.get(key);

                if (cur == null) {
                    if (log.isDebugEnabled())
                        log.debug("Failed to find count down latch with given name: " + name);

                    return 0;
                }

                // A positive step is subtracted and clamped at zero; a zero step means "straight to zero".
                int remaining = val > 0 ? Math.max(cur.get() - val, 0) : 0;

                cur.set(remaining);

                cacheView.put(key, cur);

                tx.commit();

                return remaining;
            }
        }
    }
}
