| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.processors.standard; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.anyInt; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.apache.commons.net.ftp.FTPClient; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processors.standard.util.FTPTransfer; |
| import org.apache.nifi.processors.standard.util.FileTransfer; |
| import org.apache.nifi.processors.standard.util.PermissionDeniedException; |
| import org.apache.nifi.util.MockFlowFile; |
| import org.apache.nifi.util.MockProcessContext; |
| import org.apache.nifi.util.TestRunner; |
| import org.apache.nifi.util.TestRunners; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Mockito; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestFetchFTP { |
| |
| private TestableFetchFTP proc; |
| private TestRunner runner; |
| |
| @Before |
| public void setUp() throws Exception { |
| proc = new TestableFetchFTP(); |
| runner = TestRunners.newTestRunner(proc); |
| runner.setValidateExpressionUsage(false); |
| |
| runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); |
| runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); |
| runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); |
| |
| MockProcessContext ctx = (MockProcessContext) runner.getProcessContext(); |
| setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT, |
| FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE); |
| ctx.setProperty(FTPTransfer.USERNAME, "foo"); |
| ctx.setProperty(FTPTransfer.PASSWORD, "bar"); |
| } |
| |
| private void setDefaultValues(MockProcessContext ctx, PropertyDescriptor... propertyDescriptors) { |
| Arrays.stream(propertyDescriptors).forEach(d -> ctx.setProperty(d, d.getDefaultValue())); |
| } |
| |
| private void addFileAndEnqueue(String filename) { |
| proc.addContent(filename, "world".getBytes()); |
| runner.enqueue(new byte[0], Collections.singletonMap("filename", filename)); |
| } |
| |
| @Test |
| public void testContentFetched() { |
| addFileAndEnqueue("hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| assertFalse(proc.closed); |
| runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world"); |
| } |
| |
| @Test |
| public void testFilenameContainsPath() { |
| addFileAndEnqueue("./here/is/my/path/hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| assertFalse(proc.closed); |
| MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0); |
| transferredFlowFile.assertContentEquals("world"); |
| transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key()); |
| transferredFlowFile.assertAttributeEquals(CoreAttributes.PATH.key(), "./here/is/my/path"); |
| } |
| |
| @Test |
| public void testControlEncodingIsSetToUTF8() { |
| runner.setProperty(FTPTransfer.UTF8_ENCODING, "true"); |
| |
| addFileAndEnqueue("őűőű.txt"); |
| |
| runner.run(1, false, false); |
| |
| ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class); |
| verify(proc.mockFtpClient).setControlEncoding(argument.capture()); |
| assertEquals("utf-8", argument.getValue().toLowerCase()); |
| } |
| |
| @Test |
| public void testContentNotFound() { |
| runner.enqueue(new byte[0], Collections.singletonMap("filename", "hello.txt")); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1); |
| } |
| |
| @Test |
| public void testInsufficientPermissions() { |
| addFileAndEnqueue("hello.txt"); |
| proc.allowAccess = false; |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); |
| } |
| |
| |
| @Test |
| public void testMoveFileWithNoTrailingSlashDirName() { |
| runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); |
| runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved"); |
| runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true"); |
| |
| addFileAndEnqueue("hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| |
| proc.fileContents.containsKey("/moved/hello.txt"); |
| assertEquals(1, proc.fileContents.size()); |
| } |
| |
| @Test |
| public void testMoveFileWithTrailingSlashDirName() { |
| runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); |
| runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); |
| |
| addFileAndEnqueue("hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| |
| proc.fileContents.containsKey("/moved/hello.txt"); |
| assertEquals(1, proc.fileContents.size()); |
| } |
| |
| @Test |
| public void testDeleteFile() { |
| runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); |
| |
| addFileAndEnqueue("hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| assertTrue(proc.fileContents.isEmpty()); |
| } |
| |
| @Test |
| public void testDeleteFails() { |
| runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); |
| proc.allowDelete = false; |
| |
| addFileAndEnqueue("hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| assertFalse(proc.fileContents.isEmpty()); |
| } |
| |
| @Test |
| public void testRenameFails() { |
| runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); |
| runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); |
| proc.allowDelete = false; |
| proc.allowRename = false; |
| |
| addFileAndEnqueue("hello.txt"); |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| assertEquals(1, proc.fileContents.size()); |
| |
| assertTrue(proc.fileContents.containsKey("hello.txt")); |
| } |
| |
| @Test |
| public void testCreateDirFails() { |
| runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); |
| runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); |
| runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true"); |
| |
| addFileAndEnqueue("hello.txt"); |
| |
| proc.allowCreateDir = false; |
| |
| runner.run(1, false, false); |
| runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); |
| assertEquals(1, proc.fileContents.size()); |
| |
| assertTrue(proc.fileContents.containsKey("hello.txt")); |
| } |
| |
| |
| private static class TestableFetchFTP extends FetchFTP { |
| private boolean allowAccess = true; |
| private boolean allowDelete = true; |
| private boolean allowCreateDir = true; |
| private boolean allowRename = true; |
| private boolean closed = false; |
| private final Map<String, byte[]> fileContents = new HashMap<>(); |
| private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class); |
| |
| private TestableFetchFTP() throws IOException { |
| when(mockFtpClient.retrieveFileStream(anyString())) |
| .then((Answer) invocationOnMock -> { |
| byte[] content = fileContents.get(invocationOnMock.getArgument(0)); |
| if (content == null) { |
| throw new FileNotFoundException(); |
| } |
| return new ByteArrayInputStream(content); |
| }); |
| when(mockFtpClient.login(anyString(), anyString())).thenReturn(true); |
| when(mockFtpClient.setFileType(anyInt())).thenReturn(true); |
| |
| } |
| |
| public void addContent(final String filename, final byte[] content) { |
| this.fileContents.put(filename, content); |
| } |
| |
| @Override |
| protected FileTransfer createFileTransfer(final ProcessContext context) { |
| return new FTPTransfer(context, getLogger()) { |
| |
| @Override |
| protected FTPClient createFTPClient() { |
| return mockFtpClient; |
| } |
| |
| @Override |
| public FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException { |
| if (!allowAccess) { |
| throw new PermissionDeniedException("test permission denied"); |
| } |
| |
| return super.getRemoteFile(remoteFileName, flowFile, session); |
| } |
| |
| @Override |
| public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException { |
| if (!allowDelete) { |
| throw new PermissionDeniedException("test permission denied"); |
| } |
| |
| if (!fileContents.containsKey(remoteFileName)) { |
| throw new FileNotFoundException(); |
| } |
| |
| fileContents.remove(remoteFileName); |
| } |
| |
| @Override |
| public void rename(FlowFile flowFile, String source, String target) throws IOException { |
| if (!allowRename) { |
| throw new PermissionDeniedException("test permission denied"); |
| } |
| |
| if (!fileContents.containsKey(source)) { |
| throw new FileNotFoundException(); |
| } |
| |
| final byte[] content = fileContents.remove(source); |
| fileContents.put(target, content); |
| } |
| |
| @Override |
| public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException { |
| if (!allowCreateDir) { |
| throw new PermissionDeniedException("test permission denied"); |
| } |
| } |
| }; |
| } |
| } |
| } |