Merge pull request #32 from shafreenAnfar/newMSMP
Improve Message Forwarding Processor (MFP) to support HTTP status codes
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
index e7c98aa..92f83ef 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
@@ -35,4 +35,10 @@
*/
public static final String MAX_DELIVER_ATTEMPTS = "max.deliver.attempts";
+ /**
+ * HTTP status codes which are used for message processor retry implementation
+ */
+ public static final String HTTP_INTERNAL_SERVER_ERROR = "500";
+ public static final String HTTP_BAD_REQUEST_ERROR = "400";
+
}
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 8f8a566..40ef410 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -32,6 +32,7 @@
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.processors.MessageProcessorConsents;
import org.apache.synapse.message.store.MessageStore;
+import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -48,6 +49,10 @@
private static final Log log = LogFactory.getLog(ForwardingJob.class);
+ /**
+ * Includes HTTP status for which message processor should retry
+ */
+ private String[] retryHttpStatusCodes;
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
@@ -86,6 +91,11 @@
isMaxDeliverAttemptDropEnabled = true;
}
+ if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+ retryHttpStatusCodes = parameters
+ .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
+ }
+
// WE do not try to process if the processor is inactive or
// there is no message store attached.
if(!processor.isActive() || messageStore == null) {
@@ -142,43 +152,47 @@
try {
MessageContext outCtx = sender.send(ep, messageContext);
- if (outCtx != null && "true".equals(outCtx.
- getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
- // This Means an Error has occurred
+ if (outCtx != null) {
+ handle400and500statusCodes(outCtx);
- if (maxDeliverAttempts > 0) {
- processor.incrementSendAttemptCount();
- }
-
- if (parameters != null &&
- parameters.get(
- ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
-
- String seq = (String) parameters.get(
- ForwardingProcessorConstants.FAULT_SEQUENCE);
- Mediator mediator = outCtx.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(outCtx);
- } else {
- log.warn("Can't Send the fault Message , Sequence " + seq +
- " Does not Exist");
+ if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
+ // This Means an Error has occurred
+ if (!retryForHttpStatusCodes(messageStore, processor, outCtx)) {
+ continue;
}
- }
-
- if (maxDeliverAttempts > 0) {
- if(processor.getSendAttemptCount() >= maxDeliverAttempts) {
- deactivate(processor, messageContext, parameters);
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
}
- }
- errorStop = true;
- continue;
- } else if (outCtx == null) {
- // This Means we have invoked an out only operation
- // remove the message and reset the count
- messageStore.poll();
- processor.resetSentAttemptCount();
+ if (parameters != null &&
+ parameters.get(
+ ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+
+ String seq = (String) parameters.get(
+ ForwardingProcessorConstants.FAULT_SEQUENCE);
+ Mediator mediator = outCtx.getSequence(seq);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the fault Message , Sequence " + seq +
+ " Does not Exist");
+ }
+
+ }
+
+ if (maxDeliverAttempts > 0) {
+ if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+ deactivate(processor, messageContext, parameters);
+ }
+ }
+ errorStop = true;
+ } else {
+ // This Means we have invoked an out only operation
+ // remove the message and reset the count
+ messageStore.poll();
+ processor.resetSentAttemptCount();
+ }
continue;
}
@@ -255,6 +269,35 @@
}
}
+ private boolean retryForHttpStatusCodes(MessageStore messageStore, ScheduledMessageForwardingProcessor processor,
+ MessageContext outCtx) {
+ // No need to retry for application level failures
+ if (outCtx.getProperty(SynapseConstants.ERROR_MESSAGE) != null) {
+ String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
+ if (errorMsg.matches(".*[3-5]\\d\\d.*")) {
+ if (!isRetryHttpStatusCode(errorMsg)) {
+ messageStore.poll();
+ processor.resetSentAttemptCount();
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private void handle400and500statusCodes(MessageContext outCtx) {
+ if ((outCtx.getProperty(NhttpConstants.HTTP_SC) != null)) {
+ String httpStatusCode = outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
+ if (httpStatusCode.equals(MessageProcessorConsents.HTTP_INTERNAL_SERVER_ERROR)) {
+ outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
+ outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConsents.HTTP_INTERNAL_SERVER_ERROR);
+ } else if (httpStatusCode.equals(MessageProcessorConsents.HTTP_BAD_REQUEST_ERROR)) {
+ outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
+ outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConsents.HTTP_BAD_REQUEST_ERROR);
+ }
+ }
+ }
+
/**
* Helper method to get a value of a parameters in the AxisConfiguration
*
@@ -292,4 +335,16 @@
}
}
}
+
+ private boolean isRetryHttpStatusCode(String message) {
+ if (retryHttpStatusCodes == null) {
+ return false;
+ }
+ for (String statsCode : retryHttpStatusCodes) {
+ if (message.contains(statsCode)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
index f706f1f..9378ee7 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
@@ -65,4 +65,8 @@
*/
public static final String MAX_DELIVER_DROP = "max.deliver.drop";
+ /**
+ * Used to determine for which HTTP status codes, message processor should retry
+ */
+ public static final String RETRY_HTTP_STATUS_CODES = "retry.http.status.codes";
}