blob: c78914136bee10fd1525ab3dd302dd43ff599372 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.management.InstanceNotFoundException;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WarmUpConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.systemview.walker.PagesListViewWalker;
import org.apache.ignite.internal.managers.systemview.walker.PagesTimestampHistogramViewWalker;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.DirectMemoryRegion;
import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.evict.FairFifoPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.NoOpPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.Random2LruPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.evict.RandomLruPageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.warmup.WarmUpStrategy;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.mxbean.DataRegionMetricsMXBean;
import org.apache.ignite.spi.systemview.view.PagesListView;
import org.jetbrains.annotations.Nullable;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_REUSE_MEMORY_ON_DEACTIVATE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.getDouble;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_METRICS_ENABLED;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_RATE_TIME_INTERVAL_MILLIS;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_SUB_INTERVALS;
import static org.apache.ignite.configuration.DataStorageConfiguration.HALF_MAX_WAL_ARCHIVE_SIZE;
import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.METASTORE_DATA_REGION_NAME;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.VOLATILE_DATA_REGION_NAME;
/**
*
*/
public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdapter
implements IgniteChangeGlobalStateSupport, CheckpointLockStateChecker {
/** DataRegionConfiguration name reserved for internal caches. */
public static final String SYSTEM_DATA_REGION_NAME = "sysMemPlc";
/** DataRegionConfiguration names reserved for various internal needs. */
public static Set<String> INTERNAL_DATA_REGION_NAMES = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(SYSTEM_DATA_REGION_NAME, TX_LOG_CACHE_NAME, METASTORE_DATA_REGION_NAME)));
/** System view name for page lists. */
public static final String DATA_REGION_PAGE_LIST_VIEW = "dataRegionPageLists";
/** System view description for page lists. */
public static final String DATA_REGION_PAGE_LIST_VIEW_DESC = "Data region page lists";
/** System view name for pages timestamp histogram. */
public static final String PAGE_TS_HISTOGRAM_VIEW = "pagesTimestampHistogram";
/** System view description for pages timestamp histogram. */
public static final String PAGE_TS_HISTOGRAM_VIEW_DESC = "Data region pages timestamp histogram";
/** Minimum size of memory chunk */
private static final long MIN_PAGE_MEMORY_SIZE = 10L * 1024 * 1024;
/** Maximum initial size on 32-bit JVM */
private static final long MAX_PAGE_MEMORY_INIT_SIZE_32_BIT = 2L * 1024 * 1024 * 1024;
/** {@code True} to reuse memory on deactive. */
protected final boolean reuseMemory = IgniteSystemProperties.getBoolean(IGNITE_REUSE_MEMORY_ON_DEACTIVATE);
/** */
protected final Map<String, DataRegion> dataRegionMap = new ConcurrentHashMap<>();
/** Stores memory providers eligible for reuse. */
private final Map<String, DirectMemoryProvider> memProviderMap = new ConcurrentHashMap<>();
/** */
private static final String MBEAN_GROUP_NAME = "DataRegionMetrics";
/** */
protected volatile boolean dataRegionsInitialized;
/** */
private volatile boolean dataRegionsStarted;
/** */
protected DataRegion dfltDataRegion;
/** */
protected Map<String, CacheFreeList> freeListMap;
/** */
private CacheFreeList dfltFreeList;
/** Page size from memory configuration, may be set only for fake(standalone) IgniteCacheDataBaseSharedManager */
private int pageSize;
/** First eviction was warned flag. */
private volatile boolean firstEvictWarn;
/** Data storege metrics. */
protected final DataStorageMetricsImpl dsMetrics;
/**
* @param ctx Kernal context.
*/
public IgniteCacheDatabaseSharedManager(GridKernalContext ctx) {
if (!CU.isCdcEnabled(ctx.config()) && !CU.isPersistenceEnabled(ctx.config())) {
dsMetrics = null;
return;
}
DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();
if (dsCfg != null) {
dsMetrics = new DataStorageMetricsImpl(
ctx.metric(),
dsCfg.isMetricsEnabled(),
dsCfg.getMetricsRateTimeInterval(),
dsCfg.getMetricsSubIntervalCount()
);
}
else {
dsMetrics = new DataStorageMetricsImpl(
ctx.metric(),
DFLT_METRICS_ENABLED,
DFLT_RATE_TIME_INTERVAL_MILLIS,
DFLT_SUB_INTERVALS
);
}
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
if (cctx.kernalContext().clientNode() && cctx.kernalContext().config().getDataStorageConfiguration() == null)
return;
DataStorageConfiguration memCfg = cctx.kernalContext().config().getDataStorageConfiguration();
assert memCfg != null;
validateConfiguration(memCfg);
pageSize = memCfg.getPageSize();
initDataRegions(memCfg);
cctx.kernalContext().systemView().registerView(
DATA_REGION_PAGE_LIST_VIEW,
DATA_REGION_PAGE_LIST_VIEW_DESC,
new PagesListViewWalker(),
() -> {
Map<String, CacheFreeList> freeLists = freeListMap;
if (freeLists == null)
return Collections.emptyList();
return freeLists.values().stream().flatMap(fl -> IntStream.range(0, fl.bucketsCount()).mapToObj(
bucket -> new PagesListView(fl, bucket))).collect(Collectors.toList());
},
Function.identity()
);
cctx.kernalContext().systemView().registerInnerCollectionView(
PAGE_TS_HISTOGRAM_VIEW,
PAGE_TS_HISTOGRAM_VIEW_DESC,
new PagesTimestampHistogramViewWalker(),
F.viewReadOnly(dataRegions(), DataRegion::metrics),
DataRegionMetricsImpl::pagesTimestampHistogramView,
(pageMemory, view) -> view
);
}
/**
* @param cfg Ignite configuration.
* @param groupName Name of group.
* @param dataRegionName Metrics MBean name.
* @param impl Metrics implementation.
* @param clazz Metrics class type.
*/
protected <T> void registerMetricsMBean(
IgniteConfiguration cfg,
String groupName,
String dataRegionName,
T impl,
Class<T> clazz
) {
if (U.IGNITE_MBEANS_DISABLED)
return;
try {
U.registerMBean(
cfg.getMBeanServer(),
cfg.getIgniteInstanceName(),
groupName,
dataRegionName,
impl,
clazz);
}
catch (Throwable e) {
U.error(log, "Failed to register MBean with name: " + dataRegionName, e);
}
}
/**
* @param cfg Ignite configuration.
* @param groupName Name of group.
* @param name Name of MBean.
*/
protected void unregisterMetricsMBean(
IgniteConfiguration cfg,
String groupName,
String name
) {
if (U.IGNITE_MBEANS_DISABLED)
return;
assert cfg != null;
try {
cfg.getMBeanServer().unregisterMBean(
U.makeMBeanName(
cfg.getIgniteInstanceName(),
groupName,
name
));
}
catch (InstanceNotFoundException ignored) {
// We tried to unregister a non-existing MBean, not a big deal.
}
catch (Throwable e) {
U.error(log, "Failed to unregister MBean for memory metrics: " + name, e);
}
}
/**
* Registers MBeans for all DataRegionMetrics configured in this instance.
*
* @param cfg Ignite configuration.
*/
protected void registerMetricsMBeans(IgniteConfiguration cfg) {
if (U.IGNITE_MBEANS_DISABLED)
return;
assert cfg != null;
for (DataRegion dataRegion : dataRegionMap.values()) {
registerMetricsMBean(
cfg,
MBEAN_GROUP_NAME,
dataRegion.config().getName(),
new DataRegionMetricsMXBeanImpl(dataRegion),
DataRegionMetricsMXBean.class
);
}
}
/**
* @param dbCfg Database config.
* @throws IgniteCheckedException If failed.
*/
protected void initPageMemoryDataStructures(DataStorageConfiguration dbCfg) throws IgniteCheckedException {
freeListMap = U.newHashMap(dataRegionMap.size());
String dfltMemPlcName = dbCfg.getDefaultDataRegionConfiguration().getName();
for (DataRegion memPlc : dataRegionMap.values()) {
DataRegionConfiguration memPlcCfg = memPlc.config();
boolean persistenceEnabled = memPlcCfg.isPersistenceEnabled();
String freeListName = memPlcCfg.getName() + "##FreeList";
CacheFreeList freeList = new CacheFreeList(
0,
freeListName,
memPlc,
persistenceEnabled ? cctx.wal() : null,
0L,
true,
cctx.diagnostic().pageLockTracker(),
cctx.kernalContext(),
null,
PageIdAllocator.FLAG_IDX
);
freeListMap.put(memPlcCfg.getName(), freeList);
}
dfltFreeList = freeListMap.get(dfltMemPlcName);
}
/**
* @return Size of page used for PageMemory regions.
*/
public int pageSize() {
return pageSize;
}
/**
*
*/
private void startDataRegions() {
for (DataRegion region : dataRegionMap.values()) {
if (!cctx.isLazyMemoryAllocation(region))
region.pageMemory().start();
region.evictionTracker().start();
}
}
/**
* @param memCfg Database config.
* @throws IgniteCheckedException If failed to initialize swap path.
*/
protected void initDataRegions(DataStorageConfiguration memCfg) throws IgniteCheckedException {
if (dataRegionsInitialized)
return;
initDataRegions0(memCfg);
dataRegionsInitialized = true;
U.log(log, "Configured data regions initialized successfully [total=" + dataRegionMap.size() + ']');
}
/**
* @param memCfg Database config.
* @throws IgniteCheckedException If failed to initialize swap path.
*/
protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
DataRegionConfiguration[] dataRegionCfgs = memCfg.getDataRegionConfigurations();
boolean persistenceEnabled = CU.isPersistenceEnabled(memCfg);
if (dataRegionCfgs != null) {
for (DataRegionConfiguration dataRegionCfg : dataRegionCfgs)
addDataRegion(memCfg, dataRegionCfg, dataRegionCfg.isPersistenceEnabled());
}
addDataRegion(
memCfg,
memCfg.getDefaultDataRegionConfiguration(),
memCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled()
);
addDataRegion(
memCfg,
createSystemDataRegion(
memCfg.getSystemDataRegionConfiguration().getInitialSize(),
memCfg.getSystemDataRegionConfiguration().getMaxSize(),
persistenceEnabled
),
persistenceEnabled
);
addDataRegion(
memCfg,
createVolatileDataRegion(
memCfg.getSystemDataRegionConfiguration().getInitialSize(),
memCfg.getSystemDataRegionConfiguration().getMaxSize()
),
false
);
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.onInitDataRegions(this);
}
/**
* @param kctx Kernal context.
* @return Database lifecycle listeners.
*/
protected List<DatabaseLifecycleListener> getDatabaseListeners(GridKernalContext kctx) {
return kctx.internalSubscriptionProcessor().getDatabaseListeners();
}
/**
* @param dataStorageCfg Database config.
* @param dataRegionCfg Data region config.
* @throws IgniteCheckedException If failed to initialize swap path.
*/
public DataRegion addDataRegion(
DataStorageConfiguration dataStorageCfg,
DataRegionConfiguration dataRegionCfg,
boolean trackable
) throws IgniteCheckedException {
return addDataRegion(dataStorageCfg, dataRegionCfg, trackable, cctx.pageStore());
}
/**
* @param dataStorageCfg Database config.
* @param dataRegionCfg Data region config.
* @param pmPageMgr Page manager.
* @throws IgniteCheckedException If failed to initialize swap path.
*/
protected DataRegion addDataRegion(
DataStorageConfiguration dataStorageCfg,
DataRegionConfiguration dataRegionCfg,
boolean trackable,
PageReadWriteManager pmPageMgr
) throws IgniteCheckedException {
String dataRegionName = dataRegionCfg.getName();
String dfltMemPlcName = dataStorageCfg.getDefaultDataRegionConfiguration().getName();
if (dfltMemPlcName == null)
dfltMemPlcName = DFLT_DATA_REG_DEFAULT_NAME;
DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(
dataRegionCfg,
cctx.kernalContext(),
dataRegionMetricsProvider(dataRegionCfg));
DataRegion region = initMemory(dataStorageCfg, dataRegionCfg, memMetrics, trackable, pmPageMgr);
dataRegionMap.put(dataRegionName, region);
if (dataRegionName.equals(dfltMemPlcName))
dfltDataRegion = region;
else if (dataRegionName.equals(DFLT_DATA_REG_DEFAULT_NAME))
U.warn(log, "Data Region with name 'default' isn't used as a default. " +
"Please, check Data Region configuration.");
return region;
}
/**
* Closure that can be used to compute fill factor for provided data region.
*
* @param dataRegCfg Data region configuration.
* @return Closure.
*
* @deprecated use {@link #dataRegionMetricsProvider(DataRegionConfiguration)} instead.
*/
@Deprecated
protected IgniteOutClosure<Long> freeSpaceProvider(final DataRegionConfiguration dataRegCfg) {
final String dataRegName = dataRegCfg.getName();
return new IgniteOutClosure<Long>() {
private CacheFreeList freeList;
@Override public Long apply() {
if (freeList == null) {
CacheFreeList freeList0 = freeListMap.get(dataRegName);
if (freeList0 == null)
return 0L;
freeList = freeList0;
}
return freeList.freeSpace();
}
};
}
/**
* Provide that can be used to compute some metrics for provided data region.
*
* @param dataRegCfg Data region configuration.
* @return DataRegionMetricsProvider.
*/
protected DataRegionMetricsProvider dataRegionMetricsProvider(final DataRegionConfiguration dataRegCfg) {
final String dataRegName = dataRegCfg.getName();
return new DataRegionMetricsProvider() {
private CacheFreeList freeList;
private CacheFreeList getFreeList() {
if (freeListMap == null)
return null;
if (freeList == null)
freeList = freeListMap.get(dataRegName);
return freeList;
}
@Override public long partiallyFilledPagesFreeSpace() {
CacheFreeList freeList0 = getFreeList();
return freeList0 == null ? 0L : freeList0.freeSpace();
}
@Override public long emptyDataPages() {
CacheFreeList freeList0 = getFreeList();
return freeList0 == null ? 0L : freeList0.emptyDataPages();
}
};
}
/**
* @param memPlcsCfgs User-defined data region configurations.
*/
private boolean hasCustomDefaultDataRegion(DataRegionConfiguration[] memPlcsCfgs) {
for (DataRegionConfiguration memPlcsCfg : memPlcsCfgs) {
if (DFLT_DATA_REG_DEFAULT_NAME.equals(memPlcsCfg.getName()))
return true;
}
return false;
}
/**
* @param sysCacheInitSize Initial size of PageMemory to be created for system cache.
* @param sysCacheMaxSize Maximum size of PageMemory to be created for system cache.
* @param persistenceEnabled Persistence enabled flag.
*
* @return {@link DataRegionConfiguration configuration} of DataRegion for system cache.
*/
private DataRegionConfiguration createSystemDataRegion(
long sysCacheInitSize,
long sysCacheMaxSize,
boolean persistenceEnabled
) {
DataRegionConfiguration res = new DataRegionConfiguration();
res.setName(SYSTEM_DATA_REGION_NAME);
res.setInitialSize(sysCacheInitSize);
res.setMaxSize(sysCacheMaxSize);
res.setPersistenceEnabled(persistenceEnabled);
res.setLazyMemoryAllocation(false);
return res;
}
/**
* @param volatileCacheInitSize Initial size of PageMemory to be created for volatile cache.
* @param volatileCacheMaxSize Maximum size of PageMemory to be created for volatile cache.
*
* @return {@link DataRegionConfiguration configuration} of DataRegion for volatile cache.
*/
private DataRegionConfiguration createVolatileDataRegion(long volatileCacheInitSize, long volatileCacheMaxSize) {
DataRegionConfiguration res = new DataRegionConfiguration();
res.setName(VOLATILE_DATA_REGION_NAME);
res.setInitialSize(volatileCacheInitSize);
res.setMaxSize(volatileCacheMaxSize);
res.setPersistenceEnabled(false);
res.setLazyMemoryAllocation(true);
return res;
}
/**
* @param memCfg configuration to validate.
* @throws IgniteCheckedException In case of validation violation.
*/
private void validateConfiguration(DataStorageConfiguration memCfg) throws IgniteCheckedException {
checkPageSize(memCfg);
DataRegionConfiguration[] regCfgs = memCfg.getDataRegionConfigurations();
Set<String> regNames = new HashSet<>();
checkSystemDataRegionSizeConfiguration(
memCfg.getSystemDataRegionConfiguration().getInitialSize(),
memCfg.getSystemDataRegionConfiguration().getMaxSize()
);
Map<Class<? extends WarmUpConfiguration>, WarmUpStrategy> warmUpStrategies =
CU.warmUpStrategies(cctx.kernalContext());
if (regCfgs != null) {
for (DataRegionConfiguration regCfg : regCfgs)
checkDataRegionConfiguration(memCfg, regNames, regCfg, warmUpStrategies);
}
checkDataRegionConfiguration(memCfg, regNames, memCfg.getDefaultDataRegionConfiguration(), warmUpStrategies);
checkWalArchiveSizeConfiguration(memCfg, log);
checkExistenceWarmUpConfiguration(
memCfg.getDefaultWarmUpConfiguration(),
warmUpStrategies,
(warmUpCfg) -> "Unknown default warm-up configuration: " + warmUpCfg
);
}
/**
* Check WAL archive sizes configuration for correctness.
*
* @param memCfg Memory configuration.
* @throws IgniteCheckedException If WAL archive sizes is configured incorrectly.
*/
static void checkWalArchiveSizeConfiguration(
DataStorageConfiguration memCfg,
IgniteLogger log
) throws IgniteCheckedException {
long max = memCfg.getMaxWalArchiveSize();
if (!CU.isPersistenceEnabled(memCfg)) {
if (max != DataStorageConfiguration.DFLT_WAL_ARCHIVE_MAX_SIZE) {
LT.info(log, "Maximum WAL archive size has been configured but this node has been launched in " +
"non-persistent mode, so this parameter will be ignored");
}
return;
}
if (memCfg.isWalHistorySizeParameterUsed()) {
LT.warn(log,
"DataRegionConfiguration.walHistorySize property is deprecated and is no longer supported. " +
"It will be ignored and DataRegionConfiguration.maxWalArchiveSize property will be used " +
"instead to control removal of archived WAL files"
);
}
if (max != UNLIMITED_WAL_ARCHIVE) {
int walSegmentSize = memCfg.getWalSegmentSize();
if (max < walSegmentSize) {
throw new IgniteCheckedException(String.format(
"DataRegionConfiguration.maxWalArchiveSize must be no less than " +
"DataRegionConfiguration.walSegmentSize or equal to %d (unlimited size), " +
"current settings:" + U.nl() +
"DataRegionConfiguration.maxWalArchiveSize: %d bytes" + U.nl() +
"DataRegionConfiguration.walSegmentSize: %d bytes",
UNLIMITED_WAL_ARCHIVE, max, walSegmentSize
));
}
long min = memCfg.getMinWalArchiveSize();
double percentage = getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, -1);
if (min != HALF_MAX_WAL_ARCHIVE_SIZE) {
if (min > max) {
throw new IgniteCheckedException(String.format(
"DataRegionConfiguration.minWalArchiveSize must be less than or equal to " +
"DataRegionConfiguration.maxWalArchiveSize or equal to %d " +
"(to be half of maxWalArchiveSize), current settings:" + U.nl() +
"DataRegionConfiguration.minWalArchiveSize: %d bytes" + U.nl() +
"DataRegionConfiguration.maxWalArchiveSize: %d bytes",
HALF_MAX_WAL_ARCHIVE_SIZE, min, max));
}
}
else if (percentage != -1) {
log.warning(String.format(
"%s is deprecated, use DataRegionConfiguration.minWalArchiveSize instead",
IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE
));
if ((long)(max * percentage) > max) {
throw new IgniteCheckedException(String.format(
"%s must be less than or equal to 1.0",
IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE
));
}
}
}
}
/**
* Checking configuration of data region.
*
* @param memCfg Mem config.
* @param regNames Region names.
* @param regCfg Reg config.
* @param warmUpStrategies Available warming-up strategies.
* @throws IgniteCheckedException If config is invalid.
*/
private void checkDataRegionConfiguration(
DataStorageConfiguration memCfg,
Set<String> regNames,
DataRegionConfiguration regCfg,
Map<Class<? extends WarmUpConfiguration>, WarmUpStrategy> warmUpStrategies
) throws IgniteCheckedException {
assert regCfg != null;
checkDataRegionName(regCfg.getName(), regNames);
checkDataRegionSize(regCfg);
checkMetricsProperties(regCfg);
checkRegionEvictionProperties(regCfg, memCfg);
checkRegionMemoryStorageType(regCfg);
checkRegionWarmUpConfiguration(regCfg, warmUpStrategies);
}
/**
* @param memCfg Memory config.
*/
protected void checkPageSize(DataStorageConfiguration memCfg) {
if (memCfg.getPageSize() == 0)
memCfg.setPageSize(DFLT_PAGE_SIZE);
}
/**
* @param regCfg data region config.
*
* @throws IgniteCheckedException if validation of memory metrics properties fails.
*/
private static void checkMetricsProperties(DataRegionConfiguration regCfg) throws IgniteCheckedException {
if (regCfg.getMetricsRateTimeInterval() <= 0)
throw new IgniteCheckedException("Rate time interval must be greater than zero " +
"(use DataRegionConfiguration.rateTimeInterval property to adjust the interval) " +
"[name=" + regCfg.getName() +
", rateTimeInterval=" + regCfg.getMetricsRateTimeInterval() + "]"
);
if (regCfg.getMetricsSubIntervalCount() <= 0)
throw new IgniteCheckedException("Sub intervals must be greater than zero " +
"(use DataRegionConfiguration.subIntervals property to adjust the sub intervals) " +
"[name=" + regCfg.getName() +
", subIntervals=" + regCfg.getMetricsSubIntervalCount() + "]"
);
if (regCfg.getMetricsRateTimeInterval() < 1_000)
throw new IgniteCheckedException("Rate time interval must be longer that 1 second (1_000 milliseconds) " +
"(use DataRegionConfiguration.rateTimeInterval property to adjust the interval) " +
"[name=" + regCfg.getName() +
", rateTimeInterval=" + regCfg.getMetricsRateTimeInterval() + "]");
}
/**
* @param sysCacheInitSize System cache initial size.
* @param sysCacheMaxSize System cache max size.
*
* @throws IgniteCheckedException In case of validation violation.
*/
private static void checkSystemDataRegionSizeConfiguration(
long sysCacheInitSize,
long sysCacheMaxSize
) throws IgniteCheckedException {
if (sysCacheInitSize < MIN_PAGE_MEMORY_SIZE)
throw new IgniteCheckedException("Initial size for system cache must have size more than 10MB (use " +
"DataStorageConfiguration.systemCacheInitialSize property to set correct size in bytes) " +
"[size=" + U.readableSize(sysCacheInitSize, true) + ']'
);
if (U.jvm32Bit() && sysCacheInitSize > MAX_PAGE_MEMORY_INIT_SIZE_32_BIT)
throw new IgniteCheckedException("Initial size for system cache exceeds 2GB on 32-bit JVM (use " +
"DataRegionConfiguration.systemCacheInitialSize property to set correct size in bytes " +
"or use 64-bit JVM) [size=" + U.readableSize(sysCacheInitSize, true) + ']'
);
if (sysCacheMaxSize < sysCacheInitSize)
throw new IgniteCheckedException("MaxSize of system cache must not be smaller than " +
"initialSize [initSize=" + U.readableSize(sysCacheInitSize, true) +
", maxSize=" + U.readableSize(sysCacheMaxSize, true) + "]. " +
"Use DataStorageConfiguration.systemCacheInitialSize/DataStorageConfiguration.systemCacheMaxSize " +
"properties to set correct sizes in bytes."
);
}
/**
* @param regCfg DataRegionConfiguration to validate.
* @throws IgniteCheckedException If config is invalid.
*/
private void checkDataRegionSize(DataRegionConfiguration regCfg) throws IgniteCheckedException {
if (regCfg.getInitialSize() < MIN_PAGE_MEMORY_SIZE || regCfg.getMaxSize() < MIN_PAGE_MEMORY_SIZE)
throw new IgniteCheckedException("DataRegion must have size more than 10MB (use " +
"DataRegionConfiguration.initialSize and .maxSize properties to set correct size in bytes) " +
"[name=" + regCfg.getName() + ", initialSize=" + U.readableSize(regCfg.getInitialSize(), true) +
", maxSize=" + U.readableSize(regCfg.getMaxSize(), true) + "]"
);
if (regCfg.getMaxSize() < regCfg.getInitialSize()) {
if (regCfg.getInitialSize() != Math.min(DataStorageConfiguration.DFLT_DATA_REGION_MAX_SIZE,
DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)) {
throw new IgniteCheckedException("DataRegion maxSize must not be smaller than initialSize" +
"[name=" + regCfg.getName() + ", initialSize=" + U.readableSize(regCfg.getInitialSize(), true) +
", maxSize=" + U.readableSize(regCfg.getMaxSize(), true) + "]");
}
regCfg.setInitialSize(regCfg.getMaxSize());
LT.warn(log, "DataRegion maxSize=" + U.readableSize(regCfg.getMaxSize(), true) +
" is smaller than defaultInitialSize=" +
U.readableSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE, true) +
", setting initialSize to " + U.readableSize(regCfg.getMaxSize(), true));
}
if (U.jvm32Bit() && regCfg.getInitialSize() > MAX_PAGE_MEMORY_INIT_SIZE_32_BIT)
throw new IgniteCheckedException("DataRegion initialSize exceeds 2GB on 32-bit JVM (use " +
"DataRegionConfiguration.initialSize property to set correct size in bytes or use 64-bit JVM) " +
"[name=" + regCfg.getName() +
", size=" + U.readableSize(regCfg.getInitialSize(), true) + "]");
}
/**
* @param regCfg DataRegionConfiguration to validate.
* @throws IgniteCheckedException If config is invalid.
*/
private void checkRegionMemoryStorageType(DataRegionConfiguration regCfg) throws IgniteCheckedException {
if (regCfg.isPersistenceEnabled() && regCfg.getSwapPath() != null)
throw new IgniteCheckedException("DataRegionConfiguration must not have both persistence " +
"storage and swap space enabled at the same time (Use DataRegionConfiguration.setSwapPath(null) " +
"to disable the swap space usage or DataRegionConfiguration.setPersistenceEnabled(false) " +
"to disable the persistence) [name=" + regCfg.getName() + ", swapPath=" + regCfg.getSwapPath() +
", persistenceEnabled=" + regCfg.isPersistenceEnabled() + "]"
);
}
/**
* @param regCfg DataRegionConfiguration to validate.
* @param dbCfg Memory configuration.
* @throws IgniteCheckedException If config is invalid.
*/
protected void checkRegionEvictionProperties(DataRegionConfiguration regCfg, DataStorageConfiguration dbCfg)
throws IgniteCheckedException {
if (regCfg.getPageEvictionMode() == DataPageEvictionMode.DISABLED)
return;
if (regCfg.getEvictionThreshold() < 0.5 || regCfg.getEvictionThreshold() > 0.999) {
throw new IgniteCheckedException("Page eviction threshold must be between 0.5 and 0.999: " +
regCfg.getName());
}
if (regCfg.getEmptyPagesPoolSize() <= 10)
throw new IgniteCheckedException("Evicted pages pool size should be greater than 10: " + regCfg.getName());
long maxPoolSize = regCfg.getMaxSize() / dbCfg.getPageSize() / 10;
if (regCfg.getEmptyPagesPoolSize() >= maxPoolSize) {
throw new IgniteCheckedException("Evicted pages pool size should be lesser than " + maxPoolSize +
": " + regCfg.getName());
}
}
/**
* @param regName DataRegion name to validate.
* @param observedNames Names of MemoryPolicies observed before.
* @throws IgniteCheckedException If config is invalid.
*/
private static void checkDataRegionName(String regName, Collection<String> observedNames)
throws IgniteCheckedException {
if (regName == null || regName.isEmpty())
throw new IgniteCheckedException("User-defined DataRegionConfiguration must have non-null and " +
"non-empty name.");
if (observedNames.contains(regName))
throw new IgniteCheckedException("Two MemoryPolicies have the same name: " + regName);
if (INTERNAL_DATA_REGION_NAMES.contains(regName))
throw new IgniteCheckedException("'" + regName + "' policy name is reserved for internal use.");
observedNames.add(regName);
}
/**
* @param log Logger.
*/
public void dumpStatistics(IgniteLogger log) {
if (freeListMap != null) {
for (CacheFreeList freeList : freeListMap.values())
freeList.dumpStatistics(log);
}
}
/**
* Getting registered data regions.
*
* @return Collection of all configured {@link DataRegion policies}.
*/
public Collection<DataRegion> dataRegions() {
return dataRegionMap.values();
}
/**
* @return DataRegionMetrics for all MemoryPolicies configured in Ignite instance.
*/
public Collection<DataRegionMetrics> memoryMetrics() {
return dataRegionMap.values().stream()
.map(DataRegion::metrics)
.map(DataRegionMetricsSnapshot::new)
.collect(Collectors.toList());
}
/**
* @return DataStorageMetrics if persistence is enabled or {@code null} otherwise.
*/
public DataStorageMetrics persistentStoreMetrics() {
return null;
}
/**
* @return Data storage metrics implementation.
*/
public DataStorageMetricsImpl dataStorageMetricsImpl() {
return dsMetrics;
}
/**
* @param dataRegionName Name of {@link DataRegion} to obtain {@link DataRegionMetrics} for.
* @return {@link DataRegionMetrics} snapshot for specified {@link DataRegion} or {@code null} if
* no {@link DataRegion} is configured for specified name.
*/
public @Nullable DataRegionMetrics memoryMetrics(String dataRegionName) {
DataRegion dataRegion = dataRegionMap.get(dataRegionName);
return dataRegion == null ? null : new DataRegionMetricsSnapshot(dataRegion.metrics());
}
/**
* Getting data region by name.
*
* @param memPlcName Data region name. In case of {@code null}, default data region will be returned.
* @return {@link DataRegion} instance associated with a given {@link DataRegionConfiguration},
* or {@code null} if there are no registered data regions.
* @throws IgniteCheckedException in case of request for unknown DataRegion.
*/
@Nullable public DataRegion dataRegion(@Nullable String memPlcName) throws IgniteCheckedException {
if (memPlcName == null)
return dfltDataRegion;
if (dataRegionMap.isEmpty())
return null;
DataRegion plc;
if ((plc = dataRegionMap.get(memPlcName)) == null)
throw new IgniteCheckedException("Requested DataRegion is not configured: " + memPlcName);
return plc;
}
/**
* @param memPlcName DataRegionConfiguration name.
* @return {@link FreeList} instance associated with a given {@link DataRegionConfiguration}.
*/
public FreeList freeList(String memPlcName) {
if (memPlcName == null)
return dfltFreeList;
return freeListMap != null ? freeListMap.get(memPlcName) : null;
}
/**
* @param memPlcName DataRegionConfiguration name.
* @return {@link ReuseList} instance associated with a given {@link DataRegionConfiguration}.
*/
public ReuseList reuseList(String memPlcName) {
if (memPlcName == null)
return dfltFreeList;
return freeListMap != null ? freeListMap.get(memPlcName) : null;
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
onDeActivate(true);
}
/** {@inheritDoc} */
@Override public boolean checkpointLockIsHeldByThread() {
return true;
}
/**
* No-op for non-persistent storage.
*/
public void checkpointReadLock() {
// No-op.
}
/**
* No-op for non-persistent storage.
*/
public void checkpointReadUnlock() {
// No-op.
}
/**
* @return {@code 0} for non-persistent storage.
*/
public long checkpointReadLockTimeout() {
return 0;
}
/**
* No-op for non-persistent storage.
*/
public void checkpointReadLockTimeout(long val) {
// No-op.
}
/**
* Method will perform cleanup cache page memory and each cache partition store.
*/
public void cleanupRestoredCaches() {
// No-op.
}
/**
* Clean checkpoint directory
* {@link CheckpointMarkersStorage#cpDir}. The operation
* is necessary when local node joined to baseline topology with different consistentId.
*/
public void cleanupCheckpointDirectory() throws IgniteCheckedException {
// No-op.
}
/**
* No-op for non-persistent storage.
*/
public void cleanupTempCheckpointDirectory() throws IgniteCheckedException{
// No-op.
}
/**
*
*/
@Nullable public IgniteInternalFuture wakeupForCheckpoint(String reason) {
return null;
}
/**
* @return Last checkpoint mark WAL pointer.
*/
@Nullable public WALPointer lastCheckpointMarkWalPointer() {
return null;
}
/**
* Schedule a new checkpoint.<p>
* Note: if a checkpoint is already running and the locks have not been taken yet,
* then a new checkpoint will not be forced.
*
* @param reason Reason.
*/
@Nullable public CheckpointProgress forceCheckpoint(String reason) {
return null;
}
/**
* Schedule a new checkpoint.
*
* @param reason Reason.
* @param lsnr Listener which will be called on scheduled checkpoint finish.
*/
@Nullable public <R> CheckpointProgress forceNewCheckpoint(String reason,
IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
return null;
}
/**
* Waits until current state is checkpointed.
*
* @throws IgniteCheckedException If failed.
*/
public void waitForCheckpoint(String reason) throws IgniteCheckedException {
waitForCheckpoint(reason, null);
}
/**
* Waits until current state is checkpointed and execution listeners after finish.
*
* @param reason Reason for checkpoint wakeup if it would be required.
* @param lsnr Listeners which should be called in checkpoint thread after current checkpoint finished.
* @throws IgniteCheckedException If failed.
*/
public <R> void waitForCheckpoint(
String reason,
IgniteInClosure<? super IgniteInternalFuture<R>> lsnr
) throws IgniteCheckedException {
// No-op
}
/**
* @param discoEvt Before exchange for the given discovery event.
*/
public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws IgniteCheckedException {
}
/**
* Perform memory restore before {@link GridDiscoveryManager} start.
*
* @param kctx Current kernal context.
* @param startTimer Holder of start time of stages.
* @throws IgniteCheckedException If fails.
*/
public void startMemoryRestore(GridKernalContext kctx, TimeBag startTimer) throws IgniteCheckedException {
if (!CU.isCdcEnabled(kctx.config()) || kctx.clientNode())
return;
try (WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true)) {
while (iter.hasNext())
iter.next();
WALPointer ptr = iter.lastRead().orElse(null);
if (ptr != null)
ptr = ptr.next();
cctx.wal(true).startAutoReleaseSegments();
cctx.wal(true).resumeLogging(ptr);
}
}
/**
* Called when all partitions have been fully restored and pre-created on node start.
*
* @throws IgniteCheckedException If failed.
*/
public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException {
// No-op.
}
/**
* @param fut Partition exchange future.
*/
public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
// No-op.
}
/**
* Initiate an asynchronous forced index rebuild for caches.
*
* @param contexts Cache contexts.
* @return Cache contexts for which index rebuilding is not initiated by
* this call because they are already in the process of rebuilding.
*/
public Collection<GridCacheContext> forceRebuildIndexes(Collection<GridCacheContext> contexts) {
return Collections.emptyList();
}
/**
* Needed action before any cache will stop
*/
public void prepareCachesStop() {
// No-op.
}
/**
* @param stoppedGrps A collection of tuples (cache group, destroy flag).
*/
public void onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps) {
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext grp = tup.get1();
try {
boolean destroy = tup.get2();
if (destroy && CU.storeCacheConfig(cctx, grp.config()))
cctx.cache().configManager().removeCacheGroupConfigurationData(grp);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
"[cache=" + grp.cacheOrGroupName() + "]", e);
}
}
}
/**
* Reserve update history for exchange.
*
* @return Reserved update counters per cache and partition.
*/
public Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
return Collections.emptyMap();
}
/**
* Release reserved update history.
*/
public void releaseHistoryForExchange() {
// No-op
}
/**
* Reserve update history for preloading.
*
* @param reservationMap Map contains of counters for partitions of groups.
* @return True if successfully reserved.
*/
public boolean reserveHistoryForPreloading(Map<T2<Integer, Integer>, Long> reservationMap) {
return false;
}
/**
* Release reserved update history.
*/
public void releaseHistoryForPreloading() {
// No-op
}
/**
* Returns the latest WAL pointer that reserved for preloading or {@code null}.
*
* @return WAL pointer or {@code null} if nothing reserved.
*/
public WALPointer latestWalPointerReservedForPreloading() {
return null;
}
/**
* Checks that the given {@code region} has enough space for putting a new entry.
*
* This method makes sense then and only then
* the data region is not persisted {@link DataRegionConfiguration#isPersistenceEnabled()}
* and page eviction is disabled {@link DataPageEvictionMode#DISABLED}.
*
* The non-persistent region should reserve a number of pages to support a free list {@link AbstractFreeList}.
* For example, removing a row from underlying store may require allocating a new data page
* in order to move a tracked page from one bucket to another one which does not have a free space for a new stripe.
* See {@link AbstractFreeList#removeDataRowByLink}.
* Therefore, inserting a new entry should be prevented in case of some threshold is exceeded.
*
* @param region Data region to be checked.
* @param dataRowSize Size of data row to be inserted.
* @throws IgniteOutOfMemoryException In case of the given data region does not have enough free space
* for putting a new entry.
*/
public void ensureFreeSpaceForInsert(DataRegion region, int dataRowSize) throws IgniteOutOfMemoryException {
if (region == null)
return;
DataRegionConfiguration regCfg = region.config();
if (regCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED || regCfg.isPersistenceEnabled())
return;
long memorySize = regCfg.getMaxSize();
PageMemory pageMem = region.pageMemory();
CacheFreeList freeList = freeListMap.get(regCfg.getName());
long nonEmptyPages = (pageMem.loadedPages() - freeList.emptyDataPages());
// The maximum number of pages that can be allocated (memorySize / systemPageSize)
// should be greater or equal to pages required for inserting a new entry plus
// the current number of non-empty pages plus the number of pages that may be required in order to move
// all pages to a reuse bucket, that is equal to nonEmptyPages * 8 / pageSize, where 8 is the size of a link.
// Note that not the whole page can be used to storing links,
// see PagesListNodeIO and PagesListMetaIO#getCapacity(), so we pessimistically multiply the result on 1.5,
// in any way, the number of required pages is less than 1 percent.
boolean oomThreshold = (memorySize / pageMem.systemPageSize()) <
((double)dataRowSize / pageMem.pageSize() + nonEmptyPages * (8.0 * 1.5 / pageMem.pageSize() + 1) + 256 /*one page per bucket*/);
if (oomThreshold) {
IgniteOutOfMemoryException oom = new IgniteOutOfMemoryException("Out of memory in data region [" +
"name=" + regCfg.getName() +
", initSize=" + U.readableSize(regCfg.getInitialSize(), false) +
", maxSize=" + U.readableSize(regCfg.getMaxSize(), false) +
", persistenceEnabled=" + regCfg.isPersistenceEnabled() + "] Try the following:" + U.nl() +
" ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)" + U.nl() +
" ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)" + U.nl() +
" ^-- Enable eviction or expiration policies"
);
if (cctx.kernalContext() != null)
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, oom));
throw oom;
}
}
/**
* See {@code GridCacheMapEntry#ensureFreeSpace()}
*
* @param memPlc data region.
*/
public void ensureFreeSpace(DataRegion memPlc) throws IgniteCheckedException {
if (memPlc == null)
return;
while (memPlc.evictionTracker().evictionRequired()) {
warnFirstEvict(memPlc.config());
memPlc.evictionTracker().evictDataPage();
memPlc.metrics().updateEvictionRate();
}
}
/**
* @param memCfg memory configuration with common parameters.
* @param plcCfg data region with PageMemory specific parameters.
* @param memMetrics {@link DataRegionMetrics} object to collect memory usage metrics.
* @param pmPageMgr Page manager.
* @return data region instance.
*
* @throws IgniteCheckedException If failed to initialize swap path.
*/
private DataRegion initMemory(
DataStorageConfiguration memCfg,
DataRegionConfiguration plcCfg,
DataRegionMetricsImpl memMetrics,
boolean trackable,
PageReadWriteManager pmPageMgr
) throws IgniteCheckedException {
if (plcCfg.getMemoryAllocator() == null)
plcCfg.setMemoryAllocator(memCfg.getMemoryAllocator());
PageMemory pageMem = createPageMemory(createOrReuseMemoryProvider(plcCfg), memCfg, plcCfg, memMetrics,
trackable, pmPageMgr);
return new DataRegion(pageMem, plcCfg, memMetrics, createPageEvictionTracker(plcCfg, pageMem));
}
/**
* @param plcCfg Policy config.
* @return DirectMemoryProvider provider.
*/
private DirectMemoryProvider createOrReuseMemoryProvider(DataRegionConfiguration plcCfg)
throws IgniteCheckedException {
if (!supportsMemoryReuse(plcCfg))
return createMemoryProvider(plcCfg);
DirectMemoryProvider memProvider = memProviderMap.get(plcCfg.getName());
if (memProvider == null)
memProviderMap.put(plcCfg.getName(), (memProvider = createMemoryProvider(plcCfg)));
return memProvider;
}
/**
* @param plcCfg Policy config.
*
* @return {@code True} if policy supports memory reuse.
*/
public boolean supportsMemoryReuse(DataRegionConfiguration plcCfg) {
return reuseMemory && plcCfg.getSwapPath() == null;
}
/**
* @param plcCfg Policy config.
* @return DirectMemoryProvider provider.
*/
private DirectMemoryProvider createMemoryProvider(DataRegionConfiguration plcCfg) throws IgniteCheckedException {
File allocPath = buildAllocPath(plcCfg);
return allocPath == null ?
new UnsafeMemoryProvider(log, plcCfg.getMemoryAllocator()) :
new MappedFileMemoryProvider(
log,
allocPath);
}
/**
* @param plc data region Configuration.
* @param pageMem Page memory.
*/
protected PageEvictionTracker createPageEvictionTracker(DataRegionConfiguration plc, PageMemory pageMem) {
if (plc.getPageEvictionMode() == DataPageEvictionMode.DISABLED || plc.isPersistenceEnabled())
return new NoOpPageEvictionTracker();
assert pageMem instanceof PageMemoryNoStoreImpl : pageMem.getClass();
PageMemoryNoStoreImpl pageMem0 = (PageMemoryNoStoreImpl)pageMem;
if (Boolean.getBoolean("override.fair.fifo.page.eviction.tracker"))
return new FairFifoPageEvictionTracker(pageMem0, plc, cctx);
switch (plc.getPageEvictionMode()) {
case RANDOM_LRU:
return new RandomLruPageEvictionTracker(pageMem0, plc, cctx);
case RANDOM_2_LRU:
return new Random2LruPageEvictionTracker(pageMem0, plc, cctx);
default:
return new NoOpPageEvictionTracker();
}
}
/**
* Builds allocation path for memory mapped file to be used with PageMemory.
*
* @param plc DataRegionConfiguration.
*
* @throws IgniteCheckedException If resolving swap directory fails.
*/
@Nullable protected File buildAllocPath(DataRegionConfiguration plc) throws IgniteCheckedException {
String path = plc.getSwapPath();
if (path == null)
return null;
final PdsFolderSettings folderSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
final String folderName = folderSettings.isCompatible() ?
String.valueOf(folderSettings.consistentId()).replaceAll("[:,\\.]", "_") :
folderSettings.folderName();
return buildPath(path, folderName);
}
/**
* Creates PageMemory with given size and memory provider.
*
* @param memProvider Memory provider.
* @param memCfg Memory configuartion.
* @param memPlcCfg data region configuration.
* @param memMetrics DataRegionMetrics to collect memory usage metrics.
* @param pmPageMgr Page manager.
* @return PageMemory instance.
*/
protected PageMemory createPageMemory(
DirectMemoryProvider memProvider,
DataStorageConfiguration memCfg,
DataRegionConfiguration memPlcCfg,
DataRegionMetricsImpl memMetrics,
boolean trackable,
PageReadWriteManager pmPageMgr
) {
memMetrics.persistenceEnabled(false);
PageMemory pageMem = new PageMemoryNoStoreImpl(
cctx,
wrapMetricsMemoryProvider(memProvider, memMetrics),
memCfg.getPageSize(),
memPlcCfg,
memMetrics
);
memMetrics.pageMemory(pageMem);
return pageMem;
}
/**
* @param memoryProvider0 Memory provider.
* @param memMetrics Memory metrics.
* @return Wrapped memory provider.
*/
private DirectMemoryProvider wrapMetricsMemoryProvider(
final DirectMemoryProvider memoryProvider0,
final DataRegionMetricsImpl memMetrics
) {
return new DirectMemoryProvider() {
/** */
private final DirectMemoryProvider memProvider = memoryProvider0;
@Override public void initialize(long[] chunkSizes) {
memProvider.initialize(chunkSizes);
}
@Override public void shutdown(boolean deallocate) {
memProvider.shutdown(deallocate);
}
@Override public DirectMemoryRegion nextRegion() {
DirectMemoryRegion nextMemoryRegion = memProvider.nextRegion();
if (nextMemoryRegion == null)
return null;
memMetrics.updateOffHeapSize(nextMemoryRegion.size());
return nextMemoryRegion;
}
};
}
/**
* @param path Path to the working directory.
* @param consId Consistent ID of the local node.
* @return DB storage path.
*
* @throws IgniteCheckedException If resolving swap directory fails.
*/
protected File buildPath(String path, String consId) throws IgniteCheckedException {
String igniteHomeStr = U.getIgniteHome();
File workDir = igniteHomeStr == null ? new File(path) : U.resolveWorkDirectory(igniteHomeStr, path, false);
return new File(workDir, consId);
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
if (kctx.clientNode() && kctx.config().getDataStorageConfiguration() == null)
return;
initAndStartRegions(kctx.config().getDataStorageConfiguration());
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx))
lsnr.afterInitialise(this);
}
/**
* @param cfg Current data storage configuration.
* @throws IgniteCheckedException If fails.
*/
protected void initAndStartRegions(DataStorageConfiguration cfg) throws IgniteCheckedException {
assert cfg != null;
initDataRegions(cfg);
startDataRegions(cfg);
}
/**
* @param cfg Regions configuration.
* @throws IgniteCheckedException If fails.
*/
private void startDataRegions(DataStorageConfiguration cfg) throws IgniteCheckedException {
if (dataRegionsStarted)
return;
assert cfg != null;
registerMetricsMBeans(cctx.gridConfig());
startDataRegions();
initPageMemoryDataStructures(cfg);
dataRegionsStarted = true;
U.quietAndInfo(log, "Data Regions Started: " + dataRegionMap.size());
((IgniteKernal)cctx.kernalContext().grid()).dataStorageReport();
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
onDeActivate(!reuseMemory);
}
/**
* @param shutdown {@code True} to force memory regions shutdown.
*/
private void onDeActivate(boolean shutdown) {
if (freeListMap != null)
freeListMap.values().forEach(DataStructure::close);
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.beforeStop(this);
for (DataRegion region : dataRegionMap.values()) {
region.pageMemory().stop(shutdown);
region.evictionTracker().stop();
unregisterMetricsMBean(
cctx.gridConfig(),
MBEAN_GROUP_NAME,
region.metrics().getName()
);
region.metrics().remove();
}
dataRegionMap.clear();
if (shutdown && memProviderMap != null)
memProviderMap.clear();
dataRegionsInitialized = false;
dataRegionsStarted = false;
}
/**
* @return Name of DataRegionConfiguration for internal caches.
*/
public String systemDateRegionName() {
return SYSTEM_DATA_REGION_NAME;
}
/**
* Method for fake (standalone) context initialization. Not to be called in production code
* @param pageSize configured page size
*/
protected void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
/**
* @return MetaStorage
*/
public MetaStorage metaStorage() {
return null;
}
/**
* Notifies {@link MetastorageLifecycleListener} that {@link MetaStorage} is ready for read.
* This method is called when all processors and managers have already started and right before discovery manager.
*
* @throws IgniteCheckedException If failed.
*/
public void notifyMetaStorageSubscribersOnReadyForRead() throws IgniteCheckedException {
// No-op.
}
/**
* @param grpId Group ID.
* @return WAL enabled flag.
*/
public boolean walEnabled(int grpId, boolean local) {
return false;
}
/**
* Marks cache group as with disabled WAL.
*
* @param grpId Group id.
* @param enabled flag.
*/
public void walEnabled(int grpId, boolean enabled, boolean local) {
// No-op.
}
/**
* Marks last checkpoint as inapplicable for WAL rebalance for given group {@code grpId}.
*
* @param grpId Group id.
*/
public void lastCheckpointInapplicableForWalRebalance(int grpId) {
// No-op.
}
/**
* Warns on first eviction.
* @param regCfg data region configuration.
*/
private void warnFirstEvict(DataRegionConfiguration regCfg) {
if (firstEvictWarn)
return;
// Do not move warning output to synchronized block (it causes warning in IDE).
synchronized (this) {
if (firstEvictWarn)
return;
firstEvictWarn = true;
}
U.warn(log, "Page-based evictions started." +
" Consider increasing 'maxSize' on Data Region configuration: " + regCfg.getName());
}
/**
* Checking existence of a warm-up configuration.
*
* @param warmUpCfg Warm-up configuration.
* @param warmUpStrategies Available warming-up strategies.
* @param errMsgSupplier Supplier error message.
* @throws IgniteCheckedException If config is invalid.
*/
private void checkExistenceWarmUpConfiguration(
@Nullable WarmUpConfiguration warmUpCfg,
Map<Class<? extends WarmUpConfiguration>, WarmUpStrategy> warmUpStrategies,
IgniteClosure<WarmUpConfiguration, String> errMsgSupplier
) throws IgniteCheckedException {
if (nonNull(warmUpCfg) && !warmUpStrategies.containsKey(warmUpCfg.getClass()))
throw new IgniteCheckedException(errMsgSupplier.apply(warmUpCfg));
}
/**
* Checking data region warm-up configuration.
*
* @param regCfg DataRegionConfiguration to validate.
* @param warmUpStrategies Available warming-up strategies.
* @throws IgniteCheckedException If config is invalid.
*/
private void checkRegionWarmUpConfiguration(
DataRegionConfiguration regCfg,
Map<Class<? extends WarmUpConfiguration>, WarmUpStrategy> warmUpStrategies
) throws IgniteCheckedException {
WarmUpConfiguration warmUpCfg = regCfg.getWarmUpConfiguration();
if (isNull(warmUpCfg))
return;
Supplier<String> errPostfix = () -> "[name=" + regCfg.getName() + ", warmUpConfig=" + warmUpCfg + ']';
if (!regCfg.isPersistenceEnabled()) {
throw new IgniteCheckedException("Warm-up setting is not expected for a non-persistent data region: " +
errPostfix.get());
}
checkExistenceWarmUpConfiguration(
regCfg.getWarmUpConfiguration(),
warmUpStrategies,
(warmUpConfig) -> "Unknown data region warm-up configuration: " + errPostfix.get()
);
}
/**
* Wal truncate callback.
*
* @param highBound Upper bound.
* @throws IgniteCheckedException If failed.
*/
public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
// No-op.
}
}