Merge pull request #50 from shafreenAnfar/targetEndpoint
Improving how Target Endpoint is Associated with Message Forwarding Processor
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
index 1735eec..254bd87 100644
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
@@ -54,8 +54,12 @@
"parameter");
public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE,
"messageStore");
+ public static final QName TARGET_ENDPOINT_Q =
+ new QName(XMLConfigConstants.NULL_NAMESPACE, "targetEndpoint");
private static final QName DESCRIPTION_Q
= new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
+ public static final String FORWARDING_PROCESSOR =
+ "org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor";
/**
@@ -88,6 +92,18 @@
handleException("Can't create Message processor without a name ");
}
+ if (FORWARDING_PROCESSOR.equals(clssAtt.getAttributeValue())) {
+ OMAttribute targetEndpointAtt = elem.getAttribute(TARGET_ENDPOINT_Q);
+
+ if (targetEndpointAtt != null) {
+ assert processor != null;
+ processor.setTargetEndpoint(targetEndpointAtt.getAttributeValue());
+ } else {
+ // This validation is commented due to backward compatibility
+ // handleException("Can't create Message processor without a target endpoint ");
+ }
+ }
+
OMAttribute storeAtt = elem.getAttribute(MESSAGE_STORE_Q);
if (storeAtt != null) {
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
index f2a7d48..a2311ef 100644
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
@@ -49,6 +49,8 @@
private static final Log log = LogFactory.getLog(MessageProcessorSerializer.class);
+ public static final String FORWARDING_PROCESSOR =
+ "org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor";
protected static final OMFactory fac = OMAbstractFactory.getOMFactory();
protected static final OMNamespace synNS = SynapseConstants.SYNAPSE_OMNAMESPACE;
protected static final OMNamespace nullNS = fac.createOMNamespace(
@@ -78,6 +80,16 @@
handleException("Message store Name not specified");
}
+ if (FORWARDING_PROCESSOR.equals(processor.getClass().getName())) {
+ if (processor.getTargetEndpoint() != null) {
+ processorElem.addAttribute(
+ fac.createOMAttribute("targetEndpoint", nullNS, processor.getTargetEndpoint()));
+ } else {
+ // This validation is removed to support backward compatibility
+ // handleException("Target Endpoint not specified");
+ }
+ }
+
if (processor.getMessageStoreName() != null) {
processorElem.addAttribute(fac.createOMAttribute(
"messageStore", nullNS, processor.getMessageStoreName()));
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java b/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
index b319067..d33b8c8 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
@@ -47,6 +47,10 @@
protected String fileName;
+ /** This attribute is only need for forwarding message processor. However, it here because
+ * then we don't need to implement this in sampling processor with nothing */
+ protected String targetEndpoint;
+
protected SynapseConfiguration configuration;
protected enum State {
@@ -117,4 +121,12 @@
public String getFileName() {
return fileName;
}
+
+ public void setTargetEndpoint(String targetEndpoint) {
+ this.targetEndpoint = targetEndpoint;
+ }
+
+ public String getTargetEndpoint() {
+ return targetEndpoint;
+ }
}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
index b8af8b2..17ef51c 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
@@ -85,4 +85,17 @@
* @return Name of the file where this artifact is defined
*/
public String getFileName();
+
+ /**
+ * This method set the target-endpoint associated with the Message Processor. Target-endpoint is a required
+ * parameter for Message Forwarding Processor but optional for Sampling Processor.
+ * @param targetEndpoint is the name of the associated endpoint
+ */
+ void setTargetEndpoint(String targetEndpoint);
+
+ /**
+ * This method is used to retrieve the associated target endpoint name of the message processor.
+ * @return The name of the endpoint
+ */
+ String getTargetEndpoint();
}
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java b/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
index c748ee2..eeb4eed 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
@@ -21,6 +21,7 @@
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.forward.ForwardingProcessorConstants;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
@@ -84,6 +85,7 @@
JobDataMap jobDataMap = getJobDataMap();
jobDataMap.put(MessageProcessorConstants.MESSAGE_STORE,
configuration.getMessageStore(messageStore));
+ jobDataMap.put(ForwardingProcessorConstants.TARGET_ENDPOINT, getTargetEndpoint());
jobDataMap.put(MessageProcessorConstants.PARAMETERS, parameters);
JobDetail jobDetail = jobBuilder.usingJobData(jobDataMap).build();
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 94ddf4f..aee0d08 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -65,6 +65,7 @@
private MessageStore messageStore;
private Axis2BlockingClient sender;
private ScheduledMessageForwardingProcessor processor;
+ private String targetEndpoint = null;
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//Get the Global Objects from DataMap
@@ -103,6 +104,9 @@
.get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
}
setSequences(parameters);
+ if (jdm.get(ForwardingProcessorConstants.TARGET_ENDPOINT) != null) {
+ targetEndpoint = (String) jdm.get(ForwardingProcessorConstants.TARGET_ENDPOINT);
+ }
}
}
@@ -233,15 +237,17 @@
private void handleNewMessage(MessageContext inMsgCtx) {
sanitizeMsgContext(inMsgCtx);
- String targetEp = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
- if (targetEp != null) {
- Endpoint ep = inMsgCtx.getEndpoint(targetEp);
+ if (targetEndpoint == null) {
+ targetEndpoint = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+ }
+ if (targetEndpoint != null) {
+ Endpoint ep = inMsgCtx.getEndpoint(targetEndpoint);
// stop processing if endpoint is not ready to send
if(ep.getContext().readyToSend()) {
if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
sendMsgToEndpoint(inMsgCtx, ep);
} else {
- logMsg(targetEp, ep);
+ logMsg(targetEndpoint, ep);
messageStore.poll();
}
}