blob: d48724763d9237df9081131bf3864492358445b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.message.processors.forward;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.core.axis2.Axis2BlockingClient;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.AbstractEndpoint;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.processors.MessageProcessorConstants;
import org.apache.synapse.message.store.MessageStore;
import org.apache.synapse.transport.nhttp.NhttpConstants;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.StatefulJob;
import java.util.Map;
import java.util.Set;
/**
* Redelivery Job will replay all the Messages in the Message Store when executed
* Excluding ones that are already tried redelivering more than max number of tries
*/
public class ForwardingJob implements StatefulJob {
private static final Log log = LogFactory.getLog(ForwardingJob.class);
private boolean isMaxDeliverAttemptDropEnabled;
private int maxDeliverAttempts;
private String deactivateSequence;
private String faultSequence;
private String replySequence;
private String[] retryHttpStatusCodes;
private MessageStore messageStore;
private Axis2BlockingClient sender;
private ScheduledMessageForwardingProcessor processor;
private boolean errorStop = false;
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//Get the Global Objects from DataMap
JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
configureForwardingJob(jdm);
// WE do not try to process if the processor is inactive or
// there is no message store attached.
if(!processor.isActive() || messageStore == null) {
return;
}
startProcessingMsgs();
}
private void configureForwardingJob(JobDataMap jdm) {
messageStore = (MessageStore) jdm.get(MessageProcessorConstants.MESSAGE_STORE);
sender = (Axis2BlockingClient) jdm.get(
ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
processor = (ScheduledMessageForwardingProcessor) jdm.get(
ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
retryHttpStatusCodes(parameters);
setSequences(parameters);
}
private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
ScheduledMessageForwardingProcessor processor) {
int maxDeliverAttempts = -1;
String mdaParam = null;
if (parameters != null) {
mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
}
if (mdaParam != null) {
maxDeliverAttempts = Integer.parseInt(mdaParam);
// Here we look for the edge case
if(maxDeliverAttempts == 0) {
processor.deactivate();
}
}
return maxDeliverAttempts;
}
private boolean isMaxDeliverAttemptDropEnabled(Map<String, Object> parameters) {
boolean isMaxDeliverAttemptDropEnabled = false;
if (maxDeliverAttempts > 0 && parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP) != null &&
parameters.get(ForwardingProcessorConstants.MAX_DELIVER_DROP).toString()
.equalsIgnoreCase("true")) {
//Configuration to continue the message processor even without stopping the message processor
// after maximum number of delivery
isMaxDeliverAttemptDropEnabled = true;
}
return isMaxDeliverAttemptDropEnabled;
}
private void retryHttpStatusCodes(Map<String, Object> parameters) {
if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
retryHttpStatusCodes = parameters
.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
}
}
private void setSequences(Map<String, Object> parameters) {
if (parameters != null) {
if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
faultSequence = (String) parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE);
}
if (parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE) != null) {
deactivateSequence = (String) parameters.get(ForwardingProcessorConstants.DEACTIVATE_SEQUENCE);
}
if (parameters.get(ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
replySequence = (String) parameters.get(
ForwardingProcessorConstants.REPLY_SEQUENCE);
}
}
}
private void startProcessingMsgs() {
errorStop = false;
while (!errorStop) {
MessageContext inMsgCtx = messageStore.peek();
if (inMsgCtx != null) {
if (isMsgRelatedToThisServer(inMsgCtx)) {
handleNewMessage(inMsgCtx);
}
} else {
errorStop = true;
}
}
}
private boolean isMsgRelatedToThisServer(MessageContext inMsgCtx) {
String serverName = (String) inMsgCtx.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
if(serverName != null && inMsgCtx instanceof Axis2MessageContext) {
AxisConfiguration configuration = ((Axis2MessageContext)inMsgCtx).
getAxis2MessageContext().getConfigurationContext().getAxisConfiguration();
String myServerName = getAxis2ParameterValue(configuration,
SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
return serverName.equals(myServerName);
}
return true;
}
/**
* Helper method to get a value of a parameters in the AxisConfiguration
*
* @param axisConfiguration AxisConfiguration instance
* @param paramKey The name / key of the parameter
* @return The value of the parameter
*/
private static String getAxis2ParameterValue(AxisConfiguration axisConfiguration, String paramKey) {
Parameter parameter = axisConfiguration.getParameter(paramKey);
if (parameter == null) {
return null;
}
Object value = parameter.getValue();
if (value != null && value instanceof String) {
return (String) parameter.getValue();
} else {
return null;
}
}
private void handleNewMessage(MessageContext inMsgCtx) {
sanitizeMsgContext(inMsgCtx);
String targetEp = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
if (targetEp != null) {
Endpoint ep = inMsgCtx.getEndpoint(targetEp);
// stop processing if endpoint is not ready to send
if(ep.getContext().readyToSend()) {
if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
sendMsgToEndpoint(inMsgCtx, ep);
} else {
logMsg(targetEp, ep);
messageStore.poll();
}
}
} else {
//No Target Endpoint defined for the Message
//So we do not have a place to deliver.
//Here we log a warning and remove the message
//todo: we can improve this by implementing a target inferring mechanism
log.warn("Property " + ForwardingProcessorConstants.TARGET_ENDPOINT +
" not found in the message context , Hence removing the message ");
messageStore.poll();
}
}
private void sanitizeMsgContext(MessageContext messageContext) {
Set proSet = messageContext.getPropertyKeySet();
if (proSet != null) {
if (proSet.contains(SynapseConstants.BLOCKING_CLIENT_ERROR)) {
proSet.remove(SynapseConstants.BLOCKING_CLIENT_ERROR);
}
}
}
private void logMsg(String targetEp, Endpoint ep) {
String logMsg;
if (ep == null) {
logMsg = "Endpoint named " + targetEp + " not found.Hence removing " +
"the message form store";
} else {
logMsg = "Unsupported endpoint type. Only address/wsdl/default " +
"endpoint types supported";
}
log.warn(logMsg);
}
private void sendMsgToEndpoint(MessageContext inMsgCtx, Endpoint ep) {
try {
MessageContext outCtx = sender.send(ep, inMsgCtx);
if (outCtx != null) {
handleResponse(inMsgCtx, outCtx);
} else {
// If no Exception Occurred We remove the Message
// and reset the delivery attempt count
messageStore.poll();
processor.resetSentAttemptCount();
}
} catch (Exception e) {
errorStop = handleOutOnlyError(inMsgCtx);
log.error("Error Forwarding Message ", e);
}
}
private void handleResponse(MessageContext inMsgCtx, MessageContext outCtx) {
handle400and500statusCodes(outCtx);
if ("true".equals(outCtx.getProperty(SynapseConstants.BLOCKING_CLIENT_ERROR))) {
handleError(inMsgCtx, outCtx);
} else {
// This Means we have invoked an out only operation
// remove the message and reset the count
doPostSuccessTasks(outCtx);
}
}
private void handle400and500statusCodes(MessageContext outCtx) {
if ((outCtx.getProperty(NhttpConstants.HTTP_SC) != null)) {
String httpStatusCode = outCtx.getProperty(NhttpConstants.HTTP_SC).toString();
if (httpStatusCode.equals(MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR)) {
outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
outCtx.setProperty(SynapseConstants.ERROR_MESSAGE,
MessageProcessorConstants.HTTP_INTERNAL_SERVER_ERROR);
} else if (httpStatusCode.equals(MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR)) {
outCtx.setProperty(SynapseConstants.BLOCKING_CLIENT_ERROR, "true");
outCtx.setProperty(SynapseConstants.ERROR_MESSAGE, MessageProcessorConstants.HTTP_BAD_REQUEST_ERROR);
}
}
}
private void handleError(MessageContext inMsgCtx, MessageContext outCtx) {
if (isHttpStatusCodeError(outCtx)) {
if (isRetryHttpStatusCode(outCtx)) {
doPostErrorTasks(inMsgCtx, outCtx);
} else {
doPostSuccessTasks(outCtx);
}
} else {
doPostErrorTasks(inMsgCtx, outCtx);
}
}
private void doPostSuccessTasks(MessageContext outCtx) {
messageStore.poll();
processor.resetSentAttemptCount();
sendResponseToReplySeq(outCtx);
}
private void doPostErrorTasks(MessageContext inMsgCtx, MessageContext outCtx) {
if (maxDeliverAttempts > 0) {
processor.incrementSendAttemptCount();
}
sendItToFaultSequence(outCtx);
if (maxDeliverAttempts > 0) {
handleMaxDeliveryAttempts(inMsgCtx);
}
errorStop = true;
}
private void handleMaxDeliveryAttempts(MessageContext inMsgCtx) {
if (processor.getSendAttemptCount() >= maxDeliverAttempts) {
if (isMaxDeliverAttemptDropEnabled) {
//Since explicitly enabled the message drop after max delivery attempt
// message has been removed and reset the delivery attempt count of the processor
processor.resetSentAttemptCount();
messageStore.poll();
} else {
deactivate(processor, inMsgCtx);
}
}
}
private boolean handleOutOnlyError(MessageContext inMsgCtx) {
if (maxDeliverAttempts > 0) {
processor.incrementSendAttemptCount();
handleMaxDeliveryAttempts(inMsgCtx);
}
return true;
}
private void sendResponseToReplySeq(MessageContext outCtx) {
// If there is a sequence defined to send success replies,
// we must send the message to it
if (replySequence != null) {
Mediator mediator = outCtx.getSequence(replySequence);
if (mediator != null) {
mediator.mediate(outCtx);
} else {
log.warn("Can't Send the Out Message , Sequence " + replySequence + " Does not Exist");
}
}
}
private void sendItToFaultSequence(MessageContext outCtx) {
if (faultSequence != null) {
Mediator mediator = outCtx.getSequence(faultSequence);
if (mediator != null) {
mediator.mediate(outCtx);
} else {
log.warn("Can't Send the fault Message , Sequence " + faultSequence +
" Does not Exist");
}
}
}
private boolean isHttpStatusCodeError(MessageContext outCtx) {
// No need to retry for application level failures
if (outCtx.getProperty(SynapseConstants.ERROR_MESSAGE) != null) {
String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
return errorMsg.matches(".*[3-5]\\d\\d.*");
}
return false;
}
private boolean isRetryHttpStatusCode(MessageContext outCtx) {
String errorMsg = outCtx.getProperty(SynapseConstants.ERROR_MESSAGE).toString();
if (retryHttpStatusCodes == null) {
return false;
}
for (String statsCode : retryHttpStatusCodes) {
if (errorMsg.contains(statsCode)) {
return true;
}
}
return false;
}
private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
processor.deactivate();
if (deactivateSequence != null) {
if (inMsgCtx != null) {
sendMsgToDeactivateSeq(inMsgCtx);
}
}
}
private void sendMsgToDeactivateSeq(MessageContext inMsgCtx) {
Mediator mediator = inMsgCtx.getSequence(deactivateSequence);
if (mediator != null) {
mediator.mediate(inMsgCtx);
} else {
log.warn("Deactivate sequence: " + deactivateSequence + " does not exist");
}
}
}