Add retry interval feature
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index d487247..9ca4b0d 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -33,6 +33,7 @@
 import org.apache.synapse.message.processors.MessageProcessorConstants;
 import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.transport.nhttp.NhttpConstants;
+import org.quartz.DisallowConcurrentExecution;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -45,20 +46,24 @@
  * Redelivery Job will replay all the Messages in the Message Store when executed
  * Excluding ones that are already tried redelivering more than max number of tries
  */
+@DisallowConcurrentExecution
 public class ForwardingJob implements StatefulJob {
 
     private static final Log log = LogFactory.getLog(ForwardingJob.class);
 
+    enum State { CONTINUE_PROCESSING, CONTINUE_RETRYING, STOP_PROCESSING }
+
     private boolean isMaxDeliverAttemptDropEnabled;
     private int maxDeliverAttempts;
+    private int retryInterval;
     private String deactivateSequence;
     private String faultSequence;
     private String replySequence;
     private String[] retryHttpStatusCodes;
+    private State jobState;
     private MessageStore messageStore;
     private Axis2BlockingClient sender;
     private ScheduledMessageForwardingProcessor processor;
-    private boolean errorStop = false;
 
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
         //Get the Global Objects from DataMap
@@ -80,20 +85,31 @@
                 ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
         processor = (ScheduledMessageForwardingProcessor) jdm.get(
                 ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+        retryInterval = 1000;
+
+        setParameters(jdm);
+    }
+
+    private void setParameters(JobDataMap jdm) {
         Map<String, Object> parameters = (Map<String, Object>) jdm.get(MessageProcessorConstants.PARAMETERS);
-        maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
-        isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
-        retryHttpStatusCodes(parameters);
-        setSequences(parameters);
+        if (parameters != null) {
+            maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
+            isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
+            if (parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL) != null) {
+                retryInterval = Integer.parseInt((String) parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL));
+            }
+            if (parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
+                retryHttpStatusCodes = parameters
+                        .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
+            }
+            setSequences(parameters);
+        }
     }
 
     private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
                                            ScheduledMessageForwardingProcessor processor) {
         int maxDeliverAttempts = -1;
-        String mdaParam = null;
-        if (parameters != null) {
-            mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
-        }
+        String mdaParam = (String) parameters.get(MessageProcessorConstants.MAX_DELIVER_ATTEMPTS);
         if (mdaParam != null) {
             maxDeliverAttempts = Integer.parseInt(mdaParam);
             // Here we look for the edge case
@@ -116,13 +132,6 @@
         return isMaxDeliverAttemptDropEnabled;
     }
 
-    private void retryHttpStatusCodes(Map<String, Object> parameters) {
-        if (parameters != null && parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
-            retryHttpStatusCodes = parameters
-                    .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
-        }
-    }
-
     private void setSequences(Map<String, Object> parameters) {
         if (parameters != null) {
             if (parameters.get(ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
@@ -139,15 +148,27 @@
     }
 
     private void startProcessingMsgs() {
-        errorStop = false;
-        while (!errorStop) {
+        do {
+            jobState = State.CONTINUE_PROCESSING;
             MessageContext inMsgCtx = messageStore.peek();
             if (inMsgCtx != null) {
                 if (isMsgRelatedToThisServer(inMsgCtx)) {
                     handleNewMessage(inMsgCtx);
                 }
             } else {
-                errorStop = true;
+                jobState = State.STOP_PROCESSING;
+            }
+            waitBeforeRetry();
+        } while (jobState == State.CONTINUE_PROCESSING || jobState == State.CONTINUE_RETRYING);
+    }
+
+    private void waitBeforeRetry() {
+        if (jobState == State.CONTINUE_RETRYING) {
+            try {
+                // wait for some time before retrying
+                Thread.sleep(retryInterval);
+            } catch (InterruptedException ignore) {
+                // No harm even it gets interrupted. So nothing to handle.
             }
         }
     }
@@ -243,7 +264,7 @@
                 processor.resetSentAttemptCount();
             }
         } catch (Exception e) {
-            errorStop = handleOutOnlyError(inMsgCtx);
+            handleOutOnlyError(inMsgCtx);
             log.error("Error Forwarding Message ", e);
         }
     }
@@ -299,7 +320,6 @@
         if (maxDeliverAttempts > 0) {
             handleMaxDeliveryAttempts(inMsgCtx);
         }
-        errorStop = true;
     }
 
     private void handleMaxDeliveryAttempts(MessageContext inMsgCtx) {
@@ -312,15 +332,16 @@
             } else {
                 deactivate(processor, inMsgCtx);
             }
+        } else {
+            jobState = State.CONTINUE_RETRYING;
         }
     }
 
-    private boolean handleOutOnlyError(MessageContext inMsgCtx) {
+    private void handleOutOnlyError(MessageContext inMsgCtx) {
         if (maxDeliverAttempts > 0) {
             processor.incrementSendAttemptCount();
             handleMaxDeliveryAttempts(inMsgCtx);
         }
-        return true;
     }
 
     private void sendResponseToReplySeq(MessageContext outCtx) {
@@ -371,6 +392,7 @@
     }
 
     private void deactivate(ScheduledMessageForwardingProcessor processor, MessageContext inMsgCtx) {
+        jobState = State.STOP_PROCESSING;
         processor.deactivate();
         if (deactivateSequence != null) {
             if (inMsgCtx != null) {
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
index 9378ee7..027e37b 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
@@ -69,4 +69,9 @@
      * Used to determine for which HTTP status codes, message processor should retry
      */
     public static final String RETRY_HTTP_STATUS_CODES = "retry.http.status.codes";
+
+    /**
+     * Used to determine the retry interval between retries
+     */
+    public static final String RETRY_INTERVAL = "retry.interval";
 }