/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.CacheException;
import javax.cache.expiry.EternalExpiryPolicy;
import javax.cache.expiry.ExpiryPolicy;
import javax.management.MBeanServer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.WarmUpConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteTransactionsEx;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.systemview.walker.CacheGroupIoViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.CachePagesListViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.PartitionStateViewWalker;
import org.apache.ignite.internal.metric.IoStatisticsType;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteClusterReadOnlyException;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.FinishPreloadingTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionDefferedDeleteQueueCleanupTask;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.mvcc.DeadlockDetectionManager;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.RowStore;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.PagesList;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.cache.warmup.WarmUpStrategy;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager;
import org.apache.ignite.internal.processors.plugin.CachePluginManager;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.SchemaExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask;
import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.processors.security.IgniteSecurity;
import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.IgniteCollectors;
import org.apache.ignite.internal.util.InitializationProtector;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainClosure2;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
import org.apache.ignite.mxbean.IgniteMBeanAware;
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.spi.systemview.view.CacheGroupIoView;
import org.apache.ignite.spi.systemview.view.CachePagesListView;
import org.apache.ignite.spi.systemview.view.PartitionStateView;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
import static org.apache.ignite.configuration.DeploymentMode.SHARED;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
import static org.apache.ignite.internal.IgniteComponentType.JTA;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistentCache;
import static org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils.validateHashIdResolvers;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.security.SecurityUtils.remoteSecurityContext;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
/**
* The main responsibility of this processor is orchestrating the start and stop of caches and everything related
* to that activity, such as validation of cache configurations, restoring configurations from persistence, etc.
*/
@SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
public class GridCacheProcessor extends GridProcessorAdapter {
/** */
public static final String CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT =
"Failed to perform %s operation (cluster is in read-only mode) [cacheGrp=%s, cache=%s]";
/** */
private static final String CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT =
"Cannot start/stop cache within lock or transaction [cacheNames=%s, operation=%s]";
/** System view name for page lists. */
public static final String CACHE_GRP_PAGE_LIST_VIEW = "cacheGroupPageLists";
/** System view description for page lists. */
public static final String CACHE_GRP_PAGE_LIST_VIEW_DESC = "Cache group page lists";
/** System view name for partition states. */
public static final String PART_STATES_VIEW = "partitionStates";
/** System view description for partition states. */
public static final String PART_STATES_VIEW_DESC = "Distribution of cache group partitions across cluster nodes";
/** System view name for cache group IO. */
public static final String CACHE_GRP_IO_VIEW = metricName("local", "cache", "groups", "io");
/** System view description for cache group IO. */
public static final String CACHE_GRP_IO_VIEW_DESC = "Local node IO statistics for cache groups";
/** @see IgniteSystemProperties#IGNITE_ALLOW_START_CACHES_IN_PARALLEL */
public static final boolean DFLT_ALLOW_START_CACHES_IN_PARALLEL = true;
/** Enables starting caches in parallel. */
private final boolean IGNITE_ALLOW_START_CACHES_IN_PARALLEL =
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL,
DFLT_ALLOW_START_CACHES_IN_PARALLEL);
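// Illustrative sketch (not part of the processor logic): the flag above is read once from a JVM
// system property at node startup, so it is typically overridden before the node starts, e.g. with
// -DIGNITE_ALLOW_START_CACHES_IN_PARALLEL=false on the command line or programmatically:
//
//     System.setProperty(IgniteSystemProperties.IGNITE_ALLOW_START_CACHES_IN_PARALLEL, "false");
//     Ignite ignite = Ignition.start(new IgniteConfiguration());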
/** */
private final boolean keepStaticCacheConfiguration = IgniteSystemProperties.getBoolean(
IgniteSystemProperties.IGNITE_KEEP_STATIC_CACHE_CONFIGURATION);
/** MBean group for cache group metrics */
private static final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups";
/**
* Initial timeout (in milliseconds) for outputting the progress of restoring partition states.
* After the first message, subsequent progress messages are printed every value/5 milliseconds.
* It is recommended to change this property only in tests.
*/
static long TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS = TimeUnit.MINUTES.toMillis(5);
/** Shared cache context. */
private GridCacheSharedContext<?, ?> sharedCtx;
/** */
private final ConcurrentMap<Integer, CacheGroupContext> cacheGrps = new ConcurrentHashMap<>();
/** */
private final Map<String, GridCacheAdapter<?, ?>> caches;
/** Caches stopped from onKernalStop callback. */
private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>();
/** Map of proxies. */
private final ConcurrentHashMap<String, IgniteCacheProxyImpl<?, ?>> jCacheProxies;
/** Transaction interface implementation. */
private IgniteTransactionsImpl transactions;
/** Pending cache operations. */
private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();
/** Template configuration add futures. */
private ConcurrentMap<String, IgniteInternalFuture> pendingTemplateFuts = new ConcurrentHashMap<>();
/** Enable/disable cache statistics futures. */
private ConcurrentMap<UUID, EnableStatisticsFuture> manageStatisticsFuts = new ConcurrentHashMap<>();
/** */
private ClusterCachesInfo cachesInfo;
/** */
private GridLocalConfigManager locCfgMgr;
/** */
private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>();
/** Must use JDK marsh since it is used by discovery to fire custom events. */
private final Marshaller marsh;
/** Count down latch for caches. */
private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
/** Internal cache names. */
private final Set<String> internalCaches;
/** Protector of initialization of specific value. */
private final InitializationProtector initializationProtector = new InitializationProtector();
/** Cache recovery lifecycle state and actions. */
private final CacheRecoveryLifecycle recovery = new CacheRecoveryLifecycle();
/** Cache configuration splitter. */
private CacheConfigurationSplitter splitter;
/** Cache configuration enricher. */
private CacheConfigurationEnricher enricher;
/** */
public static final String EXPRITY_POLICY_MSG = "expiryPolicy=[type=%s, isEagerTtl=%s]";
/**
* @param ctx Kernal context.
*/
public GridCacheProcessor(GridKernalContext ctx) {
super(ctx);
caches = new ConcurrentHashMap<>();
jCacheProxies = new ConcurrentHashMap<>();
internalCaches = new HashSet<>();
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
splitter = new CacheConfigurationSplitterImpl(ctx, marsh);
enricher = new CacheConfigurationEnricher(ctx, marsh, U.resolveClassLoader(ctx.config()));
}
/**
* @param cfg Configuration to check for possible performance issues.
* @param hasStore {@code True} if store is configured.
*/
private void suggestOptimizations(CacheConfiguration cfg, boolean hasStore) {
GridPerformanceSuggestions perf = ctx.performance();
String msg = "Disable eviction policy (remove from configuration)";
if (cfg.getEvictionPolicyFactory() != null || cfg.getEvictionPolicy() != null)
perf.add(msg, false);
else
perf.add(msg, true);
if (cfg.getCacheMode() == PARTITIONED)
perf.add("Disable near cache (set 'nearConfiguration' to null)", cfg.getNearConfiguration() == null);
// Suppress warning if at least one ATOMIC cache found.
perf.add("Enable ATOMIC mode if not using transactions (set 'atomicityMode' to ATOMIC)",
cfg.getAtomicityMode() == ATOMIC);
// Suppress warning if at least one non-FULL_SYNC mode found.
perf.add("Disable fully synchronous writes (set 'writeSynchronizationMode' to PRIMARY_SYNC or FULL_ASYNC)",
cfg.getWriteSynchronizationMode() != FULL_SYNC);
if (hasStore && cfg.isWriteThrough())
perf.add("Enable write-behind to persistent store (set 'writeBehindEnabled' to true)",
cfg.isWriteBehindEnabled());
}
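/*
 * Illustrative sketch only: a cache configuration that triggers none of the suggestions above
 * (no eviction policy, no near cache, ATOMIC atomicity, non-FULL_SYNC writes, write-behind
 * enabled when a store is configured). The cache name and types are arbitrary example values.
 *
 *     CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("exampleCache");
 *     cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
 *     cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
 *     cfg.setWriteThrough(true);
 *     cfg.setWriteBehindEnabled(true);
 */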
/**
* Create exchange worker task for custom discovery message.
*
* @param msg Custom discovery message.
* @return Task or {@code null} if message doesn't require any special processing.
*/
public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
if (msg instanceof SchemaAbstractDiscoveryMessage) {
SchemaAbstractDiscoveryMessage msg0 = (SchemaAbstractDiscoveryMessage)msg;
if (msg0.exchange())
return new SchemaExchangeWorkerTask(remoteSecurityContext(ctx), msg0);
}
else if (msg instanceof ClientCacheChangeDummyDiscoveryMessage) {
ClientCacheChangeDummyDiscoveryMessage msg0 = (ClientCacheChangeDummyDiscoveryMessage)msg;
return msg0;
}
else if (msg instanceof CacheStatisticsModeChangeMessage) {
CacheStatisticsModeChangeMessage msg0 = (CacheStatisticsModeChangeMessage)msg;
if (msg0.initial())
return new CacheStatisticsModeChangeTask(remoteSecurityContext(ctx), msg0);
}
return null;
}
/**
* Process custom exchange task.
*
* @param task Task.
*/
void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
if (task instanceof SchemaExchangeWorkerTask) {
SchemaAbstractDiscoveryMessage msg = ((SchemaExchangeWorkerTask)task).message();
if (msg instanceof SchemaProposeDiscoveryMessage) {
SchemaProposeDiscoveryMessage msg0 = (SchemaProposeDiscoveryMessage)msg;
ctx.query().onSchemaPropose(msg0);
}
else
U.warn(log, "Unsupported schema discovery message: " + msg);
}
else if (task instanceof SchemaNodeLeaveExchangeWorkerTask) {
SchemaNodeLeaveExchangeWorkerTask task0 = (SchemaNodeLeaveExchangeWorkerTask)task;
ctx.query().onNodeLeave(task0.node());
}
else if (task instanceof ClientCacheChangeDummyDiscoveryMessage) {
ClientCacheChangeDummyDiscoveryMessage task0 = (ClientCacheChangeDummyDiscoveryMessage)task;
sharedCtx.affinity().processClientCachesRequests(task0);
}
else if (task instanceof ClientCacheUpdateTimeout) {
ClientCacheUpdateTimeout task0 = (ClientCacheUpdateTimeout)task;
sharedCtx.affinity().sendClientCacheChangesMessage(task0);
}
else if (task instanceof CacheStatisticsModeChangeTask) {
CacheStatisticsModeChangeTask task0 = (CacheStatisticsModeChangeTask)task;
processStatisticsModeChange(task0.message());
}
else if (task instanceof TxTimeoutOnPartitionMapExchangeChangeTask) {
TxTimeoutOnPartitionMapExchangeChangeTask task0 = (TxTimeoutOnPartitionMapExchangeChangeTask)task;
sharedCtx.tm().processTxTimeoutOnPartitionMapExchangeChange(task0.message());
}
else if (task instanceof StopCachesOnClientReconnectExchangeTask) {
StopCachesOnClientReconnectExchangeTask task0 = (StopCachesOnClientReconnectExchangeTask)task;
stopCachesOnClientReconnect(task0.stoppedCaches());
task0.onDone();
}
else if (task instanceof WalStateNodeLeaveExchangeTask) {
WalStateNodeLeaveExchangeTask task0 = (WalStateNodeLeaveExchangeTask)task;
sharedCtx.walState().onNodeLeft(task0.node().id());
}
else if (task instanceof FinishPreloadingTask) {
FinishPreloadingTask task0 = (FinishPreloadingTask)task;
CacheGroupContext grp = cacheGroup(task0.groupId());
if (grp != null)
grp.preloader().finishPreloading(task0.topologyVersion(), task0.rebalanceId());
}
else
U.warn(log, "Unsupported custom exchange task: " + task);
}
/**
* @param ctx Context.
* @return DHT managers.
*/
private List<GridCacheManager> dhtManagers(GridCacheContext ctx) {
return F.asList(ctx.store(), ctx.events(), ctx.evicts(), ctx.queries(), ctx.continuousQueries(),
ctx.dr());
}
/**
* @param ctx Context.
* @return Managers present in both, DHT and Near caches.
*/
@SuppressWarnings("IfMayBeConditional")
private Collection<GridCacheManager> dhtExcludes(GridCacheContext ctx) {
if (!isNearEnabled(ctx))
return Collections.emptyList();
else
return F.asList(ctx.queries(), ctx.continuousQueries(), ctx.store());
}
/**
* @param cfg Configuration.
* @param objs Extra components.
* @throws IgniteCheckedException If failed to inject.
*/
private void prepare(CacheConfiguration cfg, Collection<Object> objs) throws IgniteCheckedException {
prepare(cfg, cfg.getAffinity(), false);
prepare(cfg, cfg.getAffinityMapper(), false);
prepare(cfg, cfg.getEvictionFilter(), false);
prepare(cfg, cfg.getInterceptor(), false);
for (Object obj : objs)
prepare(cfg, obj, false);
}
/**
* @param cfg Cache configuration.
* @param rsrc Resource.
* @param near Near flag.
* @throws IgniteCheckedException If failed.
*/
private void prepare(CacheConfiguration cfg, @Nullable Object rsrc, boolean near) throws IgniteCheckedException {
if (rsrc != null) {
ctx.resource().injectGeneric(rsrc);
ctx.resource().injectCacheName(rsrc, cfg.getName());
registerMbean(rsrc, cfg.getName(), near);
}
}
/**
* @param cctx Cache context.
*/
private void cleanup(GridCacheContext cctx) {
CacheConfiguration cfg = cctx.config();
cleanup(cfg, cfg.getAffinity(), false);
cleanup(cfg, cfg.getAffinityMapper(), false);
cleanup(cfg, cfg.getEvictionFilter(), false);
cleanup(cfg, cfg.getInterceptor(), false);
cleanup(cfg, cctx.store().configuredStore(), false);
if (!CU.isUtilityCache(cfg.getName()) && !CU.isSystemCache(cfg.getName())) {
unregisterMbean(cctx.cache().localMxBean(), cfg.getName(), false);
unregisterMbean(cctx.cache().clusterMxBean(), cfg.getName(), false);
}
cctx.cleanup();
}
/**
* @param grp Cache group.
* @param destroy Group destroy flag.
*/
private void cleanup(CacheGroupContext grp, boolean destroy) {
CacheConfiguration cfg = grp.config();
for (Object obj : grp.configuredUserObjects())
cleanup(cfg, obj, false);
if (!grp.systemCache() && !U.IGNITE_MBEANS_DISABLED) {
try {
ctx.config().getMBeanServer().unregisterMBean(U.makeMBeanName(ctx.igniteInstanceName(),
CACHE_GRP_METRICS_MBEAN_GRP, grp.cacheOrGroupName()));
}
catch (Throwable e) {
U.error(log, "Failed to unregister MBean for cache group: " + grp.name(), e);
}
}
grp.metrics().remove(destroy);
grp.removeIOStatistic(destroy);
sharedCtx.evict().cleanupRemovedGroup(grp.groupId());
}
/**
* @param cfg Cache configuration.
* @param rsrc Resource.
* @param near Near flag.
*/
private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near) {
if (rsrc != null) {
unregisterMbean(rsrc, cfg.getName(), near);
try {
ctx.resource().cleanupGeneric(rsrc);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to cleanup resource: " + rsrc, e);
}
}
}
/** {@inheritDoc} */
@SuppressWarnings({"unchecked"})
@Override public void start() throws IgniteCheckedException {
ctx.internalSubscriptionProcessor().registerMetastorageListener(recovery);
ctx.internalSubscriptionProcessor().registerDatabaseListener(recovery);
cachesInfo = new ClusterCachesInfo(ctx);
DeploymentMode depMode = ctx.config().getDeploymentMode();
if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
if (depMode != CONTINUOUS && depMode != SHARED)
U.warn(log, "Deployment mode for cache is not CONTINUOUS or SHARED " +
"(it is recommended that you change deployment mode and restart): " + depMode);
}
Collection<CacheStoreSessionListener> sessionListeners =
CU.startStoreSessionListeners(ctx, ctx.config().getCacheStoreSessionListenerFactories());
sharedCtx = createSharedContext(ctx, sessionListeners);
locCfgMgr = new GridLocalConfigManager(this, ctx);
transactions = new IgniteTransactionsImpl(sharedCtx, null, false);
// Start shared managers.
for (GridCacheSharedManager mgr : sharedCtx.managers())
mgr.start(sharedCtx);
if (!ctx.isDaemon() && (!CU.isPersistenceEnabled(ctx.config())) || ctx.config().isClientMode()) {
CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();
if (data != null)
cachesInfo.onStart(data);
}
if (log.isDebugEnabled())
log.debug("Started cache processor.");
ctx.state().cacheProcessorStarted();
ctx.systemView().registerFiltrableView(
CACHE_GRP_PAGE_LIST_VIEW,
CACHE_GRP_PAGE_LIST_VIEW_DESC,
new CachePagesListViewWalker(),
this::pagesListViewSupplier,
Function.identity()
);
ctx.systemView().registerFiltrableView(
PART_STATES_VIEW,
PART_STATES_VIEW_DESC,
new PartitionStateViewWalker(),
this::partStatesViewSupplier,
Function.identity()
);
ctx.systemView().registerView(
CACHE_GRP_IO_VIEW,
CACHE_GRP_IO_VIEW_DESC,
new CacheGroupIoViewWalker(),
() -> F.view(cacheGrps.values(), grp -> !grp.systemCache()),
grpCtx -> {
MetricRegistry mreg = ctx.metric().registry(metricName(IoStatisticsType.CACHE_GROUP.metricGroupName(),
grpCtx.cacheOrGroupName()));
return new CacheGroupIoView(grpCtx, mreg);
}
);
}
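/*
 * Illustrative note: the views registered above are consumed through Ignite's standard system
 * view facilities (for example, the SYS SQL schema), assuming the usual mapping of registered
 * view names to upper-snake-case SQL names:
 *
 *     SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS;
 *     SELECT * FROM SYS.PARTITION_STATES;
 */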
/**
* @param cfg Cache configuration to initialize with proper defaults.
* @param cacheObjCtx Cache object context.
* @throws IgniteCheckedException If configuration is not valid.
*/
void initialize(CacheConfiguration cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
CU.initializeConfigDefaults(log, cfg, cacheObjCtx);
ctx.coordinators().preProcessCacheConfiguration(cfg);
}
/**
* @param grpId Group ID.
* @return Cache group.
*/
@Nullable public CacheGroupContext cacheGroup(int grpId) {
return cacheGrps.get(grpId);
}
/**
* @return Cache groups.
*/
public Collection<CacheGroupContext> cacheGroups() {
return cacheGrps.values();
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
if (ctx.isDaemon())
return;
AffinityTopologyVersion joinVer;
try {
boolean checkConsistency = !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);
if (checkConsistency)
ValidationOnNodeJoinUtils.checkConsistency(ctx, log);
cachesInfo.onKernalStart(checkConsistency);
sharedCtx.walState().onKernalStart();
ctx.query().onCacheKernalStart();
joinVer = sharedCtx.exchange().onKernalStart(active, false);
}
finally {
cacheStartedLatch.countDown();
}
if (!ctx.clientNode())
sharedCtx.time().addTimeoutObject(new PartitionDefferedDeleteQueueCleanupTask(
sharedCtx, Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, DFLT_CACHE_REMOVE_ENTRIES_TTL)));
// Notify shared managers.
for (GridCacheSharedManager mgr : sharedCtx.managers())
mgr.onKernalStart(active);
// Escape if cluster inactive.
if (!active)
return;
awaitRebalance(joinVer).get();
}
/**
* Awaits local join finish and cache start.
*
* @throws IgniteCheckedException If failed to wait.
*/
public void awaitStarted() throws IgniteCheckedException {
U.await(cacheStartedLatch);
}
/**
* Await rebalance for caches with SYNC rebalance mode started on local join exchange.
*
* @param joinVer Topology version of local join.
* @return Future that completes when rebalance for caches with SYNC rebalance mode has finished.
*/
private GridCompoundFuture<?, ?> awaitRebalance(AffinityTopologyVersion joinVer) {
return internalCaches().stream()
.map(GridCacheAdapter::context)
.filter(GridCacheContext::affinityNode) // Only affinity caches.
.filter(ctx -> ctx.config().getRebalanceMode() == SYNC) // Only caches with SYNC mode.
.filter(ctx -> ctx.startTopologyVersion().equals(joinVer)) // Only caches started on local join.
.filter(ctx -> ctx.config().getCacheMode() == REPLICATED // Caches without manual rebalance.
|| ctx.config().getCacheMode() == PARTITIONED && ctx.config().getRebalanceDelay() >= 0)
.map(ctx -> ctx.preloader().syncFuture())
.collect(IgniteCollectors.toCompoundFuture());
}
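/*
 * Illustrative sketch: the compound future above only awaits caches configured with SYNC rebalance
 * mode (in addition to the other filters in the stream), e.g. (example cache name and types):
 *
 *     CacheConfiguration<Long, byte[]> cfg = new CacheConfiguration<>("syncRebalanceCache");
 *     cfg.setCacheMode(CacheMode.REPLICATED);
 *     cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
 */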
/** {@inheritDoc} */
@Override public void stop(boolean cancel) throws IgniteCheckedException {
stopCaches(cancel);
List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
GridCacheSharedManager<?, ?> mgr = it.previous();
mgr.stop(cancel);
}
CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners());
sharedCtx.cleanup();
if (log.isDebugEnabled())
log.debug("Stopped cache processor.");
}
/**
* @param cancel Cancel.
*/
public void stopCaches(boolean cancel) {
for (String cacheName : locCfgMgr.stopSequence()) {
GridCacheAdapter<?, ?> cache = stoppedCaches.remove(cacheName);
if (cache != null)
stopCache(cache, cancel, false, false);
}
for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) {
if (cache == stoppedCaches.remove(cache.name()))
stopCache(cache, cancel, false, false);
}
for (CacheGroupContext grp : cacheGrps.values())
stopCacheGroup(grp.groupId(), false);
}
/**
* Blocks all available gateways.
*/
public void blockGateways() {
for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values())
proxy.context0().gate().onStopped();
}
/**
* Blocks (stops) cache gateway for caches according to given {@code cacheGrpIds}.
*
* @param cacheGrpIds Cache group ids for which cache gateway should be stopped.
* @return Caches for which cache gateway is blocked (stopped).
*/
public List<GridCacheAdapter> blockGateways(Collection<Integer> cacheGrpIds) {
List<GridCacheAdapter> affectedCaches = internalCaches().stream()
.filter(cache -> cacheGrpIds.contains(cache.context().groupId()))
.collect(toList());
affectedCaches.forEach(cache -> {
// Add proxy if it's not initialized.
addjCacheProxy(cache.context().name(), new IgniteCacheProxyImpl(cache.context(), cache, false));
// Stop proxy.
blockGateway(cache.context().name(), true, false);
});
return affectedCaches;
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
cacheStartedLatch.countDown();
GridCachePartitionExchangeManager<Object, Object> exch = context().exchange();
// Stop exchange manager first so that we call onKernalStop on all caches.
// No new caches should be added after this point.
exch.onKernalStop(cancel);
sharedCtx.mvcc().onStop();
for (CacheGroupContext grp : cacheGrps.values())
grp.onKernalStop();
onKernalStopCaches(cancel);
cancelFutures();
List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
it.hasPrevious(); ) {
GridCacheSharedManager<?, ?> mgr = it.previous();
if (mgr != exch)
mgr.onKernalStop(cancel);
}
}
/**
* @param cancel Cancel.
*/
public void onKernalStopCaches(boolean cancel) {
IgniteCheckedException affErr =
new IgniteCheckedException("Failed to wait for topology update, node is stopping.");
for (CacheGroupContext grp : cacheGrps.values()) {
GridAffinityAssignmentCache aff = grp.affinity();
aff.cancelFutures(affErr);
}
for (String cacheName : locCfgMgr.stopSequence()) {
GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
if (cache != null) {
stoppedCaches.put(cacheName, cache);
onKernalStop(cache, cancel);
}
}
for (Map.Entry<String, GridCacheAdapter<?, ?>> entry : caches.entrySet()) {
GridCacheAdapter<?, ?> cache = entry.getValue();
if (cache == caches.remove(entry.getKey())) {
stoppedCaches.put(entry.getKey(), cache);
onKernalStop(entry.getValue(), cancel);
}
}
}
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
for (IgniteInternalFuture fut : pendingFuts.values())
((GridFutureAdapter)fut).onDone(err);
for (IgniteInternalFuture fut : pendingTemplateFuts.values())
((GridFutureAdapter)fut).onDone(err);
for (EnableStatisticsFuture fut : manageStatisticsFuts.values())
fut.onDone(err);
for (CacheGroupContext grp : cacheGrps.values())
grp.onDisconnected(reconnectFut);
for (GridCacheAdapter cache : caches.values()) {
GridCacheContext cctx = cache.context();
cctx.gate().onDisconnected(reconnectFut);
List<GridCacheManager> mgrs = cache.context().managers();
for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
GridCacheManager mgr = it.previous();
mgr.onDisconnected(reconnectFut);
}
}
sharedCtx.onDisconnected(reconnectFut);
cachesInfo.onDisconnected();
}
/**
* @param cctx Cache context.
* @param stoppedCaches List where stopped cache should be added.
*/
private void stopCacheOnReconnect(GridCacheContext cctx, List<GridCacheAdapter> stoppedCaches) {
cctx.gate().reconnected(true);
sharedCtx.removeCacheContext(cctx);
caches.remove(cctx.name());
completeProxyInitialize(cctx.name());
jCacheProxies.remove(cctx.name());
stoppedCaches.add(cctx.cache());
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
DiscoveryDataClusterState state = ctx.state().clusterState();
boolean active = state.active() && !state.transition();
ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(active, state.transition());
final List<GridCacheAdapter> stoppedCaches = new ArrayList<>();
// Close SQL caches which were not started by client.
for (String cacheToStop: reconnectRes.stoppedCaches()) {
if (!caches.keySet().contains(cacheToStop))
ctx.query().onCacheStop(cacheToStop);
}
for (final GridCacheAdapter cache : caches.values()) {
boolean stopped = reconnectRes.stoppedCacheGroups().contains(cache.context().groupId())
|| reconnectRes.stoppedCaches().contains(cache.name());
if (stopped)
stopCacheOnReconnect(cache.context(), stoppedCaches);
else {
cache.onReconnected();
reconnected.add(cache);
if (cache.context().userCache()) {
DynamicCacheDescriptor desc = cacheDescriptor(cache.name());
assert desc != null : cache.name();
if (!QueryUtils.isEnabled(cache.context().config())
&& QueryUtils.isEnabled(desc.cacheConfiguration())) {
CacheConfiguration newCfg = desc.cacheConfiguration();
cache.context().onSchemaAddQueryEntity(newCfg.getQueryEntities(), newCfg.getSqlSchema(),
newCfg.isSqlEscapeAll(), newCfg.getQueryParallelism());
}
boolean rmvIdx = !cache.context().group().persistenceEnabled();
// Re-create cache structures inside indexing in order to apply recent schema changes.
GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false);
ctx.query().onCacheStop0(cacheInfo, rmvIdx, rmvIdx);
ctx.query().onCacheStart0(cacheInfo, desc.schema(), desc.sql());
}
}
}
final Set<Integer> stoppedGrps = reconnectRes.stoppedCacheGroups();
for (CacheGroupContext grp : cacheGrps.values()) {
if (stoppedGrps.contains(grp.groupId()))
cacheGrps.remove(grp.groupId());
else
grp.onReconnected();
}
sharedCtx.onReconnected(active);
for (GridCacheAdapter cache : reconnected)
cache.context().gate().reconnected(false);
if (!stoppedCaches.isEmpty())
return sharedCtx.exchange().deferStopCachesOnClientReconnect(stoppedCaches);
return null;
}
/**
* @param cache Cache to stop.
* @param cancel Cancel flag.
* @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
* @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
*/
private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel, boolean callDestroy, boolean clearCache) {
stopCache(cache, cancel, callDestroy, clearCache, true);
}
/**
* @param cache Cache to stop.
* @param cancel Cancel flag.
* @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
* @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
* @param clearDbObjects If {@code false}, DB objects are not removed (used for cache.close() on a client node).
*/
@SuppressWarnings({"unchecked"})
private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel, boolean callDestroy, boolean clearCache, boolean clearDbObjects) {
GridCacheContext ctx = cache.context();
try {
if (!cache.isNear() && ctx.shared().wal() != null) {
try {
ctx.shared().wal().flush(null, false);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to flush write-ahead log on cache stop " +
"[cache=" + ctx.name() + "]", e);
}
}
sharedCtx.removeCacheContext(ctx);
cache.stop();
cache.removeMetrics(callDestroy);
GridCacheContextInfo cacheInfo = new GridCacheContextInfo(ctx, false);
if (clearDbObjects) {
boolean rmvIdx = !cache.context().group().persistenceEnabled() || callDestroy;
boolean clearIdx = !cache.context().group().persistenceEnabled() || clearCache;
ctx.kernalContext().query().onCacheStop(cacheInfo, rmvIdx, clearIdx);
}
else
ctx.kernalContext().query().getIndexing().closeCacheOnClient(ctx.name());
if (isNearEnabled(ctx)) {
GridDhtCacheAdapter dht = ctx.near().dht();
// Check whether dht cache has been started.
if (dht != null) {
dht.stop();
dht.removeMetrics(callDestroy);
GridCacheContext<?, ?> dhtCtx = dht.context();
List<GridCacheManager> dhtMgrs = dhtManagers(dhtCtx);
for (ListIterator<GridCacheManager> it = dhtMgrs.listIterator(dhtMgrs.size()); it.hasPrevious(); ) {
GridCacheManager mgr = it.previous();
mgr.stop(cancel, callDestroy);
}
}
}
List<GridCacheManager> mgrs = ctx.managers();
Collection<GridCacheManager> excludes = dhtExcludes(ctx);
// Reverse order.
for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
GridCacheManager mgr = it.previous();
if (!excludes.contains(mgr))
mgr.stop(cancel, callDestroy);
}
ctx.kernalContext().continuous().onCacheStop(ctx);
ctx.kernalContext().cache().context().snapshot().onCacheStop(ctx, callDestroy);
ctx.kernalContext().coordinators().onCacheStop(ctx);
ctx.group().stopCache(ctx, clearCache);
U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), cache.configuration(), ctx.store().configuredStore()));
if (callDestroy && CU.storeCacheConfig(sharedCtx, ctx.config())) {
try {
locCfgMgr.removeCacheData(new StoredCacheData(ctx.config()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to delete cache configuration data while destroying cache" +
"[cache=" + ctx.name() + "]", e);
}
}
if (log.isInfoEnabled()) {
if (ctx.group().sharedGroup())
log.info("Stopped cache [cacheName=" + cache.name() + ", group=" + ctx.group().name() + ']');
else
log.info("Stopped cache [cacheName=" + cache.name() + ']');
}
}
finally {
cleanup(ctx);
}
}
/**
* @param cache Cache.
* @throws IgniteCheckedException If failed.
*/
private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
GridCacheContext<?, ?> ctx = cache.context();
// Start DHT cache as well.
if (isNearEnabled(ctx)) {
GridDhtCacheAdapter dht = ctx.near().dht();
GridCacheContext<?, ?> dhtCtx = dht.context();
for (GridCacheManager mgr : dhtManagers(dhtCtx))
mgr.onKernalStart();
dht.onKernalStart();
if (log.isDebugEnabled())
log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name());
}
for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx))))
mgr.onKernalStart();
cache.onKernalStart();
if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED))
ctx.events().addEvent(EventType.EVT_CACHE_STARTED);
if (log.isDebugEnabled())
log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" +
cache.configuration().getCacheMode() + ']');
}
/**
* @param cache Cache to stop.
* @param cancel Cancel flag.
*/
@SuppressWarnings("unchecked")
private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {
GridCacheContext ctx = cache.context();
if (isNearEnabled(ctx)) {
GridDhtCacheAdapter dht = ctx.near().dht();
if (dht != null) {
GridCacheContext<?, ?> dhtCtx = dht.context();
for (GridCacheManager mgr : dhtManagers(dhtCtx))
mgr.onKernalStop(cancel);
dht.onKernalStop();
}
}
List<GridCacheManager> mgrs = ctx.managers();
Collection<GridCacheManager> excludes = dhtExcludes(ctx);
// Reverse order.
for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
GridCacheManager mgr = it.previous();
if (!excludes.contains(mgr))
mgr.onKernalStop(cancel);
}
cache.onKernalStop();
if (!ctx.isRecoveryMode() && ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED))
ctx.events().addEvent(EventType.EVT_CACHE_STOPPED);
}
/**
* @param cfg Cache configuration to use to create cache.
* @param grp Cache group.
* @param pluginMgr Cache plugin manager.
* @param desc Cache descriptor.
* @param locStartTopVer Current topology version.
* @param cacheObjCtx Cache object context.
* @param affNode {@code True} if local node affinity node.
* @param updatesAllowed Updates allowed flag.
* @param disabledAfterStart If {@code true}, the restarting state is discarded from proxies; if {@code false},
* the state of proxies is changed to restarting.
* @return Cache context.
* @throws IgniteCheckedException If failed to create cache.
*/
private GridCacheContext<?, ?> createCacheContext(
CacheConfiguration<?, ?> cfg,
CacheGroupContext grp,
@Nullable CachePluginManager pluginMgr,
DynamicCacheDescriptor desc,
AffinityTopologyVersion locStartTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
boolean updatesAllowed,
boolean disabledAfterStart,
boolean recoveryMode
) throws IgniteCheckedException {
assert cfg != null;
if (cfg.getCacheStoreFactory() instanceof GridCacheLoaderWriterStoreFactory) {
GridCacheLoaderWriterStoreFactory factory = (GridCacheLoaderWriterStoreFactory)cfg.getCacheStoreFactory();
prepare(cfg, factory.loaderFactory(), false);
prepare(cfg, factory.writerFactory(), false);
}
else
prepare(cfg, cfg.getCacheStoreFactory(), false);
CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
ValidationOnNodeJoinUtils.validate(ctx.config(), cfg, desc.cacheType(), cfgStore, ctx, log, (x, y) -> {
try {
assertParameter(x, y);
}
catch (IgniteCheckedException ex) {
return ex;
}
return null;
});
if (pluginMgr == null)
pluginMgr = new CachePluginManager(ctx, cfg);
pluginMgr.validate();
if (!recoveryMode && cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT && grp.affinityNode())
sharedCtx.coordinators().ensureStarted();
sharedCtx.jta().registerCache(cfg);
// Skip suggestions for internal caches.
if (desc.cacheType().userCache())
suggestOptimizations(cfg, cfgStore != null);
Collection<Object> toPrepare = new ArrayList<>();
if (cfgStore instanceof GridCacheLoaderWriterStore) {
toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader());
toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer());
}
else
toPrepare.add(cfgStore);
prepare(cfg, toPrepare);
U.startLifecycleAware(lifecycleAwares(grp, cfg, cfgStore));
boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg);
CacheCompressionManager compressMgr = new CacheCompressionManager();
GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
GridCacheEventManager evtMgr = new GridCacheEventManager();
CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled())
? new GridCacheEvictionManager()
: new CacheOffheapEvictionManager();
GridCacheQueryManager qryMgr = new GridCacheDistributedQueryManager();
CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
CacheConflictResolutionManager rslvrMgr = pluginMgr.createComponent(CacheConflictResolutionManager.class);
GridCacheDrManager drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
CacheStoreManager storeMgr = pluginMgr.createComponent(CacheStoreManager.class);
PlatformCacheManager platformMgr = ctx.platform().cacheManager();
if (cfgStore == null)
storeMgr.initialize(cfgStore, sesHolders);
else
initializationProtector.protect(
cfgStore,
() -> storeMgr.initialize(cfgStore, sesHolders)
);
GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
ctx,
sharedCtx,
cfg,
grp,
desc.cacheType(),
locStartTopVer,
desc.deploymentId(),
affNode,
updatesAllowed,
desc.cacheConfiguration().isStatisticsEnabled(),
recoveryMode,
/*
* Managers in starting order!
* ===========================
*/
compressMgr,
evtMgr,
storeMgr,
evictMgr,
qryMgr,
contQryMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
pluginMgr,
affMgr,
platformMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
GridCacheAdapter cache = null;
switch (cfg.getCacheMode()) {
case PARTITIONED:
case REPLICATED: {
if (nearEnabled) {
switch (cfg.getAtomicityMode()) {
case TRANSACTIONAL:
case TRANSACTIONAL_SNAPSHOT: {
cache = new GridNearTransactionalCache(cacheCtx);
break;
}
case ATOMIC: {
cache = new GridNearAtomicCache(cacheCtx);
break;
}
default: {
assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
}
}
}
else {
switch (cfg.getAtomicityMode()) {
case TRANSACTIONAL:
case TRANSACTIONAL_SNAPSHOT: {
cache = cacheCtx.affinityNode() ?
new GridDhtColocatedCache(cacheCtx) :
new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap());
break;
}
case ATOMIC: {
cache = cacheCtx.affinityNode() ?
new GridDhtAtomicCache(cacheCtx) :
new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap());
break;
}
default: {
assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
}
}
}
break;
}
default: {
assert false : "Invalid cache mode: " + cfg.getCacheMode();
}
}
cache.active(!disabledAfterStart);
cacheCtx.cache(cache);
GridCacheContext<?, ?> ret = cacheCtx;
/*
* Create DHT cache.
* ================
*/
if (nearEnabled) {
/*
* Specifically don't create the following managers
* here and reuse the one from Near cache:
* 1. GridCacheVersionManager
* 2. GridCacheIoManager
* 3. GridCacheDeploymentManager
* 4. GridCacheQueryManager (note, that we start it for DHT cache though).
* 5. CacheContinuousQueryManager (note, that we start it for DHT cache though).
* 6. GridCacheDgcManager
* 7. GridCacheTtlManager.
* 8. PlatformCacheManager.
* ===============================================
*/
evictMgr = cfg.isOnheapCacheEnabled() ? new GridCacheEvictionManager() : new CacheOffheapEvictionManager();
evtMgr = new GridCacheEventManager();
pluginMgr = new CachePluginManager(ctx, cfg);
drMgr = pluginMgr.createComponent(GridCacheDrManager.class);
cacheCtx = new GridCacheContext(
ctx,
sharedCtx,
cfg,
grp,
desc.cacheType(),
locStartTopVer,
desc.deploymentId(),
affNode,
true,
desc.cacheConfiguration().isStatisticsEnabled(),
recoveryMode,
/*
* Managers in starting order!
* ===========================
*/
compressMgr,
evtMgr,
storeMgr,
evictMgr,
qryMgr,
contQryMgr,
dataStructuresMgr,
ttlMgr,
drMgr,
rslvrMgr,
pluginMgr,
affMgr,
platformMgr
);
cacheCtx.cacheObjectContext(cacheObjCtx);
GridDhtCacheAdapter dht = null;
switch (cfg.getAtomicityMode()) {
case TRANSACTIONAL:
case TRANSACTIONAL_SNAPSHOT: {
assert cache instanceof GridNearTransactionalCache;
GridNearTransactionalCache near = (GridNearTransactionalCache)cache;
GridDhtCache dhtCache = cacheCtx.affinityNode() ?
new GridDhtCache(cacheCtx) :
new GridDhtCache(cacheCtx, new GridNoStorageCacheMap());
dhtCache.near(near);
near.dht(dhtCache);
dht = dhtCache;
break;
}
case ATOMIC: {
assert cache instanceof GridNearAtomicCache;
GridNearAtomicCache near = (GridNearAtomicCache)cache;
GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ?
new GridDhtAtomicCache(cacheCtx) :
new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap());
dhtCache.near(near);
near.dht(dhtCache);
dht = dhtCache;
break;
}
default: {
assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
}
}
cacheCtx.cache(dht);
}
if (!CU.isUtilityCache(cache.name()) && !CU.isSystemCache(cache.name())) {
registerMbean(cache.localMxBean(), cache.name(), false);
registerMbean(cache.clusterMxBean(), cache.name(), false);
}
return ret;
}
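/*
 * Summary of the cache implementations created above (derived from the switches in this method):
 * - near cache enabled, TRANSACTIONAL or TRANSACTIONAL_SNAPSHOT: GridNearTransactionalCache backed by GridDhtCache;
 * - near cache enabled, ATOMIC: GridNearAtomicCache backed by GridDhtAtomicCache;
 * - near cache disabled, TRANSACTIONAL or TRANSACTIONAL_SNAPSHOT: GridDhtColocatedCache;
 * - near cache disabled, ATOMIC: GridDhtAtomicCache.
 * Non-affinity nodes get the same classes with a GridNoStorageCacheMap instead of a real entry map.
 */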
/**
* @param reqs Cache requests to start.
* @param fut Completable future.
*/
public void registrateProxyRestart(Map<String, DynamicCacheChangeRequest> reqs, GridFutureAdapter<?> fut) {
for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
if (reqs.containsKey(proxy.getName()) &&
proxy.isRestarting() &&
!reqs.get(proxy.getName()).disabledAfterStart()
)
proxy.registrateFutureRestart(fut);
}
}
/**
* @param reqs Cache requests to start.
* @param initVer Init exchange version.
* @param doneVer Finish exchange version.
*/
public void completeProxyRestart(
Map<String, DynamicCacheChangeRequest> reqs,
AffinityTopologyVersion initVer,
AffinityTopologyVersion doneVer
) {
if (initVer == null || doneVer == null)
return;
for (GridCacheAdapter<?, ?> cache : caches.values()) {
GridCacheContext<?, ?> cacheCtx = cache.context();
if (reqs.containsKey(cache.name()) ||
(cacheCtx.startTopologyVersion().compareTo(initVer) <= 0 ||
cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0))
completeProxyInitialize(cache.name());
if (
cacheCtx.startTopologyVersion().compareTo(initVer) >= 0 &&
cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0
) {
IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cache.name());
boolean canRestart = Optional.ofNullable(reqs.get(cache.name()))
.map(req -> !req.disabledAfterStart())
.orElse(true);
if (proxy != null && proxy.isRestarting() && canRestart) {
proxy.onRestarted(cacheCtx, cache);
if (cacheCtx.dataStructuresCache())
ctx.dataStructures().restart(cache.name(), proxy.internalProxy());
}
}
}
}
/**
* Gets a collection of currently started caches.
*
* @return Collection of started cache names.
*/
public Collection<String> cacheNames() {
return F.viewReadOnly(cacheDescriptors().values(),
new IgniteClosure<DynamicCacheDescriptor, String>() {
@Override public String apply(DynamicCacheDescriptor desc) {
return desc.cacheConfiguration().getName();
}
});
}
/**
* Gets public cache that can be used for query execution. If the cache isn't created on the current node, it will
* be started.
*
* @param start Start cache.
* @return Cache or {@code null} if there is no suitable cache.
*/
public IgniteCacheProxy<?, ?> getOrStartPublicCache(boolean start) throws IgniteCheckedException {
// Try to find started cache first.
for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
if (!e.getValue().context().userCache())
continue;
CacheConfiguration ccfg = e.getValue().configuration();
String cacheName = ccfg.getName();
return publicJCache(cacheName);
}
if (start) {
for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) {
DynamicCacheDescriptor desc = e.getValue();
if (!desc.cacheType().userCache())
continue;
CacheConfiguration ccfg = desc.cacheConfiguration();
dynamicStartCache(null, ccfg.getName(), null, false, true, true).get();
return publicJCache(ccfg.getName());
}
}
return null;
}
/**
* Gets a collection of currently started public cache names.
*
* @return Collection of currently started public cache names.
*/
public Collection<String> publicCacheNames() {
return F.viewReadOnly(cacheDescriptors().values(),
new IgniteClosure<DynamicCacheDescriptor, String>() {
@Override public String apply(DynamicCacheDescriptor desc) {
return desc.cacheConfiguration().getName();
}
},
new IgnitePredicate<DynamicCacheDescriptor>() {
@Override public boolean apply(DynamicCacheDescriptor desc) {
return desc.cacheType().userCache();
}
}
);
}
/**
* Gets a collection of currently started public and data structures cache names.
*
* @return Collection of currently started public and data structures cache names.
*/
public Collection<String> publicAndDsCacheNames() {
return F.viewReadOnly(cacheDescriptors().values(),
new IgniteClosure<DynamicCacheDescriptor, String>() {
@Override public String apply(DynamicCacheDescriptor desc) {
return desc.cacheConfiguration().getName();
}
},
new IgnitePredicate<DynamicCacheDescriptor>() {
@Override public boolean apply(DynamicCacheDescriptor desc) {
return desc.cacheType().userCache() || desc.cacheType() == CacheType.DATA_STRUCTURES;
}
}
);
}
/**
* Gets cache mode.
*
* @param cacheName Cache name to check.
* @return Cache mode.
*/
public CacheMode cacheMode(String cacheName) {
assert cacheName != null;
DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
return desc != null ? desc.cacheConfiguration().getCacheMode() : null;
}
/**
* @return Caches to be started when this node starts.
*/
@Nullable public LocalJoinCachesContext localJoinCachesContext() {
if (ctx.discovery().localNode().order() == 1)
cachesInfo.filterDynamicCacheDescriptors(locCfgMgr.localCachesOnStart());
return cachesInfo.localJoinCachesContext();
}
/**
* @param exchTopVer Local join exchange future version.
* @param locJoinCtx Local join cache context.
* @throws IgniteCheckedException If failed.
*/
public IgniteInternalFuture<?> startCachesOnLocalJoin(
AffinityTopologyVersion exchTopVer,
LocalJoinCachesContext locJoinCtx
) throws IgniteCheckedException {
long time = U.currentTimeMillis();
if (locJoinCtx == null)
return new GridFinishedFuture<>();
IgniteInternalFuture<?> res = sharedCtx.affinity().initCachesOnLocalJoin(
locJoinCtx.cacheGroupDescriptors(), locJoinCtx.cacheDescriptors());
for (DynamicCacheDescriptor d: locJoinCtx.cacheDescriptors().values())
ctx.coordinators().validateCacheConfiguration(d.cacheConfiguration());
List<StartCacheInfo> startCacheInfos = locJoinCtx.caches().stream()
.map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false))
.collect(toList());
locJoinCtx.initCaches()
.forEach(cacheDesc -> {
try {
ctx.query().initQueryStructuresForNotStartedCache(cacheDesc);
}
catch (Exception e) {
log.error("Can't initialize query structures for not started cache [cacheName=" +
cacheDesc.cacheName() + "]", e);
}
});
prepareStartCaches(startCacheInfos);
context().exchange().exchangerUpdateHeartbeat();
if (log.isInfoEnabled())
log.info("Starting caches on local join performed in " + (U.currentTimeMillis() - time) + " ms.");
return res;
}
/**
* @param node Joined node.
* @return {@code True} if there are new caches received from joined node.
*/
public boolean hasCachesReceivedFromJoin(ClusterNode node) {
return cachesInfo.hasCachesReceivedFromJoin(node.id());
}
/**
* Starts statically configured caches received from remote nodes during exchange.
*
* @param nodeId Joining node ID.
* @param exchTopVer Current exchange version.
* @return Started caches descriptors.
* @throws IgniteCheckedException If failed.
*/
public Collection<DynamicCacheDescriptor> startReceivedCaches(UUID nodeId, AffinityTopologyVersion exchTopVer)
throws IgniteCheckedException {
List<DynamicCacheDescriptor> receivedCaches = cachesInfo.cachesReceivedFromJoin(nodeId);
List<StartCacheInfo> startCacheInfos = receivedCaches.stream()
.filter(desc -> isLocalAffinity(desc.groupDescriptor().config()))
.map(desc -> new StartCacheInfo(desc, null, exchTopVer, false))
.collect(toList());
prepareStartCaches(startCacheInfos);
return receivedCaches;
}
/**
* @param cacheConfiguration Checked configuration.
* @return {@code true} if local node is affinity node for cache.
*/
private boolean isLocalAffinity(CacheConfiguration cacheConfiguration) {
return CU.affinityNode(ctx.discovery().localNode(), cacheConfiguration.getNodeFilter());
}
/**
* Start all input caches in parallel.
*
* @param startCacheInfos All caches information for start.
*/
void prepareStartCaches(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException {
prepareStartCaches(startCacheInfos, (data, operation) -> {
operation.apply(data); // PROXY
});
}
/**
* Tries to start all input caches in parallel, skipping caches that fail to start.
*
* @param startCacheInfos Caches info for start.
* @return Caches which failed to start, mapped to the corresponding exceptions.
* @throws IgniteCheckedException if failed.
*/
Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible(
Collection<StartCacheInfo> startCacheInfos
) throws IgniteCheckedException {
HashMap<StartCacheInfo, IgniteCheckedException> failedCaches = new HashMap<>();
prepareStartCaches(startCacheInfos, (data, operation) -> {
try {
operation.apply(data);
}
catch (IgniteInterruptedCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
log.warning("Cache can not be started : cache=" + data.getStartedConfiguration().getName());
failedCaches.put(data, e);
}
});
return failedCaches;
}
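/*
* Illustrative sketch (assumption, not taken from this codebase): a caller that tolerates partial start
* failures could log the map returned by prepareStartCachesIfPossible like this.
*
*   Map<StartCacheInfo, IgniteCheckedException> failed = prepareStartCachesIfPossible(startCacheInfos);
*
*   for (Map.Entry<StartCacheInfo, IgniteCheckedException> e : failed.entrySet())
*       log.warning("Skipped cache start [cache=" + e.getKey().getStartedConfiguration().getName() + ']',
*           e.getValue());
*/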
/**
* Start all input caches in parallel.
*
* @param startCacheInfos All caches information for start.
* @param cacheStartFailHandler Fail handler for one cache start.
*/
private void prepareStartCaches(
Collection<StartCacheInfo> startCacheInfos,
StartCacheFailHandler<StartCacheInfo, Void> cacheStartFailHandler
) throws IgniteCheckedException {
if (!IGNITE_ALLOW_START_CACHES_IN_PARALLEL || startCacheInfos.size() <= 1) {
for (StartCacheInfo startCacheInfo : startCacheInfos) {
cacheStartFailHandler.handle(
startCacheInfo,
cacheInfo -> {
prepareCacheStart(
cacheInfo.getCacheDescriptor(),
cacheInfo.getReqNearCfg(),
cacheInfo.getExchangeTopVer(),
cacheInfo.isDisabledAfterStart(),
cacheInfo.isClientCache()
);
return null;
}
);
context().exchange().exchangerUpdateHeartbeat();
}
}
else {
Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>();
// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
doInParallel(
parallelismLvl,
sharedCtx.kernalContext().pools().getSystemExecutorService(),
startCacheInfos,
startCacheInfo -> {
cacheStartFailHandler.handle(
startCacheInfo,
cacheInfo -> {
GridCacheContext cacheCtx = prepareCacheContext(
cacheInfo.getCacheDescriptor(),
cacheInfo.getReqNearCfg(),
cacheInfo.getExchangeTopVer(),
cacheInfo.isDisabledAfterStart()
);
cacheContexts.put(cacheInfo, cacheCtx);
context().exchange().exchangerUpdateHeartbeat();
return null;
}
);
return null;
}
);
/*
* This hack is required because we can't start the SQL schema in parallel for the following reasons:
* * Checking an index for duplicates (and other checks) requires the same order on every node.
* * onCacheStart and createSchema take a lot of mutexes.
*
* TODO IGNITE-9729
*/
Set<StartCacheInfo> successfullyPreparedCaches = cacheContexts.keySet();
List<StartCacheInfo> cacheInfosInOriginalOrder = startCacheInfos.stream()
.filter(successfullyPreparedCaches::contains)
.collect(toList());
for (StartCacheInfo startCacheInfo : cacheInfosInOriginalOrder) {
cacheStartFailHandler.handle(
startCacheInfo,
cacheInfo -> {
GridCacheContext cctx = cacheContexts.get(cacheInfo);
if (!cctx.isRecoveryMode()) {
ctx.query().onCacheStart(
new GridCacheContextInfo(cctx, cacheInfo.isClientCache()),
cacheInfo.getCacheDescriptor().schema() != null
? cacheInfo.getCacheDescriptor().schema()
: new QuerySchema(),
cacheInfo.getCacheDescriptor().sql()
);
}
context().exchange().exchangerUpdateHeartbeat();
return null;
}
);
}
doInParallel(
parallelismLvl,
sharedCtx.kernalContext().pools().getSystemExecutorService(),
cacheContexts.entrySet(),
cacheCtxEntry -> {
cacheStartFailHandler.handle(
cacheCtxEntry.getKey(),
cacheInfo -> {
GridCacheContext<?, ?> cacheContext = cacheCtxEntry.getValue();
if (cacheContext.isRecoveryMode())
finishRecovery(cacheInfo.getExchangeTopVer(), cacheContext);
else
onCacheStarted(cacheCtxEntry.getValue());
context().exchange().exchangerUpdateHeartbeat();
return null;
}
);
return null;
}
);
}
}
/**
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param exchTopVer Current exchange version.
* @param disabledAfterStart If {@code true}, the restarting state is discarded from proxies. If {@code false},
* proxies are switched to the restarting state.
* @param clientCache Client cache start flag.
* @throws IgniteCheckedException If failed.
*/
public void prepareCacheStart(
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
AffinityTopologyVersion exchTopVer,
boolean disabledAfterStart,
boolean clientCache
) throws IgniteCheckedException {
GridCacheContext cacheCtx = prepareCacheContext(desc, reqNearCfg, exchTopVer, disabledAfterStart);
if (cacheCtx.isRecoveryMode())
finishRecovery(exchTopVer, cacheCtx);
else {
ctx.query().onCacheStart(
new GridCacheContextInfo(cacheCtx, clientCache),
desc.schema() != null ? desc.schema() : new QuerySchema(),
desc.sql()
);
onCacheStarted(cacheCtx);
}
}
/**
* Prepares a cache context for start.
*
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param exchTopVer Current exchange version.
* @param disabledAfterStart If {@code true}, the restarting state is discarded from proxies. If {@code false},
* proxies are switched to the restarting state.
* @return Created {@link GridCacheContext}.
* @throws IgniteCheckedException if failed.
*/
private GridCacheContext prepareCacheContext(
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
AffinityTopologyVersion exchTopVer,
boolean disabledAfterStart
) throws IgniteCheckedException {
desc = enricher().enrich(desc, isLocalAffinity(desc.cacheConfiguration()));
CacheConfiguration startCfg = desc.cacheConfiguration();
if (caches.containsKey(startCfg.getName())) {
GridCacheAdapter<?, ?> existingCache = caches.get(startCfg.getName());
GridCacheContext<?, ?> cctx = existingCache.context();
assert cctx.isRecoveryMode();
QuerySchema localSchema = recovery.querySchemas.get(desc.cacheId());
QuerySchemaPatch localSchemaPatch = localSchema.makePatch(desc.schema().entities());
// Cache schema has changed after restart; the workaround is to stop the existing cache and start a new one.
if (!localSchemaPatch.isEmpty() || localSchemaPatch.hasConflicts())
stopCacheSafely(cctx);
else
return existingCache.context();
}
assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
CacheConfiguration ccfg = new CacheConfiguration(startCfg);
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
boolean affNode = checkForAffinityNode(desc, reqNearCfg, ccfg);
ctx.cache().context().database().checkpointReadLock();
try {
CacheGroupContext grp = getOrCreateCacheGroupContext(
desc, exchTopVer, cacheObjCtx, affNode, startCfg.getGroupName(), false);
GridCacheContext cacheCtx = createCacheContext(ccfg,
grp,
null,
desc,
exchTopVer,
cacheObjCtx,
affNode,
true,
disabledAfterStart,
false
);
initCacheContext(cacheCtx, ccfg);
return cacheCtx;
}
finally {
ctx.cache().context().database().checkpointReadUnlock();
}
}
/**
* Stops cache under checkpoint lock.
*
* @param cctx Cache context.
*/
private void stopCacheSafely(GridCacheContext<?, ?> cctx) {
stopCacheSafely(cctx, true);
}
/**
* Stops cache under checkpoint lock.
*
* @param cctx Cache context.
* @param clearDbObjects If {@code false}, DB objects are not removed (used for cache.close() on a client node).
* The parameter was added to distinguish cache.close() on a client node from a distributed cache destroy
* (e.g. a cache.destroy() call).
*
* Before the {@code clearDbObjects} parameter was added:
* when a client node joined, H2 objects were initialized for all caches in the cluster,
* but on client cache.close() part of the created objects was destroyed,
* making it impossible to run SQL queries on that cache;
* clientCache.close() should instead restore the state as it was right after the client join.
*/
private void stopCacheSafely(GridCacheContext<?, ?> cctx, boolean clearDbObjects) {
sharedCtx.database().checkpointReadLock();
try {
prepareCacheStop(cctx.name(), false, false, clearDbObjects);
if (!cctx.group().hasCaches())
stopCacheGroup(cctx.group().groupId(), false);
}
finally {
sharedCtx.database().checkpointReadUnlock();
}
}
/**
* Finishes recovery for given cache context.
*
* @param cacheStartVer Topology version at which the cache joined.
* @param cacheContext Cache context.
* @throws IgniteCheckedException If failed.
*/
private void finishRecovery(
AffinityTopologyVersion cacheStartVer,
GridCacheContext<?, ?> cacheContext
) throws IgniteCheckedException {
CacheGroupContext groupContext = cacheContext.group();
// Take cluster-wide cache descriptor and try to update local cache and cache group parameters.
DynamicCacheDescriptor updatedDescriptor = cacheDescriptor(cacheContext.cacheId());
groupContext.finishRecovery(
cacheStartVer,
updatedDescriptor.receivedFrom(),
isLocalAffinity(updatedDescriptor.cacheConfiguration())
);
cacheContext.finishRecovery(cacheStartVer, updatedDescriptor);
if (cacheContext.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT && groupContext.affinityNode())
sharedCtx.coordinators().ensureStarted();
onKernalStart(cacheContext.cache());
if (log.isInfoEnabled())
log.info("Finished recovery for cache [cache=" + cacheContext.name()
+ ", grp=" + groupContext.cacheOrGroupName() + ", startVer=" + cacheStartVer + "]");
}
/**
* Stops all caches and groups that were recovered but not activated on node join. Such caches can remain only if
* they were filtered out by the node filter on the current node. It's impossible to check whether the current node
* is an affinity node for a given cache before joining the topology.
*/
public void shutdownNotFinishedRecoveryCaches() {
for (GridCacheAdapter cacheAdapter : caches.values()) {
GridCacheContext cacheContext = cacheAdapter.context();
if (cacheContext.isRecoveryMode()) {
assert !isLocalAffinity(cacheContext.config())
: "Cache " + cacheAdapter.context() + " is still in recovery mode after start, but not activated.";
stopCacheSafely(cacheContext);
}
}
}
/**
* Check for affinity node and customize near configuration if needed.
*
* @param desc Cache descriptor.
* @param reqNearCfg Near configuration if specified for client cache start request.
* @param ccfg Cache configuration to use.
* @return {@code true} if it is affinity node for cache.
*/
private boolean checkForAffinityNode(
DynamicCacheDescriptor desc,
@Nullable NearCacheConfiguration reqNearCfg,
CacheConfiguration ccfg
) {
if (isLocalAffinity(desc.cacheConfiguration()))
return true;
ccfg.setNearConfiguration(reqNearCfg);
return false;
}
/**
* Prepares the page store for cache start.
*
* @param desc Cache descriptor.
* @param affNode {@code true} if it is affinity node for cache.
* @throws IgniteCheckedException if failed.
*/
public void preparePageStore(DynamicCacheDescriptor desc, boolean affNode) throws IgniteCheckedException {
if (sharedCtx.pageStore() != null && affNode)
initializationProtector.protect(
desc.groupDescriptor().groupId(),
() -> sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData(splitter).config())
);
}
/**
* Prepares the cache group for cache start.
*
* @param desc Cache descriptor.
* @param exchTopVer Current exchange version.
* @param cacheObjCtx Cache object context.
* @param affNode {@code true} if it is affinity node for cache.
* @param grpName Group name.
* @return Prepared cache group context.
* @throws IgniteCheckedException if failed.
*/
private CacheGroupContext getOrCreateCacheGroupContext(
DynamicCacheDescriptor desc,
AffinityTopologyVersion exchTopVer,
CacheObjectContext cacheObjCtx,
boolean affNode,
String grpName,
boolean recoveryMode
) throws IgniteCheckedException {
if (grpName != null) {
return initializationProtector.protect(
desc.groupId(),
() -> findCacheGroup(grpName),
() -> startCacheGroup(
desc.groupDescriptor(),
desc.cacheType(),
affNode,
cacheObjCtx,
exchTopVer,
recoveryMode
)
);
}
return startCacheGroup(desc.groupDescriptor(),
desc.cacheType(),
affNode,
cacheObjCtx,
exchTopVer,
recoveryMode
);
}
/**
* Initializes the created cache context.
*
* @param cacheCtx Cache context to initialize.
* @param cfg Cache configuration.
* @throws IgniteCheckedException if failed.
*/
private void initCacheContext(
GridCacheContext<?, ?> cacheCtx,
CacheConfiguration cfg
) throws IgniteCheckedException {
GridCacheAdapter cache = cacheCtx.cache();
sharedCtx.addCacheContext(cacheCtx);
caches.put(cacheCtx.name(), cache);
// Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
if (cfg.isStoreKeepBinary() && cfg.isStoreKeepBinary() != CacheConfiguration.DFLT_STORE_KEEP_BINARY
&& !(ctx.config().getMarshaller() instanceof BinaryMarshaller))
U.warn(log, "CacheConfiguration.isStoreKeepBinary() configuration property will be ignored because " +
"BinaryMarshaller is not used");
// Start managers.
for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
mgr.start(cacheCtx);
cacheCtx.initConflictResolver();
if (GridCacheUtils.isNearEnabled(cfg)) {
GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
// Start DHT managers.
for (GridCacheManager mgr : dhtManagers(dhtCtx))
mgr.start(dhtCtx);
dhtCtx.initConflictResolver();
// Start DHT cache.
dhtCtx.cache().start();
if (log.isDebugEnabled())
log.debug("Started DHT cache: " + dhtCtx.cache().name());
}
ctx.continuous().onCacheStart(cacheCtx);
cacheCtx.cache().start();
}
/**
* Handles a cache context which was fully prepared.
*
* @param cacheCtx Fully prepared context.
* @throws IgniteCheckedException if failed.
*/
private void onCacheStarted(GridCacheContext cacheCtx) throws IgniteCheckedException {
GridCacheAdapter cache = cacheCtx.cache();
CacheConfiguration cfg = cacheCtx.config();
CacheGroupContext grp = cacheGrps.get(cacheCtx.groupId());
cacheCtx.onStarted();
String dataRegion = cfg.getDataRegionName();
if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
if (log.isInfoEnabled()) {
String expPlcInfo = buildExpirePolicyInfo(cacheCtx);
log.info("Started cache [name=" + cfg.getName() +
", id=" + cacheCtx.cacheId() +
(cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
", dataRegionName=" + dataRegion +
", mode=" + cfg.getCacheMode() +
", atomicity=" + cfg.getAtomicityMode() +
", backups=" + cfg.getBackups() +
", mvcc=" + cacheCtx.mvccEnabled() +
(expPlcInfo != null ? ", " + expPlcInfo : "") + ']');
}
grp.onCacheStarted(cacheCtx);
onKernalStart(cache);
if (ctx.performanceStatistics().enabled() && U.isLocalNodeCoordinator(ctx.discovery()))
ctx.performanceStatistics().cacheStart(cacheCtx.cacheId(), cfg.getName());
}
/**
* @param desc Cache descriptor.
* @throws IgniteCheckedException If failed.
*/
private GridCacheContext<?, ?> startCacheInRecoveryMode(
DynamicCacheDescriptor desc
) throws IgniteCheckedException {
// Only affinity nodes are able to start cache in recovery mode.
desc = enricher().enrich(desc, true);
CacheConfiguration cfg = desc.cacheConfiguration();
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
preparePageStore(desc, true);
CacheGroupContext grpCtx;
GridCacheContext cacheCtx;
ctx.cache().context().database().checkpointReadLock();
try {
grpCtx = getOrCreateCacheGroupContext(
desc,
AffinityTopologyVersion.NONE,
cacheObjCtx,
true,
cfg.getGroupName(),
true
);
cacheCtx = createCacheContext(cfg,
grpCtx,
null,
desc,
AffinityTopologyVersion.NONE,
cacheObjCtx,
true,
true,
false,
true
);
initCacheContext(cacheCtx, cfg);
}
finally {
ctx.cache().context().database().checkpointReadUnlock();
}
cacheCtx.onStarted();
String dataRegion = cfg.getDataRegionName();
if (dataRegion == null && ctx.config().getDataStorageConfiguration() != null)
dataRegion = ctx.config().getDataStorageConfiguration().getDefaultDataRegionConfiguration().getName();
grpCtx.onCacheStarted(cacheCtx);
ctx.query().onCacheStart(new GridCacheContextInfo(cacheCtx, false),
desc.schema() != null ? desc.schema() : new QuerySchema(), desc.sql());
if (log.isInfoEnabled()) {
String expPlcInfo = buildExpirePolicyInfo(cacheCtx);
log.info("Started cache in recovery mode [name=" + cfg.getName() +
", id=" + cacheCtx.cacheId() +
(cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
", dataRegionName=" + dataRegion +
", mode=" + cfg.getCacheMode() +
", atomicity=" + cfg.getAtomicityMode() +
", backups=" + cfg.getBackups() +
", mvcc=" + cacheCtx.mvccEnabled() +
(expPlcInfo != null ? ", " + expPlcInfo : "") + ']');
}
return cacheCtx;
}
/**
* Builds a formatted string with expiry policy info.
*
* @param cacheCtx Cache context.
* @return Formatted expiry policy info, or {@code null} if the policy is eternal or not set.
*/
private String buildExpirePolicyInfo(@NotNull GridCacheContext cacheCtx) {
ExpiryPolicy expPlc = cacheCtx.expiry();
if (expPlc == null || expPlc instanceof EternalExpiryPolicy)
return null;
return String.format(EXPRITY_POLICY_MSG, expPlc.getClass().getName(), cacheCtx.ttl().eagerTtlEnabled());
}
/**
* @param grpName Group name.
* @return Found group or null.
*/
private CacheGroupContext findCacheGroup(String grpName) {
return cacheGrps.values().stream()
.filter(grp -> grp.sharedGroup() && grpName.equals(grp.name()))
.findAny()
.orElse(null);
}
/**
* Restarts proxies of caches if they were marked as restarting. Requires external synchronization: shouldn't be
* called concurrently with another cache restart.
*/
public void restartProxies() {
for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
if (proxy == null)
continue;
GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName()));
if (cacheCtx == null)
continue;
if (proxy.isRestarting()) {
caches.get(proxy.getName()).active(true);
proxy.onRestarted(cacheCtx, cacheCtx.cache());
if (cacheCtx.dataStructuresCache())
ctx.dataStructures().restart(proxy.getName(), proxy.internalProxy());
}
}
}
/**
* Completes stopping of caches that were marked as restarting if the restart failed.
* @return Cache names of the proxies that were reset.
*/
public List<String> resetRestartingProxies() {
List<String> res = new ArrayList<>();
for (Map.Entry<String, IgniteCacheProxyImpl<?, ?>> e : jCacheProxies.entrySet()) {
IgniteCacheProxyImpl<?, ?> proxy = e.getValue();
if (proxy == null)
continue;
if (proxy.isRestarting()) {
String cacheName = e.getKey();
res.add(cacheName);
jCacheProxies.remove(cacheName);
proxy.onRestarted(null, null);
if (DataStructuresProcessor.isDataStructureCache(cacheName))
ctx.dataStructures().restart(cacheName, null);
}
}
cachesInfo.removeRestartingCaches();
return res;
}
/**
* @param desc Group descriptor.
* @param cacheType Cache type.
* @param affNode Affinity node flag.
* @param cacheObjCtx Cache object context.
* @param exchTopVer Current topology version.
* @return Started cache group.
* @throws IgniteCheckedException If failed.
*/
private CacheGroupContext startCacheGroup(
CacheGroupDescriptor desc,
CacheType cacheType,
boolean affNode,
CacheObjectContext cacheObjCtx,
AffinityTopologyVersion exchTopVer,
boolean recoveryMode
) throws IgniteCheckedException {
desc = enricher().enrich(desc, affNode);
CacheConfiguration cfg = new CacheConfiguration(desc.config());
String memPlcName = cfg.getDataRegionName();
DataRegion dataRegion = affNode ? sharedCtx.database().dataRegion(memPlcName) : null;
boolean needToStart = (dataRegion != null)
&& (cacheType != CacheType.USER
|| (sharedCtx.isLazyMemoryAllocation(dataRegion)
&& !cacheObjCtx.kernalContext().clientNode()));
if (needToStart)
dataRegion.pageMemory().start();
FreeList freeList = sharedCtx.database().freeList(memPlcName);
ReuseList reuseList = sharedCtx.database().reuseList(memPlcName);
boolean persistenceEnabled = recoveryMode || sharedCtx.localNode().isClient() ? desc.persistenceEnabled() :
dataRegion != null && dataRegion.config().isPersistenceEnabled();
CacheGroupContext grp = new CacheGroupContext(sharedCtx,
desc.groupId(),
desc.receivedFrom(),
cacheType,
cfg,
affNode,
dataRegion,
cacheObjCtx,
freeList,
reuseList,
exchTopVer,
persistenceEnabled,
desc.walEnabled(),
recoveryMode
);
for (Object obj : grp.configuredUserObjects())
prepare(cfg, obj, false);
U.startLifecycleAware(grp.configuredUserObjects());
grp.start();
CacheGroupContext old = cacheGrps.put(desc.groupId(), grp);
if (!grp.systemCache() && !U.IGNITE_MBEANS_DISABLED) {
try {
U.registerMBean(ctx.config().getMBeanServer(), ctx.igniteInstanceName(), CACHE_GRP_METRICS_MBEAN_GRP,
grp.cacheOrGroupName(), new CacheGroupMetricsMXBeanImpl(grp), CacheGroupMetricsMXBean.class);
}
catch (Throwable e) {
U.error(log, "Failed to register MBean for cache group: " + grp.name(), e);
}
}
assert old == null : old.name();
return grp;
}
/**
* @param cacheName Cache name.
* @param stop {@code True} for stop cache, {@code false} for close cache.
* @param restart Restart flag.
*/
public void blockGateway(String cacheName, boolean stop, boolean restart) {
// Break the proxy before exchange future is done.
IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, false);
if (restart) {
GridCacheAdapter<?, ?> cache = caches.get(cacheName);
if (cache != null)
cache.active(false);
}
if (stop) {
if (restart) {
GridCacheAdapter<?, ?> cache;
if (proxy == null && (cache = caches.get(cacheName)) != null) {
proxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
IgniteCacheProxyImpl<?, ?> oldProxy = jCacheProxies.putIfAbsent(cacheName, proxy);
if (oldProxy != null)
proxy = oldProxy;
}
if (proxy != null)
proxy.suspend();
}
if (proxy != null)
proxy.context0().gate().stopped();
}
else if (proxy != null)
proxy.closeProxy();
}
/**
* @param req Request.
*/
private void stopGateway(DynamicCacheChangeRequest req) {
assert req.stop() : req;
IgniteCacheProxyImpl<?, ?> proxy;
// Break the proxy before exchange future is done.
if (req.restart()) {
if (DataStructuresProcessor.isDataStructureCache(req.cacheName()))
ctx.dataStructures().suspend(req.cacheName());
GridCacheAdapter<?, ?> cache = caches.get(req.cacheName());
if (cache != null)
cache.active(false);
proxy = jCacheProxies.get(req.cacheName());
if (proxy != null)
proxy.suspend();
}
else {
completeProxyInitialize(req.cacheName());
proxy = jCacheProxies.remove(req.cacheName());
}
if (proxy != null)
proxy.context0().gate().onStopped();
}
/**
* @param cacheName Cache name.
* @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
* @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
*/
public void prepareCacheStop(String cacheName, boolean callDestroy, boolean clearCache) {
prepareCacheStop(cacheName, callDestroy, clearCache, true);
}
/**
* @param cacheName Cache name.
* @param callDestroy Cache destroy flag. This flag is passed to the {@link GridCacheManager#stop} method.
* @param clearCache Cache data clear flag. Setting to {@code true} will remove all cache data.
* @param clearDbObjects If {@code false}, DB objects are not removed (used for cache.close() on a client node).
*/
public void prepareCacheStop(String cacheName, boolean callDestroy, boolean clearCache, boolean clearDbObjects) {
assert sharedCtx.database().checkpointLockIsHeldByThread();
GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
if (cache != null) {
GridCacheContext<?, ?> ctx = cache.context();
sharedCtx.removeCacheContext(ctx);
onKernalStop(cache, true);
stopCache(cache, true, callDestroy, clearCache, clearDbObjects);
}
else
// Try to unregister query structures for caches that were not started.
ctx.query().onCacheStop(cacheName);
}
/**
* @param startTopVer Cache start version.
* @param err Cache start error if any.
*/
void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) {
for (GridCacheAdapter<?, ?> cache : caches.values()) {
GridCacheContext<?, ?> cacheCtx = cache.context();
if (cacheCtx.startTopologyVersion().equals(startTopVer)) {
if (!jCacheProxies.containsKey(cacheCtx.name())) {
IgniteCacheProxyImpl<?, ?> newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
if (!cache.active())
newProxy.suspend();
addjCacheProxy(cacheCtx.name(), newProxy);
}
if (cacheCtx.preloader() != null)
cacheCtx.preloader().onInitialExchangeComplete(err);
}
}
}
/**
* @param cachesToClose Caches to close.
* @param retClientCaches {@code True} if return IDs of closed client caches.
* @return Closed client caches' IDs.
*/
Set<Integer> closeCaches(Set<String> cachesToClose, boolean retClientCaches) {
Set<Integer> ids = null;
for (String cacheName : cachesToClose) {
completeProxyInitialize(cacheName);
blockGateway(cacheName, false, false);
GridCacheContext ctx = sharedCtx.cacheContext(CU.cacheId(cacheName));
if (ctx == null)
continue;
if (retClientCaches && !ctx.affinityNode()) {
if (ids == null)
ids = U.newHashSet(cachesToClose.size());
ids.add(ctx.cacheId());
}
closeCache(ctx);
}
return ids;
}
/**
* @param cctx Cache context.
*/
private void closeCache(GridCacheContext cctx) {
if (cctx.affinityNode()) {
GridCacheAdapter<?, ?> cache = caches.get(cctx.name());
assert cache != null : cctx.name();
jCacheProxies.put(cctx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false));
completeProxyInitialize(cctx.name());
}
else {
cctx.gate().onStopped();
// Do not close client cache while requests processing is in progress.
sharedCtx.io().writeLock();
try {
if (!cctx.affinityNode() && cctx.transactional())
sharedCtx.tm().rollbackTransactionsForCache(cctx.cacheId());
completeProxyInitialize(cctx.name());
jCacheProxies.remove(cctx.name());
closeCacheOnNotAffinityNode(cctx);
}
finally {
sharedCtx.io().writeUnlock();
}
}
}
/**
* @param cctx Cache context to close.
*/
private void closeCacheOnNotAffinityNode(GridCacheContext cctx) {
if (ctx.query().moduleEnabled())
stopCacheSafely(cctx, false);
else
stopCacheSafely(cctx);
}
/**
* Called during the rollback of the exchange partitions procedure in order to stop the given cache even if it's not
* fully initialized (e.g. failed on cache init stage).
*
* @param topVer Topology version related to the given {@code exchActions}.
* @param exchActions Stop requests.
*/
void forceCloseCaches(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
assert exchActions != null && !exchActions.cacheStopRequests().isEmpty();
processCacheStopRequestOnExchangeDone(topVer, exchActions);
}
/**
* @param topVer Topology version related to the given {@code exchActions}.
* @param exchActions Change requests.
*/
private void processCacheStopRequestOnExchangeDone(AffinityTopologyVersion topVer, ExchangeActions exchActions) {
// Reserve at least 2 threads for system operations.
int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
List<IgniteBiTuple<CacheGroupContext, Boolean>> grpsToStop = exchActions.cacheGroupsToStop().stream()
.filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
.map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
.collect(toList());
// Wait until all evictions are finished.
grpsToStop.forEach(t -> sharedCtx.evict().onCacheGroupStopped(t.get1()));
if (!exchActions.cacheStopRequests().isEmpty())
removeOffheapListenerAfterCheckpoint(grpsToStop);
Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
.collect(groupingBy(action -> action.descriptor().groupId()));
Set<Integer> grpIdToDestroy = grpsToStop.stream()
.filter(IgniteBiTuple::get2).map(t2 -> t2.get1().groupId()).collect(toSet());
try {
doInParallel(
parallelismLvl,
sharedCtx.kernalContext().pools().getSystemExecutorService(),
cachesToStop.entrySet(),
cachesToStopByGrp -> {
Integer groupId = cachesToStopByGrp.getKey();
CacheGroupContext gctx = cacheGrps.get(groupId);
if (gctx != null)
gctx.preloader().pause();
try {
if (gctx != null) {
final String msg = "Failed to wait for topology update, cache group is stopping.";
// If a snapshot operation is in progress, we must throw CacheStoppedException
// for a correct cache proxy restart. For more details see
// IgniteCacheProxy.cacheException()
gctx.affinity().cancelFutures(new CacheStoppedException(msg));
}
for (ExchangeActions.CacheActionData action : cachesToStopByGrp.getValue()) {
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
stopGateway(action.request());
String cacheName = action.request().cacheName();
// TTL manager has to be unregistered before the checkpointReadLock is acquired.
GridCacheAdapter<?, ?> cache = caches.get(cacheName);
if (cache != null)
cache.context().ttl().unregister();
sharedCtx.database().checkpointReadLock();
try {
boolean callDestroy = action.request().destroy();
// If all caches in the group are going to be destroyed, it is not necessary to clear a single cache
// because the group will be stopped anyway.
// Stop will still be called with destroy {@code true}, but the cache will not be cleared.
boolean clearCache = callDestroy && !grpIdToDestroy.contains(groupId);
prepareCacheStop(cacheName, callDestroy, clearCache);
if (callDestroy || grpIdToDestroy.contains(groupId))
ctx.query().completeRebuildIndexes(cacheName);
}
finally {
sharedCtx.database().checkpointReadUnlock();
}
}
}
finally {
if (gctx != null)
gctx.preloader().resume();
}
return null;
}
);
}
catch (IgniteCheckedException e) {
String msg = "Failed to stop caches";
log.error(msg, e);
throw new IgniteException(msg, e);
}
finally {
cachesInfo.cleanupRemovedCaches(topVer);
}
for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpsToStop)
stopCacheGroup(grp.get1().groupId(), grp.get2());
if (!sharedCtx.kernalContext().clientNode())
sharedCtx.database().onCacheGroupsStopped(grpsToStop);
cachesInfo.cleanupRemovedCacheGroups(topVer);
if (exchActions.deactivate())
sharedCtx.deactivate();
}
/**
* @param rmtNode Remote node to check.
* @return Data storage configuration
*/
private DataStorageConfiguration extractDataStorage(ClusterNode rmtNode) {
return GridCacheUtils.extractDataStorage(
rmtNode,
ctx.marshallerContext().jdkMarshaller(),
U.resolveClassLoader(ctx.config())
);
}
/**
* @param dataStorageCfg User-defined data regions.
*/
private Map<String, DataRegionConfiguration> dataRegionCfgs(DataStorageConfiguration dataStorageCfg) {
if (dataStorageCfg != null) {
return Optional.ofNullable(dataStorageCfg.getDataRegionConfigurations())
.map(Stream::of)
.orElseGet(Stream::empty)
.collect(Collectors.toMap(DataRegionConfiguration::getName, e -> e));
}
return Collections.emptyMap();
}
/**
* Forces a checkpoint and removes the offheap checkpoint listener after the checkpoint has finished.
*
* @param grpToStop Cache groups to stop.
*/
private void removeOffheapListenerAfterCheckpoint(List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop) {
try {
sharedCtx.database().waitForCheckpoint(
"caches stop", (fut) -> removeOffheapCheckpointListener(grpToStop)
);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
}
}
/**
* @param grpToStop Groups for which the listener should be removed.
*/
private void removeOffheapCheckpointListener(List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop) {
sharedCtx.database().checkpointReadLock();
try {
// Do not invoke checkpoint listeners for groups that are going to be destroyed, to prevent metadata corruption.
grpToStop.forEach(grp -> {
CacheGroupContext gctx = grp.getKey();
if (gctx != null && gctx.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager)sharedCtx.database();
mngr.removeCheckpointListener((CheckpointListener)gctx.offheap());
}
});
}
finally {
sharedCtx.database().checkpointReadUnlock();
}
}
/**
* Callback invoked when first exchange future for dynamic cache is completed.
*
* @param cacheStartVer Started caches version to create proxy for.
* @param exchActions Change requests.
* @param err Error.
*/
public void onExchangeDone(
AffinityTopologyVersion cacheStartVer,
@Nullable ExchangeActions exchActions,
@Nullable Throwable err
) {
initCacheProxies(cacheStartVer, err);
if (exchActions == null)
return;
if (exchActions.systemCachesStarting() && exchActions.stateChangeRequest() == null)
ctx.dataStructures().restoreStructuresState(ctx);
if (err == null)
processCacheStopRequestOnExchangeDone(cacheStartVer, exchActions);
}
/**
* @param grpId Group ID.
* @param destroy Group destroy flag.
*/
private void stopCacheGroup(int grpId, boolean destroy) {
CacheGroupContext grp = cacheGrps.remove(grpId);
if (grp != null)
stopCacheGroup(grp, destroy);
}
/**
* @param grp Cache group.
* @param destroy Group destroy flag.
*/
private void stopCacheGroup(CacheGroupContext grp, boolean destroy) {
grp.stopGroup();
U.stopLifecycleAware(log, grp.configuredUserObjects());
cleanup(grp, destroy);
}
/**
* @param cacheName Cache name.
* @param deploymentId Future deployment ID.
*/
void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
GridCacheProcessor.TemplateConfigurationFuture fut =
(GridCacheProcessor.TemplateConfigurationFuture)pendingTemplateFuts.get(cacheName);
if (fut != null && fut.deploymentId().equals(deploymentId))
fut.onDone();
}
/**
* @param req Request to complete future for.
* @param success Future result.
* @param err Error if any.
*/
public void completeCacheStartFuture(DynamicCacheChangeRequest req, boolean success, @Nullable Throwable err) {
if (ctx.localNodeId().equals(req.initiatingNodeId())) {
DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());
if (fut != null)
fut.onDone(success, err);
}
}
/**
* @param reqId Request ID.
* @param err Error if any.
*/
void completeClientCacheChangeFuture(UUID reqId, @Nullable Exception err) {
DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId);
if (fut != null)
fut.onDone(false, err);
}
/**
* Creates shared context.
*
* @param kernalCtx Kernal context.
* @param storeSesLsnrs Store session listeners.
* @return Shared context.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private GridCacheSharedContext createSharedContext(
GridKernalContext kernalCtx,
Collection<CacheStoreSessionListener> storeSesLsnrs
) throws IgniteCheckedException {
IgniteTxManager tm = new IgniteTxManager();
GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
GridCacheVersionManager verMgr = new GridCacheVersionManager();
GridCacheDeploymentManager depMgr = new GridCacheDeploymentManager();
GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager();
IgniteCacheDatabaseSharedManager dbMgr;
IgnitePageStoreManager pageStoreMgr = null;
IgniteWriteAheadLogManager walMgr = null;
if (CU.isPersistenceEnabled(ctx.config()) && !ctx.clientNode()) {
dbMgr = new GridCacheDatabaseSharedManager(ctx);
pageStoreMgr = ctx.plugins().createComponent(IgnitePageStoreManager.class);
if (pageStoreMgr == null)
pageStoreMgr = new FilePageStoreManager(ctx);
}
else {
if (CU.isPersistenceEnabled(ctx.config()) && ctx.clientNode()) {
U.warn(log, "Persistent Store is not supported on client nodes (Persistent Store's" +
" configuration will be ignored).");
}
dbMgr = new IgniteCacheDatabaseSharedManager(ctx);
}
if ((CU.isPersistenceEnabled(ctx.config()) || CU.isCdcEnabled(ctx.config())) && !ctx.clientNode()) {
walMgr = ctx.plugins().createComponent(IgniteWriteAheadLogManager.class);
if (walMgr == null)
walMgr = new FileWriteAheadLogManager(ctx);
}
WalStateManager walStateMgr = new WalStateManager(ctx);
IgniteSnapshotManager snapshotMgr = new IgniteSnapshotManager(ctx);
IgniteCacheSnapshotManager snpMgr = ctx.plugins().createComponent(IgniteCacheSnapshotManager.class);
if (snpMgr == null)
snpMgr = new IgniteCacheSnapshotManager();
GridCacheIoManager ioMgr = new GridCacheIoManager();
CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
GridCacheSharedTtlCleanupManager ttl = new GridCacheSharedTtlCleanupManager();
PartitionsEvictManager evict = new PartitionsEvictManager();
CacheJtaManagerAdapter jta = JTA.createOptional();
MvccCachingManager mvccCachingMgr = new MvccCachingManager();
DeadlockDetectionManager deadlockDetectionMgr = new DeadlockDetectionManager();
CacheDiagnosticManager diagnosticMgr = new CacheDiagnosticManager();
return new GridCacheSharedContext(
kernalCtx,
tm,
verMgr,
mvccMgr,
pageStoreMgr,
walMgr,
walStateMgr,
dbMgr,
snapshotMgr,
snpMgr,
depMgr,
exchMgr,
topMgr,
ioMgr,
ttl,
evict,
jta,
storeSesLsnrs,
mvccCachingMgr,
deadlockDetectionMgr,
diagnosticMgr
);
}
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
return CACHE_PROC;
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
cachesInfo.collectJoiningNodeData(dataBag);
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
cachesInfo.collectGridNodeData(dataBag, backwardCompatibleSplitter());
}
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
cachesInfo.onJoiningNodeDataReceived(data);
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
cachesInfo.onGridDataReceived(data);
sharedCtx.walState().onCachesInfoCollected();
}
/** {@inheritDoc} */
@Override public @Nullable IgniteNodeValidationResult validateNode(
ClusterNode node, JoiningNodeDiscoveryData discoData
) {
if (!cachesInfo.isMergeConfigSupports(node))
return null;
String validationRes = cachesInfo.validateJoiningNodeData(discoData, node.isClient());
if (validationRes != null)
return new IgniteNodeValidationResult(node.id(), validationRes, validationRes);
return ValidationOnNodeJoinUtils.validateNode(node, discoData, marsh, ctx, this::cacheDescriptor);
}
/**
* @param msg Message.
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
cachesInfo.onStateChangeFinish(msg);
}
/**
* @param msg Message.
* @param topVer Current topology version.
* @param curState Current cluster state.
* @return Exchange actions.
* @throws IgniteCheckedException If configuration validation failed.
*/
public ExchangeActions onStateChangeRequest(
ChangeGlobalStateMessage msg,
AffinityTopologyVersion topVer,
DiscoveryDataClusterState curState
) throws IgniteCheckedException {
return cachesInfo.onStateChangeRequest(msg, topVer, curState);
}
/**
* Cache statistics flag change message received.
*
* @param msg Message.
*/
public void onCacheStatisticsModeChange(CacheStatisticsModeChangeMessage msg) {
assert msg != null;
if (msg.initial()) {
EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());
if (fut != null && !cacheNames().containsAll(msg.caches())) {
fut.onDone(new IgniteCheckedException("One or more cache descriptors not found [caches="
+ caches + ']'));
return;
}
for (String cacheName : msg.caches()) {
DynamicCacheDescriptor desc = cachesInfo.registeredCaches().get(cacheName);
if (desc != null) {
if (desc.cacheConfiguration().isStatisticsEnabled() != msg.enabled()) {
desc.cacheConfiguration().setStatisticsEnabled(msg.enabled());
try {
ctx.cache().saveCacheConfiguration(desc);
}
catch (IgniteCheckedException e) {
log.error("Error while saving cache configuration to disk, cfg = "
+ desc.cacheConfiguration(), e);
}
}
}
else
log.warning("Failed to change cache descriptor configuration, cache not found [cacheName="
+ cacheName + ']');
}
}
else {
EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());
if (fut != null)
fut.onDone();
}
}
/**
* Cache statistics clear message received.
*
* @param msg Message.
*/
private void onCacheStatisticsClear(CacheStatisticsClearMessage msg) {
assert msg != null;
if (msg.initial()) {
EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());
if (fut != null && !cacheNames().containsAll(msg.caches())) {
fut.onDone(new IgniteCheckedException("One or more cache descriptors not found [caches="
+ caches + ']'));
return;
}
for (String cacheName : msg.caches()) {
IgniteInternalCache<?, ?> cache = ctx.cache().cache(cacheName);
if (cache != null)
cache.localMxBean().clear();
else
log.warning("Failed to clear cache statistics, cache not found [cacheName="
+ cacheName + ']');
}
}
else {
EnableStatisticsFuture fut = manageStatisticsFuts.get(msg.requestId());
if (fut != null)
fut.onDone();
}
}
/**
* Cache statistics flag change task processed by exchange worker.
*
* @param msg Message.
*/
public void processStatisticsModeChange(CacheStatisticsModeChangeMessage msg) {
assert msg != null;
for (String cacheName : msg.caches()) {
IgniteInternalCache<Object, Object> cache = cache(cacheName);
if (cache != null)
cache.context().statisticsEnabled(msg.enabled());
else
log.warning("Failed to change cache configuration, cache not found [cacheName=" + cacheName + ']');
}
}
/**
* @param stoppedCaches Stopped caches.
*/
private void stopCachesOnClientReconnect(Collection<GridCacheAdapter> stoppedCaches) {
assert ctx.discovery().localNode().isClient();
for (GridCacheAdapter cache : stoppedCaches) {
CacheGroupContext grp = cache.context().group();
onKernalStop(cache, true);
stopCache(cache, true, false, false);
sharedCtx.affinity().stopCacheOnReconnect(cache.context());
if (!grp.hasCaches()) {
stopCacheGroup(grp, false);
sharedCtx.affinity().stopCacheGroupOnReconnect(grp);
}
}
}
/**
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
try {
CacheConfiguration cfg = getOrCreateConfigFromTemplate(cacheName);
return dynamicStartCache(cfg, cacheName, null, true, true, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/**
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean checkThreadTx) {
return getOrCreateFromTemplate(cacheName, cacheName, null, checkThreadTx);
}
/**
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
* @param templateName Cache template name.
* @param cfgOverride Cache config properties to override.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, String templateName,
CacheConfigurationOverride cfgOverride, boolean checkThreadTx) {
assert cacheName != null;
try {
if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started.
return new GridFinishedFuture<>();
CacheConfiguration ccfg = F.isEmpty(templateName)
? getOrCreateConfigFromTemplate(cacheName)
: getOrCreateConfigFromTemplate(templateName);
ccfg.setName(cacheName);
if (cfgOverride != null)
cfgOverride.apply(ccfg);
return dynamicStartCache(ccfg, cacheName, null, false, true, checkThreadTx);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
}
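/*
* Illustrative usage sketch (cache and template names are hypothetical): creating a cache from an explicitly
* named template without configuration overrides; the returned future completes when the cache is deployed.
*
*   ctx.cache().getOrCreateFromTemplate("accounts", "accountsTemplate", null, true).get();
*/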
/**
* @param cacheName Cache name.
* @return Cache configuration, or {@code null} if no template with matching name found.
* @throws IgniteCheckedException If failed.
*/
public CacheConfiguration getConfigFromTemplate(String cacheName) throws IgniteCheckedException {
DynamicCacheDescriptor cfgTemplate = null;
DynamicCacheDescriptor dfltCacheCfg = null;
List<DynamicCacheDescriptor> wildcardNameCfgs = null;
for (DynamicCacheDescriptor desc : cachesInfo.registeredTemplates().values()) {
assert desc.template();
CacheConfiguration cfg = desc.cacheConfiguration();
assert cfg != null;
if (F.eq(cacheName, cfg.getName())) {
cfgTemplate = desc;
break;
}
if (cfg.getName() != null) {
if (GridCacheUtils.isCacheTemplateName(cfg.getName())) {
if (cfg.getName().length() > 1) {
if (wildcardNameCfgs == null)
wildcardNameCfgs = new ArrayList<>();
wildcardNameCfgs.add(desc);
}
else
dfltCacheCfg = desc; // Template with name '*'.
}
}
else if (dfltCacheCfg == null)
dfltCacheCfg = desc;
}
if (cfgTemplate == null && cacheName != null && wildcardNameCfgs != null) {
wildcardNameCfgs.sort((a, b) ->
Integer.compare(b.cacheConfiguration().getName().length(), a.cacheConfiguration().getName().length()));
for (DynamicCacheDescriptor desc : wildcardNameCfgs) {
String wildcardCacheName = desc.cacheConfiguration().getName();
if (cacheName.startsWith(wildcardCacheName.substring(0, wildcardCacheName.length() - 1))) {
cfgTemplate = desc;
break;
}
}
}
if (cfgTemplate == null)
cfgTemplate = dfltCacheCfg;
if (cfgTemplate == null)
return null;
// It's safe to enrich cache configuration here because we requested this cache from current node.
CacheConfiguration enrichedTemplate = enricher().enrichFully(
cfgTemplate.cacheConfiguration(), cfgTemplate.cacheConfigurationEnrichment());
enrichedTemplate = cloneCheckSerializable(enrichedTemplate);
CacheConfiguration cfg = new CacheConfiguration(enrichedTemplate);
cfg.setName(cacheName);
return cfg;
}
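/*
* Example of the wildcard resolution above (template names are hypothetical): with registered templates
* "partitioned-*" and "*", a lookup for cache "partitioned-accounts" picks "partitioned-*", because the
* longest matching wildcard template wins, while a lookup for "replicated-rates" falls back to the "*" template.
*
*   CacheConfiguration cfg = ctx.cache().getConfigFromTemplate("partitioned-accounts");
*   // cfg is a copy of the "partitioned-*" template configuration with its name set to "partitioned-accounts".
*/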
/**
* @param cacheName Cache name.
* @return Cache configuration.
* @throws IgniteCheckedException If failed.
*/
private CacheConfiguration getOrCreateConfigFromTemplate(String cacheName) throws IgniteCheckedException {
CacheConfiguration cfg = getConfigFromTemplate(cacheName);
return cfg != null ? cfg : new CacheConfiguration(cacheName);
}
/**
* Dynamically starts cache.
*
* @param ccfg Cache configuration.
* @param cacheName Cache name.
* @param nearCfg Near cache configuration.
* @param failIfExists Fail if exists flag.
* @param failIfNotStarted If {@code true} fails if cache is not started.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<Boolean> dynamicStartCache(
@Nullable CacheConfiguration ccfg,
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
boolean failIfExists,
boolean failIfNotStarted,
boolean checkThreadTx
) {
return dynamicStartCache(ccfg,
cacheName,
nearCfg,
CacheType.USER,
false,
failIfExists,
failIfNotStarted,
checkThreadTx);
}
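/*
* Illustrative usage sketch (the cache name is hypothetical): dynamically starting a user cache and waiting
* for deployment; failIfExists is {@code false}, so an already started cache is not treated as an error.
*
*   CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>("orders").setBackups(1);
*
*   ctx.cache().dynamicStartCache(ccfg, ccfg.getName(), null, false, true, true).get();
*/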
/**
* Dynamically starts cache as a result of SQL {@code CREATE TABLE} command.
*
* @param ccfg Cache configuration.
*/
public IgniteInternalFuture<Boolean> dynamicStartSqlCache(
CacheConfiguration ccfg
) {
A.notNull(ccfg, "ccfg");
return dynamicStartCache(ccfg,
ccfg.getName(),
ccfg.getNearConfiguration(),
CacheType.USER,
true,
false,
true,
true);
}
/**
* Dynamically starts cache.
*
* @param ccfg Cache configuration.
* @param cacheName Cache name.
* @param nearCfg Near cache configuration.
* @param cacheType Cache type.
* @param sql If the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
* @param failIfExists Fail if exists flag.
* @param failIfNotStarted If {@code true} fails if cache is not started.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
public IgniteInternalFuture<Boolean> dynamicStartCache(
@Nullable CacheConfiguration ccfg,
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
CacheType cacheType,
boolean sql,
boolean failIfExists,
boolean failIfNotStarted,
boolean checkThreadTx
) {
assert cacheName != null;
if (checkThreadTx) {
sharedCtx.tm().checkEmptyTransactions(
() -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheName, "dynamicStartCache"));
}
GridPlainClosure2<Collection<byte[]>, byte[], IgniteInternalFuture<Boolean>> startCacheClsr =
(grpKeys, masterKeyDigest) -> {
assert ccfg == null || !ccfg.isEncryptionEnabled() || !grpKeys.isEmpty();
DynamicCacheChangeRequest req = prepareCacheChangeRequest(
ccfg,
cacheName,
nearCfg,
cacheType,
sql,
failIfExists,
failIfNotStarted,
null,
false,
null,
ccfg != null && ccfg.isEncryptionEnabled() ? grpKeys.iterator().next() : null,
null,
ccfg != null && ccfg.isEncryptionEnabled() ? masterKeyDigest : null);
if (req != null) {
if (req.clientStartOnly())
return startClientCacheChange(F.asMap(req.cacheName(), req), null);
return F.first(initiateCacheChanges(F.asList(req)));
}
else
return new GridFinishedFuture<>();
};
try {
if (ccfg != null && ccfg.isEncryptionEnabled()) {
ctx.encryption().checkEncryptedCacheSupported();
return generateEncryptionKeysAndStartCacheAfter(1, startCacheClsr);
}
return startCacheClsr.apply(Collections.EMPTY_SET, null);
}
catch (Exception e) {
return new GridFinishedFuture<>(e);
}
}
/**
* Checks that the cluster is not in the {@link ClusterState#ACTIVE_READ_ONLY} state.
*
* @param opName Operation name.
* @param cfgs Stored cache configurations.
* @throws CacheException If the cluster is in the {@link ClusterState#ACTIVE_READ_ONLY} state.
*/
private void checkReadOnlyState(String opName, Collection<StoredCacheData> cfgs) {
IgniteOutClosure<String> cacheNameClo = null;
IgniteOutClosure<String> cacheGrpNameClo = null;
if (!F.isEmpty(cfgs)) {
if (cfgs.size() == 1) {
CacheConfiguration cfg = cfgs.iterator().next().config();
cacheNameClo = cfg::getName;
cacheGrpNameClo = cfg::getGroupName;
}
else {
cacheNameClo = () -> cfgs.stream()
.map(StoredCacheData::config)
.map(CacheConfiguration::getName)
.collect(toList()).toString();
cacheGrpNameClo = () -> cfgs.stream()
.map(StoredCacheData::config)
.map(CacheConfiguration::getGroupName)
.collect(toList()).toString();
}
}
checkReadOnlyState(opName, cacheGrpNameClo, cacheNameClo);
}
/**
* Checks that the cluster is not in the {@link ClusterState#ACTIVE_READ_ONLY} state.
*
* @param opName Operation name.
* @param cfgs Cache configurations.
* @throws CacheException If the cluster is in the {@link ClusterState#ACTIVE_READ_ONLY} state.
*/
public void checkReadOnlyState(String opName, CacheConfiguration... cfgs) {
IgniteOutClosure<String> cacheNameClo = null;
IgniteOutClosure<String> cacheGrpNameClo = null;
if (!F.isEmpty(cfgs)) {
if (cfgs.length == 1) {
cacheNameClo = () -> cfgs[0] == null ? null : cfgs[0].getName();
cacheGrpNameClo = () -> cfgs[0] == null ? null : cfgs[0].getGroupName();
}
else {
cacheNameClo = () -> Stream.of(cfgs)
.map(CacheConfiguration::getName)
.collect(toList()).toString();
cacheGrpNameClo = () -> Stream.of(cfgs)
.map(CacheConfiguration::getGroupName)
.collect(toList()).toString();
}
}
checkReadOnlyState(opName, cacheGrpNameClo, cacheNameClo);
}
/**
* Checks that the cluster is not in the {@link ClusterState#ACTIVE_READ_ONLY} state.
*
* @param opName Operation name.
* @param cacheGrpNameClo Closure for getting the cache group name, if needed (optional).
* @param cacheNameClo Closure for getting the cache name, if needed (optional).
* @throws CacheException If the cluster is in the {@link ClusterState#ACTIVE_READ_ONLY} state.
*/
private void checkReadOnlyState(
String opName,
@Nullable IgniteOutClosure<String> cacheGrpNameClo,
@Nullable IgniteOutClosure<String> cacheNameClo
) {
if (sharedCtx.readOnlyMode()) {
String cacheName = cacheNameClo == null ? null : cacheNameClo.apply();
String cacheGrpName = cacheGrpNameClo == null ? null : cacheGrpNameClo.apply();
String errorMsg = format(CLUSTER_READ_ONLY_MODE_ERROR_MSG_FORMAT, opName, cacheGrpName, cacheName);
throw new CacheException(new IgniteClusterReadOnlyException(errorMsg));
}
}
/**
* Sends {@code GenerateEncryptionKeyRequest} and executes the {@code after} closure if it succeeds.
*
* @param keyCnt Count of keys to generate.
* @param after Closure to execute after encryption keys have been generated.
*/
private IgniteInternalFuture<Boolean> generateEncryptionKeysAndStartCacheAfter(int keyCnt,
GridPlainClosure2<Collection<byte[]>, byte[], IgniteInternalFuture<Boolean>> after) {
IgniteInternalFuture<T2<Collection<byte[]>, byte[]>> genEncKeyFut = ctx.encryption().generateKeys(keyCnt);
GridFutureAdapter<Boolean> res = new GridFutureAdapter<>();
genEncKeyFut.listen(new IgniteInClosure<IgniteInternalFuture<T2<Collection<byte[]>, byte[]>>>() {
@Override public void apply(IgniteInternalFuture<T2<Collection<byte[]>, byte[]>> fut) {
try {
Collection<byte[]> grpKeys = fut.result().get1();
byte[] masterKeyDigest = fut.result().get2();
if (F.size(grpKeys, F.alwaysTrue()) != keyCnt)
res.onDone(false, fut.error());
IgniteInternalFuture<Boolean> dynStartCacheFut = after.apply(grpKeys, masterKeyDigest);
dynStartCacheFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
try {
res.onDone(fut.get(), fut.error());
}
catch (IgniteCheckedException e) {
res.onDone(false, e);
}
}
});
}
catch (Exception e) {
res.onDone(false, e);
}
}
});
return res;
}
/**
* @param startReqs Start requests.
* @param cachesToClose Caches to close.
* @return Future for cache change operation.
*/
private IgniteInternalFuture<Boolean> startClientCacheChange(
@Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) {
assert startReqs != null ^ cachesToClose != null;
DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID());
IgniteInternalFuture old = pendingFuts.put(fut.id, fut);
assert old == null : old;
ctx.discovery().clientCacheStartEvent(fut.id, startReqs, cachesToClose);
IgniteCheckedException err = checkNodeState();
if (err != null)
fut.onDone(err);
return fut;
}
/**
* Dynamically starts multiple caches.
*
* @param ccfgList Collection of cache configurations.
* @param failIfExists Fail if exists flag.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @param disabledAfterStart If {@code true}, cache proxies will only be activated after {@link #restartProxies()}.
* @return Future that will be completed when all caches are deployed.
*/
public IgniteInternalFuture<Boolean> dynamicStartCaches(
Collection<CacheConfiguration> ccfgList,
boolean failIfExists,
boolean checkThreadTx,
boolean disabledAfterStart
) {
return dynamicStartCachesByStoredConf(
ccfgList.stream().map(StoredCacheData::new).collect(toList()),
failIfExists,
checkThreadTx,
disabledAfterStart,
null);
}
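/*
* Illustrative usage sketch (cache names are hypothetical): starting several caches in one batch; the
* compound future completes when all of them are deployed.
*
*   Collection<CacheConfiguration> cfgs = Arrays.asList(
*       new CacheConfiguration("orders"),
*       new CacheConfiguration("customers"));
*
*   ctx.cache().dynamicStartCaches(cfgs, false, true, false).get();
*/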
/**
* Dynamically starts multiple caches.
*
* @param storedCacheDataList Collection of stored cache data.
* @param failIfExists Fail if exists flag.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @param disabledAfterStart If {@code true}, cache proxies will only be activated after {@link #restartProxies()}.
* @param restartId Restart requester ID (only this requester will be allowed to start the cache).
* @return Future that will be completed when all caches are deployed.
*/
public IgniteInternalFuture<Boolean> dynamicStartCachesByStoredConf(
Collection<StoredCacheData> storedCacheDataList,
boolean failIfExists,
boolean checkThreadTx,
boolean disabledAfterStart,
IgniteUuid restartId
) {
if (checkThreadTx) {
sharedCtx.tm().checkEmptyTransactions(() -> {
List<String> cacheNames = storedCacheDataList.stream()
.map(StoredCacheData::config)
.map(CacheConfiguration::getName)
.collect(toList());
return format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "dynamicStartCachesByStoredConf");
});
}
GridPlainClosure2<Collection<byte[]>, byte[], IgniteInternalFuture<Boolean>> startCacheClsr = (grpKeys, masterKeyDigest) -> {
List<DynamicCacheChangeRequest> srvReqs = null;
Map<String, DynamicCacheChangeRequest> clientReqs = null;
Iterator<byte[]> grpKeysIter = grpKeys.iterator();
for (StoredCacheData ccfg : storedCacheDataList) {
assert ccfg.groupKeyEncrypted() == null || ccfg.config().isEncryptionEnabled();
// Reuse the encryption key if one was passed for this group. Otherwise take the next generated key.
GroupKeyEncrypted encrKey = ccfg.config().isEncryptionEnabled() ? (ccfg.groupKeyEncrypted() != null ?
ccfg.groupKeyEncrypted() : new GroupKeyEncrypted(0, grpKeysIter.next())) : null;
DynamicCacheChangeRequest req = prepareCacheChangeRequest(
ccfg.config(),
ccfg.config().getName(),
null,
resolveCacheType(ccfg.config()),
ccfg.sql(),
failIfExists,
true,
restartId,
disabledAfterStart,
ccfg.queryEntities(),
encrKey != null ? encrKey.key() : null,
encrKey != null ? encrKey.id() : null,
encrKey != null ? masterKeyDigest : null
);
if (req != null) {
if (req.clientStartOnly()) {
if (clientReqs == null)
clientReqs = U.newLinkedHashMap(storedCacheDataList.size());
clientReqs.put(req.cacheName(), req);
}
else {
if (srvReqs == null)
srvReqs = new ArrayList<>(storedCacheDataList.size());
srvReqs.add(req);
}
}
}
if (srvReqs == null && clientReqs == null)
return new GridFinishedFuture<>();
if (clientReqs != null && srvReqs == null)
return startClientCacheChange(clientReqs, null);
GridCompoundFuture<?, Boolean> compoundFut = new GridCompoundFuture<>();
for (DynamicCacheStartFuture fut : initiateCacheChanges(srvReqs))
compoundFut.add((IgniteInternalFuture)fut);
if (clientReqs != null) {
IgniteInternalFuture<Boolean> clientStartFut = startClientCacheChange(clientReqs, null);
compoundFut.add((IgniteInternalFuture)clientStartFut);
}
compoundFut.markInitialized();
return compoundFut;
};
int encGrpCnt = 0;
for (StoredCacheData ccfg : storedCacheDataList) {
if (ccfg.config().isEncryptionEnabled())
encGrpCnt++;
}
return generateEncryptionKeysAndStartCacheAfter(encGrpCnt, startCacheClsr);
}
/** Resolves cache type for the given cache configuration. */
private @NotNull CacheType resolveCacheType(CacheConfiguration ccfg) {
if (CU.isUtilityCache(ccfg.getName()))
return CacheType.UTILITY;
else if (internalCaches.contains(ccfg.getName()))
return CacheType.INTERNAL;
else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName()))
return CacheType.DATA_STRUCTURES;
else
return CacheType.USER;
}
/**
* @param cacheName Cache name to destroy.
* @param sql If {@code true}, the cache is destroyed only if it was created as the result of SQL
* {@code CREATE TABLE} command.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @param restart Restart flag.
* @param restartId Restart requester id (only this requester will be allowed to start this cache).
* @return Future that will be completed when cache is destroyed.
*/
public IgniteInternalFuture<Boolean> dynamicDestroyCache(
String cacheName,
boolean sql,
boolean checkThreadTx,
boolean restart,
IgniteUuid restartId
) {
assert cacheName != null;
if (checkThreadTx) {
sharedCtx.tm().checkEmptyTransactions(
() -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheName, "dynamicDestroyCache"));
}
checkReadOnlyState("dynamic destroy cache", null, cacheName::toString);
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, sql, true);
req.stop(true);
req.destroy(true);
req.restart(restart);
req.restartId(restartId);
return F.first(initiateCacheChanges(F.asList(req)));
}
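// Illustrative usage sketch (not part of the original source): with a GridCacheProcessor instance
// "proc", a non-SQL cache could be destroyed together with its data like this:
//
//   proc.dynamicDestroyCache("cache-a",
//       false,  // sql: the cache was not created via SQL CREATE TABLE.
//       true,   // checkThreadTx: require no active transactions in the calling thread.
//       false,  // restart: no restart is requested.
//       null    // restartId: no restart requester.
//   ).get();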
/**
* @param cacheNames Collection of cache names to destroy.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when caches are destroyed.
*/
public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx) {
return dynamicDestroyCaches(cacheNames, checkThreadTx, true);
}
/**
* @param cacheNames Collection of cache names to destroy.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @param destroy Cache data destroy flag. Setting to {@code true} will cause removal of all cache data from the store.
* @return Future that will be completed when caches are destroyed.
*/
public IgniteInternalFuture<?> dynamicDestroyCaches(
Collection<String> cacheNames,
boolean checkThreadTx,
boolean destroy
) {
if (checkThreadTx) {
sharedCtx.tm().checkEmptyTransactions(
() -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "dynamicDestroyCaches"));
}
if (!F.isEmpty(cacheNames))
checkReadOnlyState("dynamic destroy caches", null, cacheNames::toString);
return dynamicChangeCaches(
cacheNames.stream().map(cacheName -> createStopRequest(cacheName, false, null, destroy))
.collect(toList())
);
}
/**
* Prepares cache stop request.
*
* @param cacheName Cache name to destroy.
* @param restart Restart flag.
* @param restartId Restart requester id (only this requester will be allowed to start this cache).
* @param destroy Cache data destroy flag. Setting to {@code true} will cause removal of all cache data from the store.
* @return Cache stop request.
*/
public @NotNull DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false, true);
req.stop(true);
req.destroy(destroy);
req.restart(restart);
req.restartId(restartId);
return req;
}
/**
* Starts cache stop request as cache change batch.
*
* @param reqs Cache stop requests.
* @return Compound future.
*/
@NotNull public IgniteInternalFuture<?> dynamicChangeCaches(List<DynamicCacheChangeRequest> reqs) {
return initiateCacheChanges(reqs).stream().collect(IgniteCollectors.toCompoundFuture());
}
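// Illustrative sketch (not part of the original source): stop requests for several caches can be
// prepared with createStopRequest(...) and submitted as a single change batch; the instance "proc"
// and cache names below are hypothetical.
//
//   List<DynamicCacheChangeRequest> stopReqs = Stream.of("cache-a", "cache-b")
//       .map(name -> proc.createStopRequest(name, false, null, true)) // destroy = true: drop cache data.
//       .collect(Collectors.toList());
//
//   proc.dynamicChangeCaches(stopReqs).get();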
/**
* @param cacheName Cache name to close.
* @return Future that will be completed when cache is closed.
*/
IgniteInternalFuture<?> dynamicCloseCache(String cacheName) {
assert cacheName != null;
IgniteCacheProxy<?, ?> proxy = jcacheProxy(cacheName, false);
if (proxy == null || proxy.isProxyClosed())
return new GridFinishedFuture<>(); // No-op.
sharedCtx.tm().checkEmptyTransactions(
() -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheName, "dynamicCloseCache"));
return startClientCacheChange(null, Collections.singleton(cacheName));
}
/**
* Resets cache state after the cache has been moved to recovery state.
*
* @param cacheNames Cache names.
* @return Future that will be completed when state is changed for all caches.
*/
public IgniteInternalFuture<?> resetCacheState(Collection<String> cacheNames) throws ClusterTopologyCheckedException {
sharedCtx.tm().checkEmptyTransactions(
() -> format(CHECK_EMPTY_TRANSACTIONS_ERROR_MSG_FORMAT, cacheNames, "resetCacheState"));
Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
for (String cacheName : cacheNames) {
final IgniteInternalCache<Object, Object> cache0 = internalCache(cacheName);
if (cache0 == null)
continue;
// Check if all lost partitions have at least one affinity owner.
final Collection<Integer> lostParts = cache0.lostPartitions();
if (lostParts.isEmpty())
continue;
for (Integer part : lostParts) {
final Collection<ClusterNode> owners = cache0.affinity().mapPartitionToPrimaryAndBackups(part);
if (owners.isEmpty())
throw new ClusterTopologyCheckedException("Cannot reset lost partitions because no baseline nodes " +
"are online [cache=" + cacheName + ", partition=" + part + ']');
}
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.resetLostPartitions(ctx, cacheName);
reqs.add(req);
}
return initiateCacheChanges(reqs).stream().collect(IgniteCollectors.toCompoundFuture());
}
/**
* @param cacheName Cache name.
* @return Cache type.
*/
public CacheType cacheType(String cacheName) {
if (CU.isUtilityCache(cacheName))
return CacheType.UTILITY;
else if (internalCaches.contains(cacheName))
return CacheType.INTERNAL;
else if (DataStructuresProcessor.isDataStructureCache(cacheName))
return CacheType.DATA_STRUCTURES;
else
return CacheType.USER;
}
/**
* @param cacheGrpId Cache group ID.
* @return {@code True} if cache group {@code cacheGrpId} is encrypted, {@code false} otherwise.
*/
public boolean isEncrypted(int cacheGrpId) {
return cacheGrpId != MetaStorage.METASTORAGE_CACHE_ID && cacheGroup(cacheGrpId).config().isEncryptionEnabled();
}
/**
* Save cache configuration to persistent store if necessary.
*
* @param desc Cache descriptor.
*/
public void saveCacheConfiguration(DynamicCacheDescriptor desc) throws IgniteCheckedException {
assert desc != null;
locCfgMgr.saveCacheConfiguration(desc.toStoredData(splitter), true);
}
/**
* Remove all persistent files for all registered caches.
*/
public void cleanupCachesDirectories() throws IgniteCheckedException {
if (sharedCtx.pageStore() == null || sharedCtx.kernalContext().clientNode())
return;
for (DynamicCacheDescriptor desc : cacheDescriptors().values()) {
if (isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration()))
sharedCtx.pageStore().cleanupPersistentSpace(desc.cacheConfiguration());
}
}
/**
* @param reqs Requests.
* @return Collection of futures.
*/
private Collection<DynamicCacheStartFuture> initiateCacheChanges(
Collection<DynamicCacheChangeRequest> reqs
) {
Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
for (DynamicCacheChangeRequest req : reqs) {
authorizeCacheChange(ctx.security(), req);
DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId());
try {
if (req.stop()) {
DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
if (desc == null)
// No-op.
fut.onDone(false);
}
if (req.start() && req.startCacheConfiguration() != null) {
CacheConfiguration ccfg = req.startCacheConfiguration();
try {
cachesInfo.validateStartCacheConfiguration(ccfg);
}
catch (IgniteCheckedException e) {
fut.onDone(e);
}
}
if (fut.isDone())
continue;
DynamicCacheStartFuture old = (DynamicCacheStartFuture)pendingFuts.putIfAbsent(
req.requestId(), fut);
assert old == null;
if (fut.isDone())
continue;
sndReqs.add(req);
}
catch (Exception e) {
fut.onDone(e);
}
finally {
res.add(fut);
}
}
Exception err = null;
if (!sndReqs.isEmpty()) {
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
err = checkNodeState();
}
catch (IgniteCheckedException e) {
err = e;
}
}
if (err != null) {
for (DynamicCacheStartFuture fut : res)
fut.onDone(err);
}
return res;
}
/**
* Authorizes cache change request.
*
* @param security Security.
* @param req Cache change request.
*/
static void authorizeCacheChange(IgniteSecurity security, DynamicCacheChangeRequest req) {
if (req.cacheType() == null || req.cacheType() == CacheType.USER) {
if (req.start())
authorizeCacheCreate(security, req.startCacheConfiguration());
else if (req.stop())
authorizeCacheDestroy(security, req.cacheName());
}
}
/**
* Authorizes cache destroy.
*
* @param security Security.
* @param cacheName Cache name.
*/
static void authorizeCacheDestroy(IgniteSecurity security, String cacheName) {
security.authorize(cacheName, SecurityPermission.CACHE_DESTROY);
}
/**
* Authorizes cache create.
*
* @param security Security.
* @param cacheCfg Cache configuration.
*/
static void authorizeCacheCreate(IgniteSecurity security, @Nullable CacheConfiguration cacheCfg) {
if (cacheCfg == null)
return;
security.authorize(cacheCfg.getName(), SecurityPermission.CACHE_CREATE);
if (cacheCfg.isOnheapCacheEnabled() &&
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_ONHEAP_CACHE))
throw new SecurityException("Authorization failed for enabling on-heap cache.");
}
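// Note (illustrative, not from the original source): besides the CACHE_CREATE permission check,
// on-heap caches can be vetoed cluster-wide via the IGNITE_DISABLE_ONHEAP_CACHE system property
// (e.g. the JVM flag -DIGNITE_DISABLE_ONHEAP_CACHE=true). With that property set, a start request
// for a configuration such as
//
//   new CacheConfiguration<>("heap-cache").setOnheapCacheEnabled(true)
//
// is rejected here with a SecurityException before the cache change request is sent.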
/**
* @return Non-null exception if the node is stopping or disconnected, {@code null} otherwise.
*/
private @Nullable IgniteCheckedException checkNodeState() {
if (ctx.isStopping()) {
return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
"node is stopping.");
}
else if (ctx.clientDisconnected()) {
return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
}
return null;
}
/**
* @param type Event type.
* @param customMsg Custom message instance.
* @param node Event node.
* @param topVer Topology version.
* @param state Cluster state.
*/
public void onDiscoveryEvent(
int type,
@Nullable DiscoveryCustomMessage customMsg,
ClusterNode node,
AffinityTopologyVersion topVer,
DiscoveryDataClusterState state
) {
cachesInfo.onDiscoveryEvent(type, node, topVer);
sharedCtx.affinity().onDiscoveryEvent(type, customMsg, node, topVer, state);
}
/**
* Callback invoked from discovery thread when discovery custom message is received.
*
* @param msg Custom message.
* @param topVer Current topology version.
* @param node Node that sent the message.
* @return {@code True} if minor topology version should be increased.
*/
public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer, ClusterNode node) {
if (msg instanceof SchemaAbstractDiscoveryMessage) {
ctx.query().onDiscovery((SchemaAbstractDiscoveryMessage)msg);
return false;
}
if (msg instanceof CacheAffinityChangeMessage)
return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg));
if (msg instanceof SnapshotDiscoveryMessage &&
((SnapshotDiscoveryMessage)msg).needExchange())
return true;
if (msg instanceof WalStateAbstractMessage) {
WalStateAbstractMessage msg0 = (WalStateAbstractMessage)msg;
if (msg0 instanceof WalStateProposeMessage)
sharedCtx.walState().onProposeDiscovery((WalStateProposeMessage)msg);
else if (msg0 instanceof WalStateFinishMessage)
sharedCtx.walState().onFinishDiscovery((WalStateFinishMessage)msg);
return msg0.needExchange();
}
if (msg instanceof DynamicCacheChangeBatch) {
boolean changeRequested = cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
ctx.query().onCacheChangeRequested((DynamicCacheChangeBatch)msg);
return changeRequested;
}
if (msg instanceof DynamicCacheChangeFailureMessage)
cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer);
if (msg instanceof ClientCacheChangeDiscoveryMessage)
cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);
if (msg instanceof CacheStatisticsModeChangeMessage)
onCacheStatisticsModeChange((CacheStatisticsModeChangeMessage)msg);
if (msg instanceof CacheStatisticsClearMessage)
onCacheStatisticsClear((CacheStatisticsClearMessage)msg);
if (msg instanceof TxTimeoutOnPartitionMapExchangeChangeMessage)
sharedCtx.tm().onTxTimeoutOnPartitionMapExchangeChange((TxTimeoutOnPartitionMapExchangeChangeMessage)msg);
return false;
}
/** {@inheritDoc} */
@Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode node) {
IgniteNodeValidationResult res = validateHashIdResolvers(node, ctx, cacheDescriptors());
if (res == null)
res = validateRestartingCaches(node);
if (res == null)
res = validateRestoringCaches(node);
return res;
}
/**
* @param cacheName Cache to check.
* @return {@code True} if the cache is currently restarting.
*/
public boolean isCacheRestarting(String cacheName) {
return cachesInfo.isRestarting(cacheName);
}
/**
* @param node Joining node to validate.
* @return Node validation result if there was an issue with the joining node, {@code null} otherwise.
*/
private IgniteNodeValidationResult validateRestartingCaches(ClusterNode node) {
if (cachesInfo.hasRestartingCaches()) {
String msg = "Joining node during caches restart is not allowed [joiningNodeId=" + node.id() +
", restartingCaches=" + new HashSet<>(cachesInfo.restartingCaches()) + ']';
return new IgniteNodeValidationResult(node.id(), msg);
}
return null;
}
/**
* @param node Joining node to validate.
* @return Node validation result if there was an issue with the joining node, {@code null} otherwise.
*/
private IgniteNodeValidationResult validateRestoringCaches(ClusterNode node) {
if (ctx.cache().context().snapshotMgr().isRestoring()) {
String msg = "Joining node during caches restore is not allowed [joiningNodeId=" + node.id() + ']';
return new IgniteNodeValidationResult(node.id(), msg);
}
return null;
}
/**
* @return Keep static cache configuration flag. If {@code true}, static cache configuration will override
* configuration persisted on disk.
*/
public boolean keepStaticCacheConfiguration() {
return keepStaticCacheConfiguration;
}
/**
* @param name Cache name.
* @param <K> type of keys.
* @param <V> type of values.
* @return Cache instance for given name.
*/
public <K, V> @Nullable IgniteInternalCache<K, V> cache(String name) {
assert name != null;
if (log.isDebugEnabled())
log.debug("Getting cache for name: " + name);
IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);
return jcache == null ? null : jcache.internalProxy();
}
/**
* Await proxy initialization.
*
* @param jcache Cache proxy.
*/
private void awaitInitializeProxy(IgniteCacheProxyImpl<?, ?> jcache) {
if (jcache != null) {
CountDownLatch initLatch = jcache.getInitLatch();
try {
while (initLatch.getCount() > 0) {
if (!initLatch.await(2000, TimeUnit.MILLISECONDS) && log.isInfoEnabled())
log.info("Failed to wait for proxy initialization, cache=" + jcache.getName() +
", localNodeId=" + ctx.localNodeId());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Ignore interruption.
}
}
}
/**
* @param name Cache name.
*/
public void completeProxyInitialize(String name) {
IgniteCacheProxyImpl<?, ?> jcache = jCacheProxies.get(name);
if (jcache != null) {
CountDownLatch proxyInitLatch = jcache.getInitLatch();
if (proxyInitLatch.getCount() > 0) {
if (log.isInfoEnabled())
log.info("Finish proxy initialization, cacheName=" + name +
", localNodeId=" + ctx.localNodeId());
proxyInitLatch.countDown();
}
}
else {
if (log.isInfoEnabled())
log.info("Can not finish proxy initialization because proxy does not exist, cacheName=" + name +
", localNodeId=" + ctx.localNodeId());
}
}
/**
* @param name Cache name.
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
public <K, V> IgniteInternalCache<K, V> getOrStartCache(String name) throws IgniteCheckedException {
return getOrStartCache(name, null);
}
/**
* @param name Cache name.
* @param ccfg Cache configuration to start the cache with if it is not started yet (may be {@code null}).
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
public <K, V> IgniteInternalCache<K, V> getOrStartCache(
String name,
CacheConfiguration ccfg
) throws IgniteCheckedException {
assert name != null;
if (log.isDebugEnabled())
log.debug("Getting cache for name: " + name);
IgniteCacheProxy<?, ?> cache = jcacheProxy(name, true);
if (cache == null) {
dynamicStartCache(ccfg, name, null, false, ccfg == null, true).get();
cache = jcacheProxy(name, true);
}
return cache == null ? null : (IgniteInternalCache<K, V>)cache.internalProxy();
}
/**
* @return All configured cache instances.
*/
public Collection<IgniteInternalCache<?, ?>> caches() {
return F.viewReadOnly(jCacheProxies.values(),
(IgniteClosure<IgniteCacheProxy<?, ?>, IgniteInternalCache<?, ?>>)IgniteCacheProxy::internalProxy);
}
/**
* @return All configured cache instances.
*/
public Collection<IgniteCacheProxy<?, ?>> jcaches() {
return F.viewReadOnly(jCacheProxies.values(),
(IgniteClosure<IgniteCacheProxyImpl<?, ?>, IgniteCacheProxy<?, ?>>)IgniteCacheProxyImpl::gatewayWrapper);
}
/**
* Gets utility cache.
*
* @return Utility cache.
*/
public <K, V> IgniteInternalCache<K, V> utilityCache() {
return internalCacheEx(CU.UTILITY_CACHE_NAME);
}
/**
* @param name Cache name.
* @return Cache.
*/
private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) {
if (ctx.discovery().localNode().isClient()) {
IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);
if (proxy == null) {
GridCacheAdapter<?, ?> cacheAdapter = caches.get(name);
if (cacheAdapter != null) {
proxy = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false);
IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, (IgniteCacheProxyImpl<?, ?>)proxy);
if (prev != null)
proxy = (IgniteCacheProxy<K, V>)prev;
completeProxyInitialize(proxy.getName());
}
}
assert proxy != null : name;
return proxy.internalProxy();
}
return internalCache(name);
}
/**
* @param name Cache name.
* @param <K> type of keys.
* @param <V> type of values.
* @return Cache instance for given name.
* @throws IllegalArgumentException If cache does not exist.
*/
public <K, V> IgniteInternalCache<K, V> publicCache(String name) {
assert name != null;
if (log.isDebugEnabled())
log.debug("Getting public cache for name: " + name);
DynamicCacheDescriptor desc = cacheDescriptor(name);
if (desc == null)
throw new IllegalArgumentException("Cache is not started: " + name);
if (!desc.cacheType().userCache())
throw new IllegalStateException("Failed to get cache because it is a system cache: " + name);
IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);
if (jcache == null)
throw new IllegalArgumentException("Cache is not started: " + name);
return jcache.internalProxy();
}
/**
* @param cacheName Cache name.
* @param <K> type of keys.
* @param <V> type of values.
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
public <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName) throws IgniteCheckedException {
return publicJCache(cacheName, true, true);
}
/**
* @param cacheName Cache name.
* @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
* otherwise returns {@code null} in this case.
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"ConstantConditions"})
public @Nullable <K, V> IgniteCacheProxy<K, V> publicJCache(String cacheName,
boolean failIfNotStarted,
boolean checkThreadTx) throws IgniteCheckedException {
assert cacheName != null;
if (log.isDebugEnabled())
log.debug("Getting public cache for name: " + cacheName);
DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
if (desc != null && !desc.cacheType().userCache())
throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, true);
// Try to start the cache; there is no guarantee that the cache will be instantiated.
if (proxy == null) {
dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
proxy = jcacheProxy(cacheName, true);
}
return proxy != null ? (IgniteCacheProxy<K, V>)proxy.gatewayWrapper() : null;
}
/**
* Get configuration for the given cache.
*
* @param name Cache name.
* @return Cache configuration.
*/
public CacheConfiguration cacheConfiguration(String name) {
assert name != null;
DynamicCacheDescriptor desc = cacheDescriptor(name);
if (desc == null) {
if (cachesInfo.isRestarting(name)) {
IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(name);
assert proxy != null : name;
proxy.internalProxy(); // Should throw an exception because the cache is restarting.
// If we proceeded, try again.
return cacheConfiguration(name);
}
throw new IllegalStateException("Cache doesn't exist: " + name);
}
else
return desc.cacheConfiguration();
}
/**
* Get registered cache descriptor.
*
* @param name Name.
* @return Descriptor.
*/
public DynamicCacheDescriptor cacheDescriptor(String name) {
return cachesInfo.registeredCaches().get(name);
}
/**
* @return Cache descriptors.
*/
public Map<String, DynamicCacheDescriptor> cacheDescriptors() {
return cachesInfo.registeredCaches();
}
/**
* @return Collection of persistent cache descriptors.
*/
public Collection<DynamicCacheDescriptor> persistentCaches() {
return cachesInfo.registeredCaches().values()
.stream()
.filter(desc -> isPersistentCache(desc.cacheConfiguration(), ctx.config().getDataStorageConfiguration()))
.collect(toList());
}
/**
* @return Collection of persistent cache group descriptors.
*/
public Collection<CacheGroupDescriptor> persistentGroups() {
return cachesInfo.registeredCacheGroups().values()
.stream()
.filter(CacheGroupDescriptor::persistenceEnabled)
.collect(toList());
}
/**
* @return Cache group descriptors.
*/
public Map<Integer, CacheGroupDescriptor> cacheGroupDescriptors() {
return cachesInfo.registeredCacheGroups();
}
/**
* Tries to find a cache group descriptor either in the registered cache groups
* or in the marked-for-deletion collection if the cache group is considered stopped.
*
* @param grpId Group id.
* @return Cache group descriptor or {@code null} if none is found.
*/
public CacheGroupDescriptor cacheGroupDescriptor(int grpId) {
CacheGroupDescriptor desc = cacheGroupDescriptors().get(grpId);
// Try to find descriptor if it was marked for deletion.
if (desc == null)
return cachesInfo.markedForDeletionCacheGroupDesc(grpId);
return desc;
}
/**
* @param cacheId Cache ID.
* @return Cache descriptor.
*/
public @Nullable DynamicCacheDescriptor cacheDescriptor(int cacheId) {
return cachesInfo.registeredCachesById().get(cacheId);
}
/**
* @param cacheCfg Cache configuration template.
* @throws IgniteCheckedException If failed.
*/
public void addCacheConfiguration(CacheConfiguration cacheCfg) throws IgniteCheckedException {
assert cacheCfg.getName() != null;
String name = cacheCfg.getName();
DynamicCacheDescriptor desc = cachesInfo.registeredTemplates().get(name);
if (desc != null)
return;
DynamicCacheChangeRequest req = DynamicCacheChangeRequest.addTemplateRequest(ctx, cacheCfg,
backwardCompatibleSplitter().split(cacheCfg));
TemplateConfigurationFuture fut = new TemplateConfigurationFuture(req.cacheName(), req.deploymentId());
TemplateConfigurationFuture old =
(TemplateConfigurationFuture)pendingTemplateFuts.putIfAbsent(cacheCfg.getName(), fut);
if (old != null)
fut = old;
Exception err = null;
try {
ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
if (ctx.isStopping()) {
err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
"node is stopping.");
}
else if (ctx.clientDisconnected()) {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
}
}
catch (IgniteCheckedException e) {
err = e;
}
if (err != null)
fut.onDone(err);
fut.get();
}
/**
* @param name Cache name.
* @return Cache instance for given name.
*/
@SuppressWarnings("unchecked")
public <K, V> IgniteCacheProxy<K, V> jcache(String name) {
assert name != null;
IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true);
if (cache == null) {
GridCacheAdapter<?, ?> cacheAdapter = caches.get(name);
if (cacheAdapter != null) {
cache = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false);
IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, (IgniteCacheProxyImpl<?, ?>)cache);
if (prev != null)
cache = (IgniteCacheProxy<K, V>)prev;
completeProxyInitialize(cache.getName());
}
}
if (cache == null)
throw new IllegalArgumentException("Cache is not configured: " + name);
return cache;
}
/**
* @param name Cache name.
* @param awaitInit Await proxy initialization.
* @return Cache proxy.
*/
public @Nullable IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) {
IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(name);
if (awaitInit)
awaitInitializeProxy(cache);
return cache;
}
/**
* @param name Cache name.
* @param proxy Cache proxy.
* @return Previous cache proxy.
*/
public @Nullable IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) {
return jCacheProxies.putIfAbsent(name, proxy);
}
/**
* @return All configured public cache instances.
*/
public Collection<IgniteCacheProxy<?, ?>> publicCaches() {
Collection<IgniteCacheProxy<?, ?>> res = new ArrayList<>(jCacheProxies.size());
for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) {
if (proxy.context().userCache())
res.add(proxy.gatewayWrapper());
}
return res;
}
/**
* @param name Cache name.
* @param <K> type of keys.
* @param <V> type of values.
* @return Cache instance for given name.
*/
public <K, V> GridCacheAdapter<K, V> internalCache(String name) {
assert name != null;
if (log.isDebugEnabled())
log.debug("Getting internal cache adapter: " + name);
return (GridCacheAdapter<K, V>)caches.get(name);
}
/**
* Cancel all user operations.
*/
private void cancelFutures() {
sharedCtx.mvcc().onStop();
Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
for (IgniteInternalFuture fut : pendingFuts.values())
((GridFutureAdapter)fut).onDone(err);
for (IgniteInternalFuture fut : pendingTemplateFuts.values())
((GridFutureAdapter)fut).onDone(err);
for (EnableStatisticsFuture fut : manageStatisticsFuts.values())
fut.onDone(err);
}
/**
* @return All internal cache instances.
*/
public Collection<GridCacheAdapter<?, ?>> internalCaches() {
return caches.values();
}
/**
* @param name Cache name.
* @return {@code True} if specified cache is system, {@code false} otherwise.
*/
public boolean systemCache(String name) {
assert name != null;
DynamicCacheDescriptor desc = cacheDescriptor(name);
return desc != null && !desc.cacheType().userCache();
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>> ");
for (GridCacheAdapter c : caches.values()) {
X.println(">>> Cache memory stats [igniteInstanceName=" + ctx.igniteInstanceName() +
", cache=" + c.name() + ']');
c.context().printMemoryStats();
}
}
/**
* Callback invoked by the deployment manager whenever a class loader gets undeployed.
*
* @param ldr Class loader.
*/
public void onUndeployed(ClassLoader ldr) {
if (!ctx.isStopping()) {
for (GridCacheAdapter<?, ?> cache : caches.values()) {
// Do not notify system caches and caches for which deployment is disabled.
if (cache.context().userCache() && cache.context().deploymentEnabled())
cache.onUndeploy(ldr);
}
}
}
/**
* @return Shared context.
*/
public <K, V> GridCacheSharedContext<K, V> context() {
return (GridCacheSharedContext<K, V>)sharedCtx;
}
/** @return Local config manager. */
public GridLocalConfigManager configManager() {
return locCfgMgr;
}
/**
* @return Transactions interface implementation.
*/
public IgniteTransactionsEx transactions() {
return transactions;
}
/**
* Registers MBean for cache components.
*
* @param obj Cache component.
* @param cacheName Cache name.
* @param near Near flag.
* @throws IgniteCheckedException If registration failed.
*/
private void registerMbean(Object obj, @Nullable String cacheName, boolean near)
throws IgniteCheckedException {
if (U.IGNITE_MBEANS_DISABLED)
return;
assert obj != null;
MBeanServer srvr = ctx.config().getMBeanServer();
assert srvr != null;
cacheName = U.maskName(cacheName);
cacheName = near ? cacheName + "-near" : cacheName;
final Object mbeanImpl = (obj instanceof IgniteMBeanAware) ? ((IgniteMBeanAware)obj).getMBean() : obj;
for (Class<?> itf : mbeanImpl.getClass().getInterfaces()) {
if (itf.getName().endsWith("MBean") || itf.getName().endsWith("MXBean")) {
try {
U.registerMBean(srvr, ctx.igniteInstanceName(), cacheName, obj.getClass().getName(), mbeanImpl,
(Class<Object>)itf);
}
catch (Throwable e) {
throw new IgniteCheckedException("Failed to register MBean for component: " + obj, e);
}
break;
}
}
}
/**
* Unregisters MBean for cache components.
*
* @param o Cache component.
* @param cacheName Cache name.
* @param near Near flag.
*/
private void unregisterMbean(Object o, @Nullable String cacheName, boolean near) {
if (U.IGNITE_MBEANS_DISABLED)
return;
assert o != null;
MBeanServer srvr = ctx.config().getMBeanServer();
assert srvr != null;
cacheName = U.maskName(cacheName);
cacheName = near ? cacheName + "-near" : cacheName;
boolean needToUnregister = o instanceof IgniteMBeanAware;
if (!needToUnregister) {
for (Class<?> itf : o.getClass().getInterfaces()) {
if (itf.getName().endsWith("MBean") || itf.getName().endsWith("MXBean")) {
needToUnregister = true;
break;
}
}
}
if (needToUnregister) {
try {
srvr.unregisterMBean(U.makeMBeanName(ctx.igniteInstanceName(), cacheName, o.getClass().getName()));
}
catch (Throwable e) {
U.error(log, "Failed to unregister MBean for component: " + o, e);
}
}
}
/**
* @param grp Cache group.
* @param ccfg Cache configuration.
* @param objs Extra components.
* @return Components provided in cache configuration which can implement {@link LifecycleAware} interface.
*/
private Iterable<Object> lifecycleAwares(CacheGroupContext grp, CacheConfiguration ccfg, Object... objs) {
Collection<Object> ret = new ArrayList<>(7 + objs.length);
if (grp.affinityFunction() != ccfg.getAffinity())
ret.add(ccfg.getAffinity());
ret.add(ccfg.getAffinityMapper());
ret.add(ccfg.getEvictionFilter());
ret.add(ccfg.getEvictionPolicyFactory());
ret.add(ccfg.getEvictionPolicy());
ret.add(ccfg.getInterceptor());
NearCacheConfiguration nearCfg = ccfg.getNearConfiguration();
if (nearCfg != null) {
ret.add(nearCfg.getNearEvictionPolicyFactory());
ret.add(nearCfg.getNearEvictionPolicy());
}
Collections.addAll(ret, objs);
return ret;
}
/**
* @param val Object to check.
* @return Configuration copy.
* @throws IgniteCheckedException If validation failed.
*/
CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
if (val == null)
return null;
return withBinaryContext(new IgniteOutClosureX<CacheConfiguration>() {
@Override public CacheConfiguration applyx() throws IgniteCheckedException {
if (val.getCacheStoreFactory() != null) {
try {
ClassLoader ldr = ctx.config().getClassLoader();
if (ldr == null)
ldr = val.getCacheStoreFactory().getClass().getClassLoader();
U.unmarshal(marsh, U.marshal(marsh, val.getCacheStoreFactory()),
U.resolveClassLoader(ldr, ctx.config()));
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to validate cache configuration. " +
"Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e);
}
}
try {
return U.unmarshal(marsh, U.marshal(marsh, val), U.resolveClassLoader(ctx.config()));
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to validate cache configuration " +
"(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e);
}
}
});
}
/**
* @param c Closure.
* @return Closure result.
* @throws IgniteCheckedException If failed.
*/
private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException {
IgniteCacheObjectProcessor objProc = ctx.cacheObjects();
BinaryContext oldCtx = null;
if (objProc instanceof CacheObjectBinaryProcessorImpl) {
GridBinaryMarshaller binMarsh = ((CacheObjectBinaryProcessorImpl)objProc).marshaller();
oldCtx = binMarsh == null ? null : binMarsh.pushContext();
}
try {
return c.applyx();
}
finally {
if (objProc instanceof CacheObjectBinaryProcessorImpl)
GridBinaryMarshaller.popContext(oldCtx);
}
}
/**
* Prepares DynamicCacheChangeRequest for cache creation.
*
* @param ccfg Cache configuration
* @param cacheName Cache name
* @param nearCfg Near cache configuration
* @param cacheType Cache type
* @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
* @param failIfExists Fail if exists flag.
* @param failIfNotStarted If {@code true} fails if cache is not started.
* @param restartId Restart requester id (only this requester will be allowed to start this cache).
* @param disabledAfterStart If {@code true}, cache proxies will only be activated after {@link #restartProxies()}.
* @param qryEntities Query entities.
* @param encKey Encryption key.
* @param encKeyId Id of the encryption key.
* @param masterKeyDigest Master key digest.
* @return Request or {@code null} if cache already exists.
* @throws IgniteCheckedException If some of the pre-checks failed.
* @throws CacheExistsException If cache exists and the {@code failIfExists} flag is {@code true}.
*/
private DynamicCacheChangeRequest prepareCacheChangeRequest(
@Nullable CacheConfiguration ccfg,
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
CacheType cacheType,
boolean sql,
boolean failIfExists,
boolean failIfNotStarted,
IgniteUuid restartId,
boolean disabledAfterStart,
@Nullable Collection<QueryEntity> qryEntities,
@Nullable byte[] encKey,
@Nullable Integer encKeyId,
@Nullable byte[] masterKeyDigest
) throws IgniteCheckedException {
DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
req.sql(sql);
req.failIfExists(failIfExists);
req.disabledAfterStart(disabledAfterStart);
req.masterKeyDigest(masterKeyDigest);
req.encryptionKey(encKey);
req.encryptionKeyId(encKeyId);
req.restartId(restartId);
if (ccfg != null) {
cloneCheckSerializable(ccfg);
if (desc != null) {
if (failIfExists) {
throw new CacheExistsException("Failed to start cache " +
"(a cache with the same name is already started): " + cacheName);
}
else {
CacheConfiguration descCfg = desc.cacheConfiguration();
// Check if we were asked to start a near cache.
if (nearCfg != null) {
if (isLocalAffinity(descCfg)) {
// If we are on a data node and near cache was enabled, return success, else - fail.
if (descCfg.getNearConfiguration() != null)
return null;
else
throw new IgniteCheckedException("Failed to start near " +
"cache (local node is an affinity node for cache): " + cacheName);
}
else
// Local node is not an affinity node, so start the near cache in client mode.
req.clientStartOnly(true);
}
else if (!isLocalAffinity(descCfg))
req.clientStartOnly(true);
req.deploymentId(desc.deploymentId());
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(desc);
req.startCacheConfiguration(splitCfg.get1());
req.cacheConfigurationEnrichment(splitCfg.get2());
req.schema(desc.schema());
}
}
else {
CacheConfiguration cfg = new CacheConfiguration(ccfg);
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
// Cache configuration must be initialized before splitting.
initialize(cfg, cacheObjCtx);
req.deploymentId(IgniteUuid.randomUuid());
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(cfg);
req.startCacheConfiguration(splitCfg.get1());
req.cacheConfigurationEnrichment(splitCfg.get2());
cfg = splitCfg.get1();
if (restartId != null)
req.schema(new QuerySchema(qryEntities == null ? cfg.getQueryEntities() : qryEntities));
else
req.schema(new QuerySchema(qryEntities != null ? QueryUtils.normalizeQueryEntities(ctx, qryEntities, cfg)
: cfg.getQueryEntities()));
}
}
else {
req.clientStartOnly(true);
if (desc != null)
ccfg = desc.cacheConfiguration();
if (ccfg == null) {
if (failIfNotStarted) {
throw new CacheExistsException("Failed to start client cache " +
"(a cache with the given name is not started): " + cacheName);
}
else
return null;
}
req.deploymentId(desc.deploymentId());
T2<CacheConfiguration, CacheConfigurationEnrichment> splitCfg = backwardCompatibleSplitter().split(ccfg);
req.startCacheConfiguration(splitCfg.get1());
req.cacheConfigurationEnrichment(splitCfg.get2());
req.schema(desc.schema());
}
if (nearCfg != null)
req.nearCacheConfiguration(nearCfg);
req.cacheType(cacheType);
return req;
}
/**
* Enables/disables statistics globally for the given caches.
*
* @param cacheNames Collection of cache names.
* @param enabled Statistics enabled flag.
*/
public void enableStatistics(Collection<String> cacheNames, boolean enabled) throws IgniteCheckedException {
Collection<IgniteInternalCache> caches = manageStatisticsCaches(cacheNames);
Collection<String> globalCaches = new HashSet<>(U.capacity(caches.size()));
for (IgniteInternalCache cache : caches) {
cache.context().statisticsEnabled(enabled);
globalCaches.add(cache.name());
}
if (globalCaches.isEmpty())
return;
CacheStatisticsModeChangeMessage msg = new CacheStatisticsModeChangeMessage(UUID.randomUUID(), globalCaches, enabled);
EnableStatisticsFuture fut = new EnableStatisticsFuture(msg.requestId());
manageStatisticsFuts.put(msg.requestId(), fut);
ctx.grid().context().discovery().sendCustomEvent(msg);
fut.get();
}
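// Illustrative usage sketch (not part of the original source): with a GridCacheProcessor instance
// "proc", statistics could be switched on for a single cache across the cluster like this:
//
//   proc.enableStatistics(Collections.singleton("cache-a"), true);
//
// The call blocks until the statistics mode change message has been processed (fut.get() above).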
/**
* Clears statistics globally for the given caches.
*
* @param cacheNames Collection of cache names.
*/
public void clearStatistics(Collection<String> cacheNames) throws IgniteCheckedException {
Collection<IgniteInternalCache> caches = manageStatisticsCaches(cacheNames);
Collection<String> globalCaches = new HashSet<>(U.capacity(caches.size()));
for (IgniteInternalCache cache : caches)
globalCaches.add(cache.name());
if (globalCaches.isEmpty())
return;
CacheStatisticsClearMessage msg = new CacheStatisticsClearMessage(UUID.randomUUID(), globalCaches);
EnableStatisticsFuture fut = new EnableStatisticsFuture(msg.requestId());
manageStatisticsFuts.put(msg.requestId(), fut);
ctx.grid().context().discovery().sendCustomEvent(msg);
fut.get();
}
/**
* @param caches Cache names.
* @return Cache instances for the given names.
* @throws IgniteCheckedException If one of the caches is not found.
*/
private Collection<IgniteInternalCache> manageStatisticsCaches(Collection<String> caches)
throws IgniteCheckedException {
assert caches != null;
Collection<IgniteInternalCache> res = new ArrayList<>(caches.size());
if (!cacheNames().containsAll(caches))
throw new IgniteCheckedException("One or more cache descriptors not found [caches=" + caches + ']');
for (String cacheName : caches) {
IgniteInternalCache cache = cache(cacheName);
if (cache == null)
throw new IgniteCheckedException("Cache not found [cacheName=" + cacheName + ']');
res.add(cache);
}
return res;
}
/**
* @param obj Object to clone.
* @return Object copy.
* @throws IgniteCheckedException If failed.
*/
public <T> T clone(final T obj) throws IgniteCheckedException {
return withBinaryContext(new IgniteOutClosureX<T>() {
@Override public T applyx() throws IgniteCheckedException {
return U.unmarshal(marsh, U.marshal(marsh, obj), U.resolveClassLoader(ctx.config()));
}
});
}
/**
* @param oldFormat If {@code true}, a splitter with old format support is returned.
* @return Cache configuration splitter.
*/
private CacheConfigurationSplitter splitter(boolean oldFormat) {
// Requesting splitter with old format support is rare operation.
// It's acceptable to allocate it every time by request.
return oldFormat ? new CacheConfigurationSplitterOldFormat(enricher) : splitter;
}
/**
* @return Default splitter (without old format configuration support).
*/
public CacheConfigurationSplitter splitter() {
return splitter(false);
}
/**
* If not all nodes in the cluster support split cache configurations, returns the old format splitter.
* Otherwise returns the default splitter.
*
* @return Cache configuration splitter with or without old format support, depending on the cluster state.
*/
private CacheConfigurationSplitter backwardCompatibleSplitter() {
IgniteDiscoverySpi spi = (IgniteDiscoverySpi)ctx.discovery().getInjectedDiscoverySpi();
boolean oldFormat = !spi.allNodesSupport(IgniteFeatures.SPLITTED_CACHE_CONFIGURATIONS);
return splitter(oldFormat);
}
/**
* @return Cache configuration enricher.
*/
public CacheConfigurationEnricher enricher() {
return enricher;
}
/**
* Pages list view supplier.
*
* @param filter Filter.
*/
private Iterable<CachePagesListView> pagesListViewSupplier(Map<String, Object> filter) {
Integer cacheGrpId = (Integer)filter.get(CachePagesListViewWalker.CACHE_GROUP_ID_FILTER);
Integer partId = (Integer)filter.get(CachePagesListViewWalker.PARTITION_ID_FILTER);
Integer bucketNum = (Integer)filter.get(CachePagesListViewWalker.BUCKET_NUMBER_FILTER);
Iterable<IgniteCacheOffheapManager.CacheDataStore> dataStores = F.flat(F.iterator(
filteredMap(cacheGrps, cacheGrpId).values(), grp -> grp.offheap().cacheDataStores(), true));
return F.flat(F.iterator(dataStores, dataStore -> {
RowStore rowStore = dataStore.rowStore();
if (rowStore == null || !(dataStore instanceof GridCacheOffheapManager.GridCacheDataStore))
return Collections.emptySet();
PagesList pagesList = (PagesList)rowStore.freeList();
if (bucketNum != null) {
return bucketNum >= 0 && bucketNum < pagesList.bucketsCount() ?
Collections.singleton(new CachePagesListView(pagesList, bucketNum, dataStore.partId())) :
Collections.emptyList();
}
return IntStream.range(0, pagesList.bucketsCount())
.mapToObj(bucket -> new CachePagesListView(pagesList, bucket, dataStore.partId()))
.collect(toList());
}, true, cacheDataStore -> partId == null || cacheDataStore.partId() == partId));
}
/**
* Partition states view supplier.
*
* @param filter Filter.
*/
private Iterable<PartitionStateView> partStatesViewSupplier(Map<String, Object> filter) {
Integer cacheGrpId = (Integer)filter.get(PartitionStateViewWalker.CACHE_GROUP_ID_FILTER);
UUID nodeId = (UUID)filter.get(PartitionStateViewWalker.NODE_ID_FILTER);
Integer partId = (Integer)filter.get(PartitionStateViewWalker.PARTITION_ID_FILTER);
return () -> F.concat(F.concat(F.iterator(filteredMap(cacheGrps, cacheGrpId).values(),
grp -> F.iterator(filteredMap(grp.topology().partitionMap(false), nodeId).entrySet(),
nodeToParts -> F.iterator(filteredMap(nodeToParts.getValue().map(),
partId == null || partId < 0 ? null : partId).entrySet(),
partToStates -> new PartitionStateView(
grp.groupId(),
nodeToParts.getKey(),
partToStates.getKey(),
partToStates.getValue(),
isPrimary(grp, nodeToParts.getKey(), partToStates.getKey())),
true),
true),
true)));
}
/**
* Filter map by key.
*
* @param map Map.
* @param key Filtering key.
*/
private static <K, V> Map<K, V> filteredMap(Map<K, V> map, K key) {
if (key == null)
return map;
V val = map.get(key);
return val != null ? F.asMap(key, val) : Collections.emptyMap();
}
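// Behaviour sketch (illustrative values): for map = {1 -> "a", 2 -> "b"}:
//   filteredMap(map, null) -> the whole map;
//   filteredMap(map, 1)    -> {1 -> "a"};
//   filteredMap(map, 3)    -> an empty map.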
/**
* @param grp Cache group.
* @param nodeId Node id.
* @param part Partition.
*/
private static boolean isPrimary(CacheGroupContext grp, UUID nodeId, int part) {
List<ClusterNode> nodes = grp.affinity().lastReadyAffinity().get(part);
if (F.isEmpty(nodes))
return false;
ClusterNode primaryNode = nodes.get(0);
return primaryNode != null && nodeId.equals(primaryNode.id());
}
/**
* Recovery lifecycle for caches.
*/
private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener {
/**
* Query schemas saved on recovery. Needed in case a cache query schema has changed after the node
* joined the topology.
*/
private final Map<Integer, QuerySchema> querySchemas = new ConcurrentHashMap<>();
/** Flag for stopping warm-up. */
private final AtomicBoolean stopWarmUp = new AtomicBoolean();
/** Currently running warm-up strategy. */
private volatile WarmUpStrategy curWarmUpStrat;
/** {@inheritDoc} */
@Override public void onBaselineChange() {
for (GridCacheAdapter<?, ?> cache : caches.values())
if (cache != null)
cache.context().ttl().unregister();
onKernalStopCaches(true);
stopCaches(true);
sharedCtx.coordinators().stopTxLog();
sharedCtx.database().cleanupRestoredCaches();
}
/** {@inheritDoc} */
@Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException {
CacheJoinNodeDiscoveryData data = locCfgMgr.restoreCacheConfigurations();
cachesInfo.onStart(data);
}
/** {@inheritDoc} */
@Override public void beforeBinaryMemoryRestore(
IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
for (DynamicCacheDescriptor cacheDescriptor : persistentCaches())
preparePageStore(cacheDescriptor, true);
}
/** {@inheritDoc} */
@Override public void afterBinaryMemoryRestore(
IgniteCacheDatabaseSharedManager mgr,
GridCacheDatabaseSharedManager.RestoreBinaryState restoreState) throws IgniteCheckedException {
Object consistentId = ctx.pdsFolderResolver().resolveFolders().consistentId();
DetachedClusterNode clusterNode = new DetachedClusterNode(consistentId, ctx.nodeAttributes());
for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) {
boolean affinityNode = CU.affinityNode(clusterNode, cacheDescriptor.cacheConfiguration().getNodeFilter());
if (!affinityNode)
continue;
startCacheInRecoveryMode(cacheDescriptor);
querySchemas.put(cacheDescriptor.cacheId(), cacheDescriptor.schema().copy());
}
}
/** {@inheritDoc} */
@Override public void afterLogicalUpdatesApplied(
IgniteCacheDatabaseSharedManager mgr,
GridCacheDatabaseSharedManager.RestoreLogicalState restoreState
) throws IgniteCheckedException {
Collection<CacheGroupContext> cacheGrps = cacheGroups();
restorePartitionStates(cacheGrps, restoreState.partitionRecoveryStates());
// Start warm-up only after restoring memory storage, but before starting GridDiscoveryManager.
if (!cacheGrps.isEmpty())
startWarmUp();
}
/**
* Restores the state of partitions for cache groups.
*
* @param forGroups Cache groups.
* @param partStates Partition states.
* @throws IgniteCheckedException If failed.
*/
private void restorePartitionStates(
Collection<CacheGroupContext> forGroups,
Map<GroupPartitionId, Integer> partStates
) throws IgniteCheckedException {
long startRestorePart = U.currentTimeMillis();
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
AtomicReference<IgniteCheckedException> restoreStateError = new AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
final int totalPart = forGroups.stream().mapToInt(grpCtx -> grpCtx.affinity().partitions()).sum();
CountDownLatch completionLatch = new CountDownLatch(totalPart);
Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new ConcurrentHashMap<>();
final int topPartRefLimit = 5;
for (CacheGroupContext grpCtx : forGroups) {
for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
final int partId = i;
sysPool.execute(() -> {
GroupPartitionId grpPartId = new GroupPartitionId(grpCtx.groupId(), partId);
try {
long time = grpCtx.offheap().restoreStateOfPartition(partId, partStates.get(grpPartId));
if (log.isInfoEnabled()) {
T3<Long, Long, GroupPartitionId> curPart = new T3<>(time, U.currentTimeMillis(), grpPartId);
RestorePartitionStateThreadContext threadCtx = threadCtxs.computeIfAbsent(
Thread.currentThread(),
t -> new RestorePartitionStateThreadContext()
);
Comparator<T3<Long, Long, GroupPartitionId>> cmp = processedPartitionComparator();
threadCtx.topPartRef.updateAndGet(prev -> {
if (prev == null ||
cmp.compare(prev.last(), curPart) < 0) {
SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(cmp);
top.add(curPart);
if (prev != null)
top.addAll(prev);
trimToSize(top, topPartRefLimit);
return top;
}
else
return prev;
});
threadCtx.incrementProcessedCnt();
}
}
catch (IgniteCheckedException | RuntimeException | Error e) {
U.error(log, "Failed to restore partition state for " +
"groupName=" + grpCtx.name() + " groupId=" + grpCtx.groupId(), e);
IgniteCheckedException ex = e instanceof IgniteCheckedException
? ((IgniteCheckedException)e)
: new IgniteCheckedException(e);
if (!restoreStateError.compareAndSet(null, ex))
restoreStateError.get().addSuppressed(ex);
}
finally {
completionLatch.countDown();
}
});
}
}
boolean printTop = false;
try {
// Await completion of restore state tasks in all stripes.
if (!log.isInfoEnabled())
completionLatch.await();
else {
long timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS;
while (!completionLatch.await(timeout, TimeUnit.MILLISECONDS)) {
if (log.isInfoEnabled()) {
SortedSet<T3<Long, Long, GroupPartitionId>> top =
collectTopProcessedParts(threadCtxs.values(), topPartRefLimit);
long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();
log.info("Restore partitions state progress [grpCnt=" +
(forGroups.size() - completionLatch.getCount()) + '/' + forGroups.size() +
", partitionCnt=" + totalProcessed + '/' + totalPart + (top.isEmpty() ? "" :
", topProcessedPartitions=" + toStringTopProcessingPartitions(top, forGroups)) + ']');
}
timeout = TIMEOUT_OUTPUT_RESTORE_PARTITION_STATE_PROGRESS / 5;
printTop = true;
}
}
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
for (CacheGroupContext grpCtx : forGroups)
grpCtx.offheap().confirmPartitionStatesRestored();
// Check for errors after all tasks have been applied.
if (restoreStateError.get() != null)
throw restoreStateError.get();
if (log.isInfoEnabled()) {
SortedSet<T3<Long, Long, GroupPartitionId>> t =
printTop ? collectTopProcessedParts(threadCtxs.values(), topPartRefLimit) : null;
long totalProcessed = threadCtxs.values().stream().mapToLong(c -> c.processedCnt).sum();
log.info("Finished restoring partition state for local groups [" +
"groupsProcessed=" + forGroups.size() +
", partitionsProcessed=" + totalProcessed +
", time=" + U.humanReadableDuration(U.currentTimeMillis() - startRestorePart) +
(t == null ? "" : ", topProcessedPartitions=" + toStringTopProcessingPartitions(t, forGroups)) +
"]");
}
}
/**
* Collects top processed partitions from thread local contexts of restore partition state process.
*
* @param threadCtxs Thread local contexts.
* @param topPartRefLimit Limit of top partitions collection size.
*/
private SortedSet<T3<Long, Long, GroupPartitionId>> collectTopProcessedParts(
Collection<RestorePartitionStateThreadContext> threadCtxs,
int topPartRefLimit
) {
SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(processedPartitionComparator());
for (RestorePartitionStateThreadContext threadCtx : threadCtxs) {
SortedSet<T3<Long, Long, GroupPartitionId>> threadTop = threadCtx.topPartRef.get();
if (threadTop != null)
top.addAll(threadTop);
}
trimToSize(top, topPartRefLimit);
return top;
}
/**
* Starts warm-up sequentially for each persistent data region.
*
* @throws IgniteCheckedException If failed.
*/
private void startWarmUp() throws IgniteCheckedException {
boolean start = false;
try {
// Collecting custom and default data regions.
DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();
List<DataRegionConfiguration> regCfgs =
new ArrayList<>(asList(dsCfg.getDefaultDataRegionConfiguration()));
if (nonNull(dsCfg.getDataRegionConfigurations()))
regCfgs.addAll(asList(dsCfg.getDataRegionConfigurations()));
// Warm-up start.
Map<Class<? extends WarmUpConfiguration>, WarmUpStrategy> warmUpStrats = CU.warmUpStrategies(ctx);
WarmUpConfiguration dfltWarmUpCfg = dsCfg.getDefaultWarmUpConfiguration();
for (DataRegionConfiguration regCfg : regCfgs) {
if (stopWarmUp.get())
return;
if (!regCfg.isPersistenceEnabled())
continue;
WarmUpConfiguration warmUpCfg = nonNull(regCfg.getWarmUpConfiguration()) ?
regCfg.getWarmUpConfiguration() : dfltWarmUpCfg;
if (isNull(warmUpCfg))
continue;
WarmUpStrategy warmUpStrat = (curWarmUpStrat = warmUpStrats.get(warmUpCfg.getClass()));
DataRegion region = sharedCtx.database().dataRegion(regCfg.getName());
if (!stopWarmUp.get()) {
if (!start && (start = true) && log.isInfoEnabled())
log.info("Warm-up start.");
if (log.isInfoEnabled()) {
log.info("Start warm-up for data region [name=" + regCfg.getName()
+ ", warmUpStrategy=" + warmUpStrat + ", warmUpConfig=" + warmUpCfg + ", isDefault="
+ (warmUpCfg == dfltWarmUpCfg) + ']');
}
warmUpStrat.warmUp(warmUpCfg, region);
if (log.isInfoEnabled())
log.info("Finish of warm-up data region: " + region.config().getName());
}
}
}
finally {
if (stopWarmUp.get() && log.isInfoEnabled())
log.info("Warm-up stop.");
else if (start && log.isInfoEnabled())
log.info("Warm-up finish.");
stopWarmUp.set(true);
curWarmUpStrat = null;
}
}
}
/**
* Stops warm-up and the currently running strategy.
*
* @return {@code true} if stopped by this call.
* @throws IgniteCheckedException If there is an error when stopping warm-up.
*/
public boolean stopWarmUp() throws IgniteCheckedException {
if (recovery.stopWarmUp.compareAndSet(false, true)) {
WarmUpStrategy strat = recovery.curWarmUpStrat;
if (log.isInfoEnabled())
log.info("Stopping warm-up strategy: " + strat);
if (nonNull(strat))
strat.stop();
return true;
}
return false;
}
/**
* Handler for a failure during cache start.
*
* @param <T> Type of started data.
* @param <R> Type of operation result.
*/
private interface StartCacheFailHandler<T, R> {
/**
* Handles a failure.
*
* @param data Start data.
* @param startCacheOperation Operation for start cache.
* @throws IgniteCheckedException if failed.
*/
void handle(T data, IgniteThrowableFunction<T, R> startCacheOperation) throws IgniteCheckedException;
}
/**
*
*/
private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> {
/** */
private UUID id;
/**
* @param id Future ID.
*/
private DynamicCacheStartFuture(UUID id) {
this.id = id;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
// Make sure to remove future before completion.
pendingFuts.remove(id, this);
context().exchange().exchangerUpdateHeartbeat();
return super.onDone(res, err);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheStartFuture.class, this);
}
}
/**
*
*/
private class TemplateConfigurationFuture extends GridFutureAdapter<Object> {
/** Start ID. */
@GridToStringInclude
private IgniteUuid deploymentId;
/** Cache name. */
private String cacheName;
/**
* @param cacheName Cache name.
* @param deploymentId Deployment ID.
*/
private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) {
this.deploymentId = deploymentId;
this.cacheName = cacheName;
}
/**
* @return Start ID.
*/
public IgniteUuid deploymentId() {
return deploymentId;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
// Make sure to remove future before completion.
pendingTemplateFuts.remove(cacheName, this);
return super.onDone(res, err);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TemplateConfigurationFuture.class, this);
}
}
/**
* Enable statistics future.
*/
private class EnableStatisticsFuture extends GridFutureAdapter<Void> {
/** */
private UUID id;
/**
* @param id Future ID.
*/
private EnableStatisticsFuture(UUID id) {
this.id = id;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
// Make sure to remove future before completion.
manageStatisticsFuts.remove(id, this);
return super.onDone(res, err);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(EnableStatisticsFuture.class, this);
}
}
/**
* Creates a string representation, in descending order of processing time, of the partitions whose processing took the most time.
*
* @param top Top (ascending) processed partitions.
* @param groups Cache group contexts.
* @return String representation.
*/
static String toStringTopProcessingPartitions(
SortedSet<T3<Long, Long, GroupPartitionId>> top,
Collection<CacheGroupContext> groups
) {
if (top.isEmpty())
return "[]";
StringJoiner sj0 = new StringJoiner(", ", "[", "]");
TreeMap<Long, List<GroupPartitionId>> top0 = top.stream().collect(groupingBy(
T3::get1,
TreeMap::new,
mapping(T3::get3, toList())
));
for (Map.Entry<Long, List<GroupPartitionId>> e0 : top0.descendingMap().entrySet()) {
Map<Integer, List<GroupPartitionId>> byCacheGrpId =
e0.getValue().stream().collect(groupingBy(GroupPartitionId::getGroupId));
StringJoiner sj1 = new StringJoiner(", ", "[", "]");
for (Map.Entry<Integer, List<GroupPartitionId>> e1 : byCacheGrpId.entrySet()) {
@Nullable CacheGroupContext grp =
groups.stream().filter(g -> g.groupId() == e1.getKey()).findAny().orElse(null);
String parts = e1.getValue().stream().map(GroupPartitionId::getPartitionId).sorted()
.map(p -> grp == null ? p.toString() : p + ":" + grp.topology().localPartition(p).fullSize())
.collect(Collectors.joining(", ", "[", "]"));
sj1.add("[grp=" + (grp == null ? e1.getKey() : grp.cacheOrGroupName()) + ", part=" + parts + ']');
}
sj0.add("[time=" + U.humanReadableDuration(e0.getKey()) + ' ' + sj1.toString() + ']');
}
return sj0.toString();
}
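// Example of the resulting format (group names, sizes and duration rendering are illustrative only).
// Entries are ordered by descending processing time; each partition is rendered as partId:fullSize when
// the group context is available, otherwise just as the partition id:
//
//   [[time=30s [[grp=cacheGrp1, part=[1:100, 2:200]]]], [time=5s [[grp=cacheGrp2, part=[5:50]]]]]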
/**
* Trims the set to the required size.
* Elements are removed in ascending order, i.e. the smallest elements are dropped first.
*
* @param set Set to trim.
* @param size Maximum number of elements to keep.
*/
static <E> void trimToSize(SortedSet<E> set, int size) {
while (set.size() > size)
set.remove(set.first());
}
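// Minimal usage sketch (illustrative values): the smallest elements are dropped until only 3 remain.
//
//   SortedSet<Integer> s = new TreeSet<>(asList(1, 2, 3, 4, 5));
//   trimToSize(s, 3); // s is now [3, 4, 5].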
/**
* Comparator of processed partitions.
* T3: 1 - processing duration, 2 - timestamp, 3 - group partition.
* Sort order: duration (ascending) -> timestamp (descending) -> group partition.
*
* @return Comparator.
*/
static Comparator<T3<Long, Long, GroupPartitionId>> processedPartitionComparator() {
Comparator<T3<Long, Long, GroupPartitionId>> comp = Comparator.comparing(T3::get1);
return comp.thenComparing(T3::get2, Comparator.reverseOrder()).thenComparing(T3::get3);
}
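// Sketch of how the comparator and trimToSize() interact (illustrative values): entries sort by ascending
// duration, so trimming evicts the fastest partitions and keeps the slowest ones.
//
//   SortedSet<T3<Long, Long, GroupPartitionId>> top = new TreeSet<>(processedPartitionComparator());
//   top.add(new T3<>(10L, 1L, new GroupPartitionId(1, 0)));
//   top.add(new T3<>(25L, 2L, new GroupPartitionId(1, 1)));
//   trimToSize(top, 1); // Keeps only the entry with duration 25.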
/**
* Thread-local context of partition state restore progress.
*/
private static class RestorePartitionStateThreadContext {
/** Field updater. */
static final AtomicLongFieldUpdater<RestorePartitionStateThreadContext> PROCESSED_CNT_UPD =
AtomicLongFieldUpdater.newUpdater(RestorePartitionStateThreadContext.class, "processedCnt");
/** Top partitions by processing time. */
final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> topPartRef =
new AtomicReference<>();
/** Processed partitions count. It is always updated from the same thread. */
volatile long processedCnt = 0;
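// Design note (assumption): the counter has a single writer but may be read by other threads reporting
// progress, hence the volatile field with a shared AtomicLongFieldUpdater instead of a per-context AtomicLong.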
/**
* Increments the {@code processedCnt} field.
*/
void incrementProcessedCnt() {
PROCESSED_CNT_UPD.incrementAndGet(this);
}
}
}