NIFI-13213 Added Validation for Swap File Names
This closes 8812
Signed-off-by: Joseph Witt <joewitt@apache.org>
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index b01bdc9..2b56ced 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -65,6 +65,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -79,6 +80,7 @@
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
+ private static final Pattern UUID_PATTERN = Pattern.compile("([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})");
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
@@ -133,11 +135,11 @@
return null;
}
- final String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString();
- final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
- final String swapFileName = swapFileBaseName + ".swap";
+ final String swapFileName = getSwapFileName(flowFileQueue.getIdentifier(), partitionName);
+ final Path storageDirectoryPath = storageDirectory.toPath();
+ final Path swapFilePath = storageDirectoryPath.resolve(swapFileName).toAbsolutePath();
- final File swapFile = new File(storageDirectory, swapFileName);
+ final File swapFile = swapFilePath.toFile();
final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
final String swapLocation = swapFile.getAbsolutePath();
@@ -482,4 +484,18 @@
logger.debug("Changed Partition for Swap File by renaming from {} to {}", swapLocation, newPartitionName);
return newFile.getAbsolutePath();
}
+
+ private String getSwapFileName(final String flowFileQueueIdentifier, final String partitionName) {
+ final UUID identifier;
+ final Matcher identifierMatcher = UUID_PATTERN.matcher(flowFileQueueIdentifier);
+ if (identifierMatcher.find()) {
+ identifier = UUID.fromString(identifierMatcher.group(1));
+ } else {
+ throw new IllegalArgumentException("FlowFile Queue Identifier [%s] not valid".formatted(flowFileQueueIdentifier));
+ }
+
+ final String swapFilePrefix = System.currentTimeMillis() + "-" + identifier + "-" + UUID.randomUUID();
+ final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
+ return swapFileBaseName + ".swap";
+ }
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index a00b2c0..cc91b40 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -27,7 +27,6 @@
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
@@ -41,24 +40,44 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestFileSystemSwapManager {
@Test
+ public void testFlowFileQueueIdentifierNotValid() {
+ final String identifier = "invalid-identifier";
+
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
+ when(flowFileQueue.getIdentifier()).thenReturn(identifier);
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
+ final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo);
+ final List<FlowFileRecord> flowFileRecords = Collections.singletonList(new MockFlowFileRecord(0));
+
+ final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+ () -> swapManager.swapOut(flowFileRecords, flowFileQueue, "partition-1"));
+
+ assertTrue(exception.getMessage().contains(identifier));
+ }
+
+ @Test
public void testBackwardCompatible() throws IOException {
- try (final InputStream fis = new FileInputStream(new File("src/test/resources/old-swap-file.swap"));
- final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
+ try (final InputStream fis = new FileInputStream("src/test/resources/old-swap-file.swap");
+ final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
- final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
final FileSystemSwapManager swapManager = createSwapManager();
@@ -76,11 +95,11 @@
@Test
public void testFailureOnRepoSwapOut() throws IOException {
- final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
- final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
- Mockito.doThrow(new IOException("Intentional IOException for unit test"))
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
+ doThrow(new IOException("Intentional IOException for unit test"))
.when(flowFileRepo).swapFlowFilesOut(any(), any(), any());
final FileSystemSwapManager swapManager = createSwapManager(flowFileRepo);
@@ -96,7 +115,7 @@
@Test
public void testSwapFileUnknownToRepoNotSwappedIn() throws IOException {
- final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
when(flowFileQueue.getIdentifier()).thenReturn("");
final File targetDir = new File("target/swap");
@@ -111,7 +130,7 @@
final FileSystemSwapManager swapManager = new FileSystemSwapManager(Paths.get("target"));
final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
- final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
swapManager.initialize(new SwapManagerInitializationContext() {
@Override
@@ -134,7 +153,7 @@
final List<String> recoveredLocations = swapManager.recoverSwapLocations(flowFileQueue, null);
assertEquals(1, recoveredLocations.size());
- final String firstLocation = recoveredLocations.get(0);
+ final String firstLocation = recoveredLocations.getFirst();
final SwapContents emptyContents = swapManager.swapIn(firstLocation, flowFileQueue);
assertEquals(0, emptyContents.getFlowFiles().size());
@@ -144,8 +163,8 @@
assertEquals(10000, contents.getFlowFiles().size());
}
- private FileSystemSwapManager createSwapManager() throws IOException {
- final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+ private FileSystemSwapManager createSwapManager() {
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
return createSwapManager(flowFileRepo);
}
@@ -175,7 +194,7 @@
return swapManager;
}
- public class NopResourceClaimManager implements ResourceClaimManager {
+ public static class NopResourceClaimManager implements ResourceClaimManager {
@Override
public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) {
return null;