/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 */

package org.apache.synapse.message.processors.forward;

import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.core.axis2.Axis2BlockingClient;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.AbstractEndpoint;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.processors.MessageProcessorConstants;
import org.apache.synapse.message.store.MessageStore;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.StatefulJob;

import java.util.Map;
import java.util.Set;

/**
 * Redelivery Job will replay all the Messages in the Message Store when executed
 * Excluding ones that are already tried redelivering more than max number of tries
 */
public class ForwardingJob implements StatefulJob {

    private static final Log log = LogFactory.getLog(ForwardingJob.class);

    private boolean isMaxDeliverAttemptDropEnabled;
    private int maxDeliverAttempts;
    private String deactivateSequence;
    private String faultSequence;
    private String replySequence;
    private String[] retryHttpStatusCodes;
    private MessageStore messageStore;
    private Axis2BlockingClient sender;
    private ScheduledMessageForwardingProcessor processor;
    private boolean errorStop = false;

    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //Get the Global Objects from DataMap
        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
        configureForwardingJob(jdm);

        // WE do not try to process if the processor is inactive or
        // there is no message store attached.
        if(!processor.isActive() || messageStore == null) {
            return;
        }

        startProcessingMsgs();
    }

    private void configureForwardingJob(JobDataMap jdm) {
        messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
        sender = (Axis2BlockingClient) jdm.get(
                ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
        processor = (ScheduledMessageForwardingProcessor) jdm.get(
                ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
        Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
        maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
        isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
        retryHttpStatusCodes(parameters);
        setSequences(parameters);
    }

    private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
                                           ScheduledMessageForwardingProcessor processor) {
        int maxDeliverAttempts = -1;
        String mdaParam = null;
        if (parameters != null) {
            mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
        }
        if (mdaParam != null) {
            maxDeliverAttempts = Integer.parseInt(mdaParam);
            // Here we look for the edge case
            if(maxDeliverAttempts == 0) {
                processor.deactivate();
            }
        }
        return maxDeliverAttempts;
    }

    private boolean isMaxDeliverAttemptDropEnabled(Map<String, Object> parameters) {
        boolean isMaxDeliverAttemptDropEnabled = false;
        if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
                parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
                        .equalsIgnoreCase("true")) {
            //Configuration to continue the message processor even without stopping the message processor
            // after maximum number of delivery
            isMaxDeliverAttemptDropEnabled = true;
        }
        return isMaxDeliverAttemptDropEnabled;
    }

    private void retryHttpStatusCodes(Map<String, Object> parameters) {
        if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
            retryHttpStatusCodes = parameters
                    .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
        }
    }

    private void setSequences(Map<String, Object> parameters) {
        if (parameters != null) {
            if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
                faultSequence = (String) parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE);
            }
            if (parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
                deactivateSequence = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
            }
            if (parameters.get(ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
                replySequence = (String) parameters.get(
                        ForwardingProcessorConstants.REPLY_SEQUENCE);
            }
        }
    }

    private void startProcessingMsgs() {
        errorStop = false;
        while (!errorStop) {
            MessageContext inMsgCtx = messageStore.peek();
            if (inMsgCtx != null) {
                if (isMsgRelatedToThisServer(inMsgCtx)) {
                    handleNewMessage(inMsgCtx);
                }
            } else {
                errorStop = true;
            }
        }
    }

    private boolean isMsgRelatedToThisServer(MessageContext inMsgCtx) {
        String serverName = (String) inMsgCtx.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
        if(serverName != null && inMsgCtx instanceof Axis2MessageContext) {
            AxisConfiguration configuration = ((Axis2MessageContext)inMsgCtx).
                    getAxis2MessageContext().getConfigurationContext().getAxisConfiguration();
            String myServerName = getAxis2ParameterValue(configuration,
                                                         SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);

            return serverName.equals(myServerName);
        }
        return true;
    }

    /**
     * Helper method to get a value of a parameters in the AxisConfiguration
     *
     * @param axisConfiguration AxisConfiguration instance
     * @param paramKey The name / key of the parameter
     * @return The value of the parameter
     */
    private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration, String paramKey) {
        Parameter parameter = axisConfiguration.getParameter(paramKey);
        if (parameter == null) {
            return null;
        }
        Object value = parameter.getValue();
        if (value != null && value instanceof String) {
            return (String) parameter.getValue();
        } else {
            return null;
        }
    }

    private void handleNewMessage(MessageContext inMsgCtx) {
        sanitizeMsgContext(inMsgCtx);
        String targetEp = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
        if (targetEp != null) {
            Endpoint ep = inMsgCtx.getEndpoint(targetEp);
            // stop processing if endpoint is not ready to send
            if(ep.getContext().readyToSend()) {
                if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
                    sendMsgToEndpoint(inMsgCtx, ep);
                } else {
                    logMsg(targetEp, ep);
                    messageStore.poll();
                }
            }
        } else {
            //No Target Endpoint defined for the Message
            //So we do not have a place to deliver.
            //Here we log a warning and remove the message
            //todo: we can improve this by implementing a target inferring mechanism
            log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
                             " not found in the message context , Hence removing the message ");
            messageStore.poll();
        }
    }

    private void sanitizeMsgContext(MessageContext messageContext) {
        Set proSet = messageContext.getPropertyKeySet();
        if (proSet != null) {
            if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
                proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
            }
        }
    }

    private void logMsg(String targetEp, Endpoint ep) {
        String logMsg;
        if (ep == null) {
            logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
                    "the message form store";
        } else {
            logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
                    "endpoint types supported";
        }
        log.warn(logMsg);
    }

    private void sendMsgToEndpoint(MessageContext inMsgCtx, Endpoint ep) {
        try {
            MessageContext outCtx = sender.send(ep, inMsgCtx);
            if (outCtx != null) {
                handleResponse(inMsgCtx, outCtx);
            } else {
                // If no Exception Occurred We remove the Message
                // and reset the delivery attempt count
                messageStore.poll();
                processor.resetSentAttemptCount();
            }
        } catch (Exception e) {
            errorStop = handleOutOnlyError(inMsgCtx);
            log.error("Error Forwarding Message ", e);
        }
    }

    private void handleResponse(MessageContext inMsgCtx, MessageContext outCtx) {
        handle400and500statusCodes(outCtx);
        if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
            handleError(inMsgCtx, outCtx);
        } else {
            // This Means we have invoked an out only operation
            // remove the message and reset the count
            messageStore.poll();
            processor.resetSentAttemptCount();
            sendResponseToReplySeq(outCtx);
        }
    }

    private void handle400and500statusCodes(MessageContext outCtx) {
        if ((outCtx.getProperty(NhttpConstants.HTTP_SC) != null)) {
            String httpStatusCode =  outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
            if (httpStatusCode.equals(MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR)) {
                outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
                outCtx.setProperty(SynapseConstants.ERROR_MESSAGE,
                                   MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
            } else if (httpStatusCode.equals(MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR)) {
                outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
                outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR);
            }
        }
    }

    private void handleError(MessageContext inMsgCtx, MessageContext outCtx) {
        if (isHttpStatusCodeError(outCtx)) {
            if (isRetryHttpStatusCode(outCtx)) {
                doPostErrorTasks(inMsgCtx, outCtx);
            }
        } else {
            doPostErrorTasks(inMsgCtx, outCtx);
        }
    }

    private void doPostErrorTasks(MessageContext inMsgCtx, MessageContext outCtx) {
        if (maxDeliverAttempts > 0) {
            processor.incrementSendAttemptCount();
        }
        sendItToFaultSequence(outCtx);
        if (maxDeliverAttempts > 0) {
            if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
                deactivate(processor, inMsgCtx);
            }
        }
        errorStop = true;
    }

    private boolean handleOutOnlyError(MessageContext inMsgCtx) {
        if (maxDeliverAttempts > 0) {
            processor.incrementSendAttemptCount();
            if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
                if (isMaxDeliverAttemptDropEnabled) {
                    //Since explicitly enabled the message drop after max delivery attempt
                    // message has been removed and reset the delivery attempt count of the processor
                    processor.resetSentAttemptCount();
                    messageStore.poll();
                } else {
                    deactivate(processor, inMsgCtx);
                }
            }
        }
        return true;
    }

    private void sendResponseToReplySeq(MessageContext outCtx) {
        // If there is a sequence defined to send success replies,
        // we must send the message to it
        if (replySequence != null) {
            Mediator mediator = outCtx.getSequence(replySequence);
            if (mediator != null) {
                mediator.mediate(outCtx);
            } else {
                log.warn("Can't Send the Out Message , Sequence " + replySequence + " Does not Exist");
            }
        }
    }

    private void sendItToFaultSequence(MessageContext outCtx) {
        if (faultSequence != null) {
            Mediator mediator = outCtx.getSequence(faultSequence);
            if (mediator != null) {
                mediator.mediate(outCtx);
            } else {
                log.warn("Can't Send the fault Message , Sequence " + faultSequence +
                                 " Does not Exist");
            }
        }
    }

    private boolean isHttpStatusCodeError(MessageContext outCtx) {
        // No need to retry for application level failures
        if (outCtx.getProperty(SynapseConstants.ERROR_MESSAGE) != null) {
            String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
            return errorMsg.matches(".*[3-5]\\d\\d.*");
        }
        return false;
    }

    private boolean isRetryHttpStatusCode(MessageContext outCtx) {
        String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
        if (retryHttpStatusCodes == null) {
            return false;
        }
        for (String statsCode : retryHttpStatusCodes) {
            if (errorMsg.contains(statsCode)) {
                return true;
            }
        }
        return false;
    }

    private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
        processor.deactivate();
        if (deactivateSequence != null) {
            if (inMsgCtx != null) {
                sendMsgToDeactivateSeq(inMsgCtx);
            }
        }
    }

    private void sendMsgToDeactivateSeq(MessageContext inMsgCtx) {
        Mediator mediator = inMsgCtx.getSequence(deactivateSequence);
        if (mediator != null) {
            mediator.mediate(inMsgCtx);
        } else {
            log.warn("Deactivate sequence: " + deactivateSequence + " does not exist");
        }
    }
}
