Refactor ForwardingJob code
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 38f4010..3e5b426 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -49,52 +49,21 @@
private static final Log log = LogFactory.getLog(ForwardingJob.class);
- /**
- * Includes HTTP status for which message processor should retry
- */
+ private boolean isMaxDeliverAttemptDropEnabled;
+ private int maxDeliverAttempts;
+ private String deactivateSequence;
+ private String faultSequence;
+ private String replySequence;
private String[] retryHttpStatusCodes;
+ private MessageStore messageStore;
+ private Axis2BlockingClient sender;
+ private ScheduledMessageForwardingProcessor processor;
+ private boolean errorStop = false;
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
- JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
-
//Get the Global Objects from DataMap
- MessageStore messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
- Map<String, Object> parameters = (Map<String, Object>) jdm.get(
- MessageProcessorConstants.PARAMETERS);
- Axis2BlockingClient sender = (Axis2BlockingClient) jdm.get(
- ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
- ScheduledMessageForwardingProcessor processor = (ScheduledMessageForwardingProcessor) jdm.get(
- ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
-
- int maxDeliverAttempts = -1;
-
- boolean isMaxDeliverAttemptDropEnabled = false;
-
- String mdaParam = null;
- if (parameters != null) {
- mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
- }
-
- if (mdaParam != null) {
- maxDeliverAttempts = Integer.parseInt(mdaParam);
-
- // Here we look for the edge case
- if(maxDeliverAttempts == 0) {
- processor.deactivate();
- }
- }
- if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
- parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
- .equalsIgnoreCase("true")) {
- //Configuration to continue the message processor even without stopping the message processor
- // after maximum number of delivery
- isMaxDeliverAttemptDropEnabled = true;
- }
-
- if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
- retryHttpStatusCodes = parameters
- .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
- }
+ JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+ configureForwardingJob(jdm);
// WE do not try to process if the processor is inactive or
// there is no message store attached.
@@ -102,173 +71,237 @@
return;
}
- boolean errorStop = false;
+ startProcessingMsgs();
+ }
+
+ private void startProcessingMsgs() {
+ errorStop = false;
while (!errorStop) {
-
- MessageContext messageContext = messageStore.peek();
- if (messageContext != null) {
-
-
- //If The Message not belongs to this server we ignore it.
- String serverName = (String)
- messageContext.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
-
- if(serverName != null && messageContext instanceof Axis2MessageContext) {
-
- AxisConfiguration configuration = ((Axis2MessageContext)messageContext).
- getAxis2MessageContext().
- getConfigurationContext().getAxisConfiguration();
-
- String myServerName = getAxis2ParameterValue(configuration,
- SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
-
- if(!serverName.equals(myServerName)) {
- return;
- }
-
+ MessageContext inMsgCtx = messageStore.peek();
+ if (inMsgCtx != null) {
+ if (isMsgRelatedToThisServer(inMsgCtx)) {
+ handleNewMessage(inMsgCtx);
}
-
- Set proSet = messageContext.getPropertyKeySet();
-
- if (proSet != null) {
- if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
- proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
- }
- }
-
- String targetEp =
- (String) messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
-
- if (targetEp != null) {
- Endpoint ep = messageContext.getEndpoint(targetEp);
-
- // stop processing if endpoint is not ready to send
- if(!ep.getContext().readyToSend()) {
- return;
- }
-
- if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
-
- try {
- MessageContext outCtx = sender.send(ep, messageContext);
-
- if (outCtx != null) {
- handle400and500statusCodes(outCtx);
-
- if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
- // This Means an Error has occurred
- if (!retryForHttpStatusCodes(messageStore, processor, outCtx)) {
- continue;
- }
-
- if (maxDeliverAttempts > 0) {
- processor.incrementSendAttemptCount();
- }
-
- if (parameters != null &&
- parameters.get(
- ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
-
- String seq = (String) parameters.get(
- ForwardingProcessorConstants.FAULT_SEQUENCE);
- Mediator mediator = outCtx.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(outCtx);
- } else {
- log.warn("Can't Send the fault Message , Sequence " + seq +
- " Does not Exist");
- }
-
- }
-
- if (maxDeliverAttempts > 0) {
- if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
- deactivate(processor, messageContext, parameters);
- }
- }
- errorStop = true;
- } else {
- // This Means we have invoked an out only operation
- // remove the message and reset the count
- messageStore.poll();
- processor.resetSentAttemptCount();
- }
- continue;
- }
-
- // If there is a sequence defined to send success replies,
- // we must send the message to it
- if (parameters != null &&
- parameters.get(
- ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
- if (outCtx != null) {
- String seq = (String) parameters.get(
- ForwardingProcessorConstants.REPLY_SEQUENCE);
- Mediator mediator = outCtx.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(outCtx);
- } else {
- log.warn("Can't Send the Out Message , Sequence " + seq +
- " Does not Exist");
- }
- }
- }
-
- // If no Exception Occurred We remove the Message
- // and reset the delivery attempt count
- processor.resetSentAttemptCount();
- messageStore.poll();
- } catch (Exception e) {
-
- if (maxDeliverAttempts > 0) {
- processor.incrementSendAttemptCount();
- if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
- if (isMaxDeliverAttemptDropEnabled) {
- //Since explicitly enabled the message drop after max delivery attempt
- // message has been removed and reset the delivery attempt count of the processor
- processor.resetSentAttemptCount();
- messageStore.poll();
- } else {
- deactivate(processor, messageContext, parameters);
- }
- }
- }
- errorStop = true;
- log.error("Error Forwarding Message ", e);
- continue;
- }
- } else {
- String logMsg;
- if (ep == null) {
- logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
- "the message form store";
- } else {
- logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
- "endpoint types supported";
- }
- log.warn(logMsg);
- messageStore.poll();
- }
-
-
- } else {
- //No Target Endpoint defined for the Message
- //So we do not have a place to deliver.
- //Here we log a warning and remove the message
- //todo: we can improve this by implementing a target inferring mechanism
-
- log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
- " not found in the message context , Hence removing the message ");
- messageStore.poll();
-
- }
-
} else {
errorStop = true;
}
}
}
+ private void configureForwardingJob(JobDataMap jdm) {
+ messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
+ sender = (Axis2BlockingClient) jdm.get(
+ ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
+ processor = (ScheduledMessageForwardingProcessor) jdm.get(
+ ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+ Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
+ maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
+ isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters, maxDeliverAttempts);
+ retryHttpStatusCodes(parameters);
+ setSequences(parameters);
+ }
+
+ private boolean isMsgRelatedToThisServer(MessageContext inMsgCtx) {
+ String serverName = (String)
+ inMsgCtx.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+ if(serverName != null && inMsgCtx instanceof Axis2MessageContext) {
+ AxisConfiguration configuration = ((Axis2MessageContext)inMsgCtx).
+ getAxis2MessageContext().
+ getConfigurationContext().getAxisConfiguration();
+ String myServerName = getAxis2ParameterValue(configuration,
+ SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+ return serverName.equals(myServerName);
+ }
+ return false;
+ }
+
+ private void handleNewMessage(MessageContext inMsgCtx) {
+ sanitizeMsgContext(inMsgCtx);
+ String targetEp =
+ (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+ if (targetEp != null) {
+ Endpoint ep = inMsgCtx.getEndpoint(targetEp);
+ // stop processing if endpoint is not ready to send
+ if(ep.getContext().readyToSend()) {
+ if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
+ sendMsgToEndpoint(inMsgCtx, ep);
+ } else {
+ logMsg(targetEp, ep);
+ messageStore.poll();
+ }
+ }
+ } else {
+ //No Target Endpoint defined for the Message
+ //So we do not have a place to deliver.
+ //Here we log a warning and remove the message
+ //todo: we can improve this by implementing a target inferring mechanism
+ log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
+ " not found in the message context , Hence removing the message ");
+ messageStore.poll();
+ }
+ }
+
+ private void sanitizeMsgContext(MessageContext messageContext) {
+ Set proSet = messageContext.getPropertyKeySet();
+ if (proSet != null) {
+ if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
+ proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
+ }
+ }
+ }
+
+ private void sendMsgToEndpoint(MessageContext inMsgCtx, Endpoint ep) {
+ try {
+ MessageContext outCtx = sender.send(ep, inMsgCtx);
+ if (outCtx != null) {
+ handleResponse(inMsgCtx, outCtx);
+ } else {
+ // If no Exception Occurred We remove the Message
+ // and reset the delivery attempt count
+ messageStore.poll();
+ processor.resetSentAttemptCount();
+ }
+ } catch (Exception e) {
+ errorStop = handleOutOnlyError(maxDeliverAttempts, isMaxDeliverAttemptDropEnabled, inMsgCtx);
+ log.error("Error Forwarding Message ", e);
+ }
+ }
+
+ private void handleResponse(MessageContext inMsgCtx, MessageContext outCtx) {
+ handle400and500statusCodes(outCtx);
+ if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
+ handleError(maxDeliverAttempts, inMsgCtx, outCtx);
+ } else {
+ // This Means we have invoked an out only operation
+ // remove the message and reset the count
+ messageStore.poll();
+ processor.resetSentAttemptCount();
+ sendResponseToReplySeq(outCtx);
+ }
+ }
+
+ private void handleError(int maxDeliverAttempts, MessageContext inMsgCtx, MessageContext outCtx) {
+ if (retryForHttpStatusCodes(messageStore, processor, outCtx)) {
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
+ }
+ sendItToFaultSequence(outCtx);
+ if (maxDeliverAttempts > 0) {
+ if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+ deactivate(processor, inMsgCtx);
+ }
+ }
+ errorStop = true;
+ }
+ }
+
+ private void logMsg(String targetEp, Endpoint ep) {
+ String logMsg;
+ if (ep == null) {
+ logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
+ "the message form store";
+ } else {
+ logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
+ "endpoint types supported";
+ }
+ log.warn(logMsg);
+ }
+
+ private boolean handleOutOnlyError(int maxDeliverAttempts, boolean isMaxDeliverAttemptDropEnabled,
+ MessageContext messageContext) {
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
+ if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
+ if (isMaxDeliverAttemptDropEnabled) {
+ //Since explicitly enabled the message drop after max delivery attempt
+ // message has been removed and reset the delivery attempt count of the processor
+ processor.resetSentAttemptCount();
+ messageStore.poll();
+ } else {
+ deactivate(processor, messageContext);
+ }
+ }
+ }
+ return true;
+ }
+
+ private void sendResponseToReplySeq(MessageContext outCtx) {
+ // If there is a sequence defined to send success replies,
+ // we must send the message to it
+ if (replySequence != null) {
+ Mediator mediator = outCtx.getSequence(replySequence);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the Out Message , Sequence " + replySequence + " Does not Exist");
+ }
+ }
+ }
+
+ private void sendItToFaultSequence(MessageContext outCtx) {
+ if (faultSequence != null) {
+ Mediator mediator = outCtx.getSequence(faultSequence);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the fault Message , Sequence " + faultSequence +
+ " Does not Exist");
+ }
+ }
+ }
+
+ private void setSequences(Map<String, Object> parameters) {
+ if (parameters != null) {
+ if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+ faultSequence = (String) parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE);
+ }
+ if (parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
+ deactivateSequence = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
+ }
+ if (parameters.get(ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
+ replySequence = (String) parameters.get(
+ ForwardingProcessorConstants.REPLY_SEQUENCE);
+ }
+ }
+ }
+
+ private void retryHttpStatusCodes(Map<String, Object> parameters) {
+ if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+ retryHttpStatusCodes = parameters
+ .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
+ }
+ }
+
+ private boolean isMaxDeliverAttemptDropEnabled(Map<String, Object> parameters, int maxDeliverAttempts) {
+ boolean isMaxDeliverAttemptDropEnabled = false;
+ if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
+ parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
+ .equalsIgnoreCase("true")) {
+ //Configuration to continue the message processor even without stopping the message processor
+ // after maximum number of delivery
+ isMaxDeliverAttemptDropEnabled = true;
+ }
+ return isMaxDeliverAttemptDropEnabled;
+ }
+
+ private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
+ ScheduledMessageForwardingProcessor processor) {
+ int maxDeliverAttempts = -1;
+ String mdaParam = null;
+ if (parameters != null) {
+ mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
+ }
+ if (mdaParam != null) {
+ maxDeliverAttempts = Integer.parseInt(mdaParam);
+ // Here we look for the edge case
+ if(maxDeliverAttempts == 0) {
+ processor.deactivate();
+ }
+ }
+ return maxDeliverAttempts;
+ }
+
private boolean retryForHttpStatusCodes(MessageStore messageStore, ScheduledMessageForwardingProcessor processor,
MessageContext outCtx) {
// No need to retry for application level failures
@@ -290,7 +323,8 @@
String httpStatusCode = outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
if (httpStatusCode.equals(MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR)) {
outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
- outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
+ outCtx.setProperty(SynapseConstants.ERROR_MESSAGE,
+ MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
} else if (httpStatusCode.equals(MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR)) {
outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR);
@@ -305,9 +339,7 @@
* @param paramKey The name / key of the parameter
* @return The value of the parameter
*/
- private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration,
- String paramKey) {
-
+ private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration, String paramKey) {
Parameter parameter = axisConfiguration.getParameter(paramKey);
if (parameter == null) {
return null;
@@ -320,22 +352,24 @@
}
}
- private void deactivate(ScheduledMessageForwardingProcessor processor,
- MessageContext msgContext, Map<String, Object> parameters) {
+ private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
processor.deactivate();
- if (parameters != null && parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
- if (msgContext != null) {
- String seq = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
- Mediator mediator = msgContext.getSequence(seq);
- if (mediator != null) {
- mediator.mediate(msgContext);
- } else {
- log.warn("Deactivate sequence: " + seq + " does not exist");
- }
+ if (deactivateSequence != null) {
+ if (inMsgCtx != null) {
+ sendMsgToDeactivateSeq(inMsgCtx);
}
}
}
+ private void sendMsgToDeactivateSeq(MessageContext inMsgCtx) {
+ Mediator mediator = inMsgCtx.getSequence(deactivateSequence);
+ if (mediator != null) {
+ mediator.mediate(inMsgCtx);
+ } else {
+ log.warn("Deactivate sequence: " + deactivateSequence + " does not exist");
+ }
+ }
+
private boolean isRetryHttpStatusCode(String message) {
if (retryHttpStatusCodes == null) {
return false;