Add retry interval feature
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index d487247..9ca4b0d 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -33,6 +33,7 @@
import org.apache.synapse.message.processors.MessageProcessorConstants;
import org.apache.synapse.message.store.MessageStore;
import org.apache.synapse.transport.nhttp.NhttpConstants;
+import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -45,20 +46,24 @@
* Redelivery Job will replay all the Messages in the Message Store when executed
* Excluding ones that are already tried redelivering more than max number of tries
*/
+@DisallowConcurrentExecution
public class ForwardingJob implements StatefulJob {
private static final Log log = LogFactory.getLog(ForwardingJob.class);
+ enum State { CONTINUE_PROCESSING, CONTINUE_RETRYING, STOP_PROCESSING }
+
private boolean isMaxDeliverAttemptDropEnabled;
private int maxDeliverAttempts;
+ private int retryInterval;
private String deactivateSequence;
private String faultSequence;
private String replySequence;
private String[] retryHttpStatusCodes;
+ private State jobState;
private MessageStore messageStore;
private Axis2BlockingClient sender;
private ScheduledMessageForwardingProcessor processor;
- private boolean errorStop = false;
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//Get the Global Objects from DataMap
@@ -80,20 +85,31 @@
ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
processor = (ScheduledMessageForwardingProcessor) jdm.get(
ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+ retryInterval = 1000;
+
+ setParameters(jdm);
+ }
+
+ private void setParameters(JobDataMap jdm) {
Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
- maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
- isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
- retryHttpStatusCodes(parameters);
- setSequences(parameters);
+ if (parameters != null) {
+ maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
+ isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
+ if (parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL) != null) {
+ retryInterval = Integer.parseInt((String) parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL));
+ }
+ if (parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+ retryHttpStatusCodes = parameters
+ .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
+ }
+ setSequences(parameters);
+ }
}
private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
ScheduledMessageForwardingProcessor processor) {
int maxDeliverAttempts = -1;
- String mdaParam = null;
- if (parameters != null) {
- mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
- }
+ String mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
if (mdaParam != null) {
maxDeliverAttempts = Integer.parseInt(mdaParam);
// Here we look for the edge case
@@ -116,13 +132,6 @@
return isMaxDeliverAttemptDropEnabled;
}
- private void retryHttpStatusCodes(Map<String, Object> parameters) {
- if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
- retryHttpStatusCodes = parameters
- .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
- }
- }
-
private void setSequences(Map<String, Object> parameters) {
if (parameters != null) {
if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
@@ -139,15 +148,27 @@
}
private void startProcessingMsgs() {
- errorStop = false;
- while (!errorStop) {
+ do {
+ jobState = State.CONTINUE_PROCESSING;
MessageContext inMsgCtx = messageStore.peek();
if (inMsgCtx != null) {
if (isMsgRelatedToThisServer(inMsgCtx)) {
handleNewMessage(inMsgCtx);
}
} else {
- errorStop = true;
+ jobState = State.STOP_PROCESSING;
+ }
+ waitBeforeRetry();
+ } while (jobState == State.CONTINUE_PROCESSING || jobState == State.CONTINUE_RETRYING);
+ }
+
+ private void waitBeforeRetry() {
+ if (jobState == State.CONTINUE_RETRYING) {
+ try {
+ // wait for some time before retrying
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException ignore) {
+ // No harm even it gets interrupted. So nothing to handle.
}
}
}
@@ -243,7 +264,7 @@
processor.resetSentAttemptCount();
}
} catch (Exception e) {
- errorStop = handleOutOnlyError(inMsgCtx);
+ handleOutOnlyError(inMsgCtx);
log.error("Error Forwarding Message ", e);
}
}
@@ -299,7 +320,6 @@
if (maxDeliverAttempts > 0) {
handleMaxDeliveryAttempts(inMsgCtx);
}
- errorStop = true;
}
private void handleMaxDeliveryAttempts(MessageContext inMsgCtx) {
@@ -312,15 +332,16 @@
} else {
deactivate(processor, inMsgCtx);
}
+ } else {
+ jobState = State.CONTINUE_RETRYING;
}
}
- private boolean handleOutOnlyError(MessageContext inMsgCtx) {
+ private void handleOutOnlyError(MessageContext inMsgCtx) {
if (maxDeliverAttempts > 0) {
processor.incrementSendAttemptCount();
handleMaxDeliveryAttempts(inMsgCtx);
}
- return true;
}
private void sendResponseToReplySeq(MessageContext outCtx) {
@@ -371,6 +392,7 @@
}
private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
+ jobState = State.STOP_PROCESSING;
processor.deactivate();
if (deactivateSequence != null) {
if (inMsgCtx != null) {
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
index 9378ee7..027e37b 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
@@ -69,4 +69,9 @@
* Used to determine for which HTTP status codes, message processor should retry
*/
public static final String RETRY_HTTP_STATUS_CODES = "retry.http.status.codes";
+
+ /**
+ * Used to determine the retry interval between retries
+ */
+ public static final String RETRY_INTERVAL = "retry.interval";
}