Merge pull request #50 from shafreenAnfar/targetEndpoint

Improving how Target Endpoint is Associated with Message Forwarding Processor
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
index 1735eec..254bd87 100644
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorFactory.java
@@ -54,8 +54,12 @@
                                                       "parameter");
     public static final QName MESSAGE_STORE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE,
                                                           "messageStore");
+    public static final QName TARGET_ENDPOINT_Q =
+            new QName(XMLConfigConstants.NULL_NAMESPACE, "targetEndpoint");
     private static final QName DESCRIPTION_Q
             = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description");
+    public static final String FORWARDING_PROCESSOR =
+            "org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor";
 
 
     /**
@@ -88,6 +92,18 @@
             handleException("Can't create Message processor without a name ");
         }
 
+        if (FORWARDING_PROCESSOR.equals(clssAtt.getAttributeValue())) {
+            OMAttribute targetEndpointAtt = elem.getAttribute(TARGET_ENDPOINT_Q);
+
+            if (targetEndpointAtt != null) {
+                assert processor != null;
+                processor.setTargetEndpoint(targetEndpointAtt.getAttributeValue());
+            } else {
+                // This validation is commented due to backward compatibility
+                // handleException("Can't create Message processor without a target endpoint ");
+            }
+        }
+
         OMAttribute storeAtt = elem.getAttribute(MESSAGE_STORE_Q);
 
         if (storeAtt != null) {
diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
index f2a7d48..a2311ef 100644
--- a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
+++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageProcessorSerializer.java
@@ -49,6 +49,8 @@
 
     private static final Log log = LogFactory.getLog(MessageProcessorSerializer.class);
 
+    public static final String FORWARDING_PROCESSOR =
+            "org.apache.synapse.message.processors.forward.ScheduledMessageForwardingProcessor";
     protected static final OMFactory fac = OMAbstractFactory.getOMFactory();
     protected static final OMNamespace synNS = SynapseConstants.SYNAPSE_OMNAMESPACE;
     protected static final OMNamespace nullNS = fac.createOMNamespace(
@@ -78,6 +80,16 @@
             handleException("Message store Name not specified");
         }
 
+        if (FORWARDING_PROCESSOR.equals(processor.getClass().getName())) {
+            if (processor.getTargetEndpoint() != null) {
+                processorElem.addAttribute(
+                        fac.createOMAttribute("targetEndpoint", nullNS, processor.getTargetEndpoint()));
+            } else {
+                // This validation is removed to support backward compatibility
+                // handleException("Target Endpoint not specified");
+            }
+        }
+
         if (processor.getMessageStoreName() != null) {
             processorElem.addAttribute(fac.createOMAttribute(
                     "messageStore", nullNS, processor.getMessageStoreName()));
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java b/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
index b319067..d33b8c8 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
@@ -47,6 +47,10 @@
 
     protected String fileName;
 
+    /** This attribute is only need for forwarding message processor. However, it here because
+     * then we don't need to implement this in sampling processor with nothing */
+    protected String targetEndpoint;
+
     protected SynapseConfiguration configuration;
 
     protected enum State {
@@ -117,4 +121,12 @@
     public String getFileName() {
         return fileName;
     }
+
+    public void setTargetEndpoint(String targetEndpoint) {
+        this.targetEndpoint = targetEndpoint;
+    }
+
+    public String getTargetEndpoint() {
+        return targetEndpoint;
+    }
 }
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
index b8af8b2..17ef51c 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
@@ -85,4 +85,17 @@
      * @return Name of the file where this artifact is defined
      */
     public String getFileName();
+
+    /**
+     * This method set the target-endpoint associated with the Message Processor. Target-endpoint is a required
+     * parameter for Message Forwarding Processor but optional for Sampling Processor.
+     * @param targetEndpoint is the name of the associated endpoint
+     */
+    void setTargetEndpoint(String targetEndpoint);
+
+    /**
+     * This method is used to retrieve the associated target endpoint name of the message processor.
+     * @return The name of the endpoint
+     */
+    String getTargetEndpoint();
 }
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java b/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
index c748ee2..eeb4eed 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
@@ -21,6 +21,7 @@
 
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.forward.ForwardingProcessorConstants;
 import org.quartz.*;
 import org.quartz.impl.StdSchedulerFactory;
 
@@ -84,6 +85,7 @@
         JobDataMap jobDataMap = getJobDataMap();
         jobDataMap.put(MessageProcessorConstants.MESSAGE_STORE,
                        configuration.getMessageStore(messageStore));
+        jobDataMap.put(ForwardingProcessorConstants.TARGET_ENDPOINT, getTargetEndpoint());
         jobDataMap.put(MessageProcessorConstants.PARAMETERS, parameters);
 
         JobDetail jobDetail = jobBuilder.usingJobData(jobDataMap).build();
diff --git a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
index 94ddf4f..aee0d08 100644
--- a/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
+++ b/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
@@ -65,6 +65,7 @@
     private MessageStore messageStore;
     private Axis2BlockingClient sender;
     private ScheduledMessageForwardingProcessor processor;
+    private String targetEndpoint = null;
 
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
         //Get the Global Objects from DataMap
@@ -103,6 +104,9 @@
                         .get(ForwardingProcessorConstants.RETRY_HTTP_STATUS_CODES).toString().split(",");
             }
             setSequences(parameters);
+            if (jdm.get(ForwardingProcessorConstants.TARGET_ENDPOINT) != null) {
+                targetEndpoint = (String) jdm.get(ForwardingProcessorConstants.TARGET_ENDPOINT);
+            }
         }
     }
 
@@ -233,15 +237,17 @@
 
     private void handleNewMessage(MessageContext inMsgCtx) {
         sanitizeMsgContext(inMsgCtx);
-        String targetEp = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
-        if (targetEp != null) {
-            Endpoint ep = inMsgCtx.getEndpoint(targetEp);
+        if (targetEndpoint == null) {
+            targetEndpoint = (String) inMsgCtx.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+        }
+        if (targetEndpoint != null) {
+            Endpoint ep = inMsgCtx.getEndpoint(targetEndpoint);
             // stop processing if endpoint is not ready to send
             if(ep.getContext().readyToSend()) {
                 if ((ep != null) && (((AbstractEndpoint) ep).isLeafEndpoint())) {
                     sendMsgToEndpoint(inMsgCtx, ep);
                 } else {
-                    logMsg(targetEp, ep);
+                    logMsg(targetEndpoint, ep);
                     messageStore.poll();
                 }
             }