blob: b6af93849bd3505bc4869e22c7d2a0fcfd9bbe35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
import org.apache.nifi.processors.standard.util.FileInfo;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TestListFile {
private static boolean isMillisecondSupported = false;
private final String TESTDIR = "target/test/data/in";
private final File testDir = new File(TESTDIR);
private ListFile processor;
private TestRunner runner;
private ProcessContext context;
// Testing factors in milliseconds for file ages that are configured on each run by resetAges()
// age#millis are relative time references
// time#millis are absolute time references
// age#filter are filter label strings for the filter properties
private Long syncTime = getTestModifiedTime();
private Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
private Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
private String age0, age1, age2, age3, age4, age5;
@Rule
public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
        // 1st argument: the processor's LOCAL-scoped state as a map.
        () -> {
            try {
                return runner.getStateManager().getState(Scope.LOCAL).toMap();
            } catch (IOException ioe) {
                throw new RuntimeException("Failed to retrieve state", ioe);
            }
        },
        // 2nd argument: name and last-modified time of every file currently below testDir.
        () -> listFiles(testDir).stream()
                .map(entry -> new FileInfo.Builder()
                        .filename(entry.getName())
                        .lastModifiedTime(entry.lastModified())
                        .build())
                .collect(Collectors.toList()),
        // 3rd argument: the FlowFiles routed to the success relationship so far.
        () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream()
                .map(transferred -> (FlowFile) transferred)
                .collect(Collectors.toList())
) {
    @Override
    protected void finished(final Description description) {
        // tearDown() is invoked from here instead of being annotated with @After: @After methods
        // always run before a rule's callbacks, and this rule needs to refer to the files in
        // testDir, which tearDown() removes.
        try {
            tearDown();
        } catch (Exception cause) {
            throw new RuntimeException("Failed to tearDown.", cause);
        }
    }
};
/**
 * Skips the whole class on Windows and probes once whether the filesystem below {@code target/}
 * keeps millisecond precision in file modification times.
 *
 * @throws Exception if the probe file cannot be created or removed
 */
@BeforeClass
public static void setupClass() throws Exception {
    Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);

    // This only has to be done once.
    final Path probe = Files.createTempFile(Paths.get("target/"), "TestListFile", null);
    try {
        final File file = probe.toFile();
        // The timestamp written has a non-zero millisecond part (351). If it cannot be written at
        // all, fall back to "not supported" instead of inspecting whatever timestamp the file has.
        isMillisecondSupported = file.setLastModified(325990917351L) && file.lastModified() % 1_000 > 0;
    } finally {
        // Do not leave one probe file behind per test run.
        Files.deleteIfExists(probe);
    }
}
/**
 * Creates a fresh processor and runner configured for seconds timestamp precision, empties and
 * (re)creates the test directory, and recomputes the age reference values.
 */
@Before
public void setUp() throws Exception {
    processor = new ListFile();
    runner = TestRunners.newTestRunner(processor);
    final String secondsPrecision = AbstractListProcessor.PRECISION_SECONDS.getValue();
    runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, secondsPrecision);
    context = runner.getProcessContext();

    deleteDirectory(testDir);
    final String failureMessage = "Unable to create test data directory " + testDir.getAbsolutePath();
    final boolean directoryAvailable = testDir.exists() || testDir.mkdirs();
    assertTrue(failureMessage, directoryAvailable);

    resetAges();
}
/**
 * Empties the test directory and, when the processor's persistence file exists, deletes every
 * entry of the directory that contains it. Called from the {@code dumpState} rule.
 */
public void tearDown() throws Exception {
    deleteDirectory(testDir);

    final File persistenceFile = processor.getPersistenceFile();
    if (!persistenceFile.exists()) {
        return;
    }

    final File[] siblings = persistenceFile.getParentFile().listFiles();
    if (siblings == null) {
        return;
    }
    for (final File sibling : siblings) {
        assertTrue(sibling.delete());
    }
}
/**
 * Recursively collects the non-directory entries below {@code file}.
 *
 * @param file a directory to walk, or a single entry
 * @return a one-element list holding {@code file} itself when it is not a directory; otherwise
 *         the entries found below it (empty when the directory cannot be listed)
 */
private List<File> listFiles(final File file) {
    if (!file.isDirectory()) {
        return Collections.singletonList(file);
    }

    final List<File> collected = new ArrayList<>();
    final File[] children = file.listFiles();
    if (children != null) {
        for (final File child : children) {
            collected.addAll(listFiles(child));
        }
    }
    return collected;
}
/**
 * Clears the runner's transfer state, sleeps, runs the processor once and then passes the start
 * time of that run to the {@code dumpState} rule.
 * <p>
 * The sleep lasts twice the value registered in {@link AbstractListProcessor#LISTING_LAG_MILLIS}
 * for the timestamp precision of the local filesystem (milliseconds or seconds).
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private void runNext() throws InterruptedException {
    runner.clearTransferState();

    // Filesystems such as Mac OS X HFS (Hierarchical File System) or EXT3 are known to support
    // timestamps in seconds precision only; for those the lag registered for SECONDS applies.
    final TimeUnit precision = isMillisecondSupported ? TimeUnit.MILLISECONDS : TimeUnit.SECONDS;
    final long lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(precision);
    Thread.sleep(lagMillis * 2);

    final long startedAtMillis = System.currentTimeMillis();
    runner.run();
    dumpState.dumpState(startedAtMillis);
}
@Test
@Ignore("Intended only for manual testing, as is very expensive to run as a unit test. Performs listing of 1,000,000 files (doesn't actually create the files, though - injects them in) to " +
    "ensure performance is not harmed")
public void testPerformanceOnLargeListing() {
    // Build 1000 x 1000 synthetic paths of the form target/<i>/<j>.
    // NOTE(review): nothing visible in this method hands `paths` to the processor, although the
    // @Ignore text speaks of injecting them - confirm whether the injection hook went missing.
    final File root = new File("target");
    final List<Path> paths = new ArrayList<>(1_000_000);
    for (int outer = 0; outer < 1000; outer++) {
        final File parent = new File(root, String.valueOf(outer));
        for (int inner = 0; inner < 1000; inner++) {
            paths.add(new File(parent, String.valueOf(inner)).toPath());
        }
    }

    // Fresh processor with performance tracking enabled and capped at 100,000 tracked files.
    processor = new ListFile();
    runner = TestRunners.newTestRunner(processor);
    runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
    runner.setProperty(ListFile.TRACK_PERFORMANCE, "true");
    runner.setProperty(ListFile.MAX_TRACKED_FILES, "100000");
    runner.setProperty(ListFile.DIRECTORY, "target");
    runner.run();

    final ListFile.PerformanceTracker performanceTracker = processor.getPerformanceTracker();
    assertEquals(100_000, performanceTracker.getTrackedFileCount());

    // Run the monitor task until the tracker no longer holds any file.
    final ListFile.MonitorActiveTasks monitor = new ListFile.MonitorActiveTasks(performanceTracker, runner.getLogger(), 1000, 1000, 1);
    while (performanceTracker.getTrackedFileCount() > 0) {
        monitor.run();
    }
    assertEquals(0, performanceTracker.getTrackedFileCount());
}
/** The configured directory is returned as-is, and expression language in it is evaluated. */
@Test
public void testGetPath() {
    // Plain literal value.
    runner.setProperty(ListFile.DIRECTORY, "/dir/test1");
    final String literalPath = processor.getPath(context);
    assertEquals("/dir/test1", literalPath);

    // Expression that lower-cases a literal.
    runner.setProperty(ListFile.DIRECTORY, "${literal(\"/DIR/TEST2\"):toLower()}");
    final String evaluatedPath = processor.getPath(context);
    assertEquals("/dir/test2", evaluatedPath);
}
/**
 * Walks through successive listings of one directory: empty, one file, a newer file, an older
 * file, and finally a full re-listing after the directory property has been removed and set again.
 */
@Test
public void testPerformListing() throws Exception {
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());

    // Nothing to list yet.
    runNext();
    runner.assertTransferCount(ListFile.REL_SUCCESS, 0);

    // First file; listing it sets a new timestamp.
    final File first = new File(testDir, "listing1.txt");
    assertTrue(first.createNewFile());
    assertTrue(first.setLastModified(time4millis));
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(1, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Second file, modified after that timestamp: it is the only one listed.
    final File second = new File(testDir, "listing2.txt");
    assertTrue(second.createNewFile());
    assertTrue(second.setLastModified(time2millis));
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(1, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Third file, modified before the timestamp: nothing is listed.
    final File third = new File(testDir, "listing3.txt");
    assertTrue(third.createNewFile());
    assertTrue(third.setLastModified(time4millis));
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(0, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Force the state to reset; all three files are processed again.
    runner.removeProperty(ListFile.DIRECTORY);
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(3, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // A further run without changes lists nothing.
    runNext();
    runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
}
/** Exercises the minimum/maximum file age properties against three files of different ages. */
@Test
public void testFilterAge() throws Exception {
    final File newest = new File(testDir, "age1.txt");
    assertTrue(newest.createNewFile());
    final File middle = new File(testDir, "age2.txt");
    assertTrue(middle.createNewFile());
    final File oldest = new File(testDir, "age3.txt");
    assertTrue(oldest.createNewFile());

    // One listing cycle. When asked to, first recompute the age references and re-stamp the
    // files (time0 / time2 / time4); always verify the timestamps sit in the expected windows.
    final Function<Boolean, Object> runCycle = refreshTimestamps -> {
        if (refreshTimestamps) {
            resetAges();
            assertTrue(newest.setLastModified(time0millis));
            assertTrue(middle.setLastModified(time2millis));
            assertTrue(oldest.setLastModified(time4millis));
        }

        assertTrue(newest.lastModified() > time3millis && newest.lastModified() <= time0millis);
        assertTrue(middle.lastModified() > time3millis && middle.lastModified() < time1millis);
        assertTrue(oldest.lastModified() < time3millis);

        try {
            runNext();
        } catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
        return null;
    };

    // Without age filters every file is listed.
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runCycle.apply(true);
    runner.assertTransferCount(ListFile.REL_SUCCESS, 3);

    // The processor updated its internal state, so the same files are not picked again.
    runCycle.apply(false);
    runner.assertTransferCount(ListFile.REL_SUCCESS, 0);

    // Exclude the oldest file.
    runner.setProperty(ListFile.MIN_AGE, age0);
    runner.setProperty(ListFile.MAX_AGE, age3);
    runCycle.apply(true);
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> withoutOldest = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(2, withoutOldest.size());
    assertEquals(middle.getName(), withoutOldest.get(0).getAttribute("filename"));
    assertEquals(newest.getName(), withoutOldest.get(1).getAttribute("filename"));

    // Exclude the newest file.
    runner.setProperty(ListFile.MIN_AGE, age1);
    runner.setProperty(ListFile.MAX_AGE, age5);
    runCycle.apply(true);
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> withoutNewest = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(2, withoutNewest.size());
    assertEquals(oldest.getName(), withoutNewest.get(0).getAttribute("filename"));
    assertEquals(middle.getName(), withoutNewest.get(1).getAttribute("filename"));

    // Exclude both the oldest and the newest file.
    runner.setProperty(ListFile.MIN_AGE, age1);
    runner.setProperty(ListFile.MAX_AGE, age3);
    runCycle.apply(true);
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> middleOnly = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(1, middleOnly.size());
    assertEquals(middle.getName(), middleOnly.get(0).getAttribute("filename"));
}
/**
 * Exercises the minimum/maximum file size properties against files of 10000, 5000 and 1000 bytes.
 */
@Test
public void testFilterSize() throws Exception {
    final byte[] bytes1000 = new byte[1000];
    final byte[] bytes5000 = new byte[5000];
    final byte[] bytes10000 = new byte[10000];

    // try-with-resources closes each stream even when the write fails.
    final File file1 = new File(TESTDIR + "/size1.txt");
    assertTrue(file1.createNewFile());
    try (FileOutputStream fos = new FileOutputStream(file1)) {
        fos.write(bytes10000);
    }

    final File file2 = new File(TESTDIR + "/size2.txt");
    assertTrue(file2.createNewFile());
    try (FileOutputStream fos = new FileOutputStream(file2)) {
        fos.write(bytes5000);
    }

    final File file3 = new File(TESTDIR + "/size3.txt");
    assertTrue(file3.createNewFile());
    try (FileOutputStream fos = new FileOutputStream(file3)) {
        fos.write(bytes1000);
    }

    final long now = getTestModifiedTime();
    assertTrue(file1.setLastModified(now));
    assertTrue(file2.setLastModified(now));
    assertTrue(file3.setLastModified(now));

    // check all files
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(3, successFiles1.size());

    // exclude largest
    runner.removeProperty(ListFile.MIN_AGE);
    runner.removeProperty(ListFile.MAX_AGE);
    runner.setProperty(ListFile.MIN_SIZE, "0 b");
    runner.setProperty(ListFile.MAX_SIZE, "7500 b");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(2, successFiles2.size());

    // exclude smallest
    runner.removeProperty(ListFile.MIN_AGE);
    runner.removeProperty(ListFile.MAX_AGE);
    runner.setProperty(ListFile.MIN_SIZE, "2500 b");
    runner.removeProperty(ListFile.MAX_SIZE);
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(2, successFiles3.size());

    // exclude largest and smallest
    runner.removeProperty(ListFile.MIN_AGE);
    runner.removeProperty(ListFile.MAX_AGE);
    runner.setProperty(ListFile.MIN_SIZE, "2500 b");
    runner.setProperty(ListFile.MAX_SIZE, "7500 b");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(1, successFiles4.size());
}
/**
 * Lists a regular file and a dot-prefixed file (additionally flagged hidden where the file store
 * supports the "dos" attribute view), first with hidden files included and then ignored.
 */
@Test
public void testFilterHidden() throws Exception {
    final long now = getTestModifiedTime();

    // createNewFile() already leaves an empty file behind, so no output stream has to be opened.
    final File file1 = new File(TESTDIR + "/hidden1.txt");
    assertTrue(file1.createNewFile());

    final File file2 = new File(TESTDIR + "/.hidden2.txt");
    assertTrue(file2.createNewFile());

    FileStore store = Files.getFileStore(file2.toPath());
    if (store.supportsFileAttributeView("dos")) {
        Files.setAttribute(file2.toPath(), "dos:hidden", true);
    }

    assertTrue(file1.setLastModified(now));
    assertTrue(file2.setLastModified(now));

    // check all files
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runner.setProperty(ListFile.FILE_FILTER, ".*");
    runner.removeProperty(ListFile.MIN_AGE);
    runner.removeProperty(ListFile.MAX_AGE);
    runner.removeProperty(ListFile.MIN_SIZE);
    runner.removeProperty(ListFile.MAX_SIZE);
    runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(2, successFiles1.size());

    // exclude hidden
    runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(1, successFiles2.size());
}
/** Of one unreadable and one readable file in the directory, exactly one FlowFile is produced. */
@Test
public void testListWithUnreadableFiles() throws Exception {
    final File unreadable = new File(testDir, "unreadable.txt");
    assertTrue(unreadable.createNewFile());
    assertTrue(unreadable.setReadable(false));

    final File readable = new File(testDir, "readable.txt");
    assertTrue(readable.createNewFile());

    final long modifiedAt = getTestModifiedTime();
    assertTrue(unreadable.setLastModified(modifiedAt));
    assertTrue(readable.setLastModified(modifiedAt));

    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runner.setProperty(ListFile.FILE_FILTER, ".*");
    runNext();

    assertEquals(1, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
}
/**
 * With an unreadable sub-directory next to a readable file, only the file outside the
 * sub-directory is expected in the listing.
 */
@Test
public void testListWithinUnreadableDirectory() throws Exception {
    final File subdir = new File(TESTDIR + "/subdir");
    assertTrue(subdir.mkdir());
    assertTrue(subdir.setReadable(false));

    try {
        final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
        assertTrue(file1.createNewFile());
        assertTrue(file1.setReadable(false));

        final File file2 = new File(TESTDIR + "/subdir/readable.txt");
        assertTrue(file2.createNewFile());

        final File file3 = new File(TESTDIR + "/secondReadable.txt");
        assertTrue(file3.createNewFile());

        final long now = getTestModifiedTime();
        assertTrue(file1.setLastModified(now));
        assertTrue(file2.setLastModified(now));
        assertTrue(file3.setLastModified(now));

        runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
        runner.setProperty(ListFile.FILE_FILTER, ".*");
        runNext();

        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
        assertEquals(1, successFiles.size());
        assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
    } finally {
        // Restore read access even when an assertion above failed; otherwise the directory
        // cannot be listed, and therefore not emptied, during cleanup. The result is not
        // asserted here so that it cannot mask the original failure.
        subdir.setReadable(true);
    }
}
/** A file is listed only when the file filter matches it and it is readable. */
@Test
public void testListingNeedsSufficientPrivilegesAndFittingFilter() throws Exception {
    final File target = new File(testDir, "file.txt");
    assertTrue(target.createNewFile());
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());

    // Readable, but the filter does not match: nothing listed.
    runner.setProperty(ListFile.FILE_FILTER, "willBeFilteredOut");
    assertTrue(target.setLastModified(getTestModifiedTime()));
    runNext();
    assertEquals(0, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Readable and matched by the filter: listed.
    runner.setProperty(ListFile.FILE_FILTER, "file.*");
    assertTrue(target.setLastModified(getTestModifiedTime()));
    runNext();
    assertEquals(1, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Matched by the filter, but no longer readable: nothing listed.
    assertTrue(target.setReadable(false));
    assertTrue(target.setLastModified(getTestModifiedTime()));
    runNext();
    assertEquals(0, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
}
/** Lists four files with the default file filter, then only those matching a narrower pattern. */
@Test
public void testFilterFilePattern() throws Exception {
    final long modifiedAt = getTestModifiedTime();

    final String[] fileNames = {
        "file1-abc-apple.txt",
        "file2-xyz-apple.txt",
        "file3-xyz-banana.txt",
        "file4-pdq-banana.txt"
    };
    for (final String fileName : fileNames) {
        final File created = new File(testDir, fileName);
        assertTrue(created.createNewFile());
        assertTrue(created.setLastModified(modifiedAt));
    }

    // Default filter: every file is listed.
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(4, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Modifying the FILE_FILTER property resets the listing status, so the matching files
    // are listed again.
    runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);

    // Nothing changed since: nothing is listed.
    runNext();
    runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
}
/** Lists a two-level directory tree recursively, then narrows the result with the path filter. */
@Test
public void testFilterPathPattern() throws Exception {
    final long modifiedAt = getTestModifiedTime();

    assertTrue(new File(testDir, "subdir1").mkdirs());
    assertTrue(new File(testDir, "subdir1/subdir2").mkdirs());

    final String[] relativeFiles = {
        "file1.txt",
        "subdir1/file2.txt",
        "subdir1/subdir2/file3.txt",
        "subdir1/file4.txt"
    };
    for (final String relativeFile : relativeFiles) {
        final File created = new File(testDir, relativeFile);
        assertTrue(created.createNewFile());
        assertTrue(created.setLastModified(modifiedAt));
    }

    // No path filter: all four files.
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
    runner.setProperty(ListFile.RECURSE, "true");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(4, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Path filter "subdir1": three files.
    runner.setProperty(ListFile.PATH_FILTER, "subdir1");
    runner.setProperty(ListFile.RECURSE, "true");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(3, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Path filter "subdir2": one file.
    runner.setProperty(ListFile.PATH_FILTER, "subdir2");
    runner.setProperty(ListFile.RECURSE, "true");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    assertEquals(1, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
}
/**
 * Lists a two-level directory tree with recursion enabled, checking the path attributes of each
 * FlowFile, and then with recursion disabled.
 */
@Test
public void testRecurse() throws Exception {
    final long now = getTestModifiedTime();

    final File subdir1 = new File(TESTDIR + "/subdir1");
    assertTrue(subdir1.mkdirs());
    final File subdir2 = new File(TESTDIR + "/subdir1/subdir2");
    assertTrue(subdir2.mkdirs());

    final File file1 = new File(TESTDIR + "/file1.txt");
    assertTrue(file1.createNewFile());
    assertTrue(file1.setLastModified(now));
    final File file2 = new File(TESTDIR + "/subdir1/file2.txt");
    assertTrue(file2.createNewFile());
    assertTrue(file2.setLastModified(now));
    final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt");
    assertTrue(file3.createNewFile());
    assertTrue(file3.setLastModified(now));

    // check all files
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runner.setProperty(ListFile.RECURSE, "true");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
    final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    for (final MockFlowFile mff : successFiles1) {
        final String filename = mff.getAttribute(CoreAttributes.FILENAME.key());
        final String path = mff.getAttribute(CoreAttributes.PATH.key());
        switch (filename) {
            case "file1.txt":
                assertEquals("." + File.separator, path);
                mff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), file1.getParentFile().getAbsolutePath() + File.separator);
                break;
            case "file2.txt":
                assertEquals("subdir1" + File.separator, path);
                mff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), file2.getParentFile().getAbsolutePath() + File.separator);
                break;
            case "file3.txt":
                assertEquals("subdir1" + File.separator + "subdir2" + File.separator, path);
                mff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), file3.getParentFile().getAbsolutePath() + File.separator);
                break;
            default:
                // Without this, a FlowFile carrying an unexpected name would pass unchecked.
                throw new AssertionError("Unexpected file listed: " + filename);
        }
    }
    assertEquals(3, successFiles1.size());

    // disable recursion: only the file directly inside the listed directory is expected
    runner.setProperty(ListFile.RECURSE, "false");
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(1, successFiles2.size());
}
/** Three ordinary files in the listed directory are all transferred to success. */
@Test
public void testReadable() throws Exception {
    final long modifiedAt = getTestModifiedTime();

    for (final String fileName : new String[] {"file1.txt", "file2.txt", "file3.txt"}) {
        final File created = new File(testDir, fileName);
        assertTrue(created.createNewFile());
        assertTrue(created.setLastModified(modifiedAt));
    }

    // List everything, with recursion switched on.
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runner.setProperty(ListFile.RECURSE, "true");
    runNext();

    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
}
/**
 * Lists a single 1234-byte file and verifies the attributes written to its FlowFile: path,
 * filename, absolute path and size always; timestamps, owner, group and permissions only where
 * the file store supports the corresponding attribute view.
 */
@Test
public void testAttributesSet() throws Exception {
    // create temp file and time constant
    final File file1 = new File(TESTDIR + "/file1.txt");
    assertTrue(file1.createNewFile());
    // try-with-resources closes the stream even when the write fails
    try (FileOutputStream fos = new FileOutputStream(file1)) {
        fos.write(new byte[1234]);
    }
    assertTrue(file1.setLastModified(time3millis));
    Long time3rounded = time3millis - time3millis % 1000;
    String userName = System.getProperty("user.name");

    // validate the file transferred
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
    assertEquals(1, successFiles1.size());

    // get attribute check values
    final Path file1Path = file1.toPath();
    final Path directoryPath = new File(TESTDIR).toPath();
    final Path relativePath = directoryPath.relativize(file1.toPath().getParent());
    String relativePathString = relativePath.toString();
    relativePathString = relativePathString.isEmpty() ? "." + File.separator : relativePathString + File.separator;
    final Path absolutePath = file1.toPath().toAbsolutePath();
    final String absolutePathString = absolutePath.getParent().toString() + File.separator;
    final FileStore store = Files.getFileStore(file1Path);
    final DateFormat formatter = new SimpleDateFormat(ListFile.FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
    final String time3Formatted = formatter.format(time3rounded);

    // check standard attributes
    MockFlowFile mock1 = successFiles1.get(0);
    assertEquals(relativePathString, mock1.getAttribute(CoreAttributes.PATH.key()));
    assertEquals("file1.txt", mock1.getAttribute(CoreAttributes.FILENAME.key()));
    assertEquals(absolutePathString, mock1.getAttribute(CoreAttributes.ABSOLUTE_PATH.key()));
    assertEquals("1234", mock1.getAttribute(ListFile.FILE_SIZE_ATTRIBUTE));

    // check attributes dependent on views supported
    if (store.supportsFileAttributeView("basic")) {
        assertEquals(time3Formatted, mock1.getAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE));
        assertNotNull(mock1.getAttribute(ListFile.FILE_CREATION_TIME_ATTRIBUTE));
        assertNotNull(mock1.getAttribute(ListFile.FILE_LAST_ACCESS_TIME_ATTRIBUTE));
    }
    if (store.supportsFileAttributeView("owner")) {
        // look for username containment to handle Windows domains as well as Unix user names
        // org.junit.ComparisonFailure: expected:<[]username> but was:<[DOMAIN\]username>
        final String owner = mock1.getAttribute(ListFile.FILE_OWNER_ATTRIBUTE);
        // fail with a clear message instead of a NullPointerException when the attribute is missing
        assertNotNull("File owner should be set", owner);
        assertTrue(owner.contains(userName));
    }
    if (store.supportsFileAttributeView("posix")) {
        assertNotNull("Group name should be set", mock1.getAttribute(ListFile.FILE_GROUP_ATTRIBUTE));
        assertNotNull("File permissions should be set", mock1.getAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE));
    }
}
/**
 * Every listing-related property of the processor requires a listing reset when changed; an
 * unrelated property does not.
 */
@Test
public void testIsListingResetNecessary() throws Exception {
    final List<PropertyDescriptor> resettingProperties = Arrays.asList(
            ListFile.DIRECTORY,
            ListFile.RECURSE,
            ListFile.FILE_FILTER,
            ListFile.PATH_FILTER,
            ListFile.MIN_AGE,
            ListFile.MAX_AGE,
            ListFile.MIN_SIZE,
            ListFile.MAX_SIZE,
            ListFile.IGNORE_HIDDEN_FILES);
    for (final PropertyDescriptor descriptor : resettingProperties) {
        assertTrue(processor.isListingResetNecessary(descriptor));
    }

    final PropertyDescriptor unrelated = new PropertyDescriptor.Builder().name("x").build();
    assertEquals(false, processor.isListingResetNecessary(unrelated));
}
/**
 * Creates a new file below the test directory, stamps it with the given modification time and
 * records the timestamp the filesystem reports afterwards.
 *
 * @param name      path of the file relative to the test directory, including the leading separator
 * @param millis    modification time to set, in epoch milliseconds
 * @param fileTimes receives the reported last-modified time, keyed by the plain file name
 * @throws IOException if the file cannot be created
 */
private void makeTestFile(final String name, final long millis, final Map<String, Long> fileTimes) throws IOException {
    final File created = new File(TESTDIR + name);
    assertTrue(created.createNewFile());
    assertTrue(created.setLastModified(millis));

    final long reportedTimestamp = created.lastModified();
    fileTimes.put(created.getName(), reportedTimestamp);
}
/**
 * Files appearing between two listing cycles are picked up when they are as new as, or newer
 * than, the newest file of the previous cycle; older ones are skipped.
 */
@Test
public void testFilterRunMidFileWrites() throws Exception {
    final Map<String, Long> recordedTimes = new HashMap<>();
    runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());

    // First batch: all three files are listed.
    makeTestFile("/batch1-age3.txt", time3millis, recordedTimes);
    makeTestFile("/batch1-age4.txt", time4millis, recordedTimes);
    makeTestFile("/batch1-age5.txt", time5millis, recordedTimes);
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
    assertEquals(3, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());

    // Second batch:
    //   age2 - picked, it is newer than age3
    //   age3 - picked although it shares the age3 timestamp, because it was not there at the previous cycle
    //   age4 - ignored, it is older than age3
    makeTestFile("/batch2-age2.txt", time2millis, recordedTimes);
    makeTestFile("/batch2-age3.txt", time3millis, recordedTimes);
    makeTestFile("/batch2-age4.txt", time4millis, recordedTimes);
    runNext();
    runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
    assertEquals(2, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
}
/*
 * HFS+, the default filesystem on OS X, only has one-second timestamp granularity, so test
 * timestamps are placed on whole seconds slightly in the past to keep the test cases consistent.
 *
 * Returns the current time truncated to whole seconds, minus one second, in epoch milliseconds.
 */
private static long getTestModifiedTime() {
    // Drop the sub-second part, then step back one second to avoid possible rounding issues.
    final long wholeSecondsNow = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    return TimeUnit.SECONDS.toMillis(wholeSecondsNow - 1);
}
/**
 * Recomputes the reference instant and, from it, the relative ages, the absolute timestamps and
 * the age filter strings used by the tests.
 */
private void resetAges() {
    syncTime = getTestModifiedTime();

    // Allow for bigger gaps since the lag is 2s w/o milliseconds: ages 1 to 4 are doubled then.
    final long gapFactor = isMillisecondSupported ? 1L : 2L;
    age0millis = 0L;
    age1millis = 5000L * gapFactor;
    age2millis = 10000L * gapFactor;
    age3millis = 15000L * gapFactor;
    age4millis = 20000L * gapFactor;
    age5millis = 100000L;

    // Absolute timestamps: reference instant minus the relative age.
    time0millis = syncTime - age0millis;
    time1millis = syncTime - age1millis;
    time2millis = syncTime - age2millis;
    time3millis = syncTime - age3millis;
    time4millis = syncTime - age4millis;
    time5millis = syncTime - age5millis;

    // Property values for the age filters.
    age0 = age0millis + " millis";
    age1 = age1millis + " millis";
    age2 = age2millis + " millis";
    age3 = age3millis + " millis";
    age4 = age4millis + " millis";
    age5 = age5millis + " millis";
}
/**
 * Recursively deletes everything inside {@code directory}; the directory itself is kept.
 * Fails the test when an entry cannot be deleted.
 *
 * @param directory directory to empty; nothing happens when it does not exist or cannot be listed
 */
private void deleteDirectory(final File directory) throws IOException {
    if (!directory.exists()) {
        return;
    }

    final File[] entries = directory.listFiles();
    if (entries == null) {
        return;
    }

    for (final File entry : entries) {
        // Empty sub-directories first so that they can be removed themselves afterwards.
        if (entry.isDirectory()) {
            deleteDirectory(entry);
        }
        assertTrue("Could not delete " + entry.getAbsolutePath(), entry.delete());
    }
}
}