blob: e64a53f4779742fdd90154c211efeb84cb8ce564 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.Checkpoint;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
/**
* Checkpoint history. Holds chronological ordered map with {@link CheckpointEntry CheckpointEntries}.
* Data is loaded from corresponding checkpoint directory.
* This directory holds files for checkpoint start and end.
*/
public class CheckpointHistory {
/** Logger. */
private final IgniteLogger log;
/** Cache shared context. */
private final GridCacheSharedContext<?, ?> cctx;
/**
* Maps checkpoint's timestamp (from CP file name) to CP entry.
* Using TS provides historical order of CP entries in map ( first is oldest )
*/
private final NavigableMap<Long, CheckpointEntry> histMap = new ConcurrentSkipListMap<>();
/** The maximal number of checkpoints hold in memory. */
private final int maxCpHistMemSize;
/** If WalHistorySize was setted by user will use old way for removing checkpoints. */
private final boolean isWalHistorySizeParameterEnabled;
/**
* Constructor.
*
* @param ctx Context.
*/
public CheckpointHistory(GridKernalContext ctx) {
cctx = ctx.cache().context();
log = ctx.log(getClass());
DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration();
maxCpHistMemSize = Math.min(dsCfg.getWalHistorySize(),
IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, 100));
isWalHistorySizeParameterEnabled = dsCfg.isWalHistorySizeParameterUsed();
}
/**
* @param checkpoints Checkpoints.
*/
public void initialize(List<CheckpointEntry> checkpoints) {
for (CheckpointEntry e : checkpoints)
histMap.put(e.timestamp(), e);
}
/**
* @param cpTs Checkpoint timestamp.
* @return Initialized entry.
* @throws IgniteCheckedException If failed to initialize entry.
*/
private CheckpointEntry entry(Long cpTs) throws IgniteCheckedException {
CheckpointEntry entry = histMap.get(cpTs);
if (entry == null)
throw new IgniteCheckedException("Checkpoint entry was removed: " + cpTs);
return entry;
}
/**
* @return First checkpoint entry if exists. Otherwise {@code null}.
*/
public CheckpointEntry firstCheckpoint() {
Map.Entry<Long,CheckpointEntry> entry = histMap.firstEntry();
return entry != null ? entry.getValue() : null;
}
/**
* @return Last checkpoint entry if exists. Otherwise {@code null}.
*/
public CheckpointEntry lastCheckpoint() {
Map.Entry<Long,CheckpointEntry> entry = histMap.lastEntry();
return entry != null ? entry.getValue() : null;
}
/**
* @return First checkpoint WAL pointer if exists. Otherwise {@code null}.
*/
public WALPointer firstCheckpointPointer() {
CheckpointEntry entry = firstCheckpoint();
return entry != null ? entry.checkpointMark() : null;
}
/**
* @return Collection of checkpoint timestamps.
*/
public Collection<Long> checkpoints(boolean descending) {
if (descending)
return histMap.descendingKeySet();
return histMap.keySet();
}
/**
*
*/
public Collection<Long> checkpoints() {
return checkpoints(false);
}
/**
* Adds checkpoint entry after the corresponding WAL record has been written to WAL. The checkpoint itself
* is not finished yet.
*
* @param entry Entry to add.
*/
public void addCheckpoint(CheckpointEntry entry) {
histMap.put(entry.timestamp(), entry);
}
/**
* @return {@code true} if there is space for next checkpoint.
*/
public boolean hasSpace() {
return histMap.size() + 1 <= maxCpHistMemSize;
}
/**
* Clears checkpoint history after WAL truncation.
*
* @return List of checkpoint entries removed from history.
*/
public List<CheckpointEntry> onWalTruncated(WALPointer ptr) {
List<CheckpointEntry> removed = new ArrayList<>();
FileWALPointer highBound = (FileWALPointer)ptr;
for (CheckpointEntry cpEntry : histMap.values()) {
FileWALPointer cpPnt = (FileWALPointer)cpEntry.checkpointMark();
if (highBound.compareTo(cpPnt) <= 0)
break;
if (cctx.wal().reserved(cpEntry.checkpointMark())) {
U.warn(log, "Could not clear historyMap due to WAL reservation on cp: " + cpEntry +
", history map size is " + histMap.size());
break;
}
histMap.remove(cpEntry.timestamp());
removed.add(cpEntry);
}
return removed;
}
/**
* Logs and clears checkpoint history after checkpoint finish.
*
* @return List of checkpoints removed from history.
*/
public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp, boolean truncateWal) {
chp.walSegsCoveredRange(calculateWalSegmentsCovered());
WALPointer checkpointMarkUntilDel = isWalHistorySizeParameterEnabled //check for compatibility mode.
? checkpointMarkUntilDeleteByMemorySize()
: newerPointer(checkpointMarkUntilDeleteByMemorySize(), checkpointMarkUntilDeleteByArchiveSize());
if (checkpointMarkUntilDel == null)
return Collections.emptyList();
List<CheckpointEntry> deletedCheckpoints = onWalTruncated(checkpointMarkUntilDel);
int deleted = 0;
if (truncateWal)
deleted += cctx.wal().truncate(null, firstCheckpointPointer());
chp.walFilesDeleted(deleted);
return deletedCheckpoints;
}
/**
* @param firstPointer One of pointers to choose the newest.
* @param secondPointer One of pointers to choose the newest.
* @return The newest pointer from input ones.
*/
private FileWALPointer newerPointer(WALPointer firstPointer, WALPointer secondPointer) {
FileWALPointer first = (FileWALPointer)firstPointer;
FileWALPointer second = (FileWALPointer)secondPointer;
if (firstPointer == null)
return second;
if (secondPointer == null)
return first;
return first.index() > second.index() ? first : second;
}
/**
* Calculate mark until delete by maximum checkpoint history memory size.
*
* @return Checkpoint mark until which checkpoints can be deleted(not including this pointer).
*/
private WALPointer checkpointMarkUntilDeleteByMemorySize() {
if (histMap.size() <= maxCpHistMemSize)
return null;
int calculatedCpHistSize = maxCpHistMemSize;
for (Map.Entry<Long, CheckpointEntry> entry : histMap.entrySet()) {
if (histMap.size() <= calculatedCpHistSize++)
return entry.getValue().checkpointMark();
}
return lastCheckpoint().checkpointMark();
}
/**
* Calculate mark until delete by maximum allowed archive size.
*
* @return Checkpoint mark until which checkpoints can be deleted(not including this pointer).
*/
@Nullable private WALPointer checkpointMarkUntilDeleteByArchiveSize() {
long absFileIdxToDel = cctx.wal().maxArchivedSegmentToDelete();
if (absFileIdxToDel < 0)
return null;
long fileUntilDel = absFileIdxToDel + 1;
long checkpointFileIdx = absFileIdx(lastCheckpoint());
for (CheckpointEntry cpEntry : histMap.values()) {
long currFileIdx = absFileIdx(cpEntry);
if (checkpointFileIdx <= currFileIdx || fileUntilDel <= currFileIdx)
return cpEntry.checkpointMark();
}
return lastCheckpoint().checkpointMark();
}
/**
* Retrieve absolute file index by checkpoint entry.
*
* @param pointer checkpoint entry for which need to calculate absolute file index.
* @return absolute file index for given checkpoint entry.
*/
private long absFileIdx(CheckpointEntry pointer) {
return ((FileWALPointer)pointer.checkpointMark()).index();
}
/**
* Calculates indexes of WAL segments covered by last checkpoint.
*
* @return list of indexes or empty list if there are no checkpoints.
*/
private IgniteBiTuple<Long, Long> calculateWalSegmentsCovered() {
IgniteBiTuple<Long, Long> tup = new IgniteBiTuple<>(-1L, -1L);
Map.Entry<Long, CheckpointEntry> lastEntry = histMap.lastEntry();
if (lastEntry == null)
return tup;
Map.Entry<Long, CheckpointEntry> previousEntry = histMap.lowerEntry(lastEntry.getKey());
WALPointer lastWALPointer = lastEntry.getValue().checkpointMark();
long lastIdx = 0;
long prevIdx = 0;
if (lastWALPointer instanceof FileWALPointer) {
lastIdx = ((FileWALPointer)lastWALPointer).index();
if (previousEntry != null)
prevIdx = ((FileWALPointer)previousEntry.getValue().checkpointMark()).index();
}
tup.set1(prevIdx);
tup.set2(lastIdx - 1);
return tup;
}
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
* @param grpId Cache group ID.
* @param part Partition ID.
* @param partCntrSince Partition counter or {@code null} to search for minimal counter.
* @return Checkpoint entry or {@code null} if failed to search.
*/
@Nullable public WALPointer searchPartitionCounter(int grpId, int part, long partCntrSince) {
CheckpointEntry entry = searchCheckpointEntry(grpId, part, partCntrSince);
if (entry == null)
return null;
return entry.checkpointMark();
}
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
* @param grpId Cache group ID.
* @param part Partition ID.
* @param partCntrSince Partition counter or {@code null} to search for minimal counter.
* @return Checkpoint entry or {@code null} if failed to search.
*/
@Nullable public CheckpointEntry searchCheckpointEntry(int grpId, int part, long partCntrSince) {
for (Long cpTs : checkpoints(true)) {
try {
CheckpointEntry entry = entry(cpTs);
Long foundCntr = entry.partitionCounter(cctx, grpId, part);
if (foundCntr != null && foundCntr <= partCntrSince)
return entry;
}
catch (IgniteCheckedException ignore) {
break;
}
}
return null;
}
/**
* Finds and reserves earliest valid checkpoint for each of given groups and partitions.
*
* @param groupsAndPartitions Groups and partitions to find and reserve earliest valid checkpoint.
*
* @return Map (groupId, Map (partitionId, earliest valid checkpoint to history search)).
*/
public Map<Integer, Map<Integer, CheckpointEntry>> searchAndReserveCheckpoints(
final Map<Integer, Set<Integer>> groupsAndPartitions
) {
if (F.isEmpty(groupsAndPartitions))
return Collections.emptyMap();
final Map<Integer, Map<Integer, CheckpointEntry>> res = new HashMap<>();
CheckpointEntry prevReserved = null;
// Iterate over all possible checkpoints starting from latest and moving to earliest.
for (Long cpTs : checkpoints(true)) {
CheckpointEntry chpEntry = null;
try {
chpEntry = entry(cpTs);
boolean reserved = cctx.wal().reserve(chpEntry.checkpointMark());
// If checkpoint WAL history can't be reserved, stop searching.
if (!reserved)
break;
for (Integer grpId : new HashSet<>(groupsAndPartitions.keySet()))
if (!isCheckpointApplicableForGroup(grpId, chpEntry))
groupsAndPartitions.remove(grpId);
for (Map.Entry<Integer, CheckpointEntry.GroupState> state : chpEntry.groupState(cctx).entrySet()) {
int grpId = state.getKey();
CheckpointEntry.GroupState cpGrpState = state.getValue();
Set<Integer> applicablePartitions = groupsAndPartitions.get(grpId);
if (F.isEmpty(applicablePartitions))
continue;
Set<Integer> inapplicablePartitions = null;
for (Integer partId : applicablePartitions) {
int pIdx = cpGrpState.indexByPartition(partId);
if (pIdx >= 0)
res.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, chpEntry);
else {
if (inapplicablePartitions == null)
inapplicablePartitions = new HashSet<>();
// Partition is no more applicable for history search, exclude partition from searching.
inapplicablePartitions.add(partId);
}
}
if (!F.isEmpty(inapplicablePartitions))
for (Integer partId : inapplicablePartitions)
applicablePartitions.remove(partId);
}
// Remove groups from search with empty set of applicable partitions.
for (Map.Entry<Integer, Set<Integer>> e : new HashSet<>(groupsAndPartitions.entrySet()))
if (e.getValue().isEmpty())
groupsAndPartitions.remove(e.getKey());
// All groups are no more applicable, release history and stop searching.
if (groupsAndPartitions.isEmpty()) {
cctx.wal().release(chpEntry.checkpointMark());
break;
}
else {
// Release previous checkpoint marker.
if (prevReserved != null)
cctx.wal().release(prevReserved.checkpointMark());
prevReserved = chpEntry;
}
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to process checkpoint: " + (chpEntry != null ? chpEntry : "none"), ex);
}
}
return res;
}
/**
* Checkpoint is not applicable when:
* 1) WAL was disabled somewhere after given checkpoint.
* 2) Checkpoint doesn't contain specified {@code grpId}.
*
* @param grpId Group ID.
* @param cp Checkpoint.
*/
private boolean isCheckpointApplicableForGroup(int grpId, CheckpointEntry cp) throws IgniteCheckedException {
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager) cctx.database();
if (dbMgr.isCheckpointInapplicableForWalRebalance(cp.timestamp(), grpId))
return false;
if (!cp.groupState(cctx).containsKey(grpId))
return false;
return true;
}
}