blob: c5db337e72311d1ee96dd469e1d88d4aa35ce0ac [file] [log] [blame]
package org.apache.savan.messagereceiver;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMException;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.engine.MessageReceiver;
import org.apache.savan.atom.AtomConstants;
import org.apache.savan.eventing.EventingConstants;
import org.apache.savan.publication.client.PublicationClient;
import javax.xml.namespace.QName;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.StringTokenizer;
/**
* This Message reciver handles the publish requests. It will received all messages sent to SOAP/WS
* action http://ws.apache.org/ws/2007/05/eventing-extended/Publish, or request URL
* http://<host>:port//services/<service-name>/publish. It will search for topic in URL query
* parameter "topic" or Soap Header <eevt::topic xmlns="http://ws.apache.org/ws/2007/05/eventing-extended">...</topic>
*
* @author Srinath Perera (hemapani@apache.org)
*/
public class PublishingMessageReceiver implements MessageReceiver {
public void receive(MessageContext messageCtx) throws AxisFault {
try {
String toAddress = messageCtx.getTo().getAddress();
//Here we try to locate the topic. It can be either a query parameter of the input address or a header
//in the SOAP evvelope
URI topic = null;
SOAPEnvelope requestEnvelope = messageCtx.getEnvelope();
int querySeperatorIndex = toAddress.indexOf('?');
if (querySeperatorIndex > 0) {
String queryString = toAddress.substring(querySeperatorIndex + 1);
HashMap map = new HashMap();
StringTokenizer t = new StringTokenizer(queryString, "=&");
while (t.hasMoreTokens()) {
map.put(t.nextToken(), t.nextToken());
}
if (map.containsKey(EventingConstants.ElementNames.Topic)) {
topic = new URI((String)map.get(EventingConstants.ElementNames.Topic));
}
} else {
OMElement topicHeader = requestEnvelope.getHeader().getFirstChildWithName(
new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE,
EventingConstants.ElementNames.Topic));
if (topicHeader != null) {
topic = new URI(topicHeader.getText());
}
}
//Here we locate the content of the Event. If this is APP we unwrap APP wrapping elements.
OMElement eventData = requestEnvelope.getBody().getFirstElement();
if (AtomConstants.ATOM_NAMESPACE.equals(eventData.getNamespace().getNamespaceURI()) &&
AtomConstants.ElementNames.Entry.equals(eventData.getLocalName())) {
OMElement content = eventData.getFirstChildWithName(new QName(
AtomConstants.ATOM_NAMESPACE, AtomConstants.ElementNames.Content));
if (content != null && content.getFirstElement() != null) {
eventData.getFirstElement();
}
}
//Use in memory API to publish the event
ServiceContext serviceContext = messageCtx.getServiceContext();
PublicationClient client =
new PublicationClient(serviceContext.getConfigurationContext());
client.sendPublication(eventData, serviceContext.getAxisService(), topic);
} catch (OMException e) {
throw AxisFault.makeFault(e);
} catch (URISyntaxException e) {
throw AxisFault.makeFault(e);
}
}
}