blob: 3b93244e96cb214eb167979bb85dc04fb6277546 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.Nullable;
/**
* Cache change requests to execute when receive {@link DynamicCacheChangeBatch} event.
*/
public class ExchangeActions {
/** */
private List<CacheGroupActionData> cacheGrpsToStart;
/** */
private List<CacheGroupActionData> cacheGrpsToStop;
/** */
private Map<String, CacheActionData> cachesToStart;
/**
* Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when starting
* the cache(s), the whole procedure is rolled back.
*/
private Collection<UUID> cacheStartRequiredAliveNodes;
/** */
private Map<String, CacheActionData> cachesToStop;
/** */
private Map<String, CacheActionData> cachesToResetLostParts;
/** */
private LocalJoinCachesContext locJoinCtx;
/** */
private StateChangeRequest stateChangeReq;
/**
* @param grpId Group ID.
* @return Always {@code true}, fails with assert error if inconsistent.
*/
boolean checkStopRequestConsistency(int grpId) {
Boolean destroy = null;
// Check that caches associated with that group will be all stopped only or all destroyed.
for (CacheActionData action : cacheStopRequests()) {
if (action.descriptor().groupId() == grpId) {
if (destroy == null)
destroy = action.request().destroy();
else {
assert action.request().destroy() == destroy
: "Both cache stop only and cache destroy request associated with one group in batch "
+ cacheStopRequests();
}
}
}
return true;
}
/**
* @return {@code True} if server nodes should not participate in exchange.
*/
public boolean clientOnlyExchange() {
return F.isEmpty(cachesToStart) &&
F.isEmpty(cachesToStop) &&
F.isEmpty(cacheGrpsToStart) &&
F.isEmpty(cacheGrpsToStop) &&
F.isEmpty(cachesToResetLostParts);
}
/**
* @return New caches start requests.
*/
public Collection<CacheActionData> cacheStartRequests() {
return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
}
/**
* @return Stop cache requests.
*/
public Collection<CacheActionData> cacheStopRequests() {
return cachesToStop != null ? cachesToStop.values() : Collections.emptyList();
}
/**
* @param ctx Context.
* @param err Error if any.
*/
public void completeRequestFutures(GridCacheSharedContext ctx, Throwable err) {
completeRequestFutures(cachesToStart, ctx, err);
completeRequestFutures(cachesToStop, ctx, err);
completeRequestFutures(cachesToResetLostParts, ctx, err);
}
/**
* @return {@code True} if starting system caches.
*/
public boolean systemCachesStarting() {
if (cachesToStart != null) {
for (CacheActionData data : cachesToStart.values()) {
if (CU.isSystemCache(data.request().cacheName()))
return true;
}
}
return false;
}
/**
* @param map Actions map.
* @param ctx Context.
*/
private void completeRequestFutures(
Map<String, CacheActionData> map,
GridCacheSharedContext ctx,
@Nullable Throwable err
) {
if (map != null) {
for (CacheActionData req : map.values())
ctx.cache().completeCacheStartFuture(req.req, (err == null), err);
}
}
/**
* @return {@code True} if have cache stop requests.
*/
public boolean hasStop() {
return !F.isEmpty(cachesToStop);
}
/**
* @return Caches to reset lost partitions for.
*/
public Set<String> cachesToResetLostPartitions() {
Set<String> caches = null;
if (cachesToResetLostParts != null)
caches = new HashSet<>(cachesToResetLostParts.keySet());
return caches != null ? caches : Collections.<String>emptySet();
}
/**
* @param cacheId Cache ID.
* @return {@code True} if cache stop was requested.
*/
public boolean cacheStopped(int cacheId) {
if (cachesToStop != null) {
for (CacheActionData cache : cachesToStop.values()) {
if (cache.desc.cacheId() == cacheId)
return true;
}
}
return false;
}
/**
* @param cacheId Cache ID.
* @return {@code True} if cache start was requested.
*/
public boolean cacheStarted(int cacheId) {
if (cachesToStart != null) {
for (CacheActionData cache : cachesToStart.values()) {
if (cache.desc.cacheId() == cacheId)
return true;
}
}
return false;
}
/**
* @param stateChange Cluster state change request.
*/
public void stateChangeRequest(StateChangeRequest stateChange) {
this.stateChangeReq = stateChange;
}
/**
* @return {@code True} if has deactivate request.
*/
public boolean deactivate() {
return stateChangeReq != null && stateChangeReq.activeChanged() && !stateChangeReq.state().active();
}
/**
* @return {@code True} if has activate request.
*/
public boolean activate() {
return stateChangeReq != null && stateChangeReq.activeChanged() && stateChangeReq.state().active();
}
/**
* @return {@code True} if cluster state was changed.
*/
public boolean changedClusterState() {
return stateChangeReq != null && stateChangeReq.prevState() != stateChangeReq.state();
}
/**
* @return {@code True} if has baseline topology change request.
*/
public boolean changedBaseline() {
return stateChangeReq != null && !stateChangeReq.activeChanged();
}
/**
* @return Cluster state change request.
*/
@Nullable public StateChangeRequest stateChangeRequest() {
return stateChangeReq;
}
/**
* @param map Actions map.
* @param req Request.
* @param desc Cache descriptor.
* @return Actions map.
*/
private Map<String, CacheActionData> add(Map<String, CacheActionData> map,
DynamicCacheChangeRequest req,
DynamicCacheDescriptor desc) {
assert req != null;
assert desc != null;
if (map == null)
map = new LinkedHashMap<>();
CacheActionData old = map.put(req.cacheName(), new CacheActionData(req, desc));
assert old == null : old;
return map;
}
/**
* @param req Request.
* @param desc Cache descriptor.
*/
void addCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req.start() : req;
cachesToStart = add(cachesToStart, req, desc);
}
/**
* @param req Request.
* @param desc Cache descriptor.
*/
public void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req.stop() : req;
cachesToStop = add(cachesToStop, req, desc);
}
/**
* @param req Request.
* @param desc Cache descriptor.
*/
void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req.resetLostPartitions() : req;
cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
}
/**
* @param grpDesc Group descriptor.
*/
void addCacheGroupToStart(CacheGroupDescriptor grpDesc) {
assert grpDesc != null;
if (cacheGrpsToStart == null)
cacheGrpsToStart = new ArrayList<>();
cacheGrpsToStart.add(new CacheGroupActionData(grpDesc));
}
/**
* @return Cache groups to start.
*/
public List<CacheGroupActionData> cacheGroupsToStart() {
return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupActionData>emptyList();
}
/**
* @param grpId Group ID.
* @return {@code True} if given cache group starting.
*/
public boolean cacheGroupStarting(int grpId) {
if (cacheGrpsToStart != null) {
for (CacheGroupActionData grp : cacheGrpsToStart) {
if (grp.desc.groupId() == grpId)
return true;
}
}
return false;
}
/**
* @return Server nodes on which a successful start of the cache(s) is required, if any of these nodes fails when
* starting the cache(s), the whole procedure is rolled back.
*/
public Collection<UUID> cacheStartRequiredAliveNodes() {
return cacheStartRequiredAliveNodes == null ? Collections.emptyList() : cacheStartRequiredAliveNodes;
}
/**
* @param cacheStartRequiredAliveNodes Server nodes on which a successful start of the cache(s) is required, if any
* of these nodes fails when starting the cache(s), the whole procedure is
* rolled back.
*/
public void cacheStartRequiredAliveNodes(Collection<UUID> cacheStartRequiredAliveNodes) {
this.cacheStartRequiredAliveNodes = new ArrayList<>(cacheStartRequiredAliveNodes);
}
/**
* @param grpDesc Group descriptor.
* @param destroy Destroy flag.
*/
public void addCacheGroupToStop(CacheGroupDescriptor grpDesc, boolean destroy) {
assert grpDesc != null;
if (cacheGrpsToStop == null)
cacheGrpsToStop = new ArrayList<>();
cacheGrpsToStop.add(new CacheGroupActionData(grpDesc, destroy));
}
/**
* @return Cache groups to start.
*/
public List<CacheGroupActionData> cacheGroupsToStop() {
return cacheGrpsToStop != null ? cacheGrpsToStop : Collections.<CacheGroupActionData>emptyList();
}
/**
* @param grpId Group ID.
* @return {@code True} if given cache group stopping.
*/
public boolean cacheGroupStopping(int grpId) {
if (cacheGrpsToStop != null) {
for (CacheGroupActionData grp : cacheGrpsToStop) {
if (grp.desc.groupId() == grpId)
return true;
}
}
return false;
}
/**
* @return {@code True} if there are no cache change actions.
*/
public boolean empty() {
return F.isEmpty(cachesToStart) &&
F.isEmpty(cachesToStop) &&
F.isEmpty(cacheGrpsToStart) &&
F.isEmpty(cacheGrpsToStop) &&
F.isEmpty(cachesToResetLostParts) &&
stateChangeReq == null &&
locJoinCtx == null;
}
/**
* @param locJoinCtx Caches local join context.
*/
public void localJoinContext(LocalJoinCachesContext locJoinCtx) {
this.locJoinCtx = locJoinCtx;
}
/**
* @return Caches local join context.
*/
public LocalJoinCachesContext localJoinContext() {
return locJoinCtx;
}
/**
*
*/
public static class CacheActionData {
/** */
private final DynamicCacheChangeRequest req;
/** */
private final DynamicCacheDescriptor desc;
/**
* @param req Request.
* @param desc Cache descriptor.
*/
CacheActionData(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
assert req != null;
assert desc != null;
this.req = req;
this.desc = desc;
}
/**
* @return Request.
*/
public DynamicCacheChangeRequest request() {
return req;
}
/**
* @return Cache descriptor.
*/
public DynamicCacheDescriptor descriptor() {
return desc;
}
}
/**
*
*/
public static class CacheGroupActionData {
/** Cache group descriptor. */
private final CacheGroupDescriptor desc;
/** Destroy flag. */
private final boolean destroy;
/**
* @param desc Group descriptor
* @param destroy Destroy flag
*/
CacheGroupActionData(CacheGroupDescriptor desc, boolean destroy) {
assert desc != null;
this.desc = desc;
this.destroy = destroy;
}
/**
* @param desc Group descriptor
*/
CacheGroupActionData(CacheGroupDescriptor desc) {
this(desc, false);
}
/**
* @return Group descriptor
*/
public CacheGroupDescriptor descriptor() {
return desc;
}
/**
* @return Destroy flag
*/
public boolean destroy() {
return destroy;
}
}
/** {@inheritDoc} */
@Override public String toString() {
Object startGrps = F.viewReadOnly(cacheGrpsToStart, new C1<CacheGroupActionData, String>() {
@Override public String apply(CacheGroupActionData data) {
return data.desc.cacheOrGroupName();
}
});
Object stopGrps = F.viewReadOnly(cacheGrpsToStop, new C1<CacheGroupActionData, String>() {
@Override public String apply(CacheGroupActionData data) {
return data.desc.cacheOrGroupName() + ", destroy=" + data.destroy;
}
});
return "ExchangeActions [startCaches=" + (cachesToStart != null ? cachesToStart.keySet() : null) +
", stopCaches=" + (cachesToStop != null ? cachesToStop.keySet() : null) +
", startGrps=" + startGrps +
", stopGrps=" + stopGrps +
", resetParts=" + (cachesToResetLostParts != null ? cachesToResetLostParts.keySet() : null) +
", stateChangeRequest=" + stateChangeReq + ']';
}
}