blob: 8736a88d53c667081f7f7f36ed1ad1ea64845f25 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
/**
 * Cache change requests to execute when a {@link DynamicCacheChangeBatch} event is received.
 */
public class ExchangeActions {
/** Cache groups to start; stays {@code null} until {@link #addCacheGroupToStart} adds the first entry. */
private List<CacheGroupActionData> cacheGrpsToStart;
/** Cache groups to stop; stays {@code null} until {@link #addCacheGroupToStop} adds the first entry. */
private List<CacheGroupActionData> cacheGrpsToStop;
/** Cache start actions keyed by cache name; stays {@code null} until {@link #addCacheToStart} adds the first entry. */
private Map<String, CacheActionData> cachesToStart;
/**
 * Server nodes on which a successful start of the cache(s) is required. If any of these nodes fails when starting
 * the cache(s), the whole procedure is rolled back.
 */
private Collection<UUID> cacheStartRequiredAliveNodes;
/** Cache stop actions keyed by cache name; stays {@code null} until {@link #addCacheToStop} adds the first entry. */
private Map<String, CacheActionData> cachesToStop;
/**
 * Reset-lost-partitions actions keyed by cache name; stays {@code null} until
 * {@link #addCacheToResetLostPartitions} adds the first entry.
 */
private Map<String, CacheActionData> cachesToResetLostParts;
/** Caches local join context; {@code null} unless set via {@link #localJoinContext(LocalJoinCachesContext)}. */
private LocalJoinCachesContext locJoinCtx;
/** Cluster state change request; {@code null} unless set via {@link #stateChangeRequest(StateChangeRequest)}. */
private StateChangeRequest stateChangeReq;
* @param grpId Group ID.
* @return Always {@code true}, fails with assert error if inconsistent.
boolean checkStopRequestConsistency(int grpId) {
Boolean destroy = null;
// Check that caches associated with that group will be all stopped only or all destroyed.
for (CacheActionData action : cacheStopRequests()) {
if (action.descriptor().groupId() == grpId) {
if (destroy == null)
destroy = action.request().destroy();
else {
assert action.request().destroy() == destroy
: "Both cache stop only and cache destroy request associated with one group in batch "
+ cacheStopRequests();
return true;
* @return {@code True} if server nodes should not participate in exchange.
public boolean clientOnlyExchange() {
return F.isEmpty(cachesToStart) &&
F.isEmpty(cachesToStop) &&
F.isEmpty(cacheGrpsToStart) &&
F.isEmpty(cacheGrpsToStop) &&
* @return New caches start requests.
public Collection<CacheActionData> cacheStartRequests() {
return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
* @return Stop cache requests.
public Collection<CacheActionData> cacheStopRequests() {
return cachesToStop != null ? cachesToStop.values() : Collections.emptyList();
* @param ctx Context.
* @param err Error if any.
public void completeRequestFutures(GridCacheSharedContext ctx, Throwable err) {
completeRequestFutures(cachesToStart, ctx, err);
completeRequestFutures(cachesToStop, ctx, err);
completeRequestFutures(cachesToResetLostParts, ctx, err);
* @return {@code True} if starting system caches.
public boolean systemCachesStarting() {
if (cachesToStart != null) {
for (CacheActionData data : cachesToStart.values()) {
if (CU.isSystemCache(data.request().cacheName()))
return true;
return false;
* @param map Actions map.
* @param ctx Context.
private void completeRequestFutures(
Map<String, CacheActionData> map,
GridCacheSharedContext ctx,
@Nullable Throwable err
) {
if (map != null) {
for (CacheActionData req : map.values())
ctx.cache().completeCacheStartFuture(req.req, (err == null), err);
* @return {@code True} if have cache stop requests.
public boolean hasStop() {
return !F.isEmpty(cachesToStop);
* @return Caches to reset lost partitions for.
public Set<String> cachesToResetLostPartitions() {
Set<String> caches = null;
if (cachesToResetLostParts != null)
caches = new HashSet<>(cachesToResetLostParts.keySet());
return caches != null ? caches : Collections.<String>emptySet();
* @param cacheId Cache ID.
* @return {@code True} if cache stop was requested.
public boolean cacheStopped(int cacheId) {
if (cachesToStop != null) {
for (CacheActionData cache : cachesToStop.values()) {
if (cache.desc.cacheId() == cacheId)
return true;
return false;
* @param cacheId Cache ID.
* @return {@code True} if cache start was requested.
public boolean cacheStarted(int cacheId) {
if (cachesToStart != null) {
for (CacheActionData cache : cachesToStart.values()) {
if (cache.desc.cacheId() == cacheId)
return true;
return false;
* @param stateChange Cluster state change request.
public void stateChangeRequest(StateChangeRequest stateChange) {
this.stateChangeReq = stateChange;
* @return {@code True} if has deactivate request.
public boolean deactivate() {
return stateChangeReq != null && stateChangeReq.activeChanged() && !stateChangeReq.state().active();
* @return {@code True} if has activate request.
public boolean activate() {
return stateChangeReq != null && stateChangeReq.activeChanged() && stateChangeReq.state().active();
* @return {@code True} if cluster state was changed.
public boolean changedClusterState() {
return stateChangeReq != null && stateChangeReq.prevState() != stateChangeReq.state();
* @return {@code True} if has baseline topology change request.
public boolean changedBaseline() {
return stateChangeReq != null && !stateChangeReq.activeChanged();
* @return Cluster state change request.
@Nullable public StateChangeRequest stateChangeRequest() {
return stateChangeReq;
* @param map Actions map.
* @param req Request.
* @param desc Cache descriptor.
* @return Actions map.
private Map<String, CacheActionData> add(Map<String, CacheActionData> map,
DynamicCacheChangeRequest req,
DynamicCacheDescriptor desc) {
assert req != null;
assert desc != null;
if (map == null)
map = new LinkedHashMap<>();
CacheActionData old = map.put(req.cacheName(), new CacheActionData(req, desc));
assert old == null : old;
return map;
* @param req Request.
* @param desc Cache descriptor.
void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req.start() : req;
cachesToStart = add(cachesToStart, req, desc);
* @param req Request.
* @param desc Cache descriptor.
public void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req.stop() : req;
cachesToStop = add(cachesToStop, req, desc);
* @param req Request.
* @param desc Cache descriptor.
void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req.resetLostPartitions() : req;
cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
* @param grpDesc Group descriptor.
void addCacheGroupToStart(CacheGroupDescriptor grpDesc) {
assert grpDesc != null;
if (cacheGrpsToStart == null)
cacheGrpsToStart = new ArrayList<>();
cacheGrpsToStart.add(new CacheGroupActionData(grpDesc));
* @return Cache groups to start.
public List<CacheGroupActionData> cacheGroupsToStart() {
return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupActionData>emptyList();
* @param grpId Group ID.
* @return {@code True} if given cache group starting.
public boolean cacheGroupStarting(int grpId) {
if (cacheGrpsToStart != null) {
for (CacheGroupActionData grp : cacheGrpsToStart) {
if (grp.desc.groupId() == grpId)
return true;
return false;
* @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when
* starting the cache(s), the whole procedure is rolled back.
public Collection<UUID> cacheStartRequiredAliveNodes() {
return cacheStartRequiredAliveNodes == null ? Collections.emptyList() : cacheStartRequiredAliveNodes;
* @param cacheStartRequiredAliveNodes Server nodes on which a successful start of the cache(s) is required, if any
* of these nodes fails when starting the cache(s), the whole procedure is
* rolled back.
public void cacheStartRequiredAliveNodes(Collection<UUID> cacheStartRequiredAliveNodes) {
this.cacheStartRequiredAliveNodes = new ArrayList<>(cacheStartRequiredAliveNodes);
* @param grpDesc Group descriptor.
* @param destroy Destroy flag.
public void addCacheGroupToStop(CacheGroupDescriptor grpDesc, boolean destroy) {
assert grpDesc != null;
if (cacheGrpsToStop == null)
cacheGrpsToStop = new ArrayList<>();
cacheGrpsToStop.add(new CacheGroupActionData(grpDesc, destroy));
* @return Cache groups to start.
public List<CacheGroupActionData> cacheGroupsToStop() {
return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupActionData>emptyList();
* @param grpId Group ID.
* @return {@code True} if given cache group stopping.
public boolean cacheGroupStopping(int grpId) {
if (cacheGrpsToStop != null) {
for (CacheGroupActionData grp : cacheGrpsToStop) {
if (grp.desc.groupId() == grpId)
return true;
return false;
* @return {@code True} if there are no cache change actions.
public boolean empty() {
return F.isEmpty(cachesToStart) &&
F.isEmpty(cachesToStop) &&
F.isEmpty(cacheGrpsToStart) &&
F.isEmpty(cacheGrpsToStop) &&
F.isEmpty(cachesToResetLostParts) &&
stateChangeReq == null &&
locJoinCtx == null;
* @param locJoinCtx Caches local join context.
public void localJoinContext(LocalJoinCachesContext locJoinCtx) {
this.locJoinCtx = locJoinCtx;
* @return Caches local join context.
public LocalJoinCachesContext localJoinContext() {
return locJoinCtx;
public static class CacheActionData {
/** */
private final DynamicCacheChangeRequest req;
/** */
private final DynamicCacheDescriptor desc;
* @param req Request.
* @param desc Cache descriptor.
CacheActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req != null;
assert desc != null;
this.req = req;
this.desc = desc;
* @return Request.
public DynamicCacheChangeRequest request() {
return req;
* @return Cache descriptor.
public DynamicCacheDescriptor descriptor() {
return desc;
public static class CacheGroupActionData {
/** */
private final CacheGroupDescriptor desc;
/** */
private final boolean destroy;
* @param desc Group descriptor
* @param destroy Destroy flag
CacheGroupActionData(CacheGroupDescriptor desc, boolean destroy) {
assert desc != null;
this.desc = desc;
this.destroy = destroy;
* @param desc Group descriptor
CacheGroupActionData(CacheGroupDescriptor desc) {
this(desc, false);
* @return Group descriptor
public CacheGroupDescriptor descriptor() {
return desc;
* @return Destroy flag
public boolean destroy() {
return destroy;
/** {@inheritDoc} */
@Override public String toString() {
Object startGrps = F.viewReadOnly(cacheGrpsToStart, new C1<CacheGroupActionData, String>() {
@Override public String apply(CacheGroupActionData data) {
return data.desc.cacheOrGroupName();
Object stopGrps = F.viewReadOnly(cacheGrpsToStop, new C1<CacheGroupActionData, String>() {
@Override public String apply(CacheGroupActionData data) {
return data.desc.cacheOrGroupName() + ", destroy=" + data.destroy;
return "ExchangeActions [startCaches=" + (cachesToStart != null ? cachesToStart.keySet() : null) +
", stopCaches=" + (cachesToStop != null ? cachesToStop.keySet() : null) +
", startGrps=" + startGrps +
", stopGrps=" + stopGrps +
", resetParts=" + (cachesToResetLostParts != null ? cachesToResetLostParts.keySet() : null) +
", stateChangeRequest=" + stateChangeReq + ']';