| package org.apache.savan.messagereceiver; |
| |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.HashMap; |
| import java.util.StringTokenizer; |
| |
| import javax.xml.namespace.QName; |
| |
| import org.apache.axiom.om.OMElement; |
| import org.apache.axiom.om.OMException; |
| import org.apache.axiom.soap.SOAPEnvelope; |
| import org.apache.axis2.AxisFault; |
| import org.apache.axis2.context.MessageContext; |
| import org.apache.axis2.context.ServiceContext; |
| import org.apache.axis2.engine.MessageReceiver; |
| import org.apache.savan.atom.AtomConstants; |
| import org.apache.savan.eventing.EventingConstants; |
| import org.apache.savan.publication.client.PublicationClient; |
| |
| /** |
| * This Message reciver handles the publish requests. It will received all messages sent to SOAP/WS action |
| * http://ws.apache.org/ws/2007/05/eventing-extended/Publish, or request URL http://<host>:port//services/<service-name>/publish. |
| * It will search for topic in URL query parameter "topic" or |
| * Soap Header <eevt::topic xmlns="http://ws.apache.org/ws/2007/05/eventing-extended">...</topic> |
| * @author Srinath Perera (hemapani@apache.org) |
| */ |
| public class PublishingMessageReceiver implements MessageReceiver{ |
| |
| public void receive(MessageContext messageCtx) throws AxisFault { |
| try { |
| String toAddress = messageCtx.getTo().getAddress(); |
| //Here we try to locate the topic. It can be either a query parameter of the input address or a header |
| //in the SOAP evvelope |
| URI topic = null; |
| |
| SOAPEnvelope requestEnvelope = messageCtx.getEnvelope(); |
| int querySeperatorIndex = toAddress.indexOf('?'); |
| if(querySeperatorIndex > 0){ |
| String queryString = toAddress.substring(querySeperatorIndex+1); |
| HashMap map = new HashMap(); |
| StringTokenizer t = new StringTokenizer(queryString,"=&"); |
| while(t.hasMoreTokens()){ |
| map.put(t.nextToken(), t.nextToken()); |
| } |
| if(map.containsKey(EventingConstants.ElementNames.Topic)){ |
| topic = new URI((String)map.get(EventingConstants.ElementNames.Topic)); |
| } |
| }else{ |
| OMElement topicHeader = requestEnvelope.getHeader().getFirstChildWithName(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE, |
| EventingConstants.ElementNames.Topic)); |
| if(topicHeader != null){ |
| topic = new URI(topicHeader.getText()); |
| } |
| } |
| |
| //Here we locate the content of the Event. If this is APP we unwrap APP wrapping elements. |
| OMElement eventData = requestEnvelope.getBody().getFirstElement(); |
| if(AtomConstants.ATOM_NAMESPACE.equals(eventData.getNamespace().getNamespaceURI()) && |
| AtomConstants.ElementNames.Entry.equals(eventData.getLocalName())){ |
| OMElement content = eventData.getFirstChildWithName(new QName(AtomConstants.ATOM_NAMESPACE,AtomConstants.ElementNames.Content)); |
| if(content != null && content.getFirstElement() != null){ |
| eventData.getFirstElement(); |
| } |
| } |
| //Use in memory API to publish the event |
| ServiceContext serviceContext = messageCtx.getServiceContext(); |
| PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext()); |
| client.sendPublication(eventData,serviceContext.getAxisService(),topic); |
| } catch (OMException e) { |
| throw new AxisFault(e); |
| } catch (URISyntaxException e) { |
| throw new AxisFault(e); |
| } |
| } |
| } |