blob: abfd5f5febdd21f42ddc3e63dfaef850bbe381b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
private Map<String, TestFile> testFiles;
@Override
protected Class<? extends Processor> getProcessorClass() {
return ListAzureDataLakeStorage.class;
}
@BeforeEach
public void setUp() {
testFiles = new HashMap<>();
TestFile testFile1 = new TestFile("", "file1");
uploadFile(testFile1);
testFiles.put(testFile1.getFilePath(), testFile1);
TestFile testTempFile1 = new TestFile(AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY, "1234file1");
uploadFile(testTempFile1);
testFiles.put(testTempFile1.getFilePath(), testTempFile1);
TestFile testFile2 = new TestFile("", "file2");
uploadFile(testFile2);
testFiles.put(testFile2.getFilePath(), testFile2);
TestFile testFile11 = new TestFile("dir1", "file11");
createDirectoryAndUploadFile(testFile11);
testFiles.put(testFile11.getFilePath(), testFile11);
TestFile testTempFile11 = new TestFile(String.format("dir1/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "5678file11");
uploadFile(testTempFile11);
testFiles.put(testTempFile11.getFilePath(), testTempFile11);
TestFile testFile12 = new TestFile("dir1", "file12");
uploadFile(testFile12);
testFiles.put(testFile12.getFilePath(), testFile12);
TestFile testFile111 = new TestFile("dir1/dir11", "file111");
createDirectoryAndUploadFile(testFile111);
testFiles.put(testFile111.getFilePath(), testFile111);
TestFile testTempFile111 = new TestFile(String.format("dir1/dir11/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "9010file111");
uploadFile(testTempFile111);
testFiles.put(testTempFile111.getFilePath(), testTempFile111);
TestFile testFile21 = new TestFile("dir 2", "file 21", "Test");
createDirectoryAndUploadFile(testFile21);
testFiles.put(testFile21.getFilePath(), testFile21);
TestFile testTempFile21 = new TestFile(String.format("dir2/%s", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY), "1112file21", "Test");
uploadFile(testTempFile21);
testFiles.put(testTempFile21.getFilePath(), testTempFile21);
createDirectory("dir3");
}
@Test
public void testListRootRecursive() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListRootRecursiveWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootRecursiveUsingProxyConfigurationService() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
configureProxyService();
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListRootNonRecursive() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runProcessor();
assertSuccess("file1", "file2");
}
@Test
public void testListRootNonRecursiveWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2");
}
@Test
public void testListSubdirectoryRecursive() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListSubdirectoryRecursiveWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListSubdirectoryNonRecursive() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12");
}
@Test
public void testListSubdirectoryNonRecursiveWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12");
}
@Test
public void testListWithFileFilter() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithFileFilterWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListWithFileFilterWithEL() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setEnvironmentVariableValue("suffix", "1.*");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithFileFilterWithELWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setEnvironmentVariableValue("suffix", "1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootWithPathFilter() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListRootWithPathFilterWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootWithPathFilterWithEL() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setEnvironmentVariableValue("prefix", "^dir");
runner.setEnvironmentVariableValue("suffix", "1.*$");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListRootWithPathFilterWithELWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setEnvironmentVariableValue("prefix", "^dir");
runner.setEnvironmentVariableValue("suffix", "1.*$");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListSubdirectoryWithPathFilter() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runProcessor();
assertSuccess("dir1/dir11/file111");
}
@Test
public void testListSubdirectoryWithPathFilterWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/dir11/file111", String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListRootWithFileAndPathFilter() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runProcessor();
assertSuccess("dir1/file11", "dir1/dir11/file111");
}
@Test
public void testListRootWithFileAndPathFilterWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir1/file11", "dir1/dir11/file111", String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListEmptyDirectory() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir3");
runProcessor();
assertSuccess();
}
@Test
public void testListNonExistingDirectory() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dummy");
runProcessor();
assertFailure();
}
@Test
public void testListWithNonExistingFileSystem() {
runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy");
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runProcessor();
assertFailure();
}
@Test
public void testListWithRecords() throws InitializationException {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer");
runner.run();
runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "3");
}
@Test
public void testListWithRecordsWithTempFiles() throws InitializationException {
runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1");
MockRecordWriter recordWriter = new MockRecordWriter(null, false);
runner.addControllerService("record-writer", recordWriter);
runner.enableControllerService(recordWriter);
runner.setProperty(ListAzureDataLakeStorage.RECORD_WRITER, "record-writer");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runner.run();
runner.assertAllFlowFilesTransferred(ListAzureDataLakeStorage.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("record.count", "5");
}
@Test
public void testListWithMinAge() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runProcessor();
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
@Test
public void testListWithMinAgeWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
@Test
public void testListWithMaxAge() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListWithMaxAgeWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListWithMinSize() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111");
}
@Test
public void testListWithMinSizeWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111",
String.format("%s/1234file1", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/%s/5678file11", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY),
String.format("dir1/dir11/%s/9010file111", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
@Test
public void testListWithMaxSize() {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runProcessor();
assertSuccess("dir 2/file 21");
}
@Test
public void testListWithMaxSizeWithTempFiles() throws Exception {
runner.setProperty(AzureStorageUtils.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true");
runProcessor();
assertSuccess("dir 2/file 21", String.format("dir2/%s/1112file21", AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY));
}
private void runProcessor() {
runner.assertValid();
runner.run();
}
private void assertSuccess(String... testFilePaths) {
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length);
Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);
expectedFiles.keySet().retainAll(Arrays.asList(testFilePaths));
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListAzureDataLakeStorage.REL_SUCCESS);
assertEquals(expectedFiles.size(), flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
String filePath = flowFile.getAttribute("azure.filePath");
TestFile testFile = expectedFiles.remove(filePath);
assertNotNull(testFile, "File path not found in the expected map");
assertFlowFile(testFile, flowFile);
}
}
private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) {
flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath());
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory());
flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, testFile.getFilename());
flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, String.valueOf(testFile.getFileContent().getBytes(StandardCharsets.UTF_8).length));
flowFile.assertAttributeExists(ATTR_NAME_LAST_MODIFIED);
flowFile.assertAttributeExists(ATTR_NAME_ETAG);
flowFile.assertContentEquals("");
}
private void assertFailure() {
assertFalse(runner.getLogger().getErrorMessages().isEmpty());
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, 0);
}
}