Merge pull request #46 from shafreenAnfar/consumer-all

Add consume one message at a time feature
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index ce54cbe..94ddf4f 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -54,6 +54,7 @@
     enum State { CONTINUE_PROCESSING, CONTINUE_RETRYING, STOP_PROCESSING }
 
     private boolean isMaxDeliverAttemptDropEnabled;
+    private boolean consumeAllEnabled;
     private int maxDeliverAttempts;
     private int retryInterval;
     private String deactivateSequence;
@@ -95,15 +96,8 @@
         if (parameters != null) {
             maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
             isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
-            if (parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL) != null) {
-                try {
-                    retryInterval = Integer.parseInt(
-                            (String) parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL));
-                } catch (NumberFormatException nfe) {
-                    parameters.remove(ForwardingProcessorConstants.RETRY_INTERVAL);
-                    log.error("Invalid value for retry.interval switching back to default value", nfe);
-                }
-            }
+            consumeAllEnabled = isConsumeAllEnabled(parameters);
+            setRetryInterval(parameters);
             if (parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
                 retryHttpStatusCodes = parameters
                         .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
@@ -112,6 +106,28 @@
         }
     }
 
+    private boolean isConsumeAllEnabled(Map<String, Object> parameters) {
+        boolean isConsumeAllEnabled = true;
+        if (parameters.get(ForwardingProcessorConstants.CONSUME_ALL) != null &&
+                parameters.get(ForwardingProcessorConstants.CONSUME_ALL).toString()
+                        .equalsIgnoreCase("false")) {
+            isConsumeAllEnabled = false;
+        }
+        return isConsumeAllEnabled;
+    }
+
+    private void setRetryInterval(Map<String, Object> parameters) {
+        if (parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL) != null) {
+            try {
+                retryInterval = Integer.parseInt(
+                        (String) parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL));
+            } catch (NumberFormatException nfe) {
+                parameters.remove(ForwardingProcessorConstants.RETRY_INTERVAL);
+                log.error("Invalid value for retry.interval switching back to default value", nfe);
+            }
+        }
+    }
+
     private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
                                            ScheduledMessageForwardingProcessor processor) {
         int maxDeliverAttempts = -1;
@@ -161,6 +177,9 @@
                 if (isMsgRelatedToThisServer(inMsgCtx)) {
                     handleNewMessage(inMsgCtx);
                 }
+                if (jobState == State.CONTINUE_PROCESSING && !consumeAllEnabled) {
+                    jobState = State.STOP_PROCESSING;
+                }
             } else {
                 jobState = State.STOP_PROCESSING;
             }
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
index 027e37b..d6e413e 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
@@ -74,4 +74,9 @@
      * Used to determine the retry interval between retries
      */
     public static final String RETRY_INTERVAL = "retry.interval";
+
+    /**
+     * Used to determine if all the messages should be consumed per iteration
+     */
+    public static final String CONSUME_ALL = "consume.all";
 }