| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence.checkpoint; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.Checkpoint; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE; |
| |
| /** |
| * Checkpoint history. Holds a chronologically ordered map of {@link CheckpointEntry CheckpointEntries}. |
| * Data is loaded from corresponding checkpoint directory. |
| * This directory holds files for checkpoint start and end. |
| */ |
| public class CheckpointHistory { |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Cache shared context. */ |
| private final GridCacheSharedContext<?, ?> cctx; |
| |
| /** |
| * Maps checkpoint's timestamp (from CP file name) to CP entry. |
| * Using TS provides historical order of CP entries in map ( first is oldest ) |
| */ |
| private final NavigableMap<Long, CheckpointEntry> histMap = new ConcurrentSkipListMap<>(); |
| |
| /** The maximum number of checkpoints held in memory. */ |
| private final int maxCpHistMemSize; |
| |
| /** If WalHistorySize was set by the user, the old way of removing checkpoints is used. */ |
| private final boolean isWalHistorySizeParameterEnabled; |
| |
| /** |
| * Constructor. |
| * |
| * @param ctx Context. |
| */ |
| public CheckpointHistory(GridKernalContext ctx) { |
| cctx = ctx.cache().context(); |
| log = ctx.log(getClass()); |
| |
| DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); |
| |
| maxCpHistMemSize = Math.min(dsCfg.getWalHistorySize(), |
| IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, 100)); |
| |
| isWalHistorySizeParameterEnabled = dsCfg.isWalHistorySizeParameterUsed(); |
| } |
| |
| /** |
| * @param checkpoints Checkpoints. |
| */ |
| public void initialize(List<CheckpointEntry> checkpoints) { |
| for (CheckpointEntry e : checkpoints) |
| histMap.put(e.timestamp(), e); |
| } |
| |
| /** |
| * @param cpTs Checkpoint timestamp. |
| * @return Initialized entry. |
| * @throws IgniteCheckedException If failed to initialize entry. |
| */ |
| private CheckpointEntry entry(Long cpTs) throws IgniteCheckedException { |
| CheckpointEntry entry = histMap.get(cpTs); |
| |
| if (entry == null) |
| throw new IgniteCheckedException("Checkpoint entry was removed: " + cpTs); |
| |
| return entry; |
| } |
| |
| /** |
| * @return First checkpoint entry if exists. Otherwise {@code null}. |
| */ |
| public CheckpointEntry firstCheckpoint() { |
| Map.Entry<Long,CheckpointEntry> entry = histMap.firstEntry(); |
| |
| return entry != null ? entry.getValue() : null; |
| } |
| |
| /** |
| * @return Last checkpoint entry if exists. Otherwise {@code null}. |
| */ |
| public CheckpointEntry lastCheckpoint() { |
| Map.Entry<Long,CheckpointEntry> entry = histMap.lastEntry(); |
| |
| return entry != null ? entry.getValue() : null; |
| } |
| |
| /** |
| * @return First checkpoint WAL pointer if exists. Otherwise {@code null}. |
| */ |
| public WALPointer firstCheckpointPointer() { |
| CheckpointEntry entry = firstCheckpoint(); |
| |
| return entry != null ? entry.checkpointMark() : null; |
| } |
| |
| /** |
| * @return Collection of checkpoint timestamps. |
| */ |
| public Collection<Long> checkpoints(boolean descending) { |
| if (descending) |
| return histMap.descendingKeySet(); |
| |
| return histMap.keySet(); |
| } |
| |
| /** |
| * |
| */ |
| public Collection<Long> checkpoints() { |
| return checkpoints(false); |
| } |
| |
| /** |
| * Adds checkpoint entry after the corresponding WAL record has been written to WAL. The checkpoint itself |
| * is not finished yet. |
| * |
| * @param entry Entry to add. |
| */ |
| public void addCheckpoint(CheckpointEntry entry) { |
| histMap.put(entry.timestamp(), entry); |
| } |
| |
| /** |
| * @return {@code true} if there is space for next checkpoint. |
| */ |
| public boolean hasSpace() { |
| return histMap.size() + 1 <= maxCpHistMemSize; |
| } |
| |
| /** |
| * Clears checkpoint history after WAL truncation. |
| * |
| * @return List of checkpoint entries removed from history. |
| */ |
| public List<CheckpointEntry> onWalTruncated(WALPointer ptr) { |
| List<CheckpointEntry> removed = new ArrayList<>(); |
| |
| FileWALPointer highBound = (FileWALPointer)ptr; |
| |
| for (CheckpointEntry cpEntry : histMap.values()) { |
| FileWALPointer cpPnt = (FileWALPointer)cpEntry.checkpointMark(); |
| |
| if (highBound.compareTo(cpPnt) <= 0) |
| break; |
| |
| if (cctx.wal().reserved(cpEntry.checkpointMark())) { |
| U.warn(log, "Could not clear historyMap due to WAL reservation on cp: " + cpEntry + |
| ", history map size is " + histMap.size()); |
| |
| break; |
| } |
| |
| histMap.remove(cpEntry.timestamp()); |
| |
| removed.add(cpEntry); |
| } |
| |
| return removed; |
| } |
| |
| /** |
| * Logs and clears checkpoint history after checkpoint finish. |
| * |
| * @return List of checkpoints removed from history. |
| */ |
| public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp, boolean truncateWal) { |
| chp.walSegsCoveredRange(calculateWalSegmentsCovered()); |
| |
| WALPointer checkpointMarkUntilDel = isWalHistorySizeParameterEnabled //check for compatibility mode. |
| ? checkpointMarkUntilDeleteByMemorySize() |
| : newerPointer(checkpointMarkUntilDeleteByMemorySize(), checkpointMarkUntilDeleteByArchiveSize()); |
| |
| if (checkpointMarkUntilDel == null) |
| return Collections.emptyList(); |
| |
| List<CheckpointEntry> deletedCheckpoints = onWalTruncated(checkpointMarkUntilDel); |
| |
| int deleted = 0; |
| |
| if (truncateWal) |
| deleted += cctx.wal().truncate(null, firstCheckpointPointer()); |
| |
| chp.walFilesDeleted(deleted); |
| |
| return deletedCheckpoints; |
| } |
| |
| /** |
| * @param firstPointer One of pointers to choose the newest. |
| * @param secondPointer One of pointers to choose the newest. |
| * @return The newest pointer from input ones. |
| */ |
| private FileWALPointer newerPointer(WALPointer firstPointer, WALPointer secondPointer) { |
| FileWALPointer first = (FileWALPointer)firstPointer; |
| FileWALPointer second = (FileWALPointer)secondPointer; |
| |
| if (firstPointer == null) |
| return second; |
| |
| if (secondPointer == null) |
| return first; |
| |
| return first.index() > second.index() ? first : second; |
| } |
| |
| /** |
| * Calculate mark until delete by maximum checkpoint history memory size. |
| * |
| * @return Checkpoint mark until which checkpoints can be deleted(not including this pointer). |
| */ |
| private WALPointer checkpointMarkUntilDeleteByMemorySize() { |
| if (histMap.size() <= maxCpHistMemSize) |
| return null; |
| |
| int calculatedCpHistSize = maxCpHistMemSize; |
| |
| for (Map.Entry<Long, CheckpointEntry> entry : histMap.entrySet()) { |
| if (histMap.size() <= calculatedCpHistSize++) |
| return entry.getValue().checkpointMark(); |
| } |
| |
| return lastCheckpoint().checkpointMark(); |
| } |
| |
| /** |
| * Calculate mark until delete by maximum allowed archive size. |
| * |
| * @return Checkpoint mark until which checkpoints can be deleted(not including this pointer). |
| */ |
| @Nullable private WALPointer checkpointMarkUntilDeleteByArchiveSize() { |
| long absFileIdxToDel = cctx.wal().maxArchivedSegmentToDelete(); |
| |
| if (absFileIdxToDel < 0) |
| return null; |
| |
| long fileUntilDel = absFileIdxToDel + 1; |
| |
| long checkpointFileIdx = absFileIdx(lastCheckpoint()); |
| |
| for (CheckpointEntry cpEntry : histMap.values()) { |
| long currFileIdx = absFileIdx(cpEntry); |
| |
| if (checkpointFileIdx <= currFileIdx || fileUntilDel <= currFileIdx) |
| return cpEntry.checkpointMark(); |
| } |
| |
| return lastCheckpoint().checkpointMark(); |
| } |
| |
| /** |
| * Retrieve absolute file index by checkpoint entry. |
| * |
| * @param pointer checkpoint entry for which need to calculate absolute file index. |
| * @return absolute file index for given checkpoint entry. |
| */ |
| private long absFileIdx(CheckpointEntry pointer) { |
| return ((FileWALPointer)pointer.checkpointMark()).index(); |
| } |
| |
| /** |
| * Calculates indexes of WAL segments covered by last checkpoint. |
| * |
| * @return list of indexes or empty list if there are no checkpoints. |
| */ |
| private IgniteBiTuple<Long, Long> calculateWalSegmentsCovered() { |
| IgniteBiTuple<Long, Long> tup = new IgniteBiTuple<>(-1L, -1L); |
| |
| Map.Entry<Long, CheckpointEntry> lastEntry = histMap.lastEntry(); |
| |
| if (lastEntry == null) |
| return tup; |
| |
| Map.Entry<Long, CheckpointEntry> previousEntry = histMap.lowerEntry(lastEntry.getKey()); |
| |
| WALPointer lastWALPointer = lastEntry.getValue().checkpointMark(); |
| |
| long lastIdx = 0; |
| |
| long prevIdx = 0; |
| |
| if (lastWALPointer instanceof FileWALPointer) { |
| lastIdx = ((FileWALPointer)lastWALPointer).index(); |
| |
| if (previousEntry != null) |
| prevIdx = ((FileWALPointer)previousEntry.getValue().checkpointMark()).index(); |
| } |
| |
| tup.set1(prevIdx); |
| tup.set2(lastIdx - 1); |
| |
| return tup; |
| } |
| |
| /** |
| * Tries to search for a WAL pointer for the given partition counter start. |
| * |
| * @param grpId Cache group ID. |
| * @param part Partition ID. |
| * @param partCntrSince Partition counter or {@code null} to search for minimal counter. |
| * @return Checkpoint entry or {@code null} if failed to search. |
| */ |
| @Nullable public WALPointer searchPartitionCounter(int grpId, int part, long partCntrSince) { |
| CheckpointEntry entry = searchCheckpointEntry(grpId, part, partCntrSince); |
| |
| if (entry == null) |
| return null; |
| |
| return entry.checkpointMark(); |
| } |
| |
| /** |
| * Tries to search for a WAL pointer for the given partition counter start. |
| * |
| * @param grpId Cache group ID. |
| * @param part Partition ID. |
| * @param partCntrSince Partition counter or {@code null} to search for minimal counter. |
| * @return Checkpoint entry or {@code null} if failed to search. |
| */ |
| @Nullable public CheckpointEntry searchCheckpointEntry(int grpId, int part, long partCntrSince) { |
| for (Long cpTs : checkpoints(true)) { |
| try { |
| CheckpointEntry entry = entry(cpTs); |
| |
| Long foundCntr = entry.partitionCounter(cctx, grpId, part); |
| |
| if (foundCntr != null && foundCntr <= partCntrSince) |
| return entry; |
| } |
| catch (IgniteCheckedException ignore) { |
| break; |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Finds and reserves earliest valid checkpoint for each of given groups and partitions. |
| * |
| * @param groupsAndPartitions Groups and partitions to find and reserve earliest valid checkpoint. |
| * |
| * @return Map (groupId, Map (partitionId, earliest valid checkpoint to history search)). |
| */ |
| public Map<Integer, Map<Integer, CheckpointEntry>> searchAndReserveCheckpoints( |
| final Map<Integer, Set<Integer>> groupsAndPartitions |
| ) { |
| if (F.isEmpty(groupsAndPartitions)) |
| return Collections.emptyMap(); |
| |
| final Map<Integer, Map<Integer, CheckpointEntry>> res = new HashMap<>(); |
| |
| CheckpointEntry prevReserved = null; |
| |
| // Iterate over all possible checkpoints starting from latest and moving to earliest. |
| for (Long cpTs : checkpoints(true)) { |
| CheckpointEntry chpEntry = null; |
| |
| try { |
| chpEntry = entry(cpTs); |
| |
| boolean reserved = cctx.wal().reserve(chpEntry.checkpointMark()); |
| |
| // If checkpoint WAL history can't be reserved, stop searching. |
| if (!reserved) |
| break; |
| |
| for (Integer grpId : new HashSet<>(groupsAndPartitions.keySet())) |
| if (!isCheckpointApplicableForGroup(grpId, chpEntry)) |
| groupsAndPartitions.remove(grpId); |
| |
| for (Map.Entry<Integer, CheckpointEntry.GroupState> state : chpEntry.groupState(cctx).entrySet()) { |
| int grpId = state.getKey(); |
| CheckpointEntry.GroupState cpGrpState = state.getValue(); |
| |
| Set<Integer> applicablePartitions = groupsAndPartitions.get(grpId); |
| |
| if (F.isEmpty(applicablePartitions)) |
| continue; |
| |
| Set<Integer> inapplicablePartitions = null; |
| |
| for (Integer partId : applicablePartitions) { |
| int pIdx = cpGrpState.indexByPartition(partId); |
| |
| if (pIdx >= 0) |
| res.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, chpEntry); |
| else { |
| if (inapplicablePartitions == null) |
| inapplicablePartitions = new HashSet<>(); |
| |
| // Partition is no more applicable for history search, exclude partition from searching. |
| inapplicablePartitions.add(partId); |
| } |
| } |
| |
| if (!F.isEmpty(inapplicablePartitions)) |
| for (Integer partId : inapplicablePartitions) |
| applicablePartitions.remove(partId); |
| } |
| |
| // Remove groups from search with empty set of applicable partitions. |
| for (Map.Entry<Integer, Set<Integer>> e : new HashSet<>(groupsAndPartitions.entrySet())) |
| if (e.getValue().isEmpty()) |
| groupsAndPartitions.remove(e.getKey()); |
| |
| // All groups are no more applicable, release history and stop searching. |
| if (groupsAndPartitions.isEmpty()) { |
| cctx.wal().release(chpEntry.checkpointMark()); |
| |
| break; |
| } |
| else { |
| // Release previous checkpoint marker. |
| if (prevReserved != null) |
| cctx.wal().release(prevReserved.checkpointMark()); |
| |
| prevReserved = chpEntry; |
| } |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to process checkpoint: " + (chpEntry != null ? chpEntry : "none"), ex); |
| } |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Checkpoint is not applicable when: |
| * 1) WAL was disabled somewhere after given checkpoint. |
| * 2) Checkpoint doesn't contain specified {@code grpId}. |
| * |
| * @param grpId Group ID. |
| * @param cp Checkpoint. |
| */ |
| private boolean isCheckpointApplicableForGroup(int grpId, CheckpointEntry cp) throws IgniteCheckedException { |
| GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager) cctx.database(); |
| |
| if (dbMgr.isCheckpointInapplicableForWalRebalance(cp.timestamp(), grpId)) |
| return false; |
| |
| if (!cp.groupState(cctx).containsKey(grpId)) |
| return false; |
| |
| return true; |
| } |
| } |