blob: da71924d378b4fe23e864c763021800df9e14c60 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.systemview.walker.MetastorageViewWalker;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.DirectMemoryRegion;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.IndexRenameRootPageRecord;
import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecordV2;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistoryResult;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointStatus;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.ReservationReason;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationPageReadWriteManager;
import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationWorkflowCallback;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManager;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridCountDownCallback;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.spi.systemview.view.MetastorageView;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyList;
import static java.util.Objects.nonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RECOVERY_SEMAPHORE_PERMITS;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_LOCK_HOLD_COUNT;
import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.fromStore;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
import static org.apache.ignite.internal.util.IgniteUtils.GB;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
/**
 * Database shared manager for persistence-enabled nodes. Manages persistent data regions,
 * checkpointing, PDS file locks, the local metastorage and the defragmentation lifecycle.
 */
@SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"})
public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager {
/** */
@SystemProperty(value = "Sets the flag controlling if the I/O sync needs to be skipped on a checkpoint")
public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
/** Skip checkpoint on node stop flag. */
@SystemProperty(value = "Sets the flag controlling of a checkpoint needs to be skipped during a node termination")
public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP";
/** Log read lock holders. */
@SystemProperty(value = "Enables log checkpoint read lock holders")
public static final String IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS = "IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS";
/** MemoryPolicyConfiguration name reserved for meta store. */
public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc";
/** Name of the system view for a system {@link MetaStorage}. */
public static final String METASTORE_VIEW = "metastorage";
/** Description of the system view for a {@link MetaStorage}. */
public static final String METASTORE_VIEW_DESC = "Local metastorage data";
/** */
public static final String DEFRAGMENTATION_PART_REGION_NAME = "defragPartitionsDataRegion";
/** */
public static final String DEFRAGMENTATION_MAPPING_REGION_NAME = "defragMappingDataRegion";
/**
* Threshold to calculate limit for pages list on-heap caches.
* <p>
* Note: When a checkpoint is triggered, we need some amount of page memory to store pages list on-heap cache.
* If a checkpoint is triggered by "too many dirty pages" reason and pages list cache is rather big, we can get
* {@code IgniteOutOfMemoryException}. To prevent this, we can limit the total amount of cached page list buckets,
* assuming that checkpoint will be triggered if no more than 3/4 of pages will be marked as dirty (there will be
* at least 1/4 of clean pages) and each cached page list bucket can be stored to up to 2 pages (this value is not
* static, but depends on PagesCache.MAX_SIZE, so if PagesCache.MAX_SIZE > PagesListNodeIO#getCapacity it can take
* more than 2 pages). Also some amount of page memory needed to store page list metadata.
*/
private static final double PAGE_LIST_CACHE_LIMIT_THRESHOLD = 0.1;
/**
* @see IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD
* @see #HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY
*/
public static final int DFLT_PDS_WAL_REBALANCE_THRESHOLD = 500;
/** @see IgniteSystemProperties#IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE */
public static final int DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE = 60;
/**
* Threshold value to use history or full rebalance for local partition.
* Master value contained in {@link #historicalRebalanceThreshold}.
*/
private final int walRebalanceThresholdLegacy =
getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
/** WAL rebalance threshold distributed configuration key */
public static final String HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY = "historical.rebalance.threshold";
/** Prefer historical rebalance flag. */
private final boolean preferWalRebalance = getBoolean(IGNITE_PREFER_WAL_REBALANCE);
/** Value of property for throttling policy override. */
private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
/** Defragmentation regions size percentage of configured ones. */
private final int defragmentationRegionSizePercentageOfConfiguredSize =
getInteger(IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE, DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE);
/** */
private static final String MBEAN_NAME = "DataStorageMetrics";
/** */
private static final String MBEAN_GROUP = "Persistent Store";
/** WAL marker prefix for meta store. */
private static final String WAL_KEY_PREFIX = "grp-wal-";
/** Prefix for meta store records which means that WAL was disabled globally for some group. */
private static final String WAL_GLOBAL_KEY_PREFIX = WAL_KEY_PREFIX + "disabled-";
/** Prefix for meta store records which means that WAL was disabled locally for some group. */
private static final String WAL_LOCAL_KEY_PREFIX = WAL_KEY_PREFIX + "local-disabled-";
/** Prefix for meta store records which means that checkpoint entry for some group is not applicable for WAL rebalance. */
private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
/** Default checkpoint deviation from the configured frequency in percentage. */
private static final int DEFAULT_CHECKPOINT_DEVIATION = 40;
/** */
private FilePageStoreManager storeMgr;
/** */
CheckpointManager checkpointManager;
/** Database configuration. */
private final DataStorageConfiguration persistenceCfg;
/**
 * The position of the last seen WAL pointer. Used for resuming logging from this pointer.
 *
 * If binary memory recovery was performed on node start, the checkpoint END pointer will store
 * not the last WAL pointer and can't be used for resuming logging.
 */
private volatile WALPointer walTail;
/**
 * Lock holder for compatible folders mode. {@code null} if the lock holder was created at node start. <br>
 * In that case the lock is held by the PDS resolver manager and it is not required to manage locking here.
 */
@Nullable private NodeFileLockHolder fileLockHolder;
/** Lock wait time. */
private final long lockWaitTime;
/**
* This is the earliest WAL pointer that was reserved during exchange and would release after exchange completed.
* Guarded by {@code this}.
*/
@Nullable private WALPointer reservedForExchange;
/** This is the earliest WAL pointer that was reserved during preloading. */
private final AtomicReference<WALPointer> reservedForPreloading = new AtomicReference<>();
/** Snapshot manager. */
private IgniteCacheSnapshotManager snapshotMgr;
/** */
private final DataStorageMetricsImpl persStoreMetrics;
/**
* MetaStorage instance. Value {@code null} means storage not initialized yet.
* Guarded by {@link GridCacheDatabaseSharedManager#checkpointReadLock()}
*/
@Nullable
private MetaStorage metaStorage;
/** Temporary metastorage used for migration of the index partition. See IGNITE-8735. */
private MetaStorage.TmpStorage tmpMetaStorage;
/** */
private List<MetastorageLifecycleListener> metastorageLifecycleLsnrs;
/** Initially disabled cache groups. */
private final Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>();
/** Initially local wal disabled groups. */
private final Collection<Integer> initiallyLocWalDisabledGrps = new HashSet<>();
/** Flag allows to log additional information about partitions during recovery phases. */
private final boolean recoveryVerboseLogging =
getBoolean(IgniteSystemProperties.IGNITE_RECOVERY_VERBOSE_LOGGING, false);
/** Page list cache limits per data region. */
private final Map<String, AtomicLong> pageListCacheLimits = new ConcurrentHashMap<>();
/** */
private CachePartitionDefragmentationManager defrgMgr;
/** Data regions which should be checkpointed. */
protected final Set<DataRegion> checkpointedDataRegions = new GridConcurrentHashSet<>();
/** Checkpoint frequency deviation. */
private SimpleDistributedProperty<Integer> cpFreqDeviation;
/** WAL rebalance threshold. */
private final SimpleDistributedProperty<Integer> historicalRebalanceThreshold =
new SimpleDistributedProperty<>(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, Integer::parseInt);
/**
* @param ctx Kernal context.
*/
public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
IgniteConfiguration cfg = ctx.config();
persistenceCfg = cfg.getDataStorageConfiguration();
assert persistenceCfg != null;
lockWaitTime = persistenceCfg.getLockWaitTime();
persStoreMetrics = new DataStorageMetricsImpl(
ctx.metric(),
persistenceCfg.isMetricsEnabled(),
persistenceCfg.getMetricsRateTimeInterval(),
persistenceCfg.getMetricsSubIntervalCount()
);
}
/**
 * @return File page store manager used for persistent page stores.
 */
public FilePageStoreManager getFileStoreManager() {
    return storeMgr;
}
/**
 * Registers the local metastorage system view ({@link #METASTORE_VIEW}).
 * The view lazily iterates over all metastorage entries on each request.
 */
private void registerSystemView() {
    cctx.kernalContext().systemView().registerView(METASTORE_VIEW, METASTORE_VIEW_DESC,
        new MetastorageViewWalker(), () -> {
            try {
                List<MetastorageView> data = new ArrayList<>();

                // Walk every key; values are stored as marshalled byte arrays.
                metaStorage.iterate("", (key, valBytes) -> {
                    try {
                        Serializable val = metaStorage.marshaller().unmarshal((byte[])valBytes, U.gridClassLoader());

                        data.add(new MetastorageView(key, IgniteUtils.toStringSafe(val)));
                    }
                    catch (IgniteCheckedException ignored) {
                        // Value can't be unmarshalled (e.g. class not on classpath) - expose raw size instead.
                        data.add(new MetastorageView(key, "[Raw data. " + (((byte[])valBytes).length + " bytes]")));
                    }
                }, false);

                return data;
            }
            catch (IgniteCheckedException e) {
                // Best-effort view: log and return an empty result rather than failing the view query.
                log.warning("Metastore iteration error", e);

                return emptyList();
            }
        }, identity());
}
/**
 * Notifies all registered metastorage lifecycle listeners that the metastorage
 * is available for read-only access.
 *
 * @throws IgniteCheckedException If a listener fails.
 */
private void notifyMetastorageReadyForRead() throws IgniteCheckedException {
    for (MetastorageLifecycleListener lifecycleLsnr : metastorageLifecycleLsnrs) {
        lifecycleLsnr.onReadyForRead(metaStorage);
    }
}
/**
 * Notifies all registered metastorage lifecycle listeners that the metastorage
 * is available for both reads and writes.
 *
 * @throws IgniteCheckedException If a listener fails.
 */
private void notifyMetastorageReadyForReadWrite() throws IgniteCheckedException {
    for (MetastorageLifecycleListener lifecycleLsnr : metastorageLifecycleLsnrs) {
        lifecycleLsnr.onReadyForReadWrite(metaStorage);
    }
}
/**
 * @return Checkpointer instance managed by the checkpoint manager.
 */
public Checkpointer getCheckpointer() {
    return checkpointManager.getCheckpointer();
}
/**
 * Returns {@code true} if historical (WAL) rebalance is preferred;
 * {@code false} means a heuristic is used to determine the rebalance type.
 *
 * @return Flag of preferred historical rebalance.
 */
public boolean preferWalRebalance() {
    return preferWalRebalance;
}
/**
 * Enables or disables checkpoints. For test use only.
 *
 * @param enable {@code True} to enable checkpoints, {@code false} to disable.
 * @return Future that is completed when the change takes effect.
 */
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
    IgniteInternalFuture<Void> resFut = checkpointManager.enableCheckpoints(enable);

    // Wake the checkpointer so the new state is observed immediately.
    wakeupForCheckpoint("enableCheckpoints()");

    return resFut;
}
/** {@inheritDoc} */
@Override protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
    super.initDataRegions0(memCfg);

    // The metastorage lives in its own dedicated persistent data region.
    addDataRegion(memCfg, createMetastoreDataRegionConfig(memCfg), false);

    List<DataRegionMetrics> regionMetrics = new ArrayList<>(dataRegionMap.size());

    for (DataRegion region : dataRegionMap.values())
        regionMetrics.add(region.metrics());

    persStoreMetrics.regionMetrics(regionMetrics);
}
/**
 * Creates the metastorage data region configuration with persistence enabled by default.
 *
 * @param storageCfg Data storage configuration (supplies the system region sizes).
 * @return Data region configuration.
 */
private DataRegionConfiguration createMetastoreDataRegionConfig(DataStorageConfiguration storageCfg) {
    return new DataRegionConfiguration()
        .setName(METASTORE_DATA_REGION_NAME)
        .setInitialSize(storageCfg.getSystemRegionInitialSize())
        .setMaxSize(storageCfg.getSystemRegionMaxSize())
        .setPersistenceEnabled(true)
        .setLazyMemoryAllocation(false);
}
/**
 * Creates the configuration of the persistent region that holds defragmented partitions.
 *
 * @param regionSize Both initial and maximum size of the region, in bytes.
 * @return Data region configuration.
 */
private DataRegionConfiguration createDefragmentationDataRegionConfig(long regionSize) {
    return new DataRegionConfiguration()
        .setName(DEFRAGMENTATION_PART_REGION_NAME)
        .setInitialSize(regionSize)
        .setMaxSize(regionSize)
        .setPersistenceEnabled(true)
        .setLazyMemoryAllocation(false);
}
/**
 * Creates the configuration of the persistent region that holds defragmentation link mappings.
 *
 * @param regionSize Both initial and maximum size of the region, in bytes.
 * @return Data region configuration.
 */
private DataRegionConfiguration createDefragmentationMappingRegionConfig(long regionSize) {
    return new DataRegionConfiguration()
        .setName(DEFRAGMENTATION_MAPPING_REGION_NAME)
        .setInitialSize(regionSize)
        .setMaxSize(regionSize)
        .setPersistenceEnabled(true)
        .setLazyMemoryAllocation(false);
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
    super.start0();

    snapshotMgr = cctx.snapshot();

    IgnitePageStoreManager store = cctx.pageStore();

    assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store;

    storeMgr = (FilePageStoreManager)store;

    final GridKernalContext kernalCtx = cctx.kernalContext();

    // The persistence-enabled database manager is never started on client nodes.
    assert !kernalCtx.clientNode();

    initWalRebalanceThreshold();

    // NOTE(review): redundant given the assert above; presumably kept as a safety net for
    // production builds where assertions are disabled - confirm before simplifying.
    if (!kernalCtx.clientNode()) {
        kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());

        // Distributed property controlling the deviation of the checkpoint frequency (in percent).
        cpFreqDeviation = new SimpleDistributedProperty<>("checkpoint.deviation", Integer::parseInt);

        kernalCtx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
            cpFreqDeviation.addListener((name, oldVal, newVal) ->
                U.log(log, "Checkpoint frequency deviation changed [oldVal=" + oldVal + ", newVal=" + newVal + "]"));

            dispatcher.registerProperty(cpFreqDeviation);
        });

        checkpointManager = new CheckpointManager(
            kernalCtx::log,
            cctx.igniteInstanceName(),
            "db-checkpoint-thread",
            cctx.wal(),
            kernalCtx.workersRegistry(),
            persistenceCfg,
            storeMgr,
            this::isCheckpointInapplicableForWalRebalance,
            this::checkpointedDataRegions,
            this::cacheGroupContexts,
            this::getPageMemoryForCacheGroup,
            resolveThrottlingPolicy(),
            snapshotMgr,
            persistentStoreMetricsImpl(),
            kernalCtx.longJvmPauseDetector(),
            kernalCtx.failure(),
            kernalCtx.cache(),
            () -> cpFreqDeviation.getOrDefault(DEFAULT_CHECKPOINT_DEVIATION)
        );

        // Reuse the file lock already acquired by the PDS folder resolver, if any.
        final NodeFileLockHolder preLocked = kernalCtx.pdsFolderResolver()
            .resolveFolders()
            .getLockedFileLockHolder();

        acquireFileLock(preLocked);

        cleanupTempCheckpointDirectory();

        persStoreMetrics.wal(cctx.wal());
    }
}
/** {@inheritDoc} */
@Override protected void initDataRegions(DataStorageConfiguration memCfg) throws IgniteCheckedException {
    // When defragmentation is scheduled, region sizes are shrunk on first initialization
    // to free space for the dedicated defragmentation regions.
    DataStorageConfiguration effectiveCfg = (isDefragmentationScheduled() && !dataRegionsInitialized)
        ? configureDataRegionForDefragmentation(memCfg)
        : memCfg;

    super.initDataRegions(effectiveCfg);
}
/**
 * Configures data regions for defragmentation:
 * <p>The size of the configured cache data regions is decreased in order to free space for
 * defragmentation needs.</p>
 * <p>New defragmentation regions are created whose size is based on the space freed in the
 * previous step.</p>
 *
 * @param memCfg Data storage configuration with data region configurations.
 * @return New data storage configuration which contains data regions with changed size.
 * @throws IgniteCheckedException If fail.
 */
private DataStorageConfiguration configureDataRegionForDefragmentation(
    DataStorageConfiguration memCfg
) throws IgniteCheckedException {
    List<DataRegionConfiguration> regionConfs = new ArrayList<>();

    // TODO: changes are currently applied in-place; it would be better to make a copy of memCfg.
    DataStorageConfiguration dataConf = memCfg;

    regionConfs.add(dataConf.getDefaultDataRegionConfiguration());

    if (dataConf.getDataRegionConfigurations() != null)
        regionConfs.addAll(Arrays.asList(dataConf.getDataRegionConfigurations()));

    long totalDefrRegionSize = 0;
    long totalRegionsSize = 0;

    // Defragmentation space is a configured percentage of the largest region's max size.
    for (DataRegionConfiguration regionCfg : regionConfs) {
        totalDefrRegionSize = Math.max(
            totalDefrRegionSize,
            (long)(regionCfg.getMaxSize() * 0.01 * defragmentationRegionSizePercentageOfConfiguredSize)
        );

        totalRegionsSize += regionCfg.getMaxSize();
    }

    // Fraction of its configured size that each region keeps after shrinking.
    double shrinkPercentage = 1d * (totalRegionsSize - totalDefrRegionSize) / totalRegionsSize;

    for (DataRegionConfiguration region : regionConfs) {
        long newSize = (long)(region.getMaxSize() * shrinkPercentage);
        long newInitSize = Math.min(region.getInitialSize(), newSize);

        log.info("Region size was reassigned by defragmentation reason: " +
            "region = '" + region.getName() + "', " +
            "oldInitialSize = '" + region.getInitialSize() + "', " +
            "newInitialSize = '" + newInitSize + "', " +
            "oldMaxSize = '" + region.getMaxSize() + "', " +
            "newMaxSize = '" + newSize
        );

        region.setMaxSize(newSize);
        region.setInitialSize(newInitSize);
        // 0 means the checkpoint buffer size is derived automatically for the shrunk region.
        region.setCheckpointPageBufferSize(0);
    }

    // Mapping region gets 10% of the defragmentation space, capped at 1 GB.
    long mappingRegionSize = Math.min(GB, (long)(totalDefrRegionSize * 0.1));

    // Defragmentation regions must not be checkpointed by the regular checkpointer,
    // so remove them from the set right after registration.
    checkpointedDataRegions.remove(
        addDataRegion(
            memCfg,
            createDefragmentationDataRegionConfig(totalDefrRegionSize - mappingRegionSize),
            true,
            new DefragmentationPageReadWriteManager(cctx.kernalContext(), "defrgPartitionsStore")
        )
    );

    checkpointedDataRegions.remove(
        addDataRegion(
            memCfg,
            createDefragmentationMappingRegionConfig(mappingRegionSize),
            true,
            new DefragmentationPageReadWriteManager(cctx.kernalContext(), "defrgLinkMappingStore")
        )
    );

    return dataConf;
}
/**
 * @return {@code true} if maintenance mode is on and a defragmentation task exists.
 */
private boolean isDefragmentationScheduled() {
    return cctx.kernalContext().maintenanceRegistry().activeMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME) != null;
}
/** @return Data regions which should be checkpointed by the regular checkpointer. */
public Collection<DataRegion> checkpointedDataRegions() {
    return checkpointedDataRegions;
}
/** @return Contexts of all currently started cache groups. */
private Collection<CacheGroupContext> cacheGroupContexts() {
    return cctx.cache().cacheGroups();
}
/**
 * Cleanup checkpoint directory from all temporary files.
 *
 * @throws IgniteCheckedException If cleanup failed.
 */
@Override public void cleanupTempCheckpointDirectory() throws IgniteCheckedException {
    checkpointManager.cleanupTempCheckpointDirectory();
}
/** {@inheritDoc} */
@Override public void cleanupRestoredCaches() {
    if (dataRegionMap.isEmpty())
        return;

    boolean mvccCacheFound = false;

    for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values()) {
        mvccCacheFound |= desc.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;

        String regionName = desc.config().getDataRegionName();

        DataRegion region = regionName == null ? dfltDataRegion : dataRegionMap.get(regionName);

        if (region == null)
            continue;

        if (log.isInfoEnabled())
            log.info("Page memory " + region.config().getName() + " for " + desc + " has invalidated.");

        int parts = desc.config().getAffinity().partitions();

        if (region.pageMemory() instanceof PageMemoryEx) {
            PageMemoryEx pageMemEx = (PageMemoryEx)region.pageMemory();

            // Invalidate every data partition plus the index partition of the group.
            for (int p = 0; p < parts; p++)
                pageMemEx.invalidate(desc.groupId(), p);

            pageMemEx.invalidate(desc.groupId(), PageIdAllocator.INDEX_PARTITION);
        }

        if (desc.config().isEncryptionEnabled())
            cctx.kernalContext().encryption().onCacheGroupStop(desc.groupId());
    }

    if (!mvccCacheFound && dataRegionMap.containsKey(TxLog.TX_LOG_CACHE_NAME)) {
        PageMemory txLogPageMem = dataRegionMap.get(TxLog.TX_LOG_CACHE_NAME).pageMemory();

        if (txLogPageMem instanceof PageMemoryEx)
            ((PageMemoryEx)txLogPageMem).invalidate(TxLog.TX_LOG_CACHE_ID, PageIdAllocator.INDEX_PARTITION);
    }

    boolean hasMvccCache0 = mvccCacheFound;

    // Keep the metastorage files and, if MVCC caches exist, the TxLog files.
    storeMgr.cleanupPageStoreIfMatch(
        grpId -> MetaStorage.METASTORAGE_CACHE_ID != grpId &&
            (TxLog.TX_LOG_CACHE_ID != grpId || !hasMvccCache0),
        true);
}
/** {@inheritDoc} */
@Override public void cleanupCheckpointDirectory() throws IgniteCheckedException {
    // Delegates removal of checkpoint files to the checkpoint manager.
    checkpointManager.cleanupCheckpointDirectory();
}
/**
 * Acquires the PDS directory file lock, reusing a pre-locked holder if one was
 * created during folder resolution. No-op on client nodes.
 *
 * @param preLocked Pre-locked file lock holder, or {@code null} if none.
 * @throws IgniteCheckedException If the lock can't be acquired within the lock wait time.
 */
private void acquireFileLock(NodeFileLockHolder preLocked) throws IgniteCheckedException {
    if (cctx.kernalContext().clientNode())
        return;

    if (preLocked != null)
        fileLockHolder = preLocked;
    else
        fileLockHolder = new NodeFileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);

    if (fileLockHolder.isLocked())
        return;

    if (log.isDebugEnabled())
        log.debug("Try to capture file lock [nodeId=" +
            cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");

    fileLockHolder.tryLock(lockWaitTime);
}
/**
 * Releases the PDS directory file lock if it was acquired. No-op on client nodes
 * or when no lock holder exists.
 */
private void releaseFileLock() {
    if (cctx.kernalContext().clientNode())
        return;

    if (fileLockHolder == null)
        return;

    if (log.isDebugEnabled())
        log.debug("Release file lock [nodeId=" +
            cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");

    fileLockHolder.close();
}
/**
 * Prepares the partition defragmentation machinery: starts a lightweight checkpoint
 * manager over the dedicated defragmentation data regions and creates the
 * {@link CachePartitionDefragmentationManager} for the given caches.
 *
 * @param cacheNames Names of caches to defragment.
 * @throws IgniteCheckedException If the lightweight checkpoint manager fails to start.
 */
private void prepareCacheDefragmentation(List<String> cacheNames) throws IgniteCheckedException {
    GridKernalContext kernalCtx = cctx.kernalContext();
    DataStorageConfiguration dsCfg = kernalCtx.config().getDataStorageConfiguration();

    // Defragmentation only makes sense with persistence enabled.
    assert CU.isPersistenceEnabled(dsCfg);

    // Dedicated regions used exclusively by the defragmentation process.
    List<DataRegion> regions = Arrays.asList(
        dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME),
        dataRegion(DEFRAGMENTATION_PART_REGION_NAME)
    );

    // Separate checkpointer so defragmentation I/O does not go through the main checkpoint thread.
    LightweightCheckpointManager lightCheckpointMgr = new LightweightCheckpointManager(
        kernalCtx::log,
        cctx.igniteInstanceName(),
        "db-checkpoint-thread-defrag",
        kernalCtx.workersRegistry(),
        persistenceCfg,
        () -> regions,
        this::getPageMemoryForCacheGroup,
        resolveThrottlingPolicy(),
        snapshotMgr,
        persistentStoreMetricsImpl(),
        kernalCtx.longJvmPauseDetector(),
        kernalCtx.failure(),
        kernalCtx.cache()
    );

    lightCheckpointMgr.start();

    defrgMgr = new CachePartitionDefragmentationManager(
        cacheNames,
        cctx,
        this,
        (FilePageStoreManager)cctx.pageStore(),
        checkpointManager,
        lightCheckpointMgr,
        persistenceCfg.getPageSize(),
        persistenceCfg.getDefragmentationThreadPoolSize()
    );
}
/**
 * @return Defragmentation manager, or {@code null} if defragmentation was not prepared on this node.
 */
public CachePartitionDefragmentationManager defragmentationManager() {
    return defrgMgr;
}
/** {@inheritDoc} */
@Override public DataRegion addDataRegion(DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg,
    boolean trackable, PageReadWriteManager pmPageMgr) throws IgniteCheckedException {
    DataRegion region = super.addDataRegion(dataStorageCfg, dataRegionCfg, trackable, pmPageMgr);

    // Track every region created through this manager so it participates in checkpointing.
    checkpointedDataRegions.add(region);

    return region;
}
/**
 * Brings the metastorage up in read-only mode before cluster activation: restores its
 * binary memory and logical state from the last checkpoint/WAL, notifies listeners that
 * it is readable, then tears everything back down. Intentionally leaves no metastorage
 * or page memory running — the read-write metastorage is recreated later on activation.
 *
 * @throws IgniteCheckedException If restore fails; a {@link StorageException} is also
 *      reported to the failure processor as a critical error.
 */
private void readMetastore() throws IgniteCheckedException {
    try {
        CheckpointStatus status = readCheckpointStatus();

        checkpointReadLock();

        try {
            // Start page memory for the metastore region only, restore it to the last
            // consistent checkpoint state and replay metastorage-related WAL updates.
            dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().start();

            performBinaryMemoryRestore(status, onlyMetastorageGroup(), physicalRecords(), false);

            metaStorage = createMetastorage(true);

            applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), true);

            fillWalDisabledGroups();

            checkpointManager.initializeStorage();

            registerSystemView();

            notifyMetastorageReadyForRead();

            // If a defragmentation maintenance task is pending, wire up its workflow callback now.
            cctx.kernalContext().maintenanceRegistry()
                .registerWorkflowCallbackIfTaskExists(
                    DEFRAGMENTATION_MNTC_TASK_NAME,
                    task -> {
                        prepareCacheDefragmentation(fromStore(task).cacheNames());

                        return new DefragmentationWorkflowCallback(
                            cctx.kernalContext()::log,
                            defrgMgr,
                            cctx.kernalContext().failure()
                        );
                    }
                );
        }
        finally {
            // Tear down the temporary read-only metastorage and its page memory;
            // the page store is cleaned so activation starts from a fresh state.
            if (metaStorage != null)
                metaStorage.close();

            metaStorage = null;

            dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().stop(false);

            cctx.pageStore().cleanupPageStoreIfMatch(new Predicate<Integer>() {
                @Override public boolean test(Integer grpId) {
                    return MetaStorage.METASTORAGE_CACHE_ID == grpId;
                }
            }, false);

            checkpointReadUnlock();
        }
    }
    catch (StorageException e) {
        // Storage failure here means the node cannot start consistently — escalate.
        cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

        throw new IgniteCheckedException(e);
    }
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException {
    if (log.isDebugEnabled())
        log.debug("Activate database manager [id=" + cctx.localNodeId() +
            " topVer=" + cctx.discovery().topologyVersionEx() + " ]");

    snapshotMgr = cctx.snapshot();

    // Checkpointer must be initialized before the base activation logic runs.
    checkpointManager.init();

    super.onActivate(ctx);

    // Clients have no local persistence, hence nothing to recover.
    if (!cctx.kernalContext().clientNode())
        finishRecovery();
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
    if (log.isDebugEnabled())
        log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
            " topVer=" + cctx.discovery().topologyVersionEx() + " ]");

    // Stop checkpointing and close the metastorage, as on kernal stop (without cancel).
    onKernalStop0(false);

    super.onDeActivate(kctx);

    /* Must be here, because after deactivate we can invoke activate and file lock must be already configured */
    checkpointManager.unblockCheckpointLock();
}
/** {@inheritDoc} */
@Override protected void registerMetricsMBeans(IgniteConfiguration cfg) {
    super.registerMetricsMBeans(cfg);

    // Additionally expose persistent store metrics as a JMX bean.
    registerMetricsMBean(
        cctx.kernalContext().config(),
        MBEAN_GROUP,
        MBEAN_NAME,
        persStoreMetrics,
        DataStorageMetricsMXBean.class
    );
}
/** {@inheritDoc} */
@Deprecated
@Override protected IgniteOutClosure<Long> freeSpaceProvider(final DataRegionConfiguration dataRegCfg) {
    if (!dataRegCfg.isPersistenceEnabled())
        return super.freeSpaceProvider(dataRegCfg);

    final String dataRegName = dataRegCfg.getName();

    // Sum free space over all cache groups that live in this data region.
    return new IgniteOutClosure<Long>() {
        @Override public Long apply() {
            long total = 0L;

            for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
                if (grpCtx.dataRegion().config().getName().equals(dataRegName)) {
                    assert grpCtx.offheap() instanceof GridCacheOffheapManager;

                    total += ((GridCacheOffheapManager)grpCtx.offheap()).freeSpace();
                }
            }

            return total;
        }
    };
}
/** {@inheritDoc} */
@Override protected DataRegionMetricsProvider dataRegionMetricsProvider(final DataRegionConfiguration dataRegCfg) {
    if (!dataRegCfg.isPersistenceEnabled())
        return super.dataRegionMetricsProvider(dataRegCfg);

    final String dataRegName = dataRegCfg.getName();

    // Both metrics aggregate over the cache groups assigned to this data region.
    return new DataRegionMetricsProvider() {
        @Override public long partiallyFilledPagesFreeSpace() {
            long total = 0L;

            for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
                if (grpCtx.dataRegion().config().getName().equals(dataRegName)) {
                    assert grpCtx.offheap() instanceof GridCacheOffheapManager;

                    total += ((GridCacheOffheapManager)grpCtx.offheap()).freeSpace();
                }
            }

            return total;
        }

        @Override public long emptyDataPages() {
            long total = 0L;

            for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
                if (grpCtx.dataRegion().config().getName().equals(dataRegName)) {
                    assert grpCtx.offheap() instanceof GridCacheOffheapManager;

                    total += ((GridCacheOffheapManager)grpCtx.offheap()).emptyDataPages();
                }
            }

            return total;
        }
    };
}
/**
 * Restores last valid WAL pointer and resumes logging from that pointer.
 * Re-creates metastorage if needed.
 *
 * <p>Runs under the checkpoint read lock (via the hold counter) so a checkpoint cannot
 * interleave with WAL resume and metastorage re-creation.
 *
 * @throws IgniteCheckedException If failed. Storage/IO-caused failures are additionally
 *      reported to the failure processor as critical errors.
 */
private void finishRecovery() throws IgniteCheckedException {
    assert !cctx.kernalContext().clientNode();

    long time = System.currentTimeMillis();

    // Mark this thread as holding the checkpoint lock for the duration of recovery.
    CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);

    try {
        for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
            lsnr.beforeResumeWalLogging(this);

        // Try to resume logging since last finished checkpoint if possible.
        if (walTail == null) {
            CheckpointStatus status = readCheckpointStatus();

            walTail = CheckpointStatus.NULL_PTR.equals(status.endPtr) ? null : status.endPtr;
        }

        resumeWalLogging();

        walTail = null;

        // Recreate metastorage to refresh page memory state after deactivation.
        if (metaStorage == null)
            metaStorage = createMetastorage(false);

        notifyMetastorageReadyForReadWrite();

        U.log(log, "Finish recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
    }
    catch (IgniteCheckedException e) {
        if (X.hasCause(e, StorageException.class, IOException.class))
            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

        throw e;
    }
    finally {
        CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
    }
}
/**
 * Creates and initializes a metastorage instance over the metastore data region.
 *
 * @param readOnly Metastorage read-only mode.
 * @return Instance of Metastorage.
 * @throws IgniteCheckedException If failed to create metastorage.
 */
private MetaStorage createMetastorage(boolean readOnly) throws IgniteCheckedException {
    // Make sure the underlying page store files for the metastorage exist.
    cctx.pageStore().initializeForMetastorage();

    MetaStorage storage = new MetaStorage(cctx, dataRegion(METASTORE_DATA_REGION_NAME), readOnly);

    storage.init(this);

    return storage;
}
/**
 * Restores binary page memory to the last consistent checkpoint state, resumes WAL
 * logging from the restored pointer and logs a memory recovery marker so old physical
 * records are not replayed on the next restart.
 *
 * @param cacheGroupsPredicate Cache groups to restore.
 * @param recordTypePredicate Filter records by type.
 * @return Last seen WAL pointer during binary memory recovery.
 * @throws IgniteCheckedException If failed. Storage/IO-caused failures are additionally
 *      reported to the failure processor as critical errors.
 */
private RestoreBinaryState restoreBinaryMemory(
    IgnitePredicate<Integer> cacheGroupsPredicate,
    IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate
) throws IgniteCheckedException {
    long time = System.currentTimeMillis();

    try {
        log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());

        for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
            lsnr.beforeBinaryMemoryRestore(this);

        CheckpointStatus status = readCheckpointStatus();

        // First, bring memory to the last consistent checkpoint state if needed.
        // This method should return a pointer to the last valid record in the WAL.
        RestoreBinaryState binaryState = performBinaryMemoryRestore(
            status,
            cacheGroupsPredicate,
            recordTypePredicate,
            true
        );

        WALPointer restored = binaryState.lastReadRecordPointer();

        if (restored.equals(CheckpointStatus.NULL_PTR))
            restored = null; // This record is first
        else
            restored = restored.next();

        // A checkpoint end marker exists but nothing could be read up to it: WAL history is missing.
        if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
            throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " +
                "[tailWalPtr=" + restored + ", endPtr=" + status.endPtr + ']');
        }
        else if (restored != null)
            U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');

        // Wal logging is now available.
        cctx.wal().resumeLogging(restored);

        // Log MemoryRecoveryRecord to make sure that old physical records are not replayed during
        // next physical recovery.
        checkpointManager.memoryRecoveryRecordPtr(cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())));

        for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
            lsnr.afterBinaryMemoryRestore(this, binaryState);

        if (log.isInfoEnabled())
            log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");

        return binaryState;
    }
    catch (IgniteCheckedException e) {
        if (X.hasCause(e, StorageException.class, IOException.class))
            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

        throw e;
    }
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
    // Cancel an in-flight defragmentation before stopping the checkpointer it relies on.
    if (defrgMgr != null)
        defrgMgr.cancel();

    checkpointManager.stop(cancel);

    super.onKernalStop0(cancel);

    unregisterMetricsMBean(
        cctx.gridConfig(),
        MBEAN_GROUP,
        MBEAN_NAME
    );

    // Close and drop the metastorage reference; it is recreated on the next activation.
    if (metaStorage != null)
        metaStorage.close();

    metaStorage = null;
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
    super.stop0(cancel);

    // File lock is released last, after all storage resources are stopped.
    releaseFileLock();
}
/**
 * Splits a data region into per-segment fragment sizes for page memory, appending the
 * checkpoint buffer size as the last element of the resulting array.
 *
 * @param concLvl Configured concurrency level; values below 2 fall back to the number
 *      of available processors.
 * @param cacheSize Total region size in bytes.
 * @param chpBufSize Checkpoint buffer size in bytes.
 * @return Array of {@code concLvl} equal fragment sizes (each at least 1 MB), followed
 *      by the checkpoint buffer size in the last slot.
 */
private long[] calculateFragmentSizes(int concLvl, long cacheSize, long chpBufSize) {
    if (concLvl < 2)
        concLvl = Runtime.getRuntime().availableProcessors();

    // Never create fragments smaller than 1 MB.
    long fragmentSize = Math.max(cacheSize / concLvl, 1024 * 1024);

    long[] sizes = new long[concLvl + 1];

    Arrays.fill(sizes, 0, concLvl, fragmentSize);

    // Checkpoint buffer chunk goes last by convention of the page memory implementation.
    sizes[concLvl] = chpBufSize;

    return sizes;
}
/** {@inheritDoc} */
@Override protected PageMemory createPageMemory(
    DirectMemoryProvider memProvider,
    DataStorageConfiguration memCfg,
    DataRegionConfiguration plcCfg,
    DataRegionMetricsImpl memMetrics,
    final boolean trackable,
    PageReadWriteManager pmPageMgr
) {
    // Non-persistent regions use the in-memory implementation from the base class.
    if (!plcCfg.isPersistenceEnabled())
        return super.createPageMemory(memProvider, memCfg, plcCfg, memMetrics, trackable, pmPageMgr);

    memMetrics.persistenceEnabled(true);

    long cacheSize = plcCfg.getMaxSize();

    // Checkpoint buffer size can not be greater than cache size, it does not make sense.
    long chpBufSize = checkpointBufferSize(plcCfg);

    if (chpBufSize > cacheSize) {
        U.quietAndInfo(log,
            "Configured checkpoint page buffer size is too big, setting to the max region size [size="
            + U.readableSize(cacheSize, false) + ", memPlc=" + plcCfg.getName() + ']');

        chpBufSize = cacheSize;
    }

    GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;

    if (trackable)
        changeTracker = new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
            @Override public void applyx(
                Long page,
                FullPageId fullId,
                PageMemoryEx pageMem
            ) throws IgniteCheckedException {
                // 'trackable' is final and guaranteed true in this branch, so no re-check is needed.
                snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
            }
        };
    else
        changeTracker = null;

    PageMemoryImpl pageMem = new PageMemoryImpl(
        wrapMetricsPersistentMemoryProvider(memProvider, memMetrics),
        calculateFragmentSizes(
            memCfg.getConcurrencyLevel(),
            cacheSize,
            chpBufSize
        ),
        cctx,
        pmPageMgr,
        memCfg.getPageSize(),
        (fullId, pageBuf, tag) -> {
            memMetrics.onPageWritten();

            // We can write only page from disk into snapshot.
            snapshotMgr.beforePageWrite(fullId);

            // Write page to disk.
            pmPageMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, tag, true);

            getCheckpointer().currentProgress().updateEvictedPages(1);
        },
        changeTracker,
        this,
        memMetrics,
        resolveThrottlingPolicy(),
        () -> getCheckpointer().currentProgress()
    );

    memMetrics.pageMemory(pageMem);

    return pageMem;
}
/**
 * Wraps a memory provider so that every allocated chunk is accounted in the region
 * metrics: all chunks except the last one count as off-heap size, and the last chunk
 * (see {@code calculateFragmentSizes}) counts as the checkpoint buffer.
 *
 * @param memoryProvider0 Memory provider.
 * @param memMetrics Memory metrics.
 * @return Wrapped memory provider.
 */
private DirectMemoryProvider wrapMetricsPersistentMemoryProvider(
    final DirectMemoryProvider memoryProvider0,
    final DataRegionMetricsImpl memMetrics
) {
    return new DirectMemoryProvider() {
        /** Counts down remaining regions; hits 0 on the checkpoint buffer chunk. */
        private final AtomicInteger checkPointBufferIdxCnt = new AtomicInteger();

        private final DirectMemoryProvider memProvider = memoryProvider0;

        @Override public void initialize(long[] chunkSizes) {
            memProvider.initialize(chunkSizes);

            checkPointBufferIdxCnt.set(chunkSizes.length);
        }

        @Override public void shutdown(boolean deallocate) {
            memProvider.shutdown(deallocate);
        }

        @Override public DirectMemoryRegion nextRegion() {
            DirectMemoryRegion nextMemoryRegion = memProvider.nextRegion();

            if (nextMemoryRegion == null)
                return null;

            int idx = checkPointBufferIdxCnt.decrementAndGet();

            long chunkSize = nextMemoryRegion.size();

            // Checkpoint chunk last in the long[] chunkSizes.
            if (idx != 0)
                memMetrics.updateOffHeapSize(chunkSize);
            else
                memMetrics.updateCheckpointBufferSize(chunkSize);

            return nextMemoryRegion;
        }
    };
}
/**
 * Resolves throttling policy according to the settings.
 *
 * @return Configured throttling policy, or the value of the
 *      {@code IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED} override if it parses to a
 *      valid policy name; otherwise the default is kept and an error is logged.
 */
@NotNull private PageMemoryImpl.ThrottlingPolicy resolveThrottlingPolicy() {
    PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled()
        ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
        : PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;

    if (throttlingPolicyOverride != null) {
        try {
            // Locale.ROOT: enum lookup must not depend on the default locale
            // (e.g. in the Turkish locale 'i' upper-cases to 'İ', breaking valueOf).
            plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPolicyOverride.toUpperCase(java.util.Locale.ROOT));
        }
        catch (IllegalArgumentException e) {
            log.error("Incorrect value of IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property. " +
                "The default throttling policy will be used [plc=" + throttlingPolicyOverride +
                ", defaultPlc=" + plc + ']');
        }
    }

    return plc;
}
/** {@inheritDoc} */
@Override protected void checkRegionEvictionProperties(DataRegionConfiguration regCfg, DataStorageConfiguration dbCfg)
    throws IgniteCheckedException {
    if (!regCfg.isPersistenceEnabled())
        super.checkRegionEvictionProperties(regCfg, dbCfg);
    else if (regCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
        // Persistence replaces page eviction with automatic page replacement; warn instead of failing.
        U.warn(log, "Page eviction mode will have no effect because the oldest pages are evicted automatically " +
            "if Ignite persistence is enabled: " + regCfg.getName());
    }
}
/** {@inheritDoc} */
@Override protected void checkPageSize(DataStorageConfiguration memCfg) {
    // Page size 0 means "not configured": try to detect it from existing store files,
    // falling back to the default when no index partition file can be found or read.
    if (memCfg.getPageSize() == 0) {
        try {
            assert cctx.pageStore() instanceof FilePageStoreManager :
                "Invalid page store manager was created: " + cctx.pageStore();

            Path anyIdxPartFile = IgniteUtils.searchFileRecursively(
                ((FilePageStoreManager)cctx.pageStore()).workDir().toPath(), FilePageStoreManager.INDEX_FILE_NAME);

            if (anyIdxPartFile != null) {
                memCfg.setPageSize(resolvePageSizeFromPartitionFile(anyIdxPartFile));

                return;
            }
        }
        catch (IgniteCheckedException | IOException | IllegalArgumentException e) {
            U.quietAndWarn(log, "Attempt to resolve pageSize from store files failed: " + e.getMessage());

            U.quietAndWarn(log, "Default page size will be used: " + DataStorageConfiguration.DFLT_PAGE_SIZE + " bytes");
        }

        memCfg.setPageSize(DataStorageConfiguration.DFLT_PAGE_SIZE);
    }
}
/**
 * Reads the page size from a partition file header.
 *
 * @param partFile Partition file.
 * @return Page size in bytes, as recorded in the file header.
 * @throws IOException If reading the file fails.
 * @throws IgniteCheckedException If the file is smaller than a store header.
 */
private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException {
    FileIOFactory ioFactory = persistenceCfg.getFileIOFactory();

    try (FileIO fileIO = ioFactory.create(partFile.toFile())) {
        int minimalHdr = FilePageStore.HEADER_SIZE;

        if (fileIO.size() < minimalHdr)
            throw new IgniteCheckedException("Partition file is too small: " + partFile);

        ByteBuffer hdr = ByteBuffer.allocate(minimalHdr).order(ByteOrder.nativeOrder());

        fileIO.readFully(hdr);

        hdr.rewind();

        // Skip header fields preceding the page size (see FilePageStore header layout).
        hdr.getLong(); // Read signature.

        hdr.getInt(); // Read version.

        hdr.get(); // Read type.

        int pageSize = hdr.getInt();

        if (pageSize == 2048) {
            U.quietAndWarn(log, "You are currently using persistent store with 2K pages (DataStorageConfiguration#" +
                "pageSize). If you use SSD disk, consider migrating to 4K pages for better IO performance.");
        }

        return pageSize;
    }
}
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
    // Try to restore partition states.
    if (fut.localJoinExchange() || fut.activateCluster()
        || (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart()))) {
        // Restore states for all cache groups in parallel on the system pool.
        U.doInParallel(
            cctx.kernalContext().getSystemExecutorService(),
            cctx.cache().cacheGroups(),
            cacheGroup -> {
                if (cacheGroup.isLocal())
                    return null;

                // Each restore runs under the checkpoint read lock so a checkpoint
                // cannot observe partially restored partition state.
                cctx.database().checkpointReadLock();

                try {
                    cacheGroup.offheap().restorePartitionStates(Collections.emptyMap());

                    if (cacheGroup.localStartVersion().equals(fut.initialVersion()))
                        cacheGroup.topology().afterStateRestored(fut.initialVersion());

                    fut.timeBag().finishLocalStage("Restore partition states " +
                        "[grp=" + cacheGroup.cacheOrGroupName() + "]");
                }
                finally {
                    cctx.database().checkpointReadUnlock();
                }

                return null;
            }
        );

        fut.timeBag().finishGlobalStage("Restore partition states");
    }

    if (cctx.kernalContext().query().moduleEnabled())
        cctx.kernalContext().query().beforeExchange(fut);
}
/** {@inheritDoc} */
@Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture exchangeFut) {
    // Skip index rebuild entirely while defragmentation is in progress.
    if (defrgMgr != null)
        return;

    // Rebuild only for caches started on this exchange that the query processor flagged.
    Collection<GridCacheContext> rejected = rebuildIndexes(
        cctx.cacheContexts(),
        (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()) &&
            cctx.kernalContext().query().rebuildIndexOnExchange(cacheCtx.cacheId(), exchangeFut),
        false
    );

    // Caches that did not qualify must have their pending rebuild futures cleaned up.
    if (!rejected.isEmpty()) {
        cctx.kernalContext().query().removeIndexRebuildFuturesOnExchange(
            exchangeFut,
            rejected.stream().map(GridCacheContext::cacheId).collect(toSet())
        );
    }
}
/** {@inheritDoc} */
@Override public Collection<GridCacheContext> forceRebuildIndexes(Collection<GridCacheContext> contexts) {
    Set<Integer> cacheIds = contexts.stream().map(GridCacheContext::cacheId).collect(toSet());

    // The query processor may reject some caches (e.g. rebuild already in progress).
    Set<Integer> rejected = cctx.kernalContext().query().prepareRebuildIndexes(cacheIds);

    if (log.isDebugEnabled()) {
        log.debug("Preparing features of rebuilding indexes for caches on force rebuild [requested=" + cacheIds +
            ", rejected=" + rejected + ']');
    }

    rebuildIndexes(contexts, (cacheCtx) -> !rejected.contains(cacheCtx.cacheId()), true);

    // Return the contexts whose rebuild was rejected, preserving the input objects.
    return rejected.isEmpty() ? emptyList() :
        contexts.stream().filter(ctx -> rejected.contains(ctx.cacheId())).collect(toList());
}
/**
 * Rebuilding indexes for caches.
 *
 * @param contexts Collection of cache contexts for which indexes should be rebuilt.
 * @param rebuildCond Condition that should be met for indexes to be rebuilt for specific cache.
 * @param force Force rebuild indexes.
 * @return Cache contexts that did not pass by {@code rebuildCond}.
 */
private Collection<GridCacheContext> rebuildIndexes(
    Collection<GridCacheContext> contexts,
    Predicate<GridCacheContext> rebuildCond,
    boolean force
) {
    GridQueryProcessor qryProc = cctx.kernalContext().query();

    if (!qryProc.moduleEnabled())
        return emptyList();

    // Fires the completion message once every cache has either finished or been skipped.
    GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
        contexts.size(),
        () -> {
            if (log.isInfoEnabled())
                log.info("Indexes rebuilding completed for all caches.");
        },
        1 //need at least 1 index rebuilt to print message about rebuilding completion
    );

    Collection<GridCacheContext> rejected = null;

    for (GridCacheContext cacheCtx : contexts) {
        if (rebuildCond.test(cacheCtx)) {
            IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(
                cacheCtx,
                force || !qryProc.rebuildIndexesCompleted(cacheCtx)
            );

            // A null future means no rebuild was started for this cache.
            if (rebuildFut != null)
                rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
            else
                rebuildIndexesCompleteCntr.countDown(false);
        }
        else {
            // Lazily allocate the rejected list — the common case is no rejections.
            if (rejected == null)
                rejected = new ArrayList<>();

            rejected.add(cacheCtx);
        }
    }

    return rejected == null ? emptyList() : rejected;
}
/**
 * Return short information about cache.
 *
 * @param cacheCtx Cache context.
 * @return Short cache info.
 */
private String cacheInfo(GridCacheContext cacheCtx) {
    assert nonNull(cacheCtx);

    StringBuilder info = new StringBuilder();

    info.append("name=").append(cacheCtx.name());
    info.append(", grpName=").append(cacheCtx.group().name());

    return info.toString();
}
/** {@inheritDoc} */
@Override public void onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps) {
    // Page memory -> group ids whose pages must be cleared from it.
    Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();

    // Group ids being destroyed (the Boolean in each tuple is the "destroy" flag).
    List<Integer> stoppedGrpIds = stoppedGrps.stream()
        .filter(IgniteBiTuple::get2)
        .map(t -> t.get1().groupId())
        .collect(toList());

    cctx.snapshotMgr().onCacheGroupsStopped(stoppedGrpIds);

    initiallyLocWalDisabledGrps.removeAll(stoppedGrpIds);

    initiallyGlobalWalDisabledGrps.removeAll(stoppedGrpIds);

    // Phase 1: per-group notifications and page memory bookkeeping.
    for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
        CacheGroupContext gctx = tup.get1();

        boolean destroy = tup.get2();

        int grpId = gctx.groupId();

        DataRegion dataRegion = gctx.dataRegion();

        if (dataRegion != null)
            dataRegion.metrics().removeCacheGrpPageMetrics(grpId);

        if (!gctx.persistenceEnabled())
            continue;

        snapshotMgr.onCacheGroupStop(gctx, destroy);

        PageMemoryEx pageMem = (PageMemoryEx) dataRegion.pageMemory();

        Collection<Integer> grpIds = destroyed.computeIfAbsent(pageMem, k -> new HashSet<>());

        grpIds.add(grpId);

        if (gctx.config().isEncryptionEnabled())
            cctx.kernalContext().encryption().onCacheGroupStop(grpId);

        pageMem.onCacheGroupDestroyed(grpId);

        if (destroy)
            cctx.kernalContext().encryption().onCacheGroupDestroyed(grpId);
    }

    // Phase 2: clear the pages of stopped groups from each page memory asynchronously.
    Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size());

    for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) {
        final Collection<Integer> grpIds = entry.getValue();

        clearFuts.add(entry.getKey().clearAsync((grpId, pageIdg) -> grpIds.contains(grpId), false));
    }

    // Wait for all clears; a failure is logged but does not abort the remaining cleanup.
    for (IgniteInternalFuture<Void> clearFut : clearFuts) {
        try {
            clearFut.get();
        }
        catch (IgniteCheckedException e) {
            log.error("Failed to clear page memory", e);
        }
    }

    // Phase 3: shut down the on-disk page stores of the stopped groups.
    if (cctx.pageStore() != null) {
        for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
            CacheGroupContext grp = tup.get1();

            try {
                cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
                    "[cache=" + grp.cacheOrGroupName() + "]", e);
            }
        }
    }
}
/**
 * Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquireSnapshotWorker memory
 * state.
 *
 * @throws IgniteException If failed.
 */
@Override public void checkpointReadLock() {
    // Delegates to the checkpoint timeout lock owned by the checkpoint manager.
    checkpointManager.checkpointTimeoutLock().checkpointReadLock();
}
/** {@inheritDoc} */
@Override public boolean checkpointLockIsHeldByThread() {
    return checkpointManager.checkpointTimeoutLock().checkpointLockIsHeldByThread();
}
/**
 * Releases the checkpoint read lock.
 */
@Override public void checkpointReadUnlock() {
    // Delegates to the checkpoint timeout lock owned by the checkpoint manager.
    checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
/** {@inheritDoc} */
@Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
    // Only one reservation may be active at a time; released via releaseHistoryForExchange().
    assert reservedForExchange == null : reservedForExchange;

    Map</*grpId*/Integer, Set</*partId*/Integer>> applicableGroupsAndPartitions = partitionsApplicableForWalRebalance();

    Map</*grpId*/Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> earliestValidCheckpoints;

    checkpointReadLock();

    WALPointer reservedCheckpointMark;

    try {
        // Search the checkpoint history and reserve the WAL segment of the earliest usable checkpoint.
        CheckpointHistoryResult checkpointHistoryResult =
            checkpointHistory().searchAndReserveCheckpoints(applicableGroupsAndPartitions);

        earliestValidCheckpoints = checkpointHistoryResult.earliestValidCheckpoints();

        reservedForExchange = reservedCheckpointMark = checkpointHistoryResult.reservedCheckpointMark();
    }
    finally {
        checkpointReadUnlock();
    }

    Map</*grpId*/Integer, Map</*partId*/Integer, /*updCntr*/Long>> grpPartsWithCnts = new HashMap<>();

    for (Map.Entry<Integer, T2</*reason*/ReservationReason, Map</*partId*/Integer, CheckpointEntry>>> e :
        earliestValidCheckpoints.entrySet()) {
        int grpId = e.getKey();

        // A null checkpoint map means the group was not reserved (reason kept in get1()).
        if (e.getValue().get2() == null)
            continue;

        for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().get2().entrySet()) {
            CheckpointEntry cpEntry = e0.getValue();

            int partId = e0.getKey();

            // If the reserved segment was released meanwhile, the whole reservation is void.
            if (reservedCheckpointMark != null && !cctx.wal().reserved(reservedCheckpointMark)) {
                log.warning("Reservation failed because the segment was released: " + reservedCheckpointMark);

                reservedForExchange = null;

                grpPartsWithCnts.clear();

                return grpPartsWithCnts;
            }

            try {
                Long updCntr = cpEntry.partitionCounter(cctx.wal(), grpId, partId);

                if (updCntr != null)
                    grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
            }
            catch (IgniteCheckedException ex) {
                log.warning("Reservation failed because counters are not available [grpId=" + grpId
                    + ", part=" + partId
                    + ", cp=(" + cpEntry.checkpointId() + ", " + U.format(cpEntry.timestamp()) + ")]", ex);
            }
        }
    }

    if (log.isInfoEnabled() && !F.isEmpty(earliestValidCheckpoints))
        printReservationToLog(earliestValidCheckpoints);

    return grpPartsWithCnts;
}
/**
 * Prints detail information about caches which were not reserved
 * and reservation depth for the caches which have WAL history enough.
 *
 * <p>Purely informational; any exception raised while formatting is logged and swallowed
 * so that logging can never fail the reservation itself.
 *
 * @param earliestValidCheckpoints Map contains information about caches' reservation.
 */
private void printReservationToLog(
    Map<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> earliestValidCheckpoints) {
    try {
        Map<ReservationReason, List<Integer>> notReservedCachesToPrint = new HashMap<>();
        Map<ReservationReason, List<T2<Integer, CheckpointEntry>>> reservedCachesToPrint = new HashMap<>();

        // Split groups into not-reserved (no checkpoint map) and reserved, keyed by reason.
        for (Map.Entry<Integer, T2<ReservationReason, Map<Integer, CheckpointEntry>>> entry : earliestValidCheckpoints.entrySet()) {
            if (entry.getValue().get2() == null) {
                notReservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>())
                    .add(entry.getKey());
            }
            else {
                // Diamond operator instead of the raw 'new T2(...)' to keep the generic type.
                // The min() is safe: get2() is non-null here and its values are non-empty.
                reservedCachesToPrint.computeIfAbsent(entry.getValue().get1(), reason -> new ArrayList<>())
                    .add(new T2<>(entry.getKey(), entry.getValue().get2().values().stream().min(
                        Comparator.comparingLong(CheckpointEntry::timestamp)).get()));
            }
        }

        if (!F.isEmpty(notReservedCachesToPrint)) {
            log.info("Cache groups were not reserved [" +
                notReservedCachesToPrint.entrySet().stream()
                    .map(entry -> '[' +
                        entry.getValue().stream().map(grpId -> "[grpId=" + grpId +
                            ", grpName=" + cctx.cache().cacheGroup(grpId).cacheOrGroupName() + ']')
                            .collect(Collectors.joining(", ")) +
                        ", reason=" + entry.getKey() + ']')
                    .collect(Collectors.joining(", ")) + ']');
        }

        if (!F.isEmpty(reservedCachesToPrint)) {
            log.info("Cache groups with earliest reserved checkpoint and a reason why a previous checkpoint was inapplicable: [" +
                reservedCachesToPrint.entrySet().stream()
                    .map(entry -> '[' +
                        entry.getValue().stream().map(grpCp -> "[grpId=" + grpCp.get1() +
                            ", grpName=" + cctx.cache().cacheGroup(grpCp.get1()).cacheOrGroupName() +
                            ", cp=(" + grpCp.get2().checkpointId() + ", " + U.format(grpCp.get2().timestamp()) + ")]")
                            .collect(Collectors.joining(", ")) +
                        ", reason=" + entry.getKey() + ']')
                    .collect(Collectors.joining(", ")) + ']');
        }
    }
    catch (Exception e) {
        log.error("An error happened during printing partitions that were reserved for potential historical rebalance.", e);
    }
}
/**
 * @return Map of group id -> Set of partitions which can be used as suppliers for WAL rebalance.
 */
private Map<Integer, Set<Integer>> partitionsApplicableForWalRebalance() {
    Map<Integer, Set<Integer>> res = new HashMap<>();

    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
        if (grp.isLocal())
            continue;

        // A partition qualifies when it is OWNING and either WAL rebalance is preferred
        // globally or the partition is large enough to make historical rebalance worthwhile.
        for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
            if (locPart.state() == OWNING && (preferWalRebalance() ||
                locPart.fullSize() > historicalRebalanceThreshold.getOrDefault(walRebalanceThresholdLegacy)))
                res.computeIfAbsent(grp.groupId(), k -> new HashSet<>()).add(locPart.id());
        }
    }

    return res;
}
/** {@inheritDoc} */
@Override public synchronized void releaseHistoryForExchange() {
    // Release the WAL segment reserved by reserveHistoryForExchange(), if any.
    if (reservedForExchange != null) {
        cctx.wal().release(reservedForExchange);

        reservedForExchange = null;
    }
}
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(Map<T2<Integer, Integer>, Long> reservationMap) {
    Map<GroupPartitionId, CheckpointEntry> entries = checkpointHistory().searchCheckpointEntry(reservationMap);

    if (F.isEmpty(entries))
        return false;

    // Find the oldest checkpoint mark among all matched entries; every entry must have one.
    WALPointer oldest = null;

    for (CheckpointEntry entry : entries.values()) {
        WALPointer mark = entry.checkpointMark();

        if (mark == null)
            return false;

        if (oldest == null || mark.compareTo(oldest) < 0)
            oldest = mark;
    }

    // Reserving the oldest pointer keeps the whole required WAL range available.
    if (!cctx.wal().reserve(oldest))
        return false;

    reservedForPreloading.set(oldest);

    return true;
}
/** {@inheritDoc} */
@Override public void releaseHistoryForPreloading() {
    // Atomically clear the reservation and release the previously reserved pointer, if any.
    WALPointer prev = reservedForPreloading.getAndSet(null);

    if (prev != null)
        cctx.wal().release(prev);
}
/** {@inheritDoc} */
@Override public WALPointer latestWalPointerReservedForPreloading() {
    return reservedForPreloading.get();
}
/**
 * Triggers a checkpoint for the given reason.
 *
 * @param reason Checkpoint trigger reason.
 * @return Future completed when the checkpoint lock is released, or {@code null}
 *      if no checkpoint progress was created.
 */
@Nullable @Override public IgniteInternalFuture wakeupForCheckpoint(String reason) {
    CheckpointProgress progress = checkpointManager.forceCheckpoint(reason, null);

    return progress == null ? null : progress.futureFor(LOCK_RELEASED);
}
/** {@inheritDoc} */
@Override public <R> void waitForCheckpoint(String reason, IgniteInClosure<? super IgniteInternalFuture<R>> lsnr)
    throws IgniteCheckedException {
    CheckpointProgress progress = checkpointManager.forceCheckpoint(reason, lsnr);

    // No progress means no checkpoint was scheduled — nothing to wait for.
    if (progress == null)
        return;

    // Block until the checkpoint fully finishes.
    progress.futureFor(FINISHED).get();
}
/** {@inheritDoc} */
@Override public CheckpointProgress forceCheckpoint(String reason) {
    return checkpointManager.forceCheckpoint(reason, null);
}
/** {@inheritDoc} */
@Override public WALPointer lastCheckpointMarkWalPointer() {
    // History may be absent before storage initialization; tolerate null at every step.
    CheckpointEntry lastCheckpointEntry = checkpointHistory() == null ? null : checkpointHistory().lastCheckpoint();

    return lastCheckpointEntry == null ? null : lastCheckpointEntry.checkpointMark();
}
/**
 * @return Checkpoint directory.
 */
public File checkpointDirectory() {
    return checkpointManager.checkpointDirectory();
}
/**
 * Registers a checkpoint listener scoped to a particular data region.
 *
 * @param lsnr Listener.
 * @param dataRegion Data region for which listener is corresponded to.
 */
public void addCheckpointListener(CheckpointListener lsnr, DataRegion dataRegion) {
    checkpointManager.addCheckpointListener(lsnr, dataRegion);
}
/**
 * Registers a checkpoint listener that is not bound to any particular data region.
 *
 * @param lsnr Listener.
 */
public void addCheckpointListener(CheckpointListener lsnr) {
    checkpointManager.addCheckpointListener(lsnr, null);
}
/**
 * Unregisters a previously added checkpoint listener.
 *
 * @param lsnr Listener.
 */
public void removeCheckpointListener(CheckpointListener lsnr) {
    checkpointManager.removeCheckpointListener(lsnr);
}
/**
 * @return Read checkpoint status.
 * @throws IgniteCheckedException If failed to read checkpoint status page.
 */
@SuppressWarnings("TooBroadScope")
private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
    return checkpointManager.readCheckpointStatus();
}
/** {@inheritDoc} */
@Override public void startMemoryRestore(GridKernalContext kctx, TimeBag startTimer) throws IgniteCheckedException {
    // Clients keep no persistent memory — nothing to restore.
    if (kctx.clientNode())
        return;

    MaintenanceRegistry mntcRegistry = kctx.maintenanceRegistry();

    // If a corrupted-PDS maintenance task is pending, skip restore and register its workflow.
    MaintenanceTask mntcTask = mntcRegistry.activeMaintenanceTask(CORRUPTED_DATA_FILES_MNTC_TASK_NAME);

    if (mntcTask != null) {
        log.warning("Maintenance task found, stop restoring memory");

        File workDir = ((FilePageStoreManager) cctx.pageStore()).workDir();

        mntcRegistry.registerWorkflowCallback(CORRUPTED_DATA_FILES_MNTC_TASK_NAME,
            new CorruptedPdsMaintenanceCallback(workDir,
                Arrays.asList(mntcTask.parameters().split(Pattern.quote(File.separator))))
        );

        return;
    }

    checkpointReadLock();

    RestoreLogicalState logicalState;

    try {
        // Perform early regions startup before restoring state.
        initAndStartRegions(kctx.config().getDataStorageConfiguration());

        startTimer.finishGlobalStage("Init and start regions");

        // Restore binary memory for all not WAL disabled cache groups.
        restoreBinaryMemory(
            groupsWithEnabledWal(),
            physicalRecords()
        );

        if (recoveryVerboseLogging && log.isInfoEnabled()) {
            log.info("Partition states information after BINARY RECOVERY phase:");

            dumpPartitionsInfo(cctx, log);
        }

        startTimer.finishGlobalStage("Restore binary memory");

        // Then replay logical WAL updates on top of the restored binary state.
        CheckpointStatus status = readCheckpointStatus();

        logicalState = applyLogicalUpdates(
            status,
            groupsWithEnabledWal(),
            logicalRecords(),
            false
        );

        cctx.tm().clearUncommitedStates();

        if (recoveryVerboseLogging && log.isInfoEnabled()) {
            log.info("Partition states information after LOGICAL RECOVERY phase:");

            dumpPartitionsInfo(cctx, log);
        }

        startTimer.finishGlobalStage("Restore logical state");
    }
    catch (IgniteCheckedException e) {
        // Restore failed: release the work-directory lock so the node can shut down cleanly.
        releaseFileLock();

        throw e;
    }
    finally {
        checkpointReadUnlock();
    }

    // Remember where the WAL ends; logging is resumed from this point on activation.
    walTail = tailPointer(logicalState);

    cctx.wal().onDeActivate(kctx);
}
/**
 * Resumes WAL logging from the tail pointer captured at the end of recovery.
 *
 * @throws IgniteCheckedException If WAL logging could not be resumed.
 */
public void resumeWalLogging() throws IgniteCheckedException {
    cctx.wal().resumeLogging(walTail);
}
/**
 * Flushes the WAL and remembers the flushed position as the tail pointer for a
 * later {@link #resumeWalLogging()}.
 *
 * @throws IgniteCheckedException If the WAL flush failed.
 */
public void preserveWalTailPointer() throws IgniteCheckedException {
    walTail = cctx.wal().flush(null, true);
}
/**
 * Looks up the page store of the given partition.
 *
 * @param grpId Cache group id.
 * @param partId Partition ID.
 * @return Page store.
 * @throws IgniteCheckedException If failed.
 */
public PageStore getPageStore(int grpId, int partId) throws IgniteCheckedException {
    return storeMgr.getStore(grpId, partId);
}
/**
 * Accumulates {@code f} over every page store of the given cache group.
 *
 * @param gctx Group context.
 * @param f Function applied to each page store; its long results are summed.
 * @return Sum of the function results over all page stores of the group.
 */
public long forGroupPageStores(CacheGroupContext gctx, ToLongFunction<PageStore> f) {
    long total = 0;

    try {
        Collection<PageStore> stores = storeMgr.getStores(gctx.groupId());

        if (stores != null) {
            for (PageStore pageStore : stores)
                total += f.applyAsLong(pageStore);
        }
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException(e);
    }

    return total;
}
/**
 * Calculates tail pointer for WAL at the end of logical recovery.
 *
 * @param logicalState State after logical recovery.
 * @return Tail pointer, or {@code null} if neither a flushed nor a read position is known.
 * @throws IgniteCheckedException If failed.
 */
private WALPointer tailPointer(RestoreLogicalState logicalState) throws IgniteCheckedException {
    // Flush buffered WAL data first: the replay iterator only sees records already in files.
    WALPointer flushPtr = cctx.wal().flush(null, true);

    // NULL_PTR is reported as null, because FileWriteAheadLogManager.resumeLogging
    // cannot write the segment header otherwise.
    WALPointer readPtr = logicalState.lastReadRecordPointer();

    if (readPtr == null)
        return flushPtr;

    if (flushPtr == null)
        return readPtr;

    // Both positions known - take whichever is further ahead.
    return readPtr.compareTo(flushPtr) >= 0 ? readPtr : flushPtr;
}
/**
 * Called when all partitions have been fully restored and pre-created on node start.
 *
 * Starts the checkpointer and initiates the first checkpoint, waiting until its
 * write lock is released.
 *
 * @throws IgniteCheckedException If first checkpoint has failed.
 */
@Override public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException {
    checkpointManager.start();

    CheckpointProgress cp = checkpointManager.forceCheckpoint("node started", null);

    if (cp != null)
        cp.futureFor(LOCK_RELEASED).get();
}
/**
 * Restores binary memory state by replaying physical WAL records (page snapshots
 * and page deltas) accumulated since the last completed checkpoint.
 *
 * @param status Checkpoint status.
 * @param cacheGroupsPredicate Cache groups to restore.
 * @param recordTypePredicate WAL record type filter for replay.
 * @param finalizeState If {@code true}, finalizes an interrupted checkpoint and
 *      returns the restore state; if {@code false}, returns {@code null}.
 * @return Binary restore state, or {@code null} when {@code finalizeState} is {@code false}.
 * @throws IgniteCheckedException If failed.
 * @throws StorageException In case I/O error occurred during operations with storage.
 */
private RestoreBinaryState performBinaryMemoryRestore(
    CheckpointStatus status,
    IgnitePredicate<Integer> cacheGroupsPredicate,
    IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate,
    boolean finalizeState
) throws IgniteCheckedException {
    if (log.isInfoEnabled())
        log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
            + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');

    WALPointer recPtr = status.endPtr;

    // True when the node stopped mid-checkpoint and pages must be re-applied.
    boolean apply = status.needRestoreMemory();

    try {
        // Read the checkpoint record referenced by the on-disk marker (if any).
        WALRecord startRec = !CheckpointStatus.NULL_PTR.equals(status.startPtr) || apply ? cctx.wal().read(status.startPtr) : null;

        if (apply) {
            if (finalizeState)
                U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
                    "finish checkpoint on node start.");

            // Lazily-allocated regions must be started before pages can be applied into them.
            cctx.cache().cacheGroupDescriptors().forEach((grpId, desc) -> {
                if (!cacheGroupsPredicate.apply(grpId))
                    return;

                try {
                    DataRegion region = cctx.database().dataRegion(desc.config().getDataRegionName());

                    if (region == null || !cctx.isLazyMemoryAllocation(region))
                        return;

                    region.pageMemory().start();
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteException(e);
                }
            });

            cctx.pageStore().beginRecover();

            if (!(startRec instanceof CheckpointRecord))
                throw new StorageException("Checkpoint marker doesn't point to checkpoint record " +
                    "[ptr=" + status.startPtr + ", rec=" + startRec + "]");

            // A non-null back pointer moves the physical replay start further back.
            WALPointer cpMark = ((CheckpointRecord)startRec).checkpointMark();

            if (cpMark != null) {
                if (log.isInfoEnabled())
                    log.info("Restoring checkpoint after logical recovery, will start physical recovery from " +
                        "back pointer: " + cpMark);

                recPtr = cpMark;
            }
        }
        else
            cctx.wal().notchLastCheckpointPtr(status.startPtr);
    }
    catch (NoSuchElementException e) {
        throw new StorageException("Failed to read checkpoint record from WAL, persistence consistency " +
            "cannot be guaranteed. Make sure configuration points to correct WAL folders and WAL folder is " +
            "properly mounted [ptr=" + status.startPtr + ", walPath=" + persistenceCfg.getWalPath() +
            ", walArchive=" + persistenceCfg.getWalArchivePath() + "]");
    }

    // First error captured by any asynchronous apply task; stops further processing.
    AtomicReference<Throwable> applyError = new AtomicReference<>();

    StripedExecutor exec = cctx.kernalContext().getStripedExecutorService();

    // Bounds the number of in-flight page-apply tasks.
    Semaphore semaphore = new Semaphore(semaphorePertmits(exec));

    long start = U.currentTimeMillis();

    long lastArchivedSegment = cctx.wal().lastArchivedSegment();

    WALIterator it = cctx.wal().replay(recPtr, recordTypePredicate);

    RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, it, lastArchivedSegment, cacheGroupsPredicate);

    // Number of successfully applied page snapshots/deltas (for logging).
    AtomicLong applied = new AtomicLong();

    try {
        while (restoreBinaryState.hasNext()) {
            // Stop iterating as soon as an apply task failed.
            if (applyError.get() != null)
                break;

            WALRecord rec = restoreBinaryState.next();

            if (rec == null)
                break;

            switch (rec.type()) {
                case PAGE_RECORD:
                    if (restoreBinaryState.needApplyBinaryUpdate()) {
                        PageSnapshot pageSnapshot = (PageSnapshot)rec;

                        // Here we do not require tag check because we may be applying memory changes after
                        // several repetitive restarts and the same pages may have changed several times.
                        int groupId = pageSnapshot.fullPageId().groupId();
                        int partId = partId(pageSnapshot.fullPageId().pageId());

                        if (skipRemovedIndexUpdates(groupId, partId))
                            break;

                        stripedApplyPage((pageMem) -> {
                            try {
                                applyPageSnapshot(pageMem, pageSnapshot);

                                applied.incrementAndGet();
                            }
                            catch (Throwable t) {
                                U.error(log, "Failed to apply page snapshot. rec=[" + pageSnapshot + ']');

                                applyError.compareAndSet(
                                    null,
                                    (t instanceof IgniteCheckedException) ?
                                        (IgniteCheckedException)t :
                                        new IgniteCheckedException("Failed to apply page snapshot", t));
                            }
                        }, groupId, partId, exec, semaphore
                        );
                    }

                    break;

                case PART_META_UPDATE_STATE:
                    PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;

                    {
                        int groupId = metaStateRecord.groupId();
                        int partId = metaStateRecord.partitionId();

                        stripedApplyPage((pageMem) -> {
                            GridDhtPartitionState state = fromOrdinal(metaStateRecord.state());

                            // Unknown or EVICTED state - the partition must be destroyed.
                            if (state == null || state == GridDhtPartitionState.EVICTED)
                                schedulePartitionDestroy(groupId, partId);
                            else {
                                try {
                                    cancelOrWaitPartitionDestroy(groupId, partId);
                                }
                                catch (Throwable t) {
                                    U.error(log, "Failed to cancel or wait partition destroy. rec=[" + metaStateRecord + ']');

                                    applyError.compareAndSet(
                                        null,
                                        (t instanceof IgniteCheckedException) ?
                                            (IgniteCheckedException)t :
                                            new IgniteCheckedException("Failed to cancel or wait partition destroy", t));
                                }
                            }
                        }, groupId, partId, exec, semaphore);
                    }

                    break;

                case PARTITION_DESTROY:
                    PartitionDestroyRecord destroyRecord = (PartitionDestroyRecord)rec;

                    {
                        int groupId = destroyRecord.groupId();
                        int partId = destroyRecord.partitionId();

                        stripedApplyPage((pageMem) -> {
                            // Drop in-memory pages of the partition and schedule file removal.
                            pageMem.invalidate(groupId, partId);

                            schedulePartitionDestroy(groupId, partId);
                        }, groupId, partId, exec, semaphore);
                    }

                    break;

                default:
                    // All remaining physical records are page deltas.
                    if (restoreBinaryState.needApplyBinaryUpdate() && rec instanceof PageDeltaRecord) {
                        PageDeltaRecord pageDelta = (PageDeltaRecord)rec;

                        int groupId = pageDelta.groupId();
                        int partId = partId(pageDelta.pageId());

                        if (skipRemovedIndexUpdates(groupId, partId))
                            break;

                        stripedApplyPage((pageMem) -> {
                            try {
                                applyPageDelta(pageMem, pageDelta, true);

                                applied.incrementAndGet();
                            }
                            catch (Throwable t) {
                                U.error(log, "Failed to apply page delta. rec=[" + pageDelta + ']');

                                applyError.compareAndSet(
                                    null,
                                    (t instanceof IgniteCheckedException) ?
                                        (IgniteCheckedException)t :
                                        new IgniteCheckedException("Failed to apply page delta", t));
                            }
                        }, groupId, partId, exec, semaphore);
                    }
            }
        }
    }
    finally {
        it.close();

        // Wait for all striped tasks and rethrow the first captured error (if any).
        awaitApplyComplete(exec, applyError);
    }

    if (!finalizeState)
        return null;

    WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();

    if (status.needRestoreMemory()) {
        // The checkpoint record was never reached during replay - the marker is orphaned.
        if (restoreBinaryState.needApplyBinaryUpdate())
            throw new StorageException("Failed to restore memory state (checkpoint marker is present " +
                "on disk, but checkpoint record is missed in WAL) " +
                "[cpStatus=" + status + ", lastRead=" + lastReadPtr + "]");

        if (log.isInfoEnabled())
            log.info("Finished applying memory changes [changesApplied=" + applied +
                ", time=" + (U.currentTimeMillis() - start) + " ms]");

        // Complete the interrupted checkpoint so the storage reaches a consistent state.
        finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr, exec);
    }

    return restoreBinaryState;
}
/**
 * Calculates the maximum number of concurrent apply tasks submitted to the
 * striped executor during recovery.
 *
 * @param exec Striped executor.
 * @return Number of permits.
 */
private int semaphorePertmits(StripedExecutor exec) {
    // Default: 4 tasks per stripe.
    int defaultPermits = exec.stripesCount() * 4;

    long maxMemory = Runtime.getRuntime().maxMemory();

    // Heuristic: cap by ~20% of the max heap, assuming two 4K pages per in-flight task.
    int heapBasedPermits = (int)((maxMemory * 0.2) / (4096 * 2));

    // On small heaps the heap-based bound wins.
    int permits = Math.min(defaultPermits, heapBasedPermits);

    // System property overrides any calculation.
    return getInteger(IGNITE_RECOVERY_SEMAPHORE_PERMITS, permits);
}
/**
 * Waits until every stripe of the executor has drained its queue, then rethrows
 * the first error captured by any apply task.
 *
 * @param exec Striped executor.
 * @param applyError Holder of the first error observed by apply tasks.
 * @throws IgniteCheckedException If any apply task failed.
 */
private void awaitApplyComplete(
    StripedExecutor exec,
    AtomicReference<Throwable> applyError
) throws IgniteCheckedException {
    try {
        // Await completion apply tasks in all stripes.
        exec.awaitComplete();
    }
    catch (InterruptedException e) {
        throw new IgniteInterruptedException(e);
    }

    // Checking error after all task applied.
    Throwable err = applyError.get();

    if (err == null)
        return;

    if (err instanceof IgniteCheckedException)
        throw (IgniteCheckedException)err;

    throw new IgniteCheckedException(err);
}
/**
 * Resolves the page memory of the given cache group and submits {@code consumer}
 * to the stripe derived from the group and partition.
 *
 * @param consumer Task receiving the group's page memory.
 * @param grpId Group Id.
 * @param partId Partition Id.
 * @param exec Striped executor.
 * @param semaphore Semaphore bounding the number of in-flight tasks.
 * @throws IgniteCheckedException If the group's data region could not be resolved.
 */
public void stripedApplyPage(
    Consumer<PageMemoryEx> consumer,
    int grpId,
    int partId,
    StripedExecutor exec,
    Semaphore semaphore
) throws IgniteCheckedException {
    assert consumer != null;
    assert exec != null;
    assert semaphore != null;

    PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId);

    // No page memory for the group (e.g. descriptor is gone) - nothing to apply to.
    if (pageMem != null)
        stripedApply(() -> consumer.accept(pageMem), grpId, partId, exec, semaphore);
}
/**
 * Executes {@code run} on the executor stripe derived from the group and
 * partition id, throttled by {@code semaphore}.
 *
 * @param run Runnable task.
 * @param grpId Group Id.
 * @param partId Partition Id.
 * @param exec Striped executor.
 * @param semaphore Semaphore bounding the number of in-flight tasks.
 */
public void stripedApply(
    Runnable run,
    int grpId,
    int partId,
    StripedExecutor exec,
    Semaphore semaphore
) {
    assert run != null;
    assert exec != null;
    assert semaphore != null;

    int stripes = exec.stripesCount();

    int stripe = U.stripeIdx(stripes, grpId, partId);

    // Valid stripe indexes are [0, stripes); the former check allowed stripe == stripes.
    assert stripe >= 0 && stripe < stripes : "idx=" + stripe + ", stripes=" + stripes;

    // Throttle submission so recovery does not flood the stripes' queues.
    try {
        semaphore.acquire();
    }
    catch (InterruptedException e) {
        throw new IgniteInterruptedException(e);
    }

    exec.execute(stripe, () -> {
        // WA for avoid assert check in PageMemory, that current thread hold chpLock.
        CHECKPOINT_LOCK_HOLD_COUNT.set(1);

        try {
            run.run();
        }
        finally {
            CHECKPOINT_LOCK_HOLD_COUNT.set(0);

            semaphore.release();
        }
    });
}
/**
 * Applies a full page snapshot record: overwrites the page content with the
 * recorded bytes, decompressing them in place when the snapshot was stored
 * compressed.
 *
 * @param pageMem Page memory.
 * @param pageSnapshotRecord Page snapshot record.
 * @throws IgniteCheckedException If failed.
 */
public void applyPageSnapshot(PageMemoryEx pageMem, PageSnapshot pageSnapshotRecord) throws IgniteCheckedException {
    int grpId = pageSnapshotRecord.fullPageId().groupId();
    long pageId = pageSnapshotRecord.fullPageId().pageId();

    // Acquire in restore mode (last flag true) - no tag check is performed.
    long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true);

    try {
        long pageAddr = pageMem.writeLock(grpId, pageId, page, true);

        try {
            PageUtils.putBytes(pageAddr, 0, pageSnapshotRecord.pageData());

            // The snapshot may carry a compressed page image - inflate it in place.
            if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) {
                int realPageSize = pageMem.realPageSize(pageSnapshotRecord.groupId());

                assert pageSnapshotRecord.pageDataSize() <= realPageSize : pageSnapshotRecord.pageDataSize();

                cctx.kernalContext().compress().decompressPage(pageMem.pageBuffer(pageAddr), realPageSize);
            }
        }
        finally {
            pageMem.writeUnlock(grpId, pageId, page, null, true, true);
        }
    }
    finally {
        pageMem.releasePage(grpId, pageId, page);
    }
}
/**
 * Applies a single page delta record to the page it targets.
 *
 * @param pageMem Page memory.
 * @param pageDeltaRecord Page delta record.
 * @param restore Get page for restore.
 * @throws IgniteCheckedException If failed.
 */
private void applyPageDelta(PageMemoryEx pageMem, PageDeltaRecord pageDeltaRecord, boolean restore) throws IgniteCheckedException {
    int grpId = pageDeltaRecord.groupId();
    long pageId = pageDeltaRecord.pageId();

    // Here we do not require tag check because we may be applying memory changes after
    // several repetitive restarts and the same pages may have changed several times.
    long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, restore);

    try {
        long pageAddr = pageMem.writeLock(grpId, pageId, page, restore);

        try {
            // The record knows how to mutate the page in place.
            pageDeltaRecord.applyDelta(pageMem, pageAddr);
        }
        finally {
            pageMem.writeUnlock(grpId, pageId, page, null, true, restore);
        }
    }
    finally {
        pageMem.releasePage(grpId, pageId, page);
    }
}
/**
 * @param grpId Group id.
 * @param partId Partition id.
 * @return {@code True} if the update targets the index partition of a group whose
 *      index store is absent, and therefore must be skipped.
 */
private boolean skipRemovedIndexUpdates(int grpId, int partId) {
    if (partId != PageIdAllocator.INDEX_PARTITION)
        return false;

    return !storeMgr.hasIndexStore(grpId);
}
/**
 * Obtains PageMemory reference from cache descriptor instead of cache context.
 *
 * @param grpId Cache group id.
 * @return PageMemoryEx instance, or {@code null} if no descriptor exists for the group.
 * @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
 */
// TODO IGNITE-12722: Get rid of GridCacheDatabaseSharedManager#getPageMemoryForCacheGroup functionality.
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
    // Metastorage lives in its own dedicated data region.
    if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
        return (PageMemoryEx)dataRegion(METASTORE_DATA_REGION_NAME).pageMemory();

    // TODO IGNITE-7792 add generic mapping.
    if (grpId == TxLog.TX_LOG_CACHE_ID)
        return (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();

    // TODO IGNITE-5075: cache descriptor can be removed.
    GridCacheSharedContext sharedCtx = context();

    CacheGroupDescriptor desc = sharedCtx.cache().cacheGroupDescriptors().get(grpId);

    if (desc == null)
        return null;

    String regionName = desc.config().getDataRegionName();

    return (PageMemoryEx)sharedCtx.database().dataRegion(regionName).pageMemory();
}
/**
 * Applies updates from the given WAL iterator with the specified filters.
 *
 * @param it WalIterator; {@code null} means there is nothing to apply.
 * @param recPredicate Wal record filter; iteration stops at the first rejected record.
 * @param entryPredicate Entry filter.
 * @throws IgniteCheckedException If applying failed.
 */
public void applyUpdatesOnRecovery(
    @Nullable WALIterator it,
    IgniteBiPredicate<WALPointer, WALRecord> recPredicate,
    IgnitePredicate<DataEntry> entryPredicate
) throws IgniteCheckedException {
    if (it == null)
        return;

    // Replayed updates must not be logged to WAL again.
    cctx.walState().runWithOutWAL(() -> {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> next = it.next();

            WALRecord rec = next.get2();

            // The predicate both filters and terminates the replay.
            if (!recPredicate.apply(next.get1(), rec))
                break;

            switch (rec.type()) {
                case MVCC_DATA_RECORD:
                case DATA_RECORD:
                case DATA_RECORD_V2:
                    checkpointReadLock();

                    try {
                        DataRecord dataRec = (DataRecord)rec;

                        for (DataEntry dataEntry : dataRec.writeEntries()) {
                            if (entryPredicate.apply(dataEntry)) {
                                // NOTE(review): the checkpoint read lock is re-acquired here while the
                                // outer acquisition is still held; presumably the lock is reentrant
                                // (see CHECKPOINT_LOCK_HOLD_COUNT usage elsewhere) - confirm, otherwise
                                // the inner acquire/release pair is redundant.
                                checkpointReadLock();

                                try {
                                    int cacheId = dataEntry.cacheId();

                                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

                                    if (cacheCtx != null)
                                        applyUpdate(cacheCtx, dataEntry);
                                    else if (log != null)
                                        log.warning("Cache is not started. Updates cannot be applied " +
                                            "[cacheId=" + cacheId + ']');
                                }
                                finally {
                                    checkpointReadUnlock();
                                }
                            }
                        }
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteException(e);
                    }
                    finally {
                        checkpointReadUnlock();
                    }

                    break;

                case MVCC_TX_RECORD:
                    checkpointReadLock();

                    try {
                        MvccTxRecord txRecord = (MvccTxRecord)rec;

                        byte txState = convertToTxState(txRecord.state());

                        cctx.coordinators().updateState(txRecord.mvccVersion(), txState, true);
                    }
                    finally {
                        checkpointReadUnlock();
                    }

                    break;

                default:
                    // Skip other records.
            }
        }
    });
}
/**
 * Replays logical WAL records (data entries, partition states, metastore and
 * encryption records) accumulated since the last checkpoint.
 *
 * @param status Last registered checkpoint status.
 * @param cacheGroupsPredicate Cache groups to restore.
 * @param recordTypePredicate WAL record type filter for replay.
 * @param restoreMeta Metastore restore phase if {@code true}.
 * @return Logical restore state.
 * @throws IgniteCheckedException If failed to apply updates.
 * @throws StorageException If IO exception occurred while reading write-ahead log.
 */
private RestoreLogicalState applyLogicalUpdates(
    CheckpointStatus status,
    IgnitePredicate<Integer> cacheGroupsPredicate,
    IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate,
    boolean restoreMeta
) throws IgniteCheckedException {
    if (log.isInfoEnabled())
        log.info("Applying lost " + (restoreMeta ? "metastore" : "cache") + " updates since last checkpoint record [lastMarked="
            + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');

    // Suppress query field lookup while cache data is incomplete; re-enabled in finally.
    if (!restoreMeta)
        cctx.kernalContext().query().skipFieldLookup(true);

    long start = U.currentTimeMillis();

    // First error captured by any asynchronous apply task.
    AtomicReference<Throwable> applyError = new AtomicReference<>();

    // Number of applied data entries (for logging).
    AtomicLong applied = new AtomicLong();

    long lastArchivedSegment = cctx.wal().lastArchivedSegment();

    StripedExecutor exec = cctx.kernalContext().getStripedExecutorService();

    // Bounds the number of in-flight apply tasks.
    Semaphore semaphore = new Semaphore(semaphorePertmits(exec));

    Map<GroupPartitionId, Integer> partitionRecoveryStates = new HashMap<>();

    WALIterator it = cctx.wal().replay(status.startPtr, recordTypePredicate);

    RestoreLogicalState restoreLogicalState =
        new RestoreLogicalState(status, it, lastArchivedSegment, cacheGroupsPredicate, partitionRecoveryStates);

    final IgniteTxManager txManager = cctx.tm();

    try {
        while (restoreLogicalState.hasNext()) {
            WALRecord rec = restoreLogicalState.next();

            if (rec == null)
                break;

            switch (rec.type()) {
                case TX_RECORD:
                    if (restoreMeta) { // Also restore tx states.
                        TxRecord txRec = (TxRecord)rec;

                        txManager.collectTxStates(txRec);
                    }

                    break;

                case CHECKPOINT_RECORD: // Calculate initial partition states
                    CheckpointRecord cpRec = (CheckpointRecord)rec;

                    for (Map.Entry<Integer, CacheState> entry : cpRec.cacheGroupStates().entrySet()) {
                        CacheState cacheState = entry.getValue();

                        for (int i = 0; i < cacheState.size(); i++) {
                            int partId = cacheState.partitionByIndex(i);
                            byte state = cacheState.stateByIndex(i);

                            // Ignore undefined state.
                            if (state != -1) {
                                partitionRecoveryStates.put(new GroupPartitionId(entry.getKey(), partId),
                                    (int)state);
                            }
                        }
                    }

                    break;

                case ROLLBACK_TX_RECORD:
                    RollbackRecord rbRec = (RollbackRecord)rec;

                    CacheGroupContext ctx = cctx.cache().cacheGroup(rbRec.groupId());

                    if (ctx != null && !ctx.isLocal()) {
                        ctx.topology().forceCreatePartition(rbRec.partitionId());

                        ctx.offheap().onPartitionInitialCounterUpdated(rbRec.partitionId(), rbRec.start(),
                            rbRec.range());
                    }

                    break;

                case MVCC_DATA_RECORD:
                case DATA_RECORD:
                case DATA_RECORD_V2:
                case ENCRYPTED_DATA_RECORD:
                case ENCRYPTED_DATA_RECORD_V2:
                case ENCRYPTED_DATA_RECORD_V3:
                    DataRecord dataRec = (DataRecord)rec;

                    for (DataEntry dataEntry : dataRec.writeEntries()) {
                        // Entries of transactions known to be uncommitted are skipped.
                        if (!restoreMeta && txManager.uncommitedTx(dataEntry))
                            continue;

                        int cacheId = dataEntry.cacheId();

                        DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheId);

                        // Can empty in case recovery node on blt changed.
                        if (cacheDesc == null)
                            continue;

                        stripedApply(() -> {
                            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

                            if (skipRemovedIndexUpdates(cacheCtx.groupId(), PageIdAllocator.INDEX_PARTITION))
                                cctx.kernalContext().query().markAsRebuildNeeded(cacheCtx, true);

                            try {
                                applyUpdate(cacheCtx, dataEntry);
                            }
                            catch (IgniteCheckedException e) {
                                U.error(log, "Failed to apply data entry, dataEntry=" + dataEntry +
                                    ", ptr=" + dataRec.position());

                                applyError.compareAndSet(null, e);
                            }

                            applied.incrementAndGet();
                        }, cacheDesc.groupId(), dataEntry.partitionId(), exec, semaphore);
                    }

                    break;

                case MVCC_TX_RECORD:
                    MvccTxRecord txRecord = (MvccTxRecord)rec;

                    byte txState = convertToTxState(txRecord.state());

                    cctx.coordinators().updateState(txRecord.mvccVersion(), txState, true);

                    break;

                case PART_META_UPDATE_STATE:
                    PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;

                    GroupPartitionId groupPartitionId = new GroupPartitionId(
                        metaStateRecord.groupId(), metaStateRecord.partitionId()
                    );

                    // Later records override states gathered from the checkpoint record.
                    restoreLogicalState.partitionRecoveryStates.put(groupPartitionId, (int)metaStateRecord.state());

                    break;

                case METASTORE_DATA_RECORD:
                    MetastoreDataRecord metastoreDataRecord = (MetastoreDataRecord)rec;

                    metaStorage.applyUpdate(metastoreDataRecord.key(), metastoreDataRecord.value());

                    break;

                case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
                case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
                case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
                case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
                    PageDeltaRecord pageDelta = (PageDeltaRecord)rec;

                    stripedApplyPage((pageMem) -> {
                        try {
                            applyPageDelta(pageMem, pageDelta, false);
                        }
                        catch (IgniteCheckedException e) {
                            U.error(log, "Failed to apply page delta, " + pageDelta);

                            applyError.compareAndSet(null, e);
                        }
                    }, pageDelta.groupId(), partId(pageDelta.pageId()), exec, semaphore);

                    break;

                case MASTER_KEY_CHANGE_RECORD_V2:
                    cctx.kernalContext().encryption().applyKeys((MasterKeyChangeRecordV2)rec);

                    break;

                case REENCRYPTION_START_RECORD:
                    cctx.kernalContext().encryption().applyReencryptionStartRecord((ReencryptionStartRecord)rec);

                    break;

                case INDEX_ROOT_PAGE_RENAME_RECORD:
                    IndexRenameRootPageRecord record = (IndexRenameRootPageRecord)rec;

                    int cacheId = record.cacheId();

                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

                    if (cacheCtx != null) {
                        IgniteCacheOffheapManager offheap = cacheCtx.offheap();

                        for (int i = 0; i < record.segments(); i++)
                            offheap.renameRootPageForIndex(cacheId, record.oldTreeName(), record.newTreeName(), i);
                    }

                    break;

                default:
                    // Skip other records.
            }
        }
    }
    finally {
        it.close();

        if (!restoreMeta)
            cctx.kernalContext().query().skipFieldLookup(false);
    }

    // Wait for all striped tasks and rethrow the first captured error (if any).
    awaitApplyComplete(exec, applyError);

    if (log.isInfoEnabled())
        log.info("Finished applying WAL changes [updatesApplied=" + applied +
            ", time=" + (U.currentTimeMillis() - start) + " ms]");

    for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
        lsnr.afterLogicalUpdatesApplied(this, restoreLogicalState);

    return restoreLogicalState;
}
/**
 * Convert {@link TransactionState} to Mvcc {@link TxState}.
 *
 * @param state TransactionState.
 * @return TxState.
 * @throws IllegalStateException If the state has no Mvcc counterpart.
 */
private byte convertToTxState(TransactionState state) {
    if (state == TransactionState.PREPARED)
        return TxState.PREPARED;

    if (state == TransactionState.COMMITTED)
        return TxState.COMMITTED;

    if (state == TransactionState.ROLLED_BACK)
        return TxState.ABORTED;

    throw new IllegalStateException("Unsupported TxState.");
}
/**
 * Wal truncate callback: drops checkpoint history entries whose WAL segments
 * were truncated.
 *
 * @param highBound Upper bound.
 * @throws IgniteCheckedException If failed.
 */
public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
    checkpointManager.removeCheckpointsUntil(highBound);
}
/**
 * Applies a single WAL data entry to the given cache's offheap storage.
 *
 * @param cacheCtx Cache context to apply an update.
 * @param dataEntry Data entry to apply.
 * @throws IgniteCheckedException If failed to restore.
 */
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
    int partId = dataEntry.partitionId();

    // -1 means the partition id was not recorded - derive it from key affinity.
    if (partId == -1)
        partId = cacheCtx.affinity().partition(dataEntry.key());

    // Local caches have no DHT partitions to pre-create.
    GridDhtLocalPartition locPart = cacheCtx.isLocal() ? null : cacheCtx.topology().forceCreatePartition(partId);

    switch (dataEntry.op()) {
        case CREATE:
        case UPDATE:
            // MVCC entries carry their version and go through the dedicated apply path.
            if (dataEntry instanceof MvccDataEntry) {
                cacheCtx.offheap().mvccApplyUpdate(
                    cacheCtx,
                    dataEntry.key(),
                    dataEntry.value(),
                    dataEntry.writeVersion(),
                    dataEntry.expireTime(),
                    locPart,
                    ((MvccDataEntry)dataEntry).mvccVer());
            }
            else {
                cacheCtx.offheap().update(
                    cacheCtx,
                    dataEntry.key(),
                    dataEntry.value(),
                    dataEntry.writeVersion(),
                    dataEntry.expireTime(),
                    locPart,
                    null);
            }

            // A counter of 0 is treated as "not set" and skipped.
            if (dataEntry.partitionCounter() != 0)
                cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter() - 1, 1);

            break;

        case DELETE:
            if (dataEntry instanceof MvccDataEntry) {
                cacheCtx.offheap().mvccApplyUpdate(
                    cacheCtx,
                    dataEntry.key(),
                    null,
                    dataEntry.writeVersion(),
                    0L,
                    locPart,
                    ((MvccDataEntry)dataEntry).mvccVer());
            }
            else
                cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);

            if (dataEntry.partitionCounter() != 0)
                cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter() - 1, 1);

            break;

        case READ:
            // do nothing
            break;

        default:
            throw new IgniteCheckedException("Invalid operation for WAL entry update: " + dataEntry.op());
    }
}
/**
 * Completes a checkpoint that was interrupted by node stop, then finishes page
 * store recovery.
 *
 * @param cpTs Timestamp of the interrupted checkpoint.
 * @param cpId Id of the interrupted checkpoint.
 * @param walPtr Checkpoint start marker in WAL.
 * @param exec Striped executor used to write pages.
 * @throws IgniteCheckedException If failed.
 */
private void finalizeCheckpointOnRecovery(
    long cpTs,
    UUID cpId,
    WALPointer walPtr,
    StripedExecutor exec
) throws IgniteCheckedException {
    assert checkpointManager != null : "Checkpoint is null";

    checkpointManager.finalizeCheckpointOnRecovery(cpTs, cpId, walPtr, exec);

    cctx.pageStore().finishRecover();
}
/**
 * Replace thread local with buffers. Thread local should provide direct buffer with one page in length.
 *
 * @param threadBuf new thread-local with buffers for the checkpoint threads.
 */
public void setThreadBuf(final ThreadLocal<ByteBuffer> threadBuf) {
    assert checkpointManager != null : "Checkpointer is null";

    checkpointManager.threadBuf(threadBuf);
}
/**
 * @return Checkpoint history, or {@code null} if the checkpoint manager has not
 *      been created yet.
 */
@Nullable public CheckpointHistory checkpointHistory() {
    return checkpointManager == null ? null : checkpointManager.checkpointHistory();
}
/**
 * Adds given partition to checkpointer destroy queue.
 *
 * @param grpId Group ID.
 * @param partId Partition ID.
 */
public void schedulePartitionDestroy(int grpId, int partId) {
    checkpointManager.schedulePartitionDestroy(cctx.cache().cacheGroup(grpId), grpId, partId);
}
/**
 * Cancels or wait for partition destroy.
 *
 * @param grpId Group ID.
 * @param partId Partition ID.
 * @throws IgniteCheckedException If failed.
 * @return {@code True} if the request to destroy the partition was canceled.
 */
public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
    return checkpointManager.cancelOrWaitPartitionDestroy(grpId, partId);
}
/**
 * Timeout for checkpoint read lock acquisition.
 *
 * @return Timeout for checkpoint read lock acquisition in milliseconds.
 */
@Override public long checkpointReadLockTimeout() {
    return checkpointManager.checkpointTimeoutLock().checkpointReadLockTimeout();
}
/**
 * Sets timeout for checkpoint read lock acquisition.
 *
 * @param val New timeout in milliseconds, non-positive value denotes infinite timeout.
 */
@Override public void checkpointReadLockTimeout(long val) {
    checkpointManager.checkpointTimeoutLock().checkpointReadLockTimeout(val);
}
/**
 * @param dataRegion Data region.
 * @return Holder for page list cache limit for given data region, or {@code null}
 *      for non-persistent regions (no limit is tracked for them).
 */
public AtomicLong pageListCacheLimitHolder(DataRegion dataRegion) {
    if (!dataRegion.config().isPersistenceEnabled())
        return null;

    return pageListCacheLimits.computeIfAbsent(dataRegion.config().getName(),
        name -> new AtomicLong(
            (long)(((PageMemoryEx)dataRegion.pageMemory()).totalPages() * PAGE_LIST_CACHE_LIMIT_THRESHOLD)));
}
/**
 * Node file lock holder.
 */
public static class NodeFileLockHolder extends FileLockHolder {
    /** Kernal context used to write the id of the locking node into the lock file. */
    @NotNull private final GridKernalContext ctx;

    /**
     * @param rootDir Root directory for lock file.
     * @param ctx Kernal context.
     * @param log Log.
     */
    public NodeFileLockHolder(String rootDir, @NotNull GridKernalContext ctx, IgniteLogger log) {
        super(rootDir, log);

        this.ctx = ctx;
    }

    /** {@inheritDoc} */
    @Override public String lockInfo() {
        SB sb = new SB();

        // Node id.
        sb.a("[").a(ctx.localNodeId().toString()).a("]");

        // IP addresses - only if discovery is already up and knows the local node.
        GridDiscoveryManager discovery = ctx.discovery();

        if (discovery != null) {
            ClusterNode locNode = discovery.localNode();

            if (locNode != null)
                sb.a(locNode.addresses());
        }

        // Ports, comma-separated.
        sb.a("[");

        for (Iterator<GridPortRecord> iter = ctx.ports().records().iterator(); iter.hasNext(); ) {
            GridPortRecord rec = iter.next();

            sb.a(rec.protocol()).a(":").a(rec.port());

            if (iter.hasNext())
                sb.a(", ");
        }

        sb.a("]");

        return sb.toString();
    }

    /** {@inheritDoc} */
    @Override protected String warningMessage(String lockInfo) {
        return "Failed to acquire file lock. Will try again in 1s " +
            "[nodeId=" + ctx.localNodeId() + ", holder=" + lockInfo +
            ", path=" + lockPath() + ']';
    }
}
/**
 * {@inheritDoc}
 *
 * Returns an immutable snapshot of the current persistence metrics.
 */
@Override public DataStorageMetrics persistentStoreMetrics() {
    return new DataStorageMetricsSnapshot(persStoreMetrics);
}
/**
 * @return Live (mutable) persistence metrics implementation.
 */
public DataStorageMetricsImpl persistentStoreMetricsImpl() {
    return persStoreMetrics;
}
/** {@inheritDoc} */
@Override public MetaStorage metaStorage() {
    return metaStorage;
}
/**
 * @return Temporary metastorage to migration of index partition.
 */
public MetaStorage.TmpStorage temporaryMetaStorage() {
    return tmpMetaStorage;
}
/**
 * @param tmpMetaStorage Temporary metastorage to migration of index partition.
 */
public void temporaryMetaStorage(MetaStorage.TmpStorage tmpMetaStorage) {
    this.tmpMetaStorage = tmpMetaStorage;
}
/** {@inheritDoc} */
@Override public void notifyMetaStorageSubscribersOnReadyForRead() throws IgniteCheckedException {
    // Capture subscribers first so readMetastore() can notify them.
    metastorageLifecycleLsnrs = cctx.kernalContext().internalSubscriptionProcessor().getMetastorageSubscribers();

    readMetastore();
}
/** {@inheritDoc} */
@Override public boolean walEnabled(int grpId, boolean local) {
    return local
        ? !initiallyLocWalDisabledGrps.contains(grpId)
        : !initiallyGlobalWalDisabledGrps.contains(grpId);
}
/** {@inheritDoc} */
@Override public void walEnabled(int grpId, boolean enabled, boolean local) {
    String key = walGroupIdToKey(grpId, local);

    checkpointReadLock();

    try {
        // Presence of the key in metastorage means WAL is disabled for the group.
        if (enabled)
            metaStorage.remove(key);
        else {
            metaStorage.write(key, true);

            // A WAL gap makes the last checkpoint unusable as a WAL rebalance start point.
            lastCheckpointInapplicableForWalRebalance(grpId);
        }
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException("Failed to write cache group WAL state [grpId=" + grpId +
            ", enabled=" + enabled + ']', e);
    }
    finally {
        checkpointReadUnlock();
    }
}
/**
 * Checks that checkpoint with timestamp {@code cpTs} is inapplicable as start point for WAL rebalance for given group {@code grpId}.
 *
 * @param cpTs Checkpoint timestamp.
 * @param grpId Group ID.
 * @return {@code true} if checkpoint {@code cpTs} is inapplicable as start point for WAL rebalance for {@code grpId}.
 * @throws IgniteCheckedException If failed to check.
 */
public boolean isCheckpointInapplicableForWalRebalance(Long cpTs, int grpId) throws IgniteCheckedException {
    return metaStorage.read(checkpointInapplicableCpAndGroupIdToKey(cpTs, grpId)) != null;
}
/**
 * Set last checkpoint as inapplicable for WAL rebalance for given group {@code grpId}.
 *
 * @param grpId Group ID.
 */
@Override public void lastCheckpointInapplicableForWalRebalance(int grpId) {
    checkpointReadLock();

    try {
        CheckpointEntry lastCp = checkpointHistory().lastCheckpointMarkingAsInapplicable(grpId);

        if (lastCp != null) {
            long ts = lastCp.timestamp();

            // A zero timestamp means there is no checkpoint to mark.
            if (ts != 0)
                metaStorage.write(checkpointInapplicableCpAndGroupIdToKey(ts, grpId), true);
        }
    }
    catch (IgniteCheckedException e) {
        log.error("Failed to mark last checkpoint as inapplicable for WAL rebalance for group: " + grpId, e);
    }
    finally {
        checkpointReadUnlock();
    }
}
/**
 * Populates the sets of cache groups whose WAL was initially disabled (locally or globally)
 * from the markers persisted in the metastorage.
 */
private void fillWalDisabledGroups() {
    assert metaStorage != null;

    try {
        metaStorage.iterate(WAL_KEY_PREFIX, (key, val) -> {
            T2<Integer, Boolean> grpAndLocFlag = walKeyToGroupIdAndLocalFlag(key);

            // Skip keys that are not WAL state markers.
            if (grpAndLocFlag == null)
                return;

            if (grpAndLocFlag.get2())
                initiallyLocWalDisabledGrps.add(grpAndLocFlag.get1());
            else
                initiallyGlobalWalDisabledGrps.add(grpAndLocFlag.get1());
        }, false);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException("Failed to read cache groups WAL state.", e);
    }
}
/**
 * Convert cache group ID to WAL state key.
 *
 * @param grpId Group ID.
 * @param local {@code True} for a node-local WAL state key, {@code false} for a cluster-wide one.
 * @return Key.
 */
private static String walGroupIdToKey(int grpId, boolean local) {
    return (local ? WAL_LOCAL_KEY_PREFIX : WAL_GLOBAL_KEY_PREFIX) + grpId;
}
/**
 * Convert checkpoint timestamp and cache group ID to key for {@link #CHECKPOINT_INAPPLICABLE_FOR_REBALANCE} metastorage records.
 *
 * @param cpTs Checkpoint timestamp.
 * @param grpId Group ID.
 * @return Key.
 */
private static String checkpointInapplicableCpAndGroupIdToKey(long cpTs, int grpId) {
    // Key layout: <prefix><cpTs>-<grpId>.
    return new StringBuilder(CHECKPOINT_INAPPLICABLE_FOR_REBALANCE)
        .append(cpTs)
        .append('-')
        .append(grpId)
        .toString();
}
/**
 * Convert WAL state key to cache group ID and local flag.
 *
 * @param key Key.
 * @return Tuple of group ID and local flag ({@code true} if the key is a node-local WAL state key),
 *      or {@code null} if the key is not a WAL state key at all.
 */
private static T2<Integer, Boolean> walKeyToGroupIdAndLocalFlag(String key) {
    if (key.startsWith(WAL_LOCAL_KEY_PREFIX))
        return new T2<>(Integer.parseInt(key.substring(WAL_LOCAL_KEY_PREFIX.length())), true);
    else if (key.startsWith(WAL_GLOBAL_KEY_PREFIX))
        return new T2<>(Integer.parseInt(key.substring(WAL_GLOBAL_KEY_PREFIX.length())), false);
    else
        return null;
}
/**
 * Method dumps partitions info see {@link #dumpPartitionsInfo(CacheGroupContext, IgniteLogger)}
 * for all persistent cache groups.
 *
 * @param cctx Shared context.
 * @param log Logger.
 * @throws IgniteCheckedException If failed.
 */
private static void dumpPartitionsInfo(GridCacheSharedContext cctx,
    IgniteLogger log) throws IgniteCheckedException {
    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
        // Only non-local, persistence-enabled groups have on-disk partition state to dump.
        if (!grp.isLocal() && grp.persistenceEnabled())
            dumpPartitionsInfo(grp, log);
    }
}
/**
 * Retrieves from page memory meta information about given {@code grp} group partitions
 * and dumps this information to log INFO level.
 *
 * @param grp Cache group.
 * @param log Logger.
 * @throws IgniteCheckedException If failed.
 */
private static void dumpPartitionsInfo(CacheGroupContext grp, IgniteLogger log) throws IgniteCheckedException {
    PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();

    IgnitePageStoreManager pageStore = grp.shared().pageStore();

    assert pageStore != null : "Persistent cache should have initialize page store manager.";

    for (int p = 0; p < grp.affinity().partitions(); p++) {
        GridDhtLocalPartition part = grp.topology().localPartition(p);

        // Partition is present in topology: dump its in-memory state directly.
        if (part != null) {
            if (log.isInfoEnabled())
                log.info("Partition [grp=" + grp.cacheOrGroupName()
                    + ", id=" + p
                    + ", state=" + part.state()
                    + ", counter=" + part.dataStore().partUpdateCounter()
                    + ", size=" + part.fullSize() + "]");

            continue;
        }

        // Partition is not in topology: fall back to the on-disk partition file, if any.
        if (!pageStore.exists(grp.groupId(), p))
            continue;

        pageStore.ensure(grp.groupId(), p);

        // A store with at most one page contains only the file header — no partition meta to read.
        if (pageStore.pages(grp.groupId(), p) <= 1) {
            if (log.isInfoEnabled())
                log.info("Partition [grp=" + grp.cacheOrGroupName() + ", id=" + p + ", state=N/A (only file header) ]");

            continue;
        }

        // Read state, update counter and size from the partition meta page.
        // Note the strict ordering: acquirePage -> readLock -> readUnlock -> releasePage.
        long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
        long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);

        try {
            long pageAddr = pageMem.readLock(grp.groupId(), partMetaId, partMetaPage);

            try {
                PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);

                GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr));

                String state = partState != null ? partState.toString() : "N/A";

                long updateCntr = io.getUpdateCounter(pageAddr);
                long size = io.getSize(pageAddr);

                if (log.isInfoEnabled())
                    log.info("Partition [grp=" + grp.cacheOrGroupName()
                        + ", id=" + p
                        + ", state=" + state
                        + ", counter=" + updateCntr
                        + ", size=" + size + "]");
            }
            finally {
                pageMem.readUnlock(grp.groupId(), partMetaId, partMetaPage);
            }
        }
        finally {
            pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
        }
    }
}
/**
 * Recovery lifecycle for read-write metastorage.
 */
private class MetastorageRecoveryLifecycle implements DatabaseLifecycleListener {
    /** {@inheritDoc} */
    @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
        // Make sure the metastorage page store exists before binary memory restore begins.
        cctx.pageStore().initializeForMetastorage();
    }

    /** {@inheritDoc} */
    @Override public void afterBinaryMemoryRestore(
        IgniteCacheDatabaseSharedManager mgr,
        RestoreBinaryState restoreState
    ) throws IgniteCheckedException {
        // The read-write metastorage must be created exactly once, after binary memory is restored.
        assert metaStorage == null;

        metaStorage = createMetastorage(false);
    }
}
/**
 * @return Cache group predicate that passes only Metastorage cache group id.
 */
private IgnitePredicate<Integer> onlyMetastorageGroup() {
    return grpId -> grpId == MetaStorage.METASTORAGE_CACHE_ID;
}
/**
 * @return Cache group predicate that passes only cache groups with enabled WAL.
 */
private IgnitePredicate<Integer> groupsWithEnabledWal() {
    // WAL is enabled iff the group is in neither of the initially-disabled sets.
    return grpId -> !(initiallyGlobalWalDisabledGrps.contains(grpId)
        || initiallyLocWalDisabledGrps.contains(grpId));
}
/**
 * @return WAL records predicate that passes only Metastorage data, TX and encryption
 *      (master key change) records.
 */
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> onlyMetastorageAndEncryptionRecords() {
    // NOTE(review): TX records also pass this filter, presumably because metastore updates are
    // transactional — confirm against the metastorage recovery path.
    return (type, ptr) -> type == METASTORE_DATA_RECORD || type == TX_RECORD ||
        type == MASTER_KEY_CHANGE_RECORD || type == MASTER_KEY_CHANGE_RECORD_V2;
}
/**
 * @return WAL records predicate that passes only physical and mixed WAL records.
 */
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> physicalRecords() {
    return (type, ptr) -> {
        WALRecord.RecordPurpose purpose = type.purpose();

        return purpose == WALRecord.RecordPurpose.PHYSICAL || purpose == WALRecord.RecordPurpose.MIXED;
    };
}
/**
 * @return WAL records predicate that passes only logical and mixed WAL records +
 * CP record (used for restoring initial partition states).
 */
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> logicalRecords() {
    return (type, ptr) -> {
        // Checkpoint records are let through regardless of purpose.
        if (type == CHECKPOINT_RECORD)
            return true;

        WALRecord.RecordPurpose purpose = type.purpose();

        return purpose == WALRecord.RecordPurpose.LOGICAL || purpose == WALRecord.RecordPurpose.MIXED;
    };
}
/**
 * Abstract class to create restore context.
 * Wraps a WAL iterator with cache-group filtering and CRC-error tolerance rules.
 */
private abstract class RestoreStateContext {
    /** Last archived segment. */
    protected final long lastArchivedSegment;

    /** Checkpoint status. */
    protected final CheckpointStatus status;

    /** WAL iterator. */
    private final WALIterator iterator;

    /** Only {@link WalRecordCacheGroupAware} records satisfied this predicate will be applied. */
    private final IgnitePredicate<Integer> cacheGroupPredicate;

    /**
     * @param status Checkpoint status.
     * @param iterator WAL iterator.
     * @param lastArchivedSegment Last archived segment index.
     * @param cacheGroupPredicate Cache groups predicate.
     */
    protected RestoreStateContext(
        CheckpointStatus status,
        WALIterator iterator,
        long lastArchivedSegment,
        IgnitePredicate<Integer> cacheGroupPredicate
    ) {
        this.status = status;
        this.iterator = iterator;
        this.lastArchivedSegment = lastArchivedSegment;
        this.cacheGroupPredicate = cacheGroupPredicate;
    }

    /**
     * Advance iterator to the next record.
     *
     * @return WALRecord entry, or {@code null} when the iterator is exhausted or a tolerable
     *      data-integrity (CRC) error terminated the iteration.
     * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
     */
    @Nullable public WALRecord next() throws IgniteCheckedException {
        try {
            for (; ; ) {
                if (!iterator.hasNextX())
                    return null;

                IgniteBiTuple<WALPointer, WALRecord> tup = iterator.nextX();

                if (tup == null)
                    return null;

                WALRecord rec = tup.get2();

                WALPointer ptr = tup.get1();

                rec.position(ptr);

                // Filter out records by group id.
                if (rec instanceof WalRecordCacheGroupAware) {
                    WalRecordCacheGroupAware grpAwareRecord = (WalRecordCacheGroupAware)rec;

                    if (!cacheGroupPredicate.apply(grpAwareRecord.groupId()))
                        continue;
                }

                // Filter out data entries by group id.
                if (rec instanceof DataRecord)
                    rec = filterEntriesByGroupId((DataRecord)rec);

                return rec;
            }
        }
        catch (IgniteCheckedException e) {
            // A tolerable CRC violation ends the iteration quietly; anything else is rethrown.
            IgniteCheckedException ex = throwsError(e);

            if (ex != null)
                throw ex;
            else
                return null;
        }
    }

    /**
     * Filter outs data entries from given data record that not satisfy {@link #cacheGroupPredicate}.
     *
     * @param record Original data record.
     * @return Data record with filtered data entries.
     */
    private DataRecord filterEntriesByGroupId(DataRecord record) {
        // An entry survives only if its cache context is known and its group passes the predicate.
        List<DataEntry> filteredEntries = record.writeEntries().stream()
            .filter(entry -> {
                int cacheId = entry.cacheId();

                return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.apply(cctx.cacheContext(cacheId).groupId());
            })
            .collect(toList());

        return record.setWriteEntries(filteredEntries);
    }

    /**
     * @return Last read WAL record pointer; falls back to the checkpoint start pointer
     *      when nothing has been read yet.
     */
    public WALPointer lastReadRecordPointer() {
        assert status.startPtr != null;

        return iterator.lastRead()
            .orElseGet(() -> status.startPtr);
    }

    /**
     * @return Flag indicates need throws CRC exception or not.
     */
    public boolean throwsCRCError() {
        // CRC errors within archived segments are fatal; errors past the archive boundary
        // (in the work segment being written at crash time) are tolerated.
        return lastReadRecordPointer().index() <= lastArchivedSegment;
    }

    /**
     * Checks for more WALRecord entries.
     *
     * @return {@code True} if contains more WALRecord entries.
     * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
     */
    public boolean hasNext() throws IgniteCheckedException {
        try {
            return iterator.hasNextX();
        }
        catch (IgniteCheckedException e) {
            IgniteCheckedException ex = throwsError(e);

            if (ex != null)
                throw ex;
            else
                return false;
        }
    }

    /**
     * Checks the need to throw an exception.
     *
     * @param e Thrown exception.
     * @return Exception to be thrown, or {@code null} if the error is a tolerable CRC violation.
     */
    @Nullable private IgniteCheckedException throwsError(IgniteCheckedException e) {
        boolean throwsCRCError = throwsCRCError();

        // Data-integrity (CRC) violations are rethrown only when they occurred in a segment
        // that must be intact; all other errors are always rethrown (and logged).
        if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
            return throwsCRCError ? e : null;
        else {
            log.error("There is an error during restore state [throwsCRCError=" + throwsCRCError + ']', e);

            return e;
        }
    }
}
/**
 * Restore memory context. Tracks the safety of binary recovery.
 */
public class RestoreBinaryState extends RestoreStateContext {
    /** The flag indicates need to apply the binary update or no needed. */
    private boolean needApplyBinaryUpdates;

    /**
     * @param status Checkpoint status.
     * @param iterator WAL iterator.
     * @param lastArchivedSegment Last archived segment index.
     * @param cacheGroupsPredicate Cache groups predicate.
     */
    public RestoreBinaryState(
        CheckpointStatus status,
        WALIterator iterator,
        long lastArchivedSegment,
        IgnitePredicate<Integer> cacheGroupsPredicate
    ) {
        super(status, iterator, lastArchivedSegment, cacheGroupsPredicate);

        // Binary updates need replaying only when the checkpoint status says memory restore is required.
        this.needApplyBinaryUpdates = status.needRestoreMemory();
    }

    /**
     * Advance iterator to the next record.
     *
     * @return WALRecord entry.
     * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
     */
    @Override @Nullable public WALRecord next() throws IgniteCheckedException {
        WALRecord rec = super.next();

        if (rec == null)
            return null;

        if (rec.type() == CHECKPOINT_RECORD) {
            CheckpointRecord cpRec = (CheckpointRecord)rec;

            // We roll memory up until we find a checkpoint start record registered in the status.
            if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
                // Guard INFO logging to avoid needless message construction
                // (consistent with the logging style elsewhere in this file).
                if (log.isInfoEnabled())
                    log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() +
                        ", pos=" + rec.position() + ']');

                needApplyBinaryUpdates = false;
            }
            else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
                U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() +
                    ", expCpId=" + status.cpStartId + ", pos=" + rec.position() + ']');
        }

        return rec;
    }

    /**
     * @return Flag indicates need apply binary record or not.
     */
    public boolean needApplyBinaryUpdate() {
        return needApplyBinaryUpdates;
    }

    /**
     * @return Flag indicates need throws CRC exception or not.
     */
    @Override public boolean throwsCRCError() {
        if (log.isInfoEnabled())
            log.info("Throws CRC error check [needApplyBinaryUpdates=" + needApplyBinaryUpdates +
                ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" + lastReadRecordPointer() + ']');

        // While binary updates are still pending, any CRC failure is fatal.
        if (needApplyBinaryUpdates)
            return true;

        return super.throwsCRCError();
    }
}
/**
 * Restore logical state context. Tracks the safety of logical recovery.
 */
public class RestoreLogicalState extends RestoreStateContext {
    /** States of partitions recovered during applying logical updates. */
    private final Map<GroupPartitionId, Integer> partitionRecoveryStates;

    /**
     * @param status Checkpoint status.
     * @param iterator WAL iterator.
     * @param lastArchivedSegment Last archived segment index.
     * @param cacheGroupsPredicate Cache groups predicate.
     * @param partitionRecoveryStates Initial partition recovery states.
     */
    public RestoreLogicalState(CheckpointStatus status, WALIterator iterator, long lastArchivedSegment,
        IgnitePredicate<Integer> cacheGroupsPredicate, Map<GroupPartitionId, Integer> partitionRecoveryStates) {
        super(status, iterator, lastArchivedSegment, cacheGroupsPredicate);

        this.partitionRecoveryStates = partitionRecoveryStates;
    }

    /**
     * @return Unmodifiable view of restored partition states for cache groups.
     */
    public Map<GroupPartitionId, Integer> partitionRecoveryStates() {
        return Collections.unmodifiableMap(partitionRecoveryStates);
    }
}
/**
 * Registers {@link #historicalRebalanceThreshold} property in distributed metastore.
 */
private void initWalRebalanceThreshold() {
    DistributedConfigurationLifecycleListener lsnr = new DistributedConfigurationLifecycleListener() {
        /** {@inheritDoc} */
        @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
            String logMsgFmt = "Historical rebalance WAL threshold changed [property=%s, oldVal=%s, newVal=%s]";

            historicalRebalanceThreshold.addListener(makeUpdateListener(logMsgFmt, log));

            dispatcher.registerProperties(historicalRebalanceThreshold);
        }

        /** {@inheritDoc} */
        @Override public void onReadyToWrite() {
            setDefaultValue(historicalRebalanceThreshold, walRebalanceThresholdLegacy, log);
        }
    };

    cctx.kernalContext().internalSubscriptionProcessor().registerDistributedConfigurationListener(lsnr);
}
}