blob: 630b918dbb71114eba7513abecfbce74b1377fe5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
/**
* Abstraction responsible for managing checkpoint markers storage.
*/
public class CheckpointMarkersStorage {
/**
 * Checkpoint file name pattern. Capturing groups, as consumed by the parsing code of this class:
 * 1 - checkpoint timestamp, 2 - checkpoint ID, 3 - marker type ({@code START} or {@code END}).
 */
public static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
/** Logger obtained in the constructor for the runtime class of this instance. */
protected IgniteLogger log;
/** Checkpoint history which is kept in step with the marker files managed by this storage. */
private CheckpointHistory cpHistory;
/** File I/O factory used for both reading and writing checkpoint markers. */
private final FileIOFactory ioFactory;
/** Checkpoint metadata directory ("cp" under the work directory), contains checkpoint start and end marker files. */
public final File cpDir;
/** Temporary write buffer reused by every marker write; sized for one long and two ints (file index, offset, length). */
private final ByteBuffer tmpWriteBuf;
/**
* @param logger Ignite logger.
* @param history Checkpoint history.
* @param factory IO factory.
* @param absoluteWorkDir Directory path to checkpoint markers folder.
* @throws IgniteCheckedException if fail.
*/
CheckpointMarkersStorage(
Function<Class<?>, IgniteLogger> logger,
CheckpointHistory history,
FileIOFactory factory,
String absoluteWorkDir
) throws IgniteCheckedException {
this.log = logger.apply(getClass());
cpHistory = history;
ioFactory = factory;
cpDir = Paths.get(absoluteWorkDir, "cp").toFile();
if (!U.mkdirs(cpDir))
throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
//File index + offset + length.
tmpWriteBuf = ByteBuffer.allocateDirect(Long.BYTES + Integer.BYTES + Integer.BYTES);
tmpWriteBuf.order(ByteOrder.nativeOrder());
}
/**
 * Deletes every file in the checkpoint directory that is accepted by
 * {@code FilePageStoreManager.TMP_FILE_MATCHER}; other files are left in place.
 *
 * @throws IgniteCheckedException If listing the directory or deleting a file failed.
 */
public void cleanupTempCheckpointDirectory() throws IgniteCheckedException {
    // The catch clause of try-with-resources also covers failures of opening and closing the stream.
    try (DirectoryStream<Path> tmpFiles = Files.newDirectoryStream(cpDir.toPath(), TMP_FILE_MATCHER::matches)) {
        for (Path tmpFile : tmpFiles)
            Files.delete(tmpFile);
    }
    catch (IOException e) {
        throw new IgniteCheckedException("Failed to cleanup checkpoint directory from temporary files: " + cpDir, e);
    }
}
/**
 * Clears the checkpoint history (when one is set) and deletes every entry found in the checkpoint
 * directory, not only temporary files.
 *
 * @throws IgniteCheckedException If listing the directory or deleting an entry failed.
 */
public void cleanupCheckpointDirectory() throws IgniteCheckedException {
    if (cpHistory != null)
        cpHistory.clear();

    // The catch clause of try-with-resources also covers failures of opening and closing the stream.
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(cpDir.toPath())) {
        for (Path entry : entries)
            Files.delete(entry);
    }
    catch (IOException e) {
        throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e);
    }
}
/**
 * Loads checkpoint entries from the marker files on disk and hands them to the checkpoint history.
 *
 * @throws IgniteCheckedException If the markers could not be read.
 */
public void initialize() throws IgniteCheckedException {
    List<CheckpointEntry> persisted = retrieveHistory();

    cpHistory.initialize(persisted);
}
/**
 * WAL truncate callback: notifies the history about the truncation and deletes the marker files of
 * every checkpoint entry the history reports back.
 *
 * @param highBound Upper bound.
 * @throws IgniteCheckedException If failed.
 */
public void removeCheckpointsUntil(@Nullable WALPointer highBound) throws IgniteCheckedException {
    for (CheckpointEntry stale : history().onWalTruncated(highBound))
        removeCheckpointFiles(stale);
}
/**
 * Notifies the history that the checkpoint has finished and deletes the marker files of every
 * checkpoint entry the history reports back.
 *
 * @param chp Finished checkpoint.
 * @throws IgniteCheckedException If failed.
 */
public void onCheckpointFinished(Checkpoint chp) throws IgniteCheckedException {
    for (CheckpointEntry stale : history().onCheckpointFinished(chp))
        removeCheckpointFiles(stale);
}
/**
 * Scans the checkpoint directory for the START and END markers with the greatest timestamps and reads
 * the WAL pointers stored in them. Files whose names do not match {@link #CP_FILE_NAME_PATTERN} are ignored.
 *
 * @return Read checkpoint status. When the directory does not exist, or no marker of a kind is found,
 *      the corresponding {@code CheckpointStatus.NULL_UUID} / {@code CheckpointStatus.NULL_PTR} defaults are used.
 * @throws IgniteCheckedException If the directory content could not be listed or a marker file could not be read.
 */
@SuppressWarnings("TooBroadScope")
public CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
    long lastStartTs = 0;
    long lastEndTs = 0;

    UUID startId = CheckpointStatus.NULL_UUID;
    UUID endId = CheckpointStatus.NULL_UUID;

    File startFile = null;
    File endFile = null;

    WALPointer startPtr = CheckpointStatus.NULL_PTR;
    WALPointer endPtr = CheckpointStatus.NULL_PTR;

    File dir = cpDir;

    if (!dir.exists()) {
        log.warning("Read checkpoint status: checkpoint directory is not found.");

        return new CheckpointStatus(0, startId, startPtr, endId, endPtr);
    }

    File[] files = dir.listFiles();

    // File.listFiles() returns null when the path is not a directory or an I/O error occurs;
    // report that explicitly instead of failing with a NullPointerException in the loop below.
    if (files == null)
        throw new IgniteCheckedException("Failed to list checkpoint directory: " + dir.getAbsolutePath());

    for (File file : files) {
        Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());

        if (!matcher.matches())
            continue;

        long ts = Long.parseLong(matcher.group(1));
        UUID id = UUID.fromString(matcher.group(2));
        CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));

        // Keep only the most recent marker of each kind.
        if (type == CheckpointEntryType.START && ts > lastStartTs) {
            lastStartTs = ts;
            startId = id;
            startFile = file;
        }
        else if (type == CheckpointEntryType.END && ts > lastEndTs) {
            lastEndTs = ts;
            endId = id;
            endFile = file;
        }
    }

    // A single buffer is shared by both reads; readPointer() resets its position before use.
    ByteBuffer buf = ByteBuffer.allocate(WALPointer.POINTER_SIZE);

    buf.order(ByteOrder.nativeOrder());

    if (startFile != null)
        startPtr = readPointer(startFile, buf);

    if (endFile != null)
        endPtr = readPointer(endFile, buf);

    if (log.isInfoEnabled())
        log.info("Read checkpoint status [startMarker=" + startFile + ", endMarker=" + endFile + ']');

    return new CheckpointStatus(lastStartTs, startId, startPtr, endId, endPtr);
}
/**
 * Retrieves checkpoint history from the checkpoint directory. Only files whose names match
 * {@link #CP_FILE_NAME_PATTERN} are visited, and only those that {@code parseFromFile} turns into an
 * entry are returned.
 *
 * @return List of checkpoints; empty if the checkpoint directory does not exist.
 * @throws IgniteCheckedException If the directory or a marker file could not be read.
 */
private List<CheckpointEntry> retrieveHistory() throws IgniteCheckedException {
    if (!cpDir.exists())
        return Collections.emptyList();

    DirectoryStream.Filter<Path> markerFilter =
        path -> CP_FILE_NAME_PATTERN.matcher(path.toFile().getName()).matches();

    try (DirectoryStream<Path> markers = Files.newDirectoryStream(cpDir.toPath(), markerFilter)) {
        // One buffer is reused for all marker reads.
        ByteBuffer ptrBuf = ByteBuffer.allocate(WALPointer.POINTER_SIZE).order(ByteOrder.nativeOrder());

        List<CheckpointEntry> res = new ArrayList<>();

        for (Path marker : markers) {
            CheckpointEntry entry = parseFromFile(ptrBuf, marker.toFile());

            if (entry != null)
                res.add(entry);
        }

        return res;
    }
    catch (IOException e) {
        throw new IgniteCheckedException("Failed to load checkpoint history.", e);
    }
}
/**
 * Parses a checkpoint entry from the given marker file. The timestamp and ID come from the file name,
 * the WAL pointer from the file content.
 *
 * @param buf Temporary byte buffer.
 * @param file Checkpoint file.
 * @return Checkpoint entry, or {@code null} if the file name does not match
 *      {@link #CP_FILE_NAME_PATTERN} or the file is not a START marker.
 * @throws IgniteCheckedException If the WAL pointer could not be read from the file.
 */
@Nullable private CheckpointEntry parseFromFile(ByteBuffer buf, File file) throws IgniteCheckedException {
    Matcher nameMatcher = CP_FILE_NAME_PATTERN.matcher(file.getName());

    // History is built from START markers only; everything else is skipped.
    if (!nameMatcher.matches() || CheckpointEntryType.valueOf(nameMatcher.group(3)) != CheckpointEntryType.START)
        return null;

    long ts = Long.parseLong(nameMatcher.group(1));
    UUID id = UUID.fromString(nameMatcher.group(2));

    WALPointer walPtr = readPointer(file, buf);

    return createCheckPointEntry(ts, walPtr, id, null, CheckpointEntryType.START);
}
/**
 * Loads WAL pointer from a checkpoint marker file.
 *
 * @param cpMarkerFile Checkpoint mark file.
 * @param buf Buffer to read the file content into; its position is reset before reading.
 * @return WAL pointer.
 * @throws IgniteCheckedException If failed to read mark file.
 */
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
    buf.rewind();

    try (FileIO io = ioFactory.create(cpMarkerFile, READ)) {
        io.readFully(buf);

        buf.flip();

        // Same field order as the one used when a marker is written: long, int, int.
        long idx = buf.getLong();
        int fileOff = buf.getInt();
        int len = buf.getInt();

        return new WALPointer(idx, fileOff, len);
    }
    catch (Exception e) {
        throw new IgniteCheckedException(
            "Failed to read checkpoint pointer from marker file: " + cpMarkerFile.getAbsolutePath(), e);
    }
}
/**
 * Builds a checkpoint entry from the given attributes.
 *
 * @param cpTs Checkpoint timestamp, must be positive.
 * @param ptr Wal pointer of checkpoint.
 * @param cpId Checkpoint ID.
 * @param rec Checkpoint record; when {@code null} the entry is created without cache group states.
 * @param type Checkpoint type; only validated, it is not stored in the entry.
 * @return Checkpoint entry.
 */
private CheckpointEntry createCheckPointEntry(
    long cpTs,
    WALPointer ptr,
    UUID cpId,
    @Nullable CheckpointRecord rec,
    CheckpointEntryType type
) {
    assert cpTs > 0;
    assert ptr != null;
    assert cpId != null;
    assert type != null;

    Map<Integer, CacheState> grpStates = rec == null ? null : rec.cacheGroupStates();

    return new CheckpointEntry(cpTs, ptr, cpId, grpStates);
}
/**
 * Removes checkpoint start/end files belonging to the given {@code cpEntry}. Files that are already
 * absent are silently skipped.
 *
 * @param cpEntry Checkpoint entry.
 * @throws IgniteCheckedException If failed to delete.
 */
private void removeCheckpointFiles(CheckpointEntry cpEntry) throws IgniteCheckedException {
    Path startFile = new File(cpDir.getAbsolutePath(), checkpointFileName(cpEntry, CheckpointEntryType.START)).toPath();
    Path endFile = new File(cpDir.getAbsolutePath(), checkpointFileName(cpEntry, CheckpointEntryType.END)).toPath();

    try {
        // deleteIfExists() avoids the check-then-act race of exists() + delete() and does not hide
        // failures that exists() would have reported as "file is absent".
        Files.deleteIfExists(startFile);
        Files.deleteIfExists(endFile);
    }
    catch (IOException e) {
        throw new StorageException("Failed to delete stale checkpoint files: " + cpEntry, e);
    }
}
/**
 * Writes the prepared buffer into the marker file of the given checkpoint entry. Unless
 * {@code skipSync} is set, the content is written to a file with the temporary suffix,
 * {@code io.force(true)} is called on it and the file is then moved to its final name.
 * With {@code skipSync} the final file is written directly and no force is performed.
 *
 * @param entryBuf Buffer which would be written to disk; it is cleared after a successful write.
 * @param cp Prepared checkpoint entry.
 * @param type Type of checkpoint marker.
 * @param skipSync {@code true} if file sync should be skipped after write.
 * @throws StorageException if fail.
 */
private void writeCheckpointEntry(
    ByteBuffer entryBuf,
    CheckpointEntry cp,
    CheckpointEntryType type,
    boolean skipSync
) throws StorageException {
    String markerName = checkpointFileName(cp, type);
    String dirPath = cpDir.getAbsolutePath();

    Path markerPath = Paths.get(dirPath, markerName);

    // The file actually written: the marker itself, or its temporary sibling when sync is requested.
    Path writePath = skipSync ? markerPath : Paths.get(dirPath, markerName + FilePageStoreManager.TMP_SUFFIX);

    try {
        try (FileIO io = ioFactory.create(writePath.toFile(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            io.writeFully(entryBuf);

            entryBuf.clear();

            if (!skipSync)
                io.force(true);
        }

        // The move happens only after the file has been closed.
        if (!skipSync)
            Files.move(writePath, markerPath);
    }
    catch (IOException e) {
        throw new StorageException("Failed to write checkpoint entry [ptr=" + cp.checkpointMark()
            + ", cpTs=" + cp.timestamp()
            + ", cpId=" + cp.checkpointId()
            + ", type=" + type + "]", e);
    }
}
/**
 * Creates a checkpoint entry for the given parameters and writes its WAL pointer to the corresponding
 * marker file. For START markers the entry is also added to the checkpoint history before the file is
 * written.
 *
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint id.
 * @param ptr WAL pointer containing record.
 * @param rec Checkpoint WAL record. Must not be {@code null} when {@code type} is START, because its
 *      cache group states are passed to the history.
 * @param type Checkpoint type.
 * @param skipSync {@code true} if file sync should be skipped after write.
 * @return Checkpoint entry which represents current checkpoint by given parameters.
 * @throws StorageException If failed to write checkpoint entry.
 */
public CheckpointEntry writeCheckpointEntry(
    long cpTs,
    UUID cpId,
    WALPointer ptr,
    @Nullable CheckpointRecord rec,
    CheckpointEntryType type,
    boolean skipSync
) throws StorageException {
    // The record is dereferenced below for START markers; state the precondition explicitly and
    // check it before the shared write buffer is modified.
    assert type != CheckpointEntryType.START || rec != null :
        "Checkpoint record is required for START marker [cpId=" + cpId + ", cpTs=" + cpTs + ']';

    CheckpointEntry entry = prepareCheckpointEntry(
        tmpWriteBuf,
        cpTs,
        cpId,
        ptr,
        rec,
        type
    );

    if (type == CheckpointEntryType.START)
        cpHistory.addCheckpoint(entry, rec.cacheGroupStates());

    writeCheckpointEntry(tmpWriteBuf, entry, type, skipSync);

    return entry;
}
/**
 * Prepares checkpoint entry containing WAL pointer to checkpoint record. Writes the WAL pointer content
 * (index, file offset, length) into the given {@code entryBuf} and flips the buffer so it is ready to be
 * written out.
 *
 * @param entryBuf Buffer to fill.
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint id.
 * @param ptr WAL pointer containing record.
 * @param rec Checkpoint WAL record.
 * @param type Checkpoint type.
 * @return Checkpoint entry.
 */
private CheckpointEntry prepareCheckpointEntry(
    ByteBuffer entryBuf,
    long cpTs,
    UUID cpId,
    WALPointer ptr,
    @Nullable CheckpointRecord rec,
    CheckpointEntryType type
) {
    assert ptr != null;

    entryBuf.rewind();

    entryBuf
        .putLong(ptr.index())
        .putInt(ptr.fileOffset())
        .putInt(ptr.length());

    entryBuf.flip();

    return createCheckPointEntry(cpTs, ptr, cpId, rec, type);
}
/**
 * Builds a marker file name of the form {@code <cpTs>-<cpId>-<type>.bin}.
 *
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint ID.
 * @param type Checkpoint type.
 * @return Checkpoint file name.
 */
private static String checkpointFileName(long cpTs, UUID cpId, CheckpointEntryType type) {
    return new StringBuilder()
        .append(cpTs)
        .append("-")
        .append(cpId)
        .append("-")
        .append(type)
        .append(".bin")
        .toString();
}
/**
 * Builds the marker file name for the given checkpoint entry using its timestamp and checkpoint ID.
 *
 * @param cp Checkpoint entry.
 * @param type Checkpoint type.
 * @return Checkpoint file name.
 */
public static String checkpointFileName(CheckpointEntry cp, CheckpointEntryType type) {
    long ts = cp.timestamp();
    UUID id = cp.checkpointId();

    return checkpointFileName(ts, id, type);
}
/**
 * @return Checkpoint history instance this storage was created with.
 */
public CheckpointHistory history() {
    return this.cpHistory;
}
}