/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.web.dao.impl;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;

import static org.apache.nifi.util.StringUtils.isEmpty;

public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {

    private FlowController flowController;
    private ComponentStateDAO componentStateDAO;

    private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
        final RemoteProcessGroup remoteProcessGroup = rootGroup.findRemoteProcessGroup(remoteProcessGroupId);

        if (remoteProcessGroup == null) {
            throw new ResourceNotFoundException(String.format("Unable to find remote process group with id '%s'.", remoteProcessGroupId));
        } else {
            return remoteProcessGroup;
        }
    }

    @Override
    public boolean hasRemoteProcessGroup(String remoteProcessGroupId) {
        final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
        return rootGroup.findRemoteProcessGroup(remoteProcessGroupId) != null;
    }

    /**
     * Creates a remote process group reference.
     *
     * @param remoteProcessGroupDTO The remote process group
     * @return The remote process group
     */
    @Override
    public RemoteProcessGroup createRemoteProcessGroup(String groupId, RemoteProcessGroupDTO remoteProcessGroupDTO) {
        ProcessGroup group = locateProcessGroup(flowController, groupId);

        if (remoteProcessGroupDTO.getParentGroupId() != null && !flowController.getFlowManager().areGroupsSame(groupId, remoteProcessGroupDTO.getParentGroupId())) {
            throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Remote Process Group is being added.");
        }

        final String targetUris = remoteProcessGroupDTO.getTargetUris();
        if (targetUris == null || targetUris.length() == 0) {
            throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI(s)");
        }

        // create the remote process group
        RemoteProcessGroup remoteProcessGroup = flowController.getFlowManager().createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
        remoteProcessGroup.initialize();

        // set other properties
        updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);

        // get the group to add the remote process group to
        group.addRemoteProcessGroup(remoteProcessGroup);

        return remoteProcessGroup;
    }

    /**
     * Gets the specified remote process group.
     *
     * @param remoteProcessGroupId The remote process group id
     * @return The remote process group
     */
    @Override
    public RemoteProcessGroup getRemoteProcessGroup(String remoteProcessGroupId) {
        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);

        return remoteProcessGroup;
    }

    /**
     * Gets all of the remote process groups.
     *
     * @return The remote process groups
     */
    @Override
    public Set<RemoteProcessGroup> getRemoteProcessGroups(String groupId) {
        final ProcessGroup group = locateProcessGroup(flowController, groupId);
        final Set<RemoteProcessGroup> remoteProcessGroups = group.getRemoteProcessGroups();
        return remoteProcessGroups;
    }

    @Override
    public void verifyUpdate(RemoteProcessGroupDTO remoteProcessGroup) {
        verifyUpdate(locateRemoteProcessGroup(remoteProcessGroup.getId()), remoteProcessGroup);
    }

    /**
     * Verifies the specified remote group can be updated, if necessary.
     */
    @SuppressWarnings("unchecked")
    private void verifyUpdate(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDto) {
        // see if the remote process group can start/stop transmitting
        if (isNotNull(remoteProcessGroupDto.isTransmitting())) {
            if (!remoteProcessGroup.isTransmitting() && remoteProcessGroupDto.isTransmitting()) {
                remoteProcessGroup.verifyCanStartTransmitting();
            } else if (remoteProcessGroup.isTransmitting() && !remoteProcessGroupDto.isTransmitting()) {
                remoteProcessGroup.verifyCanStopTransmitting();
            }
        }

        // validate the proposed configuration
        final List<String> requestValidation = validateProposedRemoteProcessGroupConfiguration(remoteProcessGroupDto);
        // ensure there was no validation errors
        if (!requestValidation.isEmpty()) {
            throw new ValidationException(requestValidation);
        }

        // if any remote group properties are changing, verify update
        if (isAnyNotNull(remoteProcessGroupDto.getYieldDuration(),
                remoteProcessGroupDto.getLocalNetworkInterface(),
                remoteProcessGroupDto.getCommunicationsTimeout(),
                remoteProcessGroupDto.getProxyHost(),
                remoteProcessGroupDto.getProxyPort(),
                remoteProcessGroupDto.getProxyUser(),
                remoteProcessGroupDto.getProxyPassword())) {
            remoteProcessGroup.verifyCanUpdate();
        }
    }

    @Override
    public void verifyUpdateInputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        final RemoteGroupPort port = remoteProcessGroup.getInputPort(remoteProcessGroupPortDto.getId());

        if (port == null) {
            throw new ResourceNotFoundException(
                    String.format("Unable to find remote process group input port with id '%s'.", remoteProcessGroupPortDto.getId()));
        }

        verifyUpdatePort(port, remoteProcessGroupPortDto);
    }

    @Override
    public void verifyUpdateOutputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        final RemoteGroupPort port = remoteProcessGroup.getOutputPort(remoteProcessGroupPortDto.getId());

        if (port == null) {
            throw new ResourceNotFoundException(
                    String.format("Unable to find remote process group output port with id '%s'.", remoteProcessGroupPortDto.getId()));
        }

        verifyUpdatePort(port, remoteProcessGroupPortDto);
    }

    /**
     * Verified the specified remote port can be updated, if necessary.
     */
    private void verifyUpdatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
        // see if the remote process group can start/stop transmitting
        if (isNotNull(remoteProcessGroupPortDto.isTransmitting())) {
            if (!port.isRunning() && remoteProcessGroupPortDto.isTransmitting()) {
                port.verifyCanStart();
            } else if (port.isRunning() && !remoteProcessGroupPortDto.isTransmitting()) {
                port.verifyCanStop();
            }
        }

        // validate the proposed configuration
        final List<String> requestValidation = validateProposedRemoteProcessGroupPortConfiguration(port, remoteProcessGroupPortDto);
        // ensure there was no validation errors
        if (!requestValidation.isEmpty()) {
            throw new ValidationException(requestValidation);
        }


        // verify update when appropriate
        if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(),
                remoteProcessGroupPortDto.getUseCompression(),
                remoteProcessGroupPortDto.getBatchSettings())) {
            port.verifyCanUpdate();
        }
    }

    /**
     * Validates the proposed configuration for the specified remote port.
     */
    private List<String> validateProposedRemoteProcessGroupPortConfiguration(RemoteGroupPort remoteGroupPort, RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) {
        final List<String> validationErrors = new ArrayList<>();

        // ensure the proposed port configuration is valid
        if (isNotNull(remoteProcessGroupPortDTO.getConcurrentlySchedulableTaskCount()) && remoteProcessGroupPortDTO.getConcurrentlySchedulableTaskCount() <= 0) {
            validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName()));
        }

        final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDTO.getBatchSettings();
        if (batchSettingsDTO != null) {
            final Integer batchCount = batchSettingsDTO.getCount();
            if (isNotNull(batchCount) && batchCount < 0) {
                validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName()));
            }

            final String batchSize = batchSettingsDTO.getSize();
            if (isNotNull(batchSize) && batchSize.length() > 0
                    && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) {
                validationErrors.add(String.format("Batch size for port '%s' must be of format <Data Size> <Data Unit>" +
                        " where <Data Size> is a non-negative integer and <Data Unit> is a supported Data"
                        + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName()));
            }

            final String batchDuration = batchSettingsDTO.getDuration();
            if (isNotNull(batchDuration) && batchDuration.length() > 0
                    && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) {
                validationErrors.add(String.format("Batch duration for port '%s' must be of format <duration> <TimeUnit>" +
                        " where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such "
                        + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName()));
            }
        }

        return validationErrors;
    }

    /**
     * Validates the proposed configuration for the specified remote group.
     */
    private List<String> validateProposedRemoteProcessGroupConfiguration(RemoteProcessGroupDTO remoteProcessGroupDTO) {
        final List<String> validationErrors = new ArrayList<>();

        if (isNotNull(remoteProcessGroupDTO.getCommunicationsTimeout())) {
            Matcher yieldMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(remoteProcessGroupDTO.getCommunicationsTimeout());
            if (!yieldMatcher.matches()) {
                validationErrors.add("Communications timeout is not a valid time duration (ie 30 sec, 5 min)");
            }
        }
        if (isNotNull(remoteProcessGroupDTO.getYieldDuration())) {
            Matcher yieldMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(remoteProcessGroupDTO.getYieldDuration());
            if (!yieldMatcher.matches()) {
                validationErrors.add("Yield duration is not a valid time duration (ie 30 sec, 5 min)");
            }
        }
        String proxyPassword = remoteProcessGroupDTO.getProxyPassword();
        String proxyUser = remoteProcessGroupDTO.getProxyUser();
        String proxyHost = remoteProcessGroupDTO.getProxyHost();

        if (isNotNull(remoteProcessGroupDTO.getProxyPort())) {
            if (isEmpty(proxyHost)) {
                validationErrors.add("Proxy port was specified, but proxy host was empty.");
            }
        }

        if (!isEmpty(proxyUser)) {
            if (isEmpty(proxyHost)) {
                validationErrors.add("Proxy user name was specified, but proxy host was empty.");
            }
            if (isEmpty(proxyPassword)) {
                validationErrors.add("User password should be specified if Proxy server needs user authentication.");
            }
        }

        if (!isEmpty(proxyPassword)) {
            if (isEmpty(proxyHost)) {
                validationErrors.add("Proxy user password was specified, but proxy host was empty.");
            }
            if (isEmpty(proxyPassword)) {
                validationErrors.add("User name should be specified if Proxy server needs user authentication.");
            }
        }
        return validationErrors;
    }

    @Override
    public RemoteGroupPort updateRemoteProcessGroupInputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        final RemoteGroupPort port = remoteProcessGroup.getInputPort(remoteProcessGroupPortDto.getId());

        if (port == null) {
            throw new ResourceNotFoundException(
                    String.format("Unable to find remote process group input port with id '%s'.", remoteProcessGroupPortDto.getId()));
        }

        // verify the update
        verifyUpdatePort(port, remoteProcessGroupPortDto);

        // perform the update
        updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);

        remoteProcessGroup.getProcessGroup().onComponentModified();
        return port;
    }

    @Override
    public RemoteGroupPort updateRemoteProcessGroupOutputPort(String remoteProcessGroupId, RemoteProcessGroupPortDTO remoteProcessGroupPortDto) {
        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        final RemoteGroupPort port = remoteProcessGroup.getOutputPort(remoteProcessGroupPortDto.getId());

        if (port == null) {
            throw new ResourceNotFoundException(
                    String.format("Unable to find remote process group output port with id '%s'.", remoteProcessGroupId));
        }

        // verify the update
        verifyUpdatePort(port, remoteProcessGroupPortDto);

        // perform the update
        updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
        remoteProcessGroup.getProcessGroup().onComponentModified();

        return port;
    }

    /**
     *
     * @param port Port instance to be updated.
     * @param remoteProcessGroupPortDto DTO containing updated remote process group port settings.
     * @param remoteProcessGroup If remoteProcessGroupPortDto has updated isTransmitting input,
     *                           this method will start or stop the port in this remoteProcessGroup as necessary.
     */
    private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) {
        if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) {
            port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount());
        }
        if (isNotNull(remoteProcessGroupPortDto.getUseCompression())) {
            port.setUseCompression(remoteProcessGroupPortDto.getUseCompression());
        }

        final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDto.getBatchSettings();
        if (isNotNull(batchSettingsDTO)) {
            port.setBatchCount(batchSettingsDTO.getCount());
            port.setBatchSize(batchSettingsDTO.getSize());
            port.setBatchDuration(batchSettingsDTO.getDuration());
        }

        final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting();
        if (isNotNull(isTransmitting)) {
            // start or stop as necessary
            if (!port.isRunning() && isTransmitting) {
                remoteProcessGroup.startTransmitting(port);
            } else if (port.isRunning() && !isTransmitting) {
                remoteProcessGroup.stopTransmitting(port);
            }
        }
    }

    @Override
    public RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroupDTO remoteProcessGroupDTO) {
        RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupDTO.getId());
        return updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);
    }

    private RemoteProcessGroup updateRemoteProcessGroup(RemoteProcessGroup remoteProcessGroup, RemoteProcessGroupDTO remoteProcessGroupDTO) {
        // verify the update request
        verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO);

        // configure the remote process group
        final String targetUris = remoteProcessGroupDTO.getTargetUris();
        final String name = remoteProcessGroupDTO.getName();
        final String comments = remoteProcessGroupDTO.getComments();
        final String communicationsTimeout = remoteProcessGroupDTO.getCommunicationsTimeout();
        final String yieldDuration = remoteProcessGroupDTO.getYieldDuration();
        final String proxyHost = remoteProcessGroupDTO.getProxyHost();
        final Integer proxyPort = remoteProcessGroupDTO.getProxyPort();
        final String proxyUser = remoteProcessGroupDTO.getProxyUser();
        final String proxyPassword = remoteProcessGroupDTO.getProxyPassword();

        final String transportProtocol = remoteProcessGroupDTO.getTransportProtocol();
        final String localNetworkInterface = remoteProcessGroupDTO.getLocalNetworkInterface();

        if (isNotNull(targetUris)) {
            remoteProcessGroup.setTargetUris(targetUris);
        }
        if (isNotNull(name)) {
            remoteProcessGroup.setName(name);
        }
        if (isNotNull(comments)) {
            remoteProcessGroup.setComments(comments);
        }
        if (isNotNull(communicationsTimeout)) {
            remoteProcessGroup.setCommunicationsTimeout(communicationsTimeout);
        }
        if (isNotNull(yieldDuration)) {
            remoteProcessGroup.setYieldDuration(yieldDuration);
        }
        if (isNotNull(remoteProcessGroupDTO.getPosition())) {
            remoteProcessGroup.setPosition(new Position(remoteProcessGroupDTO.getPosition().getX(), remoteProcessGroupDTO.getPosition().getY()));
        }
        if (isNotNull(transportProtocol)) {
            remoteProcessGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase()));
            // No null check because these proxy settings have to be clear if not specified.
            // But when user Enable/Disable transmission, only isTransmitting is sent.
            // To prevent clearing these values in that case, set these only if transportProtocol is sent,
            // assuming UI sends transportProtocol always for update.
            remoteProcessGroup.setProxyHost(proxyHost);
            remoteProcessGroup.setProxyPort(proxyPort);
            remoteProcessGroup.setProxyUser(proxyUser);
            // Keep using current password when null or "********" was sent.
            // Passing other values updates the password,
            // specify empty String to clear password.
            if (isNotNull(proxyPassword) && !DtoFactory.SENSITIVE_VALUE_MASK.equals(proxyPassword)) {
                remoteProcessGroup.setProxyPassword(proxyPassword);
            }
        }
        if (localNetworkInterface != null) {
            if (StringUtils.isBlank(localNetworkInterface)) {
                remoteProcessGroup.setNetworkInterface(null);
            } else {
                remoteProcessGroup.setNetworkInterface(localNetworkInterface);
            }
        }

        final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting();
        if (isNotNull(isTransmitting)) {
            // start or stop as necessary
            if (!remoteProcessGroup.isTransmitting() && isTransmitting) {
                remoteProcessGroup.startTransmitting();
            } else if (remoteProcessGroup.isTransmitting() && !isTransmitting) {
                remoteProcessGroup.stopTransmitting();
            }
        }

        final ProcessGroup group = remoteProcessGroup.getProcessGroup();
        if (group != null) {
            group.onComponentModified();
        }
        return remoteProcessGroup;
    }

    @Override
    public void verifyDelete(String remoteProcessGroupId) {
        RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        remoteProcessGroup.verifyCanDelete();
    }

    @Override
    public void deleteRemoteProcessGroup(String remoteProcessGroupId) {
        RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        remoteProcessGroup.getProcessGroup().removeRemoteProcessGroup(remoteProcessGroup);
    }

    @Override
    public StateMap getState(String remoteProcessGroupId, Scope scope) {
        final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
        return componentStateDAO.getState(remoteProcessGroup, scope);
    }

    public void setFlowController(FlowController flowController) {
        this.flowController = flowController;
    }

    public void setComponentStateDAO(ComponentStateDAO componentStateDAO) {
        this.componentStateDAO = componentStateDAO;
    }
}
