/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;

/**
 *
 */
public class CacheGroupContext {
    /**
     * Unique group ID. Currently for shared group it is generated as group name hash,
     * for non-shared as cache name hash (see {@link ClusterCachesInfo#checkCacheConflict}).
     */
    private final int grpId;

    /** Node ID cache group was received from. */
    private final UUID rcvdFrom;

    /** Flag indicating that this cache group is in a recovery mode due to partitions loss. */
    private boolean needsRecovery;

    /** */
    private final AffinityTopologyVersion locStartVer;

    /** */
    private final CacheConfiguration<?, ?> ccfg;

    /** */
    private final GridCacheSharedContext ctx;

    /** */
    private final boolean affNode;

    /** */
    private final CacheType cacheType;

    /** */
    private final byte ioPlc;

    /** */
    private final boolean depEnabled;

    /** */
    private final boolean storeCacheId;

    /** */
    private volatile List<GridCacheContext> caches;

    /** */
    private volatile List<GridCacheContext> contQryCaches;

    /** */
    private final IgniteLogger log;

    /** */
    private GridAffinityAssignmentCache aff;

    /** */
    private GridDhtPartitionTopologyImpl top;

    /** */
    private IgniteCacheOffheapManager offheapMgr;

    /** */
    private GridCachePreloader preldr;
    /** */
    private final DataRegion dataRegion;

    /** Persistence enabled flag. */
    private final boolean persistenceEnabled;

    /** */
    private final CacheObjectContext cacheObjCtx;

    /** */
    private final FreeList freeList;

    /** */
    private final ReuseList reuseList;

    /** */
    private boolean drEnabled;

    /** */
    private boolean qryEnabled;

    /** */
    private boolean mvccEnabled;

    /** MXBean. */
    private CacheGroupMetricsMXBean mxBean;

    /** */
    private volatile boolean localWalEnabled;

    /** */
    private volatile boolean globalWalEnabled;

    /**
     * @param ctx Context.
     * @param grpId Group ID.
     * @param rcvdFrom Node ID cache group was received from.
     * @param cacheType Cache type.
     * @param ccfg Cache configuration.
     * @param affNode Affinity node flag.
     * @param dataRegion data region.
     * @param cacheObjCtx Cache object context.
     * @param freeList Free list.
     * @param reuseList Reuse list.
     * @param locStartVer Topology version when group was started on local node.
     * @param persistenceEnabled Persistence enabled flag.
     * @param walEnabled Wal enabled flag.
     */
    CacheGroupContext(
        GridCacheSharedContext ctx,
        int grpId,
        UUID rcvdFrom,
        CacheType cacheType,
        CacheConfiguration ccfg,
        boolean affNode,
        DataRegion dataRegion,
        CacheObjectContext cacheObjCtx,
        FreeList freeList,
        ReuseList reuseList,
        AffinityTopologyVersion locStartVer,
        boolean persistenceEnabled,
        boolean walEnabled
    ) {
        assert ccfg != null;
        assert dataRegion != null || !affNode;
        assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';

        this.grpId = grpId;
        this.rcvdFrom = rcvdFrom;
        this.ctx = ctx;
        this.ccfg = ccfg;
        this.affNode = affNode;
        this.dataRegion = dataRegion;
        this.cacheObjCtx = cacheObjCtx;
        this.freeList = freeList;
        this.reuseList = reuseList;
        this.locStartVer = locStartVer;
        this.cacheType = cacheType;
        this.globalWalEnabled = walEnabled;
        this.persistenceEnabled = persistenceEnabled;
        this.localWalEnabled = true;

        persistGlobalWalState(walEnabled);

        ioPlc = cacheType.ioPolicy();

        depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);

        storeCacheId = affNode && dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED;

        mvccEnabled = ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;

        log = ctx.kernalContext().log(getClass());

        caches = new ArrayList<>();

        mxBean = new CacheGroupMetricsMXBeanImpl(this);
    }

    /**
     * @return Mvcc flag.
     */
    public boolean mvccEnabled() {
        return mvccEnabled;
    }

    /**
     * @return {@code True} if this is cache group for one of system caches.
     */
    public boolean systemCache() {
        return !sharedGroup() && CU.isSystemCache(ccfg.getName());
    }

    /**
     * @return Node ID initiated cache group start.
     */
    public UUID receivedFrom() {
        return rcvdFrom;
    }

    /**
     * @return {@code True} if cacheId should be stored in data pages.
     */
    public boolean storeCacheIdInDataPage() {
        return storeCacheId;
    }

    /**
     * @return {@code True} if deployment is enabled.
     */
    public boolean deploymentEnabled() {
        return depEnabled;
    }

    /**
     * @return Preloader.
     */
    public GridCachePreloader preloader() {
        return preldr;
    }

    /**
     * @return IO policy for the given cache group.
     */
    public byte ioPolicy() {
        return ioPlc;
    }

    /**
     * @param cctx Cache context.
     * @throws IgniteCheckedException If failed.
     */
    void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
        addCacheContext(cctx);

        offheapMgr.onCacheStarted(cctx);
    }

    /**
     * @param cacheName Cache name.
     * @return {@code True} if group contains cache with given name.
     */
    public boolean hasCache(String cacheName) {
        List<GridCacheContext> caches = this.caches;

        for (int i = 0; i < caches.size(); i++) {
            if (caches.get(i).name().equals(cacheName))
                return true;
        }

        return false;
    }

    /**
     * @param cctx Cache context.
     */
    private void addCacheContext(GridCacheContext cctx) {
        assert cacheType.userCache() == cctx.userCache() : cctx.name();
        assert grpId == cctx.groupId() : cctx.name();

        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);

        assert sharedGroup() || caches.isEmpty();

        boolean add = caches.add(cctx);

        assert add : cctx.name();

        if (!qryEnabled && QueryUtils.isEnabled(cctx.config()))
            qryEnabled = true;

        if (!drEnabled && cctx.isDrEnabled())
            drEnabled = true;

        this.caches = caches;
    }

    /**
     * @param cctx Cache context.
     */
    private void removeCacheContext(GridCacheContext cctx) {
        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);

        // It is possible cache was not added in case of errors on cache start.
        for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) {
            GridCacheContext next = it.next();

            if (next == cctx) {
                assert sharedGroup() || caches.size() == 1 : caches.size();

                it.remove();

                break;
            }
        }

        if (QueryUtils.isEnabled(cctx.config())) {
            boolean qryEnabled = false;

            for (int i = 0; i < caches.size(); i++) {
                if (QueryUtils.isEnabled(caches.get(i).config())) {
                    qryEnabled = true;

                    break;
                }
            }

            this.qryEnabled = qryEnabled;
        }

        if (cctx.isDrEnabled()) {
            boolean drEnabled = false;

            for (int i = 0; i < caches.size(); i++) {
                if (caches.get(i).isDrEnabled()) {
                    drEnabled = true;

                    break;
                }
            }

            this.drEnabled = drEnabled;
        }

        this.caches = caches;
    }

    /**
     * @return Cache context if group contains single cache.
     */
    public GridCacheContext singleCacheContext() {
        List<GridCacheContext> caches = this.caches;

        assert !sharedGroup() && caches.size() == 1 :
            "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
                ", caches=" + caches;

        return caches.get(0);
    }

    /**
     *
     */
    public void unwindUndeploys() {
        List<GridCacheContext> caches = this.caches;

        for (int i = 0; i < caches.size(); i++) {
            GridCacheContext cctx = caches.get(i);

            cctx.deploy().unwind(cctx);
        }
    }

    /**
     * @param type Event type to check.
     * @return {@code True} if given event type should be recorded.
     */
    public boolean eventRecordable(int type) {
        return cacheType.userCache() && ctx.gridEvents().isRecordable(type);
    }

    /**
     * @return {@code True} if cache created by user.
     */
    public boolean userCache() {
        return cacheType.userCache();
    }

    /**
     * Adds rebalancing event.
     *
     * @param part Partition.
     * @param type Event type.
     * @param discoNode Discovery node.
     * @param discoType Discovery event type.
     * @param discoTs Discovery event timestamp.
     */
    public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) {
        assert discoNode != null;
        assert type > 0;
        assert discoType > 0;
        assert discoTs > 0;

        if (!eventRecordable(type))
            LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));

        List<GridCacheContext> caches = this.caches;

        for (int i = 0; i < caches.size(); i++) {
            GridCacheContext cctx = caches.get(i);

            if (!cctx.config().isEventsDisabled() && cctx.recordEvent(type)) {
                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
                    cctx.localNode(),
                    "Cache rebalancing event.",
                    type,
                    part,
                    discoNode,
                    discoType,
                    discoTs));
            }
        }
    }

    /**
     * Adds partition unload event.
     *
     * @param part Partition.
     */
    public void addUnloadEvent(int part) {
        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
            LT.warn(log, "Added event without checking if event is recordable: " +
                U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));

        List<GridCacheContext> caches = this.caches;

        for (int i = 0; i < caches.size(); i++) {
            GridCacheContext cctx = caches.get(i);

            if (!cctx.config().isEventsDisabled())
                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
                    cctx.localNode(),
                    "Cache unloading event.",
                    EVT_CACHE_REBALANCE_PART_UNLOADED,
                    part,
                    null,
                    0,
                    0));
        }
    }

    /**
     * @param part Partition.
     * @param key Key.
     * @param evtNodeId Event node ID.
     * @param type Event type.
     * @param newVal New value.
     * @param hasNewVal Has new value flag.
     * @param oldVal Old values.
     * @param hasOldVal Has old value flag.
     * @param keepBinary Keep binary flag.
     */
    public void addCacheEvent(
        int part,
        KeyCacheObject key,
        UUID evtNodeId,
        int type,
        @Nullable CacheObject newVal,
        boolean hasNewVal,
        @Nullable CacheObject oldVal,
        boolean hasOldVal,
        boolean keepBinary
    ) {
        List<GridCacheContext> caches = this.caches;

        for (int i = 0; i < caches.size(); i++) {
            GridCacheContext cctx = caches.get(i);

            if (!cctx.config().isEventsDisabled())
                cctx.events().addEvent(part,
                    key,
                    evtNodeId,
                    (IgniteUuid)null,
                    null,
                    type,
                    newVal,
                    hasNewVal,
                    oldVal,
                    hasOldVal,
                    null,
                    null,
                    null,
                    keepBinary);
        }
    }

    /**
     * @return {@code True} if contains cache with query indexing enabled.
     */
    public boolean queriesEnabled() {
        return qryEnabled;
    }

    /**
     * @return {@code True} in case replication is enabled.
     */
    public boolean isDrEnabled() {
        return drEnabled;
    }

    /**
     * @return Free List.
     */
    public FreeList freeList() {
        return freeList;
    }

    /**
     * @return Reuse List.
     */
    public ReuseList reuseList() {
        return reuseList;
    }

    /**
     * @return Cache object context.
     */
    public CacheObjectContext cacheObjectContext() {
        return cacheObjCtx;
    }

    /**
     * @return Cache shared context.
     */
    public GridCacheSharedContext shared() {
        return ctx;
    }

    /**
     * @return data region.
     */
    public DataRegion dataRegion() {
        return dataRegion;
    }

    /**
     * @return {@code True} if local node is affinity node.
     */
    public boolean affinityNode() {
        return affNode;
    }

    /**
     * @return Topology.
     */
    public GridDhtPartitionTopology topology() {
        if (top == null)
            throw new IllegalStateException("Topology is not initialized: " + cacheOrGroupName());

        return top;
    }

    /**
     * @return {@code True} if current thread holds lock on topology.
     */
    public boolean isTopologyLocked() {
        if (top == null)
            return false;

        return top.holdsLock();
    }

    /**
     * @return Offheap manager.
     */
    public IgniteCacheOffheapManager offheap() {
        return offheapMgr;
    }

    /**
     * @return Current cache state. Must only be modified during exchange.
     */
    public boolean needsRecovery() {
        return needsRecovery;
    }

    /**
     * @param needsRecovery Needs recovery flag.
     */
    public void needsRecovery(boolean needsRecovery) {
        this.needsRecovery = needsRecovery;
    }

    /**
     * @return Topology version when group was started on local node.
     */
    public AffinityTopologyVersion localStartVersion() {
        return locStartVer;
    }

    /**
     * @return {@code True} if cache is local.
     */
    public boolean isLocal() {
        return ccfg.getCacheMode() == LOCAL;
    }

    /**
     * @return {@code True} if cache is local.
     */
    public boolean isReplicated() {
        return ccfg.getCacheMode() == REPLICATED;
    }

    /**
     * @return Cache configuration.
     */
    public CacheConfiguration config() {
        return ccfg;
    }

    /**
     * @return Cache node filter.
     */
    public IgnitePredicate<ClusterNode> nodeFilter() {
        return ccfg.getNodeFilter();
    }

    /**
     * @return Configured user objects which should be initialized/stopped on group start/stop.
     */
    Collection<?> configuredUserObjects() {
        return Arrays.asList(ccfg.getAffinity(), ccfg.getNodeFilter(), ccfg.getTopologyValidator());
    }

    /**
     * @return Configured topology validator.
     */
    @Nullable public TopologyValidator topologyValidator() {
        return ccfg.getTopologyValidator();
    }

    /**
     * @return Configured affinity function.
     */
    public AffinityFunction affinityFunction() {
        return ccfg.getAffinity();
    }

    /**
     * @return Affinity.
     */
    public GridAffinityAssignmentCache affinity() {
        return aff;
    }

    /**
     * @return Group name or {@code null} if group name was not specified for cache.
     */
    @Nullable public String name() {
        return ccfg.getGroupName();
    }

    /**
     * @return Group name if it is specified, otherwise cache name.
     */
    public String cacheOrGroupName() {
        return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName();
    }

    /**
     * @return Group ID.
     */
    public int groupId() {
        return grpId;
    }

    /**
     * @return {@code True} if group can contain multiple caches.
     */
    public boolean sharedGroup() {
        return ccfg.getGroupName() != null;
    }

    /**
     *
     */
    public void onKernalStop() {
        aff.cancelFutures(new IgniteCheckedException("Failed to wait for topology update, node is stopping."));

        preldr.onKernalStop();

        offheapMgr.onKernalStop();
    }

    /**
     * @param cctx Cache context.
     * @param destroy Destroy data flag. Setting to <code>true</code> will remove all cache data.
     */
    void stopCache(GridCacheContext cctx, boolean destroy) {
        if (top != null)
            top.onCacheStopped(cctx.cacheId());

        offheapMgr.stopCache(cctx.cacheId(), destroy);

        removeCacheContext(cctx);
    }

    /**
     *
     */
    void stopGroup() {
        IgniteCheckedException err =
            new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");

        ctx.evict().onCacheGroupStopped(this);

        aff.cancelFutures(err);

        preldr.onKernalStop();

        offheapMgr.stop();

        ctx.io().removeCacheGroupHandlers(grpId);
    }

    /**
     * @return IDs of caches in this group.
     */
    public Set<Integer> cacheIds() {
        List<GridCacheContext> caches = this.caches;

        Set<Integer> ids = U.newHashSet(caches.size());

        for (int i = 0; i < caches.size(); i++)
            ids.add(caches.get(i).cacheId());

        return ids;
    }

    /**
     * @return Caches in this group.
     */
    public List<GridCacheContext> caches() {
        return this.caches;
    }

    /**
     * @return {@code True} if group contains caches.
     */
    boolean hasCaches() {
        List<GridCacheContext> caches = this.caches;

        return !caches.isEmpty();
    }

    /**
     * @param part Partition ID.
     */
    public void onPartitionEvicted(int part) {
        List<GridCacheContext> caches = this.caches;

        for (int i = 0; i < caches.size(); i++) {
            GridCacheContext cctx = caches.get(i);

            if (cctx.isDrEnabled())
                cctx.dr().partitionEvicted(part);

            cctx.continuousQueries().onPartitionEvicted(part);
        }
    }

    /**
     * @param cctx Cache context.
     */
    public void addCacheWithContinuousQuery(GridCacheContext cctx) {
        assert sharedGroup() : cacheOrGroupName();
        assert cctx.group() == this : cctx.name();
        assert !cctx.isLocal() : cctx.name();

        synchronized (this) {
            List<GridCacheContext> contQryCaches = this.contQryCaches;

            if (contQryCaches == null)
                contQryCaches = new ArrayList<>();

            contQryCaches.add(cctx);

            this.contQryCaches = contQryCaches;
        }
    }

    /**
     * @param cctx Cache context.
     */
    public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
        assert sharedGroup() : cacheOrGroupName();
        assert cctx.group() == this : cctx.name();
        assert !cctx.isLocal() : cctx.name();

        synchronized (this) {
            List<GridCacheContext> contQryCaches = this.contQryCaches;

            if (contQryCaches == null)
                return;

            contQryCaches.remove(cctx);

            if (contQryCaches.isEmpty())
                contQryCaches = null;

            this.contQryCaches = contQryCaches;
        }
    }

    /**
     * @param cacheId ID of cache initiated counter update.
     * @param part Partition number.
     * @param cntr Counter.
     * @param topVer Topology version for current operation.
     */
    public void onPartitionCounterUpdate(int cacheId,
        int part,
        long cntr,
        AffinityTopologyVersion topVer,
        boolean primary) {
        assert sharedGroup();

        if (isLocal())
            return;

        List<GridCacheContext> contQryCaches = this.contQryCaches;

        if (contQryCaches == null)
            return;

        CounterSkipContext skipCtx = null;

        for (int i = 0; i < contQryCaches.size(); i++) {
            GridCacheContext cctx = contQryCaches.get(i);

            if (cacheId != cctx.cacheId())
                skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary);
        }

        final List<Runnable> procC = skipCtx != null ? skipCtx.processClosures() : null;

        if (procC != null) {
            ctx.kernalContext().closure().runLocalSafe(new Runnable() {
                @Override public void run() {
                    for (Runnable c : procC)
                        c.run();
                }
            });
        }
    }

    /**
     * @return {@code True} if there is at least one cache with registered CQ exists in this group.
     */
    public boolean hasContinuousQueryCaches() {
        return !F.isEmpty(contQryCaches);
    }

    /**
     * @throws IgniteCheckedException If failed.
     */
    public void start() throws IgniteCheckedException {
        aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
            cacheOrGroupName(),
            grpId,
            ccfg.getAffinity(),
            ccfg.getNodeFilter(),
            ccfg.getBackups(),
            ccfg.getCacheMode() == LOCAL,
            persistenceEnabled());

        if (ccfg.getCacheMode() != LOCAL) {
            top = new GridDhtPartitionTopologyImpl(ctx, this);

            if (!ctx.kernalContext().clientNode()) {
                ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class,
                    new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() {
                        @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) {
                            processAffinityAssignmentRequest(nodeId, msg);
                        }
                    });
            }

            preldr = new GridDhtPreloader(this);

            preldr.start();
        }
        else
            preldr = new GridCachePreloaderAdapter(this);

        if (persistenceEnabled()) {
            try {
                offheapMgr = new GridCacheOffheapManager();
            }
            catch (Exception e) {
                throw new IgniteCheckedException("Failed to initialize offheap manager", e);
            }
        }
        else
            offheapMgr = new IgniteCacheOffheapManagerImpl();

        offheapMgr.start(ctx, this);

        ctx.affinity().onCacheGroupCreated(this);
    }

    /**
     * @return Persistence enabled flag.
     */
    public boolean persistenceEnabled() {
        return persistenceEnabled;
    }

    /**
     * @param nodeId Node ID.
     * @param req Request.
     */
    private void processAffinityAssignmentRequest(final UUID nodeId,
        final GridDhtAffinityAssignmentRequest req) {
        if (log.isDebugEnabled())
            log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']');

        IgniteInternalFuture<AffinityTopologyVersion> fut = aff.readyFuture(req.topologyVersion());

        if (fut != null) {
            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                    processAffinityAssignmentRequest0(nodeId, req);
                }
            });
        }
        else
            processAffinityAssignmentRequest0(nodeId, req);
    }

    /**
     * @param nodeId Node ID.
     * @param req Request.
     */
    private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) {
        AffinityTopologyVersion topVer = req.topologyVersion();

        if (log.isDebugEnabled())
            log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
                ", node=" + nodeId + ']');

        AffinityAssignment assignment = aff.cachedAffinity(topVer);

        GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
            req.futureId(),
            grpId,
            topVer,
            assignment.assignment());

        if (aff.centralizedAffinityFunction()) {
            assert assignment.idealAssignment() != null;

            res.idealAffinityAssignment(assignment.idealAssignment());
        }

        if (req.sendPartitionsState())
            res.partitionMap(top.partitionMap(true));

        try {
            ctx.io().send(nodeId, res, AFFINITY_POOL);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e);
        }
    }

    /**
     * @param reconnectFut Reconnect future.
     */
    public void onDisconnected(IgniteFuture reconnectFut) {
        IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
            "Failed to wait for topology update, client disconnected.");

        if (aff != null)
            aff.cancelFutures(err);
    }

    /**
     * @return {@code True} if rebalance is enabled.
     */
    public boolean rebalanceEnabled() {
        return ccfg.getRebalanceMode() != NONE;
    }

    /**
     *
     */
    public void onReconnected() {
        aff.onReconnected();

        if (top != null)
            top.onReconnected();

        preldr.onReconnected();
    }

    /**
     * @return MXBean.
     */
    public CacheGroupMetricsMXBean mxBean() {
        return mxBean;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "CacheGroupContext [grp=" + cacheOrGroupName() + ']';
    }

    /**
     * WAL enabled flag.
     */
    public boolean walEnabled() {
        return localWalEnabled && globalWalEnabled;
    }

    /**
     * Local WAL enabled flag.
     */
    public boolean localWalEnabled() {
        return localWalEnabled;
    }

    /**
     * @return Global WAL enabled flag.
     */
    public boolean globalWalEnabled() {
        return globalWalEnabled;
    }

    /**
     * @param enabled Global WAL enabled flag.
     */
    public void globalWalEnabled(boolean enabled) {
        if (globalWalEnabled != enabled) {
            log.info("Global WAL state for group=" + cacheOrGroupName() +
                " changed from " + globalWalEnabled + " to " + enabled);

            persistGlobalWalState(enabled);

            globalWalEnabled = enabled;
        }
    }

    /**
     * @param enabled Local WAL enabled flag.
     */
    public void localWalEnabled(boolean enabled) {
        if (localWalEnabled != enabled){
            log.info("Local WAL state for group=" + cacheOrGroupName() +
                " changed from " + localWalEnabled + " to " + enabled);

            persistLocalWalState(enabled);

            localWalEnabled = enabled;
        }
    }

    /**
     * @param enabled Enabled flag..
     */
    private void persistGlobalWalState(boolean enabled) {
        shared().database().walEnabled(grpId, enabled, false);
    }

    /**
     * @param enabled Enabled flag..
     */
    private void persistLocalWalState(boolean enabled) {
        shared().database().walEnabled(grpId, enabled, true);
    }
}
