/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheWriter;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.spi.encryption.EncryptionSpi;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.plugin.CachePluginConfiguration;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.configuration.CacheConfiguration.DFLT_CACHE_MODE;
import static org.apache.ignite.internal.GridTopic.TOPIC_REPLICATION;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
import static org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager.SYSTEM_DATA_REGION_NAME;

/**
 * Cache utility methods.
 */
public class GridCacheUtils {
    /** Cheat cache ID for debugging and benchmarking purposes. */
    public static final int cheatCacheId;

    /** Each cache operation removes this amount of entries with expired TTL. */
    private static final int TTL_BATCH_SIZE = IgniteSystemProperties.getInteger(
        IgniteSystemProperties.IGNITE_TTL_EXPIRE_BATCH_SIZE, 5);

    /** */
    public static final int UNDEFINED_CACHE_ID = 0;

    /*
     *
     */
    static {
        String cheatCache = System.getProperty("CHEAT_CACHE");

        if (cheatCache != null) {
            cheatCacheId = cheatCache.hashCode();

            if (cheatCacheId == 0)
                throw new RuntimeException();

            System.out.println(">>> Cheat cache ID [id=" + cheatCacheId + ", name=" + cheatCache + ']');
        }
        else
            cheatCacheId = 0;
    }

    /**
     * Quickly checks if passed in cache ID is a "cheat cache ID" set by -DCHEAT_CACHE=user_cache_name
     * and resolved in static block above.
     *
     * FOR DEBUGGING AND TESTING PURPOSES!
     *
     * @param id Cache ID to check.
     * @return {@code True} if this is cheat cache ID.
     */
    @Deprecated
    public static boolean cheatCache(int id) {
        return cheatCacheId != 0 && id == cheatCacheId;
    }

    /**  Hadoop syste cache name. */
    public static final String SYS_CACHE_HADOOP_MR = "ignite-hadoop-mr-sys-cache";

    /** System cache name. */
    public static final String UTILITY_CACHE_NAME = "ignite-sys-cache";

    /** Reserved cache names */
    public static final String[] RESERVED_NAMES = new String[] {
        SYS_CACHE_HADOOP_MR,
        UTILITY_CACHE_NAME,
        MetaStorage.METASTORAGE_CACHE_NAME,
        TxLog.TX_LOG_CACHE_NAME,
    };

    /** */
    public static final String CONTINUOUS_QRY_LOG_CATEGORY = "org.apache.ignite.continuous.query";

    /** */
    public static final String CACHE_MSG_LOG_CATEGORY = "org.apache.ignite.cache.msg";

    /** */
    public static final String ATOMIC_MSG_LOG_CATEGORY = CACHE_MSG_LOG_CATEGORY + ".atomic";

    /** */
    public static final String TX_MSG_LOG_CATEGORY = CACHE_MSG_LOG_CATEGORY + ".tx";

    /** */
    public static final String TX_MSG_PREPARE_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".prepare";

    /** */
    public static final String TX_MSG_FINISH_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".finish";

    /** */
    public static final String TX_MSG_LOCK_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".lock";

    /** */
    public static final String TX_MSG_RECOVERY_LOG_CATEGORY = TX_MSG_LOG_CATEGORY + ".recovery";

    /** Default mask name. */
    private static final String DEFAULT_MASK_NAME = "<default>";

    /** TTL: minimum positive value. */
    public static final long TTL_MINIMUM = 1L;

    /** TTL: eternal. */
    public static final long TTL_ETERNAL = 0L;

    /** TTL: not changed. */
    public static final long TTL_NOT_CHANGED = -1L;

    /** TTL: zero (immediate expiration). */
    public static final long TTL_ZERO = -2L;

    /** Expire time: eternal. */
    public static final long EXPIRE_TIME_ETERNAL = 0L;

    /** Expire time: must be calculated based on TTL value. */
    public static final long EXPIRE_TIME_CALCULATE = -1L;

    /** Empty predicate array. */
    private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0];

    /** Default transaction config. */
    private static final TransactionConfiguration DEFAULT_TX_CFG = new TransactionConfiguration();

    /** Empty predicate array. */
    private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0];

    /** Empty predicate array. */
    private static final CacheEntryPredicate[] EMPTY_FILTER0 = new CacheEntryPredicate[0];

    /** */
    private static final CacheEntryPredicate ALWAYS_FALSE0 = new CacheEntrySerializablePredicate(
        new CacheEntryPredicateAdapter() {
            @Override public boolean apply(GridCacheEntryEx e) {
                return false;
            }
        }
    );

    /** */
    private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0};

    /** Read filter. */
    public static final IgnitePredicate READ_FILTER = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.op() == READ;
        }

        @Override public String toString() {
            return "READ_FILTER";
        }
    };

    /** Read filter. */
    public static final IgnitePredicate READ_FILTER_NEAR = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.op() == READ && e.context().isNear();
        }

        @Override public String toString() {
            return "READ_FILTER_NEAR";
        }
    };

    /** Read filter. */
    public static final IgnitePredicate READ_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.op() == READ && !e.context().isNear();
        }

        @Override public String toString() {
            return "READ_FILTER_COLOCATED";
        }
    };

    /** Write filter. */
    public static final IgnitePredicate WRITE_FILTER = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.op() != READ;
        }

        @Override public String toString() {
            return "WRITE_FILTER";
        }
    };

    /** Write filter. */
    public static final IgnitePredicate WRITE_FILTER_NEAR = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.op() != READ && e.context().isNear();
        }

        @Override public String toString() {
            return "WRITE_FILTER_NEAR";
        }
    };

    /** Write filter. */
    public static final IgnitePredicate WRITE_FILTER_COLOCATED = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.op() != READ && !e.context().isNear();
        }

        @Override public String toString() {
            return "WRITE_FILTER_COLOCATED";
        }
    };

    /** Write filter. */
    public static final IgnitePredicate FILTER_NEAR_CACHE_ENTRY = new P1<IgniteTxEntry>() {
        @Override public boolean apply(IgniteTxEntry e) {
            return e.context().isNear();
        }

        @Override public String toString() {
            return "FILTER_NEAR_CACHE_ENTRY";
        }
    };

    /** Query mapped filter. */
    public static final IgnitePredicate<GridDistributedTxMapping> FILTER_QUERY_MAPPING = new P1<GridDistributedTxMapping>() {

        @Override public boolean apply(GridDistributedTxMapping m) {
            return m.queryUpdate();
        }

        @Override public String toString() {
            return "FILTER_QUERY_MAPPING";
        }
    };

    /** Transaction entry to key. */
    private static final IgniteClosure tx2key = new C1<IgniteTxEntry, Object>() {
        @Override public Object apply(IgniteTxEntry e) {
            return e.key();
        }

        @Override public String toString() {
            return "Cache transaction entry to key converter.";
        }
    };

    /** Transaction entry to key. */
    private static final IgniteClosure txCol2key = new C1<Collection<IgniteTxEntry>, Collection<Object>>() {
        @SuppressWarnings( {"unchecked"})
        @Override public Collection<Object> apply(Collection<IgniteTxEntry> e) {
            return F.viewReadOnly(e, tx2key);
        }

        @Override public String toString() {
            return "Cache transaction entry collection to key collection converter.";
        }
    };

    /** Converts transaction to XID version. */
    private static final IgniteClosure tx2xidVer = new C1<IgniteInternalTx, GridCacheVersion>() {
        @Override public GridCacheVersion apply(IgniteInternalTx tx) {
            return tx.xidVersion();
        }

        @Override public String toString() {
            return "Transaction to XID version converter.";
        }
    };

    /** Converts tx entry to entry. */
    private static final IgniteClosure tx2entry = new C1<IgniteTxEntry, GridCacheEntryEx>() {
        @Override public GridCacheEntryEx apply(IgniteTxEntry e) {
            return e.cached();
        }
    };

    /** Transaction entry to key. */
    private static final IgniteClosure entry2key = new C1<GridCacheEntryEx, KeyCacheObject>() {
        @Override public KeyCacheObject apply(GridCacheEntryEx e) {
            return e.key();
        }

        @Override public String toString() {
            return "Cache extended entry to key converter.";
        }
    };

    /** Transaction entry to key. */
    private static final IgniteClosure info2key = new C1<GridCacheEntryInfo, Object>() {
        @Override public Object apply(GridCacheEntryInfo e) {
            return e.key();
        }

        @Override public String toString() {
            return "Cache extended entry to key converter.";
        }
    };

    /**
     * Ensure singleton.
     */
    protected GridCacheUtils() {
        // No-op.
    }

    /**
     * @param err If {@code true}, then throw {@link GridCacheFilterFailedException},
     *      otherwise return {@code val} passed in.
     * @return Always return {@code null}.
     * @throws GridCacheFilterFailedException If {@code err} flag is {@code true}.
     */
    @Nullable public static CacheObject failed(boolean err) throws GridCacheFilterFailedException {
        return failed(err, null);
    }

    /**
     * @param err If {@code true}, then throw {@link GridCacheFilterFailedException},
     *      otherwise return {@code val} passed in.
     * @param val Value for which evaluation happened.
     * @return Always return {@code val} passed in or throw exception.
     * @throws GridCacheFilterFailedException If {@code err} flag is {@code true}.
     */
    @Nullable public static CacheObject failed(boolean err, CacheObject val) throws GridCacheFilterFailedException {
        if (err)
            throw new GridCacheFilterFailedException(val);

        return null;
    }

    /**
     * Create filter array.
     *
     * @param filter Filter.
     * @return Filter array.
     */
    public static CacheEntryPredicate[] filterArray(@Nullable CacheEntryPredicate filter) {
        return filter != null ? new CacheEntryPredicate[] { filter } : CU.empty0();
    }

    /**
     * Entry predicate factory mostly used for deserialization.
     *
     * @param <K> Key type.
     * @param <V> Value type.
     * @return Factory instance.
     */
    public static <K, V> IgniteClosure<Integer, IgnitePredicate<Cache.Entry<K, V>>[]> factory() {
        return new IgniteClosure<Integer, IgnitePredicate<Cache.Entry<K, V>>[]>() {
            @SuppressWarnings({"unchecked"})
            @Override public IgnitePredicate<Cache.Entry<K, V>>[] apply(Integer len) {
                return (IgnitePredicate<Cache.Entry<K, V>>[])(len == 0 ? EMPTY : new IgnitePredicate[len]);
            }
        };
    }

    /**
     * Checks that cache store is present.
     *
     * @param ctx Registry.
     * @throws IgniteCheckedException If cache store is not present.
     */
    public static void checkStore(GridCacheContext<?, ?> ctx) throws IgniteCheckedException {
        if (!ctx.store().configured())
            throw new IgniteCheckedException("Failed to find cache store for method 'reload(..)' " +
                "(is GridCacheStore configured?)");
    }

    /**
     * Gets DHT affinity nodes.
     *
     * @param ctx Cache context.
     * @param topVer Topology version.
     * @return Cache affinity nodes for given topology version.
     */
    public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topVer) {
        return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topVer);
    }

    /**
     * Checks if near cache is enabled for cache context.
     *
     * @param ctx Cache context to check.
     * @return {@code True} if near cache is enabled, {@code false} otherwise.
     */
    public static boolean isNearEnabled(GridCacheContext ctx) {
        return isNearEnabled(ctx.config());
    }

    /**
     * Checks if near cache is enabled for cache configuration.
     *
     * @param cfg Cache configuration to check.
     * @return {@code True} if near cache is enabled, {@code false} otherwise.
     */
    @SuppressWarnings("SimplifiableIfStatement")
    public static boolean isNearEnabled(CacheConfiguration cfg) {
        if (cfg.getCacheMode() == LOCAL)
            return false;

        return cfg.getNearConfiguration() != null;
    }

    /**
     * @param nodes Nodes.
     * @return Oldest node for the given topology version.
     */
    @Nullable public static ClusterNode oldest(Collection<ClusterNode> nodes) {
        ClusterNode oldest = null;

        for (ClusterNode n : nodes) {
            if (oldest == null || n.order() < oldest.order())
                oldest = n;
        }

        return oldest;
    }

    /**
     * @return Empty filter.
     */
    @SuppressWarnings({"unchecked"})
    public static <K, V> IgnitePredicate<Cache.Entry<K, V>>[] empty() {
        return (IgnitePredicate<Cache.Entry<K, V>>[])EMPTY_FILTER;
    }

    /**
     * @return Empty filter.
     */
    @SuppressWarnings({"unchecked"})
    public static CacheEntryPredicate[] empty0() {
        return EMPTY_FILTER0;
    }

    /**
     * @return Always false filter.
     */
    public static CacheEntryPredicate[] alwaysFalse0Arr() {
        return ALWAYS_FALSE0_ARR;
    }

    /**
     * @return Closure which converts transaction entry xid to XID version.
     */
    @SuppressWarnings( {"unchecked"})
    public static IgniteClosure<IgniteInternalTx, GridCacheVersion> tx2xidVersion() {
        return (IgniteClosure<IgniteInternalTx, GridCacheVersion>)tx2xidVer;
    }

    /**
     * @return Closure that converts entry to key.
     */
    @SuppressWarnings({"unchecked"})
    public static IgniteClosure<GridCacheEntryEx, KeyCacheObject> entry2Key() {
        return entry2key;
    }

    /**
     * @return Closure that converts entry info to key.
     */
    @SuppressWarnings({"unchecked"})
    public static <K, V> IgniteClosure<GridCacheEntryInfo, K> info2Key() {
        return (IgniteClosure<GridCacheEntryInfo, K>)info2key;
    }

    /**
     * @return Filter for transaction reads.
     */
    @SuppressWarnings({"unchecked"})
    public static IgnitePredicate<IgniteTxEntry> reads() {
        return READ_FILTER;
    }

    /**
     * @return Filter for transaction writes.
     */
    @SuppressWarnings({"unchecked"})
    public static IgnitePredicate<IgniteTxEntry> writes() {
        return WRITE_FILTER;
    }

    /**
     * @return Boolean reducer.
     */
    public static IgniteReducer<Boolean, Boolean> boolReducer() {
        return new IgniteReducer<Boolean, Boolean>() {
            private final AtomicBoolean bool = new AtomicBoolean(true);

            @Override public boolean collect(Boolean b) {
                bool.compareAndSet(true, b);

                // Stop collecting on first failure.
                return bool.get();
            }

            @Override public Boolean reduce() {
                return bool.get();
            }

            @Override public String toString() {
                return "Bool reducer: " + bool;
            }
        };
    }

    /**
     * @return Long reducer.
     */
    public static IgniteReducer<Long, Long> longReducer() {
        return new IgniteReducer<Long, Long>() {
            private final LongAdder res = new LongAdder();

            @Override public boolean collect(Long l) {
                if(l != null)
                    res.add(l);

                return true;
            }

            @Override public Long reduce() {
                return res.sum();
            }

            @Override public String toString() {
                return "Long reducer: " + res;
            }
        };
    }

    /**
     * Gets reducer that aggregates maps into one.
     *
     * @param size Predicted size of the resulting map to avoid resizings.
     * @param <K> Key type.
     * @param <V> Value type.
     * @return Reducer.
     */
    public static <K, V> IgniteReducer<Map<K, V>, Map<K, V>> mapsReducer(final int size) {
        return new IgniteReducer<Map<K, V>, Map<K, V>>() {
            private final Map<K, V> ret = new ConcurrentHashMap<>(size);

            @Override public boolean collect(Map<K, V> map) {
                if (map != null)
                    ret.putAll(map);

                return true;
            }

            @Override public Map<K, V> reduce() {
                return ret;
            }

            /** {@inheritDoc} */
            @Override public String toString() {
                return "Map reducer: " + ret;
            }
        };
    }

    /**
     * Gets reducer that aggregates collections.
     *
     * @param <T> Collection element type.
     * @return Reducer.
     */
    public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer(final int size) {
        return new IgniteReducer<Collection<T>, Collection<T>>() {
            private List<T> ret;

            @Override public synchronized boolean collect(Collection<T> c) {
                if (c == null)
                    return true;

                if (ret == null)
                    ret = new ArrayList<>(size);

                ret.addAll(c);

                return true;
            }

            @Override public synchronized Collection<T> reduce() {
                return ret == null ? Collections.<T>emptyList() : ret;
            }

            /** {@inheritDoc} */
            @Override public synchronized String toString() {
                return "Collection reducer: " + ret;
            }
        };
    }

    /**
     * Gets reducer that aggregates items into collection.
     *
     * @param <T> Items type.
     * @return Reducer.
     */
    public static <T> IgniteReducer<T, Collection<T>> objectsReducer() {
        return new IgniteReducer<T, Collection<T>>() {
            private final Collection<T> ret = new ConcurrentLinkedQueue<>();

            @Override public boolean collect(T item) {
                if (item != null)
                    ret.add(item);

                return true;
            }

            @Override public Collection<T> reduce() {
                return ret;
            }
        };
    }

    /**
     *
     * @param nodes Set of nodes.
     * @return Primary node.
     */
    public static ClusterNode primary(Iterable<? extends ClusterNode> nodes) {
        ClusterNode n = F.first(nodes);

        assert n != null;

        return n;
    }

    /**
     * @param nodes Nodes.
     * @return Backup nodes.
     */
    public static Collection<ClusterNode> backups(Collection<ClusterNode> nodes) {
        if (nodes == null || nodes.size() <= 1)
            return Collections.emptyList();

        return F.view(nodes, F.notEqualTo(F.first(nodes)));
    }

    /**
     * @param log Logger.
     * @param excl Excludes.
     * @return Future listener that logs errors.
     */
    public static IgniteInClosure<IgniteInternalFuture<?>> errorLogger(final IgniteLogger log,
        final Class<? extends Exception>... excl) {
        return new CI1<IgniteInternalFuture<?>>() {
            @Override public void apply(IgniteInternalFuture<?> f) {
                try {
                    f.get();
                }
                catch (IgniteCheckedException e) {
                    if (!F.isEmpty(excl))
                        for (Class cls : excl)
                            if (e.hasCause(cls))
                                return;

                    U.error(log, "Future execution resulted in error: " + f, e);
                }
            }

            @Override public String toString() {
                return "Error logger [excludes=" + Arrays.toString(excl) + ']';
            }
        };
    }

    /**
     * @param t Exception to check.
     * @return {@code true} if caused by lock timeout or cancellation.
     */
    public static boolean isLockTimeoutOrCancelled(Throwable t) {
        if (t == null)
            return false;

        while (t instanceof IgniteCheckedException || t instanceof IgniteException)
            t = t.getCause();

        return t instanceof GridCacheLockTimeoutException || t instanceof GridDistributedLockCancelledException;
    }

    /**
     * @param ctx Cache context.
     * @param obj Object to marshal.
     * @return Buffer that contains obtained byte array.
     * @throws IgniteCheckedException If marshalling failed.
     */
    @SuppressWarnings("unchecked")
    public static byte[] marshal(GridCacheContext ctx, Object obj)
        throws IgniteCheckedException {
        assert ctx != null;

        return marshal(ctx.shared(), ctx.deploymentEnabled(), obj);
    }

    /**
     * @param ctx Cache context.
     * @param depEnabled deployment enabled flag.
     * @param obj Object to marshal.
     * @return Buffer that contains obtained byte array.
     * @throws IgniteCheckedException If marshalling failed.
     */
    public static byte[] marshal(GridCacheSharedContext ctx, boolean depEnabled, Object obj)
        throws IgniteCheckedException {
        assert ctx != null;

        if (depEnabled) {
            if (obj != null) {
                if (obj instanceof Iterable)
                    ctx.deploy().registerClasses((Iterable<?>)obj);
                else if (obj.getClass().isArray()) {
                    if (!U.isPrimitiveArray(obj))
                        ctx.deploy().registerClasses((Object[])obj);
                }
                else
                    ctx.deploy().registerClass(obj);
            }
        }

        return U.marshal(ctx, obj);
    }

    /**
     * @param val Value.
     * @param skip Skip value flag.
     * @return Value.
     */
    public static Object skipValue(Object val, boolean skip) {
        if (skip)
            return val != null ? true : null;
        else
            return val;
    }

    /**
     * @param ctx Context.
     * @param prj Projection.
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @return New transaction.
     */
    public static GridNearTxLocal txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
        TransactionConcurrency concurrency, TransactionIsolation isolation) {
        assert ctx != null;
        assert prj != null;

        ctx.tm().resetContext();

        return prj.txStartEx(concurrency, isolation);
    }

    /**
     * @param tx Transaction.
     * @return String view of all safe-to-print transaction properties.
     */
    public static String txString(@Nullable IgniteInternalTx tx) {
        if (tx == null)
            return "null";

        return tx.getClass().getSimpleName() + "[xid=" + tx.xid() +
            ", xidVersion=" + tx.xidVersion() +
            ", concurrency=" + tx.concurrency() +
            ", isolation=" + tx.isolation() +
            ", state=" + tx.state() +
            ", invalidate=" + tx.isInvalidate() +
            ", rollbackOnly=" + tx.isRollbackOnly() +
            ", nodeId=" + tx.nodeId() +
            ", timeout=" + tx.timeout() +
            ", duration=" + (U.currentTimeMillis() - tx.startTime()) +
            (tx instanceof GridNearTxLocal ? ", label=" + ((GridNearTxLocal)tx).label() : "") +
            ']';
    }

    /**
     * @param ctx Cache context.
     */
    public static void unwindEvicts(GridCacheContext ctx) {
        assert ctx != null;

        ctx.ttl().expire(TTL_BATCH_SIZE);
    }

    /**
     * @param ctx Shared cache context.
     */
    public static <K, V> void unwindEvicts(GridCacheSharedContext<K, V> ctx) {
        assert ctx != null;

        for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts())
            unwindEvicts(cacheCtx);
    }

    /**
     * @param asc {@code True} for ascending.
     * @return Descending order comparator.
     */
    public static Comparator<ClusterNode> nodeComparator(final boolean asc) {
        return new Comparator<ClusterNode>() {
            @Override public int compare(ClusterNode n1, ClusterNode n2) {
                long o1 = n1.order();
                long o2 = n2.order();

                return asc ? o1 < o2 ? -1 : o1 == o2 ? 0 : 1 : o1 < o2 ? 1 : o1 == o2 ? 0 : -1;
            }

            @Override public String toString() {
                return "Node comparator [asc=" + asc + ']';
            }
        };
    }

    /**
     * Mask cache name in case it is null.
     *
     * @param cacheName Cache name.
     * @return The same cache name or {@code <default>} in case the name is {@code null}.
     */
    public static String mask(String cacheName) {
        return cacheName != null ? cacheName : DEFAULT_MASK_NAME;
    }

    /**
     * Unmask cache name.
     *
     * @param cacheName Cache name.
     * @return Unmasked cache name, i.e. in case provided parameter was {@code <default>} then {@code null}
     *     will be returned.
     */
    @Nullable public static String unmask(String cacheName) {
        return DEFAULT_MASK_NAME.equals(cacheName) ? null : cacheName;
    }

    /**
     * Get topic to which replication requests are sent.
     *
     * @return Topic to which replication requests are sent.
     */
    public static String replicationTopicSend() {
        return TOPIC_REPLICATION.toString();
    }

    /**
     * Get topic to which replication responses are sent.
     *
     * @param cacheName Cache name.
     * @return Topic to which replication responses are sent.
     */
    public static String replicationTopicReceive(String cacheName) {
        return TOPIC_REPLICATION + "-" + mask(cacheName);
    }

    /**
     * Checks that local and remove configurations have the same value of given attribute.
     *
     * @param log Logger used to log warning message (used only if fail flag is not set).
     * @param locCfg Local configuration.
     * @param rmtCfg Remote configuration.
     * @param rmtNodeId Remote node.
     * @param attr Attribute name.
     * @param fail If true throws IgniteCheckedException in case of attribute values mismatch, otherwise logs warning.
     * @throws IgniteCheckedException If attribute values are different and fail flag is true.
     */
    public static void checkAttributeMismatch(IgniteLogger log, CacheConfiguration locCfg,
        CacheConfiguration rmtCfg, UUID rmtNodeId, T2<String, String> attr, boolean fail) throws IgniteCheckedException {
        assert rmtNodeId != null;
        assert attr != null;
        assert attr.get1() != null;
        assert attr.get2() != null;

        Object locVal = U.property(locCfg, attr.get1());

        Object rmtVal = U.property(rmtCfg, attr.get1());

        checkAttributeMismatch(log, rmtCfg.getName(), rmtNodeId, attr.get1(), attr.get2(), locVal, rmtVal, fail);
    }

    /**
     * Checks that cache configuration attribute has the same value in local and remote cache configurations.
     *
     * @param log Logger used to log warning message (used only if fail flag is not set).
     * @param cfgName Remote cache name.
     * @param rmtNodeId Remote node.
     * @param attrName Short attribute name for error message.
     * @param attrMsg Full attribute name for error message.
     * @param locVal Local value.
     * @param rmtVal Remote value.
     * @param fail If true throws IgniteCheckedException in case of attribute values mismatch, otherwise logs warning.
     * @throws IgniteCheckedException If attribute values are different and fail flag is true.
     */
    public static void checkAttributeMismatch(IgniteLogger log, String cfgName, UUID rmtNodeId, String attrName,
        String attrMsg, @Nullable Object locVal, @Nullable Object rmtVal, boolean fail) throws IgniteCheckedException {
        assert rmtNodeId != null;
        assert attrName != null;
        assert attrMsg != null;

        if (!F.eq(locVal, rmtVal)) {
            if (fail) {
                throw new IgniteCheckedException(attrMsg + " mismatch (fix " + attrMsg.toLowerCase() + " in cache " +
                    "configuration or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true " +
                    "system property) [cacheName=" + cfgName +
                    ", local" + capitalize(attrName) + "=" + locVal +
                    ", remote" + capitalize(attrName) + "=" + rmtVal +
                    ", rmtNodeId=" + rmtNodeId + ']');
            }
            else {
                assert log != null;

                U.warn(log, attrMsg + " mismatch (fix " + attrMsg.toLowerCase() + " in cache " +
                    "configuration) [cacheName=" + cfgName +
                    ", local" + capitalize(attrName) + "=" + locVal +
                    ", remote" + capitalize(attrName) + "=" + rmtVal +
                    ", rmtNodeId=" + rmtNodeId + ']');
            }
        }
    }

    /**
     * @param cfg1 Existing configuration.
     * @param cfg2 Cache configuration to start.
     * @param attrName Short attribute name for error message.
     * @param attrMsg Full attribute name for error message.
     * @param val1 Attribute value in existing configuration.
     * @param val2 Attribute value in starting configuration.
     * @param fail If true throws IgniteCheckedException in case of attribute values mismatch, otherwise logs warning.
     * @throws IgniteCheckedException If validation failed.
     */
    public static void validateCacheGroupsAttributesMismatch(IgniteLogger log,
        CacheConfiguration cfg1,
        CacheConfiguration cfg2,
        String attrName,
        String attrMsg,
        Object val1,
        Object val2,
        boolean fail) throws IgniteCheckedException {
        if (F.eq(val1, val2))
            return;

        if (fail) {
            throw new IgniteCheckedException(attrMsg + " mismatch for caches related to the same group " +
                "[groupName=" + cfg1.getGroupName() +
                ", existingCache=" + cfg1.getName() +
                ", existing" + capitalize(attrName) + "=" + val1 +
                ", startingCache=" + cfg2.getName() +
                ", starting" + capitalize(attrName) + "=" + val2 + ']');
        }
        else {
            U.warn(log, attrMsg + " mismatch for caches related to the same group " +
                "[groupName=" + cfg1.getGroupName() +
                ", existingCache=" + cfg1.getName() +
                ", existing" + capitalize(attrName) + "=" + val1 +
                ", startingCache=" + cfg2.getName() +
                ", starting" + capitalize(attrName) + "=" + val2 + ']');
        }
    }

    /**
     * @param str String.
     * @return String with first symbol in upper case.
     */
    private static String capitalize(String str) {
        return Character.toUpperCase(str.charAt(0)) + str.substring(1);
    }

    /**
     * Validates that cache key object has overridden equals and hashCode methods.
     * Will also check that a BinaryObject has a hash code set.
     *
     * @param key Key.
     * @throws IllegalArgumentException If equals or hashCode is not implemented.
     */
    public static void validateCacheKey(@Nullable Object key) {
        if (key == null)
            return;

        if (!U.overridesEqualsAndHashCode(key))
            throw new IllegalArgumentException("Cache key must override hashCode() and equals() methods: " +
                key.getClass().getName());
    }

    /**
     * @param cacheName Cache name.
     * @return {@code True} if this is Hadoop system cache.
     */
    public static boolean isHadoopSystemCache(String cacheName) {
        return F.eq(cacheName, SYS_CACHE_HADOOP_MR);
    }

    /**
     * Create system cache used by Hadoop component.
     *
     * @return Hadoop cache configuration.
     */
    public static CacheConfiguration hadoopSystemCache() {
        CacheConfiguration cache = new CacheConfiguration();

        cache.setName(CU.SYS_CACHE_HADOOP_MR);
        cache.setCacheMode(REPLICATED);
        cache.setAtomicityMode(TRANSACTIONAL);
        cache.setWriteSynchronizationMode(FULL_SYNC);

        cache.setEvictionPolicyFactory(null);
        cache.setEvictionPolicy(null);
        cache.setCacheStoreFactory(null);
        cache.setNodeFilter(CacheConfiguration.ALL_NODES);
        cache.setEagerTtl(true);
        cache.setRebalanceMode(SYNC);

        return cache;
    }

    /**
     * @param cacheName Cache name.
     * @return {@code True} if this is utility system cache.
     */
    public static boolean isUtilityCache(String cacheName) {
        return UTILITY_CACHE_NAME.equals(cacheName);
    }

    /**
     * @param cacheName Cache name.
     * @return {@code True} if system cache.
     */
    public static boolean isSystemCache(String cacheName) {
        return isUtilityCache(cacheName) || isHadoopSystemCache(cacheName);
    }

    /**
     * @param cacheName Cache name.
     * @return Cache ID.
     */
    public static int cacheId(String cacheName) {
        if (cacheName != null) {
            int hash = cacheName.hashCode();

            if (hash == 0)
                hash = 1;

            return hash;
        }
        else
            return 1;
    }

    /**
     * @param cacheName Cache name.
     * @param grpName Group name.
     * @return Group ID.
     */
    public static int cacheGroupId(String cacheName, @Nullable String grpName) {
        assert cacheName != null;

        return grpName != null ? CU.cacheId(grpName) : CU.cacheId(cacheName);
    }

    /**
     * @param cfg Grid configuration.
     * @param cacheName Cache name.
     * @return {@code True} in this is IGFS data or meta cache.
     */
    public static boolean isIgfsCache(IgniteConfiguration cfg, @Nullable String cacheName) {
        return IgfsUtils.isIgfsCache(cfg, cacheName);
    }

    /**
     * Convert TTL to expire time.
     *
     * @param ttl TTL.
     * @return Expire time.
     */
    public static long toExpireTime(long ttl) {
        assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : "Invalid TTL: " + ttl;

        long expireTime = ttl == CU.TTL_ETERNAL ? CU.EXPIRE_TIME_ETERNAL : U.currentTimeMillis() + ttl;

        // Account for overflow.
        if (expireTime < 0)
            expireTime = CU.EXPIRE_TIME_ETERNAL;

        return expireTime;
    }

    /**
     * Execute closure inside cache transaction.
     *
     * @param cache Cache.
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @param clo Closure.
     * @throws IgniteCheckedException If failed.
     */
    public static <K, V> void inTx(IgniteInternalCache<K, V> cache, TransactionConcurrency concurrency,
        TransactionIsolation isolation, IgniteInClosureX<IgniteInternalCache<K ,V>> clo) throws IgniteCheckedException {

        try (GridNearTxLocal tx = cache.txStartEx(concurrency, isolation)) {
            clo.applyx(cache);

            tx.commit();
        }
    }

    /**
     * Execute closure inside cache transaction.
     *
     * @param cache Cache.
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @param clo Closure.
     * @throws IgniteCheckedException If failed.
     */
    public static <K, V> void inTx(Ignite ignite, IgniteCache<K, V> cache, TransactionConcurrency concurrency,
        TransactionIsolation isolation, IgniteInClosureX<IgniteCache<K ,V>> clo) throws IgniteCheckedException {

        try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
            clo.applyx(cache);

            tx.commit();
        }
    }

    /**
     * Gets subject ID by transaction.
     *
     * @param tx Transaction.
     * @return Subject ID.
     */
    public static <K, V> UUID subjectId(IgniteInternalTx tx, GridCacheSharedContext<K, V> ctx) {
        if (tx == null)
            return ctx.localNodeId();

        UUID subjId = tx.subjectId();

        return subjId != null ? subjId : tx.originatingNodeId();
    }

    /**
     * Invalidate entry in cache.
     *
     * @param cache Cache.
     * @param key Key.
     */
    public static <K, V> void invalidate(IgniteCache<K, V> cache, K key) {
        cache.localClear(key);
    }

    /**
     * @param duration Duration.
     * @return TTL.
     */
    public static long toTtl(Duration duration) {
        if (duration == null)
            return TTL_NOT_CHANGED;

        if (duration.getDurationAmount() == 0) {
            if (duration.isEternal())
                return TTL_ETERNAL;

            assert duration.isZero();

            return TTL_ZERO;
        }

        assert duration.getTimeUnit() != null : duration;

        return duration.getTimeUnit().toMillis(duration.getDurationAmount());
    }

    /**
     * Get TTL for load operation.
     *
     * @param plc Expiry policy.
     * @return TTL for load operation or {@link #TTL_ZERO} in case of immediate expiration.
     */
    public static long ttlForLoad(ExpiryPolicy plc) {
        if (plc != null) {
            long ttl = toTtl(plc.getExpiryForCreation());

            if (ttl == TTL_NOT_CHANGED)
                ttl = TTL_ETERNAL;

            return ttl;
        }
        else
            return TTL_ETERNAL;
    }

    /**
     * @return Expire time denoting a point in the past.
     */
    public static long expireTimeInPast() {
        return U.currentTimeMillis() - 1L;
    }

    /**
     * @param e Ignite checked exception.
     * @return CacheException runtime exception, never null.
     */
    @NotNull public static RuntimeException convertToCacheException(IgniteCheckedException e) {
        IgniteClientDisconnectedCheckedException disconnectedErr =
            e.getCause(IgniteClientDisconnectedCheckedException.class);

        if (disconnectedErr != null) {
            assert disconnectedErr.reconnectFuture() != null : disconnectedErr;

            e = disconnectedErr;
        }

        if (e.hasCause(CacheWriterException.class))
            return new CacheWriterException(U.convertExceptionNoWrap(e));

        if (e instanceof CachePartialUpdateCheckedException)
            return new CachePartialUpdateException((CachePartialUpdateCheckedException)e);
        else if (e instanceof ClusterTopologyServerNotFoundException)
            return new CacheServerNotFoundException(e.getMessage(), e);
        else if (e instanceof SchemaOperationException)
            return new CacheException(e.getMessage(), e);

        CacheException ce = X.cause(e, CacheException.class);
        if (ce != null)
            return ce;

        if (e.getCause() instanceof NullPointerException)
            return (NullPointerException)e.getCause();

        if (e.getCause() instanceof SecurityException)
            return (SecurityException)e.getCause();

        C1<IgniteCheckedException, IgniteException> converter = U.getExceptionConverter(e.getClass());

        return converter != null ? new CacheException(converter.apply(e)) : new CacheException(e);
    }

    /**
     * @param cacheObj Cache object.
     * @param ctx Cache context.
     * @param cpy Copy flag.
     * @return Cache object value.
     */
    @Nullable public static <T> T value(@Nullable CacheObject cacheObj, GridCacheContext ctx, boolean cpy) {
        return cacheObj != null ? cacheObj.<T>value(ctx.cacheObjectContext(), cpy) : null;
    }

    /**
     * @param cfg Cache configuration.
     * @param cl Type of cache plugin configuration.
     * @return Cache plugin configuration by type from cache configuration or <code>null</code>.
     */
    public static <C extends CachePluginConfiguration> C cachePluginConfiguration(
        CacheConfiguration cfg, Class<C> cl) {
        if (cfg.getPluginConfigurations() != null) {
            for (CachePluginConfiguration pluginCfg : cfg.getPluginConfigurations()) {
                if (pluginCfg.getClass() == cl)
                    return (C)pluginCfg;
            }
        }

        return null;
    }

    /**
     * @param cfg Config.
     * @param cls Class.
     * @return Not <code>null</code> list.
     */
    public static <T extends CachePluginConfiguration> List<T> cachePluginConfigurations(IgniteConfiguration cfg,
        Class<T> cls) {
        List<T> res = new ArrayList<>();

        if (cfg.getCacheConfiguration() != null) {
            for (CacheConfiguration ccfg : cfg.getCacheConfiguration()) {
                for (CachePluginConfiguration pluginCcfg : ccfg.getPluginConfigurations()) {
                    if (cls == pluginCcfg.getClass())
                        res.add((T)pluginCcfg);
                }
            }
        }

        return res;
    }

    /**
     * @param node Node.
     * @param filter Node filter.
     * @return {@code True} if node is not client node and pass given filter.
     */
    public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
        return !node.isDaemon() && !node.isClient() && filter.apply(node);
    }

    /**
     * @param node Node.
     * @param discoveryDataClusterState Discovery data cluster state.
     * @return {@code True} if node is included in BaselineTopology.
     */
    public static boolean baselineNode(ClusterNode node, DiscoveryDataClusterState discoveryDataClusterState) {
        return discoveryDataClusterState.baselineTopology().consistentIds().contains(node.consistentId());
    }

    /**
     * Creates and starts store session listeners.
     *
     * @param ctx Kernal context.
     * @param factories Factories.
     * @return Listeners.
     * @throws IgniteCheckedException In case of error.
     */
    public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx,
        Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
        if (factories == null)
            return null;

        Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);

        for (Factory<CacheStoreSessionListener> factory : factories) {
            CacheStoreSessionListener lsnr = factory.create();

            if (lsnr != null) {
                ctx.resource().injectGeneric(lsnr);

                if (lsnr instanceof LifecycleAware)
                    ((LifecycleAware)lsnr).start();

                lsnrs.add(lsnr);
            }
        }

        return lsnrs;
    }

    /**
     * @param partsMap Cache ID to partition IDs collection map.
     * @return Cache ID to partition ID array map.
     */
    public static Map<Integer, int[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
        Map<Integer, int[]> res = new HashMap<>(partsMap.size());

        for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
            Set<Integer> parts = entry.getValue();

            int[] partsArr = new int[parts.size()];

            int idx = 0;

            for (Integer part : parts)
                partsArr[idx++] = part;

            res.put(entry.getKey(), partsArr);
        }

        return res;
    }

    /**
     * Stops store session listeners.
     *
     * @param ctx Kernal context.
     * @param sesLsnrs Session listeners.
     * @throws IgniteCheckedException In case of error.
     */
    public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs)
        throws IgniteCheckedException {
        if (sesLsnrs == null)
            return;

        for (CacheStoreSessionListener lsnr : sesLsnrs) {
            if (lsnr instanceof LifecycleAware)
                ((LifecycleAware)lsnr).stop();

            ctx.resource().cleanupGeneric(lsnr);
        }
    }

    /**
     * @param c Closure to retry.
     * @throws IgniteCheckedException If failed.
     * @return Closure result.
     */
    public static <S> S retryTopologySafe(final Callable<S> c) throws IgniteCheckedException {
        IgniteCheckedException err = null;

        for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
            try {
                return c.call();
            }
            catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
                throw e;
            }
            catch (TransactionRollbackException e) {
                if (i + 1 == GridCacheAdapter.MAX_RETRIES)
                    throw e;

                U.sleep(1);
            }
            catch (IgniteCheckedException e) {
                if (i + 1 == GridCacheAdapter.MAX_RETRIES)
                    throw e;

                if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
                    ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);

                    if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
                        ClusterTopologyServerNotFoundException)
                        throw e;

                    // IGNITE-1948: remove this check when the issue is fixed
                    if (topErr.retryReadyFuture() != null)
                        topErr.retryReadyFuture().get();
                    else
                        U.sleep(1);
                }
                else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
                    CachePartialUpdateCheckedException.class))
                    U.sleep(1);
                else
                    throw e;
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw new IgniteCheckedException(e);
            }
        }

        // Should never happen.
        throw err;
    }

    /**
     * Builds neighborhood map for all nodes in snapshot.
     *
     * @param topSnapshot Topology snapshot.
     * @return Neighbors map.
     */
    public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
        Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);

        // Group by mac addresses.
        for (ClusterNode node : topSnapshot) {
            String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS);

            Collection<ClusterNode> nodes = macMap.get(macs);

            if (nodes == null)
                macMap.put(macs, nodes = new HashSet<>());

            nodes.add(node);
        }

        Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);

        for (Collection<ClusterNode> group : macMap.values())
            for (ClusterNode node : group)
                neighbors.put(node.id(), group);

        return neighbors;
    }

    /**
     * Returns neighbors for all {@code nodes}.
     *
     * @param neighborhood Neighborhood cache.
     * @param nodes Nodes.
     * @return All neighbors for given nodes.
     */
    public static Collection<ClusterNode> neighborsForNodes(Map<UUID, Collection<ClusterNode>> neighborhood,
        Iterable<ClusterNode> nodes) {
        Collection<ClusterNode> res = new HashSet<>();

        for (ClusterNode node : nodes) {
            if (!res.contains(node))
                res.addAll(neighborhood.get(node.id()));
        }

        return res;
    }

    /**
     * @return default TX configuration if system cache is used or current grid TX config otherwise.
     */
    public static TransactionConfiguration transactionConfiguration(final @Nullable GridCacheContext sysCacheCtx,
        final IgniteConfiguration cfg) {
        return sysCacheCtx != null && sysCacheCtx.systemTx()
            ? DEFAULT_TX_CFG
            : cfg.getTransactionConfiguration();
    }

    /**
     * @param name Cache name.
     * @throws IllegalArgumentException In case the name is not valid.
     */
    public static void validateCacheName(String name) throws IllegalArgumentException {
        A.ensure(name != null && !name.isEmpty(), "Cache name must not be null or empty.");
    }

    /**
     * @param name Cache name.
     * @throws IllegalArgumentException In case the name is not valid.
     */
    public static void validateNewCacheName(String name) throws IllegalArgumentException {
        validateCacheName(name);

        A.ensure(!isReservedCacheName(name), "Cache name cannot be \"" + name +
            "\" because it is reserved for internal purposes.");
    }

    /**
     * @param cacheNames Cache names to validate.
     * @throws IllegalArgumentException In case the name is not valid.
     */
    public static void validateCacheNames(Collection<String> cacheNames) throws IllegalArgumentException {
        for (String name : cacheNames)
            validateCacheName(name);
    }

    /**
     * @param ccfgs Configurations to validate.
     * @throws IllegalArgumentException In case the name is not valid.
     */
    public static void validateConfigurationCacheNames(Collection<CacheConfiguration> ccfgs)
        throws IllegalArgumentException {
        for (CacheConfiguration ccfg : ccfgs)
            validateNewCacheName(ccfg.getName());
    }

    /**
     * @param name Cache name.
     * @return {@code True} if it is a reserved cache name.
     */
    public static boolean isReservedCacheName(String name) {
        for (String reserved : RESERVED_NAMES) {
            if (reserved.equals(name))
                return true;
        }

        return false;
    }

    /**
     * @param cfg Initializes cache configuration with proper defaults.
     * @param cacheObjCtx Cache object context.
     * @throws IgniteCheckedException If configuration is not valid.
     */
    public static void initializeConfigDefaults(IgniteLogger log, CacheConfiguration cfg,
        CacheObjectContext cacheObjCtx)
        throws IgniteCheckedException {
        if (cfg.getCacheMode() == null)
            cfg.setCacheMode(DFLT_CACHE_MODE);

        if (cfg.getNodeFilter() == null)
            cfg.setNodeFilter(CacheConfiguration.ALL_NODES);

        if (cfg.getAffinity() == null) {
            if (cfg.getCacheMode() == PARTITIONED) {
                RendezvousAffinityFunction aff = new RendezvousAffinityFunction();

                cfg.setAffinity(aff);
            }
            else if (cfg.getCacheMode() == REPLICATED) {
                RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 512);

                cfg.setAffinity(aff);

                cfg.setBackups(Integer.MAX_VALUE);
            }
            else
                cfg.setAffinity(new GridCacheProcessor.LocalAffinityFunction());
        }
        else {
            if (cfg.getCacheMode() == LOCAL && !(cfg.getAffinity() instanceof GridCacheProcessor.LocalAffinityFunction)) {
                cfg.setAffinity(new GridCacheProcessor.LocalAffinityFunction());

                U.warn(log, "AffinityFunction configuration parameter will be ignored for local cache" +
                    " [cacheName=" + U.maskName(cfg.getName()) + ']');
            }
        }

        if (cfg.getCacheMode() == REPLICATED)
            cfg.setBackups(Integer.MAX_VALUE);

        if (cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
            throw new IgniteCheckedException("Segmented indices are supported for PARTITIONED mode only.");

        if (cfg.getAffinityMapper() == null)
            cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());

        if (cfg.getRebalanceMode() == null)
            cfg.setRebalanceMode(ASYNC);

        if (cfg.getAtomicityMode() == null)
            cfg.setAtomicityMode(CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE);

        if (cfg.getWriteSynchronizationMode() == null)
            cfg.setWriteSynchronizationMode(PRIMARY_SYNC);

        assert cfg.getWriteSynchronizationMode() != null;

        if (cfg.getCacheStoreFactory() == null) {
            Factory<CacheLoader> ldrFactory = cfg.getCacheLoaderFactory();
            Factory<CacheWriter> writerFactory = cfg.isWriteThrough() ? cfg.getCacheWriterFactory() : null;

            if (ldrFactory != null || writerFactory != null)
                cfg.setCacheStoreFactory(new GridCacheLoaderWriterStoreFactory(ldrFactory, writerFactory));
        }
        else {
            if (cfg.getCacheLoaderFactory() != null)
                throw new IgniteCheckedException("Cannot set both cache loaded factory and cache store factory " +
                    "for cache: " + U.maskName(cfg.getName()));

            if (cfg.getCacheWriterFactory() != null)
                throw new IgniteCheckedException("Cannot set both cache writer factory and cache store factory " +
                    "for cache: " + U.maskName(cfg.getName()));
        }

        Collection<QueryEntity> entities = cfg.getQueryEntities();

        if (!F.isEmpty(entities))
            cfg.clearQueryEntities().setQueryEntities(QueryUtils.normalizeQueryEntities(entities, cfg));
    }

    /**
     * Creates closure that saves initial value to backup partition.
     * <p>
     * Useful only when store with readThrough is used. In situation when
     * get() on backup node returns successful result, it's expected that
     * localPeek() will be successful as well. But it isn't true when
     * primary node loaded value from local store, in this case backups
     * will remain non-initialized.
     * <br>
     * To meet that requirement the value requested from primary should
     * be saved on backup during get().
     * </p>
     *
     * @param topVer Topology version.
     * @param log Logger.
     * @param cctx Cache context.
     * @param key Key.
     * @param expiryPlc Expiry policy.
     * @param readThrough Read through.
     * @param skipVals Skip values.
     */
    @Nullable public static BackupPostProcessingClosure createBackupPostProcessingClosure(
        final AffinityTopologyVersion topVer,
        final IgniteLogger log,
        final GridCacheContext cctx,
        @Nullable final KeyCacheObject key,
        @Nullable final IgniteCacheExpiryPolicy expiryPlc,
        boolean readThrough,
        boolean skipVals
    ) {
        if (cctx.mvccEnabled() || !readThrough || skipVals ||
            (key != null && !cctx.affinity().backupsByKey(key, topVer).contains(cctx.localNode())))
            return null;

        return new BackupPostProcessingClosure() {
            private void process(KeyCacheObject key, CacheObject val, GridCacheVersion ver, GridDhtCacheAdapter colocated) {
                while (true) {
                    GridCacheEntryEx entry = null;

                    cctx.shared().database().checkpointReadLock();

                    try {
                        entry = colocated.entryEx(key, topVer);

                        entry.initialValue(
                            val,
                            ver,
                            expiryPlc == null ? 0 : expiryPlc.forCreate(),
                            expiryPlc == null ? 0 : toExpireTime(expiryPlc.forCreate()),
                            true,
                            topVer,
                            GridDrType.DR_BACKUP,
                            true);

                        break;
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        if (log.isDebugEnabled())
                            log.debug("Got removed entry during postprocessing (will retry): " +
                                entry);
                    }
                    catch (IgniteCheckedException e) {
                        U.error(log, "Error saving backup value: " + entry, e);

                        throw new GridClosureException(e);
                    }
                    catch (GridDhtInvalidPartitionException ignored) {
                        break;
                    }
                    finally {
                        if (entry != null)
                            entry.touch(topVer);

                        cctx.shared().database().checkpointReadUnlock();
                    }
                }
            }

            @Override public void apply(CacheObject val, GridCacheVersion ver) {
                process(key, val, ver, cctx.dht());
            }

            @Override public void apply(Collection<GridCacheEntryInfo> infos) {
                if (!F.isEmpty(infos)) {
                    GridCacheAffinityManager aff = cctx.affinity();
                    ClusterNode locNode = cctx.localNode();

                    GridDhtCacheAdapter colocated = cctx.cache().isNear()
                        ? ((GridNearCacheAdapter)cctx.cache()).dht()
                        : cctx.dht();

                    for (GridCacheEntryInfo info : infos) {
                        // Save backup value.
                        if (aff.backupsByKey(info.key(), topVer).contains(locNode))
                            process(info.key(), info.value(), info.version(), colocated);
                    }
                }
            }
        };
    }

    /**
     * Checks if cache configuration belongs to persistent cache.
     *
     * @param ccfg Cache configuration.
     * @param dsCfg Data storage config.
     */
    public static boolean isPersistentCache(CacheConfiguration ccfg, DataStorageConfiguration dsCfg) {
        if (dsCfg == null)
            return false;

        // Special handling for system cache is needed.
        if (isSystemCache(ccfg.getName()) || isIgfsCacheInSystemRegion(ccfg)) {
            if (dsCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled())
                return true;

            if (dsCfg.getDataRegionConfigurations() != null) {
                for (DataRegionConfiguration drConf : dsCfg.getDataRegionConfigurations()) {
                    if (drConf.isPersistenceEnabled())
                        return true;
                }
            }

            return false;
        }

        String regName = ccfg.getDataRegionName();

        if (regName == null || regName.equals(dsCfg.getDefaultDataRegionConfiguration().getName()))
            return dsCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled();

        if (dsCfg.getDataRegionConfigurations() != null) {
            for (DataRegionConfiguration drConf : dsCfg.getDataRegionConfigurations()) {
                if (regName.equals(drConf.getName()))
                    return drConf.isPersistenceEnabled();
            }
        }

        return false;
    }

    /**
     * Checks whether cache configuration represents IGFS cache that will be placed in system memory region.
     *
     * @param ccfg Cache config.
     */
    private static boolean isIgfsCacheInSystemRegion(CacheConfiguration ccfg) {
        return IgfsUtils.matchIgfsCacheName(ccfg.getName()) &&
            (SYSTEM_DATA_REGION_NAME.equals(ccfg.getDataRegionName()) || ccfg.getDataRegionName() == null);
    }

    /**
     * @return {@code true} if persistence is enabled for at least one data region, {@code false} if not.
     */
    public static boolean isPersistenceEnabled(IgniteConfiguration cfg) {
        return isPersistenceEnabled(cfg.getDataStorageConfiguration());
    }

    /**
     * @return {@code true} if persistence is enabled for at least one data region, {@code false} if not.
     */
    public static boolean isPersistenceEnabled(DataStorageConfiguration cfg) {
        if (cfg == null)
            return false;

        DataRegionConfiguration dfltReg = cfg.getDefaultDataRegionConfiguration();

        if (dfltReg == null)
            return false;

        if (dfltReg.isPersistenceEnabled())
            return true;

        DataRegionConfiguration[] regCfgs = cfg.getDataRegionConfigurations();

        if (regCfgs == null)
            return false;

        for (DataRegionConfiguration regCfg : regCfgs) {
            if (regCfg.isPersistenceEnabled())
                return true;
        }

        return false;
    }

    /**
     * @param pageSize Page size.
     * @param encSpi Encryption spi.
     * @return Page size without encryption overhead.
     */
    public static int encryptedPageSize(int pageSize, EncryptionSpi encSpi) {
        return pageSize
            - (encSpi.encryptedSizeNoPadding(pageSize) - pageSize)
            - encSpi.blockSize(); /* For CRC. */
    }

    /**
     * @param sctx Shared context.
     * @param cacheIds Cache ids.
     * @return First partitioned cache or {@code null} in case no partitioned cache ids are in list.
     */
    public static GridCacheContext<?, ?> firstPartitioned(GridCacheSharedContext<?, ?> sctx, int[] cacheIds) {
        for (int i = 0; i < cacheIds.length; i++) {
            GridCacheContext<?, ?> cctx = sctx.cacheContext(cacheIds[i]);

            if (cctx == null)
                throw new CacheException("Failed to find cache.");

            if (!cctx.isLocal() && !cctx.isReplicated())
                return cctx;
        }

        return null;
    }

    /**
     * @param sctx Shared context.
     * @param cacheIds Cache ids.
     * @return First partitioned cache or {@code null} in case no partitioned cache ids are in list.
     */
    public static GridCacheContext<?, ?> firstPartitioned(GridCacheSharedContext<?, ?> sctx, Iterable<Integer> cacheIds) {
        for (Integer i : cacheIds) {
            GridCacheContext<?, ?> cctx = sctx.cacheContext(i);

            if (cctx == null)
                throw new CacheException("Failed to find cache.");

            if (!cctx.isLocal() && !cctx.isReplicated())
                return cctx;
        }

        return null;
    }

    /**
     * @param cacheName Name of cache or cache template.
     * @return {@code true} if cache name ends with asterisk (*), and therefire is a template name.
     */
    public static boolean isCacheTemplateName(String cacheName) {
        return cacheName.endsWith("*");
    }

    /**
     *
     */
    public interface BackupPostProcessingClosure extends IgniteInClosure<Collection<GridCacheEntryInfo>>,
        IgniteBiInClosure<CacheObject, GridCacheVersion>{
    }
}
