blob: 340bb917d41ae73a9b8e5706647da4e53d81321a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.checkpoint.filemerging;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link WithinCheckpointFileMergingSnapshotManager}. */
public class WithinCheckpointFileMergingSnapshotManagerTest
extends FileMergingSnapshotManagerTestBase {
@Override
FileMergingType getFileMergingType() {
return FileMergingType.MERGE_WITHIN_CHECKPOINT;
}
@Test
void testCreateAndReuseFiles() throws IOException {
try (FileMergingSnapshotManagerBase fmsm =
(FileMergingSnapshotManagerBase)
createFileMergingSnapshotManager(checkpointBaseDir)) {
fmsm.registerSubtaskForSharedStates(subtaskKey1);
fmsm.registerSubtaskForSharedStates(subtaskKey2);
// firstly, we try shared state.
PhysicalFile file1 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file1.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
// allocate another
PhysicalFile file2 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file2.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file2).isNotEqualTo(file1);
// return for reuse
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
// allocate for another subtask
PhysicalFile file3 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey2, 0, CheckpointedStateScope.SHARED);
assertThat(file3.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED));
assertThat(file3).isNotEqualTo(file1);
// allocate for another checkpoint
PhysicalFile file4 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 1, CheckpointedStateScope.SHARED);
assertThat(file4.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file4).isNotEqualTo(file1);
// allocate for this checkpoint
PhysicalFile file5 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file5.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file5).isEqualTo(file1);
// a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
file5.incSize(fmsm.maxPhysicalFileSize);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5);
PhysicalFile file6 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file6.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file6).isNotEqualTo(file5);
// Secondly, we try private state
PhysicalFile file7 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
assertThat(file7.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
// allocate another
PhysicalFile file8 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
assertThat(file8.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
assertThat(file8).isNotEqualTo(file6);
// return for reuse
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7);
// allocate for another checkpoint
PhysicalFile file9 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
assertThat(file9.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
assertThat(file9).isNotEqualTo(file7);
// allocate for this checkpoint but another subtask
PhysicalFile file10 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
assertThat(file10.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
assertThat(file10).isEqualTo(file7);
// a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
file10.incSize(fmsm.maxPhysicalFileSize);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10);
PhysicalFile file11 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 0, CheckpointedStateScope.SHARED);
assertThat(file11.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file11).isNotEqualTo(file10);
assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE))
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
}
}
@Test
public void testCheckpointNotification() throws Exception {
try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
FileMergingCheckpointStateOutputStream cp1Stream =
writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
fmsm.notifyCheckpointComplete(subtaskKey1, 1);
assertFileInManagedDir(fmsm, cp1StateHandle);
// complete checkpoint-2
FileMergingCheckpointStateOutputStream cp2Stream =
writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
fmsm.notifyCheckpointComplete(subtaskKey1, 2);
assertFileInManagedDir(fmsm, cp2StateHandle);
// subsume checkpoint-1
assertThat(fileExists(cp1StateHandle)).isTrue();
fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
assertThat(fileExists(cp1StateHandle)).isFalse();
// abort checkpoint-3
FileMergingCheckpointStateOutputStream cp3Stream =
writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
assertFileInManagedDir(fmsm, cp3StateHandle);
fmsm.notifyCheckpointAborted(subtaskKey1, 3);
assertThat(fileExists(cp3StateHandle)).isFalse();
}
}
}