/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.processors.standard;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@link FetchFTP}.
 *
 * <p>The processor under test is a {@link TestableFetchFTP}, which swaps the real FTP client for a
 * Mockito mock and keeps the "remote" files in an in-memory map, so no FTP server is required.
 */
public class TestFetchFTP {

    private TestableFetchFTP proc;
    private TestRunner runner;

    /**
     * Builds a fresh processor and runner for every test and configures the connection-related
     * properties (host, port, remote filename expression, credentials and a set of
     * {@link FTPTransfer} properties explicitly set to their declared default values).
     */
    @Before
    public void setUp() throws Exception {
        proc = new TestableFetchFTP();
        runner = TestRunners.newTestRunner(proc);
        runner.setValidateExpressionUsage(false);

        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
        // The remote file name is resolved from the "filename" attribute of each enqueued flow file.
        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");

        MockProcessContext ctx = (MockProcessContext) runner.getProcessContext();
        setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT,
            FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE);
        ctx.setProperty(FTPTransfer.USERNAME, "foo");
        ctx.setProperty(FTPTransfer.PASSWORD, "bar");
    }

    /**
     * Sets each given property on the context to that property's own default value.
     *
     * @param ctx the mock context to configure
     * @param propertyDescriptors the properties whose default values should be applied
     */
    private void setDefaultValues(MockProcessContext ctx, PropertyDescriptor... propertyDescriptors) {
        Arrays.stream(propertyDescriptors).forEach(d -> ctx.setProperty(d, d.getDefaultValue()));
    }

    /**
     * Registers a remote file with the content {@code "world"} under the given name and enqueues an
     * empty flow file whose {@code filename} attribute points at it.
     *
     * @param filename the remote file name, also used as the flow file's {@code filename} attribute
     */
    private void addFileAndEnqueue(String filename) {
        proc.addContent(filename, "world".getBytes());
        runner.enqueue(new byte[0], Collections.singletonMap("filename", filename));
    }

    /** A registered remote file is routed to success carrying the remote content. */
    @Test
    public void testContentFetched() {
        addFileAndEnqueue("hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        // NOTE(review): 'closed' is never assigned anywhere in this class, so this check cannot fail as written.
        assertFalse(proc.closed);
        runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
    }

    /** When the remote file name includes directories, the {@code path} attribute holds the directory part. */
    @Test
    public void testFilenameContainsPath() {
        addFileAndEnqueue("./here/is/my/path/hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        // NOTE(review): 'closed' is never assigned anywhere in this class, so this check cannot fail as written.
        assertFalse(proc.closed);
        MockFlowFile transferredFlowFile = runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0);
        transferredFlowFile.assertContentEquals("world");
        transferredFlowFile.assertAttributeExists(CoreAttributes.PATH.key());
        transferredFlowFile.assertAttributeEquals(CoreAttributes.PATH.key(), "./here/is/my/path");
    }

    /** With the UTF-8 encoding property enabled, the FTP client's control encoding is set to UTF-8. */
    @Test
    public void testControlEncodingIsSetToUTF8() {
        runner.setProperty(FTPTransfer.UTF8_ENCODING, "true");

        addFileAndEnqueue("őűőű.txt");

        runner.run(1, false, false);

        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
        verify(proc.mockFtpClient).setControlEncoding(argument.capture());
        // Compared case-insensitively: only the charset name matters, not its letter case.
        assertEquals("utf-8", argument.getValue().toLowerCase());
    }

    /** A flow file that references a file which was never registered is routed to not-found. */
    @Test
    public void testContentNotFound() {
        runner.enqueue(new byte[0], Collections.singletonMap("filename", "hello.txt"));

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
    }

    /** When fetching the remote file raises a permission error, the flow file is routed to permission-denied. */
    @Test
    public void testInsufficientPermissions() {
        addFileAndEnqueue("hello.txt");
        proc.allowAccess = false;

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
    }


    /** Move completion strategy with a destination directory that has no trailing slash. */
    @Test
    public void testMoveFileWithNoTrailingSlashDirName() {
        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
        runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");

        addFileAndEnqueue("hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);

        // NOTE(review): the boolean returned here is discarded, so the destination key is not actually
        // verified; only the map size below is asserted. This looks like a missing assertTrue(...) -
        // confirm the exact destination key the processor produces before tightening it.
        proc.fileContents.containsKey("/moved/hello.txt");
        assertEquals(1, proc.fileContents.size());
    }

    /** Move completion strategy with a destination directory that ends in a slash. */
    @Test
    public void testMoveFileWithTrailingSlashDirName() {
        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");

        addFileAndEnqueue("hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);

        // NOTE(review): the boolean returned here is discarded, so the destination key is not actually
        // verified; only the map size below is asserted. This looks like a missing assertTrue(...) -
        // confirm the exact destination key the processor produces before tightening it.
        proc.fileContents.containsKey("/moved/hello.txt");
        assertEquals(1, proc.fileContents.size());
    }

    /** Delete completion strategy removes the remote file after a successful fetch. */
    @Test
    public void testDeleteFile() {
        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());

        addFileAndEnqueue("hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        assertTrue(proc.fileContents.isEmpty());
    }

    /** A failing delete does not fail the fetch: the flow file still succeeds and the remote file remains. */
    @Test
    public void testDeleteFails() {
        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
        proc.allowDelete = false;

        addFileAndEnqueue("hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        assertFalse(proc.fileContents.isEmpty());
    }

    /** A failing rename does not fail the fetch: the flow file still succeeds and the file keeps its original name. */
    @Test
    public void testRenameFails() {
        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
        proc.allowDelete = false;
        proc.allowRename = false;

        addFileAndEnqueue("hello.txt");

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        assertEquals(1, proc.fileContents.size());

        assertTrue(proc.fileContents.containsKey("hello.txt"));
    }

    /**
     * A failing directory creation does not fail the fetch: the flow file still succeeds and the
     * file keeps its original name.
     */
    @Test
    public void testCreateDirFails() {
        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
        runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");

        addFileAndEnqueue("hello.txt");

        proc.allowCreateDir = false;

        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
        assertEquals(1, proc.fileContents.size());

        assertTrue(proc.fileContents.containsKey("hello.txt"));
    }


    /**
     * {@link FetchFTP} variant used by the tests.
     *
     * <p>It hands out an {@link FTPTransfer} whose FTP client is a Mockito mock and whose
     * fetch/delete/rename/create-directory operations consult the {@code allow*} flags and the
     * in-memory {@link #fileContents} map instead of talking to a server.
     */
    private static class TestableFetchFTP extends FetchFTP {
        // Each flag, when set to false, makes the matching overridden operation throw PermissionDeniedException.
        private boolean allowAccess = true;
        private boolean allowDelete = true;
        private boolean allowCreateDir = true;
        private boolean allowRename = true;
        // NOTE(review): read by the tests but never assigned in this class.
        private boolean closed = false;
        // Simulated remote file system: remote file name -> file content.
        private final Map<String, byte[]> fileContents = new HashMap<>();
        private final FTPClient mockFtpClient = Mockito.mock(FTPClient.class);

        /**
         * Stubs the mock FTP client: file retrieval is served from {@link #fileContents}, and both
         * login and file-type selection report success.
         *
         * @throws IOException declared because the stubbed client methods declare it
         */
        private TestableFetchFTP() throws IOException {
            // Typed Answer (instead of a raw-type cast) keeps the stubbing free of unchecked warnings.
            final Answer<ByteArrayInputStream> contentLookup = invocation -> {
                final String requestedName = invocation.getArgument(0);
                final byte[] content = fileContents.get(requestedName);
                if (content == null) {
                    // Unknown file names are reported the same way as a missing file.
                    throw new FileNotFoundException();
                }
                return new ByteArrayInputStream(content);
            };
            when(mockFtpClient.retrieveFileStream(anyString())).then(contentLookup);
            when(mockFtpClient.login(anyString(), anyString())).thenReturn(true);
            when(mockFtpClient.setFileType(anyInt())).thenReturn(true);

        }

        /**
         * Registers (or replaces) a simulated remote file.
         *
         * @param filename the remote file name used as lookup key
         * @param content the bytes served when that file is fetched
         */
        public void addContent(final String filename, final byte[] content) {
            this.fileContents.put(filename, content);
        }

        /**
         * Returns an {@link FTPTransfer} that uses the mock client and whose mutating operations are
         * redirected to the in-memory map, guarded by the {@code allow*} flags.
         */
        @Override
        protected FileTransfer createFileTransfer(final ProcessContext context) {
            return new FTPTransfer(context, getLogger()) {

                @Override
                protected FTPClient createFTPClient() {
                    return mockFtpClient;
                }

                @Override
                public FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException {
                    if (!allowAccess) {
                        throw new PermissionDeniedException("test permission denied");
                    }

                    // Otherwise use the real fetch logic, which reads through the mock client.
                    return super.getRemoteFile(remoteFileName, flowFile, session);
                }

                @Override
                public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException {
                    if (!allowDelete) {
                        throw new PermissionDeniedException("test permission denied");
                    }

                    if (!fileContents.containsKey(remoteFileName)) {
                        throw new FileNotFoundException();
                    }

                    fileContents.remove(remoteFileName);
                }

                @Override
                public void rename(FlowFile flowFile, String source, String target) throws IOException {
                    if (!allowRename) {
                        throw new PermissionDeniedException("test permission denied");
                    }

                    if (!fileContents.containsKey(source)) {
                        throw new FileNotFoundException();
                    }

                    // Re-key the content under the target name.
                    final byte[] content = fileContents.remove(source);
                    fileContents.put(target, content);
                }

                @Override
                public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
                    // Directories are not modelled; the only observable effect is the optional failure.
                    if (!allowCreateDir) {
                        throw new PermissionDeniedException("test permission denied");
                    }
                }
            };
        }
    }
}
