| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.controller.repository; |
| |
| import org.apache.commons.lang3.SystemUtils; |
| import org.apache.nifi.controller.repository.claim.ContentClaim; |
| import org.apache.nifi.controller.repository.claim.ResourceClaim; |
| import org.apache.nifi.controller.repository.claim.StandardContentClaim; |
| import org.apache.nifi.controller.repository.claim.StandardResourceClaim; |
| import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; |
| import org.apache.nifi.controller.repository.util.DiskUtils; |
| import org.apache.nifi.processor.DataUnit; |
| import org.apache.nifi.stream.io.StreamUtils; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.lang.reflect.Method; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.StandardCopyOption; |
| import java.nio.file.StandardOpenOption; |
| import java.text.NumberFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Random; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNotSame; |
| import static org.junit.Assert.assertTrue; |
| |
| public class TestFileSystemRepository { |
| |
| public static final int NUM_REPO_SECTIONS = 1; |
| |
| public static final File helloWorldFile = new File("src/test/resources/hello.txt"); |
| |
| private FileSystemRepository repository = null; |
| private StandardResourceClaimManager claimManager = null; |
| private final File rootFile = new File("target/content_repository"); |
| private NiFiProperties nifiProperties; |
| |
| @BeforeClass |
| public static void setupClass() { |
| Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS); |
| } |
| |
| @Before |
| public void setup() throws IOException { |
| nifiProperties = NiFiProperties.createBasicNiFiProperties(TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile()); |
| if (rootFile.exists()) { |
| DiskUtils.deleteRecursively(rootFile); |
| } |
| repository = new FileSystemRepository(nifiProperties); |
| claimManager = new StandardResourceClaimManager(); |
| repository.initialize(claimManager); |
| repository.purge(); |
| } |
| |
| @After |
| public void shutdown() throws IOException { |
| repository.shutdown(); |
| } |
| |
| @Test |
| @Ignore("Intended for manual testing only, in order to judge changes to performance") |
| public void testWritePerformance() throws IOException { |
| final long bytesToWrite = 1_000_000_000L; |
| final int contentSize = 100; |
| |
| final int iterations = (int) (bytesToWrite / contentSize); |
| final byte[] content = new byte[contentSize]; |
| final Random random = new Random(); |
| random.nextBytes(content); |
| |
| // final ContentClaimWriteCache cache = new ContentClaimWriteCache(repository); |
| final long start = System.nanoTime(); |
| for (int i = 0; i < iterations; i++) { |
| final ContentClaim claim = repository.create(false); |
| try (final OutputStream out = repository.write(claim)) { |
| out.write(content); |
| } |
| // final ContentClaim claim = cache.getContentClaim(); |
| // try (final OutputStream out = cache.write(claim)) { |
| // out.write(content); |
| // } |
| } |
| final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); |
| |
| final long mb = bytesToWrite / (1024 * 1024); |
| final long seconds = millis / 1000L; |
| final double mbps = (double) mb / (double) seconds; |
| System.out.println("Took " + millis + " millis to write " + contentSize + " bytes " + iterations + " times (total of " |
| + NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s"); |
| } |
| |
| @Test |
| public void testContentNotFoundExceptionThrownIfResourceClaimTooShort() throws IOException { |
| final File contentFile = new File("target/content_repository/0/0.bin"); |
| try (final OutputStream fos = new FileOutputStream(contentFile)) { |
| fos.write("Hello World".getBytes(StandardCharsets.UTF_8)); |
| } |
| |
| final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "default", "0", "0.bin", false); |
| final StandardContentClaim existingContentClaim = new StandardContentClaim(resourceClaim, 0); |
| existingContentClaim.setLength(11); |
| |
| try (final InputStream in = repository.read(existingContentClaim)) { |
| final byte[] buff = new byte[11]; |
| StreamUtils.fillBuffer(in, buff); |
| assertEquals("Hello World", new String(buff, StandardCharsets.UTF_8)); |
| } |
| |
| final StandardContentClaim halfContentClaim = new StandardContentClaim(resourceClaim, 6); |
| halfContentClaim.setLength(5); |
| |
| try (final InputStream in = repository.read(halfContentClaim)) { |
| final byte[] buff = new byte[5]; |
| StreamUtils.fillBuffer(in, buff); |
| assertEquals("World", new String(buff, StandardCharsets.UTF_8)); |
| } |
| |
| final StandardContentClaim emptyContentClaim = new StandardContentClaim(resourceClaim, 11); |
| existingContentClaim.setLength(0); |
| |
| try (final InputStream in = repository.read(emptyContentClaim)) { |
| assertEquals(-1, in.read()); |
| } |
| |
| final StandardContentClaim missingContentClaim = new StandardContentClaim(resourceClaim, 12); |
| missingContentClaim.setLength(1); |
| |
| try { |
| repository.read(missingContentClaim); |
| Assert.fail("Did not throw ContentNotFoundException"); |
| } catch (final ContentNotFoundException cnfe) { |
| // Expected |
| } |
| } |
| |
| @Test |
| public void testBogusFile() throws IOException { |
| repository.shutdown(); |
| System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile()); |
| |
| File bogus = new File(rootFile, "bogus"); |
| try { |
| bogus.mkdir(); |
| bogus.setReadable(false); |
| |
| repository = new FileSystemRepository(nifiProperties); |
| repository.initialize(new StandardResourceClaimManager()); |
| } finally { |
| bogus.setReadable(true); |
| assertTrue(bogus.delete()); |
| } |
| } |
| |
| @Test |
| public void testCreateContentClaim() throws IOException { |
| // value passed to #create is irrelevant because the FileSystemRepository does not currently support loss tolerance. |
| final ContentClaim claim = repository.create(true); |
| assertNotNull(claim); |
| assertEquals(1, repository.getClaimantCount(claim)); |
| } |
| |
| @Test |
| public void testReadClaimThenWriteThenReadMore() throws IOException { |
| final ContentClaim claim = repository.create(false); |
| |
| final OutputStream out = repository.write(claim); |
| out.write("hello".getBytes()); |
| out.flush(); |
| |
| final InputStream in = repository.read(claim); |
| final byte[] buffer = new byte[5]; |
| StreamUtils.fillBuffer(in, buffer); |
| |
| assertEquals("hello", new String(buffer)); |
| |
| out.write("good-bye".getBytes()); |
| out.close(); |
| |
| final byte[] buffer2 = new byte[8]; |
| StreamUtils.fillBuffer(in, buffer2); |
| assertEquals("good-bye", new String(buffer2)); |
| } |
| |
| @Test |
| public void testClaimantCounts() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| assertNotNull(claim); |
| assertEquals(1, repository.getClaimantCount(claim)); |
| assertEquals(2, repository.incrementClaimaintCount(claim)); |
| assertEquals(3, repository.incrementClaimaintCount(claim)); |
| assertEquals(4, repository.incrementClaimaintCount(claim)); |
| assertEquals(5, repository.incrementClaimaintCount(claim)); |
| repository.decrementClaimantCount(claim); |
| assertEquals(4, repository.getClaimantCount(claim)); |
| repository.decrementClaimantCount(claim); |
| assertEquals(3, repository.getClaimantCount(claim)); |
| repository.decrementClaimantCount(claim); |
| assertEquals(2, repository.getClaimantCount(claim)); |
| repository.decrementClaimantCount(claim); |
| assertEquals(1, repository.getClaimantCount(claim)); |
| repository.decrementClaimantCount(claim); |
| assertEquals(0, repository.getClaimantCount(claim)); |
| repository.remove(claim); |
| } |
| |
| @Test |
| public void testResourceClaimReused() throws IOException { |
| final ContentClaim claim1 = repository.create(false); |
| final ContentClaim claim2 = repository.create(false); |
| |
| // should not be equal because claim1 may still be in use |
| assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim()); |
| |
| try (final OutputStream out = repository.write(claim1)) { |
| } |
| |
| final ContentClaim claim3 = repository.create(false); |
| assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim()); |
| } |
| |
| @Test |
| public void testResourceClaimNotReusedAfterRestart() throws IOException, InterruptedException { |
| final ContentClaim claim1 = repository.create(false); |
| try (final OutputStream out = repository.write(claim1)) { |
| } |
| |
| repository.shutdown(); |
| Thread.sleep(1000L); |
| |
| repository = new FileSystemRepository(nifiProperties); |
| repository.initialize(new StandardResourceClaimManager()); |
| repository.purge(); |
| |
| final ContentClaim claim2 = repository.create(false); |
| assertNotSame(claim1.getResourceClaim(), claim2.getResourceClaim()); |
| } |
| |
| @Test |
| public void testWriteWithNoContent() throws IOException { |
| final ContentClaim claim1 = repository.create(false); |
| try (final OutputStream out = repository.write(claim1)) { |
| out.write("Hello".getBytes()); |
| } |
| |
| final ContentClaim claim2 = repository.create(false); |
| assertEquals(claim1.getResourceClaim(), claim2.getResourceClaim()); |
| try (final OutputStream out = repository.write(claim2)) { |
| |
| } |
| |
| final ContentClaim claim3 = repository.create(false); |
| assertEquals(claim1.getResourceClaim(), claim3.getResourceClaim()); |
| try (final OutputStream out = repository.write(claim3)) { |
| out.write(" World".getBytes()); |
| } |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try (final InputStream in = repository.read(claim1)) { |
| StreamUtils.copy(in, baos); |
| } |
| |
| assertEquals("Hello", baos.toString()); |
| |
| baos.reset(); |
| try (final InputStream in = repository.read(claim2)) { |
| StreamUtils.copy(in, baos); |
| } |
| assertEquals("", baos.toString()); |
| assertEquals(0, baos.size()); |
| |
| baos.reset(); |
| try (final InputStream in = repository.read(claim3)) { |
| StreamUtils.copy(in, baos); |
| } |
| assertEquals(" World", baos.toString()); |
| } |
| |
| @Test |
| public void testRemoveDeletesFileIfNoClaimants() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| assertNotNull(claim); |
| assertEquals(1, repository.getClaimantCount(claim)); |
| repository.incrementClaimaintCount(claim); |
| |
| final Path claimPath = getPath(claim); |
| final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize(); |
| final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue(); |
| |
| // Create the file. |
| try (final OutputStream out = repository.write(claim)) { |
| out.write(new byte[maxClaimLength]); |
| } |
| |
| int count = repository.decrementClaimantCount(claim); |
| assertEquals(1, count); |
| assertTrue(Files.exists(claimPath)); |
| // ensure that no Exception is thrown here. |
| repository.remove(null); |
| assertTrue(Files.exists(claimPath)); |
| |
| count = repository.decrementClaimantCount(claim); |
| assertEquals(0, count); |
| repository.remove(claim); |
| assertFalse(Files.exists(claimPath)); |
| } |
| |
| private Path getPath(final ContentClaim claim) { |
| try { |
| final Method m = repository.getClass().getDeclaredMethod("getPath", ContentClaim.class); |
| m.setAccessible(true); |
| return (Path) m.invoke(repository, claim); |
| } catch (final Exception e) { |
| throw new RuntimeException("Could not invoke #getPath on FileSystemRepository due to " + e.toString()); |
| } |
| } |
| |
| @Test |
| public void testImportFromFile() throws IOException { |
| final ContentClaim claim = repository.create(false); |
| final File testFile = new File("src/test/resources/hello.txt"); |
| final File file1 = new File("target/testFile1"); |
| final Path path1 = file1.toPath(); |
| final File file2 = new File("target/testFile2"); |
| final Path path2 = file2.toPath(); |
| |
| Files.copy(testFile.toPath(), path1, StandardCopyOption.REPLACE_EXISTING); |
| Files.copy(testFile.toPath(), path2, StandardCopyOption.REPLACE_EXISTING); |
| |
| repository.importFrom(path1, claim); |
| assertTrue(file1.exists()); |
| assertTrue(file2.exists()); |
| |
| // try to read the data back out. |
| final Path path = getPath(claim); |
| final byte[] data = Files.readAllBytes(path); |
| final byte[] expected = Files.readAllBytes(testFile.toPath()); |
| assertTrue(Arrays.equals(expected, data)); |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try (final InputStream in = repository.read(claim)) { |
| StreamUtils.copy(in, baos); |
| } |
| |
| assertTrue(Arrays.equals(expected, baos.toByteArray())); |
| } |
| |
| @Test |
| public void testImportFromStream() throws IOException { |
| final ContentClaim claim = repository.create(false); |
| final byte[] data = "hello".getBytes(); |
| final ByteArrayInputStream bais = new ByteArrayInputStream(data); |
| repository.importFrom(bais, claim); |
| |
| final Path claimPath = getPath(claim); |
| assertTrue(Arrays.equals(data, Files.readAllBytes(claimPath))); |
| } |
| |
| @Test |
| public void testExportToOutputStream() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| |
| try (final OutputStream out = repository.write(claim)) { |
| Files.copy(helloWorldFile.toPath(), out); |
| } |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| repository.exportTo(claim, baos); |
| final byte[] data = baos.toByteArray(); |
| assertTrue(Arrays.equals(Files.readAllBytes(helloWorldFile.toPath()), data)); |
| } |
| |
| @Test |
| public void testExportToFile() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| try (final OutputStream out = repository.write(claim)) { |
| Files.copy(helloWorldFile.toPath(), out); |
| } |
| |
| final File outFile = new File("target/testExportToFile"); |
| final Path outPath = outFile.toPath(); |
| Files.deleteIfExists(outPath); |
| |
| final byte[] expected = Files.readAllBytes(helloWorldFile.toPath()); |
| |
| repository.exportTo(claim, outPath, false); |
| assertTrue(Arrays.equals(expected, Files.readAllBytes(outPath))); |
| |
| repository.exportTo(claim, outPath, true); |
| final byte[] doubleExpected = new byte[expected.length * 2]; |
| System.arraycopy(expected, 0, doubleExpected, 0, expected.length); |
| System.arraycopy(expected, 0, doubleExpected, expected.length, expected.length); |
| assertTrue(Arrays.equals(doubleExpected, Files.readAllBytes(outPath))); |
| } |
| |
| @Test |
| public void testSize() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| final Path path = getPath(claim); |
| |
| Files.createDirectories(path.getParent()); |
| final byte[] data = "The quick brown fox jumps over the lazy dog".getBytes(); |
| try (final OutputStream out = Files.newOutputStream(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { |
| out.write(data); |
| } |
| |
| assertEquals(data.length, repository.size(claim)); |
| } |
| |
| @Test(expected = ContentNotFoundException.class) |
| public void testSizeWithNoContent() throws IOException { |
| final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L); |
| assertEquals(0L, repository.size(claim)); |
| } |
| |
| @Test(expected = ContentNotFoundException.class) |
| public void testReadWithNoContent() throws IOException { |
| final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L); |
| final InputStream in = repository.read(claim); |
| in.close(); |
| } |
| |
| @Test |
| public void testReadWithContent() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| final Path path = getPath(claim); |
| |
| Files.createDirectories(path.getParent()); |
| final byte[] data = "The quick brown fox jumps over the lazy dog".getBytes(); |
| try (final OutputStream out = Files.newOutputStream(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { |
| out.write(data); |
| } |
| |
| try (final InputStream inStream = repository.read(claim)) { |
| assertNotNull(inStream); |
| final byte[] dataRead = readFully(inStream, data.length); |
| assertTrue(Arrays.equals(data, dataRead)); |
| } |
| } |
| |
| @Test |
| public void testReadWithContentArchived() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| final Path path = getPath(claim); |
| Files.deleteIfExists(path); |
| |
| Path archivePath = FileSystemRepository.getArchivePath(path); |
| |
| Files.createDirectories(archivePath.getParent()); |
| final byte[] data = "The quick brown fox jumps over the lazy dog".getBytes(); |
| try (final OutputStream out = Files.newOutputStream(archivePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { |
| out.write(data); |
| } |
| |
| try (final InputStream inStream = repository.read(claim)) { |
| assertNotNull(inStream); |
| final byte[] dataRead = readFully(inStream, data.length); |
| assertArrayEquals(data, dataRead); |
| } |
| } |
| |
| private boolean isWindowsEnvironment() { |
| return System.getProperty("os.name").toLowerCase().startsWith("windows"); |
| } |
| |
| @Test(expected = ContentNotFoundException.class) |
| public void testReadWithNoContentArchived() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| final Path path = getPath(claim); |
| Files.deleteIfExists(path); |
| |
| Path archivePath = FileSystemRepository.getArchivePath(path); |
| Files.deleteIfExists(archivePath); |
| repository.read(claim).close(); |
| } |
| |
| @Test |
| public void testWrite() throws IOException { |
| final ContentClaim claim = repository.create(true); |
| final byte[] data = "The quick brown fox jumps over the lazy dog".getBytes(); |
| try (final OutputStream out = repository.write(claim)) { |
| out.write(data); |
| } |
| |
| final Path path = getPath(claim); |
| assertTrue(Arrays.equals(data, Files.readAllBytes(path))); |
| } |
| |
| @Test |
| public void testRemoveWhileWritingToClaim() throws IOException { |
| final ContentClaim claim = repository.create(false); |
| final OutputStream out = repository.write(claim); |
| |
| // write at least 1 MB to the output stream so that when we close the output stream |
| // the repo won't keep the stream open. |
| final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize(); |
| final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue(); |
| final byte[] buff = new byte[maxClaimLength]; |
| out.write(buff); |
| out.write(buff); |
| |
| // false because claimant count is still 1, so the resource claim was not removed |
| assertFalse(repository.remove(claim)); |
| |
| assertEquals(0, repository.decrementClaimantCount(claim)); |
| |
| // false because claimant count is 0 but there is an 'active' stream for the claim |
| assertFalse(repository.remove(claim)); |
| |
| out.close(); |
| assertTrue(repository.remove(claim)); |
| } |
| |
| @Test |
| public void testMarkDestructableDoesNotArchiveIfStreamOpenAndWrittenTo() throws IOException, InterruptedException { |
| FileSystemRepository repository = null; |
| try { |
| final List<Path> archivedPaths = Collections.synchronizedList(new ArrayList<Path>()); |
| |
| // We are creating our own 'local' repository in this test so shut down the one created in the setup() method |
| shutdown(); |
| |
| repository = new FileSystemRepository(nifiProperties) { |
| @Override |
| protected boolean archive(Path curPath) throws IOException { |
| archivedPaths.add(curPath); |
| return true; |
| } |
| }; |
| |
| final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); |
| repository.initialize(claimManager); |
| repository.purge(); |
| |
| final ContentClaim claim = repository.create(false); |
| |
| // Create a stream and write a bit to it, then close it. This will cause the |
| // claim to be put back onto the 'writableClaimsQueue' |
| try (final OutputStream out = repository.write(claim)) { |
| assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); |
| out.write("1\n".getBytes()); |
| } |
| |
| assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); |
| |
| int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim()); |
| assertEquals(0, claimantCount); |
| assertTrue(archivedPaths.isEmpty()); |
| |
| claimManager.markDestructable(claim.getResourceClaim()); |
| |
| // Wait for the archive thread to have a chance to run |
| Thread.sleep(2000L); |
| |
| // Should still be empty because we have a stream open to the file. |
| assertTrue(archivedPaths.isEmpty()); |
| assertEquals(0, claimManager.getClaimantCount(claim.getResourceClaim())); |
| } finally { |
| if (repository != null) { |
| repository.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
| public void testWriteCannotProvideNullOutput() throws IOException { |
| FileSystemRepository repository = null; |
| try { |
| final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>()); |
| |
| // We are creating our own 'local' repository in this test so shut down the one created in the setup() method |
| shutdown(); |
| |
| repository = new FileSystemRepository(nifiProperties) { |
| @Override |
| protected boolean archive(Path curPath) throws IOException { |
| if (getOpenStreamCount() > 0) { |
| archivedPathsWithOpenStream.add(curPath); |
| } |
| |
| return true; |
| } |
| }; |
| |
| final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); |
| repository.initialize(claimManager); |
| repository.purge(); |
| |
| final ContentClaim claim = repository.create(false); |
| |
| assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); |
| |
| int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim()); |
| assertEquals(0, claimantCount); |
| assertTrue(archivedPathsWithOpenStream.isEmpty()); |
| |
| OutputStream out = repository.write(claim); |
| out.close(); |
| repository.decrementClaimantCount(claim); |
| |
| ContentClaim claim2 = repository.create(false); |
| assertEquals(claim.getResourceClaim(), claim2.getResourceClaim()); |
| out = repository.write(claim2); |
| |
| final boolean archived = repository.archive(claim.getResourceClaim()); |
| assertFalse(archived); |
| } finally { |
| if (repository != null) { |
| repository.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * We have encountered a situation where the File System Repo is moving |
| * files to archive and then eventually aging them off while there is still |
| * an open file handle. This test is meant to replicate the conditions under |
| * which this would happen and verify that it is fixed. |
| * |
| * The condition that caused this appears to be that a Process Session |
| * created a Content Claim and then did not write to it. It then decremented |
| * the claimant count (which reduced the count to 0). This was likely due to |
| * creating the claim in ProcessSession.write(FlowFile, StreamCallback) and |
| * then having an Exception thrown when the Process Session attempts to read |
| * the current Content Claim. In this case, it would not ever get to the |
| * point of calling FileSystemRepository.write(). |
| * |
| * The above sequence of events is problematic because calling |
| * FileSystemRepository.create() will remove the Resource Claim from the |
| * 'writable claims queue' and expects that we will write to it. When we |
| * call FileSystemRepository.write() with that Resource Claim, we return an |
| * OutputStream that, when closed, will take care of adding the Resource |
| * Claim back to the 'writable claims queue' or otherwise close the |
| * FileOutputStream that is open for that Resource Claim. If |
| * FileSystemRepository.write() is never called, or if the OutputStream |
| * returned by that method is never closed, but the Content Claim is then |
| * decremented to 0, we can get into a situation where we do archive the |
| * content (because the claimant count is 0 and it is not in the 'writable |
| * claims queue') and then eventually age it off, without ever closing the |
| * OutputStream. We need to ensure that we do always close that Output |
| * Stream. |
| */ |
| @Test |
| public void testMarkDestructableDoesNotArchiveIfStreamOpenAndNotWrittenTo() throws IOException, InterruptedException { |
| FileSystemRepository repository = null; |
| try { |
| final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>()); |
| |
| // We are creating our own 'local' repository in this test so shut down the one created in the setup() method |
| shutdown(); |
| |
| repository = new FileSystemRepository(nifiProperties) { |
| @Override |
| protected boolean archive(Path curPath) throws IOException { |
| if (getOpenStreamCount() > 0) { |
| archivedPathsWithOpenStream.add(curPath); |
| } |
| |
| return true; |
| } |
| }; |
| |
| final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); |
| repository.initialize(claimManager); |
| repository.purge(); |
| |
| final ContentClaim claim = repository.create(false); |
| |
| assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); |
| |
| int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim()); |
| assertEquals(0, claimantCount); |
| assertTrue(archivedPathsWithOpenStream.isEmpty()); |
| |
| // This would happen when FlowFile repo is checkpointed, if Resource Claim has claimant count of 0. |
| // Since the Resource Claim of interest is still 'writable', we should not archive it. |
| claimManager.markDestructable(claim.getResourceClaim()); |
| |
| // Wait for the archive thread to have a chance to run |
| long totalSleepMillis = 0; |
| final long startTime = System.nanoTime(); |
| while (archivedPathsWithOpenStream.isEmpty() && totalSleepMillis < 5000) { |
| Thread.sleep(100L); |
| totalSleepMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); |
| } |
| |
| // Should still be empty because we have a stream open to the file so we should |
| // not actually try to archive the data. |
| assertTrue(archivedPathsWithOpenStream.isEmpty()); |
| assertEquals(0, claimManager.getClaimantCount(claim.getResourceClaim())); |
| } finally { |
| if (repository != null) { |
| repository.shutdown(); |
| } |
| } |
| } |
| |
| @Test |
| public void testMergeWithHeaderFooterDemarcator() throws IOException { |
| testMerge("HEADER", "FOOTER", "DEMARCATOR"); |
| } |
| |
| @Test |
| public void testMergeWithHeaderFooter() throws IOException { |
| testMerge("HEADER", "FOOTER", null); |
| } |
| |
| @Test |
| public void testMergeWithHeaderOnly() throws IOException { |
| testMerge("HEADER", null, null); |
| } |
| |
| @Test |
| public void testMergeWithFooterOnly() throws IOException { |
| testMerge(null, "FOOTER", null); |
| } |
| |
| @Test |
| public void testMergeWithDemarcator() throws IOException { |
| testMerge(null, null, "DEMARCATOR"); |
| } |
| |
| @Test |
| public void testWithHeaderDemarcator() throws IOException { |
| testMerge("HEADER", null, "DEMARCATOR"); |
| } |
| |
| @Test |
| public void testMergeWithFooterDemarcator() throws IOException { |
| testMerge(null, "FOOTER", "DEMARCATOR"); |
| } |
| |
| @Test |
| public void testMergeWithoutHeaderFooterDemarcator() throws IOException { |
| testMerge(null, null, null); |
| } |
| |
| private void testMerge(final String header, final String footer, final String demarcator) throws IOException { |
| final int count = 4; |
| final String content = "The quick brown fox jumps over the lazy dog"; |
| final List<ContentClaim> claims = new ArrayList<>(); |
| for (int i = 0; i < count; i++) { |
| final ContentClaim claim = repository.create(true); |
| claims.add(claim); |
| try (final OutputStream out = repository.write(claim)) { |
| out.write(content.getBytes()); |
| } |
| } |
| |
| final ContentClaim destination = repository.create(true); |
| final byte[] headerBytes = header == null ? null : header.getBytes(); |
| final byte[] footerBytes = footer == null ? null : footer.getBytes(); |
| final byte[] demarcatorBytes = demarcator == null ? null : demarcator.getBytes(); |
| repository.merge(claims, destination, headerBytes, footerBytes, demarcatorBytes); |
| |
| final StringBuilder sb = new StringBuilder(); |
| if (header != null) { |
| sb.append(header); |
| } |
| for (int i = 0; i < count; i++) { |
| sb.append(content); |
| if (demarcator != null && i != count - 1) { |
| sb.append(demarcator); |
| } |
| } |
| if (footer != null) { |
| sb.append(footer); |
| } |
| final String expectedText = sb.toString(); |
| final byte[] expected = expectedText.getBytes(); |
| |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) destination.getLength()); |
| try (final InputStream in = repository.read(destination)) { |
| StreamUtils.copy(in, baos); |
| } |
| final byte[] actual = baos.toByteArray(); |
| assertTrue(Arrays.equals(expected, actual)); |
| } |
| |
| private byte[] readFully(final InputStream inStream, final int size) throws IOException { |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(size); |
| int len; |
| final byte[] buffer = new byte[size]; |
| while ((len = inStream.read(buffer)) >= 0) { |
| baos.write(buffer, 0, len); |
| } |
| |
| return baos.toByteArray(); |
| } |
| } |