Merge pull request #32 from shafreenAnfar/newMSMP

Improve Message Forwarding Processor (MFP) to support HTTP status codes
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
index e7c98aa..92f83ef 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
@@ -35,4 +35,10 @@
      */
     public static final String MAX_DELIVER_ATTEMPTS = "max.deliver.attempts";
 
+    /**
+     * HTTP status codes which are used for message processor retry implementation
+     */
+    public static final String HTTP_INTERNAL_SERVER_ERROR = "500";
+    public static final String HTTP_BAD_REQUEST_ERROR = "400";
+
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 8f8a566..40ef410 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -32,6 +32,7 @@
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.message.processors.MessageProcessorConsents;
 import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.transport.nhttp.NhttpConstants;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -48,6 +49,10 @@
 
     private static final Log log = LogFactory.getLog(ForwardingJob.class);
 
+    /**
+     * Includes HTTP status for which message processor should retry
+     */
+    private String[] retryHttpStatusCodes;
 
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
         JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
@@ -86,6 +91,11 @@
             isMaxDeliverAttemptDropEnabled = true;
         }
 
+        if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+            retryHttpStatusCodes = parameters
+                    .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
+        }
+
         // WE do not try to process if the processor is inactive or
         // there is no message store attached.
         if(!processor.isActive() || messageStore == null) {
@@ -142,43 +152,47 @@
                         try {
                             MessageContext outCtx = sender.send(ep, messageContext);
 
-                            if (outCtx != null && "true".equals(outCtx.
-                                    getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
-                                // This Means an Error has occurred
+                            if (outCtx != null) {
+                                handle400and500statusCodes(outCtx);
 
-                                if (maxDeliverAttempts > 0) {
-                                    processor.incrementSendAttemptCount();
-                                }
-
-                                if (parameters != null &&
-                                        parameters.get(
-                                                ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
-
-                                    String seq = (String) parameters.get(
-                                            ForwardingProcessorConstants.FAULT_SEQUENCE);
-                                    Mediator mediator = outCtx.getSequence(seq);
-                                    if (mediator != null) {
-                                        mediator.mediate(outCtx);
-                                    } else {
-                                        log.warn("Can't Send the fault Message , Sequence " + seq +
-                                                " Does not Exist");
+                                if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
+                                    // This Means an Error has occurred
+                                    if (!retryForHttpStatusCodes(messageStore, processor, outCtx)) {
+                                        continue;
                                     }
 
-                                }
-
-                                if (maxDeliverAttempts > 0) {
-                                    if(processor.getSendAttemptCount() >= maxDeliverAttempts) {
-                                        deactivate(processor, messageContext, parameters);
+                                    if (maxDeliverAttempts > 0) {
+                                        processor.incrementSendAttemptCount();
                                     }
-                                }
-                                errorStop = true;
-                                continue;
 
-                            } else if (outCtx == null) {
-                                // This Means we have invoked an out only operation
-                                // remove the message and reset the count
-                                messageStore.poll();
-                                processor.resetSentAttemptCount();
+                                    if (parameters != null &&
+                                            parameters.get(
+                                                    ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+
+                                        String seq = (String) parameters.get(
+                                                ForwardingProcessorConstants.FAULT_SEQUENCE);
+                                        Mediator mediator = outCtx.getSequence(seq);
+                                        if (mediator != null) {
+                                            mediator.mediate(outCtx);
+                                        } else {
+                                            log.warn("Can't Send the fault Message , Sequence " + seq +
+                                                             " Does not Exist");
+                                        }
+
+                                    }
+
+                                    if (maxDeliverAttempts > 0) {
+                                        if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+                                            deactivate(processor, messageContext, parameters);
+                                        }
+                                    }
+                                    errorStop = true;
+                                } else {
+                                    // This Means we have invoked an out only operation
+                                    // remove the message and reset the count
+                                    messageStore.poll();
+                                    processor.resetSentAttemptCount();
+                                }
                                 continue;
                             }
 
@@ -255,6 +269,35 @@
         }
     }
 
+    private boolean retryForHttpStatusCodes(MessageStore messageStore, ScheduledMessageForwardingProcessor processor,
+                                            MessageContext outCtx) {
+        // No need to retry for application level failures
+        if (outCtx.getProperty(SynapseConstants.ERROR_MESSAGE) != null) {
+            String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
+            if (errorMsg.matches(".*[3-5]\\d\\d.*")) {
+                if (!isRetryHttpStatusCode(errorMsg)) {
+                    messageStore.poll();
+                    processor.resetSentAttemptCount();
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void handle400and500statusCodes(MessageContext outCtx) {
+        if ((outCtx.getProperty(NhttpConstants.HTTP_SC) != null)) {
+            String httpStatusCode =  outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
+            if (httpStatusCode.equals(MessageProcessorConsents.HTTP_INTERNAL_SERVER_ERROR)) {
+                outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
+                outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConsents.HTTP_INTERNAL_SERVER_ERROR);
+            } else if (httpStatusCode.equals(MessageProcessorConsents.HTTP_BAD_REQUEST_ERROR)) {
+                outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
+                outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConsents.HTTP_BAD_REQUEST_ERROR);
+            }
+        }
+    }
+
     /**
      * Helper method to get a value of a parameters in the AxisConfiguration
      *
@@ -292,4 +335,16 @@
             }
         }
     }
+
+    private boolean isRetryHttpStatusCode(String message) {
+        if (retryHttpStatusCodes == null) {
+            return false;
+        }
+        for (String statsCode : retryHttpStatusCodes) {
+            if (message.contains(statsCode)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
index f706f1f..9378ee7 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
@@ -65,4 +65,8 @@
      */
     public static final String MAX_DELIVER_DROP = "max.deliver.drop";
 
+    /**
+     * Used to determine for which HTTP status codes, message processor should retry
+     */
+    public static final String RETRY_HTTP_STATUS_CODES = "retry.http.status.codes";
 }