blob: f0edf962154c5b6d1f960a8685fa33ed5997fcbd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StateDirectory.PROCESS_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.toTaskDirString;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StateDirectoryTest {
private final MockTime time = new MockTime();
private File stateDir;
private final String applicationId = "applicationId";
private StateDirectory directory;
private File appDir;
private void initializeStateDirectory(final boolean createStateDirectory, final boolean hasNamedTopology) throws IOException {
stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
if (!createStateDirectory) {
cleanup();
}
directory = new StateDirectory(
new StreamsConfig(new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
}
}),
time, createStateDirectory, hasNamedTopology);
appDir = new File(stateDir, applicationId);
}
@Before
public void before() throws IOException {
initializeStateDirectory(true, false);
}
@After
public void cleanup() throws IOException {
Utils.delete(stateDir);
}
@Test
public void shouldCreateBaseDirectory() {
assertTrue(stateDir.exists());
assertTrue(stateDir.isDirectory());
assertTrue(appDir.exists());
assertTrue(appDir.isDirectory());
}
@Test
public void shouldHaveSecurePermissions() {
assertPermissions(stateDir);
assertPermissions(appDir);
}
private void assertPermissions(final File file) {
final Path path = file.toPath();
if (path.getFileSystem().supportedFileAttributeViews().contains("posix")) {
final Set<PosixFilePermission> expectedPermissions = EnumSet.of(
PosixFilePermission.OWNER_EXECUTE,
PosixFilePermission.GROUP_READ,
PosixFilePermission.OWNER_WRITE,
PosixFilePermission.GROUP_EXECUTE,
PosixFilePermission.OWNER_READ);
try {
final Set<PosixFilePermission> filePermissions = Files.getPosixFilePermissions(path);
assertThat(expectedPermissions, equalTo(filePermissions));
} catch (final IOException e) {
fail("Should create correct files and set correct permissions");
}
} else {
assertThat(file.canRead(), is(true));
assertThat(file.canWrite(), is(true));
assertThat(file.canExecute(), is(true));
}
}
@Test
public void shouldParseUnnamedTaskId() {
final TaskId task = new TaskId(1, 0);
assertThat(TaskId.parse(task.toString()), equalTo(task));
}
@Test
public void shouldParseNamedTaskId() {
final TaskId task = new TaskId(1, 0, "namedTopology");
assertThat(TaskId.parse(task.toString()), equalTo(task));
}
@Test
public void shouldCreateTaskStateDirectory() {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
assertTrue(taskDirectory.exists());
assertTrue(taskDirectory.isDirectory());
}
@Test
public void shouldBeTrueIfAlreadyHoldsLock() {
final TaskId taskId = new TaskId(0, 0);
directory.getOrCreateDirectoryForTask(taskId);
directory.lock(taskId);
try {
assertTrue(directory.lock(taskId));
} finally {
directory.unlock(taskId);
}
}
@Test
public void shouldBeAbleToUnlockEvenWithoutLocking() {
final TaskId taskId = new TaskId(0, 0);
directory.unlock(taskId);
}
@Test
public void shouldReportDirectoryEmpty() throws IOException {
final TaskId taskId = new TaskId(0, 0);
// when task dir first created, it should be empty
assertTrue(directory.directoryForTaskIsEmpty(taskId));
// after locking, it should still be empty
directory.lock(taskId);
assertTrue(directory.directoryForTaskIsEmpty(taskId));
// after writing checkpoint, it should still be empty
final OffsetCheckpoint checkpointFile = new OffsetCheckpoint(new File(directory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME));
assertTrue(directory.directoryForTaskIsEmpty(taskId));
checkpointFile.write(Collections.singletonMap(new TopicPartition("topic", 0), 0L));
assertTrue(directory.directoryForTaskIsEmpty(taskId));
// if some store dir is created, it should not be empty
final File dbDir = new File(new File(directory.getOrCreateDirectoryForTask(taskId), "db"), "store1");
Files.createDirectories(dbDir.getParentFile().toPath());
Files.createDirectories(dbDir.getAbsoluteFile().toPath());
assertFalse(directory.directoryForTaskIsEmpty(taskId));
// after wiping out the state dir, the dir should show as empty again
Utils.delete(dbDir.getParentFile());
assertTrue(directory.directoryForTaskIsEmpty(taskId));
directory.unlock(taskId);
assertTrue(directory.directoryForTaskIsEmpty(taskId));
}
@Test
public void shouldThrowProcessorStateException() throws IOException {
final TaskId taskId = new TaskId(0, 0);
Utils.delete(stateDir);
assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId));
}
@Test
public void shouldThrowProcessorStateExceptionIfStateDirOccupied() throws IOException {
final TaskId taskId = new TaskId(0, 0);
// Replace application's stateDir to regular file
Utils.delete(appDir);
Files.createFile(appDir.toPath());
assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId));
}
@Test
public void shouldThrowProcessorStateExceptionIfTestDirOccupied() throws IOException {
final TaskId taskId = new TaskId(0, 0);
// Replace taskDir to a regular file
final File taskDir = new File(appDir, toTaskDirString(taskId));
Utils.delete(taskDir);
Files.createFile(taskDir.toPath());
// Error: ProcessorStateException should be thrown.
assertThrows(ProcessorStateException.class, () -> directory.getOrCreateDirectoryForTask(taskId));
}
@Test
public void shouldNotThrowIfStateDirectoryHasBeenDeleted() throws IOException {
final TaskId taskId = new TaskId(0, 0);
Utils.delete(stateDir);
assertThrows(IllegalStateException.class, () -> directory.lock(taskId));
}
@Test
public void shouldLockMultipleTaskDirectories() {
final TaskId taskId = new TaskId(0, 0);
final TaskId taskId2 = new TaskId(1, 0);
assertThat(directory.lock(taskId), is(true));
assertThat(directory.lock(taskId2), is(true));
directory.unlock(taskId);
directory.unlock(taskId2);
}
@Test
public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() {
final TaskId task0 = new TaskId(0, 0);
final TaskId task1 = new TaskId(1, 0);
final TaskId task2 = new TaskId(2, 0);
try {
assertTrue(new File(directory.getOrCreateDirectoryForTask(task0), "store").mkdir());
assertTrue(new File(directory.getOrCreateDirectoryForTask(task1), "store").mkdir());
assertTrue(new File(directory.getOrCreateDirectoryForTask(task2), "store").mkdir());
directory.lock(task0);
directory.lock(task1);
final TaskDirectory dir0 = new TaskDirectory(new File(appDir, toTaskDirString(task0)), null);
final TaskDirectory dir1 = new TaskDirectory(new File(appDir, toTaskDirString(task1)), null);
final TaskDirectory dir2 = new TaskDirectory(new File(appDir, toTaskDirString(task2)), null);
List<TaskDirectory> files = directory.listAllTaskDirectories();
assertEquals(mkSet(dir0, dir1, dir2), new HashSet<>(files));
files = directory.listNonEmptyTaskDirectories();
assertEquals(mkSet(dir0, dir1, dir2), new HashSet<>(files));
time.sleep(5000);
directory.cleanRemovedTasks(0);
files = directory.listAllTaskDirectories();
assertEquals(mkSet(dir0, dir1), new HashSet<>(files));
files = directory.listNonEmptyTaskDirectories();
assertEquals(mkSet(dir0, dir1), new HashSet<>(files));
} finally {
directory.unlock(task0);
directory.unlock(task1);
}
}
@Test
public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
final File dir = directory.getOrCreateDirectoryForTask(new TaskId(2, 0));
assertTrue(new File(dir, "store").mkdir());
final int cleanupDelayMs = 60000;
directory.cleanRemovedTasks(cleanupDelayMs);
assertTrue(dir.exists());
assertEquals(1, directory.listAllTaskDirectories().size());
assertEquals(1, directory.listNonEmptyTaskDirectories().size());
time.sleep(cleanupDelayMs + 1000);
directory.cleanRemovedTasks(cleanupDelayMs);
assertFalse(dir.exists());
assertEquals(0, directory.listAllTaskDirectories().size());
assertEquals(0, directory.listNonEmptyTaskDirectories().size());
}
@Test
public void shouldCleanupObsoleteTaskDirectoriesAndDeleteTheDirectoryItself() {
final File dir = directory.getOrCreateDirectoryForTask(new TaskId(2, 0));
assertTrue(new File(dir, "store").mkdir());
assertEquals(1, directory.listAllTaskDirectories().size());
assertEquals(1, directory.listNonEmptyTaskDirectories().size());
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
time.sleep(5000);
directory.cleanRemovedTasks(0);
assertFalse(dir.exists());
assertEquals(0, directory.listAllTaskDirectories().size());
assertEquals(0, directory.listNonEmptyTaskDirectories().size());
assertThat(
appender.getMessages(),
hasItem(containsString("Deleting obsolete state directory"))
);
}
}
@Test
public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
directory.cleanRemovedTasks(0);
assertTrue(otherDir.exists());
}
@Test
public void shouldReturnEmptyArrayForNonPersistentApp() throws IOException {
initializeStateDirectory(false, false);
assertTrue(directory.listAllTaskDirectories().isEmpty());
}
@Test
public void shouldReturnEmptyArrayIfStateDirDoesntExist() throws IOException {
cleanup();
assertFalse(stateDir.exists());
assertTrue(directory.listAllTaskDirectories().isEmpty());
}
@Test
public void shouldReturnEmptyArrayIfListFilesReturnsNull() throws IOException {
stateDir = new File(TestUtils.IO_TMP_DIR, "kafka-" + TestUtils.randomString(5));
directory = new StateDirectory(
new StreamsConfig(new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
}
}),
time,
true,
false);
appDir = new File(stateDir, applicationId);
// make sure the File#listFiles returns null and StateDirectory#listAllTaskDirectories is able to handle null
Utils.delete(appDir);
Files.createFile(appDir.toPath());
assertTrue(Files.exists(appDir.toPath()));
assertNull(appDir.listFiles());
assertEquals(0, directory.listAllTaskDirectories().size());
}
@Test
public void shouldOnlyListNonEmptyTaskDirectories() throws IOException {
TestUtils.tempDirectory(stateDir.toPath(), "foo");
final TaskDirectory taskDir1 = new TaskDirectory(directory.getOrCreateDirectoryForTask(new TaskId(0, 0)), null);
final TaskDirectory taskDir2 = new TaskDirectory(directory.getOrCreateDirectoryForTask(new TaskId(0, 1)), null);
final File storeDir = new File(taskDir1.file(), "store");
assertTrue(storeDir.mkdir());
assertThat(mkSet(taskDir1, taskDir2), equalTo(new HashSet<>(directory.listAllTaskDirectories())));
assertThat(singletonList(taskDir1), equalTo(directory.listNonEmptyTaskDirectories()));
Utils.delete(taskDir1.file());
assertThat(singleton(taskDir2), equalTo(new HashSet<>(directory.listAllTaskDirectories())));
assertThat(emptyList(), equalTo(directory.listNonEmptyTaskDirectories()));
}
@Test
public void shouldCreateDirectoriesIfParentDoesntExist() {
final File tempDir = TestUtils.tempDirectory();
final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
final StateDirectory stateDirectory = new StateDirectory(
new StreamsConfig(new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
}
}),
time,
true,
false);
final File taskDir = stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 0));
assertTrue(stateDir.exists());
assertTrue(taskDir.exists());
}
@Test
public void shouldNotLockStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final Thread thread = new Thread(() -> directory.lock(taskId));
thread.start();
thread.join(30000);
assertFalse(directory.lock(taskId));
}
@Test
public void shouldNotUnLockStateDirLockedByAnotherThread() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final CountDownLatch lockLatch = new CountDownLatch(1);
final CountDownLatch unlockLatch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>();
final Thread thread = new Thread(() -> {
try {
directory.lock(taskId);
lockLatch.countDown();
unlockLatch.await();
directory.unlock(taskId);
} catch (final Exception e) {
exceptionOnThread.set(e);
}
});
thread.start();
lockLatch.await(5, TimeUnit.SECONDS);
assertNull("should not have had an exception on other thread", exceptionOnThread.get());
directory.unlock(taskId);
assertFalse(directory.lock(taskId));
unlockLatch.countDown();
thread.join(30000);
assertNull("should not have had an exception on other thread", exceptionOnThread.get());
assertTrue(directory.lock(taskId));
}
@Test
public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
final TaskId id = new TaskId(1, 0);
directory.getOrCreateDirectoryForTask(id);
directory.globalStateDir();
final File dir0 = new File(appDir, id.toString());
final File globalDir = new File(appDir, "global");
assertEquals(mkSet(dir0, globalDir), Arrays.stream(
Objects.requireNonNull(appDir.listFiles())).collect(Collectors.toSet()));
directory.clean();
// if appDir is empty, it is deleted in StateDirectory#clean process.
assertFalse(appDir.exists());
}
@Test
public void shouldNotCreateBaseDirectory() throws IOException {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
initializeStateDirectory(false, false);
assertThat(stateDir.exists(), is(false));
assertThat(appDir.exists(), is(false));
assertThat(appender.getMessages(),
not(hasItem(containsString("Error changing permissions for the state or base directory"))));
}
}
@Test
public void shouldNotCreateTaskStateDirectory() throws IOException {
initializeStateDirectory(false, false);
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
assertFalse(taskDirectory.exists());
}
@Test
public void shouldNotCreateGlobalStateDirectory() throws IOException {
initializeStateDirectory(false, false);
final File globalStateDir = directory.globalStateDir();
assertFalse(globalStateDir.exists());
}
@Test
public void shouldLockTaskStateDirectoryWhenDirectoryCreationDisabled() throws IOException {
initializeStateDirectory(false, false);
final TaskId taskId = new TaskId(0, 0);
assertTrue(directory.lock(taskId));
}
@Test
public void shouldNotFailWhenCreatingTaskDirectoryInParallel() throws Exception {
final TaskId taskId = new TaskId(0, 0);
final AtomicBoolean passed = new AtomicBoolean(true);
final CreateTaskDirRunner runner = new CreateTaskDirRunner(directory, taskId, passed);
final Thread t1 = new Thread(runner);
final Thread t2 = new Thread(runner);
t1.start();
t2.start();
t1.join(Duration.ofMillis(500L).toMillis());
t2.join(Duration.ofMillis(500L).toMillis());
assertNotNull(runner.taskDirectory);
assertTrue(passed.get());
assertTrue(runner.taskDirectory.exists());
assertTrue(runner.taskDirectory.isDirectory());
}
@Test
public void shouldDeleteAppDirWhenCleanUpIfEmpty() {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
final File testFile = new File(taskDirectory, "testFile");
assertThat(testFile.mkdir(), is(true));
assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
// call StateDirectory#clean
directory.clean();
// if appDir is empty, it is deleted in StateDirectory#clean process.
assertFalse(appDir.exists());
}
@Test
public void shouldNotDeleteAppDirWhenCleanUpIfNotEmpty() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
final File testFile = new File(taskDirectory, "testFile");
assertThat(testFile.mkdir(), is(true));
assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
// Create a dummy file in appDir; for this, appDir will not be empty after cleanup.
final File dummyFile = new File(appDir, "dummy");
Files.createFile(dummyFile.toPath());
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
// call StateDirectory#clean
directory.clean();
assertThat(
appender.getMessages(),
hasItem(endsWith(String.format("Failed to delete state store directory of %s for it is not empty", appDir.getAbsolutePath())))
);
}
}
@Test
public void shouldLogManualUserCallMessage() {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
final File testFile = new File(taskDirectory, "testFile");
assertThat(testFile.mkdir(), is(true));
assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
directory.clean();
assertThat(
appender.getMessages(),
hasItem(endsWith("as user calling cleanup."))
);
}
}
@Test
public void shouldLogStateDirCleanerMessage() {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
final File testFile = new File(taskDirectory, "testFile");
assertThat(testFile.mkdir(), is(true));
assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
final long cleanupDelayMs = 0;
time.sleep(5000);
directory.cleanRemovedTasks(cleanupDelayMs);
assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup delay is " + cleanupDelayMs + "ms).")));
}
}
@Test
public void shouldLogTempDirMessage() {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
new StateDirectory(
new StreamsConfig(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "")
)
),
new MockTime(),
true,
false
);
assertThat(
appender.getMessages(),
hasItem("Using an OS temp directory in the state.dir property can cause failures with writing the" +
" checkpoint file due to the fact that this directory can be cleared by the OS." +
" Resolved state.dir: [" + System.getProperty("java.io.tmpdir") + "/kafka-streams]")
);
}
}
/* Named Topology Tests */
@Test
public void shouldCreateTaskDirectoriesUnderNamedTopologyDirs() throws IOException {
initializeStateDirectory(true, true);
directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology1"));
directory.getOrCreateDirectoryForTask(new TaskId(0, 1, "topology1"));
directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology2"));
assertThat(new File(appDir, "__topology1__").exists(), is(true));
assertThat(new File(appDir, "__topology1__").isDirectory(), is(true));
assertThat(new File(appDir, "__topology2__").exists(), is(true));
assertThat(new File(appDir, "__topology2__").isDirectory(), is(true));
assertThat(new File(new File(appDir, "__topology1__"), "0_0").exists(), is(true));
assertThat(new File(new File(appDir, "__topology1__"), "0_0").isDirectory(), is(true));
assertThat(new File(new File(appDir, "__topology1__"), "0_1").exists(), is(true));
assertThat(new File(new File(appDir, "__topology1__"), "0_1").isDirectory(), is(true));
assertThat(new File(new File(appDir, "__topology2__"), "0_0").exists(), is(true));
assertThat(new File(new File(appDir, "__topology2__"), "0_0").isDirectory(), is(true));
}
@Test
public void shouldOnlyListNonEmptyTaskDirectoriesInNamedTopologies() throws IOException {
initializeStateDirectory(true, true);
TestUtils.tempDirectory(appDir.toPath(), "foo");
final TaskDirectory taskDir1 = new TaskDirectory(directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology1")), "topology1");
final TaskDirectory taskDir2 = new TaskDirectory(directory.getOrCreateDirectoryForTask(new TaskId(0, 1, "topology1")), "topology1");
final TaskDirectory taskDir3 = new TaskDirectory(directory.getOrCreateDirectoryForTask(new TaskId(0, 0, "topology2")), "topology2");
final File storeDir = new File(taskDir1.file(), "store");
assertTrue(storeDir.mkdir());
assertThat(new HashSet<>(directory.listAllTaskDirectories()), equalTo(mkSet(taskDir1, taskDir2, taskDir3)));
assertThat(directory.listNonEmptyTaskDirectories(), equalTo(singletonList(taskDir1)));
Utils.delete(taskDir1.file());
assertThat(new HashSet<>(directory.listAllTaskDirectories()), equalTo(mkSet(taskDir2, taskDir3)));
assertThat(directory.listNonEmptyTaskDirectories(), equalTo(emptyList()));
}
@Test
public void shouldRemoveNonEmptyNamedTopologyDirsWhenCallingClean() throws Exception {
initializeStateDirectory(true, true);
final File taskDir = directory.getOrCreateDirectoryForTask(new TaskId(2, 0, "topology1"));
final File namedTopologyDir = new File(appDir, "__topology1__");
assertThat(taskDir.exists(), is(true));
assertThat(namedTopologyDir.exists(), is(true));
directory.clean();
assertThat(taskDir.exists(), is(false));
assertThat(namedTopologyDir.exists(), is(false));
}
@Test
public void shouldRemoveEmptyNamedTopologyDirsWhenCallingClean() throws IOException {
initializeStateDirectory(true, true);
final File namedTopologyDir = new File(appDir, "__topology1__");
assertThat(namedTopologyDir.mkdir(), is(true));
assertThat(namedTopologyDir.exists(), is(true));
directory.clean();
assertThat(namedTopologyDir.exists(), is(false));
}
@Test
public void shouldRemoveNonEmptyNamedTopologyDirsWhenCallingClearLocalStateForNamedTopology() throws Exception {
initializeStateDirectory(true, true);
final String topologyName = "topology1";
final File taskDir = directory.getOrCreateDirectoryForTask(new TaskId(2, 0, topologyName));
final File namedTopologyDir = new File(appDir, "__" + topologyName + "__");
assertThat(taskDir.exists(), is(true));
assertThat(namedTopologyDir.exists(), is(true));
directory.clearLocalStateForNamedTopology(topologyName);
assertThat(taskDir.exists(), is(false));
assertThat(namedTopologyDir.exists(), is(false));
}
@Test
public void shouldRemoveEmptyNamedTopologyDirsWhenCallingClearLocalStateForNamedTopology() throws IOException {
initializeStateDirectory(true, true);
final String topologyName = "topology1";
final File namedTopologyDir = new File(appDir, "__" + topologyName + "__");
assertThat(namedTopologyDir.mkdir(), is(true));
assertThat(namedTopologyDir.exists(), is(true));
directory.clearLocalStateForNamedTopology(topologyName);
assertThat(namedTopologyDir.exists(), is(false));
}
@Test
public void shouldNotRemoveDirsThatDoNotMatchNamedTopologyDirsWhenCallingClean() throws IOException {
initializeStateDirectory(true, true);
final File someDir = new File(appDir, "_not-a-valid-named-topology_dir_name_");
assertThat(someDir.mkdir(), is(true));
assertThat(someDir.exists(), is(true));
directory.clean();
assertThat(someDir.exists(), is(true));
}
@Test
public void shouldCleanupObsoleteTaskDirectoriesInNamedTopologiesAndDeleteTheParentDirectories() throws IOException {
initializeStateDirectory(true, true);
final File taskDir = directory.getOrCreateDirectoryForTask(new TaskId(2, 0, "topology1"));
final File namedTopologyDir = new File(appDir, "__topology1__");
assertThat(namedTopologyDir.exists(), is(true));
assertThat(taskDir.exists(), is(true));
assertTrue(new File(taskDir, "store").mkdir());
assertThat(directory.listAllTaskDirectories().size(), is(1));
assertThat(directory.listNonEmptyTaskDirectories().size(), is(1));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
time.sleep(5000);
directory.cleanRemovedTasks(0);
assertThat(taskDir.exists(), is(false));
assertThat(namedTopologyDir.exists(), is(false));
assertThat(directory.listAllTaskDirectories().size(), is(0));
assertThat(directory.listNonEmptyTaskDirectories().size(), is(0));
assertThat(
appender.getMessages(),
hasItem(containsString("Deleting obsolete state directory"))
);
}
}
@Test
public void shouldPersistProcessIdAcrossRestart() {
final UUID processId = directory.initializeProcessId();
directory.close();
assertThat(directory.initializeProcessId(), equalTo(processId));
}
@Test
public void shouldGetFreshProcessIdIfProcessFileDeleted() {
final UUID processId = directory.initializeProcessId();
directory.close();
final File processFile = new File(appDir, PROCESS_FILE_NAME);
assertThat(processFile.exists(), is(true));
assertThat(processFile.delete(), is(true));
assertThat(directory.initializeProcessId(), not(processId));
}
@Test
public void shouldGetFreshProcessIdIfJsonUnreadable() throws Exception {
final File processFile = new File(appDir, PROCESS_FILE_NAME);
Files.createFile(processFile.toPath());
final UUID processId = UUID.randomUUID();
final FileOutputStream fileOutputStream = new FileOutputStream(processFile);
try (final BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
writer.write(processId.toString());
writer.flush();
fileOutputStream.getFD().sync();
}
assertThat(directory.initializeProcessId(), not(processId));
}
@Test
public void shouldReadFutureProcessFileFormat() throws Exception {
final File processFile = new File(appDir, PROCESS_FILE_NAME);
final ObjectMapper mapper = new ObjectMapper();
final UUID processId = UUID.randomUUID();
mapper.writeValue(processFile, new FutureStateDirectoryProcessFile(processId, "some random junk"));
assertThat(directory.initializeProcessId(), equalTo(processId));
}
private static class FutureStateDirectoryProcessFile {
@JsonProperty
private final UUID processId;
@JsonProperty
private final String newField;
// required by jackson -- do not remove, your IDE may be warning that this is unused but it's lying to you
public FutureStateDirectoryProcessFile() {
this.processId = null;
this.newField = null;
}
FutureStateDirectoryProcessFile(final UUID processId, final String newField) {
this.processId = processId;
this.newField = newField;
}
}
private static class CreateTaskDirRunner implements Runnable {
private final StateDirectory directory;
private final TaskId taskId;
private final AtomicBoolean passed;
private File taskDirectory;
private CreateTaskDirRunner(final StateDirectory directory,
final TaskId taskId,
final AtomicBoolean passed) {
this.directory = directory;
this.taskId = taskId;
this.passed = passed;
}
@Override
public void run() {
try {
taskDirectory = directory.getOrCreateDirectoryForTask(taskId);
} catch (final ProcessorStateException error) {
passed.set(false);
}
}
}
}