Merge pull request #38 from shafreenAnfar/refactor_msmp
Refactor ForwardingJob class and fix a few issues
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 38f4010..d487247 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -49,52 +49,21 @@
private static final Log log = LogFactory.getLog(ForwardingJob.class);
- /**
- * Includes HTTP status for which message processor should retry
- */
+ private boolean isMaxDeliverAttemptDropEnabled;
+ private int maxDeliverAttempts;
+ private String deactivateSequence;
+ private String faultSequence;
+ private String replySequence;
private String[] retryHttpStatusCodes;
+ private MessageStore messageStore;
+ private Axis2BlockingClient sender;
+ private ScheduledMessageForwardingProcessor processor;
+ private boolean errorStop = false;
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
-
//Get the Global Objects from DataMap
- MessageStore messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
- Map<String, Object> parameters = (Map<String, Object>) jdm.get(
- MessageProcessorConstants.PARAMETERS);
- Axis2BlockingClient sender = (Axis2BlockingClient) jdm.get(
- ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
- ScheduledMessageForwardingProcessor processor = (ScheduledMessageForwardingProcessor) jdm.get(
- ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
-
- int maxDeliverAttempts = -1;
-
- boolean isMaxDeliverAttemptDropEnabled = false;
-
- String mdaParam = null;
- if (parameters != null) {
- mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
- }
-
- if (mdaParam != null) {
- maxDeliverAttempts = Integer.parseInt(mdaParam);
-
- // Here we look for the edge case
- if(maxDeliverAttempts == 0) {
- processor.deactivate();
- }
- }
- if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
- parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
- .equalsIgnoreCase("true")) {
- //Configuration to continue the message processor even without stopping the message processor
- // after maximum number of delivery
- isMaxDeliverAttemptDropEnabled = true;
- }
-
- if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
- retryHttpStatusCodes = parameters
- .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
- }
+ JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+ configureForwardingJob(jdm);
// WE do not try to process if the processor is inactive or
// there is no message store attached.
@@ -102,202 +71,100 @@
return;
}
- boolean errorStop = false;
+ startProcessingMsgs();
+ }
+
+ private void configureForwardingJob(JobDataMap jdm) {
+ messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
+ sender = (Axis2BlockingClient) jdm.get(
+ ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
+ processor = (ScheduledMessageForwardingProcessor) jdm.get(
+ ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+ Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
+ maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
+ isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
+ retryHttpStatusCodes(parameters);
+ setSequences(parameters);
+ }
+
+ private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
+ ScheduledMessageForwardingProcessor processor) {
+ int maxDeliverAttempts = -1;
+ String mdaParam = null;
+ if (parameters != null) {
+ mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
+ }
+ if (mdaParam != null) {
+ maxDeliverAttempts = Integer.parseInt(mdaParam);
+ // Here we look for the edge case
+ if(maxDeliverAttempts == 0) {
+ processor.deactivate();
+ }
+ }
+ return maxDeliverAttempts;
+ }
+
+ private boolean isMaxDeliverAttemptDropEnabled(Map<String, Object> parameters) {
+ boolean isMaxDeliverAttemptDropEnabled = false;
+ if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
+ parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
+ .equalsIgnoreCase("true")) {
+ //Configuration to keep the message processor running (dropping the message instead of
+ // deactivating the processor) once the maximum number of delivery attempts is reached
+ isMaxDeliverAttemptDropEnabled = true;
+ }
+ return isMaxDeliverAttemptDropEnabled;
+ }
+
+ private void retryHttpStatusCodes(Map<String, Object> parameters) {
+ if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+ retryHttpStatusCodes = parameters // whitespace around the commas is dropped so "500, 503" yields "500" and "503"
+ .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().trim().split("\\s*,\\s*");
+ }
+ }
+
+ private void setSequences(Map<String, Object> parameters) {
+ if (parameters != null) {
+ if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+ faultSequence = (String) parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE);
+ }
+ if (parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
+ deactivateSequence = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
+ }
+ if (parameters.get(ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
+ replySequence = (String) parameters.get(
+ ForwardingProcessorConstants.REPLY_SEQUENCE);
+ }
+ }
+ }
+
+ private void startProcessingMsgs() {
+ errorStop = false;
while (!errorStop) {
-
- MessageContext messageContext = messageStore.peek();
- if (messageContext != null) {
-
-
- //If The Message not belongs to this server we ignore it.
- String serverName = (String)
- messageContext.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
-
- if(serverName != null && messageContext instanceof Axis2MessageContext) {
-
- AxisConfiguration configuration = ((Axis2MessageContext)messageContext).
- getAxis2MessageContext().
- getConfigurationContext().getAxisConfiguration();
-
- String myServerName = getAxis2ParameterValue(configuration,
- SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
-
- if(!serverName.equals(myServerName)) {
- return;
- }
-
+ MessageContext inMsgCtx = messageStore.peek();
+ // Stop when the store is empty or the head message belongs to another server: such a
+ // message is never polled here, so looping again would peek the same message forever.
+ if (inMsgCtx != null && isMsgRelatedToThisServer(inMsgCtx)) {
+ handleNewMessage(inMsgCtx);
- }
-
- Set proSet = messageContext.getPropertyKeySet();
-
- if (proSet != null) {
- if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
- proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
- }
- }
-
- String targetEp =
- (String) messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
-
- if (targetEp != null) {
- Endpoint ep = messageContext.getEndpoint(targetEp);
-
- // stop processing if endpoint is not ready to send
- if(!ep.getContext().readyToSend()) {
- return;
- }
-
- if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
-
- try {
- MessageContext outCtx = sender.send(ep, messageContext);
-
- if (outCtx != null) {
- handle400and500statusCodes(outCtx);
-
- if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
- // This Means an Error has occurred
- if (!retryForHttpStatusCodes(messageStore, processor, outCtx)) {
- continue;
- }
-
- if (maxDeliverAttempts > 0) {
- processor.incrementSendAttemptCount();
- }
-
- if (parameters != null &&
- parameters.get(
- ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
-
- String seq = (String) parameters.get(
- ForwardingProcessorConstants.FAULT_SEQUENCE);
- Mediator mediator = outCtx.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(outCtx);
- } else {
- log.warn("Can't Send the fault Message , Sequence " + seq +
- " Does not Exist");
- }
-
- }
-
- if (maxDeliverAttempts > 0) {
- if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
- deactivate(processor, messageContext, parameters);
- }
- }
- errorStop = true;
- } else {
- // This Means we have invoked an out only operation
- // remove the message and reset the count
- messageStore.poll();
- processor.resetSentAttemptCount();
- }
- continue;
- }
-
- // If there is a sequence defined to send success replies,
- // we must send the message to it
- if (parameters != null &&
- parameters.get(
- ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
- if (outCtx != null) {
- String seq = (String) parameters.get(
- ForwardingProcessorConstants.REPLY_SEQUENCE);
- Mediator mediator = outCtx.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(outCtx);
- } else {
- log.warn("Can't Send the Out Message , Sequence " + seq +
- " Does not Exist");
- }
- }
- }
-
- // If no Exception Occurred We remove the Message
- // and reset the delivery attempt count
- processor.resetSentAttemptCount();
- messageStore.poll();
- } catch (Exception e) {
-
- if (maxDeliverAttempts > 0) {
- processor.incrementSendAttemptCount();
- if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
- if (isMaxDeliverAttemptDropEnabled) {
- //Since explicitly enabled the message drop after max delivery attempt
- // message has been removed and reset the delivery attempt count of the processor
- processor.resetSentAttemptCount();
- messageStore.poll();
- } else {
- deactivate(processor, messageContext, parameters);
- }
- }
- }
- errorStop = true;
- log.error("Error Forwarding Message ", e);
- continue;
- }
- } else {
- String logMsg;
- if (ep == null) {
- logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
- "the message form store";
- } else {
- logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
- "endpoint types supported";
- }
- log.warn(logMsg);
- messageStore.poll();
- }
-
-
- } else {
- //No Target Endpoint defined for the Message
- //So we do not have a place to deliver.
- //Here we log a warning and remove the message
- //todo: we can improve this by implementing a target inferring mechanism
-
- log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
- " not found in the message context , Hence removing the message ");
- messageStore.poll();
-
- }
-
} else {
errorStop = true;
}
}
}
- private boolean retryForHttpStatusCodes(MessageStore messageStore, ScheduledMessageForwardingProcessor processor,
- MessageContext outCtx) {
- // No need to retry for application level failures
- if (outCtx.getProperty(SynapseConstants.ERROR_MESSAGE) != null) {
- String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
- if (errorMsg.matches(".*[3-5]\\d\\d.*")) {
- if (!isRetryHttpStatusCode(errorMsg)) {
- messageStore.poll();
- processor.resetSentAttemptCount();
- return false;
- }
- }
+ private boolean isMsgRelatedToThisServer(MessageContext inMsgCtx) {
+ String serverName = (String) inMsgCtx.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+ if(serverName != null && inMsgCtx instanceof Axis2MessageContext) {
+ AxisConfiguration configuration = ((Axis2MessageContext)inMsgCtx).
+ getAxis2MessageContext().getConfigurationContext().getAxisConfiguration();
+ String myServerName = getAxis2ParameterValue(configuration,
+ SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+ return serverName.equals(myServerName);
}
return true;
}
- private void handle400and500statusCodes(MessageContext outCtx) {
- if ((outCtx.getProperty(NhttpConstants.HTTP_SC) != null)) {
- String httpStatusCode = outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
- if (httpStatusCode.equals(MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR)) {
- outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
- outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
- } else if (httpStatusCode.equals(MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR)) {
- outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
- outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR);
- }
- }
- }
-
/**
* Helper method to get a value of a parameters in the AxisConfiguration
*
@@ -305,9 +172,7 @@
* @param paramKey The name / key of the parameter
* @return The value of the parameter
*/
- private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration,
- String paramKey) {
-
+ private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration, String paramKey) {
Parameter parameter = axisConfiguration.getParameter(paramKey);
if (parameter == null) {
return null;
@@ -320,31 +185,206 @@
}
}
- private void deactivate(ScheduledMessageForwardingProcessor processor,
- MessageContext msgContext, Map<String, Object> parameters) {
- processor.deactivate();
- if (parameters != null && parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
- if (msgContext != null) {
- String seq = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
- Mediator mediator = msgContext.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(msgContext);
+ private void handleNewMessage(MessageContext inMsgCtx) {
+ sanitizeMsgContext(inMsgCtx);
+ String targetEp = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+ if (targetEp != null) {
+ Endpoint ep = inMsgCtx.getEndpoint(targetEp);
+ // An endpoint that exists but is not ready to send ends this run; the message stays in the store
+ if (ep != null && !ep.getContext().readyToSend()) {
+ errorStop = true;
+ } else if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
+ sendMsgToEndpoint(inMsgCtx, ep);
} else {
- log.warn("Deactivate sequence: " + seq + " does not exist");
+ logMsg(targetEp, ep);
+ messageStore.poll();
}
- }
+ } else {
+ //No Target Endpoint defined for the Message
+ //So we do not have a place to deliver.
+ //Here we log a warning and remove the message
+ //todo: we can improve this by implementing a target inferring mechanism
+ log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
+ " not found in the message context , Hence removing the message ");
+ messageStore.poll();
+ }
+ }
+
+ private void sanitizeMsgContext(MessageContext messageContext) {
+ Set proSet = messageContext.getPropertyKeySet();
+ if (proSet != null) {
+ if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
+ proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
+ }
}
}
- private boolean isRetryHttpStatusCode(String message) {
+ private void logMsg(String targetEp, Endpoint ep) {
+ String logMsg;
+ if (ep == null) {
+ logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
+ "the message form store";
+ } else {
+ logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
+ "endpoint types supported";
+ }
+ log.warn(logMsg);
+ }
+
+ private void sendMsgToEndpoint(MessageContext inMsgCtx, Endpoint ep) {
+ try {
+ MessageContext outCtx = sender.send(ep, inMsgCtx);
+ if (outCtx != null) {
+ handleResponse(inMsgCtx, outCtx);
+ } else {
+ // No response context was returned and no exception occurred, so remove
+ // the message and reset the delivery attempt count
+ messageStore.poll();
+ processor.resetSentAttemptCount();
+ }
+ } catch (Exception e) {
+ errorStop = handleOutOnlyError(inMsgCtx);
+ log.error("Error Forwarding Message ", e);
+ }
+ }
+
+ private void handleResponse(MessageContext inMsgCtx, MessageContext outCtx) {
+ handle400and500statusCodes(outCtx);
+ if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
+ handleError(inMsgCtx, outCtx);
+ } else {
+ // No blocking client error was flagged on the response, so treat it as a success:
+ // remove the message, reset the count and hand the response to the reply sequence
+ doPostSuccessTasks(outCtx);
+ }
+ }
+
+ private void handle400and500statusCodes(MessageContext outCtx) {
+ if ((outCtx.getProperty(NhttpConstants.HTTP_SC) != null)) {
+ String httpStatusCode = outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
+ if (httpStatusCode.equals(MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR)) {
+ outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
+ outCtx.setProperty(SynapseConstants.ERROR_MESSAGE,
+ MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
+ } else if (httpStatusCode.equals(MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR)) {
+ outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
+ outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR);
+ }
+ }
+ }
+
+ private void handleError(MessageContext inMsgCtx, MessageContext outCtx) {
+ if (isHttpStatusCodeError(outCtx)) {
+ if (isRetryHttpStatusCode(outCtx)) {
+ doPostErrorTasks(inMsgCtx, outCtx);
+ } else {
+ doPostSuccessTasks(outCtx);
+ }
+ } else {
+ doPostErrorTasks(inMsgCtx, outCtx);
+ }
+ }
+
+ private void doPostSuccessTasks(MessageContext outCtx) {
+ messageStore.poll();
+ processor.resetSentAttemptCount();
+ sendResponseToReplySeq(outCtx);
+ }
+
+ private void doPostErrorTasks(MessageContext inMsgCtx, MessageContext outCtx) {
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
+ }
+ sendItToFaultSequence(outCtx);
+ if (maxDeliverAttempts > 0) {
+ handleMaxDeliveryAttempts(inMsgCtx);
+ }
+ errorStop = true;
+ }
+
+ private void handleMaxDeliveryAttempts(MessageContext inMsgCtx) {
+ if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+ if (isMaxDeliverAttemptDropEnabled) {
+ //Message drop after max delivery attempts is explicitly enabled, so remove the
+ // message and reset the delivery attempt count of the processor
+ processor.resetSentAttemptCount();
+ messageStore.poll();
+ } else {
+ deactivate(processor, inMsgCtx);
+ }
+ }
+ }
+
+ private boolean handleOutOnlyError(MessageContext inMsgCtx) {
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
+ handleMaxDeliveryAttempts(inMsgCtx);
+ }
+ return true;
+ }
+
+ private void sendResponseToReplySeq(MessageContext outCtx) {
+ // If there is a sequence defined to send success replies,
+ // we must send the message to it
+ if (replySequence != null) {
+ Mediator mediator = outCtx.getSequence(replySequence);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the Out Message , Sequence " + replySequence + " Does not Exist");
+ }
+ }
+ }
+
+ private void sendItToFaultSequence(MessageContext outCtx) {
+ if (faultSequence != null) {
+ Mediator mediator = outCtx.getSequence(faultSequence);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the fault Message , Sequence " + faultSequence +
+ " Does not Exist");
+ }
+ }
+ }
+
+ private boolean isHttpStatusCodeError(MessageContext outCtx) {
+ // No need to retry for application level failures
+ if (outCtx.getProperty(SynapseConstants.ERROR_MESSAGE) != null) {
+ String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
+ return errorMsg.matches(".*[3-5]\\d\\d.*");
+ }
+ return false;
+ }
+
+ private boolean isRetryHttpStatusCode(MessageContext outCtx) {
+ String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
if (retryHttpStatusCodes == null) {
return false;
}
for (String statsCode : retryHttpStatusCodes) {
- if (message.contains(statsCode)) {
+ if (errorMsg.contains(statsCode)) {
return true;
}
}
return false;
}
+
+ private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
+ processor.deactivate();
+ if (deactivateSequence != null) {
+ if (inMsgCtx != null) {
+ sendMsgToDeactivateSeq(inMsgCtx);
+ }
+ }
+ }
+
+ private void sendMsgToDeactivateSeq(MessageContext inMsgCtx) {
+ Mediator mediator = inMsgCtx.getSequence(deactivateSequence);
+ if (mediator != null) {
+ mediator.mediate(inMsgCtx);
+ } else {
+ log.warn("Deactivate sequence: " + deactivateSequence + " does not exist");
+ }
+ }
}