/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.GridCachePluginContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.systemview.walker.CacheGroupViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.CacheViewWalker;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.lang.GridFunc;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.systemview.view.CacheGroupView;
import org.apache.ignite.spi.systemview.view.CacheView;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
/**
* Logic related to cache discovery data processing.
*/
public class ClusterCachesInfo {
/** */
public static final String CACHES_VIEW = "caches";
/** */
public static final String CACHES_VIEW_DESC = "Caches";
/** */
public static final String CACHE_GRPS_VIEW = "cacheGroups";
/** */
public static final String CACHE_GRPS_VIEW_DESC = "Cache groups";
/** Representation of {@code null} for the restarting caches map. */
private static final IgniteUuid NULL_OBJECT = new IgniteUuid();
/** Version since which merge of config is supported. */
private static final IgniteProductVersion V_MERGE_CONFIG_SINCE = IgniteProductVersion.fromString("2.5.0");
/** */
private final GridKernalContext ctx;
/**
* Contains cache descriptors that were removed from {@link #registeredCaches} due to a cache stop request.
* Such descriptors are removed from this map only after the whole cache stop process is finished.
*/
private final ConcurrentNavigableMap<AffinityTopologyVersion, Map<String, DynamicCacheDescriptor>>
markedForDeletionCaches = new ConcurrentSkipListMap<>();
/**
* Contains cache group descriptors that were removed from {@link #registeredCacheGrps} due to a cache stop request.
* Such descriptors are removed from this map only after the whole cache stop process is finished.
* The affinity topology version key is the version that will be applied after the cache group is completely removed.
*/
private final ConcurrentNavigableMap<AffinityTopologyVersion, Map<Integer, CacheGroupDescriptor>>
markedForDeletionCacheGrps = new ConcurrentSkipListMap<>();
/** Dynamic caches. */
private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
/** Mapping of caches by cache ID. */
private final ConcurrentMap<Integer, DynamicCacheDescriptor> registeredCachesById = new ConcurrentHashMap<>();
/** */
private final ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGrps = new ConcurrentHashMap<>();
/** Cache templates. */
private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
/** Caches currently being restarted (with restarter id). */
private final ConcurrentHashMap<String, IgniteUuid> restartingCaches = new ConcurrentHashMap<>();
/** */
private final IgniteLogger log;
/** */
private CachesOnDisconnect cachesOnDisconnect;
/** Local cache info */
private CacheJoinNodeDiscoveryData joinDiscoData;
/** Cluster cache info */
private GridData gridData;
/** */
private LocalJoinCachesContext locJoinCachesCtx;
/** */
private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation = Collections.emptyMap();
/** */
private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs;
/** {@code True} if joined cluster while cluster state change was in progress. */
private boolean joinOnTransition;
/** Flag that caches were already filtered out. */
private final AtomicBoolean alreadyFiltered = new AtomicBoolean();
/**
* @param ctx Context.
*/
public ClusterCachesInfo(GridKernalContext ctx) {
this.ctx = ctx;
ctx.systemView().registerView(CACHES_VIEW, CACHES_VIEW_DESC,
new CacheViewWalker(),
registeredCaches.values(),
(desc) -> new CacheView(desc, ctx));
ctx.systemView().registerView(CACHE_GRPS_VIEW, CACHE_GRPS_VIEW_DESC,
new CacheGroupViewWalker(),
registeredCacheGrps.values(),
CacheGroupView::new);
log = ctx.log(getClass());
}
/**
* Filters out all dynamic cache descriptors and groups that were not present on node start
* and were received with grid discovery data.
*
* @param localCachesOnStart Caches that were already present on node start.
*/
public void filterDynamicCacheDescriptors(Set<String> localCachesOnStart) {
if (ctx.isDaemon())
return;
if (!alreadyFiltered.compareAndSet(false, true))
return;
filterRegisteredCachesAndCacheGroups(localCachesOnStart);
List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = locJoinCachesCtx.caches();
filterLocalJoinStartCaches(locJoinStartCaches);
List<DynamicCacheDescriptor> initCaches = locJoinCachesCtx.initCaches();
filterInitCaches(initCaches);
locJoinCachesCtx = new LocalJoinCachesContext(
locJoinStartCaches,
initCaches,
registeredCacheGrps,
registeredCaches
);
}
/**
* Filters from registered caches all caches that are not present in the node's local configuration.
*
* Then filters from registered cache groups all groups that became empty after registered caches were filtered.
*
* @param locCaches Caches from local node configuration (static configuration and persistent caches).
*/
private void filterRegisteredCachesAndCacheGroups(Set<String> locCaches) {
// Filter registered caches.
Iterator<Map.Entry<String, DynamicCacheDescriptor>> cachesIter = registeredCaches.entrySet().iterator();
while (cachesIter.hasNext()) {
Map.Entry<String, DynamicCacheDescriptor> e = cachesIter.next();
if (!locCaches.contains(e.getKey())) {
cachesIter.remove();
registeredCachesById.remove(e.getValue().cacheId());
ctx.discovery().removeCacheFilter(e.getKey());
}
}
// Filter registered cache groups.
Iterator<Map.Entry<Integer, CacheGroupDescriptor>> grpsIter = registeredCacheGrps.entrySet().iterator();
while (grpsIter.hasNext()) {
Map.Entry<Integer, CacheGroupDescriptor> e = grpsIter.next();
boolean removeGrp = true;
for (DynamicCacheDescriptor cacheDescr : registeredCaches.values()) {
if (cacheDescr.groupId() == e.getKey()) {
removeGrp = false;
break;
}
}
if (removeGrp) {
grpsIter.remove();
ctx.discovery().removeCacheGroup(e.getValue());
}
}
}
/**
* Filters from local join context cache descriptors that should be started on join.
*
* @param locJoinStartCaches Collection to filter.
*/
private void filterLocalJoinStartCaches(
List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches) {
locJoinStartCaches.removeIf(next -> !registeredCaches.containsKey(next.getKey().cacheName()));
}
/**
* Filters from local join context cache descriptors that require init of query infrastructure without cache
* start.
*
* @param initCaches Collection to filter.
*/
private void filterInitCaches(List<DynamicCacheDescriptor> initCaches) {
initCaches.removeIf(desc -> !registeredCaches.containsKey(desc.cacheName()));
}
/**
* @param joinDiscoData Information about configured caches and templates.
* @throws IgniteCheckedException If configuration validation failed.
*/
public void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException {
this.joinDiscoData = joinDiscoData;
Map<String, CacheConfiguration> grpCfgs = new HashMap<>();
for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) {
if (info.cacheData().config().getGroupName() == null)
continue;
CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName());
if (ccfg == null)
grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config());
else
validateCacheGroupConfiguration(ccfg, info.cacheData().config());
}
String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true);
if (conflictErr != null)
throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr);
}
/**
* @param checkConsistency {@code True} if cache configuration consistency should be checked.
* @throws IgniteCheckedException If failed.
*/
public void onKernalStart(boolean checkConsistency) throws IgniteCheckedException {
if (gridData != null && gridData.conflictErr != null)
throw new IgniteCheckedException(gridData.conflictErr);
if (gridData != null && gridData.joinDiscoData != null) {
CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData;
for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) {
CacheConfiguration locCfg = locCacheInfo.cacheData().config();
CacheData cacheData = gridData.gridData.caches().get(locCfg.getName());
if (cacheData != null) {
if (!F.eq(cacheData.sql(), locCacheInfo.sql())) {
throw new IgniteCheckedException("Cache configuration mismatch (local cache was created " +
"via " + (locCacheInfo.sql() ? "CREATE TABLE" : "Ignite API") + ", while remote cache " +
"was created via " + (cacheData.sql() ? "CREATE TABLE" : "Ignite API") + "): " +
locCacheInfo.cacheData().config().getName());
}
if (checkConsistency) {
checkCache(locCacheInfo, cacheData, cacheData.receivedFrom());
ClusterNode rmt = ctx.discovery().node(cacheData.receivedFrom());
if (rmt == null) {
for (ClusterNode node : ctx.discovery().localJoin().discoCache().serverNodes()) {
if (!node.isLocal() && ctx.discovery().cacheAffinityNode(node, locCfg.getName())) {
rmt = node;
break;
}
}
}
if (rmt != null) {
for (PluginProvider p : ctx.plugins().allProviders()) {
CachePluginContext pluginCtx = new GridCachePluginContext(ctx, locCfg);
CachePluginProvider provider = p.createCacheProvider(pluginCtx);
if (provider != null)
provider.validateRemote(locCfg, cacheData.cacheConfiguration(), rmt);
}
}
}
}
if (checkConsistency)
validateStartCacheConfiguration(locCfg);
}
}
gridData = null;
}
/**
* Checks that the remote cache configuration is compatible with the local one.
*
* @param locInfo Local configuration.
* @param rmtData Remote configuration.
* @param rmt Remote node.
* @throws IgniteCheckedException If check failed.
*/
@SuppressWarnings("unchecked")
private void checkCache(CacheJoinNodeDiscoveryData.CacheInfo locInfo, CacheData rmtData, UUID rmt)
throws IgniteCheckedException {
GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtData.cacheConfiguration(), rmtData.cacheConfigurationEnrichment());
GridCacheAttributes locAttr =
new GridCacheAttributes(locInfo.cacheData().config(), locInfo.cacheData().cacheConfigurationEnrichment());
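// Note: the trailing boolean of each checkAttributeMismatch call is the 'fail' flag -
// a mismatch on a 'true' attribute aborts the node join, while 'false' only logs a warning.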
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
locAttr.cacheMode(), rmtAttr.cacheMode(), true);
CU.checkAttributeMismatch(log, rmtAttr.groupName(), rmt, "groupName", "Cache group name",
locAttr.groupName(), rmtAttr.groupName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "interceptor", "Cache Interceptor",
locAttr.interceptorClassName(), rmtAttr.interceptorClassName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "atomicityMode",
"Cache atomicity mode", locAttr.atomicityMode(), rmtAttr.atomicityMode(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
"Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "topologyValidator",
"Cache topology validator", locAttr.topologyValidatorClassName(), rmtAttr.topologyValidatorClassName(), true);
ClusterNode rmtNode = ctx.discovery().node(rmt);
if (CU.affinityNode(ctx.discovery().localNode(), locInfo.cacheData().config().getNodeFilter())
&& rmtNode != null && CU.affinityNode(rmtNode, rmtData.cacheConfiguration().getNodeFilter())) {
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
}
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinity", "Cache affinity",
locAttr.cacheAffinityClassName(), rmtAttr.cacheAffinityClassName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheAffinityMapper",
"Cache affinity mapper", locAttr.cacheAffinityMapperClassName(),
rmtAttr.cacheAffinityMapperClassName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityPartitionsCount",
"Affinity partitions count", locAttr.affinityPartitionsCount(),
rmtAttr.affinityPartitionsCount(), true);
CU.validateKeyConfigiration(rmtAttr.groupName(), rmtAttr.cacheName(), rmt, rmtAttr.configuration().getKeyConfiguration(),
locAttr.configuration().getKeyConfiguration(), log, true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionFilter", "Eviction filter",
locAttr.evictionFilterClassName(), rmtAttr.evictionFilterClassName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicy", "Eviction policy",
locAttr.evictionPolicyClassName(), rmtAttr.evictionPolicyClassName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "evictionPolicyFactory", "Eviction policy factory",
locAttr.evictionPolicyFactoryClassName(), rmtAttr.evictionPolicyFactoryClassName(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "transactionManagerLookup",
"Transaction manager lookup", locAttr.transactionManagerLookupClassName(),
rmtAttr.transactionManagerLookupClassName(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "defaultLockTimeout",
"Default lock timeout", locAttr.defaultLockTimeout(), rmtAttr.defaultLockTimeout(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "preloadBatchSize",
"Preload batch size", locAttr.rebalanceBatchSize(), rmtAttr.rebalanceBatchSize(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "rebalanceDelay",
"Rebalance delay", locAttr.rebalanceDelay(), rmtAttr.rebalanceDelay(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "rebalanceBatchesPrefetchCount",
"Rebalance batches prefetch count", locAttr.rebalanceBatchesPrefetchCount(),
rmtAttr.rebalanceBatchesPrefetchCount(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "rebalanceOrder",
"Rebalance order", locAttr.rebalanceOrder(), rmtAttr.rebalanceOrder(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "rebalanceThrottle",
"Rebalance throttle", locAttr.rebalanceThrottle(), rmtAttr.rebalanceThrottle(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "rebalanceTimeout",
"Rebalance timeout", locAttr.rebalanceTimeout(), rmtAttr.rebalanceTimeout(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeSynchronizationMode",
"Write synchronization mode", locAttr.writeSynchronization(), rmtAttr.writeSynchronization(),
true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindBatchSize",
"Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(),
false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindCoalescing",
"Write behind coalescing", locAttr.writeBehindCoalescing(), rmtAttr.writeBehindCoalescing(),
false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled",
"Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushFrequency",
"Write behind flush frequency", locAttr.writeBehindFlushFrequency(),
rmtAttr.writeBehindFlushFrequency(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushSize",
"Write behind flush size", locAttr.writeBehindFlushSize(), rmtAttr.writeBehindFlushSize(),
false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindFlushThreadCount",
"Write behind flush thread count", locAttr.writeBehindFlushThreadCount(),
rmtAttr.writeBehindFlushThreadCount(), false);
if (locAttr.cacheMode() == PARTITIONED) {
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicy",
"Near eviction policy", locAttr.nearEvictionPolicyClassName(),
rmtAttr.nearEvictionPolicyClassName(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "nearEvictionPolicyFactory",
"Near eviction policy factory", locAttr.nearEvictionPolicyFactoryClassName(),
rmtAttr.nearEvictionPolicyFactoryClassName(), false);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityIncludeNeighbors",
"Affinity include neighbors", locAttr.affinityIncludeNeighbors(),
rmtAttr.affinityIncludeNeighbors(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "affinityKeyBackups",
"Affinity key backups", locAttr.affinityKeyBackups(),
rmtAttr.affinityKeyBackups(), true);
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "qryParallelism",
"Query parallelism", locAttr.qryParallelism(), rmtAttr.qryParallelism(), true);
}
CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "isEncryptionEnabled",
"Cache encrypted", locAttr.isEncryptionEnabled(), rmtAttr.isEncryptionEnabled(), true);
}
/**
* @param msg Message.
* @param node Node that sent the message.
*/
public void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) {
Map<Integer, Boolean> startedCaches = msg.startedCaches();
if (startedCaches != null) {
for (Map.Entry<Integer, Boolean> e : startedCaches.entrySet()) {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (e.getKey().equals(desc.cacheId())) {
ctx.discovery().addClientNode(desc.cacheName(), node.id(), e.getValue());
break;
}
}
}
}
Set<Integer> closedCaches = msg.closedCaches();
if (closedCaches != null) {
for (Integer cacheId : closedCaches) {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (cacheId.equals(desc.cacheId())) {
ctx.discovery().onClientCacheClose(desc.cacheName(), node.id());
break;
}
}
}
}
}
/**
* Creates exchanges actions. Forms a list of caches and cache groups to be stopped
* due to dynamic cache start failure.
*
* @param failMsg Dynamic change request fail message.
* @param topVer Current topology version.
*/
public void onCacheChangeRequested(DynamicCacheChangeFailureMessage failMsg, AffinityTopologyVersion topVer) {
AffinityTopologyVersion actualTopVer = failMsg.exchangeId().topologyVersion();
ExchangeActions exchangeActions = new ExchangeActions();
List<T2<DynamicCacheChangeRequest, DynamicCacheDescriptor>> descs = new ArrayList<>(failMsg.cacheNames().size());
for (String cacheName : failMsg.cacheNames()) {
DynamicCacheDescriptor cacheDescr = null;
// The required cache descriptor may have already been deleted in the following scenario:
// 1 - DynamicCacheChangeBatch(start a new user cache with name "A")
// 2 - At the same time, the user initiates destroying the cache that should be started on step 1.
// 3 - The corresponding exchange (see step 1) results in an error, so the start should be rolled back.
// 4 - When the DynamicCacheChangeFailureMessage is received, the required cache descriptor is already
// deleted, and therefore we need to scan the markedForDeletion collections to find the right descriptor.
// Find the "earliest" available descriptor.
for (Map<String, DynamicCacheDescriptor> descriptors : markedForDeletionCaches.tailMap(actualTopVer).values()) {
cacheDescr = descriptors.get(cacheName);
if (cacheDescr != null)
break;
}
if (cacheDescr == null)
cacheDescr = registeredCaches.get(cacheName);
assert cacheDescr != null : "Dynamic cache descriptor is missing [cacheName=" + cacheName + "]";
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, cacheDescr.sql(), true);
descs.add(new T2<>(req, cacheDescr));
}
CacheChangeProcessResult res = new CacheChangeProcessResult();
for (T2<DynamicCacheChangeRequest, DynamicCacheDescriptor> desc : descs) {
DynamicCacheChangeRequest req = desc.get1();
DynamicCacheDescriptor cacheDesc = desc.get2();
processStopCacheRequest(exchangeActions, req, res, req.cacheName(), cacheDesc, actualTopVer, true);
}
failMsg.exchangeActions(exchangeActions);
}
/**
* @param batch Cache change request.
* @param topVer Current topology version.
* @return {@code True} if minor topology version should be increased.
*/
public boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) {
DiscoveryDataClusterState state = ctx.state().clusterState();
if (state.active() && !state.transition()) {
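// Normal path: the cluster is active and no state transition is in progress.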
Set<IgniteUuid> restartIds = new HashSet<>(F.viewReadOnly(
batch.requests(), DynamicCacheChangeRequest::restartId, req -> req.start() && req.restartId() != null));
assert restartIds.size() <= 1 : batch.requests();
Collection<UUID> nodes = ctx.cache().context().snapshotMgr().cacheStartRequiredAliveNodes(F.first(restartIds));
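// If this batch restarts a cache, every node required for the restart must still be
// an alive baseline node; otherwise all requests in the batch are failed.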
for (UUID nodeId : nodes) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null && CU.baselineNode(node, state) && ctx.discovery().alive(node))
continue;
ClusterTopologyCheckedException err =
new ClusterTopologyCheckedException("Required node has left the cluster [nodeId=" + nodeId + ']');
for (DynamicCacheChangeRequest req : batch.requests())
ctx.cache().completeCacheStartFuture(req, false, err);
return false;
}
ExchangeActions exchangeActions = new ExchangeActions();
CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions,
batch.requests(),
topVer,
false);
if (res.needExchange) {
assert !exchangeActions.empty() : exchangeActions;
batch.exchangeActions(exchangeActions);
if (!nodes.isEmpty())
exchangeActions.cacheStartRequiredAliveNodes(nodes);
}
return res.needExchange;
}
else {
IgniteCheckedException err = new IgniteCheckedException("Failed to start/stop cache, cluster state change " +
"is in progress.");
for (DynamicCacheChangeRequest req : batch.requests()) {
if (req.template()) {
ctx.cache().completeTemplateAddFuture(req.startCacheConfiguration().getName(),
req.deploymentId());
}
else
ctx.cache().completeCacheStartFuture(req, false, err);
}
return false;
}
}
/**
* @param exchangeActions Exchange actions to update.
* @param reqs Requests.
* @param topVer Current topology version.
* @param persistedCfgs {@code True} when processing the start of persisted caches during cluster activation.
* @return Process result.
*/
private CacheChangeProcessResult processCacheChangeRequests(
ExchangeActions exchangeActions,
Collection<DynamicCacheChangeRequest> reqs,
AffinityTopologyVersion topVer,
boolean persistedCfgs
) {
CacheChangeProcessResult res = new CacheChangeProcessResult();
final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
for (DynamicCacheChangeRequest req : reqs)
processCacheChangeRequest0(req, exchangeActions, topVer, persistedCfgs, res, reqsToComplete);
if (!F.isEmpty(res.addedDescs)) {
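// Descriptors added by this batch become visible on the next minor topology version
// when an exchange is triggered, otherwise on the current version.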
AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer;
for (DynamicCacheDescriptor desc : res.addedDescs) {
assert desc.template() || res.needExchange;
desc.startTopologyVersion(startTopVer);
}
}
if (!F.isEmpty(reqsToComplete)) {
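// Requests that do not trigger an exchange are completed asynchronously, waiting
// (if needed) until affinity is ready for the corresponding topology version.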
ctx.closure().callLocalSafe(new GridPlainCallable<Void>() {
@Override public Void call() {
for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t : reqsToComplete) {
final DynamicCacheChangeRequest req = t.get1();
AffinityTopologyVersion waitTopVer = t.get2();
IgniteInternalFuture<?> fut = waitTopVer != null ?
ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
if (fut == null || fut.isDone())
ctx.cache().completeCacheStartFuture(req, false, null);
else {
fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
ctx.cache().completeCacheStartFuture(req, false, null);
}
});
}
}
return null;
}
});
}
return res;
}
/**
* @param req Cache change request.
* @param exchangeActions Exchange actions to update.
* @param topVer Current topology version.
* @param persistedCfgs {@code True} when processing the start of persisted caches during cluster activation.
* @param res Accumulator for cache change process results.
* @param reqsToComplete Accumulator for cache change requests which should be completed after
* ({@link org.apache.ignite.internal.processors.cache.GridCacheProcessor#pendingFuts}
*/
private void processCacheChangeRequest0(
DynamicCacheChangeRequest req,
ExchangeActions exchangeActions,
AffinityTopologyVersion topVer,
boolean persistedCfgs,
CacheChangeProcessResult res,
List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete
) {
String cacheName = req.cacheName();
if (req.template()) {
processTemplateAddRequest(persistedCfgs, res, req);
return;
}
assert !req.clientStartOnly() : req;
DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
boolean needExchange = false;
boolean clientCacheStart = false;
AffinityTopologyVersion waitTopVer = null;
if (req.start()) {
boolean proceedFurther = true;
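// A cache with this name is currently being restarted: the start may proceed only
// if the request carries the matching restart id, i.e. it is the starting half of
// the same restart operation.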
if (restartingCaches.containsKey(cacheName) &&
((req.restartId() == null && restartingCaches.get(cacheName) != NULL_OBJECT)
|| (req.restartId() != null && !req.restartId().equals(restartingCaches.get(cacheName))))) {
if (req.failIfExists()) {
ctx.cache().completeCacheStartFuture(req, false,
new CacheExistsException("Failed to start cache (a cache is restarting): " + cacheName));
}
proceedFurther = false;
}
if (proceedFurther) {
if (desc == null) { /* Starting a new cache.*/
if (!processStartNewCacheRequest(exchangeActions, topVer, persistedCfgs, res, req, cacheName))
return;
needExchange = true;
}
else {
clientCacheStart = processStartAlreadyStartedCacheRequest(topVer, persistedCfgs, req, cacheName, desc);
if (!clientCacheStart) {
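// Cache is already started: the initiator's start future is completed once affinity
// is ready for the version on which the cache became available to this node.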
if (desc.clientCacheStartVersion() != null)
waitTopVer = desc.clientCacheStartVersion();
else {
AffinityTopologyVersion nodeStartVer =
new AffinityTopologyVersion(ctx.discovery().localNode().order(), 0);
if (desc.startTopologyVersion() != null)
waitTopVer = desc.startTopologyVersion();
else
waitTopVer = desc.receivedFromStartVersion();
if (waitTopVer == null || nodeStartVer.compareTo(waitTopVer) > 0)
waitTopVer = nodeStartVer;
}
}
}
}
}
else if (req.resetLostPartitions()) {
if (desc != null) {
needExchange = true;
exchangeActions.addCacheToResetLostPartitions(req, desc);
}
}
else if (req.stop()) {
if (desc != null) {
if (req.sql() && !desc.sql()) {
ctx.cache().completeCacheStartFuture(req, false,
new IgniteCheckedException("Only cache created with CREATE TABLE may be removed with " +
"DROP TABLE [cacheName=" + cacheName + ']'));
return;
}
if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc, topVer.nextMinorVersion(), false))
return;
needExchange = true;
}
}
else
assert false : req;
if (!needExchange) {
if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId()))
reqsToComplete.add(new T2<>(req, waitTopVer));
}
else
res.needExchange = true;
}
/**
* @param req Cache change request.
* @param exchangeActions Exchange actions to update.
* @param cacheName Cache name.
* @param desc Dynamic cache descriptor.
* @param topVer Topology version that will be applied after the corresponding partition map exchange.
* @param checkForAlreadyDeleted {@code true} indicates that the given cache descriptor may have already been deleted.
* This is possible in the case when the {@code DynamicCacheChangeFailureMessage}
* is received after the {@code DynamicCacheChangeRequest},
* which is responsible for stopping the cache.
* @return {@code true} if the stop request can be processed.
*/
private boolean processStopCacheRequest(
ExchangeActions exchangeActions,
DynamicCacheChangeRequest req,
CacheChangeProcessResult res,
String cacheName,
DynamicCacheDescriptor desc,
AffinityTopologyVersion topVer,
boolean checkForAlreadyDeleted
) {
if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
IgniteCheckedException err = new IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
U.warn(log, err);
res.errs.add(err);
ctx.cache().completeCacheStartFuture(req, false, err);
return false;
}
boolean alreadyRemovedDesc = false;
DynamicCacheDescriptor old = null;
if (checkForAlreadyDeleted) {
// Find the "earliest" available descriptor.
for (Map<String, DynamicCacheDescriptor> descriptors : markedForDeletionCaches.tailMap(topVer).values()) {
old = descriptors.get(cacheName);
if (old != null)
break;
}
alreadyRemovedDesc = old != null;
}
if (old == null) {
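// The descriptor is still registered: move it to the marked-for-deletion map,
// keyed by the topology version at which the stop takes effect.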
old = registeredCaches.get(cacheName);
markedForDeletionCaches
.computeIfAbsent(topVer, map -> new ConcurrentHashMap<>())
.put(cacheName, old);
registeredCaches.remove(cacheName);
ctx.discovery().removeCacheFilter(cacheName);
alreadyRemovedDesc = false;
}
assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
registeredCachesById.remove(old.cacheId());
if (req.restart()) {
IgniteUuid restartId = req.restartId();
restartingCaches.put(cacheName, restartId == null ? NULL_OBJECT : restartId);
}
exchangeActions.addCacheToStop(req, desc);
boolean alreadyRemovedGrp = false;
CacheGroupDescriptor grpDesc = null;
if (checkForAlreadyDeleted) {
// Find the "earliest" available descriptor.
for (Map<Integer, CacheGroupDescriptor> descriptors : markedForDeletionCacheGrps.tailMap(topVer).values()) {
grpDesc = descriptors.get(desc.groupId());
if (grpDesc != null)
break;
}
alreadyRemovedGrp = grpDesc != null;
}
if (grpDesc == null) {
grpDesc = registeredCacheGrps.get(desc.groupId());
alreadyRemovedGrp = false;
}
assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc;
if (!alreadyRemovedDesc)
grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
if (!grpDesc.hasCaches()) {
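// The stopped cache was the last one in its group: schedule the whole group for removal.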
if (!alreadyRemovedGrp) {
markedForDeletionCacheGrps
.computeIfAbsent(topVer, (map) -> new ConcurrentHashMap<>())
.put(grpDesc.groupId(), grpDesc);
ctx.discovery().removeCacheGroup(grpDesc);
}
registeredCacheGrps.remove(grpDesc.groupId());
exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId());
}
return true;
}
/**
* @param persistedCfgs {@code True} when processing the start of persisted caches during cluster activation.
* @param res Accumulator for cache change process results.
* @param req Dynamic cache change request.
*/
private void processTemplateAddRequest(
boolean persistedCfgs,
CacheChangeProcessResult res,
DynamicCacheChangeRequest req
) {
CacheConfiguration ccfg = req.startCacheConfiguration();
assert ccfg != null : req;
DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
if (desc == null) {
DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
ccfg,
req.cacheType(),
null,
true,
req.initiatingNodeId(),
false,
false,
req.deploymentId(),
req.schema(),
req.cacheConfigurationEnrichment()
);
DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
assert old == null;
res.addedDescs.add(templateDesc);
}
if (!persistedCfgs)
ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
}
/**
* @param topVer Topology version.
* @param persistedCfgs {@code True} when processing the start of persisted caches during cluster activation.
* @param req Cache change request.
* @param cacheName Cache name.
* @param desc Dynamic cache descriptor.
* @return {@code True} if a client cache needs to be started.
*/
private boolean processStartAlreadyStartedCacheRequest(
AffinityTopologyVersion topVer,
boolean persistedCfgs,
DynamicCacheChangeRequest req,
String cacheName,
DynamicCacheDescriptor desc
) {
assert !persistedCfgs;
assert req.initiatingNodeId() != null : req;
if (req.failIfExists()) {
ctx.cache().completeCacheStartFuture(req, false,
new CacheExistsException("Failed to start cache " +
"(a cache with the same name is already started): " + cacheName));
}
else {
// Cache already exists, it is possible client cache is needed.
ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
boolean clientReq = node != null &&
!ctx.discovery().cacheAffinityNode(node, cacheName);
if (clientReq) {
ctx.discovery().addClientNode(cacheName,
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
if (node.id().equals(req.initiatingNodeId())) {
desc.clientCacheStartVersion(topVer);
ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(cacheName, req), null);
return true;
}
}
}
return false;
}
/**
* @param exchangeActions Exchange actions to update.
* @param topVer Topology version.
* @param persistedCfgs {@code True} when processing the start of persisted caches during cluster activation.
* @param res Accumulator for cache change process results.
* @param req Cache change request.
* @param cacheName Cache name.
* @return {@code True} if there were no errors.
*/
private boolean processStartNewCacheRequest(
ExchangeActions exchangeActions,
AffinityTopologyVersion topVer,
boolean persistedCfgs,
CacheChangeProcessResult res,
DynamicCacheChangeRequest req,
String cacheName
) {
assert exchangeActions != null;
CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
IgniteCheckedException err = null;
if (ctx.cache().context().readOnlyMode()) {
err = new IgniteClusterReadOnlyException(
String.format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT, "start cache", ccfg.getGroupName(), cacheName)
);
}
if (err == null) {
String conflictErr = checkCacheConflict(req.startCacheConfiguration());
if (conflictErr != null) {
U.warn(log, "Ignore cache start request. " + conflictErr);
err = new IgniteCheckedException("Failed to start cache. " + conflictErr);
}
}
if (err == null)
err = QueryUtils.checkQueryEntityConflicts(req.startCacheConfiguration(), registeredCaches.values());
if (err == null) {
GridEncryptionManager encMgr = ctx.encryption();
if (ccfg.isEncryptionEnabled()) {
if (encMgr.isMasterKeyChangeInProgress())
err = new IgniteCheckedException("Cache start failed. Master key change is in progress.");
else if (encMgr.masterKeyDigest() != null &&
!Arrays.equals(encMgr.masterKeyDigest(), req.masterKeyDigest())) {
err = new IgniteCheckedException("Cache start failed. The request was initiated before " +
"the master key change and can't be processed.");
}
if (err != null)
U.warn(log, "Ignore cache start request during the master key change process.", err);
}
}
if (err == null && req.restartId() == null) {
IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
if (snapshotMgr.isRestoring(ccfg)) {
err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
"currently being restored from a snapshot [cache=" + cacheName +
(ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']');
}
}
if (err != null) {
if (persistedCfgs)
res.errs.add(err);
else
ctx.cache().completeCacheStartFuture(req, false, err);
return false;
}
assert req.cacheType() != null : req;
assert F.eq(ccfg.getName(), cacheName) : req;
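// Cache id is derived deterministically from the cache name.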
int cacheId = CU.cacheId(cacheName);
CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions,
topVer,
ccfg,
cacheId,
req.initiatingNodeId(),
req.deploymentId(),
req.encryptionKey(),
req.encryptionKeyId(),
req.cacheConfigurationEnrichment()
);
DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
ccfg,
req.cacheType(),
grpDesc,
false,
req.initiatingNodeId(),
false,
req.sql(),
req.deploymentId(),
req.schema(),
req.cacheConfigurationEnrichment()
);
DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
registeredCachesById.put(startDesc.cacheId(), startDesc);
restartingCaches.remove(ccfg.getName());
assert old == null;
ctx.discovery().setCacheFilter(
startDesc.cacheId(),
grpDesc.groupId(),
ccfg.getName(),
ccfg.getNearConfiguration() != null);
if (!persistedCfgs) {
ctx.discovery().addClientNode(cacheName,
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
}
res.addedDescs.add(startDesc);
exchangeActions.addCacheToStart(req, startDesc);
return true;
}
/**
* @param dataBag Discovery data bag.
*/
void collectJoiningNodeData(DiscoveryDataBag dataBag) {
if (!ctx.isDaemon())
dataBag.addJoiningNodeData(CACHE_PROC.ordinal(), joinDiscoveryData());
}
/**
* @return {@code True} if there are currently restarting caches.
*/
boolean hasRestartingCaches() {
return !F.isEmpty(restartingCaches);
}
/**
* @return Collection of currently restarting caches.
*/
Collection<String> restartingCaches() {
return restartingCaches.keySet();
}
/**
* @return Discovery data sent on local node join.
*/
private Serializable joinDiscoveryData() {
if (cachesOnDisconnect != null) {
Map<Integer, CacheClientReconnectDiscoveryData.CacheGroupInfo> cacheGrpsInfo = new HashMap<>();
Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>();
Map<Integer, CacheGroupDescriptor> grps = cachesOnDisconnect.cacheGrps;
Map<String, DynamicCacheDescriptor> caches = cachesOnDisconnect.caches;
for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
CacheGroupDescriptor desc = grps.get(grp.groupId());
assert desc != null : grp.cacheOrGroupName();
cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(),
desc.deploymentId(),
0));
}
for (IgniteInternalCache cache : ctx.cache().caches()) {
DynamicCacheDescriptor desc = caches.get(cache.name());
assert desc != null : cache.name();
cachesInfo.put(cache.name(), new CacheClientReconnectDiscoveryData.CacheInfo(desc.cacheConfiguration(),
desc.cacheType(),
desc.deploymentId(),
cache.context().isNear(),
0));
}
return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo);
}
else {
assert ctx.config().isDaemon() || joinDiscoData != null;
return joinDiscoData;
}
}
/**
* Called from exchange worker.
*
* @return Caches to be started when this node starts.
*/
@Nullable public LocalJoinCachesContext localJoinCachesContext() {
if (ctx.isDaemon())
return null;
LocalJoinCachesContext result = locJoinCachesCtx;
locJoinCachesCtx = null;
return result;
}
/**
* @param joinedNodeId Joined node ID.
* @return {@code True} if there are new caches received from joined node.
*/
boolean hasCachesReceivedFromJoin(UUID joinedNodeId) {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (desc.staticallyConfigured()) {
assert desc.receivedFrom() != null : desc;
if (joinedNodeId.equals(desc.receivedFrom()))
return true;
}
}
return false;
}
/**
* @param joinedNodeId Joined node ID.
* @return New caches received from joined node.
*/
List<DynamicCacheDescriptor> cachesReceivedFromJoin(UUID joinedNodeId) {
assert joinedNodeId != null;
List<DynamicCacheDescriptor> started = null;
if (!ctx.isDaemon()) {
for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
if (desc.staticallyConfigured()) {
assert desc.receivedFrom() != null : desc;
if (joinedNodeId.equals(desc.receivedFrom())) {
if (started == null)
started = new ArrayList<>();
started.add(desc);
}
}
}
}
return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList();
}
/**
* Discovery event callback, executed from discovery thread.
*
* @param type Event type.
* @param node Event node.
* @param topVer Topology version.
*/
public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
if (type == EVT_NODE_JOINED && !ctx.isDaemon()) {
for (CacheGroupDescriptor desc : registeredCacheGrps.values()) {
if (node.id().equals(desc.receivedFrom()))
desc.receivedFromStartVersion(topVer);
}
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (node.id().equals(desc.receivedFrom()))
desc.receivedFromStartVersion(topVer);
}
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
if (node.id().equals(desc.receivedFrom()))
desc.receivedFromStartVersion(topVer);
}
if (node.id().equals(ctx.discovery().localNode().id())) {
if (gridData == null) { // First node starts.
assert joinDiscoData != null;
initStartCachesForLocalJoin(true, false);
}
}
}
}
/**
* @param dataBag Discovery data bag.
* @param splitter Cache configuration splitter.
*/
public void collectGridNodeData(DiscoveryDataBag dataBag, CacheConfigurationSplitter splitter) {
if (ctx.isDaemon())
return;
if (!dataBag.commonDataCollectedFor(CACHE_PROC.ordinal()))
dataBag.addGridCommonData(CACHE_PROC.ordinal(), collectCommonDiscoveryData(splitter));
}
/**
* @param cfgSplitter Cache configuration splitter.
* @return Information about started caches.
*/
private CacheNodeCommonDiscoveryData collectCommonDiscoveryData(CacheConfigurationSplitter cfgSplitter) {
Map<Integer, CacheGroupData> cacheGrps = new HashMap<>();
for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(grpDesc);
CacheGroupData grpData = new CacheGroupData(splitCfg.get1(),
grpDesc.groupName(),
grpDesc.groupId(),
grpDesc.receivedFrom(),
grpDesc.startTopologyVersion(),
grpDesc.deploymentId(),
grpDesc.caches(),
0,
grpDesc.persistenceEnabled(),
grpDesc.walEnabled(),
grpDesc.walChangeRequests(),
splitCfg.get2() != null ? grpDesc.cacheConfigurationEnrichment() : null
);
cacheGrps.put(grpDesc.groupId(), grpData);
}
Map<String, CacheData> caches = new HashMap<>();
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(desc);
CacheData cacheData = new CacheData(splitCfg.get1(),
desc.cacheId(),
desc.groupId(),
desc.cacheType(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
desc.staticallyConfigured(),
desc.sql(),
false,
0,
splitCfg.get2() != null ? desc.cacheConfigurationEnrichment() : null
);
caches.put(desc.cacheName(), cacheData);
}
Map<String, CacheData> templates = new HashMap<>();
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = cfgSplitter.split(desc);
CacheData cacheData = new CacheData(
splitCfg.get1(),
0,
0,
desc.cacheType(),
desc.deploymentId(),
desc.schema(),
desc.receivedFrom(),
desc.staticallyConfigured(),
false,
true,
0,
splitCfg.get2() != null ? desc.cacheConfigurationEnrichment() : null
);
templates.put(desc.cacheName(), cacheData);
}
Collection<String> restarting = new HashSet<>(restartingCaches.keySet());
return new CacheNodeCommonDiscoveryData(caches,
templates,
cacheGrps,
ctx.discovery().clientNodesMap(),
restarting);
}
/**
* @param data Discovery data.
*/
public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
if (ctx.isDaemon() || data.commonData() == null)
return;
assert joinDiscoData != null || disconnectedState();
assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data;
CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData();
validateNoNewCachesWithNewFormat(cachesData);
// CacheGroup configurations that were created from local node configuration.
Map<Integer, CacheGroupDescriptor> locCacheGrps = new HashMap<>(registeredCacheGroups());
// Replace locally registered data with the actual data received from the cluster.
cleanCachesAndGroups();
registerReceivedCacheGroups(cachesData, locCacheGrps);
registerReceivedCacheTemplates(cachesData);
registerReceivedCaches(cachesData);
addReceivedClientNodesToDiscovery(cachesData);
String conflictErr = validateRegisteredCaches();
gridData = new GridData(joinDiscoData, cachesData, conflictErr);
if (cachesOnDisconnect == null || cachesOnDisconnect.clusterActive())
initStartCachesForLocalJoin(false, disconnectedState());
}
/**
* Validates that the joining node doesn't have newly configured caches
* when there is no cluster-wide support of SPLITTED_CACHE_CONFIGURATIONS.
*
* If validation fails, those caches are destroyed cluster-wide and the node join process fails.
*
* @param clusterWideCacheData Cluster wide cache data.
*/
public void validateNoNewCachesWithNewFormat(CacheNodeCommonDiscoveryData clusterWideCacheData) {
IgniteDiscoverySpi spi = (IgniteDiscoverySpi)ctx.discovery().getInjectedDiscoverySpi();
boolean allowSplitCacheConfigurations = spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
if (!allowSplitCacheConfigurations) {
List<String> cachesToDestroy = new ArrayList<>();
for (DynamicCacheDescriptor cacheDescriptor : registeredCaches().values()) {
CacheData clusterCacheData = clusterWideCacheData.caches().get(cacheDescriptor.cacheName());
// Node spawned new cache.
if (clusterCacheData.receivedFrom().equals(cacheDescriptor.receivedFrom()))
cachesToDestroy.add(cacheDescriptor.cacheName());
}
if (!cachesToDestroy.isEmpty()) {
ctx.cache().dynamicDestroyCaches(cachesToDestroy, false);
throw new IllegalStateException(
"Node can't join to cluster in compatibility mode with newly configured caches: " + cachesToDestroy
);
}
}
}
/**
* Validates {@link #registeredCaches} for conflicts.
*
* @return Error message if a conflict was found.
*/
@Nullable private String validateRegisteredCaches() {
String conflictErr = null;
if (joinDiscoData != null) {
for (Map.Entry<String, CacheJoinNodeDiscoveryData.CacheInfo> e : joinDiscoData.caches().entrySet()) {
if (!registeredCaches.containsKey(e.getKey())) {
conflictErr = checkCacheConflict(e.getValue().cacheData().config());
if (conflictErr != null) {
conflictErr = "Failed to start configured cache due to conflict with started caches. " +
conflictErr;
break;
}
}
}
}
return conflictErr;
}
/**
* Adds received client nodes to discovery if needed.
*
* @param cachesData Data received from cluster.
*/
private void addReceivedClientNodesToDiscovery(CacheNodeCommonDiscoveryData cachesData) {
if (!F.isEmpty(cachesData.clientNodesMap())) {
for (Map.Entry<String, Map<UUID, Boolean>> entry : cachesData.clientNodesMap().entrySet()) {
String cacheName = entry.getKey();
for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet())
ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
}
}
}
/**
* Registers caches received from the cluster.
*
* @param cachesData Data received from cluster.
*/
private void registerReceivedCaches(CacheNodeCommonDiscoveryData cachesData) {
Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new HashMap<>();
Collection<DynamicCacheDescriptor> cachesToSave = new HashSet<>();
boolean hasSchemaPatchConflict = false;
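// For each cache received from the cluster, build a descriptor and compute a schema patch
// against the locally configured query entities; patches are applied further below.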
for (CacheData cacheData : cachesData.caches().values()) {
CacheGroupDescriptor grpDesc = registeredCacheGrps.get(cacheData.groupId());
assert grpDesc != null : cacheData.cacheConfiguration().getName();
CacheConfiguration<?, ?> cfg = cacheData.cacheConfiguration();
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
ctx,
cacheData.cacheConfiguration(),
cacheData.cacheType(),
grpDesc,
false,
cacheData.receivedFrom(),
cacheData.staticallyConfigured(),
cacheData.sql(),
cacheData.deploymentId(),
new QuerySchema(cacheData.schema().entities()),
cacheData.cacheConfigurationEnrichment()
);
Collection<QueryEntity> localQueryEntities = getLocalQueryEntities(cfg.getName());
QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(localQueryEntities);
if (schemaPatch.hasConflicts()) {
hasSchemaPatchConflict = true;
log.warning("Skipping apply patch because conflicts : " + schemaPatch.getConflictsMessage());
}
else if (!schemaPatch.isEmpty())
patchesToApply.put(desc, schemaPatch);
else if (!GridFunc.eqNotOrdered(desc.schema().entities(), localQueryEntities))
cachesToSave.add(desc); // Received config differs from the local one - need to re-save.
desc.receivedOnDiscovery(true);
registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
registeredCachesById.put(desc.cacheId(), desc);
ctx.discovery().setCacheFilter(
desc.cacheId(),
grpDesc.groupId(),
cfg.getName(),
cfg.getNearConfiguration() != null);
}
updateRegisteredCachesIfNeeded(patchesToApply, cachesToSave, hasSchemaPatchConflict);
}
/**
* Merges config or re-saves it if needed.
*
* @param patchesToApply Patches which need to apply.
* @param cachesToSave Caches which need to resave.
* @param hasSchemaPatchConflict {@code true} if we have conflict during making patch.
*/
private void updateRegisteredCachesIfNeeded(Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply,
Collection<DynamicCacheDescriptor> cachesToSave, boolean hasSchemaPatchConflict) {
// Skip config merge if at least one conflict was found.
if (!hasSchemaPatchConflict && isMergeConfigSupports(ctx.discovery().localNode())) {
boolean isClusterActive = ctx.state().clusterState().active();
// Config merge is performed only for an inactive cluster.
if (!isClusterActive && !patchesToApply.isEmpty()) {
for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
if (entry.getKey().applySchemaPatch(entry.getValue()))
saveCacheConfiguration(entry.getKey());
}
for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
}
else if (patchesToApply.isEmpty()) {
for (DynamicCacheDescriptor descriptor : cachesToSave)
saveCacheConfiguration(descriptor);
}
}
}
/**
* Registers cache templates received from the cluster.
*
* @param cachesData Data received from cluster.
*/
private void registerReceivedCacheTemplates(CacheNodeCommonDiscoveryData cachesData) {
for (CacheData cacheData : cachesData.templates().values()) {
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
ctx,
cacheData.cacheConfiguration(),
cacheData.cacheType(),
null,
true,
cacheData.receivedFrom(),
cacheData.staticallyConfigured(),
false,
cacheData.deploymentId(),
cacheData.schema(),
cacheData.cacheConfigurationEnrichment()
);
registeredTemplates.put(cacheData.cacheConfiguration().getName(), desc);
}
}
/**
* Registers cache groups received from the cluster.
*
* @param cachesData Data received from cluster.
* @param locCacheGrps Current local cache groups.
*/
private void registerReceivedCacheGroups(CacheNodeCommonDiscoveryData cachesData,
Map<Integer, CacheGroupDescriptor> locCacheGrps) {
for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
grpData.config(),
grpData.groupName(),
grpData.groupId(),
grpData.receivedFrom(),
grpData.startTopologyVersion(),
grpData.deploymentId(),
grpData.caches(),
grpData.persistenceEnabled(),
grpData.walEnabled(),
grpData.walChangeRequests(),
grpData.cacheConfigurationEnrichment()
);
if (locCacheGrps.containsKey(grpDesc.groupId())) {
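// A group with the same id exists in the local configuration: merge locally known
// state (e.g. WAL change requests) into the received descriptor.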
CacheGroupDescriptor locGrpCfg = locCacheGrps.get(grpDesc.groupId());
grpDesc.mergeWith(locGrpCfg);
}
CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc);
assert old == null : old;
ctx.discovery().addCacheGroup(grpDesc,
grpData.config().getNodeFilter(),
grpData.config().getCacheMode());
}
}
/**
* Cleans locally registered caches and groups.
*/
private void cleanCachesAndGroups() {
registeredCaches.clear();
registeredCachesById.clear();
registeredCacheGrps.clear();
ctx.discovery().cleanCachesAndGroups();
}
/**
* Cleans up cache descriptors that belong to {@code topVer} and earlier.
*
* @param topVer Topology version.
*/
public void cleanupRemovedCaches(AffinityTopologyVersion topVer) {
markedForDeletionCaches.headMap(topVer, true).clear();
}
/**
* Cleans up cache group descriptors that belong to {@code topVer} and earlier.
*
* @param topVer Topology version.
*/
public void cleanupRemovedCacheGroups(AffinityTopologyVersion topVer) {
markedForDeletionCacheGrps.headMap(topVer, true).clear();
}
/**
* @param grpId Group id.
* @return The "earliest" cache group descriptor marked for deletion, or {@code null} if none.
*/
public @Nullable CacheGroupDescriptor markedForDeletionCacheGroupDesc(int grpId) {
// Find the "earliest" available descriptor.
for (Map<Integer, CacheGroupDescriptor> descriptors : markedForDeletionCacheGrps.values()) {
CacheGroupDescriptor desc = descriptors.get(grpId);
if (desc != null)
return desc;
}
return null;
}
/**
* Save dynamic cache descriptor on disk.
*
* @param desc Cache to save.
*/
private void saveCacheConfiguration(DynamicCacheDescriptor desc) {
try {
ctx.cache().saveCacheConfiguration(desc);
}
catch (IgniteCheckedException e) {
log.error("Error while saving cache configuration to disk, cfg = " + desc.cacheConfiguration(), e);
}
}
/**
* Gets query entities for the given cache from the local node start configuration.
*
* @param cacheName Cache for which query entities will be returned.
* @return Local query entities.
*/
private Collection<QueryEntity> getLocalQueryEntities(String cacheName) {
if (joinDiscoData == null)
return Collections.emptyList();
CacheJoinNodeDiscoveryData.CacheInfo cacheInfo = joinDiscoData.caches().get(cacheName);
if (cacheInfo == null)
return Collections.emptyList();
return cacheInfo.cacheData().queryEntities();
}
/**
* Initializes the collection of caches to be started:
* {@code locJoinStartCaches}, or {@code locCfgsForActivation} if the cluster is inactive.
*
* @param firstNode {@code True} if the first node in the cluster starts.
* @param reconnect {@code True} if the node is reconnecting to the cluster.
*/
private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) {
if (ctx.state().clusterState().transition()) {
joinOnTransition = true;
return;
}
if (joinDiscoData != null) {
List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = new ArrayList<>();
List<DynamicCacheDescriptor> locJoinInitCaches = new ArrayList<>();
locCfgsForActivation = new HashMap<>();
boolean active = ctx.state().clusterState().active();
for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName()))
continue;
CacheConfiguration<?, ?> cfg = desc.cacheConfiguration();
if (reconnect && surviveReconnect(cfg.getName()) && cachesOnDisconnect.state.active() && active)
continue;
CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName());
NearCacheConfiguration nearCfg = null;
if (locCfg != null) {
nearCfg = locCfg.cacheData().config().getNearConfiguration();
DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
locCfg.cacheData().config(),
desc.cacheType(),
desc.groupDescriptor(),
desc.template(),
desc.receivedFrom(),
desc.staticallyConfigured(),
desc.sql(),
desc.deploymentId(),
desc.schema().copy(),
locCfg.cacheData().cacheConfigurationEnrichment()
);
desc0.startTopologyVersion(desc.startTopologyVersion());
desc0.receivedFromStartVersion(desc.receivedFromStartVersion());
desc0.clientCacheStartVersion(desc.clientCacheStartVersion());
desc0.cacheConfiguration().setStatisticsEnabled(cfg.isStatisticsEnabled());
desc = desc0;
}
if (locCfg != null ||
joinDiscoData.startCaches() ||
CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) {
if (active)
locJoinStartCaches.add(new T2<>(desc, nearCfg));
else
locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg));
}
else
locJoinInitCaches.add(desc);
}
locJoinCachesCtx = new LocalJoinCachesContext(
locJoinStartCaches,
locJoinInitCaches,
new HashMap<>(registeredCacheGrps),
new HashMap<>(registeredCaches));
}
}
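// The three buckets filled above, in short:
//  - locJoinStartCaches: caches started on local join when the cluster is active and the node
//    configures the cache locally, requested all caches to start, or passes the group's node filter;
//  - locCfgsForActivation: the same caches when the cluster is inactive, deferred until activation;
//  - locJoinInitCaches: the remaining caches, registered on this node but not started.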
/**
* @param msg Message.
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (joinOnTransition) {
initStartCachesForLocalJoin(false, false);
joinOnTransition = false;
}
}
/**
* @param msg Message.
* @param topVer Current topology version.
* @param curState Current cluster state.
* @return Exchange actions.
* @throws IgniteCheckedException If configuration validation failed.
*/
public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer,
DiscoveryDataClusterState curState)
throws IgniteCheckedException {
ExchangeActions exchangeActions = new ExchangeActions();
if (msg.activate() == curState.active())
return exchangeActions;
if (msg.activate()) {
for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.DIRECT)) {
desc.startTopologyVersion(topVer);
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
desc.cacheName(),
msg.initiatorNodeId());
req.startCacheConfiguration(desc.cacheConfiguration());
req.cacheType(desc.cacheType());
T2<CacheConfiguration, NearCacheConfiguration> locCfg = locCfgsForActivation.get(desc.cacheName());
if (locCfg != null) {
if (locCfg.get1() != null)
req.startCacheConfiguration(locCfg.get1());
req.nearCacheConfiguration(locCfg.get2());
req.locallyConfigured(true);
}
exchangeActions.addCacheToStart(req, desc);
}
for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
exchangeActions.addCacheGroupToStart(grpDesc);
List<StoredCacheData> storedCfgs = msg.storedCacheConfigurations();
if (storedCfgs != null) {
List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
IgniteUuid deploymentId = msg.id();
for (StoredCacheData storedCfg : storedCfgs) {
CacheConfiguration ccfg = storedCfg.config();
if (!registeredCaches.containsKey(ccfg.getName())) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(),
ccfg.getName(),
msg.initiatorNodeId());
req.deploymentId(deploymentId);
req.startCacheConfiguration(ccfg);
req.cacheType(ctx.cache().cacheType(ccfg.getName()));
req.schema(new QuerySchema(storedCfg.queryEntities()));
req.sql(storedCfg.sql());
reqs.add(req);
}
}
CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, reqs, topVer, true);
if (!res.errs.isEmpty()) {
IgniteCheckedException err = new IgniteCheckedException("Failed to activate cluster.");
for (IgniteCheckedException err0 : res.errs)
err.addSuppressed(err0);
throw err;
}
}
}
else {
locCfgsForActivation = new HashMap<>();
for (DynamicCacheDescriptor desc : orderedCaches(CacheComparators.REVERSE)) {
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx,
desc.cacheName(),
desc.sql(),
false);
exchangeActions.addCacheToStop(req, desc);
if (ctx.discovery().cacheClientNode(ctx.discovery().localNode(), desc.cacheName()))
locCfgsForActivation.put(desc.cacheName(), new T2<>((CacheConfiguration)null, (NearCacheConfiguration)null));
}
for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values())
exchangeActions.addCacheGroupToStop(grpDesc, false);
}
return exchangeActions;
}
/**
* @param data Joining node data.
*/
public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
if (data.hasJoiningNodeData()) {
Serializable joiningNodeData = data.joiningNodeData();
if (joiningNodeData instanceof CacheClientReconnectDiscoveryData) {
if (disconnectedState()) {
if (clientReconnectReqs == null)
clientReconnectReqs = new LinkedHashMap<>();
clientReconnectReqs.put(data.joiningNodeId(), (CacheClientReconnectDiscoveryData)joiningNodeData);
}
else
processClientReconnectData((CacheClientReconnectDiscoveryData)joiningNodeData, data.joiningNodeId());
}
else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData)
processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), false);
}
}
/**
* @param data Joining node data.
* @param joiningNodeClient {@code True} if joining node is a client.
* @return Error message, or {@code null} if validation succeeded.
*/
public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData data, boolean joiningNodeClient) {
if (data.hasJoiningNodeData()) {
Serializable joiningNodeData = data.joiningNodeData();
if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) {
CacheJoinNodeDiscoveryData joinData = (CacheJoinNodeDiscoveryData)joiningNodeData;
Set<String> problemCaches = null;
Set<String> encClientCaches = null;
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
if (!registeredCaches.containsKey(cfg.getName())) {
String conflictErr = checkCacheConflict(cfg);
if (conflictErr != null) {
U.warn(log, "Ignore cache received from joining node. " + conflictErr);
continue;
}
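// Flag value 1 appears to mark a cache that has local (persistent) data on the joining
// node; see the "caches with data which are not present in the cluster" message built below.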
long flags = cacheInfo.getFlags();
if (flags == 1L) {
if (problemCaches == null)
problemCaches = new HashSet<>();
problemCaches.add(cfg.getName());
}
else if (joiningNodeClient && cfg.isEncryptionEnabled()) {
if (encClientCaches == null)
encClientCaches = new HashSet<>();
encClientCaches.add(cfg.getName());
}
}
}
if (!F.isEmpty(problemCaches))
return problemCaches.stream().collect(Collectors.joining(", ",
"Joining node has caches with data which are not present in the cluster, " +
"which may mean that they were already destroyed. To add the node to the cluster, " +
"remove directories with these caches [", "]"));
if (!F.isEmpty(encClientCaches)) {
return encClientCaches.stream().collect(Collectors.joining(", ",
"Joining node has encrypted caches which are not present in the cluster. " +
"Encrypted caches configured on a client node cannot be started when such a node joins " +
"the cluster; these caches can be started manually (dynamically) after the node has joined " +
"[caches=", "]"));
}
}
}
return null;
}
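// A minimal sketch of the three-argument Collectors.joining(delimiter, prefix, suffix)
// used above (cache names are hypothetical):
//
//   Set<String> caches = new HashSet<>(Arrays.asList("cacheA", "cacheB"));
//   String msg = caches.stream().collect(Collectors.joining(", ", "problem caches [", "]"));
//   // msg: "problem caches [cacheA, cacheB]" (HashSet iteration order is unspecified).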
/**
* @param clientData Discovery data.
* @param clientNodeId Client node ID.
*/
private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) {
DiscoveryDataClusterState state = ctx.state().clusterState();
if (state.active() && !state.transition()) {
for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) {
String cacheName = cacheInfo.config().getName();
if (surviveReconnect(cacheName))
ctx.discovery().addClientNode(cacheName, clientNodeId, false);
else {
DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId()))
ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache());
}
}
}
}
/**
* Checks cache configuration for conflicts with already registered caches and cache groups.
*
* @param cfg Cache configuration.
* @return {@code null} if validation passed, error message otherwise.
*/
private String checkCacheConflict(CacheConfiguration<?, ?> cfg) {
int cacheId = CU.cacheId(cfg.getName());
if (cacheGroupByName(cfg.getName()) != null)
return "Cache name conflict with existing cache group (change cache name) [cacheName=" + cfg.getName() + ']';
if (cfg.getGroupName() != null) {
DynamicCacheDescriptor desc = registeredCaches.get(cfg.getGroupName());
if (desc != null)
return "Cache group name conflict with existing cache (change group name) [cacheName=" + cfg.getName() +
", conflictingCacheName=" + desc.cacheName() + ']';
}
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
if (desc.cacheId() == cacheId)
return "Cache ID conflict (change cache name) [cacheName=" + cfg.getName() +
", conflictingCacheName=" + desc.cacheName() + ']';
}
int grpId = CU.cacheGroupId(cfg);
if (cfg.getGroupName() != null) {
if (cacheGroupByName(cfg.getGroupName()) == null) {
CacheGroupDescriptor desc = registeredCacheGrps.get(grpId);
if (desc != null)
return "Cache group ID conflict (change cache group name) [cacheName=" + cfg.getName() +
", groupName=" + cfg.getGroupName() +
(desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']';
}
}
else {
CacheGroupDescriptor desc = registeredCacheGrps.get(grpId);
if (desc != null)
return "Cache group ID conflict (change cache name) [cacheName=" + cfg.getName() +
(desc.sharedGroup() ? ", conflictingGroupName=" : ", conflictingCacheName=") + desc.cacheOrGroupName() + ']';
}
return null;
}
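// Illustrative conflict rejected by the check above (names are hypothetical): starting a cache
// named "orders" while a cache group named "orders" is already registered fails with the
// "Cache name conflict with existing cache group" error; cache and group IDs are derived from
// name hashes, so names must stay unambiguous cluster-wide.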
/**
* @param joinData Joining node discovery data.
* @param nodeId Joining node ID.
* @param locJoin {@code True} if called on local node join.
* @return Configuration conflict error, or {@code null} if none.
*/
private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID nodeId, boolean locJoin) {
registerNewCacheTemplates(joinData, nodeId);
Map<DynamicCacheDescriptor, QuerySchemaPatch> patchesToApply = new HashMap<>();
boolean hasSchemaPatchConflict = false;
boolean active = ctx.state().clusterState().active();
boolean isMergeConfigSupport = isMergeConfigSupports(null);
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) {
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
if (!registeredCaches.containsKey(cfg.getName())) {
String conflictErr = checkCacheConflict(cfg);
if (conflictErr != null) {
if (locJoin)
return conflictErr;
U.warn(log, "Ignoring cache received from joining node. " + conflictErr);
continue;
}
registerNewCache(joinData, nodeId, cacheInfo);
}
else if (!active && isMergeConfigSupport) {
DynamicCacheDescriptor desc = registeredCaches.get(cfg.getName());
QuerySchemaPatch schemaPatch = desc.makeSchemaPatch(cacheInfo.cacheData());
if (schemaPatch.hasConflicts()) {
hasSchemaPatchConflict = true;
log.error("Error during making schema patch : " + schemaPatch.getConflictsMessage());
}
else if (!schemaPatch.isEmpty() && !hasSchemaPatchConflict)
patchesToApply.put(desc, schemaPatch);
}
ctx.discovery().addClientNode(cfg.getName(), nodeId, cfg.getNearConfiguration() != null);
}
// If a conflict was detected, we don't merge the config and keep the existing one.
if (!hasSchemaPatchConflict && !patchesToApply.isEmpty())
for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
if (entry.getKey().applySchemaPatch(entry.getValue()))
saveCacheConfiguration(entry.getKey());
}
if (joinData.startCaches()) {
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
ctx.discovery().addClientNode(desc.cacheName(),
nodeId,
desc.cacheConfiguration().getNearConfiguration() != null);
}
}
return null;
}
/**
* Register new cache received from joining node.
*
* @param joinData Data from joining node.
* @param nodeId Joining node id.
* @param cacheInfo Cache info of new node.
*/
private void registerNewCache(
CacheJoinNodeDiscoveryData joinData,
UUID nodeId,
CacheJoinNodeDiscoveryData.CacheInfo cacheInfo
) {
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
int cacheId = CU.cacheId(cfg.getName());
CacheGroupDescriptor grpDesc = registerCacheGroup(
null,
null,
cfg,
cacheId,
nodeId,
joinData.cacheDeploymentId(),
null,
null,
cacheInfo.cacheData().cacheConfigurationEnrichment()
);
ctx.discovery().setCacheFilter(
cacheId,
grpDesc.groupId(),
cfg.getName(),
cfg.getNearConfiguration() != null
);
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
cacheInfo.cacheType(),
grpDesc,
false,
nodeId,
cacheInfo.isStaticallyConfigured(),
cacheInfo.sql(),
joinData.cacheDeploymentId(),
new QuerySchema(cacheInfo.cacheData().queryEntities()),
cacheInfo.cacheData().cacheConfigurationEnrichment()
);
DynamicCacheDescriptor old = registeredCaches.put(cfg.getName(), desc);
registeredCachesById.put(desc.cacheId(), desc);
if (cacheInfo.cacheData().groupKeyEncrypted() != null) {
int grpId = CU.cacheGroupId(cacheInfo.cacheData().config());
assert cacheInfo.cacheData().config().isEncryptionEnabled();
GroupKeyEncrypted restoredKey = cacheInfo.cacheData().groupKeyEncrypted();
GroupKey activeKey = ctx.encryption().getActiveKey(grpId);
if (activeKey == null)
ctx.encryption().setInitialGroupKey(grpId, restoredKey.key(), restoredKey.id());
else {
assert activeKey.equals(new GroupKey(restoredKey.id(),
ctx.config().getEncryptionSpi().decryptKey(restoredKey.key())));
}
}
assert old == null : old;
}
/**
* Register new cache templates received from joining node.
*
* @param joinData Data from joining node.
* @param nodeId Joining node id.
*/
private void registerNewCacheTemplates(CacheJoinNodeDiscoveryData joinData, UUID nodeId) {
for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) {
CacheConfiguration<?, ?> cfg = cacheInfo.cacheData().config();
if (!registeredTemplates.containsKey(cfg.getName())) {
DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
cfg,
cacheInfo.cacheType(),
null,
true,
nodeId,
true,
false,
joinData.cacheDeploymentId(),
new QuerySchema(cacheInfo.cacheData().queryEntities()),
cacheInfo.cacheData().cacheConfigurationEnrichment()
);
DynamicCacheDescriptor old = registeredTemplates.put(cfg.getName(), desc);
assert old == null : old;
}
}
}
/**
* @param joiningNode Joining node, or {@code null} to check only nodes already in topology.
* @return {@code True} if grid supports merge of config, {@code false} otherwise.
*/
public boolean isMergeConfigSupports(ClusterNode joiningNode) {
DiscoCache discoCache = ctx.discovery().discoCache();
if (discoCache == null)
return true;
if (joiningNode != null && joiningNode.version().compareToIgnoreTimestamp(V_MERGE_CONFIG_SINCE) < 0)
return false;
Collection<ClusterNode> nodes = discoCache.allNodes();
for (ClusterNode node : nodes) {
IgniteProductVersion version = node.version();
if (version.compareToIgnoreTimestamp(V_MERGE_CONFIG_SINCE) < 0)
return false;
}
return true;
}
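// A minimal sketch of the version gate above, assuming V_MERGE_CONFIG_SINCE were "2.5.0"
// (the constant itself is defined elsewhere in this class):
//
//   IgniteProductVersion since = IgniteProductVersion.fromString("2.5.0");
//   IgniteProductVersion nodeVer = IgniteProductVersion.fromString("2.4.0");
//   boolean supported = nodeVer.compareToIgnoreTimestamp(since) >= 0; // false: 2.4.0 < 2.5.0.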
/**
* @param grpName Group name.
* @return Group descriptor, or {@code null} if group is not found.
*/
@Nullable private CacheGroupDescriptor cacheGroupByName(String grpName) {
assert grpName != null;
for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
if (grpName.equals(grpDesc.groupName()))
return grpDesc;
}
return null;
}
/**
* @param cacheName Cache name.
* @return Group descriptor, or {@code null} if no non-shared group contains the cache.
*/
@Nullable private CacheGroupDescriptor nonSharedCacheGroupByCacheName(String cacheName) {
assert cacheName != null;
for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
if (!grpDesc.sharedGroup() && grpDesc.caches().containsKey(cacheName))
return grpDesc;
}
return null;
}
/**
* @param exchActions Optional exchange actions to update if new group was added.
* @param curTopVer Current topology version if a dynamic cache is being started.
* @param startedCacheCfg Cache configuration.
* @param cacheId Cache ID.
* @param rcvdFrom Node ID cache was received from.
* @param deploymentId Deployment ID.
* @param encKey Encryption key.
* @param encKeyId ID of the encryption key.
* @param cacheCfgEnrichment Cache configuration enrichment.
* @return Group descriptor.
*/
private CacheGroupDescriptor registerCacheGroup(
@Nullable ExchangeActions exchActions,
@Nullable AffinityTopologyVersion curTopVer,
CacheConfiguration<?, ?> startedCacheCfg,
Integer cacheId,
UUID rcvdFrom,
IgniteUuid deploymentId,
@Nullable byte[] encKey,
@Nullable Integer encKeyId,
CacheConfigurationEnrichment cacheCfgEnrichment
) {
if (startedCacheCfg.getGroupName() != null) {
CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName());
if (desc != null) {
desc.onCacheAdded(startedCacheCfg.getName(), cacheId);
return desc;
}
}
int grpId = CU.cacheGroupId(startedCacheCfg);
Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId);
boolean persistent = resolvePersistentFlag(exchActions, startedCacheCfg);
boolean walGloballyEnabled;
// Client nodes cannot read the WAL enabled/disabled status, so they use the default value.
if (ctx.clientNode())
walGloballyEnabled = persistent;
else if (persistent)
walGloballyEnabled = ctx.cache().context().database().walEnabled(grpId, false);
else {
DataRegionConfiguration drCfg =
CU.findDataRegion(ctx.config().getDataStorageConfiguration(), startedCacheCfg.getDataRegionName());
walGloballyEnabled = drCfg != null && drCfg.isCdcEnabled();
}
CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
startedCacheCfg,
startedCacheCfg.getGroupName(),
grpId,
rcvdFrom,
curTopVer != null ? curTopVer.nextMinorVersion() : null,
deploymentId,
caches,
persistent,
walGloballyEnabled,
null,
cacheCfgEnrichment
);
if (startedCacheCfg.isEncryptionEnabled())
ctx.encryption().setInitialGroupKey(grpId, encKey, encKeyId);
CacheGroupDescriptor old = registeredCacheGrps.put(grpId, grpDesc);
assert old == null : old;
ctx.discovery().addCacheGroup(grpDesc, grpDesc.config().getNodeFilter(), startedCacheCfg.getCacheMode());
if (exchActions != null)
exchActions.addCacheGroupToStart(grpDesc);
return grpDesc;
}
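// WAL flag resolution above, summarized (a restatement of the branches, not extra behavior):
//  - client node        -> walGloballyEnabled == persistent (clients cannot read the real status);
//  - server, persistent -> current WAL status of the group taken from the database manager;
//  - server, in-memory  -> WAL is considered enabled only when the data region has CDC enabled.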
/**
* Resolves persistent flag for new cache group descriptor.
*
* @param exchActions Optional exchange actions to update if new group was added.
* @param startedCacheCfg Started cache configuration.
* @return {@code True} if persistence is enabled for the new cache group.
*/
private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions,
CacheConfiguration<?, ?> startedCacheCfg) {
if (!ctx.clientNode()) {
// On server, we can always determine whether cache is persistent by local storage configuration.
return CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration());
}
else if (exchActions == null) {
// It's either a client local join event or the cache is statically configured on another node.
// No need to resolve on client - we'll anyway receive group descriptor from server with correct flag.
return false;
}
else {
// Dynamic cache start. Initiator of the start may not have known whether cache should be persistent.
// On client, we should peek attributes of any affinity server node to get data storage configuration.
Collection<ClusterNode> aliveSrvNodes = ctx.discovery().aliveServerNodes();
assert !aliveSrvNodes.isEmpty() : "No alive server nodes";
for (ClusterNode srvNode : aliveSrvNodes) {
if (CU.affinityNode(srvNode, startedCacheCfg.getNodeFilter())) {
Object dsCfgBytes = srvNode.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
if (dsCfgBytes instanceof byte[]) {
try {
DataStorageConfiguration crdDsCfg = new JdkMarshaller().unmarshal(
(byte[])dsCfgBytes, U.resolveClassLoader(ctx.config()));
return CU.isPersistentCache(startedCacheCfg, crdDsCfg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal remote data storage configuration [remoteNode=" +
srvNode + ", cacheName=" + startedCacheCfg.getName() + "]", e);
}
}
else {
U.error(log, "Remote marshalled data storage configuration is absent [remoteNode=" + srvNode +
", cacheName=" + startedCacheCfg.getName() + ", dsCfg=" + dsCfgBytes + "]");
}
}
}
U.error(log, "Failed to find affinity server node with data storage configuration for starting cache " +
"[cacheName=" + startedCacheCfg.getName() + ", aliveSrvNodes=" + aliveSrvNodes + "]");
return false;
}
}
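// Design note: a client node cannot evaluate CU.isPersistentCache() against its own (absent)
// data storage configuration, so the method above peeks the marshalled DataStorageConfiguration
// attribute of any affinity server node and conservatively returns false if none can be read.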
/**
* @param ccfg Cache configuration to start.
* @throws IgniteCheckedException If failed.
*/
void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException {
if (ccfg.getGroupName() != null) {
CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName());
if (grpDesc != null) {
assert ccfg.getGroupName().equals(grpDesc.groupName());
validateCacheGroupConfiguration(grpDesc.config(), ccfg);
}
}
}
/**
* @param cfg Existing configuration.
* @param startCfg Cache configuration to start.
* @throws IgniteCheckedException If validation failed.
*/
private void validateCacheGroupConfiguration(CacheConfiguration cfg, CacheConfiguration startCfg)
throws IgniteCheckedException {
GridCacheAttributes attr1 = new GridCacheAttributes(cfg);
GridCacheAttributes attr2 = new GridCacheAttributes(startCfg);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "cacheMode", "Cache mode",
cfg.getCacheMode(), startCfg.getCacheMode(), true);
if (cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT || startCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT)
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "atomicityMode", "Atomicity mode",
attr1.atomicityMode(), attr2.atomicityMode(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinity", "Affinity function",
attr1.cacheAffinityClassName(), attr2.cacheAffinityClassName(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "affinityPartitionsCount",
"Affinity partitions count", attr1.affinityPartitionsCount(), attr2.affinityPartitionsCount(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "nodeFilter", "Node filter",
attr1.nodeFilterClassName(), attr2.nodeFilterClassName(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "dataRegionName", "Data region",
cfg.getDataRegionName(), startCfg.getDataRegionName(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "topologyValidator", "Topology validator",
attr1.topologyValidatorClassName(), attr2.topologyValidatorClassName(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "partitionLossPolicy", "Partition Loss Policy",
cfg.getPartitionLossPolicy(), startCfg.getPartitionLossPolicy(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceMode", "Rebalance mode",
cfg.getRebalanceMode(), startCfg.getRebalanceMode(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceDelay", "Rebalance delay",
cfg.getRebalanceDelay(), startCfg.getRebalanceDelay(), false);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceOrder", "Rebalance order",
cfg.getRebalanceOrder(), startCfg.getRebalanceOrder(), false);
if (cfg.getCacheMode() == PARTITIONED) {
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "backups", "Backups",
cfg.getBackups(), startCfg.getBackups(), true);
}
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "encryptionEnabled", "Encrypted",
cfg.isEncryptionEnabled(), startCfg.isEncryptionEnabled(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg,
"diskPageCompression", "Disk page compression",
cfg.getDiskPageCompression(), startCfg.getDiskPageCompression(), true);
CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg,
"diskPageCompressionLevel", "Disk page compression level",
cfg.getDiskPageCompressionLevel(), startCfg.getDiskPageCompressionLevel(), true);
}
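// Illustrative failure (hypothetical configs): two PARTITIONED caches placed in the same group
// with backups=1 vs backups=2 would make the "backups" check above throw, since caches sharing
// a group must agree on cache mode, affinity, backups and the other attributes validated here.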
/**
* @return Registered caches.
*/
ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches() {
return registeredCaches;
}
/**
* @return Registered caches by ID.
*/
ConcurrentMap<Integer, DynamicCacheDescriptor> registeredCachesById() {
return registeredCachesById;
}
/**
* @return Registered cache templates.
*/
ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates() {
return registeredTemplates;
}
/**
* @return Registered cache groups.
*/
ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
return registeredCacheGrps;
}
/**
* Returns registered cache descriptors ordered by {@code comparator}.
*
* @param comparator Comparator (DIRECT, REVERSE or custom) to order cache descriptors.
* @return Cache descriptors ordered by the given comparator.
*/
private Collection<DynamicCacheDescriptor> orderedCaches(Comparator<DynamicCacheDescriptor> comparator) {
List<DynamicCacheDescriptor> ordered = new ArrayList<>(registeredCaches.values());
ordered.sort(comparator);
return ordered;
}
/**
*
*/
public void onDisconnected() {
cachesOnDisconnect = new CachesOnDisconnect(
ctx.state().clusterState(),
new HashMap<>(registeredCacheGrps),
new HashMap<>(registeredCaches));
registeredCacheGrps.clear();
registeredCaches.clear();
registeredCachesById.clear();
registeredTemplates.clear();
clientReconnectReqs = null;
}
/**
* @param active {@code True} if reconnected to active cluster.
* @param transition {@code True} if reconnected while state transition in progress.
* @return Information about stopped caches and cache groups.
*/
public ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) {
assert disconnectedState();
Set<String> stoppedCaches = new HashSet<>();
Set<Integer> stoppedCacheGrps = new HashSet<>();
Set<String> survivedCaches = new HashSet<>();
if (!active) {
joinOnTransition = transition;
if (F.isEmpty(locCfgsForActivation)) {
locCfgsForActivation = new HashMap<>();
for (IgniteInternalCache cache : ctx.cache().caches()) {
locCfgsForActivation.put(cache.name(),
new T2<>((CacheConfiguration)null, cache.configuration().getNearConfiguration()));
}
}
for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet())
stoppedCacheGrps.add(e.getValue().groupId());
for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet())
stoppedCaches.add(e.getKey());
}
else {
for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) {
CacheGroupDescriptor locDesc = e.getValue();
CacheGroupDescriptor desc;
boolean stopped = true;
if (locDesc.sharedGroup()) {
desc = cacheGroupByName(locDesc.groupName());
if (desc != null && desc.deploymentId().equals(locDesc.deploymentId()))
stopped = false;
}
else {
desc = nonSharedCacheGroupByCacheName(locDesc.config().getName());
if (desc != null &&
(surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId())))
stopped = false;
}
if (stopped)
stoppedCacheGrps.add(locDesc.groupId());
else
assert locDesc.groupId() == desc.groupId();
}
for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) {
DynamicCacheDescriptor desc = e.getValue();
String cacheName = e.getKey();
boolean stopped;
if (!surviveReconnect(cacheName)) {
DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName);
stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId());
}
else
stopped = false;
if (stopped)
stoppedCaches.add(cacheName);
else
survivedCaches.add(cacheName);
}
if (locJoinCachesCtx != null) {
locJoinCachesCtx.removeSurvivedCaches(survivedCaches);
if (locJoinCachesCtx.isEmpty())
locJoinCachesCtx = null;
}
if (!cachesOnDisconnect.clusterActive())
initStartCachesForLocalJoin(false, true);
}
if (clientReconnectReqs != null) {
for (Map.Entry<UUID, CacheClientReconnectDiscoveryData> e : clientReconnectReqs.entrySet())
processClientReconnectData(e.getValue(), e.getKey());
clientReconnectReqs = null;
}
cachesOnDisconnect = null;
return new ClusterCachesReconnectResult(stoppedCacheGrps, stoppedCaches);
}
/**
* @return {@code True} if client node is currently in disconnected state.
*/
private boolean disconnectedState() {
return cachesOnDisconnect != null;
}
/**
* @param cacheName Cache name.
* @return {@code True} if the cache with the given name is a system cache which should always survive client node disconnect.
*/
private boolean surviveReconnect(String cacheName) {
return CU.isUtilityCache(cacheName);
}
/**
* @param cacheName Cache name.
* @return {@code True} if cache is restarting.
*/
public boolean isRestarting(String cacheName) {
return restartingCaches.containsKey(cacheName);
}
/**
* @param cacheName Cache name whose restart was cancelled.
*/
public void removeRestartingCache(String cacheName) {
restartingCaches.remove(cacheName);
}
/**
* Clears information about restarting caches.
*/
public void removeRestartingCaches() {
restartingCaches.clear();
}
/**
* Holds DIRECT comparator (system caches first) and REVERSE comparator (user caches first).
* Use the DIRECT comparator for ordering cache start operations.
* Use the REVERSE comparator for ordering cache stop operations.
*/
static class CacheComparators {
/**
* DIRECT comparator for cache descriptors (system caches first).
*/
static Comparator<DynamicCacheDescriptor> DIRECT = new Comparator<DynamicCacheDescriptor>() {
@Override public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
if (o1.cacheType().userCache() ^ o2.cacheType().userCache())
return o2.cacheType().userCache() ? -1 : 1;
return Integer.compare(o1.cacheId(), o2.cacheId());
}
};
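/*
 * A sketch of the XOR trick above: the first branch fires only when exactly one of the two
 * descriptors is a user cache, and then the system cache sorts first; otherwise the ordering
 * falls back to Integer.compare() of the cache IDs. E.g. for a system cache s and user caches
 * u1, u2 (hypothetical instances): compare(s, u1) < 0, while compare(u1, u2) compares their IDs.
 */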
/**
* REVERSE comparator for cache descriptors (user caches first).
*/
static Comparator<DynamicCacheDescriptor> REVERSE = new Comparator<DynamicCacheDescriptor>() {
@Override public int compare(DynamicCacheDescriptor o1, DynamicCacheDescriptor o2) {
return -DIRECT.compare(o1, o2);
}
};
}
/**
*
*/
private static class GridData {
/** */
private final CacheJoinNodeDiscoveryData joinDiscoData;
/** */
private final CacheNodeCommonDiscoveryData gridData;
/** */
private final String conflictErr;
/**
* @param joinDiscoData Discovery data collected for local node join.
* @param gridData Grid data.
* @param conflictErr Cache configuration conflict error.
*/
GridData(CacheJoinNodeDiscoveryData joinDiscoData, CacheNodeCommonDiscoveryData gridData, String conflictErr) {
this.joinDiscoData = joinDiscoData;
this.gridData = gridData;
this.conflictErr = conflictErr;
}
}
/**
*
*/
private static class CachesOnDisconnect {
/** */
final DiscoveryDataClusterState state;
/** */
final Map<Integer, CacheGroupDescriptor> cacheGrps;
/** */
final Map<String, DynamicCacheDescriptor> caches;
/**
* @param state Cluster state.
* @param cacheGrps Cache groups.
* @param caches Caches.
*/
CachesOnDisconnect(DiscoveryDataClusterState state,
Map<Integer, CacheGroupDescriptor> cacheGrps,
Map<String, DynamicCacheDescriptor> caches) {
this.state = state;
this.cacheGrps = cacheGrps;
this.caches = caches;
}
/**
* @return {@code True} if cluster was in active state.
*/
boolean clusterActive() {
return state.active() && !state.transition();
}
}
/**
*
*/
private static class CacheChangeProcessResult {
/** */
private boolean needExchange;
/** */
private final List<DynamicCacheDescriptor> addedDescs = new ArrayList<>();
/** */
private final List<IgniteCheckedException> errs = new ArrayList<>();
}
}