/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;

/**
 * Test store.
 */
public final class GridCacheTestStore implements CacheStore<Integer, String> {
    /** */
    @CacheStoreSessionResource
    private CacheStoreSession ses;

    /** Store. */
    private final Map<Integer, String> map;

    /** Transactions. */
    private final Collection<Transaction> txs = new GridConcurrentHashSet<>();

    /** Last method called. */
    private String lastMtd;

    /** */
    private long ts = System.currentTimeMillis();

    /** {@link #load(Object)} method call counter .*/
    private AtomicInteger loadCnt = new AtomicInteger();

    /** {@link #write(Cache.Entry)} method call counter .*/
    private AtomicInteger putCnt = new AtomicInteger();

    /** {@link #writeAll(Collection)} method call counter .*/
    private AtomicInteger putAllCnt = new AtomicInteger();

    /** Flag indicating if methods of this store should fail. */
    private volatile boolean shouldFail;

    /** Configurable delay to simulate slow storage. */
    private int operationDelay;

    /**
     * @param map Underlying store map.
     */
    public GridCacheTestStore(Map<Integer, String> map) {
        this.map = map;
    }

    /**
     * Default constructor.
     */
    public GridCacheTestStore() {
        map = new ConcurrentHashMap<>();
    }

    /**
     * @return Underlying map.
     */
    public Map<Integer, String> getMap() {
        return Collections.unmodifiableMap(map);
    }

    /**
     * Sets a flag indicating if methods of this class should fail with {@link IgniteCheckedException}.
     *
     * @param shouldFail {@code true} if should fail.
     */
    public void setShouldFail(boolean shouldFail) {
        this.shouldFail = shouldFail;
    }

    /**
     * Sets delay that this store should wait on each operation.
     *
     * @param operationDelay If zero, no delay applied, positive value means
     *        delay in milliseconds.
     */
    public void setOperationDelay(int operationDelay) {
        assert operationDelay >= 0;

        this.operationDelay = operationDelay;
    }

    /**
     * @return Transactions.
     */
    public Collection<Transaction> transactions() {
        return txs;
    }

    /**
     *
     * @return Last method called.
     */
    public String getLastMethod() {
        return lastMtd;
    }

    /**
     * @return Last timestamp.
     */
    public long getTimestamp() {
        return ts;
    }

    /**
     * @return Integer timestamp.
     */
    public int getStart() {
        return Math.abs((int)ts);
    }

    /**
     * Sets last method to <tt>null</tt>.
     */
    public void resetLastMethod() {
        lastMtd = null;
    }

    /**
     * Resets timestamp.
     */
    public void resetTimestamp() {
        ts = System.currentTimeMillis();
    }

    /**
     * Resets the store to initial state.
     */
    public void reset() {
        lastMtd = null;

        map.clear();

        loadCnt.set(0);
        putCnt.set(0);
        putAllCnt.set(0);

        ts = System.currentTimeMillis();

        txs.clear();
    }

    /**
     * @return Count of {@link #load(Object)} method calls since last reset.
     */
    public int getLoadCount() {
        return loadCnt.get();
    }

    /**
     * @return Count of {@link #write(Cache.Entry)} method calls since last reset.
     */
    public int getPutCount() {
        return putCnt.get();
    }

    /**
     * @return Count of {@link #writeAll(Collection)} method calls since last reset.
     */
    public int getPutAllCount() {
        return putAllCnt.get();
    }

    /** {@inheritDoc} */
    @Override public String load(Integer key) {
        checkTx(session(), true);

        lastMtd = "load";

        checkOperation();

        loadCnt.incrementAndGet();

        return map.get(key);
    }

    /** {@inheritDoc} */
    @Override public void loadCache(IgniteBiInClosure<Integer, String> clo, Object[] args) {
        lastMtd = "loadAllFull";

        checkOperation();

        int start = getStart();

        int cnt = (Integer)args[0];

        for (int i = start; i < start + cnt; i++) {
            map.put(i, Integer.toString(i));

            clo.apply(i, Integer.toString(i));
        }
    }

    /** {@inheritDoc} */
    @Override public Map<Integer, String> loadAll(Iterable<? extends Integer> keys) {
        checkTx(session(), true);

        lastMtd = "loadAll";

        checkOperation();

        Map<Integer, String> loaded = new HashMap<>();

        for (Integer key : keys) {
            String val = map.get(key);

            if (val != null)
                loaded.put(key, val);
        }

        return loaded;
    }

    /** {@inheritDoc} */
    @Override public void write(Cache.Entry<? extends Integer, ? extends String> e)
        throws CacheWriterException {
        checkTx(session(), false);

        lastMtd = "put";

        checkOperation();

        map.put(e.getKey(), e.getValue());

        putCnt.incrementAndGet();
    }

    /** {@inheritDoc} */
    @Override public void writeAll(Collection<Cache.Entry<? extends Integer, ? extends String>> entries)
        throws CacheWriterException {
        checkTx(session(), false);

        lastMtd = "putAll";

        checkOperation();

        for (Cache.Entry<? extends Integer, ? extends String> e : entries)
            this.map.put(e.getKey(), e.getValue());

        putAllCnt.incrementAndGet();
    }

    /** {@inheritDoc} */
    @Override public void delete(Object key) throws CacheWriterException {
        checkTx(session(), false);

        lastMtd = "remove";

        checkOperation();

        map.remove(key);
    }

    /** {@inheritDoc} */
    @Override public void deleteAll(Collection<?> keys)
        throws CacheWriterException {
        checkTx(session(), false);

        lastMtd = "removeAll";

        checkOperation();

        for (Object key : keys)
            map.remove(key);
    }

    /** {@inheritDoc} */
    @Override public void sessionEnd(boolean commit) {
        // No-op.
    }

    /**
     * Checks the flag and throws exception if it is set. Checks operation delay and sleeps
     * for specified amount of time, if needed.
     */
    private void checkOperation() {
        if (shouldFail)
            throw new IgniteException("Store exception.");

        if (operationDelay > 0) {
            try {
                U.sleep(operationDelay);
            }
            catch (IgniteInterruptedCheckedException e) {
                throw new IgniteException(e);
            }
        }
    }

    /**
     * @param ses Session.
     * @param load {@code True} if {@link #loadAll} method is called.
     */
    private void checkTx(@Nullable CacheStoreSession ses, boolean load) {
        Transaction tx = ses != null ? ses.transaction() : null;

        if (tx == null)
            return;

        txs.add(tx);

        IgniteInternalTx tx0 = GridTestUtils.getFieldValue(tx, "tx");

        if (!tx0.local())
            throw new IgniteException("Tx is not local: " + tx);

        if (tx0.dht() && !load)
            throw new IgniteException("Tx is DHT: " + tx);
    }

    /**
     * @return Store session.
     */
    private CacheStoreSession session() {
        return ses;
    }
}
