blob: a546a369f099e5126898a4ec47c736afa76c2473 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
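/**
 * Context of a cache group: holds group-wide configuration and the components
 * (affinity assignment cache, partition topology, offheap manager, preloader)
 * used by the caches that belong to the group.
 */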
public class CacheGroupContext {
/**
* Unique group ID. Currently for shared group it is generated as group name hash,
* for non-shared as cache name hash (see {@link ClusterCachesInfo#checkCacheConflict}).
*/
private final int grpId;
/** Node ID cache group was received from. */
private final UUID rcvdFrom;
/** Flag indicating that this cache group is in a recovery mode due to partitions loss. */
private boolean needsRecovery;
/** Topology version when group was started on local node. */
private final AffinityTopologyVersion locStartVer;
/** Cache configuration of the group. */
private final CacheConfiguration<?, ?> ccfg;
/** Cache shared context. */
private final GridCacheSharedContext ctx;
/** {@code True} if local node is affinity node. */
private final boolean affNode;
/** Cache type. */
private final CacheType cacheType;
/** IO policy, taken from the cache type in the constructor. */
private final byte ioPlc;
/** Deployment enabled flag. */
private final boolean depEnabled;
/** {@code True} if cacheId should be stored in data pages. */
private final boolean storeCacheId;
/** Caches of this group. This class replaces the list with an updated copy instead of modifying it in place. */
private volatile List<GridCacheContext> caches;
/** Caches of this group with registered continuous queries, {@code null} if there are none. */
private volatile List<GridCacheContext> contQryCaches;
/** Logger. */
private final IgniteLogger log;
/** Affinity assignment cache, created in {@link #start()}. */
private GridAffinityAssignmentCache aff;
/** Partition topology, created in {@link #start()} unless cache mode is LOCAL. */
private GridDhtPartitionTopologyImpl top;
/** Offheap manager, created in {@link #start()}. */
private IgniteCacheOffheapManager offheapMgr;
/** Preloader, created in {@link #start()}. */
private GridCachePreloader preldr;
/** Data region. */
private final DataRegion dataRegion;
/** Persistence enabled flag. */
private final boolean persistenceEnabled;
/** Cache object context. */
private final CacheObjectContext cacheObjCtx;
/** Free list. */
private final FreeList freeList;
/** Reuse list. */
private final ReuseList reuseList;
/** {@code True} if at least one cache of the group has DR enabled. */
private boolean drEnabled;
/** {@code True} if at least one cache of the group has queries enabled. */
private boolean qryEnabled;
/** {@code True} if atomicity mode of the group configuration is TRANSACTIONAL_SNAPSHOT. */
private boolean mvccEnabled;
/** MXBean. */
private CacheGroupMetricsMXBean mxBean;
/** Local WAL enabled flag, initially {@code true}. */
private volatile boolean localWalEnabled;
/** Global WAL enabled flag, initialized from the constructor argument. */
private volatile boolean globalWalEnabled;
/**
 * Creates the group context. The passed WAL flag becomes the global WAL state and is handed over
 * to the database manager, while the local WAL state starts as enabled.
 *
 * @param ctx Cache shared context.
 * @param grpId Group ID, must not be zero.
 * @param rcvdFrom Node ID cache group was received from.
 * @param cacheType Cache type.
 * @param ccfg Cache configuration, must not be {@code null}.
 * @param affNode Affinity node flag.
 * @param dataRegion Data region, must not be {@code null} for affinity node.
 * @param cacheObjCtx Cache object context.
 * @param freeList Free list.
 * @param reuseList Reuse list.
 * @param locStartVer Topology version when group was started on local node.
 * @param persistenceEnabled Persistence enabled flag.
 * @param walEnabled Wal enabled flag.
 */
CacheGroupContext(
    GridCacheSharedContext ctx,
    int grpId,
    UUID rcvdFrom,
    CacheType cacheType,
    CacheConfiguration ccfg,
    boolean affNode,
    DataRegion dataRegion,
    CacheObjectContext cacheObjCtx,
    FreeList freeList,
    ReuseList reuseList,
    AffinityTopologyVersion locStartVer,
    boolean persistenceEnabled,
    boolean walEnabled
) {
    assert ccfg != null;
    assert dataRegion != null || !affNode;
    assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';

    // Identity of the group.
    this.grpId = grpId;
    this.rcvdFrom = rcvdFrom;
    this.cacheType = cacheType;
    this.locStartVer = locStartVer;

    // Collaborators supplied by the caller.
    this.ctx = ctx;
    this.ccfg = ccfg;
    this.affNode = affNode;
    this.dataRegion = dataRegion;
    this.cacheObjCtx = cacheObjCtx;
    this.freeList = freeList;
    this.reuseList = reuseList;

    // Persistence and WAL state.
    this.persistenceEnabled = persistenceEnabled;
    this.globalWalEnabled = walEnabled;
    this.localWalEnabled = true;

    persistGlobalWalState(walEnabled);

    // Values derived from the arguments above.
    this.ioPlc = cacheType.ioPolicy();
    this.depEnabled = ctx.kernalContext().deploy().enabled() &&
        !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);
    this.storeCacheId = affNode &&
        dataRegion.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED;
    this.mvccEnabled = ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
    this.log = ctx.kernalContext().log(getClass());
    this.caches = new ArrayList<>();
    this.mxBean = new CacheGroupMetricsMXBeanImpl(this);
}
/**
 * @return {@code True} if MVCC is enabled for this group, i.e. the group was configured
 *      with TRANSACTIONAL_SNAPSHOT atomicity mode.
 */
public boolean mvccEnabled() {
    return this.mvccEnabled;
}
/**
 * @return {@code True} if this is a non-shared group whose cache is one of system caches.
 */
public boolean systemCache() {
    if (sharedGroup())
        return false;

    return CU.isSystemCache(ccfg.getName());
}
/**
 * @return ID of the node this cache group was received from.
 */
public UUID receivedFrom() {
    return this.rcvdFrom;
}
/**
 * @return {@code True} if cache ID has to be written to data pages (local node is affinity node
 *      and page eviction is not disabled for the data region).
 */
public boolean storeCacheIdInDataPage() {
    return this.storeCacheId;
}
/**
 * @return Deployment enabled flag computed when the group context was created.
 */
public boolean deploymentEnabled() {
    return this.depEnabled;
}
/**
 * @return Preloader of the group; it is assigned in {@link #start()}.
 */
public GridCachePreloader preloader() {
    return this.preldr;
}
/**
 * @return IO policy of this cache group, as defined by its cache type.
 */
public byte ioPolicy() {
    return this.ioPlc;
}
/**
 * Registers the started cache in this group and then notifies the offheap manager about it.
 *
 * @param cctx Context of the started cache.
 * @throws IgniteCheckedException If failed.
 */
void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
    this.addCacheContext(cctx);

    this.offheapMgr.onCacheStarted(cctx);
}
/**
 * Checks whether a cache with the given name currently belongs to this group.
 *
 * @param cacheName Cache name.
 * @return {@code True} if group contains cache with given name.
 */
public boolean hasCache(String cacheName) {
    // The field is read once, so the loop works on a single snapshot of the list.
    for (GridCacheContext cctx : this.caches) {
        if (cctx.name().equals(cacheName))
            return true;
    }

    return false;
}
/**
 * Adds the cache to the group by publishing a new copy of the caches list and raises
 * the group-level query / DR flags if the cache requires them.
 *
 * @param cctx Cache context.
 */
private void addCacheContext(GridCacheContext cctx) {
    assert cacheType.userCache() == cctx.userCache() : cctx.name();
    assert grpId == cctx.groupId() : cctx.name();

    ArrayList<GridCacheContext> updated = new ArrayList<>(this.caches);

    // Non-shared group may hold a single cache only.
    assert sharedGroup() || updated.isEmpty();

    boolean added = updated.add(cctx);

    assert added : cctx.name();

    if (!qryEnabled && QueryUtils.isEnabled(cctx.config()))
        qryEnabled = true;

    if (!drEnabled && cctx.isDrEnabled())
        drEnabled = true;

    // Publish the new list only after it is fully built.
    this.caches = updated;
}
/**
 * Removes the cache from the group by publishing a new copy of the caches list and recomputes
 * the group-level query / DR flags when the removed cache contributed to them.
 *
 * @param cctx Cache context.
 */
private void removeCacheContext(GridCacheContext cctx) {
    ArrayList<GridCacheContext> remaining = new ArrayList<>(this.caches);

    // It is possible cache was not added in case of errors on cache start.
    for (int idx = 0; idx < remaining.size(); idx++) {
        if (remaining.get(idx) == cctx) {
            assert sharedGroup() || remaining.size() == 1 : remaining.size();

            remaining.remove(idx);

            break;
        }
    }

    if (QueryUtils.isEnabled(cctx.config())) {
        boolean anyQry = false;

        for (GridCacheContext other : remaining) {
            if (QueryUtils.isEnabled(other.config())) {
                anyQry = true;

                break;
            }
        }

        this.qryEnabled = anyQry;
    }

    if (cctx.isDrEnabled()) {
        boolean anyDr = false;

        for (GridCacheContext other : remaining) {
            if (other.isDrEnabled()) {
                anyDr = true;

                break;
            }
        }

        this.drEnabled = anyDr;
    }

    this.caches = remaining;
}
/**
 * Returns the only cache of a non-shared group. Asserts that the group is not shared
 * and contains exactly one cache.
 *
 * @return Cache context if group contains single cache.
 */
public GridCacheContext singleCacheContext() {
    List<GridCacheContext> snapshot = this.caches;

    assert !sharedGroup() && snapshot.size() == 1 :
        "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() +
        ", caches=" + snapshot;

    return snapshot.get(0);
}
/**
 * Calls deployment manager {@code unwind} for every cache of the group.
 */
public void unwindUndeploys() {
    for (GridCacheContext cctx : this.caches)
        cctx.deploy().unwind(cctx);
}
/**
 * Events are recordable only for user caches and only if the event type is recordable
 * according to the grid event manager.
 *
 * @param type Event type to check.
 * @return {@code True} if given event type should be recorded.
 */
public boolean eventRecordable(int type) {
    if (!cacheType.userCache())
        return false;

    return ctx.gridEvents().isRecordable(type);
}
/**
 * @return {@code True} if the cache type of this group is a user cache type.
 */
public boolean userCache() {
    return this.cacheType.userCache();
}
/**
 * Records a rebalancing event for every cache of the group that has events enabled
 * and records the given event type.
 *
 * @param part Partition.
 * @param type Event type.
 * @param discoNode Discovery node.
 * @param discoType Discovery event type.
 * @param discoTs Discovery event timestamp.
 */
public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType, long discoTs) {
    assert discoNode != null;
    assert type > 0;
    assert discoType > 0;
    assert discoTs > 0;

    if (!eventRecordable(type))
        LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));

    for (GridCacheContext cctx : this.caches) {
        if (cctx.config().isEventsDisabled() || !cctx.recordEvent(type))
            continue;

        CacheRebalancingEvent evt = new CacheRebalancingEvent(cctx.name(),
            cctx.localNode(),
            "Cache rebalancing event.",
            type,
            part,
            discoNode,
            discoType,
            discoTs);

        cctx.gridEvents().record(evt);
    }
}
/**
 * Records a partition unload event for every cache of the group that has events enabled.
 *
 * @param part Partition.
 */
public void addUnloadEvent(int part) {
    if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
        LT.warn(log, "Added event without checking if event is recordable: " +
            U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));

    for (GridCacheContext cctx : this.caches) {
        if (cctx.config().isEventsDisabled())
            continue;

        // Unload event carries no discovery information.
        CacheRebalancingEvent evt = new CacheRebalancingEvent(cctx.name(),
            cctx.localNode(),
            "Cache unloading event.",
            EVT_CACHE_REBALANCE_PART_UNLOADED,
            part,
            null,
            0,
            0);

        cctx.gridEvents().record(evt);
    }
}
/**
 * Adds a cache event through the event manager of every cache of the group that has events enabled.
 *
 * @param part Partition.
 * @param key Key.
 * @param evtNodeId Event node ID.
 * @param type Event type.
 * @param newVal New value.
 * @param hasNewVal Has new value flag.
 * @param oldVal Old values.
 * @param hasOldVal Has old value flag.
 * @param keepBinary Keep binary flag.
 */
public void addCacheEvent(
    int part,
    KeyCacheObject key,
    UUID evtNodeId,
    int type,
    @Nullable CacheObject newVal,
    boolean hasNewVal,
    @Nullable CacheObject oldVal,
    boolean hasOldVal,
    boolean keepBinary
) {
    for (GridCacheContext cctx : this.caches) {
        if (cctx.config().isEventsDisabled())
            continue;

        cctx.events().addEvent(part,
            key,
            evtNodeId,
            (IgniteUuid)null,
            null,
            type,
            newVal,
            hasNewVal,
            oldVal,
            hasOldVal,
            null,
            null,
            null,
            keepBinary);
    }
}
/**
 * @return {@code True} if at least one cache of the group has queries enabled.
 */
public boolean queriesEnabled() {
    return this.qryEnabled;
}
/**
 * @return {@code True} if at least one cache of the group has replication (DR) enabled.
 */
public boolean isDrEnabled() {
    return this.drEnabled;
}
/**
 * @return Free list passed to this group on creation.
 */
public FreeList freeList() {
    return this.freeList;
}
/**
 * @return Reuse list passed to this group on creation.
 */
public ReuseList reuseList() {
    return this.reuseList;
}
/**
 * @return Cache object context of the group.
 */
public CacheObjectContext cacheObjectContext() {
    return this.cacheObjCtx;
}
/**
 * @return Cache shared context this group was created with.
 */
public GridCacheSharedContext shared() {
    return this.ctx;
}
/**
 * @return Data region of the group. May be {@code null} when local node is not an affinity node
 *      (the constructor only requires it for affinity nodes).
 */
public DataRegion dataRegion() {
    return this.dataRegion;
}
/**
 * @return {@code True} if local node is an affinity node for this group.
 */
public boolean affinityNode() {
    return this.affNode;
}
/**
 * @return Partition topology of the group.
 * @throws IllegalStateException If topology has not been created (it is created in {@link #start()}
 *      for non-LOCAL cache mode only).
 */
public GridDhtPartitionTopology topology() {
    if (top != null)
        return top;

    throw new IllegalStateException("Topology is not initialized: " + cacheOrGroupName());
}
/**
 * @return {@code True} if topology exists and current thread holds lock on it.
 */
public boolean isTopologyLocked() {
    return top != null && top.holdsLock();
}
/**
 * @return Offheap manager of the group; it is assigned in {@link #start()}.
 */
public IgniteCacheOffheapManager offheap() {
    return this.offheapMgr;
}
/**
 * @return Current value of the recovery flag (group is in recovery mode due to partitions loss).
 *      The flag must only be modified during exchange.
 */
public boolean needsRecovery() {
    return this.needsRecovery;
}
/**
 * Sets the recovery flag of the group.
 *
 * @param needsRecovery Needs recovery flag.
 */
public void needsRecovery(boolean needsRecovery) {
    this.needsRecovery = needsRecovery;
}
/**
 * @return Affinity topology version at which the group was started on local node.
 */
public AffinityTopologyVersion localStartVersion() {
    return this.locStartVer;
}
/**
 * @return {@code True} if cache mode of the group configuration is LOCAL.
 */
public boolean isLocal() {
    return LOCAL == ccfg.getCacheMode();
}
/**
 * @return {@code True} if cache mode of the group configuration is REPLICATED.
 */
public boolean isReplicated() {
    return REPLICATED == ccfg.getCacheMode();
}
/**
 * @return Cache configuration this group was created with.
 */
public CacheConfiguration config() {
    return this.ccfg;
}
/**
 * @return Node filter taken from the group configuration.
 */
public IgnitePredicate<ClusterNode> nodeFilter() {
    return this.ccfg.getNodeFilter();
}
/**
 * Collects affinity function, node filter and topology validator from the configuration.
 * Elements of the returned collection may be {@code null} if the corresponding object is not configured.
 *
 * @return Configured user objects which should be initialized/stopped on group start/stop.
 */
Collection<?> configuredUserObjects() {
    return Arrays.asList(
        ccfg.getAffinity(),
        ccfg.getNodeFilter(),
        ccfg.getTopologyValidator());
}
/**
 * @return Topology validator taken from the group configuration, {@code null} if not configured.
 */
@Nullable public TopologyValidator topologyValidator() {
    return this.ccfg.getTopologyValidator();
}
/**
 * @return Affinity function taken from the group configuration.
 */
public AffinityFunction affinityFunction() {
    return this.ccfg.getAffinity();
}
/**
 * @return Affinity assignment cache of the group; it is assigned in {@link #start()}.
 */
public GridAffinityAssignmentCache affinity() {
    return this.aff;
}
/**
 * @return Configured group name, {@code null} if group name was not specified for cache.
 */
@Nullable public String name() {
    return this.ccfg.getGroupName();
}
/**
 * @return Group name if it is specified in the configuration, otherwise cache name.
 */
public String cacheOrGroupName() {
    String grpName = ccfg.getGroupName();

    return grpName == null ? ccfg.getName() : grpName;
}
/**
 * @return Unique ID of this cache group.
 */
public int groupId() {
    return this.grpId;
}
/**
 * Group is considered shared when a group name is set in the configuration.
 *
 * @return {@code True} if group can contain multiple caches.
 */
public boolean sharedGroup() {
    return null != ccfg.getGroupName();
}
/**
 * Kernal stop callback: cancels affinity futures and forwards the callback
 * to the preloader and the offheap manager.
 */
public void onKernalStop() {
    IgniteCheckedException err =
        new IgniteCheckedException("Failed to wait for topology update, node is stopping.");

    aff.cancelFutures(err);

    preldr.onKernalStop();

    offheapMgr.onKernalStop();
}
/**
 * Stops the given cache inside the group: notifies topology (if it exists) and the offheap manager,
 * then removes the cache from the caches list.
 *
 * @param cctx Cache context.
 * @param destroy Destroy data flag. Setting to <code>true</code> will remove all cache data.
 */
void stopCache(GridCacheContext cctx, boolean destroy) {
    int cacheId = cctx.cacheId();

    if (top != null)
        top.onCacheStopped(cacheId);

    offheapMgr.stopCache(cacheId, destroy);

    removeCacheContext(cctx);
}
/**
 * Stops the group: notifies the eviction manager, cancels affinity futures, stops the preloader
 * and the offheap manager and removes IO handlers registered for the group.
 */
void stopGroup() {
    ctx.evict().onCacheGroupStopped(this);

    aff.cancelFutures(
        new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."));

    preldr.onKernalStop();

    offheapMgr.stop();

    ctx.io().removeCacheGroupHandlers(grpId);
}
/**
 * Builds a new set with IDs of the caches that currently belong to the group.
 *
 * @return IDs of caches in this group.
 */
public Set<Integer> cacheIds() {
    List<GridCacheContext> snapshot = this.caches;

    Set<Integer> res = U.newHashSet(snapshot.size());

    for (GridCacheContext cctx : snapshot)
        res.add(cctx.cacheId());

    return res;
}
/**
 * Returns the current caches list itself (not a copy).
 *
 * @return Caches in this group.
 */
public List<GridCacheContext> caches() {
    return caches;
}
/**
 * @return {@code True} if at least one cache currently belongs to the group.
 */
boolean hasCaches() {
    return !this.caches.isEmpty();
}
/**
 * Notifies DR manager (for caches with DR enabled) and continuous query manager
 * of every cache of the group that the partition was evicted.
 *
 * @param part Partition ID.
 */
public void onPartitionEvicted(int part) {
    for (GridCacheContext cctx : this.caches) {
        if (cctx.isDrEnabled())
            cctx.dr().partitionEvicted(part);

        cctx.continuousQueries().onPartitionEvicted(part);
    }
}
/**
 * Registers a cache of this shared group as having continuous queries.
 * <p>
 * The list of such caches is read without locking by {@link #onPartitionCounterUpdate}, therefore
 * the published list is never modified in place: a new copy is built under the lock and then
 * published through the volatile field (same approach as used for the group caches list).
 *
 * @param cctx Cache context.
 */
public void addCacheWithContinuousQuery(GridCacheContext cctx) {
    assert sharedGroup() : cacheOrGroupName();
    assert cctx.group() == this : cctx.name();
    assert !cctx.isLocal() : cctx.name();

    synchronized (this) {
        List<GridCacheContext> cur = this.contQryCaches;

        // Copy-on-write: concurrent readers keep iterating over the old, unchanged list.
        List<GridCacheContext> updated = new ArrayList<>();

        if (cur != null)
            updated.addAll(cur);

        updated.add(cctx);

        this.contQryCaches = updated;
    }
}
/**
 * Unregisters a cache of this shared group from the list of caches with continuous queries.
 * The field is reset to {@code null} when no such caches remain.
 * <p>
 * The list is read without locking by {@link #onPartitionCounterUpdate}, therefore the published
 * list is never modified in place (an in-place removal could shrink the list under an index-based
 * reader loop): a new copy is built under the lock and then published through the volatile field.
 *
 * @param cctx Cache context.
 */
public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
    assert sharedGroup() : cacheOrGroupName();
    assert cctx.group() == this : cctx.name();
    assert !cctx.isLocal() : cctx.name();

    synchronized (this) {
        List<GridCacheContext> cur = this.contQryCaches;

        if (cur == null)
            return;

        // Copy-on-write: concurrent readers keep iterating over the old, unchanged list.
        List<GridCacheContext> updated = new ArrayList<>(cur);

        updated.remove(cctx);

        this.contQryCaches = updated.isEmpty() ? null : updated;
    }
}
/**
 * For a shared non-local group asks continuous query managers of all other caches with registered
 * continuous queries to skip the given update counter, then runs the collected closures (if any)
 * through the closure processor.
 *
 * @param cacheId ID of cache initiated counter update.
 * @param part Partition number.
 * @param cntr Counter.
 * @param topVer Topology version for current operation.
 * @param primary Primary flag, passed to continuous query managers as is.
 */
public void onPartitionCounterUpdate(int cacheId,
    int part,
    long cntr,
    AffinityTopologyVersion topVer,
    boolean primary) {
    assert sharedGroup();

    if (isLocal())
        return;

    List<GridCacheContext> cqCaches = this.contQryCaches;

    if (cqCaches == null)
        return;

    CounterSkipContext skipCtx = null;

    for (GridCacheContext cctx : cqCaches) {
        // Cache that initiated the update is not asked to skip the counter.
        if (cctx.cacheId() == cacheId)
            continue;

        skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary);
    }

    if (skipCtx == null)
        return;

    final List<Runnable> procC = skipCtx.processClosures();

    if (procC == null)
        return;

    ctx.kernalContext().closure().runLocalSafe(new Runnable() {
        @Override public void run() {
            for (Runnable c : procC)
                c.run();
        }
    });
}
/**
 * @return {@code True} if at least one cache with registered continuous query exists in this group.
 */
public boolean hasContinuousQueryCaches() {
    return !F.isEmpty(this.contQryCaches);
}
/**
 * Starts the group: creates affinity assignment cache, partition topology and preloader
 * (topology and DHT preloader only for non-LOCAL cache mode), creates and starts the offheap manager
 * and finally notifies the shared affinity manager that the group was created.
 *
 * @throws IgniteCheckedException If failed.
 */
public void start() throws IgniteCheckedException {
    boolean locCache = ccfg.getCacheMode() == LOCAL;

    aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
        cacheOrGroupName(),
        grpId,
        ccfg.getAffinity(),
        ccfg.getNodeFilter(),
        ccfg.getBackups(),
        locCache,
        persistenceEnabled());

    if (locCache)
        preldr = new GridCachePreloaderAdapter(this);
    else {
        top = new GridDhtPartitionTopologyImpl(ctx, this);

        // Affinity assignment requests are handled only on non-client nodes.
        if (!ctx.kernalContext().clientNode()) {
            ctx.io().addCacheGroupHandler(groupId(), GridDhtAffinityAssignmentRequest.class,
                new IgniteBiInClosure<UUID, GridDhtAffinityAssignmentRequest>() {
                    @Override public void apply(UUID nodeId, GridDhtAffinityAssignmentRequest msg) {
                        processAffinityAssignmentRequest(nodeId, msg);
                    }
                });
        }

        preldr = new GridDhtPreloader(this);

        preldr.start();
    }

    if (!persistenceEnabled())
        offheapMgr = new IgniteCacheOffheapManagerImpl();
    else {
        try {
            offheapMgr = new GridCacheOffheapManager();
        }
        catch (Exception e) {
            throw new IgniteCheckedException("Failed to initialize offheap manager", e);
        }
    }

    offheapMgr.start(ctx, this);

    ctx.affinity().onCacheGroupCreated(this);
}
/**
 * @return Persistence enabled flag this group was created with.
 */
public boolean persistenceEnabled() {
    return this.persistenceEnabled;
}
/**
 * Handles affinity assignment request: the response is sent immediately if no ready future
 * is returned for the requested topology version, otherwise from a listener of that future.
 *
 * @param nodeId Node ID.
 * @param req Request.
 */
private void processAffinityAssignmentRequest(final UUID nodeId,
    final GridDhtAffinityAssignmentRequest req) {
    if (log.isDebugEnabled())
        log.debug("Processing affinity assignment request [node=" + nodeId + ", req=" + req + ']');

    IgniteInternalFuture<AffinityTopologyVersion> readyFut = aff.readyFuture(req.topologyVersion());

    if (readyFut == null) {
        processAffinityAssignmentRequest0(nodeId, req);

        return;
    }

    readyFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
            processAffinityAssignmentRequest0(nodeId, req);
        }
    });
}
/**
 * Builds affinity assignment response for the requested topology version and sends it
 * to the requesting node. A send failure is logged and not propagated.
 *
 * @param nodeId Node ID.
 * @param req Request.
 */
private void processAffinityAssignmentRequest0(UUID nodeId, final GridDhtAffinityAssignmentRequest req) {
    AffinityTopologyVersion topVer = req.topologyVersion();

    if (log.isDebugEnabled())
        log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
            ", node=" + nodeId + ']');

    AffinityAssignment assignment = aff.cachedAffinity(topVer);

    GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(
        req.futureId(),
        grpId,
        topVer,
        assignment.assignment());

    // Ideal assignment is added only for centralized affinity function.
    if (aff.centralizedAffinityFunction()) {
        assert assignment.idealAssignment() != null;

        res.idealAffinityAssignment(assignment.idealAssignment());
    }

    // Partition map is attached only on demand of the requester.
    if (req.sendPartitionsState())
        res.partitionMap(top.partitionMap(true));

    try {
        ctx.io().send(nodeId, res, AFFINITY_POOL);
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send affinity assignment response to remote node [node=" + nodeId + ']', e);
    }
}
/**
 * Client disconnect callback: cancels affinity futures (if affinity was already created)
 * with a client-disconnected error.
 *
 * @param reconnectFut Reconnect future.
 */
public void onDisconnected(IgniteFuture reconnectFut) {
    if (aff == null)
        return;

    aff.cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut,
        "Failed to wait for topology update, client disconnected."));
}
/**
 * @return {@code True} if rebalance mode of the group configuration is not NONE.
 */
public boolean rebalanceEnabled() {
    return NONE != ccfg.getRebalanceMode();
}
/**
 * Client reconnect callback: forwards the callback to affinity, topology (if it exists)
 * and the preloader, in this order.
 */
public void onReconnected() {
    this.aff.onReconnected();

    if (this.top != null)
        this.top.onReconnected();

    this.preldr.onReconnected();
}
/**
 * @return Metrics MXBean created for this group.
 */
public CacheGroupMetricsMXBean mxBean() {
    return this.mxBean;
}
/** {@inheritDoc} */
@Override public String toString() {
    String grpName = cacheOrGroupName();

    return "CacheGroupContext [grp=" + grpName + ']';
}
/**
 * @return {@code True} if both local and global WAL flags are enabled.
 */
public boolean walEnabled() {
    if (!localWalEnabled)
        return false;

    return globalWalEnabled;
}
/**
 * @return Local WAL enabled flag.
 */
public boolean localWalEnabled() {
    return this.localWalEnabled;
}
/**
 * @return Current value of the global WAL enabled flag.
 */
public boolean globalWalEnabled() {
    return this.globalWalEnabled;
}
/**
 * Changes global WAL state. Does nothing if the state is already equal to the requested one,
 * otherwise logs the change, passes the new state to the database manager and updates the flag.
 *
 * @param enabled Global WAL enabled flag.
 */
public void globalWalEnabled(boolean enabled) {
    if (globalWalEnabled == enabled)
        return;

    log.info("Global WAL state for group=" + cacheOrGroupName() +
        " changed from " + globalWalEnabled + " to " + enabled);

    persistGlobalWalState(enabled);

    globalWalEnabled = enabled;
}
/**
 * Changes local WAL state. Does nothing if the state is already equal to the requested one,
 * otherwise logs the change, passes the new state to the database manager and updates the flag.
 *
 * @param enabled Local WAL enabled flag.
 */
public void localWalEnabled(boolean enabled) {
    if (localWalEnabled == enabled)
        return;

    log.info("Local WAL state for group=" + cacheOrGroupName() +
        " changed from " + localWalEnabled + " to " + enabled);

    persistLocalWalState(enabled);

    localWalEnabled = enabled;
}
/**
 * Passes global WAL state of this group to the database manager
 * (the last argument is {@code false}, presumably meaning "not local" - see the local counterpart).
 *
 * @param enabled Enabled flag.
 */
private void persistGlobalWalState(boolean enabled) {
    GridCacheSharedContext sctx = shared();

    sctx.database().walEnabled(grpId, enabled, false);
}
/**
 * Passes local WAL state of this group to the database manager
 * (the last argument is {@code true}, presumably meaning "local" - see the global counterpart).
 *
 * @param enabled Enabled flag.
 */
private void persistLocalWalState(boolean enabled) {
    GridCacheSharedContext sctx = shared();

    sctx.database().walEnabled(grpId, enabled, true);
}
}