/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.datastructures;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.managers.systemview.walker.AtomicLongViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.AtomicReferenceViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.AtomicSequenceViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.AtomicStampedViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.CountDownLatchViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.QueueViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.ReentrantLockViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SemaphoreViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.SetViewWalker;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.lang.gridfunc.PredicateCollectionView;
import org.apache.ignite.internal.util.lang.gridfunc.TransformCollectionView;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.GPR;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicLongView;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicReferenceView;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicSequenceView;
import org.apache.ignite.spi.systemview.view.datastructures.AtomicStampedView;
import org.apache.ignite.spi.systemview.view.datastructures.CountDownLatchView;
import org.apache.ignite.spi.systemview.view.datastructures.QueueView;
import org.apache.ignite.spi.systemview.view.datastructures.ReentrantLockView;
import org.apache.ignite.spi.systemview.view.datastructures.SemaphoreView;
import org.apache.ignite.spi.systemview.view.datastructures.SetView;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_LONG;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_REF;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_SEQ;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.ATOMIC_STAMPED;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.COUNT_DOWN_LATCH;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.QUEUE;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.REENTRANT_LOCK;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.SEMAPHORE;
import static org.apache.ignite.internal.processors.datastructures.DataStructureType.SET;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;

/**
 * Manager of data structures.
 */
public final class DataStructuresProcessor extends GridProcessorAdapter implements IgniteChangeGlobalStateSupport {
    /** DataRegionConfiguration name reserved for volatile caches. */
    public static final String VOLATILE_DATA_REGION_NAME = "volatileDsMemPlc";

    /** Cache group name prefix used for volatile data structures (the data region name is appended after '@'). */
    public static final String DEFAULT_VOLATILE_DS_GROUP_NAME = "default-volatile-ds-group";

    /** Cache group name used for non-volatile atomics when the atomic configuration does not specify one. */
    public static final String DEFAULT_DS_GROUP_NAME = "default-ds-group";

    /** Cache name prefix that marks a cache as a data structures cache. */
    private static final String DS_CACHE_NAME_PREFIX = "datastructures_";

    /** Atomics system cache name (used as a prefix; the group name is appended after '@'). */
    public static final String ATOMICS_CACHE_NAME = "ignite-sys-atomic-cache";

    /** Name of the system view listing queues. */
    public static final String QUEUES_VIEW = metricName("ds", "queues");

    /** Name of the system view listing sets. */
    public static final String SETS_VIEW = metricName("ds", "sets");

    /** Name of the system view listing reentrant locks. */
    public static final String LOCKS_VIEW = metricName("ds", "reentrantlocks");

    /** Name of the system view listing semaphores. */
    public static final String SEMAPHORES_VIEW = metricName("ds", "semaphores");

    /** Name of the system view listing count down latches. */
    public static final String LATCHES_VIEW = metricName("ds", "countdownlatches");

    /** Name of the system view listing atomic stamped instances. */
    public static final String STAMPED_VIEW = metricName("ds", "atomicstamped");

    /** Name of the system view listing atomic references. */
    public static final String REFERENCES_VIEW = metricName("ds", "atomicreferences");

    /** Name of the system view listing atomic longs. */
    public static final String LONGS_VIEW = metricName("ds", "atomiclongs");

    /** Name of the system view listing atomic sequences. */
    public static final String SEQUENCES_VIEW = metricName("ds", "atomicsequences");

    /** Description of the queues system view. */
    private static final String QUEUES_VIEW_DESC = "Data structure queues";

    /** Description of the sets system view. */
    private static final String SETS_VIEW_DESC = "Data structure sets";

    /** Description of the reentrant locks system view. */
    private static final String LOCKS_VIEW_DESC = "Data structure reentrant locks";

    /** Description of the semaphores system view. */
    private static final String SEMAPHORES_VIEW_DESC = "Data structure semaphores";

    /** Description of the count down latches system view. */
    private static final String LATCHES_VIEW_DESC = "Data structure count down latches";

    /** Description of the atomic stamped system view. */
    private static final String STAMPED_VIEW_DESC = "Data structure atomic stamped";

    /** Description of the atomic references system view. */
    private static final String REFERENCES_VIEW_DESC = "Data structure atomic references";

    /** Description of the atomic longs system view. */
    private static final String LONGS_VIEW_DESC = "Data structure atomic longs";

    /** Description of the atomic sequences system view. */
    private static final String SEQUENCES_VIEW_DESC = "Data structure atomic sequences";

    /** A non-collocated IgniteSet uses a separate cache if no node in the cluster is older than this version. */
    private static final IgniteProductVersion SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE =
        IgniteProductVersion.fromString("2.7.0");

    /** Initial capacity of the local data structures map. */
    private static final int INITIAL_CAPACITY = 10;

    /**
     * Initialization latch. May be {@code null}: it is reset to {@code null} on deactivation and when the
     * processor is stopped before initialization completed; {@code onBeforeActivate()} installs a new one.
     */
    private volatile CountDownLatch initLatch = new CountDownLatch(1);

    /**
     * Initialization failed flag. Raised when the processor is stopped while the initialization latch
     * is still pending and cleared again on activation.
     */
    private boolean initFailed;

    /** Internal storage of all dataStructures items (sequence, atomic long etc.). */
    private final ConcurrentMap<GridCacheInternalKey, GridCacheRemovable> dsMap;

    /** Default atomic data structures configuration, taken from the node configuration. */
    private final AtomicConfiguration dfltAtomicCfg;

    /** Map of internal continuous query IDs, keyed by cache ID. */
    private final ConcurrentHashMap<Integer, UUID> qryIdMap = new ConcurrentHashMap<>();

    /** Local discovery listener that tells semaphores and locks which node has gone. */
    private final GridLocalEventListener lsnr = new GridLocalEventListener() {
        @Override public void onEvent(final Event evt) {
            // The handlers may need to run cache operations, which must not happen
            // on the event notification thread, so the work is handed off.
            GridPlainCallable<Object> notifyTask = new GridPlainCallable<Object>() {
                @Override public Object call() {
                    UUID goneNodeId = ((DiscoveryEvent)evt).eventNode().id();

                    for (GridCacheRemovable struct : dsMap.values()) {
                        if (struct instanceof GridCacheSemaphoreEx)
                            ((GridCacheSemaphoreEx)struct).onNodeRemoved(goneNodeId);
                        else if (struct instanceof GridCacheLockEx)
                            ((GridCacheLockEx)struct).onNodeRemoved(goneNodeId);
                    }

                    return null;
                }
            };

            ctx.closure().callLocalSafe(notifyTask, false);
        }
    };

    /**
     * Creates the processor: remembers the node's atomic configuration and
     * allocates the local registry of data structure instances.
     *
     * @param ctx Kernal context.
     */
    public DataStructuresProcessor(GridKernalContext ctx) {
        super(ctx);

        dfltAtomicCfg = ctx.config().getAtomicConfiguration();
        dsMap = new ConcurrentHashMap<>(INITIAL_CAPACITY);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Subscribes the discovery listener to node-left and node-failed events.
     */
    @Override public void start() {
        ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
    }

    /**
     * {@inheritDoc}
     * <p>
     * System views are registered unconditionally; the initialization latch is
     * released here only when the node starts in the active state.
     */
    @Override public void onKernalStart(boolean active) {
        registerSystemViews();

        // When started inactive, the latch is released later from onActivate().
        if (active)
            onKernalStart0();
    }

    /**
     * Makes sure a pending initialization latch exists before activation: a fresh
     * latch is installed when the current one is missing or already released.
     */
    public void onBeforeActivate() {
        CountDownLatch cur = initLatch;

        boolean pending = cur != null && cur.getCount() > 0;

        if (!pending)
            initLatch = new CountDownLatch(1);
    }

    /**
     * Releases the initialization latch.
     * <p>
     * NOTE(review): {@code initLatch} is set to {@code null} by {@code onDeActivate()} (and by
     * {@code onKernalStop()} when initialization was still pending), yet it is dereferenced here without
     * a check. This presumably relies on {@code onBeforeActivate()} installing a latch first - confirm
     * against the callers.
     */
    private void onKernalStart0() {
        initLatch.countDown();
    }

    /**
     * Starts the internal continuous query for the given cache unless one is
     * already registered for its cache ID.
     *
     * @param cctx Cache context.
     * @throws IgniteCheckedException If the query could not be started.
     */
    private void startQuery(GridCacheContext cctx) throws IgniteCheckedException {
        final int cacheId = cctx.cacheId();

        // Lock-free fast path.
        if (qryIdMap.containsKey(cacheId))
            return;

        synchronized (this) {
            // Check again under the monitor: another thread may have registered the query meanwhile.
            if (qryIdMap.containsKey(cacheId))
                return;

            UUID qryId = cctx.continuousQueries().executeInternalQuery(
                new DataStructuresEntryListener(),
                new DataStructuresEntryFilter(),
                cctx.isReplicated() && cctx.affinityNode(),
                false,
                false,
                true
            );

            qryIdMap.put(cacheId, qryId);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Stops local semaphores and locks, fails a still-pending initialization and cancels
     * every internal continuous query registered by this processor.
     */
    @Override public void onKernalStop(boolean cancel) {
        super.onKernalStop(cancel);

        for (GridCacheRemovable ds : dsMap.values()) {
            if (ds instanceof GridCacheSemaphoreEx)
                ((GridCacheSemaphoreEx)ds).stop();

            if (ds instanceof GridCacheLockEx)
                ((GridCacheLockEx)ds).onStop();
        }

        // Work on a snapshot: the volatile field can be replaced or nulled concurrently.
        CountDownLatch init0 = initLatch;

        if (init0 != null && init0.getCount() > 0) {
            // Set the flag before releasing the latch so that woken waiters observe the failure.
            initFailed = true;

            init0.countDown();

            initLatch = null;
        }

        Iterator<Map.Entry<Integer, UUID>> iter = qryIdMap.entrySet().iterator();

        while (iter.hasNext()) {
            Map.Entry<Integer, UUID> e = iter.next();

            iter.remove();

            GridCacheContext cctx = ctx.cache().context().cacheContext(e.getKey());

            // No context is available once the cache has been stopped; in that case there is
            // nothing left to cancel, and failing here would abort the rest of the shutdown.
            if (cctx != null)
                cctx.continuousQueries().cancelInternalQuery(e.getValue());
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Clears the initialization-failed flag and the continuous query registry, re-subscribes the
     * discovery listener, restores local data structures and releases the initialization latch.
     */
    @Override public void onActivate(GridKernalContext ctx) {
        if (log.isDebugEnabled())
            log.debug("Activating data structure processor [nodeId=" + ctx.localNodeId() +
                ", topVer=" + ctx.discovery().topologyVersionEx() + "]");

        initFailed = false;

        // Start from an empty registry so that startQuery() registers continuous queries anew.
        qryIdMap.clear();

        ctx.event().addLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

        restoreStructuresState(ctx);

        // restoreStructuresState() has already released the latch; counting down a released latch is a no-op.
        onKernalStart0();
    }

    /**
     * Registers the data structure system views.
     * <p>
     * Atomic sequences, longs, references, stamped values, latches, semaphores and locks are exposed as
     * filtered views over the local data structures map, one view per public interface type. Queues and
     * sets are exposed through inner-collection views built from the cache descriptors whose cache type
     * is {@link CacheType#DATA_STRUCTURES}.
     */
    private void registerSystemViews() {
        // Views over dsMap, each filtered by the public interface the instance implements.
        ctx.systemView().registerView(
            SEQUENCES_VIEW,
            SEQUENCES_VIEW_DESC,
            new AtomicSequenceViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicSequence),
            AtomicSequenceView::new
        );

        ctx.systemView().registerView(
            LONGS_VIEW,
            LONGS_VIEW_DESC,
            new AtomicLongViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicLong),
            AtomicLongView::new
        );

        ctx.systemView().registerView(
            REFERENCES_VIEW,
            REFERENCES_VIEW_DESC,
            new AtomicReferenceViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicReference),
            AtomicReferenceView::new
        );

        ctx.systemView().registerView(
            STAMPED_VIEW,
            STAMPED_VIEW_DESC,
            new AtomicStampedViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteAtomicStamped),
            AtomicStampedView::new
        );

        ctx.systemView().registerView(
            LATCHES_VIEW,
            LATCHES_VIEW_DESC,
            new CountDownLatchViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteCountDownLatch),
            CountDownLatchView::new
        );

        ctx.systemView().registerView(
            SEMAPHORES_VIEW,
            SEMAPHORES_VIEW_DESC,
            new SemaphoreViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteSemaphore),
            SemaphoreView::new
        );

        ctx.systemView().registerView(
            LOCKS_VIEW,
            LOCKS_VIEW_DESC,
            new ReentrantLockViewWalker(),
            new PredicateCollectionView<>(dsMap.values(), v -> v instanceof IgniteLock),
            ReentrantLockView::new
        );

        // Queues: for every data structures cache, take its data structures manager and list its queues.
        // NOTE(review): the lookup ctx.cache().cache(name) is dereferenced without a null check - confirm
        // that every DATA_STRUCTURES descriptor has a locally available cache.
        ctx.systemView().registerInnerCollectionView(
            QUEUES_VIEW,
            QUEUES_VIEW_DESC,
            new QueueViewWalker(),
            new TransformCollectionView<>(
                ctx.cache().cacheDescriptors().values(),
                desc -> ctx.cache().cache(desc.cacheName()).context().dataStructures(),
                desc -> desc.cacheType() == CacheType.DATA_STRUCTURES),
            cctx -> cctx.queues().values(),
            (cctx, queue) -> new QueueView(queue)
        );

        // Sets: same source, transformation and filter as for queues.
        // NOTE(review): this uses F.viewReadOnly where the queue view uses TransformCollectionView -
        // presumably equivalent; confirm and consider unifying.
        ctx.systemView().registerInnerCollectionView(
            SETS_VIEW,
            SETS_VIEW_DESC,
            new SetViewWalker(),
            F.viewReadOnly(
                ctx.cache().cacheDescriptors().values(),
                desc -> ctx.cache().cache(desc.cacheName()).context().dataStructures(),
                desc -> desc.cacheType() == CacheType.DATA_STRUCTURES),
            cctx -> cctx.sets().values(),
            (cctx, set) -> new SetView(set)
        );
    }

    /**
     * Releases the initialization latch and forwards the activation to every local data
     * structure that supports cluster state changes. The first failure is logged and
     * ends the pass.
     *
     * @param ctx Context.
     */
    public void restoreStructuresState(GridKernalContext ctx) {
        onKernalStart0();

        try {
            for (GridCacheRemovable struct : dsMap.values()) {
                if (!(struct instanceof IgniteChangeGlobalStateSupport))
                    continue;

                ((IgniteChangeGlobalStateSupport)struct).onActivate(ctx);
            }
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed restore data structures state", e);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Unsubscribes the discovery listener, runs the regular stop routine, drops the
     * initialization latch and forwards the deactivation to every local data structure
     * that supports cluster state changes.
     */
    @Override public void onDeActivate(GridKernalContext ctx) {
        if (log.isDebugEnabled()) {
            log.debug("DeActivate data structure processor [nodeId=" + ctx.localNodeId() +
                ", topVer=" + ctx.discovery().topologyVersionEx() + "]");
        }

        ctx.event().removeLocalEventListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);

        onKernalStop(false);

        // No latch while inactive; onBeforeActivate() installs a new one.
        initLatch = null;

        for (GridCacheRemovable struct : dsMap.values()) {
            if (!(struct instanceof IgniteChangeGlobalStateSupport))
                continue;

            ((IgniteChangeGlobalStateSupport)struct).onDeActivate(ctx);
        }
    }

    /**
     * Drops a data structure from the local map, but only if the key is still mapped to the
     * given instance, so a newer instance registered under the same key is left untouched.
     *
     * @param key Key.
     * @param obj Object.
     */
    void onRemoved(GridCacheInternalKey key, GridCacheRemovable obj) {
        dsMap.remove(key, obj);
    }

    /**
     * {@inheritDoc}
     * <p>
     * After a cluster restart every local data structure is marked removed and dropped from
     * the local map; otherwise each one is asked to re-check that it was not removed. Locks
     * are additionally notified about the reconnect, and the event is then propagated to the
     * data structures manager of every cache.
     */
    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
        final UUID locNodeId = ctx.localNodeId();

        for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> entry : dsMap.entrySet()) {
            GridCacheRemovable struct = entry.getValue();

            if (!clusterRestarted)
                struct.needCheckNotRemoved();
            else {
                struct.onRemoved();

                dsMap.remove(entry.getKey(), struct);
            }

            if (struct instanceof GridCacheLockEx)
                ((GridCacheLockEx)struct).onReconnected(locNodeId);
        }

        for (GridCacheContext cctx : ctx.cache().context().cacheContexts())
            cctx.dataStructures().onReconnected(clusterRestarted);

        return null;
    }

    /**
     * Tells whether a cache name belongs to the data structures subsystem: it either starts
     * with one of the reserved cache name prefixes or equals one of the default group names.
     *
     * @param cacheName Cache name.
     * @return {@code True} if cache with such name is used to store data structures.
     */
    public static boolean isDataStructureCache(String cacheName) {
        if (cacheName == null)
            return false;

        if (cacheName.startsWith(ATOMICS_CACHE_NAME) || cacheName.startsWith(DS_CACHE_NAME_PREFIX))
            return true;

        return cacheName.equals(DEFAULT_DS_GROUP_NAME) || cacheName.equals(DEFAULT_VOLATILE_DS_GROUP_NAME);
    }

    /**
     * Tells whether a cache group name is reserved for data structures: it equals the default
     * group name or starts with the default volatile group name.
     *
     * @param grpName Group name.
     * @return {@code True} if group name is reserved to store data structures.
     */
    public static boolean isReservedGroup(@Nullable String grpName) {
        if (grpName == null)
            return false;

        return DEFAULT_DS_GROUP_NAME.equals(grpName) || grpName.startsWith(DEFAULT_VOLATILE_DS_GROUP_NAME);
    }

    /**
     * Returns the atomic sequence with the given name, creating it when it does not exist
     * yet and {@code create} is set.
     *
     * @param name Sequence name.
     * @param cfg Configuration, or {@code null} to use the default one.
     * @param initVal Initial value for sequence. Ignored if the sequence already exists.
     * @param create If {@code true} sequence will be created in case it is not in cache.
     * @return Sequence.
     * @throws IgniteCheckedException If loading failed.
     */
    public final IgniteAtomicSequence sequence(final String name,
        @Nullable final AtomicConfiguration cfg,
        final long initVal,
        final boolean create
    ) throws IgniteCheckedException {
        AtomicAccessor<GridCacheAtomicSequenceEx> accessor = new AtomicAccessor<GridCacheAtomicSequenceEx>() {
            @Override public T2<GridCacheAtomicSequenceEx, AtomicDataStructureValue> get(GridCacheInternalKey key,
                AtomicDataStructureValue val, IgniteInternalCache cache) throws IgniteCheckedException {
                GridCacheAtomicSequenceValue stored = cast(val, GridCacheAtomicSequenceValue.class);

                // Another thread may have registered the sequence locally in the meantime.
                GridCacheAtomicSequenceEx existing = cast(dsMap.get(key), GridCacheAtomicSequenceEx.class);

                if (existing != null) {
                    assert stored != null;

                    return new T2<>(existing, null);
                }

                if (stored == null && !create)
                    return null;

                AtomicConfiguration cfg0 = cfg != null ? cfg : dfltAtomicCfg;

                // The left edge of the range is already reserved, hence the offset.
                long off = cfg0.getAtomicSequenceReserveSize() > 1 ? cfg0.getAtomicSequenceReserveSize() - 1 : 1;

                // A new sequence starts at the requested value, an existing one at the stored global counter.
                final long locCntr = stored == null ? initVal : stored.get();
                final long upBound = locCntr + off;

                // The global counter must lie beyond the reserved region.
                if (stored == null)
                    stored = new GridCacheAtomicSequenceValue(upBound + 1);
                else
                    stored.set(upBound + 1);

                // Only one thread can be in the transaction scope and create the sequence.
                GridCacheAtomicSequenceEx created = new GridCacheAtomicSequenceImpl(name,
                    key,
                    cache,
                    cfg0.getAtomicSequenceReserveSize(),
                    locCntr,
                    upBound);

                return new T2<GridCacheAtomicSequenceEx, AtomicDataStructureValue>(created, stored);
            }
        };

        return getAtomic(accessor, cfg, name, ATOMIC_SEQ, create, GridCacheAtomicSequenceEx.class);
    }

    /**
     * Removes sequence from cache. Delegates to {@code removeDataStructure} with no removal
     * predicate and no after-removal closure.
     *
     * @param name Sequence name.
     * @param grpName Group name.
     * @throws IgniteCheckedException If removing failed.
     */
    final void removeSequence(final String name, final String grpName) throws IgniteCheckedException {
        removeDataStructure(null, name, grpName, ATOMIC_SEQ, null);
    }

    /**
     * Returns the atomic long with the given name, creating it when it does not exist yet
     * and {@code create} is set.
     *
     * @param name Name of atomic long.
     * @param cfg Configuration, or {@code null} to use the default one.
     * @param initVal Initial value for atomic long. Ignored if the atomic long already exists.
     * @param create If {@code true} atomic long will be created in case it is not in cache.
     * @return Atomic long.
     * @throws IgniteCheckedException If loading failed.
     */
    public final IgniteAtomicLong atomicLong(final String name,
        @Nullable AtomicConfiguration cfg,
        final long initVal,
        final boolean create) throws IgniteCheckedException {
        AtomicAccessor<GridCacheAtomicLongEx> accessor = new AtomicAccessor<GridCacheAtomicLongEx>() {
            @Override public T2<GridCacheAtomicLongEx, AtomicDataStructureValue> get(
                GridCacheInternalKey key,
                AtomicDataStructureValue val,
                IgniteInternalCache cache
            ) throws IgniteCheckedException {
                // Another thread may have registered the atomic long locally in the meantime.
                GridCacheAtomicLongEx existing = cast(dsMap.get(key), GridCacheAtomicLongEx.class);

                if (existing != null) {
                    assert val != null;

                    return new T2<>(existing, null);
                }

                if (val == null && !create)
                    return null;

                // A value is handed back for storing only when nothing exists in the cache yet.
                GridCacheAtomicLongValue toStore = null;

                if (val == null)
                    toStore = new GridCacheAtomicLongValue(initVal);

                GridCacheAtomicLongEx created = new GridCacheAtomicLongImpl(name, key, cache);

                return new T2<GridCacheAtomicLongEx, AtomicDataStructureValue>(created, toStore);
            }
        };

        return getAtomic(accessor, cfg, name, ATOMIC_LONG, create, GridCacheAtomicLongEx.class);
    }

    /**
     * Common lookup/creation routine for atomic data structures.
     * <p>
     * Resolves the cache group and cache name for the structure, starts the cache if it is not available
     * locally, registers the internal continuous query for that cache and then either returns the locally
     * registered instance or obtains one from the accessor inside a pessimistic, repeatable-read transaction.
     *
     * @param c Closure creating data structure instance.
     * @param cfg Optional custom configuration or {@code null} to use default one.
     * @param name Data structure name.
     * @param type Data structure type.
     * @param create Create flag.
     * @param cls Expected data structure class.
     * @return Data structure instance, or {@code null} if it does not exist and {@code create} is {@code false}.
     * @throws IgniteCheckedException If failed, in particular if a structure of another type already uses the name.
     */
    @Nullable private <T extends GridCacheRemovable> T getAtomic(final AtomicAccessor<T> c,
        @Nullable AtomicConfiguration cfg,
        final String name,
        final DataStructureType type,
        final boolean create,
        Class<? extends T> cls
    ) throws IgniteCheckedException {
        A.notNull(name, "name");

        awaitInitialization();

        if (cfg == null) {
            checkAtomicsConfiguration();

            cfg = dfltAtomicCfg;
        }

        String dataRegionName = null;
        final String grpName;

        // Volatile types always go to the reserved volatile group/region; other types use the configured
        // group name or fall back to the default one.
        if (type.isVolatile()) {
            String volatileGrpName = DEFAULT_VOLATILE_DS_GROUP_NAME;

            dataRegionName = VOLATILE_DATA_REGION_NAME;

            volatileGrpName += "@" + dataRegionName;

            grpName = volatileGrpName;
        }
        else if (cfg.getGroupName() != null)
            grpName = cfg.getGroupName();
        else
            grpName = DEFAULT_DS_GROUP_NAME;

        String cacheName = ATOMICS_CACHE_NAME + "@" + grpName;

        IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> cache0 = ctx.cache().cache(cacheName);

        if (cache0 == null) {
            // A pure lookup must not start a cache that has no descriptor at all.
            if (!create && ctx.cache().cacheDescriptor(cacheName) == null)
                return null;

            // Start the cache on demand and block on the returned future.
            ctx.cache().dynamicStartCache(cacheConfiguration(cfg, cacheName, grpName, dataRegionName),
                cacheName,
                null,
                CacheType.DATA_STRUCTURES,
                false,
                false,
                true,
                true).get();

            cache0 = ctx.cache().cache(cacheName);

            assert cache0 != null;
        }

        final IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> cache = cache0;

        startQuery(cache.context());

        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name, grpName);

        // Check type of structure received by key from local cache.
        T dataStructure = cast(dsMap.get(key), cls);

        if (dataStructure != null)
            return dataStructure;

        return retryTopologySafe(new IgniteOutClosureX<T>() {
            @Override public T applyx() throws IgniteCheckedException {
                // Entered outside the try block so that leave() in finally only runs after a successful enter().
                cache.context().gate().enter();

                try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                    AtomicDataStructureValue val = cache.get(key);

                    // An obsolete value is treated as if the structure did not exist.
                    if (isObsolete(val))
                        val = null;

                    if (val == null && !create)
                        return null;

                    if (val != null) {
                        if (val.type() != type)
                            throw new IgniteCheckedException("Another data structure with the same name already created " +
                                "[name=" + name +
                                ", newType=" + type +
                                ", existingType=" + val.type() + ']');
                    }

                    T2<T, ? extends AtomicDataStructureValue> ret;

                    try {
                        ret = c.get(key, val, cache);

                        // The instance is registered locally before the commit; the catch below undoes
                        // the registration if anything fails.
                        dsMap.put(key, ret.get1());

                        // The accessor returns a value only when the cache entry has to be written.
                        if (ret.get2() != null)
                            cache.put(key, ret.get2());

                        tx.commit();
                    }
                    catch (Error | Exception e) {
                        dsMap.remove(key);

                        U.error(log, "Failed to make datastructure: " + name, e);

                        throw e;
                    }

                    return ret.get1();
                }
                finally {
                    cache.context().gate().leave();
                }
            }
        });
    }

    /**
     * Tells whether a stored value is obsolete. Only volatile values can be obsolete: one is
     * obsolete when the grid start time it carries differs from the grid start time currently
     * reported by discovery.
     *
     * @param val Value.
     * @return {@code True} if value is obsolete.
     */
    private boolean isObsolete(AtomicDataStructureValue val) {
        // instanceof is false for null, so no separate null check is needed.
        if (!(val instanceof VolatileAtomicDataStructureValue))
            return false;

        return ((VolatileAtomicDataStructureValue)val).gridStartTime() != ctx.discovery().gridStartTime();
    }

    /**
     * Removes atomic long from cache. Delegates to {@code removeDataStructure} with no removal
     * predicate and no after-removal closure.
     *
     * @param name Atomic long name.
     * @param grpName Group name.
     * @throws IgniteCheckedException If removing failed.
     */
    final void removeAtomicLong(final String name, @Nullable final String grpName) throws IgniteCheckedException {
        removeDataStructure(null, name, grpName, ATOMIC_LONG, null);
    }

    /**
     * Removes the value stored for a data structure from the atomics cache of the given group.
     * <p>
     * The removal is executed through {@code retryTopologySafe} inside a {@code PESSIMISTIC},
     * {@code REPEATABLE_READ} transaction. Nothing is done if the atomics cache is not available, if its gate
     * can not be entered, or if no value is stored for the key.
     * <p>
     * The interrupt flag of the calling thread is cleared while the operation runs and is restored before
     * returning. An attempt that fails because of an interruption is retried.
     *
     * @param pred Remove predicate. If {@code null}, the value is removed unconditionally; otherwise the value
     *      is removed only when the predicate returns {@code true}.
     * @param name Data structure name.
     * @param grpName Group name, must not be {@code null}.
     * @param type Data structure type the stored value is expected to have.
     * @param afterRmv Optional closure to run after data structure removed. It is invoked with {@code null}
     *      argument right after the transaction commit.
     * @throws IgniteCheckedException If failed or if the stored value has a different type.
     */
    private <T> void removeDataStructure(
        @Nullable final IgnitePredicateX<AtomicDataStructureValue> pred,
        final String name,
        String grpName,
        final DataStructureType type,
        @Nullable final IgniteInClosureX<T> afterRmv
    ) throws IgniteCheckedException {
        assert name != null;
        assert grpName != null;
        assert type != null;

        awaitInitialization();

        final String cacheName = ATOMICS_CACHE_NAME + "@" + grpName;

        final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name, grpName);

        retryTopologySafe(new IgniteOutClosureX<Object>() {
            @Override public Object applyx() throws IgniteCheckedException {
                IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> cache = ctx.cache().cache(cacheName);

                if (cache != null && cache.context().gate().enterIfNotStopped()) {
                    // Clear the interrupt flag for the duration of the removal; it is restored in finally.
                    boolean isInterrupted = Thread.interrupted();

                    try {
                        while (true) {
                            try {
                                try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                                    AtomicDataStructureValue val = cache.get(key);

                                    if (val == null)
                                        return null;

                                    if (val.type() != type)
                                        throw new IgniteCheckedException("Data structure has different type " +
                                            "[name=" + name +
                                            ", expectedType=" + type +
                                            ", actualType=" + val.type() + ']');

                                    if (pred == null || pred.applyx(val)) {
                                        cache.remove(key);

                                        tx.commit();

                                        if (afterRmv != null)
                                            afterRmv.applyx(null);
                                    }
                                }

                                break;
                            }
                            catch (IgniteCheckedException e) {
                                if (X.hasCause(e, IgniteInterruptedCheckedException.class, InterruptedException.class)) {
                                    // Accumulate instead of overwriting: an interrupt recorded earlier must not be
                                    // forgotten when the flag happens to be clear at this point.
                                    isInterrupted |= Thread.interrupted();
                                }
                                else
                                    throw e;
                            }
                        }
                    }
                    finally {
                        cache.context().gate().leave();

                        if (isInterrupted)
                            Thread.currentThread().interrupt();
                    }
                }

                return null;
            }
        });
    }

    /**
     * Suspends every registered data structure whose atomics cache name matches the given cache name.
     * Does nothing if no data structure belongs to that cache.
     *
     * @param cacheName To suspend.
     */
    public void suspend(String cacheName) {
        for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> entry : dsMap.entrySet()) {
            String atomicsCacheName = ATOMICS_CACHE_NAME + "@" + entry.getKey().groupName();

            if (!atomicsCacheName.equals(cacheName))
                continue;

            entry.getValue().suspend();
        }
    }

    /**
     * Returns data structures of the given atomics cache to normal work after they were suspended.
     * <p>
     * Every registered data structure whose atomics cache name matches {@code cacheName} is either restarted
     * with the provided cache or, when no cache is provided, notified about removal and unregistered.
     *
     * @param cacheName To restart.
     * @param cache Cache to restart data structures with, or {@code null} to remove them.
     */
    public void restart(String cacheName, IgniteInternalCache cache) {
        for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> entry : dsMap.entrySet()) {
            String atomicsCacheName = ATOMICS_CACHE_NAME + "@" + entry.getKey().groupName();

            if (!atomicsCacheName.equals(cacheName))
                continue;

            GridCacheRemovable ds = entry.getValue();

            if (cache == null) {
                ds.onRemoved();

                // Conditional remove: unregister only if the mapping still points to this instance.
                dsMap.remove(entry.getKey(), ds);
            }
            else
                ds.restart(cache);
        }
    }

    /**
     * Gets an atomic reference from cache or creates one if it's not cached.
     *
     * @param name Name of atomic reference.
     * @param cfg Configuration.
     * @param initVal Initial value for atomic reference. If atomic reference already cached, {@code initVal}
     *        will be ignored.
     * @param create If {@code true} atomic reference will be created in case it is not in cache.
     * @return Atomic reference.
     * @throws IgniteCheckedException If loading failed.
     */
    @SuppressWarnings("unchecked")
    public final <T> IgniteAtomicReference<T> atomicReference(final String name,
        @Nullable AtomicConfiguration cfg,
        final T initVal,
        final boolean create
    ) throws IgniteCheckedException {
        AtomicAccessor<GridCacheAtomicReferenceEx> accessor = new AtomicAccessor<GridCacheAtomicReferenceEx>() {
            @Override public T2<GridCacheAtomicReferenceEx, AtomicDataStructureValue> get(
                GridCacheInternalKey key,
                AtomicDataStructureValue val,
                IgniteInternalCache cache
            ) throws IgniteCheckedException {
                // Reuse the instance if another thread has already registered it.
                GridCacheAtomicReferenceEx cached = cast(dsMap.get(key), GridCacheAtomicReferenceEx.class);

                if (cached != null) {
                    assert val != null;

                    return new T2<>(cached, null);
                }

                // Value to be stored in cache; stays null when a value already exists.
                AtomicDataStructureValue newVal = null;

                if (val == null) {
                    if (!create)
                        return null;

                    newVal = new GridCacheAtomicReferenceValue<>(initVal);
                }

                GridCacheAtomicReferenceEx created = new GridCacheAtomicReferenceImpl(name, key, cache);

                return new T2<>(created, newVal);
            }
        };

        return getAtomic(accessor, cfg, name, ATOMIC_REF, create, GridCacheAtomicReferenceEx.class);
    }

    /**
     * Removes atomic reference from cache.
     * <p>
     * Delegates to {@code removeDataStructure} with type {@code ATOMIC_REF}, without a remove predicate and
     * without a post-removal closure.
     * <p>
     * NOTE(review): {@code grpName} is annotated as nullable here, but {@code removeDataStructure} asserts that
     * the group name is not {@code null} - confirm which contract is intended.
     *
     * @param name Atomic reference name.
     * @param grpName Group name.
     * @throws IgniteCheckedException If removing failed.
     */
    final void removeAtomicReference(final String name, @Nullable final String grpName) throws IgniteCheckedException {
        removeDataStructure(null, name, grpName, ATOMIC_REF, null);
    }

    /**
     * Gets an atomic stamped from cache or creates one if it's not cached.
     * <p>
     * The accessor passed to {@code getAtomic} returns the already registered instance if there is one;
     * otherwise it creates a new instance and, when no value is stored yet, the initial value to be stored.
     *
     * @param name Name of atomic stamped.
     * @param cfg Configuration.
     * @param initVal Initial value for atomic stamped. If atomic stamped already cached, {@code initVal}
     *        will be ignored.
     * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, {@code initStamp}
     *        will be ignored.
     * @param create If {@code true} atomic stamped will be created in case it is not in cache.
     * @return Atomic stamped.
     * @throws IgniteCheckedException If loading failed.
     */
    @SuppressWarnings("unchecked")
    public final <T, S> IgniteAtomicStamped<T, S> atomicStamped(final String name, @Nullable AtomicConfiguration cfg,
        final T initVal, final S initStamp, final boolean create) throws IgniteCheckedException {
        return getAtomic(new AtomicAccessor<GridCacheAtomicStampedEx>() {
            @Override public T2<GridCacheAtomicStampedEx, AtomicDataStructureValue> get(
                GridCacheInternalKey key,
                AtomicDataStructureValue val,
                IgniteInternalCache cache
            ) throws IgniteCheckedException {
                // Check that atomic stamped hasn't been created in other thread yet.
                GridCacheAtomicStampedEx stmp = cast(dsMap.get(key),
                    GridCacheAtomicStampedEx.class);

                if (stmp != null) {
                    assert val != null;

                    // Diamond instead of a raw T2, consistent with the other accessors.
                    return new T2<>(stmp, null);
                }

                if (val == null && !create)
                    return null;

                // A new value is produced only when nothing is stored yet.
                AtomicDataStructureValue retVal = (val == null ? new GridCacheAtomicStampedValue(initVal, initStamp) : null);

                stmp = new GridCacheAtomicStampedImpl(name, key, cache);

                return new T2<>(stmp, retVal);
            }
        }, cfg, name, ATOMIC_STAMPED, create, GridCacheAtomicStampedEx.class);
    }

    /**
     * Removes atomic stamped from cache.
     * <p>
     * Delegates to {@code removeDataStructure} with type {@code ATOMIC_STAMPED}, without a remove predicate and
     * without a post-removal closure.
     *
     * @param name Atomic stamped name.
     * @param grpName Group name.
     * @throws IgniteCheckedException If removing failed.
     */
    final void removeAtomicStamped(final String name, final String grpName) throws IgniteCheckedException {
        removeDataStructure(null, name, grpName, ATOMIC_STAMPED, null);
    }

    /**
     * Gets a queue from cache or creates one if it's not cached.
     * <p>
     * The queue is created only when a configuration is provided. In that case a non-positive capacity
     * is replaced with {@link Integer#MAX_VALUE}.
     *
     * @param name Name of queue.
     * @param grpName Group name. If present, will override groupName from configuration.
     * @param cap Max size of queue.
     * @param cfg Non-null queue configuration if new queue should be created.
     * @return Instance of queue.
     * @throws IgniteCheckedException If failed.
     */
    public final <T> IgniteQueue<T> queue(final String name, @Nullable final String grpName, int cap,
        @Nullable final CollectionConfiguration cfg)
        throws IgniteCheckedException {
        A.notNull(name, "name");

        final boolean create = cfg != null;

        final int cap0 = create && cap <= 0 ? Integer.MAX_VALUE : cap;

        IgniteClosureX<GridCacheContext, IgniteQueue<T>> factory =
            new IgniteClosureX<GridCacheContext, IgniteQueue<T>>() {
                @Override public IgniteQueue<T> applyx(GridCacheContext cctx) throws IgniteCheckedException {
                    return cctx.dataStructures().queue(name, cap0, isCollocated(cfg), create);
                }
            };

        return getCollection(factory, cfg, name, grpName, QUEUE, create, false);
    }

    /**
     * Non-collocated mode only makes sense for and is only supported for PARTITIONED caches, so
     * collocated mode should be enabled for non-partitioned cache by default.
     *
     * @param cfg Collection configuration, may be {@code null}.
     * @return {@code True} If collocated mode should be enabled. Always {@code false} when no configuration
     *      is given.
     */
    private boolean isCollocated(CollectionConfiguration cfg) {
        if (cfg == null)
            return false;

        // Explicitly requested collocation wins.
        if (cfg.isCollocated())
            return true;

        return cfg.getCacheMode() != PARTITIONED;
    }

    /**
     * Builds cache configuration for a cache described by an atomic configuration.
     *
     * @param cfg Atomic configuration.
     * @param name Cache name.
     * @param grpName Group name.
     * @param dataRegionName Name of data region for this cache.
     *
     * @return Cache configuration.
     */
    private CacheConfiguration cacheConfiguration(AtomicConfiguration cfg, String name, String grpName,
        String dataRegionName) {
        CacheConfiguration res = new CacheConfiguration();

        // Identity and placement.
        res.setName(name);
        res.setGroupName(grpName);
        res.setDataRegionName(dataRegionName);

        // Settings that do not depend on the atomic configuration.
        res.setAtomicityMode(TRANSACTIONAL);
        res.setRebalanceMode(SYNC);
        res.setWriteSynchronizationMode(FULL_SYNC);
        res.setNodeFilter(CacheConfiguration.ALL_NODES);

        // Settings copied from the atomic configuration.
        res.setCacheMode(cfg.getCacheMode());
        res.setAffinity(cfg.getAffinity());

        // Number of backups is copied for PARTITIONED mode only.
        if (cfg.getCacheMode() == PARTITIONED)
            res.setBackups(cfg.getBackups());

        return res;
    }

    /**
     * Builds cache configuration for a cache described by a collection configuration.
     *
     * @param cfg Collection configuration.
     * @param name Cache name.
     * @param grpName Group name.
     * @return Cache configuration.
     */
    private CacheConfiguration cacheConfiguration(CollectionConfiguration cfg, String name, String grpName) {
        CacheConfiguration res = new CacheConfiguration();

        // Identity.
        res.setName(name);
        res.setGroupName(grpName);

        // Settings that do not depend on the collection configuration.
        res.setWriteSynchronizationMode(FULL_SYNC);
        res.setRebalanceMode(SYNC);

        // Settings copied from the collection configuration.
        res.setCacheMode(cfg.getCacheMode());
        res.setAtomicityMode(cfg.getAtomicityMode());
        res.setBackups(cfg.getBackups());
        res.setNodeFilter(cfg.getNodeFilter());

        return res;
    }

    /**
     * Builds configuration of the meta cache: the same as the collection cache configuration, but with
     * atomicity mode forced to {@code TRANSACTIONAL}.
     *
     * @param cfg Collection configuration.
     * @param name Cache name.
     * @param grpName Group name.
     * @return Meta cache configuration.
     */
    private CacheConfiguration metaCacheConfiguration(CollectionConfiguration cfg, String name, String grpName) {
        CacheConfiguration metaCfg = cacheConfiguration(cfg, name, grpName);

        // Overrides the atomicity mode taken from the collection configuration.
        metaCfg.setAtomicityMode(TRANSACTIONAL);

        return metaCfg;
    }

    /**
     * Gets the data structure cache compatible with the given collection configuration, starting it if needed.
     * <p>
     * The cache name is derived from atomicity mode, cache mode, number of backups and group name. For a
     * separated data structure a dedicated cache name (suffixed with type and name of the data structure) is
     * used, unless the shared cache already contains the set header for this data structure. If the chosen
     * cache already exists, its node filter class must match the node filter class of the configuration.
     *
     * @param cfg Collection configuration.
     * @param grpName Group name.
     * @param dsType Data structure type.
     * @param dsName Data structure name.
     * @param separated Separated cache flag.
     * @return Data structure cache.
     * @throws IgniteCheckedException If failed or if node filters are different.
     */
    private IgniteInternalCache compatibleCache(CollectionConfiguration cfg,
        String grpName,
        DataStructureType dsType,
        String dsName,
        boolean separated
    ) throws IgniteCheckedException {
        String cacheName = DS_CACHE_NAME_PREFIX + cfg.getAtomicityMode() + "_" + cfg.getCacheMode() + "_" +
            cfg.getBackups() + "@" + grpName;

        IgniteInternalCache cache = ctx.cache().cache(cacheName);

        // Switch to the dedicated cache unless the shared one already holds the header.
        if (separated && !(cache != null && cache.containsKey(new GridCacheSetHeaderKey(dsName)))) {
            cacheName += "#" + dsType.name() + "_" + dsName;

            cache = ctx.cache().cache(cacheName);
        }

        if (cache != null) {
            // Absent filter is treated as the "all nodes" predicate.
            String dfltFilterCls = CacheConfiguration.IgniteAllNodesPredicate.class.getName();

            IgnitePredicate<ClusterNode> existingFilter = cache.context().group().nodeFilter();
            IgnitePredicate<ClusterNode> newFilter = cfg.getNodeFilter();

            String existingCls = existingFilter == null ? dfltFilterCls : existingFilter.getClass().getName();
            String newCls = newFilter == null ? dfltFilterCls : newFilter.getClass().getName();

            if (!existingCls.equals(newCls))
                throw new IgniteCheckedException("Could not add collection to group " + grpName +
                    " because of different node filters [existing=" + existingCls + ", new=" + newCls + "]");
        }
        else {
            ctx.cache().dynamicStartCache(cacheConfiguration(cfg, cacheName, grpName),
                cacheName,
                null,
                CacheType.DATA_STRUCTURES,
                false,
                false,
                true,
                true).get();
        }

        cache = ctx.cache().getOrStartCache(cacheName);

        assert cache != null;

        return cache;
    }

    /**
     * Removes queue: its metadata first and then, in a post-removal closure, the queue header and the
     * queue items.
     *
     * @param name Queue name.
     * @param cctx Queue cache context.
     * @throws IgniteCheckedException If failed.
     */
    public void removeQueue(final String name, final GridCacheContext cctx) throws IgniteCheckedException {
        assert name != null;
        assert cctx != null;

        CIX1<GridCacheQueueHeader> cleanup = new CIX1<GridCacheQueueHeader>() {
            @Override public void applyx(GridCacheQueueHeader ignored) throws IgniteCheckedException {
                // The closure argument is not used: the header is taken from the cache while being removed.
                GridCacheQueueHeader rmvHdr =
                    (GridCacheQueueHeader)cctx.cache().withNoRetries().getAndRemove(new GridCacheQueueHeaderKey(name));

                if (rmvHdr != null && !rmvHdr.empty()) {
                    GridCacheQueueAdapter.removeKeys(cctx.cache(),
                        rmvHdr.id(),
                        name,
                        rmvHdr.collocated(),
                        rmvHdr.head(),
                        rmvHdr.tail(),
                        0);
                }
            }
        };

        removeDataStructure(null, name, cctx.group().name(), QUEUE, cleanup);
    }

    /**
     * Gets or creates a distributed collection.
     * <p>
     * Steps performed:
     * <ol>
     *     <li>Waits for processor initialization.</li>
     *     <li>Resolves the group name: explicit {@code grpName}, then group name from configuration,
     *     then {@code DEFAULT_DS_GROUP_NAME}.</li>
     *     <li>Obtains the meta cache of the group, starting it if it is not available locally.</li>
     *     <li>When {@code create} is {@code true}, obtains a compatible data cache and registers collection
     *     metadata if absent; otherwise looks up existing metadata and the cache it refers to.</li>
     *     <li>Validates type and collocation flag against already existing metadata.</li>
     *     <li>Invokes the closure with the context of the data cache through {@code retryTopologySafe}.</li>
     * </ol>
     *
     * @param c Closure creating collection.
     * @param cfg Configuration.
     * @param name Collection name.
     * @param grpName Cache group name.
     * @param type Data structure type.
     * @param create Create flag.
     * @param separated Separated cache flag.
     * @return Collection instance or {@code null} if {@code create} is {@code false} and the collection
     *      (or one of its caches) is not found.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c,
        @Nullable CollectionConfiguration cfg,
        String name,
        @Nullable String grpName,
        final DataStructureType type,
        boolean create,
        boolean separated
    ) throws IgniteCheckedException {
        awaitInitialization();

        assert name != null;
        assert type.isCollection() : type;
        assert !create || cfg != null;

        // Explicit group name has priority over the one from configuration.
        if (grpName == null) {
            if (cfg != null && cfg.getGroupName() != null)
                grpName = cfg.getGroupName();
            else
                grpName = DEFAULT_DS_GROUP_NAME;
        }

        final String metaCacheName = ATOMICS_CACHE_NAME + "@" + grpName;

        IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> metaCache0 = ctx.cache().cache(metaCacheName);

        if (metaCache0 == null) {
            CacheConfiguration ccfg = null;

            if (!create) {
                DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(metaCacheName);

                // Without a descriptor there is nothing to look up.
                if (desc == null)
                    return null;
            }
            else
                ccfg = metaCacheConfiguration(cfg, metaCacheName, grpName);

            // NOTE(review): ccfg stays null when create is false; presumably the cache is then started
            // using its existing descriptor - confirm against dynamicStartCache contract.
            ctx.cache().dynamicStartCache(ccfg,
                metaCacheName,
                null,
                CacheType.DATA_STRUCTURES,
                false,
                false,
                true,
                true).get();

            metaCache0 = ctx.cache().cache(metaCacheName);

            assert metaCache0 != null;
        }

        final IgniteInternalCache<GridCacheInternalKey, AtomicDataStructureValue> metaCache = metaCache0;

        // Metadata that was stored before this call (null if there was none).
        AtomicDataStructureValue oldVal;

        final IgniteInternalCache cache;

        if (create) {
            cache = compatibleCache(cfg, grpName, type, name, separated);

            DistributedCollectionMetadata newVal = new DistributedCollectionMetadata(type, cfg, cache.name());

            oldVal = metaCache.getAndPutIfAbsent(new GridCacheInternalKeyImpl(name, grpName), newVal);
        }
        else {
            oldVal = metaCache.get(new GridCacheInternalKeyImpl(name, grpName));

            if (oldVal == null)
                return null;
            else if (!(oldVal instanceof DistributedCollectionMetadata))
                throw new IgniteCheckedException("Another data structure with the same name already created " +
                    "[name=" + name +
                    ", newType=" + type +
                    ", existingType=" + oldVal.type() + ']');

            // Use the cache recorded in the metadata.
            cache = ctx.cache().getOrStartCache(((DistributedCollectionMetadata)oldVal).cacheName());

            if (cache == null)
                return null;
        }

        if (oldVal != null) {
            if (oldVal.type() != type)
                throw new IgniteCheckedException("Another data structure with the same name already created " +
                    "[name=" + name +
                    ", newType=" + type +
                    ", existingType=" + oldVal.type() + ']');

            assert oldVal instanceof DistributedCollectionMetadata;

            if (cfg != null && ((DistributedCollectionMetadata)oldVal).configuration().isCollocated() != cfg.isCollocated()) {
                // The flags differ here, so the existing flag equals the negation of the new one.
                throw new IgniteCheckedException("Another collection with the same name but different " +
                    "configuration already created [name=" + name +
                    ", newCollocated=" + cfg.isCollocated() +
                    ", existingCollocated=" + !cfg.isCollocated() + ']');
            }
        }

        return retryTopologySafe(new IgniteOutClosureX<T>() {
            @Override public T applyx() throws IgniteCheckedException {
                return c.applyx(cache.context());
            }
        });
    }

    /**
     * Awaits for processor initialization.
     * <p>
     * Returns immediately if the initialization latch is already released. The initialization failure flag
     * is checked only when this call actually had to wait on the latch.
     *
     * @throws IllegalStateException If the latch is not set, if initialization failed or if the thread
     *      was interrupted while waiting.
     */
    private void awaitInitialization() {
        CountDownLatch latch = initLatch;

        if (latch == null)
            throw new IllegalStateException("Ignite cluster is not active");

        // Latch is already released, nothing to wait for.
        if (latch.getCount() <= 0)
            return;

        try {
            U.await(latch);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new IllegalStateException("Failed to initialize data structures processor " +
                "(thread has been interrupted).", e);
        }

        if (initFailed)
            throw new IllegalStateException("Failed to initialize data structures processor.");
    }

    /**
     * Gets or creates count down latch. If count down latch is not found in cache,
     * it is created using provided name and count parameter.
     *
     * @param name Name of the latch.
     * @param cfg Configuration.
     * @param cnt Initial count. Must not be negative when {@code create} is {@code true}.
     * @param autoDel {@code True} to automatically delete latch from cache when
     *      its count reaches zero.
     * @param create If {@code true} latch will be created in case it is not in cache,
     *      if it is {@code false} all parameters except {@code name} are ignored.
     * @return Count down latch for the given name or {@code null} if it is not found and
     *      {@code create} is false.
     * @throws IgniteCheckedException If operation failed.
     */
    public IgniteCountDownLatch countDownLatch(final String name,
        @Nullable AtomicConfiguration cfg,
        final int cnt,
        final boolean autoDel,
        final boolean create
    ) throws IgniteCheckedException {
        if (create)
            A.ensure(cnt >= 0, "count can not be negative");

        AtomicAccessor<GridCacheCountDownLatchEx> accessor = new AtomicAccessor<GridCacheCountDownLatchEx>() {
            @Override public T2<GridCacheCountDownLatchEx, AtomicDataStructureValue> get(
                GridCacheInternalKey key,
                AtomicDataStructureValue val,
                IgniteInternalCache cache
            ) throws IgniteCheckedException {
                // Reuse the instance if another thread has already registered it.
                GridCacheCountDownLatchEx cached = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);

                if (cached != null) {
                    assert val != null;

                    return new T2<>(cached, null);
                }

                // Value to be stored in cache; stays null when a value already exists.
                GridCacheCountDownLatchValue newVal = null;

                // Value the latch instance is built from: either the new or the existing one.
                GridCacheCountDownLatchValue latchVal;

                if (val == null) {
                    if (!create)
                        return null;

                    newVal = new GridCacheCountDownLatchValue(cnt, autoDel, ctx.discovery().gridStartTime());

                    latchVal = newVal;
                }
                else
                    latchVal = (GridCacheCountDownLatchValue)val;

                assert latchVal != null;

                GridCacheCountDownLatchEx created = new GridCacheCountDownLatchImpl(name,
                    latchVal.initialCount(),
                    latchVal.autoDelete(),
                    key,
                    cache);

                return new T2<GridCacheCountDownLatchEx, AtomicDataStructureValue>(created, newVal);
            }
        };

        return getAtomic(accessor, cfg, name, COUNT_DOWN_LATCH, create, GridCacheCountDownLatchEx.class);
    }

    /**
     * Removes count down latch from cache. The latch can be removed only when its count is not positive.
     *
     * @param name Name of the latch.
     * @param grpName Cache group name.
     * @throws IgniteCheckedException If operation failed or if the latch count is greater than zero.
     */
    public void removeCountDownLatch(final String name, final String grpName) throws IgniteCheckedException {
        IgnitePredicateX<AtomicDataStructureValue> zeroCntOnly = new IgnitePredicateX<AtomicDataStructureValue>() {
            @Override public boolean applyx(AtomicDataStructureValue val) throws IgniteCheckedException {
                // instanceof is false for null, so this also asserts non-null value.
                assert val instanceof GridCacheCountDownLatchValue;

                GridCacheCountDownLatchValue latchVal = (GridCacheCountDownLatchValue)val;

                if (latchVal.get() <= 0)
                    return true;

                throw new IgniteCheckedException("Failed to remove count down latch " +
                    "with non-zero count: " + latchVal.get());
            }
        };

        removeDataStructure(zeroCntOnly, name, grpName, COUNT_DOWN_LATCH, null);
    }

    /**
     * Gets or creates semaphore. If semaphore is not found in cache,
     * it is created using provided name and count parameter.
     * <p>
     * When a semaphore state already exists and {@code failoverSafe} is {@code true}, waiters registered for
     * nodes that are no longer in the cluster are dropped and their values are added back to the count.
     *
     * @param name Name of the semaphore.
     * @param cfg Configuration.
     * @param cnt Initial count.
     * @param failoverSafe {@code True} FailoverSafe parameter.
     * @param create If {@code true} semaphore will be created in case it is not in cache,
     *      if it is {@code false} all parameters except {@code name} are ignored.
     * @return Semaphore for the given name or {@code null} if it is not found and
     *      {@code create} is false.
     * @throws IgniteCheckedException If operation failed.
     */
    public IgniteSemaphore semaphore(final String name, @Nullable AtomicConfiguration cfg, final int cnt,
        final boolean failoverSafe, final boolean create)
        throws IgniteCheckedException {
        AtomicAccessor<GridCacheSemaphoreEx> accessor = new AtomicAccessor<GridCacheSemaphoreEx>() {
            @Override public T2<GridCacheSemaphoreEx, AtomicDataStructureValue> get(
                GridCacheInternalKey key,
                AtomicDataStructureValue val,
                IgniteInternalCache cache
            ) throws IgniteCheckedException {
                // Reuse the instance if another thread has already registered it.
                GridCacheSemaphoreEx cached = cast(dsMap.get(key), GridCacheSemaphoreEx.class);

                if (cached != null) {
                    assert val != null;

                    return new T2<>(cached, null);
                }

                if (val == null && !create)
                    return null;

                // Value to be stored in cache; stays null when nothing has to be written.
                AtomicDataStructureValue valToStore = null;

                if (val == null) {
                    valToStore = new GridCacheSemaphoreState(cnt,
                        new HashMap<UUID, Integer>(), failoverSafe, ctx.discovery().gridStartTime());
                }

                GridCacheSemaphoreEx created = new GridCacheSemaphoreImpl(name, key, cache);

                // Check cluster state against semaphore state.
                if (val != null && failoverSafe) {
                    GridCacheSemaphoreState semState = (GridCacheSemaphoreState)val;

                    Map<UUID, Integer> waiters = semState.getWaiters();

                    Integer permits = semState.getCount();

                    boolean changed = false;

                    // Iterate over a copy of the keys since the map is modified inside the loop.
                    for (UUID nodeId : new HashSet<>(waiters.keySet())) {
                        if (ctx.cluster().get().node(nodeId) != null)
                            continue;

                        permits += waiters.get(nodeId);

                        waiters.remove(nodeId);

                        changed = true;
                    }

                    if (changed) {
                        semState.setWaiters(waiters);
                        semState.setCount(permits);

                        valToStore = semState;
                    }
                }

                return new T2<>(created, valToStore);
            }
        };

        return getAtomic(accessor, cfg, name, SEMAPHORE, create, GridCacheSemaphoreEx.class);
    }

    /**
     * Removes semaphore from cache. The semaphore can be removed only when its count is not negative.
     *
     * @param name Name of the semaphore.
     * @param grpName Group name.
     * @throws IgniteCheckedException If operation failed or if the semaphore count is negative.
     */
    public void removeSemaphore(final String name, final String grpName) throws IgniteCheckedException {
        IgnitePredicateX<AtomicDataStructureValue> noBlockedOnly = new IgnitePredicateX<AtomicDataStructureValue>() {
            @Override public boolean applyx(AtomicDataStructureValue val) throws IgniteCheckedException {
                // instanceof is false for null, so this also asserts non-null value.
                assert val instanceof GridCacheSemaphoreState;

                GridCacheSemaphoreState semState = (GridCacheSemaphoreState)val;

                if (semState.getCount() >= 0)
                    return true;

                throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
            }
        };

        removeDataStructure(noBlockedOnly, name, grpName, SEMAPHORE, null);
    }

    /**
     * Gets or creates reentrant lock. If reentrant lock is not found in cache,
     * it is created using provided name, failover mode, and fairness mode parameters.
     *
     * @param name Name of the reentrant lock.
     * @param cfg Configuration.
     * @param failoverSafe Flag indicating behaviour in case of failure.
     * @param fair Flag indicating fairness policy of this lock.
     * @param create If {@code true} reentrant lock will be created in case it is not in cache.
     * @return ReentrantLock for the given name or {@code null} if it is not found and
     *      {@code create} is false.
     * @throws IgniteCheckedException If operation failed.
     */
    public IgniteLock reentrantLock(final String name, @Nullable AtomicConfiguration cfg, final boolean failoverSafe,
        final boolean fair, final boolean create) throws IgniteCheckedException {
        AtomicAccessor<GridCacheLockEx> accessor = new AtomicAccessor<GridCacheLockEx>() {
            @Override public T2<GridCacheLockEx, AtomicDataStructureValue> get(
                GridCacheInternalKey key,
                AtomicDataStructureValue val,
                IgniteInternalCache cache
            ) throws IgniteCheckedException {
                // Reuse the instance if another thread has already registered it.
                GridCacheLockEx cached = cast(dsMap.get(key), GridCacheLockEx.class);

                if (cached != null) {
                    assert val != null;

                    return new T2<>(cached, null);
                }

                // State to be stored in cache; stays null when a state already exists.
                AtomicDataStructureValue newState = null;

                if (val == null) {
                    // Unlike the other accessors, an empty tuple (not null) is returned here.
                    if (!create)
                        return new T2<>(null, null);

                    newState = new GridCacheLockState(0, ctx.localNodeId(),
                        0, failoverSafe, fair, ctx.discovery().gridStartTime());
                }

                GridCacheLockEx created = new GridCacheLockImpl(name, key, cache);

                return new T2<>(created, newState);
            }
        };

        return getAtomic(accessor, cfg, name, REENTRANT_LOCK, create, GridCacheLockEx.class);
    }

    /**
     * Removes reentrant lock from cache. A lock whose state value is positive can be removed only
     * when it is marked as broken.
     *
     * @param name Name of the reentrant lock.
     * @param grpName Group name.
     * @param broken Flag indicating the reentrant lock is broken and should be removed unconditionally.
     * @throws IgniteCheckedException If operation failed.
     */
    public void removeReentrantLock(final String name, final String grpName, final boolean broken) throws IgniteCheckedException {
        IgnitePredicateX<AtomicDataStructureValue> rmvPred = new IgnitePredicateX<AtomicDataStructureValue>() {
            @Override public boolean applyx(AtomicDataStructureValue val) throws IgniteCheckedException {
                // instanceof is false for null, so this also asserts non-null value.
                assert val instanceof GridCacheLockState;

                GridCacheLockState lockState = (GridCacheLockState)val;

                if (lockState.get() <= 0 || broken)
                    return true;

                throw new IgniteCheckedException("Failed to remove reentrant lock with blocked threads. ");
            }
        };

        removeDataStructure(rmvPred, name, grpName, REENTRANT_LOCK, null);
    }

    /**
     * Cache entry event filter for data structures: passes removal events and those create/update events
     * whose value is a count down latch value, a semaphore state or a lock state.
     */
    static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
        /** */
        private static final long serialVersionUID = 0L;

        /** {@inheritDoc} */
        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
            EventType evtType = evt.getEventType();

            if (evtType != EventType.CREATED && evtType != EventType.UPDATED) {
                assert evtType == EventType.REMOVED : evt;

                return true;
            }

            Object val = evt.getValue();

            return val instanceof GridCacheCountDownLatchValue ||
                val instanceof GridCacheSemaphoreState ||
                val instanceof GridCacheLockState;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(DataStructuresEntryFilter.class, this);
        }
    }

    /**
     * Cache entry listener that forwards created/updated values of count down latches, semaphores and
     * reentrant locks to the matching local instances held in {@code dsMap}, and discards local instances
     * whose cache entries have been removed.
     */
    private class DataStructuresEntryListener implements
        CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
        /** {@inheritDoc} */
        @Override public void onUpdated(
            Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts
        ) throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
                EventType evtType = evt.getEventType();

                if (evtType != EventType.CREATED && evtType != EventType.UPDATED) {
                    assert evtType == EventType.REMOVED : evt;

                    // Entry was deleted from cache: forget the local instance and let it know.
                    GridCacheRemovable rmvd = dsMap.remove(evt.getKey());

                    if (rmvd != null)
                        rmvd.onRemoved();

                    continue;
                }

                GridCacheInternal newVal = evt.getValue();

                if (newVal instanceof GridCacheCountDownLatchValue)
                    onLatchChanged(evt.getKey(), (GridCacheCountDownLatchValue)newVal);
                else if (newVal instanceof GridCacheSemaphoreState)
                    onSemaphoreChanged(evt.getKey(), (GridCacheSemaphoreState)newVal);
                else if (newVal instanceof GridCacheLockState)
                    onLockChanged(evt.getKey(), (GridCacheLockState)newVal);
            }
        }

        /**
         * Passes the new count to the local latch instance and, once an auto-delete latch has reached zero,
         * drops it from {@code dsMap} and submits its removal as a local closure.
         *
         * @param key Latch key.
         * @param latchVal New latch value.
         */
        private void onLatchChanged(final GridCacheInternalKey key, GridCacheCountDownLatchValue latchVal) {
            final GridCacheRemovable ds = dsMap.get(key);

            if (!(ds instanceof GridCacheCountDownLatchEx)) {
                if (ds != null)
                    logUnexpectedType(IgniteCountDownLatch.class, ds);

                return;
            }

            final GridCacheCountDownLatchEx latch = (GridCacheCountDownLatchEx)ds;

            latch.onUpdate(latchVal.get());

            // Only auto-delete latches that have counted down to zero are cleaned up here.
            if (latchVal.get() != 0 || !latchVal.autoDelete())
                return;

            dsMap.remove(key);

            IgniteInternalFuture<?> rmvFut = ctx.closure().runLocalSafe(new GPR() {
                @Override public void run() {
                    try {
                        removeCountDownLatch(latch.name(), key.groupName());
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Failed to remove count down latch: " + latch.name(), e);
                    }
                    finally {
                        ctx.cache().context().txContextReset();
                    }
                }
            });

            rmvFut.listen(new CI1<IgniteInternalFuture<?>>() {
                @Override public void apply(IgniteInternalFuture<?> f) {
                    try {
                        f.get();
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Failed to remove count down latch: " + latch.name(), e);
                    }

                    // The local instance is notified whether or not the removal succeeded.
                    ds.onRemoved();
                }
            });
        }

        /**
         * Passes the new state to the local semaphore instance, if there is one.
         *
         * @param key Semaphore key.
         * @param state New semaphore state.
         */
        private void onSemaphoreChanged(GridCacheInternalKey key, GridCacheSemaphoreState state) {
            GridCacheRemovable ds = dsMap.get(key);

            if (ds instanceof GridCacheSemaphoreEx)
                ((GridCacheSemaphoreEx)ds).onUpdate(state);
            else if (ds != null)
                logUnexpectedType(IgniteSemaphore.class, ds);
        }

        /**
         * Passes the new state to the local reentrant lock instance, if there is one.
         *
         * @param key Lock key.
         * @param state New lock state.
         */
        private void onLockChanged(GridCacheInternalKey key, GridCacheLockState state) {
            GridCacheRemovable ds = dsMap.get(key);

            if (ds instanceof GridCacheLockEx)
                ((GridCacheLockEx)ds).onUpdate(state);
            else if (ds != null)
                logUnexpectedType(IgniteLock.class, ds);
        }

        /**
         * Logs an error about a local instance whose type does not match the updated cache value.
         *
         * @param expected Expected public data structure interface.
         * @param actual Instance actually found in {@code dsMap}.
         */
        private void logUnexpectedType(Class<?> expected, GridCacheRemovable actual) {
            U.error(log, "Failed to cast object [expected=" + expected.getSimpleName() +
                ", actual=" + actual.getClass() + ", value=" + actual + ']');
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(DataStructuresEntryListener.class, this);
        }
    }

    /**
     * Returns the set with the given name, creating it when a configuration is supplied.
     *
     * @param name Set name.
     * @param grpName Group name. If present, will override groupName from configuration.
     * @param cfg Set configuration; when {@code null} no new set is requested.
     * @return Set instance, possibly {@code null}.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable public <T> IgniteSet<T> set(final String name, @Nullable final String grpName, @Nullable final CollectionConfiguration cfg)
        throws IgniteCheckedException {
        A.notNull(name, "name");

        final boolean collocated = isCollocated(cfg);

        // Non-collocated sets are flagged as separated only when the oldest-node version check passes.
        final boolean separated;

        if (collocated)
            separated = false;
        else {
            separated = U.isOldestNodeVersionAtLeast(SEPARATE_CACHE_PER_NON_COLLOCATED_SET_SINCE,
                ctx.grid().cluster().nodes());
        }

        final boolean create = cfg != null;

        CX1<GridCacheContext, IgniteSet<T>> setFactory = new CX1<GridCacheContext, IgniteSet<T>>() {
            @Override public IgniteSet<T> applyx(GridCacheContext cctx) throws IgniteCheckedException {
                return cctx.dataStructures().set(name, collocated, create, separated);
            }
        };

        return getCollection(setFactory, cfg, name, grpName, SET, create, separated);
    }

    /**
     * Looks up an existing set through the cache with the given id. A new set is never created.
     *
     * @param name Set name.
     * @param cacheId Cache id.
     * @param collocated Collocated mode flag.
     * @param separated Separated cache flag.
     * @return Set instance, or {@code null} if there is no descriptor or no cache for {@code cacheId}.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable public <T> IgniteSet<T> set(String name, int cacheId, boolean collocated, boolean separated)
        throws IgniteCheckedException {
        A.notNull(name, "name");

        DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(cacheId);

        if (cacheDesc != null) {
            IgniteInternalCache<Object, Object> setCache = ctx.cache().cache(cacheDesc.cacheName());

            // The create flag is always false: this lookup must not create a set.
            if (setCache != null)
                return setCache.context().dataStructures().set(name, collocated, false, separated);
        }

        return null;
    }

    /**
     * Removes the set data structure. The closure handed to {@code removeDataStructure} removes the set
     * header from the cache and, if a header was present, removes the set data as well.
     *
     * @param name Set name.
     * @param cctx Set cache context.
     * @throws IgniteCheckedException If failed.
     */
    public void removeSet(final String name, final GridCacheContext cctx) throws IgniteCheckedException {
        assert name != null;
        assert cctx != null;

        CIX1<GridCacheSetHeader> hdrCleanup = new CIX1<GridCacheSetHeader>() {
            @Override public void applyx(GridCacheSetHeader ignored) throws IgniteCheckedException {
                // The argument is not used: the header is always re-read (and removed) from the cache.
                GridCacheSetHeader rmvdHdr =
                    (GridCacheSetHeader)cctx.cache().withNoRetries().getAndRemove(new GridCacheSetHeaderKey(name));

                if (rmvdHdr == null)
                    return;

                cctx.dataStructures().removeSetData(rmvdHdr.id(), rmvdHdr.separated());
            }
        };

        removeDataStructure(null, name, cctx.group().name(), SET, hdrCleanup);
    }

    /**
     * Runs the callable through {@code GridCacheUtils.retryTopologySafe} and converts any failure that is
     * not already an {@link IgniteCheckedException} into one.
     *
     * @param log Logger (not used by this method).
     * @param call Callable.
     * @return Callable result.
     * @throws IgniteCheckedException If all retries failed.
     */
    public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
        try {
            return GridCacheUtils.retryTopologySafe(call);
        }
        catch (Exception e) {
            // Checked Ignite failures are propagated as is, everything else is wrapped.
            if (e instanceof IgniteCheckedException)
                throw (IgniteCheckedException)e;

            throw new IgniteCheckedException(e);
        }
    }

    /**
     * Casts the object to the expected type if it is an instance of that type.
     *
     * @param obj Object to cast, may be {@code null}.
     * @param cls Expected class.
     * @param <R> Type of expected result.
     * @return {@code obj} cast to the expected type, or {@code null} if {@code obj} is {@code null} or is
     *      not an instance of {@code cls}.
     * @throws IgniteCheckedException Declared in the signature, but not thrown by this implementation:
     *      a type mismatch is reported by returning {@code null}.
     */
    @Nullable private <R> R cast(@Nullable Object obj, Class<R> cls) throws IgniteCheckedException {
        // Null is checked first so that a null object never touches cls.
        // Class.cast performs the conversion without an unchecked cast.
        return (obj != null && cls.isInstance(obj)) ? cls.cast(obj) : null;
    }

    /** {@inheritDoc} */
    @Override public void printMemoryStats() {
        X.println(">>> ");

        // Header line identifying the Ignite instance the stats belong to.
        String hdr = ">>> Data structure processor memory stats [igniteInstanceName=" + ctx.igniteInstanceName() + ']';

        X.println(hdr);

        String dsMapStat = ">>>   dsMapSize: " + dsMap.size();

        X.println(dsMapStat);
    }

    /**
     * Verifies that the default atomic configuration is available.
     *
     * @throws IgniteException If atomics configuration is not provided.
     */
    private void checkAtomicsConfiguration() throws IgniteException {
        if (dfltAtomicCfg != null)
            return;

        throw new IgniteException("Atomic data structure can not be created, need to provide AtomicConfiguration.");
    }

    /**
     * Runs the closure, retrying it for at most {@code GridCacheAdapter.MAX_RETRIES} attempts. A failure is
     * retried only if its cause chain contains a {@code ClusterTopologyCheckedException} that is not a
     * {@code ClusterTopologyServerNotFoundException}. Before the next attempt the method waits on the
     * retry-ready future of that exception, if one is provided. {@code NodeStoppingException} is rethrown
     * immediately.
     *
     * @param c Closure to run.
     * @throws IgniteCheckedException If failed.
     * @return Closure return value.
     */
    private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException {
        final int maxAttempts = GridCacheAdapter.MAX_RETRIES;

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return c.applyx();
            }
            catch (NodeStoppingException e) {
                // Never retried.
                throw e;
            }
            catch (IgniteCheckedException e) {
                // Out of attempts: report the last failure as is.
                if (attempt == maxAttempts)
                    throw e;

                ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);

                boolean retriable = topErr != null && !(topErr instanceof ClusterTopologyServerNotFoundException);

                if (!retriable)
                    throw e;

                IgniteInternalFuture<?> readyFut = topErr.retryReadyFuture();

                if (readyFut != null)
                    readyFut.get();
            }
        }

        // Unreachable when at least one attempt is made: every attempt either returns or throws.
        assert false;

        return null;
    }

    /**
     * Callback that, given a key and the value currently associated with it, produces a data structure
     * instance together with the value to store in cache.
     *
     * @param <T> Type of the produced data structure instance.
     */
    private interface AtomicAccessor<T> {
        /**
         * @param key Key.
         * @param val Existing value, may be {@code null}.
         * @param cache Data structure cache.
         * @return Data structure instance and value to store in cache.
         * @throws IgniteCheckedException If failed.
         */
        T2<T, AtomicDataStructureValue> get(GridCacheInternalKey key,
            @Nullable AtomicDataStructureValue val,
            IgniteInternalCache cache) throws IgniteCheckedException;
    }
}
