/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.datastructures;

import java.io.Externalizable;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;

/**
 * Cache atomic reference implementation.
 */
public final class GridCacheAtomicReferenceImpl<T> extends AtomicDataStructureProxy<GridCacheAtomicReferenceValue<T>> implements GridCacheAtomicReferenceEx<T>,
    IgniteChangeGlobalStateSupport, Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Deserialization stash. */
    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
                return new IgniteBiTuple<>();
            }
        };

    /**
     * Empty constructor required by {@link Externalizable}.
     */
    public GridCacheAtomicReferenceImpl() {
        // No-op.
    }

    /**
     * Default constructor.
     *
     * @param name Atomic reference name.
     * @param key Atomic reference key.
     * @param atomicView Atomic projection.
     */
    public GridCacheAtomicReferenceImpl(String name,
        GridCacheInternalKey key,
        IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> atomicView) {
        super(name, key, atomicView);
    }

    /** {@inheritDoc} */
    @Override public String name() {
        return name;
    }

    /** {@inheritDoc} */
    @Override public T get() {
        checkRemoved();

        try {
            GridCacheAtomicReferenceValue<T> ref = cacheView.get(key);

            if (ref == null)
                throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);

            return ref.get();
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void set(final T val) {
        checkRemoved();

        try {
            if (ctx.dataStructures().knownType(val))
                cacheView.invoke(key, new ReferenceSetEntryProcessor<>(val));
            else {
                CU.retryTopologySafe(new Callable<Void>() {
                    @Override public Void call() throws Exception {
                        try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
                            GridCacheAtomicReferenceValue<T> ref = cacheView.get(key);

                            if (ref == null)
                                throw new IgniteException("Failed to find atomic reference with given name: " + name);

                            cacheView.put(key, new GridCacheAtomicReferenceValue<>(val));

                            tx.commit();
                        }

                        return null;
                    }
                });
            }
        }
        catch (EntryProcessorException e) {
            throw new IgniteException(e.getMessage(), e);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean compareAndSet(final T expVal, final T newVal) {
        checkRemoved();

        try {
            if (ctx.dataStructures().knownType(expVal) && ctx.dataStructures().knownType(newVal)) {
                EntryProcessorResult<Boolean> res =
                    cacheView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal));

                assert res != null && res.get() != null : res;

                return res.get();
            }
            else {
                return CU.retryTopologySafe(new Callable<Boolean>() {
                    @Override public Boolean call() throws Exception {
                        try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
                            GridCacheAtomicReferenceValue<T> ref = cacheView.get(key);

                            if (ref == null)
                                throw new IgniteException("Failed to find atomic reference with given name: " + name);

                            T curVal = ref.get();

                            if (!F.eq(expVal, curVal))
                                return false;
                            else {
                                cacheView.put(key, new GridCacheAtomicReferenceValue<>(newVal));

                                tx.commit();

                                return true;
                            }
                        }
                    }
                });
            }
        }
        catch (EntryProcessorException e) {
            throw new IgniteException(e.getMessage(), e);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /**
     * Compares current value with specified value for equality and, if they are equal, replaces current value.
     *
     * @param newVal New value to set.
     * @param expVal Expected value.
     * @return Original value.
     */
    public T compareAndSetAndGet(final T newVal, final T expVal) {
        checkRemoved();

        try {
            if (ctx.dataStructures().knownType(expVal) && ctx.dataStructures().knownType(newVal)) {
                EntryProcessorResult<T> res =
                    cacheView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal));

                assert res != null;

                return res.get();
            }
            else {
                return CU.retryTopologySafe(new Callable<T>() {
                    @Override public T call() throws Exception {
                        try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
                            GridCacheAtomicReferenceValue<T> ref = cacheView.get(key);

                            if (ref == null)
                                throw new IgniteException("Failed to find atomic reference with given name: " + name);

                            T curVal = ref.get();

                            if (!F.eq(expVal, curVal))
                                return curVal;
                            else {
                                cacheView.put(key, new GridCacheAtomicReferenceValue<>(newVal));

                                tx.commit();

                                return expVal;
                            }
                        }
                    }
                });
            }
        }
        catch (EntryProcessorException e) {
            throw new IgniteException(e.getMessage(), e);
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public void close() {
        if (rmvd)
            return;

        try {
            ctx.kernalContext().dataStructures().removeAtomicReference(name, ctx.group().name());
        }
        catch (IgniteCheckedException e) {
            throw U.convertException(e);
        }
    }

    /**
     * @return Error.
     */
    private IllegalStateException removedError() {
        return new IllegalStateException("Atomic reference was removed from cache: " + name);
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeObject(ctx.kernalContext());
        out.writeUTF(name);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        IgniteBiTuple<GridKernalContext, String> t = stash.get();

        t.set1((GridKernalContext)in.readObject());
        t.set2(in.readUTF());
    }

    /**
     * Reconstructs object on unmarshalling.
     *
     * @return Reconstructed object.
     * @throws ObjectStreamException Thrown in case of unmarshalling error.
     */
    @SuppressWarnings("unchecked")
    private Object readResolve() throws ObjectStreamException {
        try {
            IgniteBiTuple<GridKernalContext, String> t = stash.get();

            return t.get1().dataStructures().atomicReference(t.get2(), null, null, false);
        }
        catch (IgniteCheckedException e) {
            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
        }
        finally {
            stash.remove();
        }
    }

    /**
     *
     */
    static class ReferenceSetEntryProcessor<T> implements
        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Void> {
        /** */
        private static final long serialVersionUID = 0L;

        /** */
        private final T newVal;

        /**
         * @param newVal New value.
         */
        ReferenceSetEntryProcessor(T newVal) {
            this.newVal = newVal;
        }

        /** {@inheritDoc} */
        @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
            Object... args) {
            GridCacheAtomicReferenceValue val = e.getValue();

            if (val == null)
                throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());

            e.setValue(new GridCacheAtomicReferenceValue<>(newVal));

            return null;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ReferenceSetEntryProcessor.class, this);
        }
    }

    /**
     *
     */
    static class ReferenceCompareAndSetEntryProcessor<T> implements
        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Boolean> {
        /** */
        private static final long serialVersionUID = 0L;

        /** */
        private final T expVal;

        /** */
        private final T newVal;

        /**
         * @param expVal Expected value.
         * @param newVal New value.
         */
        ReferenceCompareAndSetEntryProcessor(T expVal, T newVal) {
            this.expVal = expVal;
            this.newVal = newVal;
        }

        /** {@inheritDoc} */
        @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
            Object... args) {
            GridCacheAtomicReferenceValue<T> val = e.getValue();

            if (val == null)
                throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());

            T curVal = val.get();

            if (F.eq(expVal, curVal)) {
                e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));

                return true;
            }

            return false;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ReferenceCompareAndSetEntryProcessor.class, this);
        }
    }

    /**
     *
     */
    static class ReferenceCompareAndSetAndGetEntryProcessor<T> implements
        CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, T> {
        /** */
        private static final long serialVersionUID = 0L;

        /** */
        private final T expVal;

        /** */
        private final T newVal;

        /**
         * @param expVal Expected value.
         * @param newVal New value.
         */
        ReferenceCompareAndSetAndGetEntryProcessor(T expVal, T newVal) {
            this.expVal = expVal;
            this.newVal = newVal;
        }

        /** {@inheritDoc} */
        @Override public T process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
            Object... args) {
            GridCacheAtomicReferenceValue<T> val = e.getValue();

            if (val == null)
                throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());

            T curVal = val.get();

            if (F.eq(expVal, curVal))
                e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));

            return curVal;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ReferenceCompareAndSetAndGetEntryProcessor.class, this);
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridCacheAtomicReferenceImpl.class, this);
    }
}
