Merge pull request #46 from shafreenAnfar/consumer-all
Add an option to consume one message at a time
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index ce54cbe..94ddf4f 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -54,6 +54,7 @@
enum State { CONTINUE_PROCESSING, CONTINUE_RETRYING, STOP_PROCESSING }
private boolean isMaxDeliverAttemptDropEnabled;
+ private boolean consumeAllEnabled;
private int maxDeliverAttempts;
private int retryInterval;
private String deactivateSequence;
@@ -95,15 +96,8 @@
if (parameters != null) {
maxDeliverAttempts = extractMaxDeliveryAttempts(parameters, processor);
isMaxDeliverAttemptDropEnabled = isMaxDeliverAttemptDropEnabled(parameters);
- if (parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL) != null) {
- try {
- retryInterval = Integer.parseInt(
- (String) parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL));
- } catch (NumberFormatException nfe) {
- parameters.remove(ForwardingProcessorConstants.RETRY_INTERVAL);
- log.error("Invalid value for retry.interval switching back to default value", nfe);
- }
- }
+ consumeAllEnabled = isConsumeAllEnabled(parameters);
+ setRetryInterval(parameters);
if (parameters.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES) != null) {
retryHttpStatusCodes = parameters
.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
@@ -112,6 +106,28 @@
}
}
+    /**
+     * Resolves the value of the {@code consume.all} parameter.
+     *
+     * <p>Consuming all messages is the default: the result is {@code false} only when the
+     * parameter is present and its string form equals "false", ignoring case.
+     *
+     * @param parameters message processor parameters; must not be null
+     * @return {@code false} if the parameter is explicitly set to "false", {@code true} otherwise
+     */
+    private boolean isConsumeAllEnabled(Map<String, Object> parameters) {
+        Object consumeAll = parameters.get(ForwardingProcessorConstants.CONSUME_ALL);
+        if (consumeAll == null) {
+            // Parameter not configured: keep the default of consuming everything.
+            return true;
+        }
+        return !consumeAll.toString().equalsIgnoreCase("false");
+    }
+
+    /**
+     * Reads the retry interval from the message processor parameters and stores it in
+     * {@code retryInterval}.
+     *
+     * <p>The configured value is converted with {@code toString()} instead of being cast to
+     * {@code String}, so a non-String value (for example an {@code Integer}) no longer raises
+     * a {@code ClassCastException}. Surrounding whitespace is ignored. When the parameter is
+     * absent, {@code retryInterval} is left unchanged. When the value is not a valid integer,
+     * the parameter is removed from the map, the error is logged and {@code retryInterval} is
+     * left unchanged.
+     *
+     * @param parameters message processor parameters; must not be null
+     */
+    private void setRetryInterval(Map<String, Object> parameters) {
+        Object configuredInterval = parameters.get(ForwardingProcessorConstants.RETRY_INTERVAL);
+        if (configuredInterval == null) {
+            return;
+        }
+        try {
+            retryInterval = Integer.parseInt(configuredInterval.toString().trim());
+        } catch (NumberFormatException nfe) {
+            // Drop the unusable entry so later readers of the map do not see the bad value.
+            parameters.remove(ForwardingProcessorConstants.RETRY_INTERVAL);
+            log.error("Invalid value for retry.interval switching back to default value", nfe);
+        }
+    }
+
private int extractMaxDeliveryAttempts(Map<String, Object> parameters,
ScheduledMessageForwardingProcessor processor) {
int maxDeliverAttempts = -1;
@@ -161,6 +177,9 @@
if (isMsgRelatedToThisServer(inMsgCtx)) {
handleNewMessage(inMsgCtx);
}
+ if (jobState == State.CONTINUE_PROCESSING && !consumeAllEnabled) {
+ jobState = State.STOP_PROCESSING;
+ }
} else {
jobState = State.STOP_PROCESSING;
}
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
index 027e37b..d6e413e 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
@@ -74,4 +74,9 @@
* Used to determine the retry interval between retries
*/
public static final String RETRY_INTERVAL = "retry.interval";
+
+ /**
+ * Used to determine if all the messages should be consumed per iteration.
+ * ForwardingJob treats the option as enabled unless the value's string form equals
+ * "false" (ignoring case); when disabled, the job state is switched from
+ * CONTINUE_PROCESSING to STOP_PROCESSING after a message has been handled.
+ */
+ public static final String CONSUME_ALL = "consume.all";
}