| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.core.fs; |
| |
| import org.apache.flink.core.io.SimpleVersionedSerializer; |
| import org.apache.flink.util.StringUtils; |
| import org.apache.flink.util.TestLogger; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Random; |
| |
| import static org.junit.Assert.fail; |
| |
| /** |
| * A base test-suite for the {@link RecoverableWriter}. |
| * This should be subclassed to test each filesystem specific writer. |
| */ |
| public abstract class AbstractRecoverableWriterTest extends TestLogger { |
| |
| private static final Random RND = new Random(); |
| |
| private static final String testData1 = "THIS IS A TEST 1."; |
| private static final String testData2 = "THIS IS A TEST 2."; |
| private static final String testData3 = "THIS IS A TEST 3."; |
| |
| private Path basePathForTest; |
| |
| private static FileSystem fileSystem; |
| |
| public abstract Path getBasePath() throws Exception; |
| |
| public abstract FileSystem initializeFileSystem() throws Exception; |
| |
| public Path getBasePathForTest() { |
| return basePathForTest; |
| } |
| |
| private FileSystem getFileSystem() throws Exception { |
| if (fileSystem == null) { |
| fileSystem = initializeFileSystem(); |
| } |
| return fileSystem; |
| } |
| |
| private RecoverableWriter getNewFileSystemWriter() throws Exception { |
| return getFileSystem().createRecoverableWriter(); |
| } |
| |
| @Before |
| public void prepare() throws Exception { |
| basePathForTest = new Path(getBasePath(), randomName()); |
| getFileSystem().mkdirs(basePathForTest); |
| } |
| |
| @After |
| public void cleanup() throws Exception { |
| getFileSystem().delete(basePathForTest, true); |
| } |
| |
| @Test |
| public void testCloseWithNoData() throws Exception { |
| final RecoverableWriter writer = getNewFileSystemWriter(); |
| |
| final Path testDir = getBasePathForTest(); |
| |
| final Path path = new Path(testDir, "part-0"); |
| |
| final RecoverableFsDataOutputStream stream = writer.open(path); |
| for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) { |
| Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress.")); |
| Assert.assertTrue(fileContents.getValue().isEmpty()); |
| } |
| |
| stream.closeForCommit().commit(); |
| |
| for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) { |
| Assert.assertEquals("part-0", fileContents.getKey().getName()); |
| Assert.assertTrue(fileContents.getValue().isEmpty()); |
| } |
| } |
| |
| @Test |
| public void testCommitAfterNormalClose() throws Exception { |
| final RecoverableWriter writer = getNewFileSystemWriter(); |
| |
| final Path testDir = getBasePathForTest(); |
| |
| final Path path = new Path(testDir, "part-0"); |
| |
| try (final RecoverableFsDataOutputStream stream = writer.open(path)) { |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| stream.closeForCommit().commit(); |
| |
| for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) { |
| Assert.assertEquals("part-0", fileContents.getKey().getName()); |
| Assert.assertEquals(testData1, fileContents.getValue()); |
| } |
| } |
| } |
| |
| @Test |
| public void testCommitAfterPersist() throws Exception { |
| final RecoverableWriter writer = getNewFileSystemWriter(); |
| |
| final Path testDir = getBasePathForTest(); |
| |
| final Path path = new Path(testDir, "part-0"); |
| |
| try (final RecoverableFsDataOutputStream stream = writer.open(path)) { |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| stream.persist(); |
| |
| stream.write(testData2.getBytes(StandardCharsets.UTF_8)); |
| stream.closeForCommit().commit(); |
| |
| for (Map.Entry<Path, String> fileContents : getFileContentByPath(testDir).entrySet()) { |
| Assert.assertEquals("part-0", fileContents.getKey().getName()); |
| Assert.assertEquals(testData1 + testData2, fileContents.getValue()); |
| } |
| } |
| } |
| |
| // TESTS FOR RECOVERY |
| |
| private static final String INIT_EMPTY_PERSIST = "EMPTY"; |
| private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE"; |
| private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE"; |
| private static final String FINAL_WITH_EXTRA_STATE = "FINAL"; |
| |
| @Test |
| public void testRecoverWithEmptyState() throws Exception { |
| testResumeAfterMultiplePersist( |
| INIT_EMPTY_PERSIST, |
| "", |
| testData3); |
| } |
| |
| @Test |
| public void testRecoverWithState() throws Exception { |
| testResumeAfterMultiplePersist( |
| INTERM_WITH_STATE_PERSIST, |
| testData1, |
| testData1 + testData3); |
| } |
| |
| @Test |
| public void testRecoverFromIntermWithoutAdditionalState() throws Exception { |
| testResumeAfterMultiplePersist( |
| INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, |
| testData1, |
| testData1 + testData3); |
| } |
| |
| @Test |
| public void testRecoverAfterMultiplePersistsState() throws Exception { |
| testResumeAfterMultiplePersist( |
| FINAL_WITH_EXTRA_STATE, |
| testData1 + testData2, |
| testData1 + testData2 + testData3); |
| } |
| |
| private void testResumeAfterMultiplePersist( |
| final String persistName, |
| final String expectedPostRecoveryContents, |
| final String expectedFinalContents) throws Exception { |
| |
| final Path testDir = getBasePathForTest(); |
| final Path path = new Path(testDir, "part-0"); |
| |
| final RecoverableWriter initWriter = getNewFileSystemWriter(); |
| |
| final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4); |
| try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) { |
| recoverables.put(INIT_EMPTY_PERSIST, stream.persist()); |
| |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| |
| recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist()); |
| recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist()); |
| |
| // and write some more data |
| stream.write(testData2.getBytes(StandardCharsets.UTF_8)); |
| |
| recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist()); |
| } |
| |
| final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer(); |
| final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName)); |
| |
| // get a new serializer from a new writer to make sure that no pre-initialized state leaks in. |
| final RecoverableWriter newWriter = getNewFileSystemWriter(); |
| final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer = newWriter.getResumeRecoverableSerializer(); |
| final RecoverableWriter.ResumeRecoverable recoveredRecoverable = |
| deserializer.deserialize(serializer.getVersion(), serializedRecoverable); |
| |
| try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) { |
| |
| // we expect the data to be truncated |
| Map<Path, String> files = getFileContentByPath(testDir); |
| Assert.assertEquals(1L, files.size()); |
| |
| for (Map.Entry<Path, String> fileContents : files.entrySet()) { |
| Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress.")); |
| Assert.assertEquals(expectedPostRecoveryContents, fileContents.getValue()); |
| } |
| |
| recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8)); |
| recoveredStream.closeForCommit().commit(); |
| |
| files = getFileContentByPath(testDir); |
| Assert.assertEquals(1L, files.size()); |
| |
| for (Map.Entry<Path, String> fileContents : files.entrySet()) { |
| Assert.assertEquals("part-0", fileContents.getKey().getName()); |
| Assert.assertEquals(expectedFinalContents, fileContents.getValue()); |
| } |
| } |
| } |
| |
| @Test |
| public void testCommitAfterRecovery() throws Exception { |
| final Path testDir = getBasePathForTest(); |
| final Path path = new Path(testDir, "part-0"); |
| |
| final RecoverableWriter initWriter = getNewFileSystemWriter(); |
| |
| final RecoverableWriter.CommitRecoverable recoverable; |
| try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) { |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| |
| stream.persist(); |
| stream.persist(); |
| |
| // and write some more data |
| stream.write(testData2.getBytes(StandardCharsets.UTF_8)); |
| |
| recoverable = stream.closeForCommit().getRecoverable(); |
| } |
| |
| final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable); |
| |
| // get a new serializer from a new writer to make sure that no pre-initialized state leaks in. |
| final RecoverableWriter newWriter = getNewFileSystemWriter(); |
| final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer = newWriter.getCommitRecoverableSerializer(); |
| final RecoverableWriter.CommitRecoverable recoveredRecoverable = deserializer.deserialize(deserializer.getVersion(), serializedRecoverable); |
| |
| final RecoverableFsDataOutputStream.Committer committer = newWriter.recoverForCommit(recoveredRecoverable); |
| committer.commitAfterRecovery(); |
| |
| Map<Path, String> files = getFileContentByPath(testDir); |
| Assert.assertEquals(1L, files.size()); |
| |
| for (Map.Entry<Path, String> fileContents : files.entrySet()) { |
| Assert.assertEquals("part-0", fileContents.getKey().getName()); |
| Assert.assertEquals(testData1 + testData2, fileContents.getValue()); |
| } |
| } |
| |
| // TESTS FOR EXCEPTIONS |
| |
| @Test(expected = IOException.class) |
| public void testExceptionWritingAfterCloseForCommit() throws Exception { |
| final Path testDir = getBasePathForTest(); |
| |
| final RecoverableWriter writer = getNewFileSystemWriter(); |
| final Path path = new Path(testDir, "part-0"); |
| |
| try (final RecoverableFsDataOutputStream stream = writer.open(path)) { |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| |
| stream.closeForCommit().getRecoverable(); |
| stream.write(testData2.getBytes(StandardCharsets.UTF_8)); |
| fail(); |
| } |
| } |
| |
| @Test(expected = IOException.class) |
| public void testResumeAfterCommit() throws Exception { |
| final Path testDir = getBasePathForTest(); |
| |
| final RecoverableWriter writer = getNewFileSystemWriter(); |
| final Path path = new Path(testDir, "part-0"); |
| |
| RecoverableWriter.ResumeRecoverable recoverable; |
| try (final RecoverableFsDataOutputStream stream = writer.open(path)) { |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| |
| recoverable = stream.persist(); |
| stream.write(testData2.getBytes(StandardCharsets.UTF_8)); |
| |
| stream.closeForCommit().commit(); |
| } |
| |
| // this should throw an exception as the file is already committed |
| writer.recover(recoverable); |
| fail(); |
| } |
| |
| @Test |
| public void testResumeWithWrongOffset() throws Exception { |
| // this is a rather unrealistic scenario, but it is to trigger |
| // truncation of the file and try to resume with missing data. |
| |
| final Path testDir = getBasePathForTest(); |
| |
| final RecoverableWriter writer = getNewFileSystemWriter(); |
| final Path path = new Path(testDir, "part-0"); |
| |
| final RecoverableWriter.ResumeRecoverable recoverable1; |
| final RecoverableWriter.ResumeRecoverable recoverable2; |
| try (final RecoverableFsDataOutputStream stream = writer.open(path)) { |
| stream.write(testData1.getBytes(StandardCharsets.UTF_8)); |
| |
| recoverable1 = stream.persist(); |
| stream.write(testData2.getBytes(StandardCharsets.UTF_8)); |
| |
| recoverable2 = stream.persist(); |
| stream.write(testData3.getBytes(StandardCharsets.UTF_8)); |
| } |
| |
| try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) { |
| // this should work fine |
| } catch (Exception e) { |
| fail(); |
| } |
| |
| // this should throw an exception |
| try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable2)) { |
| fail(); |
| } catch (IOException e) { |
| // we expect this |
| return; |
| } |
| fail(); |
| } |
| |
| private Map<Path, String> getFileContentByPath(Path directory) throws Exception { |
| Map<Path, String> contents = new HashMap<>(); |
| |
| final FileStatus[] filesInBucket = getFileSystem().listStatus(directory); |
| for (FileStatus file : filesInBucket) { |
| final long fileLength = file.getLen(); |
| byte[] serContents = new byte[(int) fileLength]; |
| |
| FSDataInputStream stream = getFileSystem().open(file.getPath()); |
| stream.read(serContents); |
| |
| contents.put(file.getPath(), new String(serContents, StandardCharsets.UTF_8)); |
| } |
| return contents; |
| } |
| |
| private static String randomName() { |
| return StringUtils.getRandomString(RND, 16, 16, 'a', 'z'); |
| } |
| } |