| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.checkpoint.filemerging; |
| |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.core.fs.EntropyInjector; |
| import org.apache.flink.core.fs.FSDataOutputStream; |
| import org.apache.flink.core.fs.FileStatus; |
| import org.apache.flink.core.fs.FileSystem; |
| import org.apache.flink.core.fs.OutputStreamAndPath; |
| import org.apache.flink.core.fs.Path; |
| import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; |
| import org.apache.flink.runtime.state.CheckpointedStateScope; |
| import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; |
| import org.apache.flink.runtime.state.StreamStateHandle; |
| import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; |
| import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; |
| import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; |
| import org.apache.flink.util.FlinkRuntimeException; |
| import org.apache.flink.util.Preconditions; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.stream.Stream; |
| |
| import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter; |
| |
| /** Base implementation of {@link FileMergingSnapshotManager}. */ |
| public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class); |
| |
| /** The number of recent checkpoints whose IDs are remembered. */ |
| private static final int NUM_GHOST_CHECKPOINT_IDS = 16; |
| |
| /** The identifier of this manager. */ |
| private final String id; |
| |
| /** The executor for I/O operations in this manager. */ |
| protected final Executor ioExecutor; |
| |
| /** Guard for {@link #initFileSystem}, {@link #restoreStateHandles} and uploadedStates. */ |
| protected final Object lock = new Object(); |
| |
| @GuardedBy("lock") |
| protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>(); |
| |
| /** The map that holds all the known live logical files. */ |
| private final Map<LogicalFileId, LogicalFile> knownLogicalFiles = new ConcurrentHashMap<>(); |
| |
| /** The {@link FileSystem} that this manager works on. */ |
| protected FileSystem fs; |
| |
| // checkpoint directories |
| protected Path checkpointDir; |
| protected Path sharedStateDir; |
| protected Path taskOwnedStateDir; |
| |
| /** The buffer size for writing files to the file system. */ |
| protected int writeBufferSize; |
| |
| /** |
| * The file system should only be initialized once. |
| * |
| * @see FileMergingSnapshotManager#initFileSystem for the reason why a throttle is needed. |
| */ |
| private boolean fileSystemInitiated = false; |
| |
| /** |
| * File-system dependent value. Mark whether the file system this manager running on need sync |
| * for visibility. If true, DO a file sync after writing each segment . |
| */ |
| protected boolean shouldSyncAfterClosingLogicalFile; |
| |
| /** Max size for a physical file. */ |
| protected long maxPhysicalFileSize; |
| |
| /** Type of physical file pool. */ |
| protected PhysicalFilePool.Type filePoolType; |
| |
| protected final float maxSpaceAmplification; |
| |
| protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; |
| |
| private final Object notifyLock = new Object(); |
| |
| @GuardedBy("notifyLock") |
| private final TreeMap<Long, Set<SubtaskKey>> notifiedSubtaskCheckpoint = new TreeMap<>(); |
| |
| @GuardedBy("notifyLock") |
| private final TreeSet<Long> notifiedCheckpoint = new TreeSet<>(); |
| |
| /** |
| * Currently the shared state files are merged within each subtask, files are split by different |
| * directories. |
| */ |
| private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>(); |
| |
| /** |
| * The {@link DirectoryStreamStateHandle} for shared state directories, one for each subtask. |
| */ |
| private final Map<SubtaskKey, DirectoryStreamStateHandle> managedSharedStateDirHandles = |
| new ConcurrentHashMap<>(); |
| |
| /** |
| * The private state files are merged across subtasks, there is only one directory for |
| * merged-files within one TM per job. |
| */ |
| protected Path managedExclusiveStateDir; |
| |
| /** |
| * The {@link DirectoryStreamStateHandle} for private state directory, one for each task |
| * manager. |
| */ |
| protected DirectoryStreamStateHandle managedExclusiveStateDirHandle; |
| |
| /** The current space statistic, updated on file creation/deletion. */ |
| protected SpaceStat spaceStat; |
| |
| public FileMergingSnapshotManagerBase( |
| String id, |
| long maxFileSize, |
| PhysicalFilePool.Type filePoolType, |
| float maxSpaceAmplification, |
| Executor ioExecutor) { |
| this.id = id; |
| this.maxPhysicalFileSize = maxFileSize; |
| this.filePoolType = filePoolType; |
| this.maxSpaceAmplification = |
| maxSpaceAmplification < 1f ? Float.MAX_VALUE : maxSpaceAmplification; |
| this.ioExecutor = ioExecutor; |
| this.spaceStat = new SpaceStat(); |
| } |
| |
| @Override |
| public void initFileSystem( |
| FileSystem fileSystem, |
| Path checkpointBaseDir, |
| Path sharedStateDir, |
| Path taskOwnedStateDir, |
| int writeBufferSize) |
| throws IllegalArgumentException { |
| synchronized (lock) { |
| if (fileSystemInitiated) { |
| Preconditions.checkArgument( |
| checkpointBaseDir.equals(this.checkpointDir), |
| "The checkpoint base dir is not deterministic across subtasks."); |
| Preconditions.checkArgument( |
| sharedStateDir.equals(this.sharedStateDir), |
| "The shared checkpoint dir is not deterministic across subtasks."); |
| Preconditions.checkArgument( |
| taskOwnedStateDir.equals(this.taskOwnedStateDir), |
| "The task-owned checkpoint dir is not deterministic across subtasks."); |
| return; |
| } |
| this.fs = fileSystem; |
| this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir); |
| this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir); |
| this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir); |
| this.fileSystemInitiated = true; |
| this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem); |
| // Initialize the managed exclusive path using id as the child path name. |
| // Currently, we use the task-owned directory to place the merged private state. |
| // According |
| // to the FLIP-306, we later consider move these files to the new introduced |
| // task-manager-owned directory. |
| Path managedExclusivePath = new Path(taskOwnedStateDir, id); |
| createManagedDirectory(managedExclusivePath); |
| this.managedExclusiveStateDir = managedExclusivePath; |
| this.managedExclusiveStateDirHandle = |
| DirectoryStreamStateHandle.forPathWithZeroSize( |
| new File(managedExclusivePath.getPath()).toPath()); |
| this.writeBufferSize = writeBufferSize; |
| } |
| } |
| |
| @Override |
| public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) { |
| String managedDirName = subtaskKey.getManagedDirName(); |
| Path managedPath = new Path(sharedStateDir, managedDirName); |
| if (!managedSharedStateDir.containsKey(subtaskKey)) { |
| createManagedDirectory(managedPath); |
| managedSharedStateDir.put(subtaskKey, managedPath); |
| managedSharedStateDirHandles.put( |
| subtaskKey, |
| DirectoryStreamStateHandle.forPathWithZeroSize( |
| new File(managedPath.getPath()).toPath())); |
| } |
| } |
| |
| @Override |
| public void unregisterSubtask(SubtaskKey subtaskKey) { |
| if (managedSharedStateDir.containsKey(subtaskKey)) { |
| managedSharedStateDir.remove(subtaskKey); |
| managedSharedStateDirHandles.remove(subtaskKey); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // logical & physical file |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Create a logical file on a physical file. |
| * |
| * @param physicalFile the underlying physical file. |
| * @param startOffset the offset in the physical file that the logical file starts from. |
| * @param length the length of the logical file. |
| * @param subtaskKey the id of the subtask that the logical file belongs to. |
| * @return the created logical file. |
| */ |
| protected LogicalFile createLogicalFile( |
| @Nonnull PhysicalFile physicalFile, |
| long startOffset, |
| long length, |
| @Nonnull SubtaskKey subtaskKey) { |
| LogicalFileId fileID = LogicalFileId.generateRandomId(); |
| LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); |
| knownLogicalFiles.put(fileID, file); |
| if (physicalFile.isOwned()) { |
| spaceStat.onLogicalFileCreate(length); |
| spaceStat.onPhysicalFileUpdate(length); |
| } |
| return file; |
| } |
| |
| /** |
| * Create a physical file in right location (managed directory), which is specified by scope of |
| * this checkpoint and current subtask. |
| * |
| * @param subtaskKey the {@link SubtaskKey} of current subtask. |
| * @param scope the scope of the checkpoint. |
| * @return the created physical file. |
| * @throws IOException if anything goes wrong with file system. |
| */ |
| @Nonnull |
| protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope) |
| throws IOException { |
| PhysicalFile result; |
| Exception latestException = null; |
| |
| Path dirPath = getManagedDir(subtaskKey, scope); |
| |
| if (dirPath == null) { |
| throw new IOException( |
| "Could not get " |
| + scope |
| + " path for subtask " |
| + subtaskKey |
| + ", the directory may have not been created."); |
| } |
| |
| for (int attempt = 0; attempt < 10; attempt++) { |
| try { |
| OutputStreamAndPath streamAndPath = |
| EntropyInjector.createEntropyAware( |
| fs, |
| generatePhysicalFilePath(dirPath), |
| FileSystem.WriteMode.NO_OVERWRITE); |
| FSDataOutputStream outputStream = streamAndPath.stream(); |
| Path filePath = streamAndPath.path(); |
| result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope); |
| updateFileCreationMetrics(filePath); |
| return result; |
| } catch (Exception e) { |
| latestException = e; |
| } |
| } |
| |
| throw new IOException( |
| "Could not open output stream for state file merging.", latestException); |
| } |
| |
| @Override |
| public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream( |
| SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) { |
| |
| return new FileMergingCheckpointStateOutputStream( |
| writeBufferSize, |
| new FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy() { |
| PhysicalFile physicalFile; |
| LogicalFile logicalFile; |
| |
| @Override |
| public Tuple2<FSDataOutputStream, Path> providePhysicalFile() |
| throws IOException { |
| physicalFile = |
| getOrCreatePhysicalFileForCheckpoint( |
| subtaskKey, checkpointId, scope); |
| return new Tuple2<>( |
| physicalFile.getOutputStream(), physicalFile.getFilePath()); |
| } |
| |
| @Override |
| public SegmentFileStateHandle closeStreamAndCreateStateHandle( |
| Path filePath, long startPos, long stateSize) throws IOException { |
| if (physicalFile == null) { |
| return null; |
| } else { |
| // deal with logical file |
| logicalFile = |
| createLogicalFile( |
| physicalFile, startPos, stateSize, subtaskKey); |
| logicalFile.advanceLastCheckpointId(checkpointId); |
| |
| // track the logical file |
| synchronized (lock) { |
| uploadedStates |
| .computeIfAbsent(checkpointId, key -> new HashSet<>()) |
| .add(logicalFile); |
| } |
| |
| // deal with physicalFile file |
| returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile); |
| |
| return new SegmentFileStateHandle( |
| physicalFile.getFilePath(), |
| startPos, |
| stateSize, |
| scope, |
| logicalFile.getFileId()); |
| } |
| } |
| |
| @Override |
| public void closeStreamExceptionally() throws IOException { |
| if (physicalFile != null) { |
| if (logicalFile != null) { |
| discardSingleLogicalFile(logicalFile, checkpointId); |
| } else { |
| // The physical file should be closed anyway. This is because the |
| // last segmented write on this file is likely to have failed, and |
| // we want to prevent further reusing of this file. |
| physicalFile.close(); |
| physicalFile.deleteIfNecessary(); |
| } |
| } |
| } |
| }); |
| } |
| |
| private void updateFileCreationMetrics(Path path) { |
| // TODO: FLINK-32091 add io metrics |
| spaceStat.onPhysicalFileCreate(); |
| LOG.debug("Create a new physical file {} for checkpoint file merging.", path); |
| } |
| |
| /** |
| * Generate a file path for a physical file. |
| * |
| * @param dirPath the parent directory path for the physical file. |
| * @return the generated file path for a physical file. |
| */ |
| protected Path generatePhysicalFilePath(Path dirPath) { |
| // this must be called after initFileSystem() is called |
| // so the checkpoint directories must be not null if we reach here |
| final String fileName = UUID.randomUUID().toString(); |
| return new Path(dirPath, fileName); |
| } |
| |
| @VisibleForTesting |
| boolean isResponsibleForFile(Path filePath) { |
| Path parent = filePath.getParent(); |
| return parent.equals(managedExclusiveStateDir) |
| || managedSharedStateDir.containsValue(parent); |
| } |
| |
| /** |
| * Delete a physical file by given file path. Use the io executor to do the deletion. |
| * |
| * @param filePath the given file path to delete. |
| */ |
| protected final void deletePhysicalFile(Path filePath, long size) { |
| ioExecutor.execute( |
| () -> { |
| try { |
| fs.delete(filePath, false); |
| spaceStat.onPhysicalFileDelete(size); |
| LOG.debug("Physical file deleted: {}.", filePath); |
| } catch (IOException e) { |
| LOG.warn("Fail to delete file: {}", filePath); |
| } |
| }); |
| } |
| |
| /** |
| * Create physical pool by filePoolType. |
| * |
| * @return physical file pool. |
| */ |
| protected final PhysicalFilePool createPhysicalPool() { |
| switch (filePoolType) { |
| case NON_BLOCKING: |
| return new NonBlockingPhysicalFilePool( |
| maxPhysicalFileSize, this::createPhysicalFile); |
| case BLOCKING: |
| return new BlockingPhysicalFilePool(maxPhysicalFileSize, this::createPhysicalFile); |
| default: |
| throw new UnsupportedOperationException( |
| "Unsupported type of physical file pool: " + filePoolType); |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // abstract methods |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Get a reused physical file or create one. This will be called in checkpoint output stream |
| * creation logic. |
| * |
| * <p>Basic logic of file reusing: whenever a physical file is needed, this method is called |
| * with necessary information provided for acquiring a file. The file will not be reused until |
| * it is written and returned to the reused pool by calling {@link |
| * #returnPhysicalFileForNextReuse}. |
| * |
| * @param subtaskKey the subtask key for the caller |
| * @param checkpointId the checkpoint id |
| * @param scope checkpoint scope |
| * @return the requested physical file. |
| * @throws IOException thrown if anything goes wrong with file system. |
| */ |
| @Nonnull |
| protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint( |
| SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) |
| throws IOException; |
| |
| /** |
| * Try to return an existing physical file to the manager for next reuse. If this physical file |
| * is no longer needed (for reusing), it will be closed. |
| * |
| * <p>Basic logic of file reusing, see {@link #getOrCreatePhysicalFileForCheckpoint}. |
| * |
| * @param subtaskKey the subtask key for the caller |
| * @param checkpointId in which checkpoint this physical file is requested. |
| * @param physicalFile the returning checkpoint |
| * @throws IOException thrown if anything goes wrong with file system. |
| * @see #getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope) |
| */ |
| protected abstract void returnPhysicalFileForNextReuse( |
| SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException; |
| |
| /** |
| * The callback which will be triggered when all subtasks discarded (aborted or subsumed). |
| * |
| * @param checkpointId the discarded checkpoint id. |
| * @throws IOException if anything goes wrong with file system. |
| */ |
| protected void discardCheckpoint(long checkpointId) throws IOException { |
| controlSpace(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Checkpoint Listener |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId) |
| throws Exception { |
| // does nothing |
| } |
| |
| @Override |
| public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception { |
| synchronized (lock) { |
| Set<LogicalFile> logicalFilesForCurrentCp = uploadedStates.get(checkpointId); |
| if (logicalFilesForCurrentCp == null) { |
| return; |
| } |
| if (discardLogicalFiles(subtaskKey, checkpointId, logicalFilesForCurrentCp)) { |
| uploadedStates.remove(checkpointId); |
| } |
| } |
| notifyReleaseCheckpoint(subtaskKey, checkpointId); |
| } |
| |
| @Override |
| public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) |
| throws Exception { |
| synchronized (lock) { |
| Iterator<Map.Entry<Long, Set<LogicalFile>>> uploadedStatesIterator = |
| uploadedStates.headMap(checkpointId, true).entrySet().iterator(); |
| while (uploadedStatesIterator.hasNext()) { |
| Map.Entry<Long, Set<LogicalFile>> entry = uploadedStatesIterator.next(); |
| if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) { |
| uploadedStatesIterator.remove(); |
| } |
| } |
| } |
| notifyReleaseCheckpoint(subtaskKey, checkpointId); |
| } |
| |
| private void notifyReleaseCheckpoint(SubtaskKey subtaskKey, long checkpointId) |
| throws IOException { |
| synchronized (notifyLock) { |
| if (notifiedCheckpoint.contains(checkpointId)) { |
| // already release, skip |
| return; |
| } |
| Set<SubtaskKey> knownSubtask = |
| notifiedSubtaskCheckpoint.computeIfAbsent(checkpointId, (e) -> new HashSet<>()); |
| knownSubtask.add(subtaskKey); |
| if (knownSubtask.containsAll(managedSharedStateDir.keySet())) { |
| // all known subtask has been notified. |
| tryDiscardCheckpoint(checkpointId); |
| } |
| // control the size of notifiedSubtaskCheckpoint |
| if (notifiedSubtaskCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS) { |
| notifiedSubtaskCheckpoint.pollFirstEntry(); |
| } |
| } |
| } |
| |
| private void tryDiscardCheckpoint(long checkpointId) throws IOException { |
| synchronized (notifyLock) { |
| if (!notifiedCheckpoint.contains(checkpointId)) { |
| notifiedCheckpoint.add(checkpointId); |
| notifiedSubtaskCheckpoint.remove(checkpointId); |
| discardCheckpoint(checkpointId); |
| if (notifiedCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS) { |
| notifiedCheckpoint.pollFirst(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void reusePreviousStateHandle( |
| long checkpointId, Collection<? extends StreamStateHandle> stateHandles) { |
| for (StreamStateHandle stateHandle : stateHandles) { |
| if (stateHandle instanceof SegmentFileStateHandle) { |
| LogicalFile file = |
| knownLogicalFiles.get( |
| ((SegmentFileStateHandle) stateHandle).getLogicalFileId()); |
| if (file != null) { |
| file.advanceLastCheckpointId(checkpointId); |
| } |
| } else if (stateHandle instanceof PlaceholderStreamStateHandle |
| && ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) { |
| // Since the rocksdb state backend will leverage the PlaceholderStreamStateHandle, |
| // the manager should recognize this. |
| LogicalFile file = |
| knownLogicalFiles.get( |
| new LogicalFileId( |
| stateHandle.getStreamStateHandleID().getKeyString())); |
| if (file != null) { |
| file.advanceLastCheckpointId(checkpointId); |
| } |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Space Control |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * The core method that control space if needed. This method will compare the desired space |
| * amplification with current one, and if it exceeds the configured amplification, this method |
| * will mark minimal set of {@link PhysicalFile}s not to be reused anymore. |
| */ |
| private void controlSpace() { |
| if (maxSpaceAmplification != Float.MAX_VALUE |
| && spaceStat.logicalFileSize.get() * maxSpaceAmplification |
| < spaceStat.physicalFileSize.get()) { |
| // may need control space |
| long goalPhysicalSize = |
| Math.round(spaceStat.logicalFileSize.get() * maxSpaceAmplification); |
| final AtomicLong aliveSize = new AtomicLong(0L); |
| // retrieve all the physical files and calculate current alive size |
| Set<PhysicalFile> knownPhysicalFiles = new HashSet<>(); |
| knownLogicalFiles.values().stream() |
| .map(LogicalFile::getPhysicalFile) |
| .forEach( |
| file -> { |
| if (file.isCouldReuse()) { |
| if (knownPhysicalFiles.add(file)) { |
| aliveSize.addAndGet(file.getSize()); |
| } |
| } |
| }); |
| // the alive size still greater than the goal |
| if (aliveSize.get() > goalPhysicalSize) { |
| // sort in DESC order on wasted size |
| SortedSet<PhysicalFile> sortedPhysicalFile = |
| new TreeSet<>((a, b) -> Long.compare(b.wastedSize(), a.wastedSize())); |
| knownPhysicalFiles.stream() |
| .filter(PhysicalFile::closed) |
| .forEach(sortedPhysicalFile::add); |
| // mark the physical file un-alive, until it reaches our goal. |
| for (PhysicalFile file : sortedPhysicalFile) { |
| if (!file.checkReuseOnSpaceAmplification(maxSpaceAmplification)) { |
| if (aliveSize.addAndGet(-file.wastedSize()) <= goalPhysicalSize) { |
| break; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean couldReusePreviousStateHandle(StreamStateHandle stateHandle) { |
| if (stateHandle instanceof SegmentFileStateHandle) { |
| LogicalFile file = |
| knownLogicalFiles.get( |
| ((SegmentFileStateHandle) stateHandle).getLogicalFileId()); |
| if (file != null) { |
| return file.getPhysicalFile().isCouldReuse(); |
| } |
| } else if (stateHandle instanceof PlaceholderStreamStateHandle |
| && ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) { |
| // Since the rocksdb state backend will leverage the PlaceholderStreamStateHandle, |
| // the manager should recognize this. |
| LogicalFile file = |
| knownLogicalFiles.get( |
| new LogicalFileId(stateHandle.getStreamStateHandleID().getKeyString())); |
| if (file != null) { |
| return file.getPhysicalFile().isCouldReuse(); |
| } |
| } |
| // If a stateHandle is not of the type SegmentFileStateHandle or if its corresponding file |
| // is not recognized by the fileMergingManager, it needs to be re-uploaded. |
| return false; |
| } |
| |
| public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId) |
| throws IOException { |
| logicalFile.discardWithCheckpointId(checkpointId); |
| if (logicalFile.getPhysicalFile().isOwned()) { |
| spaceStat.onLogicalFileDelete(logicalFile.getLength()); |
| } |
| } |
| |
| private boolean discardLogicalFiles( |
| SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles) |
| throws Exception { |
| Iterator<LogicalFile> logicalFileIterator = logicalFiles.iterator(); |
| while (logicalFileIterator.hasNext()) { |
| LogicalFile logicalFile = logicalFileIterator.next(); |
| if (logicalFile.getSubtaskKey().equals(subtaskKey) |
| && logicalFile.getLastUsedCheckpointID() <= checkpointId) { |
| discardSingleLogicalFile(logicalFile, checkpointId); |
| logicalFileIterator.remove(); |
| knownLogicalFiles.remove(logicalFile.getFileId()); |
| } |
| } |
| |
| if (logicalFiles.isEmpty()) { |
| tryDiscardCheckpoint(checkpointId); |
| return true; |
| } |
| return false; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // file system |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope) { |
| if (scope.equals(CheckpointedStateScope.SHARED)) { |
| return managedSharedStateDir.get(subtaskKey); |
| } else { |
| return managedExclusiveStateDir; |
| } |
| } |
| |
| @Override |
| public DirectoryStreamStateHandle getManagedDirStateHandle( |
| SubtaskKey subtaskKey, CheckpointedStateScope scope) { |
| if (scope.equals(CheckpointedStateScope.SHARED)) { |
| return managedSharedStateDirHandles.get(subtaskKey); |
| } else { |
| return managedExclusiveStateDirHandle; |
| } |
| } |
| |
| static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) { |
| // Currently, we do file sync regardless of the file system. |
| // TODO: Determine whether do file sync more wisely. Add an interface to FileSystem if |
| // needed. |
| return true; |
| } |
| |
| // ------------------------------------------------------------------------ |
| // utilities |
| // ------------------------------------------------------------------------ |
| |
| private void createManagedDirectory(Path managedPath) { |
| try { |
| FileStatus fileStatus = null; |
| try { |
| fileStatus = fs.getFileStatus(managedPath); |
| } catch (FileNotFoundException e) { |
| // expected exception when the path not exist, and we ignore it. |
| } |
| if (fileStatus == null) { |
| fs.mkdirs(managedPath); |
| LOG.info("Created a directory {} for checkpoint file-merging.", managedPath); |
| } else if (fileStatus.isDir()) { |
| LOG.info("Reusing previous directory {} for checkpoint file-merging.", managedPath); |
| } else { |
| throw new FlinkRuntimeException( |
| "The managed path " |
| + managedPath |
| + " for file-merging is occupied by another file. Cannot create directory."); |
| } |
| } catch (IOException e) { |
| throw new FlinkRuntimeException( |
| "Cannot create directory " + managedPath + " for file-merging ", e); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException {} |
| |
| // ------------------------------------------------------------------------ |
| // restore |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public void restoreStateHandles( |
| long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles) { |
| |
| synchronized (lock) { |
| Set<LogicalFile> restoredLogicalFiles = |
| uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()); |
| |
| Map<Path, PhysicalFile> knownPhysicalFiles = new HashMap<>(); |
| knownLogicalFiles.values().stream() |
| .map(LogicalFile::getPhysicalFile) |
| .forEach(file -> knownPhysicalFiles.putIfAbsent(file.getFilePath(), file)); |
| |
| stateHandles.forEach( |
| fileHandle -> { |
| PhysicalFile physicalFile = |
| knownPhysicalFiles.computeIfAbsent( |
| fileHandle.getFilePath(), |
| path -> { |
| boolean managedByFileMergingManager = |
| fileSystemInitiated |
| && isManagedByFileMergingManager( |
| path, |
| subtaskKey, |
| fileHandle.getScope()); |
| PhysicalFile file = |
| new PhysicalFile( |
| null, |
| path, |
| physicalFileDeleter, |
| fileHandle.getScope(), |
| managedByFileMergingManager); |
| try { |
| file.updateSize(getFileSize(file)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| if (managedByFileMergingManager) { |
| spaceStat.onPhysicalFileCreate(); |
| spaceStat.onPhysicalFileUpdate(file.getSize()); |
| } |
| return file; |
| }); |
| |
| LogicalFileId logicalFileId = fileHandle.getLogicalFileId(); |
| LogicalFile logicalFile = |
| new LogicalFile( |
| logicalFileId, |
| physicalFile, |
| fileHandle.getStartPos(), |
| fileHandle.getStateSize(), |
| subtaskKey); |
| |
| if (physicalFile.isOwned()) { |
| spaceStat.onLogicalFileCreate(logicalFile.getLength()); |
| } |
| knownLogicalFiles.put(logicalFileId, logicalFile); |
| logicalFile.advanceLastCheckpointId(checkpointId); |
| restoredLogicalFiles.add(logicalFile); |
| }); |
| } |
| } |
| |
| private long getFileSize(PhysicalFile file) throws IOException { |
| FileStatus fileStatus = |
| file.getFilePath().getFileSystem().getFileStatus(file.getFilePath()); |
| if (fileStatus == null || fileStatus.isDir()) { |
| throw new FileNotFoundException("File " + file.getFilePath() + " does not exist."); |
| } else { |
| return fileStatus.getLen(); |
| } |
| } |
| |
| /** |
| * Distinguish whether the given filePath is managed by the FileMergingSnapshotManager. If the |
| * filePath is located under managedDir (managedSharedStateDir or managedExclusiveStateDir) as a |
| * subFile, it should be managed by the FileMergingSnapshotManager. |
| */ |
| private boolean isManagedByFileMergingManager( |
| Path filePath, SubtaskKey subtaskKey, CheckpointedStateScope scope) { |
| if (scope == CheckpointedStateScope.SHARED) { |
| Path managedDir = managedSharedStateDir.get(subtaskKey); |
| return filePath.toString().startsWith(managedDir.toString()); |
| } |
| if (scope == CheckpointedStateScope.EXCLUSIVE) { |
| return filePath.toString().startsWith(managedExclusiveStateDir.toString()); |
| } |
| throw new UnsupportedOperationException("Unsupported CheckpointStateScope " + scope); |
| } |
| |
| @VisibleForTesting |
| public LogicalFile getLogicalFile(LogicalFileId fileId) { |
| return knownLogicalFiles.get(fileId); |
| } |
| |
| @VisibleForTesting |
| TreeMap<Long, Set<LogicalFile>> getUploadedStates() { |
| return uploadedStates; |
| } |
| |
| @VisibleForTesting |
| boolean isCheckpointDiscard(long checkpointId) { |
| return notifiedCheckpoint.contains(checkpointId); |
| } |
| } |