Refactor ForwardingJob code
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 38f4010..3e5b426 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -49,52 +49,21 @@
 
     private static final Log log = LogFactory.getLog(ForwardingJob.class);
 
-    /**
-     * Includes HTTP status for which message processor should retry
-     */
+    private boolean isMaxDeliverAttemptDropEnabled;
+    private int maxDeliverAttempts;
+    private String deactivateSequence;
+    private String faultSequence;
+    private String replySequence;
     private String[] retryHttpStatusCodes;
+    private MessageStore messageStore;
+    private Axis2BlockingClient sender;
+    private ScheduledMessageForwardingProcessor processor;
+    private boolean errorStop = false;
 
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
-        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
-
         //Get the Global Objects from DataMap
-        MessageStore messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
-        Map<String, Object> parameters = (Map<String, Object>) jdm.get(
-                MessageProcessorConstants.PARAMETERS);
-        Axis2BlockingClient sender = (Axis2BlockingClient) jdm.get(
-                ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
-        ScheduledMessageForwardingProcessor processor = (ScheduledMessageForwardingProcessor) jdm.get(
-                ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
-
-        int maxDeliverAttempts = -1;
-
-        boolean isMaxDeliverAttemptDropEnabled = false;
-
-        String mdaParam = null;
-        if (parameters != null) {
-            mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
-        }
-
-        if (mdaParam != null) {
-            maxDeliverAttempts = Integer.parseInt(mdaParam);
-
-            // Here we look for the edge case
-            if(maxDeliverAttempts == 0) {
-                processor.deactivate();
-            }
-        }
-        if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
-                parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
-                        .equalsIgnoreCase("true")) {
-	        //Configuration to continue the message processor even without stopping the message processor
-	        // after maximum number of delivery
-            isMaxDeliverAttemptDropEnabled = true;
-        }
-
-        if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
-            retryHttpStatusCodes = parameters
-                    .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
-        }
+        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+        configureForwardingJob(jdm);
 
         // WE do not try to process if the processor is inactive or
         // there is no message store attached.
@@ -102,173 +71,237 @@
             return;
         }
 
-        boolean errorStop = false;
+        startProcessingMsgs();
+    }
+
+    private void startProcessingMsgs() {
+        errorStop = false;
         while (!errorStop) {
-
-            MessageContext messageContext = messageStore.peek();
-            if (messageContext != null) {
-
-
-                //If The Message not belongs to this server we ignore it.
-                String serverName = (String)
-                        messageContext.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
-
-                if(serverName != null && messageContext instanceof Axis2MessageContext) {
-
-                    AxisConfiguration configuration = ((Axis2MessageContext)messageContext).
-                            getAxis2MessageContext().
-                            getConfigurationContext().getAxisConfiguration();
-
-                    String myServerName = getAxis2ParameterValue(configuration,
-                            SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
-
-                    if(!serverName.equals(myServerName)) {
-                        return;
-                    }
-
+            MessageContext inMsgCtx = messageStore.peek();
+            if (inMsgCtx != null) {
+                if (isMsgRelatedToThisServer(inMsgCtx)) {
+                    handleNewMessage(inMsgCtx);
                 }
-
-                Set proSet = messageContext.getPropertyKeySet();
-
-                if (proSet != null) {
-                    if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
-                        proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
-                    }
-                }
-
-                String targetEp =
-                        (String) messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
-
-                if (targetEp != null) {
-                    Endpoint ep = messageContext.getEndpoint(targetEp);
-
-                    // stop processing if endpoint is not ready to send
-                    if(!ep.getContext().readyToSend()) {
-                        return;
-                    }
-
-                    if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
-
-                        try {
-                            MessageContext outCtx = sender.send(ep, messageContext);
-
-                            if (outCtx != null) {
-                                handle400and500statusCodes(outCtx);
-
-                                if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
-                                    // This Means an Error has occurred
-                                    if (!retryForHttpStatusCodes(messageStore, processor, outCtx)) {
-                                        continue;
-                                    }
-
-                                    if (maxDeliverAttempts > 0) {
-                                        processor.incrementSendAttemptCount();
-                                    }
-
-                                    if (parameters != null &&
-                                            parameters.get(
-                                                    ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
-
-                                        String seq = (String) parameters.get(
-                                                ForwardingProcessorConstants.FAULT_SEQUENCE);
-                                        Mediator mediator = outCtx.getSequence(seq);
-                                        if (mediator != null) {
-                                            mediator.mediate(outCtx);
-                                        } else {
-                                            log.warn("Can't Send the fault Message , Sequence " + seq +
-                                                             " Does not Exist");
-                                        }
-
-                                    }
-
-                                    if (maxDeliverAttempts > 0) {
-                                        if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
-                                            deactivate(processor, messageContext, parameters);
-                                        }
-                                    }
-                                    errorStop = true;
-                                } else {
-                                    // This Means we have invoked an out only operation
-                                    // remove the message and reset the count
-                                    messageStore.poll();
-                                    processor.resetSentAttemptCount();
-                                }
-                                continue;
-                            }
-
-                            // If there is a sequence defined to send success replies,
-                            // we must send the message to it
-                            if (parameters != null &&
-                                    parameters.get(
-                                            ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
-                                if (outCtx != null) {
-                                    String seq = (String) parameters.get(
-                                            ForwardingProcessorConstants.REPLY_SEQUENCE);
-                                    Mediator mediator = outCtx.getSequence(seq);
-                                    if (mediator != null) {
-                                        mediator.mediate(outCtx);
-                                    } else {
-                                        log.warn("Can't Send the Out Message , Sequence " + seq +
-                                                " Does not Exist");
-                                    }
-                                }
-                            }
-
-                            // If no Exception Occurred We remove the Message
-                            // and reset the delivery attempt count
-                            processor.resetSentAttemptCount();
-                            messageStore.poll();
-                        } catch (Exception e) {
-
-                            if (maxDeliverAttempts > 0) {
-                                processor.incrementSendAttemptCount();
-                                if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
-                                    if (isMaxDeliverAttemptDropEnabled) {
-                                        //Since explicitly enabled the message drop after max delivery attempt
-                                        // message has been removed and reset the delivery attempt count of the processor
-                                        processor.resetSentAttemptCount();
-                                        messageStore.poll();
-                                    } else {
-                                        deactivate(processor, messageContext, parameters);
-                                    }
-                                }
-                            }
-                            errorStop = true;
-                            log.error("Error Forwarding Message ", e);
-                            continue;
-                        }
-                    } else {
-                        String logMsg;
-                        if (ep == null) {
-                            logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
-                                    "the message form store";
-                        } else {
-                            logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
-                                    "endpoint types supported";
-                        }
-                        log.warn(logMsg);
-                        messageStore.poll();
-                    }
-
-
-                } else {
-                    //No Target Endpoint defined for the Message
-                    //So we do not have a place to deliver.
-                    //Here we log a warning and remove the message
-                    //todo: we can improve this by implementing a target inferring mechanism
-
-                    log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
-                            " not found in the message context , Hence removing the message ");
-                    messageStore.poll();
-
-                }
-
             } else {
                 errorStop = true;
             }
         }
     }
 
+    private void configureForwardingJob(JobDataMap jdm) {
+        messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
+        sender = (Axis2BlockingClient) jdm.get(
+                ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
+        processor = (ScheduledMessageForwardingProcessor) jdm.get(
+                ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+        Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
+        maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
+        isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters, maxDeliverAttempts);
+        retryHttpStatusCodes(parameters);
+        setSequences(parameters);
+    }
+
+    private boolean isMsgRelatedToThisServer(MessageContext inMsgCtx) {
+        String serverName = (String)
+                inMsgCtx.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+        if(serverName != null && inMsgCtx instanceof Axis2MessageContext) {
+            AxisConfiguration configuration = ((Axis2MessageContext)inMsgCtx).
+                    getAxis2MessageContext().
+                    getConfigurationContext().getAxisConfiguration();
+            String myServerName = getAxis2ParameterValue(configuration,
+                                                         SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+            return serverName.equals(myServerName);
+        }
+        return false;
+    }
+
+    private void handleNewMessage(MessageContext inMsgCtx) {
+        sanitizeMsgContext(inMsgCtx);
+        String targetEp =
+                (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+        if (targetEp != null) {
+            Endpoint ep = inMsgCtx.getEndpoint(targetEp);
+            // stop processing if endpoint is not ready to send
+            if(ep.getContext().readyToSend()) {
+                if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
+                    sendMsgToEndpoint(inMsgCtx, ep);
+                } else {
+                    logMsg(targetEp, ep);
+                    messageStore.poll();
+                }
+            }
+        } else {
+            //No Target Endpoint defined for the Message
+            //So we do not have a place to deliver.
+            //Here we log a warning and remove the message
+            //todo: we can improve this by implementing a target inferring mechanism
+            log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
+                             " not found in the message context , Hence removing the message ");
+            messageStore.poll();
+        }
+    }
+
+    private void sanitizeMsgContext(MessageContext messageContext) {
+        Set proSet = messageContext.getPropertyKeySet();
+        if (proSet != null) {
+            if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
+                proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
+            }
+        }
+    }
+
+    private void sendMsgToEndpoint(MessageContext inMsgCtx, Endpoint ep) {
+        try {
+            MessageContext outCtx = sender.send(ep, inMsgCtx);
+            if (outCtx != null) {
+                handleResponse(inMsgCtx, outCtx);
+            } else {
+                // If no Exception Occurred We remove the Message
+                // and reset the delivery attempt count
+                messageStore.poll();
+                processor.resetSentAttemptCount();
+            }
+        } catch (Exception e) {
+            errorStop = handleOutOnlyError(maxDeliverAttempts, isMaxDeliverAttemptDropEnabled, inMsgCtx);
+            log.error("Error Forwarding Message ", e);
+        }
+    }
+
+    private void handleResponse(MessageContext inMsgCtx, MessageContext outCtx) {
+        handle400and500statusCodes(outCtx);
+        if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
+            handleError(maxDeliverAttempts, inMsgCtx, outCtx);
+        } else {
+            // This Means we have invoked an out only operation
+            // remove the message and reset the count
+            messageStore.poll();
+            processor.resetSentAttemptCount();
+            sendResponseToReplySeq(outCtx);
+        }
+    }
+
+    private void handleError(int maxDeliverAttempts, MessageContext inMsgCtx, MessageContext outCtx) {
+        if (retryForHttpStatusCodes(messageStore, processor, outCtx)) {
+            if (maxDeliverAttempts > 0) {
+                processor.incrementSendAttemptCount();
+            }
+            sendItToFaultSequence(outCtx);
+            if (maxDeliverAttempts > 0) {
+                if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+                    deactivate(processor, inMsgCtx);
+                }
+            }
+            errorStop = true;
+        }
+    }
+
+    private void logMsg(String targetEp, Endpoint ep) {
+        String logMsg;
+        if (ep == null) {
+            logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
+                    "the message form store";
+        } else {
+            logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
+                    "endpoint types supported";
+        }
+        log.warn(logMsg);
+    }
+
+    private boolean handleOutOnlyError(int maxDeliverAttempts, boolean isMaxDeliverAttemptDropEnabled,
+                                       MessageContext messageContext) {
+        if (maxDeliverAttempts > 0) {
+            processor.incrementSendAttemptCount();
+            if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+                if (isMaxDeliverAttemptDropEnabled) {
+                    //Since explicitly enabled the message drop after max delivery attempt
+                    // message has been removed and reset the delivery attempt count of the processor
+                    processor.resetSentAttemptCount();
+                    messageStore.poll();
+                } else {
+                    deactivate(processor, messageContext);
+                }
+            }
+        }
+        return true;
+    }
+
+    private void sendResponseToReplySeq(MessageContext outCtx) {
+        // If there is a sequence defined to send success replies,
+        // we must send the message to it
+        if (replySequence != null) {
+            Mediator mediator = outCtx.getSequence(replySequence);
+            if (mediator != null) {
+                mediator.mediate(outCtx);
+            } else {
+                log.warn("Can't Send the Out Message , Sequence " + replySequence + " Does not Exist");
+            }
+        }
+    }
+
+    private void sendItToFaultSequence(MessageContext outCtx) {
+        if (faultSequence != null) {
+            Mediator mediator = outCtx.getSequence(faultSequence);
+            if (mediator != null) {
+                mediator.mediate(outCtx);
+            } else {
+                log.warn("Can't Send the fault Message , Sequence " + faultSequence +
+                                 " Does not Exist");
+            }
+        }
+    }
+
+    private void setSequences(Map<String, Object> parameters) {
+        if (parameters != null) {
+            if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+                faultSequence = (String) parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE);
+            }
+            if (parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
+                deactivateSequence = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
+            }
+            if (parameters.get(ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
+                replySequence = (String) parameters.get(
+                        ForwardingProcessorConstants.REPLY_SEQUENCE);
+            }
+        }
+    }
+
+    private void retryHttpStatusCodes(Map<String, Object> parameters) {
+        if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+            retryHttpStatusCodes = parameters
+                    .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
+        }
+    }
+
+    private boolean isMaxDeliverAttemptDropEnabled(Map<String, Object> parameters, int maxDeliverAttempts) {
+        boolean isMaxDeliverAttemptDropEnabled = false;
+        if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
+                parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
+                        .equalsIgnoreCase("true")) {
+	        //Configuration to continue the message processor even without stopping the message processor
+	        // after maximum number of delivery
+            isMaxDeliverAttemptDropEnabled = true;
+        }
+        return isMaxDeliverAttemptDropEnabled;
+    }
+
+    private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
+                                           ScheduledMessageForwardingProcessor processor) {
+        int maxDeliverAttempts = -1;
+        String mdaParam = null;
+        if (parameters != null) {
+            mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
+        }
+        if (mdaParam != null) {
+            maxDeliverAttempts = Integer.parseInt(mdaParam);
+            // Here we look for the edge case
+            if(maxDeliverAttempts == 0) {
+                processor.deactivate();
+            }
+        }
+        return maxDeliverAttempts;
+    }
+
     private boolean retryForHttpStatusCodes(MessageStore messageStore, ScheduledMessageForwardingProcessor processor,
                                             MessageContext outCtx) {
         // No need to retry for application level failures
@@ -290,7 +323,8 @@
             String httpStatusCode =  outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
             if (httpStatusCode.equals(MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR)) {
                 outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
-                outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
+                outCtx.setProperty(SynapseConstants.ERROR_MESSAGE,
+                                   MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
             } else if (httpStatusCode.equals(MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR)) {
                 outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
                 outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR);
@@ -305,9 +339,7 @@
      * @param paramKey The name / key of the parameter
      * @return The value of the parameter
      */
-    private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration,
-                                                 String paramKey) {
-
+    private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration, String paramKey) {
         Parameter parameter = axisConfiguration.getParameter(paramKey);
         if (parameter == null) {
             return null;
@@ -320,22 +352,24 @@
         }
     }
 
-    private void deactivate(ScheduledMessageForwardingProcessor processor,
-                            MessageContext msgContext, Map<String, Object> parameters) {
+    private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
         processor.deactivate();
-        if (parameters != null && parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
-            if (msgContext != null) {
-                String seq = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
-                Mediator mediator = msgContext.getSequence(seq);
-                if (mediator != null) {
-                    mediator.mediate(msgContext);
-                } else {
-                    log.warn("Deactivate sequence: " + seq + " does not exist");
-                }
+        if (deactivateSequence != null) {
+            if (inMsgCtx != null) {
+                sendMsgToDeactivateSeq(inMsgCtx);
             }
         }
     }
 
+    private void sendMsgToDeactivateSeq(MessageContext inMsgCtx) {
+        Mediator mediator = inMsgCtx.getSequence(deactivateSequence);
+        if (mediator != null) {
+            mediator.mediate(inMsgCtx);
+        } else {
+            log.warn("Deactivate sequence: " + deactivateSequence + " does not exist");
+        }
+    }
+
     private boolean isRetryHttpStatusCode(String message) {
         if (retryHttpStatusCodes == null) {
             return false;