integrate derby data source
git-svn-id: https://svn.apache.org/repos/asf/webservices/savan/trunk/java@545717 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
index 851dcc3..60feee7 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
@@ -40,7 +40,7 @@
String SUBSCRIBER_UUID = "SAVAN_EVENTING_SUBSCRIBER_UUID";
}
- interface ElementNames {
+ public interface ElementNames {
String Subscribe = "Subscribe";
String EndTo = "EndTo";
String Delivery = "Delivery";
@@ -58,13 +58,15 @@
String GetStatus = "GetStatus";
String GetStatusResponse = "GetStatusResponse";
String FeedUrl = "FeedUrl";
-
+ String Entry = "entry";
+ String Content = "content";
String deleteFeedResponse = "DeleteFeedResponse";
}
interface Properties {
String SOAPVersion = "SOAPVersion";
String feedUrl = "feedUrl";
+ String DataSource = "DataSource";
}
}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
new file mode 100644
index 0000000..9cced07
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
@@ -0,0 +1,190 @@
+package org.apache.savan.atom;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMDocument;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.impl.builder.StAXBuilder;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.util.StAXUtils;
+import org.apache.axis2.context.MessageContext;
+import org.apache.savan.SavanException;
+
+public class AtomDataSource {
+ public static final String SQL_CREATE_FEEDS = "CREATE TABLE FEEDS(id CHAR(250) NOT NULL, " +
+ "title CHAR(250), updated TIMESTAMP, author CHAR(250), PRIMARY KEY(id))";
+ public static final String SQL_CREATE_ENTRIES = "CREATE TABLE ENTIES(feed CHAR(250), content VARCHAR(2000))";
+
+ public static final String SQL_ADD_FEED = "INSERT INTO FEEDS(id,title, updated,author) VALUES(?,?,?,?)";
+ public static final String SQL_ADD_ENTRY = "INSERT INTO ENTIES(feed, content) VALUES(?,?)";
+ public static final String SQL_GET_ENTRIES_4_FEED = "SELECT content from ENTIES WHERE feed=?";
+ public static final String SQL_GET_FEED_DATA = "SELECT id,title,updated,author from FEEDS WHERE id=?";
+
+
+
+
+ public String framework = "embedded";
+ public String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+ public String protocol = "jdbc:derby:";
+ private Properties props;
+
+ public AtomDataSource() throws SavanException{
+ try {
+ Class.forName(driver).newInstance();
+ System.out.println("Loaded the appropriate driver.");
+
+ props = new Properties();
+ props.put("user", "user1");
+ props.put("password", "user1");
+
+ Connection connection = getConnection();
+ Statement statement = connection.createStatement();
+
+ ResultSet feedtable = connection.getMetaData().getTables(null, null, "FEEDS", null);
+ if(!feedtable.next()){
+ statement.execute(SQL_CREATE_FEEDS);
+ }
+ ResultSet entirestable = connection.getMetaData().getTables(null, null, "ENTIES", null);
+ if(!entirestable.next()){
+ statement.execute(SQL_CREATE_ENTRIES);
+ }
+ connection.close();
+ } catch (InstantiationException e) {
+ throw new SavanException(e);
+ } catch (IllegalAccessException e) {
+ throw new SavanException(e);
+ } catch (ClassNotFoundException e) {
+ throw new SavanException(e);
+ } catch (SQLException e) {
+ throw new SavanException(e);
+ }
+
+ }
+
+ public Connection getConnection() throws SavanException{
+ try {
+ /*
+ The connection specifies create=true to cause
+ the database to be created. To remove the database,
+ remove the directory derbyDB and its contents.
+ The directory derbyDB will be created under
+ the directory that the system property
+ derby.system.home points to, or the current
+ directory if derby.system.home is not set.
+ */
+ return DriverManager.getConnection(protocol +
+ "derbyDB;create=true", props);
+ } catch (SQLException e) {
+ throw new SavanException(e);
+ }
+
+ }
+
+
+ public void addFeed(String id,String title,Date lastEditedtime,String author) throws SavanException{
+
+ try {
+ Connection connection = getConnection();
+ try{
+ PreparedStatement statement = connection.prepareStatement(SQL_ADD_FEED);
+ statement.setString(1,id );
+ statement.setString(2,title );
+ Timestamp t = new Timestamp(lastEditedtime.getTime());
+ statement.setTimestamp(3, t);
+ statement.setString(4, author);
+ statement.executeUpdate();
+ }finally{
+ connection.close();
+ }
+ } catch (SQLException e) {
+ throw new SavanException(e);
+ }
+
+
+ }
+
+ public void addEntry(String id,OMElement entry) throws SavanException{
+ try {
+ StringWriter w = new StringWriter();
+ entry.serialize(w);
+ Connection connection = getConnection();
+ try{
+ PreparedStatement statement = connection.prepareStatement(SQL_ADD_ENTRY);
+ statement.setString(1,id );
+ statement.setString(2,w.getBuffer().toString() );
+ statement.executeUpdate();
+ }finally{
+ connection.close();
+ }
+ } catch (SQLException e) {
+ throw new SavanException(e);
+ } catch (XMLStreamException e) {
+ throw new SavanException(e);
+ }
+ }
+
+
+ public OMElement getFeedAsXml(String feedId) throws SavanException{
+
+ try {
+ Connection connection = getConnection();
+ try{
+ PreparedStatement statement = connection.prepareStatement(SQL_GET_FEED_DATA);
+ statement.setString(1,feedId );
+ ResultSet results = statement.executeQuery();
+ if(results.next()){
+ String title = results.getString("title");
+ Timestamp updatedTime = results.getTimestamp("updated");
+ String author = results.getString("author");
+
+ Feed feed = new Feed(title,feedId,author,updatedTime);
+
+ statement.close();
+
+ statement = connection.prepareStatement(SQL_GET_ENTRIES_4_FEED);
+ statement.setString(1,feedId );
+ results = statement.executeQuery();
+ while(results.next()){
+ String entryAsStr = results.getString("content");
+ InputStream atomIn = new ByteArrayInputStream(entryAsStr.getBytes());
+ XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader(atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING);
+ StAXBuilder builder = new StAXOMBuilder(feed.getFactory(),xmlreader);
+ feed.addEntry(builder.getDocumentElement());
+ }
+ return feed.getFeedAsXml();
+ }else{
+ throw new SavanException("No such feed "+feedId);
+ }
+ }finally{
+ connection.close();
+ }
+ } catch (SQLException e) {
+ throw new SavanException(e);
+ } catch (XMLStreamException e) {
+ throw new SavanException(e);
+ }
+ }
+
+
+
+
+
+}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
index 181abc6..8d817fb 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
@@ -49,44 +49,64 @@
OMElement bodyContent = envlope.getBody().getFirstElement();
OMElement feedID = bodyContent.getFirstElement();
- String pathWRTRepository = "atom/"+feedID.getText();
- File atomFile = messageCtx.getConfigurationContext().getRealPath(pathWRTRepository);
- if(pathWRTRepository.equals("atom/all.atom") && !atomFile.exists()){
- AtomSubscriber atomSubscriber = new AtomSubscriber();
- atomSubscriber.setId(new URI("All"));
- atomSubscriber.setAtomFile(atomFile);
- atomSubscriber.setAuthor("DefaultUser");
- atomSubscriber.setTitle("default Feed");
-
- String serviceAddress = messageCtx.getTo().getAddress();
- int cutIndex = serviceAddress.indexOf("services");
- if(cutIndex > 0){
- serviceAddress = serviceAddress.substring(0,cutIndex-1);
- }
- atomSubscriber.setFeedUrl(serviceAddress+"/services/"+messageCtx.getServiceContext().getAxisService().getName() +"/atom?feed=all.atom");
-
-
- SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService());
- if (store == null)
- throw new AxisFault ("Cant find the Savan subscriber store");
- store.store(atomSubscriber);
- }
-
- if(!atomFile.exists()){
- throw new AxisFault("no feed exisits for "+feedID.getText() + " no file found "+ atomFile.getAbsolutePath());
- }
- FileInputStream atomIn = new FileInputStream(atomFile);
-
+
+
+ String feedIDAsUrn = feedID.getText().replaceAll("_", ":").replaceAll(".atom", "");
+
+ SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService());
+ if (store == null)
+ throw new AxisFault ("Cant find the Savan subscriber store");
+
+
+ AtomSubscriber subscriber = (AtomSubscriber)store.retrieve(feedIDAsUrn);
+
SOAPFactory fac = getSOAPFactory(messageCtx);
SOAPEnvelope envelope = fac.getDefaultEnvelope();
+
+ OMElement result = subscriber.getFeedAsXml();
+
+
+
+
+
+// String pathWRTRepository = "atom/"+feedID.getText();
+//
+// File atomFile = messageCtx.getConfigurationContext().getRealPath(pathWRTRepository);
+// if(pathWRTRepository.equals("atom/all.atom") && !atomFile.exists()){
+// AtomSubscriber atomSubscriber = new AtomSubscriber();
+// atomSubscriber.setId(new URI("All"));
+// atomSubscriber.setAtomFile(atomFile);
+// atomSubscriber.setAuthor("DefaultUser");
+// atomSubscriber.setTitle("default Feed");
+//
+// String serviceAddress = messageCtx.getTo().getAddress();
+// int cutIndex = serviceAddress.indexOf("services");
+// if(cutIndex > 0){
+// serviceAddress = serviceAddress.substring(0,cutIndex-1);
+// }
+// atomSubscriber.setFeedUrl(serviceAddress+"/services/"+messageCtx.getServiceContext().getAxisService().getName() +"/atom?feed=all.atom");
+//
+//
+// SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService());
+// if (store == null)
+// throw new AxisFault ("Cant find the Savan subscriber store");
+// store.store(atomSubscriber);
+// }
+//
+//
+// if(!atomFile.exists()){
+// throw new AxisFault("no feed exisits for "+feedID.getText() + " no file found "+ atomFile.getAbsolutePath());
+// }
+// FileInputStream atomIn = new FileInputStream(atomFile);
+
//add the content of the file to the response
- XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader
- (atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING);
- StAXBuilder builder = new StAXOMBuilder(fac,xmlreader);
- OMElement result = (OMElement) builder.getDocumentElement();
+// XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader
+// (atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING);
+// StAXBuilder builder = new StAXOMBuilder(fac,xmlreader);
+// OMElement result = (OMElement) builder.getDocumentElement();
envelope.getBody().addChild(result);
//send beck the response
@@ -125,15 +145,15 @@
} catch (OMException e) {
e.printStackTrace();
throw new AxisFault(e);
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- throw new AxisFault(e);
- } catch (XMLStreamException e) {
- e.printStackTrace();
- throw new AxisFault(e);
- } catch (URISyntaxException e) {
- e.printStackTrace();
- throw new AxisFault(e);
+// } catch (FileNotFoundException e) {
+// e.printStackTrace();
+// throw new AxisFault(e);
+// } catch (XMLStreamException e) {
+// e.printStackTrace();
+// throw new AxisFault(e);
+// } catch (URISyntaxException e) {
+// e.printStackTrace();
+// throw new AxisFault(e);
}
}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java
index 77b6352..35adc57 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java
@@ -21,14 +21,20 @@
public class AtomSubscriber implements Subscriber{
private static final Log log = LogFactory.getLog(AtomSubscriber.class);
private Date subscriptionEndingTime = null;
- private Feed feed;
+ //private Feed feed;
private Filter filter = null;
private File atomFile;
private String feedUrl;
-
+ private AtomDataSource atomDataSource;
private URI id;
- private String title;
- private String author;
+
+ public void init(AtomDataSource dataSource,URI id,String title,String author) throws SavanException{
+ this.atomDataSource = dataSource;
+ atomDataSource.addFeed(id.toString(), title, new Date(), author);
+ }
+
+
+
public URI getId() {
return id;
}
@@ -36,7 +42,7 @@
throw new UnsupportedOperationException();
}
public void sendEventData(OMElement eventData) throws SavanException {
- try {
+// try {
Date date = new Date ();
boolean expired = false;
@@ -47,25 +53,28 @@
String message = "Cant notify the listner since the subscription has been expired";
log.debug(message);
}
- if(feed == null){
- feed = new Feed(title,id.toString(),author);
- }
- feed.addEntry(eventData);
- if(!atomFile.getParentFile().exists()){
- atomFile.getParentFile().mkdir();
- }
- FileOutputStream out = new FileOutputStream(atomFile);
- feed.write(out);
- out.close();
- System.out.println("Atom file at "+ atomFile + " is updated");
- } catch (FileNotFoundException e) {
- throw new SavanException(e);
- } catch (XMLStreamException e) {
- throw new SavanException(e);
- } catch (IOException e) {
- throw new SavanException(e);
- }
+ atomDataSource.addEntry(id.toString(), eventData);
+//
+// if(feed == null){
+// feed = new Feed(title,id.toString(),author);
+// }
+// feed.addEntry(eventData);
+//
+// if(!atomFile.getParentFile().exists()){
+// atomFile.getParentFile().mkdir();
+// }
+// FileOutputStream out = new FileOutputStream(atomFile);
+// feed.write(out);
+// out.close();
+// System.out.println("Atom file at "+ atomFile + " is updated");
+// } catch (FileNotFoundException e) {
+// throw new SavanException(e);
+// } catch (XMLStreamException e) {
+// throw new SavanException(e);
+// } catch (IOException e) {
+// throw new SavanException(e);
+// }
}
public void setId(URI id) {
this.id = id;
@@ -84,22 +93,22 @@
throw new UnsupportedOperationException();
}
- public String getAuthor() {
- return author;
- }
+// public String getAuthor() {
+// return author;
+// }
+//
+// public void setAuthor(String author) {
+// this.author = author;
+// }
+//
+//
+// public String getTitle() {
+// return title;
+// }
- public void setAuthor(String author) {
- this.author = author;
- }
-
-
- public String getTitle() {
- return title;
- }
-
- public void setTitle(String title) {
- this.title = title;
- }
+// public void setTitle(String title) {
+// this.title = title;
+// }
public String getFeedUrl(){
@@ -121,6 +130,15 @@
public void setFeedUrl(String feedUrl) {
this.feedUrl = feedUrl;
}
+// public Feed getFeed() {
+// return feed;
+// }
+ public OMElement getFeedAsXml() throws SavanException {
+ return atomDataSource.getFeedAsXml(id.toString());
+ }
+// public void setAtomDataSource(AtomDataSource atomDataSource) {
+// this.atomDataSource = atomDataSource;
+// }
}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
index 98203d6..a87c6f0 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
@@ -31,6 +31,7 @@
import org.apache.axiom.soap.SOAPHeader;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ServiceContext;
import org.apache.savan.SavanConstants;
import org.apache.savan.SavanException;
import org.apache.savan.SavanMessageContext;
@@ -98,6 +99,13 @@
if (envelope==null)
return null;
+ ServiceContext serviceContext = smc.getMessageContext().getServiceContext();
+ AtomDataSource dataSource = (AtomDataSource)serviceContext.getProperty(AtomConstants.Properties.DataSource);
+ if(dataSource == null){
+ dataSource = new AtomDataSource();
+ serviceContext.setProperty(AtomConstants.Properties.DataSource, dataSource);
+ }
+
String subscriberName = protocol.getDefaultSubscriber();
Subscriber subscriber = configurationManager.getSubscriberInstance(subscriberName);
@@ -105,6 +113,8 @@
String message = "Savan only support implementations of Atom subscriber as Subscribers";
throw new SavanException (message);
}
+
+
//find the real path for atom feeds
File repositoryPath = smc.getConfigurationContext().getRealPath("/");
@@ -118,6 +128,7 @@
}
AtomSubscriber atomSubscriber = (AtomSubscriber) subscriber;
+
String id = UUIDGenerator.getUUID();
smc.setProperty(AtomConstants.TransferedProperties.SUBSCRIBER_UUID,id);
atomSubscriber.setId(new URI(id));
@@ -151,8 +162,8 @@
}
atomSubscriber.setFilter(filter);
}
- atomSubscriber.setAuthor(createFeed.getAuthor());
- atomSubscriber.setTitle(createFeed.getTitle());
+
+ atomSubscriber.init(dataSource, new URI(id), createFeed.getTitle(), createFeed.getAuthor());
smc.setProperty(AtomConstants.Properties.feedUrl, atomSubscriber.getFeedUrl());
return atomSubscriber;
} catch (AxisFault e) {
diff --git a/modules/core/src/main/java/org/apache/savan/atom/Feed.java b/modules/core/src/main/java/org/apache/savan/atom/Feed.java
index 0bdebfb..aea30a4 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/Feed.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/Feed.java
@@ -56,11 +56,20 @@
// </feed>
- public Feed(String title, String id, String author) {
+ public Feed(String title, String id, String author,Date lastUpdated) {
this.title = title;
+ if(title != null){
+ title = title.trim();
+ }
+ if(author != null){
+ author = author.trim();
+ }
+
this.id = id;
this.author = author;
- lastUpdated = new Date();
+ if(lastUpdated == null){
+ lastUpdated = new Date();
+ }
factory = OMAbstractFactory.getOMFactory();
document = factory.createOMDocument();
atomNs = factory.createOMNamespace(AtomConstants.ATOM_NAMESPACE,AtomConstants.ATOM_PREFIX);
@@ -107,4 +116,11 @@
//
// }
+ public OMElement getFeedAsXml(){
+ return document.getOMDocumentElement();
+ }
+ public OMFactory getFactory() {
+ return factory;
+ }
+
}
diff --git a/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java b/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java
index 793de80..49e3027 100644
--- a/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java
+++ b/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java
@@ -20,6 +20,7 @@
public interface EventingConstants {
String EVENTING_NAMESPACE = "http://schemas.xmlsoap.org/ws/2004/08/eventing";
+ String EXTENDED_EVENTING_NAMESPACE = "http://ws.apache.org/ws/2007/05/eventing-extended";
String EVENTING_PREFIX = "wse";
String DEFAULT_DELIVERY_MODE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push";
String DEFAULT_FILTER_IDENTIFIER = FilterDialects.XPath;
@@ -46,6 +47,7 @@
String Unsubscribe = "Unsubscribe";
String GetStatus = "GetStatus";
String GetStatusResponse = "GetStatusResponse";
+ String Topic = "topic";
}
interface Actions {
@@ -57,7 +59,7 @@
String UnsubscribeResponse = "http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse";
String GetStatus = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus";
String GetStatusResponse = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse";
- String Publish = "http://wso2.com/ws/2007/05/eventing/Publish";
+ String Publish = "http://ws.apache.org/ws/2007/05/eventing-extended/Publish";
}
interface Properties {
diff --git a/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java
index fe4ebba..020c0dd 100644
--- a/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java
+++ b/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java
@@ -1,17 +1,76 @@
package org.apache.savan.messagereceiver;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.StringTokenizer;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.engine.MessageReceiver;
+import org.apache.savan.atom.AtomConstants;
+import org.apache.savan.eventing.EventingConstants;
import org.apache.savan.publication.client.PublicationClient;
+/**
+ * This Message reciver handles the publish requests. It will received all messages sent to SOAP/WS action
+ * http://ws.apache.org/ws/2007/05/eventing-extended/Publish, or request URL http://<host>:port//services/<service-name>/publish.
+ * It will search for topic in URL query parameter "topic" or
+ * Soap Header <eevt::topic xmlns="http://ws.apache.org/ws/2007/05/eventing-extended">...</topic>
+ * @author Srinath Perera (hemapani@apache.org)
+ */
public class PublishingMessageReceiver implements MessageReceiver{
+
public void receive(MessageContext messageCtx) throws AxisFault {
- SOAPEnvelope requestEnvelope = messageCtx.getEnvelope();
- ServiceContext serviceContext = messageCtx.getServiceContext();
- PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext());
- client.sendPublication(requestEnvelope.getBody().getFirstElement(),serviceContext.getAxisService(),null);
+ try {
+ String toAddress = messageCtx.getTo().getAddress();
+ //Here we try to locate the topic. It can be either a query parameter of the input address or a header
+ //in the SOAP evvelope
+ URI topic = null;
+
+ SOAPEnvelope requestEnvelope = messageCtx.getEnvelope();
+ int querySeperatorIndex = toAddress.indexOf('?');
+ if(querySeperatorIndex > 0){
+ String queryString = toAddress.substring(querySeperatorIndex+1);
+ HashMap map = new HashMap();
+ StringTokenizer t = new StringTokenizer(queryString,"=&");
+ while(t.hasMoreTokens()){
+ map.put(t.nextToken(), t.nextToken());
+ }
+ if(map.containsKey(EventingConstants.ElementNames.Topic)){
+ topic = new URI((String)map.get(EventingConstants.ElementNames.Topic));
+ }
+ }else{
+ OMElement topicHeader = requestEnvelope.getHeader().getFirstChildWithName(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE,
+ EventingConstants.ElementNames.Topic));
+ if(topicHeader != null){
+ topic = new URI(topicHeader.getText());
+ }
+ }
+
+ //Here we locate the content of the Event. If this is APP we unwrap APP wrapping elements.
+ OMElement eventData = requestEnvelope.getBody().getFirstElement();
+ if(AtomConstants.ATOM_NAMESPACE.equals(eventData.getNamespace().getNamespaceURI()) &&
+ AtomConstants.ElementNames.Entry.equals(eventData.getLocalName())){
+ OMElement content = eventData.getFirstChildWithName(new QName(AtomConstants.ATOM_NAMESPACE,AtomConstants.ElementNames.Content));
+ if(content != null && content.getFirstElement() != null){
+ eventData.getFirstElement();
+ }
+ }
+ //Use in memory API to publish the event
+ ServiceContext serviceContext = messageCtx.getServiceContext();
+ PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext());
+ client.sendPublication(eventData,serviceContext.getAxisService(),topic);
+ } catch (OMException e) {
+ throw new AxisFault(e);
+ } catch (URISyntaxException e) {
+ throw new AxisFault(e);
+ }
}
}
diff --git a/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java b/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java
new file mode 100644
index 0000000..e24a658
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java
@@ -0,0 +1,246 @@
+package org.apache.axis2.savan.atom;
+
+import java.io.StringWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.savan.SavanException;
+import org.apache.savan.atom.AtomDataSource;
+
+import junit.framework.TestCase;
+
+public class DerbyTest extends TestCase{
+
+ /* the default framework is embedded*/
+ public String framework = "embedded";
+ public String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+ public String protocol = "jdbc:derby:";
+
+ public void testDataSource() throws Exception{
+ AtomDataSource dataSource = new AtomDataSource();
+ String id = "id"+new Random().nextDouble();
+ dataSource.addFeed(id, "foo", new Date(),"Srinath");
+ dataSource.addEntry(id,getDummyMethodRequestElement());
+ StringWriter w = new StringWriter();
+
+ OMElement result = dataSource.getFeedAsXml(id);
+// Iterator it = result.getChildElements();
+// while(it.hasNext()){
+// System.out.println(it.next());
+// }
+
+ result.serialize(w);
+ System.out.println(w.getBuffer().toString());
+ }
+
+// public static void main(String[] args)
+// {
+// new DerbyTest().go(args);
+// }
+
+// void go(String[] args)
+// {
+// /* parse the arguments to determine which framework is desired*/
+// parseArguments(args);
+//
+// System.out.println("SimpleApp starting in " + framework + " mode.");
+//
+// try
+// {
+// /*
+// The driver is installed by loading its class.
+// In an embedded environment, this will start up Derby, since it is not already running.
+// */
+// Class.forName(driver).newInstance();
+// System.out.println("Loaded the appropriate driver.");
+//
+// Connection conn = null;
+// Properties props = new Properties();
+// props.put("user", "user1");
+// props.put("password", "user1");
+//
+// /*
+// The connection specifies create=true to cause
+// the database to be created. To remove the database,
+// remove the directory derbyDB and its contents.
+// The directory derbyDB will be created under
+// the directory that the system property
+// derby.system.home points to, or the current
+// directory if derby.system.home is not set.
+// */
+// conn = DriverManager.getConnection(protocol +
+// "derbyDB;create=true", props);
+//
+// System.out.println("Connected to and created database derbyDB");
+//
+// conn.setAutoCommit(false);
+//
+// /*
+// Creating a statement lets us issue commands against
+// the connection.
+// */
+// Statement s = conn.createStatement();
+//
+// /*
+// We create a table, add a few rows, and update one.
+// */
+// s.execute("create table derbyDB(num int, addr varchar(40))");
+// System.out.println("Created table derbyDB");
+// s.execute("insert into derbyDB values (1956,'Webster St.')");
+// System.out.println("Inserted 1956 Webster");
+// s.execute("insert into derbyDB values (1910,'Union St.')");
+// System.out.println("Inserted 1910 Union");
+// s.execute(
+// "update derbyDB set num=180, addr='Grand Ave.' where num=1956");
+// System.out.println("Updated 1956 Webster to 180 Grand");
+//
+// s.execute(
+// "update derbyDB set num=300, addr='Lakeshore Ave.' where num=180");
+// System.out.println("Updated 180 Grand to 300 Lakeshore");
+//
+// /*
+// We select the rows and verify the results.
+// */
+// ResultSet rs = s.executeQuery(
+// "SELECT num, addr FROM derbyDB ORDER BY num");
+//
+// if (!rs.next())
+// {
+// throw new Exception("Wrong number of rows");
+// }
+//
+// if (rs.getInt(1) != 300)
+// {
+// throw new Exception("Wrong row returned");
+// }
+//
+// if (!rs.next())
+// {
+// throw new Exception("Wrong number of rows");
+// }
+//
+// if (rs.getInt(1) != 1910)
+// {
+// throw new Exception("Wrong row returned");
+// }
+//
+// if (rs.next())
+// {
+// throw new Exception("Wrong number of rows");
+// }
+//
+// System.out.println("Verified the rows");
+//
+// s.execute("drop table derbyDB");
+// System.out.println("Dropped table derbyDB");
+//
+// /*
+// We release the result and statement resources.
+// */
+// rs.close();
+// s.close();
+// System.out.println("Closed result set and statement");
+//
+// /*
+// We end the transaction and the connection.
+// */
+// conn.commit();
+// conn.close();
+// System.out.println("Committed transaction and closed connection");
+//
+// /*
+// In embedded mode, an application should shut down Derby.
+// If the application fails to shut down Derby explicitly,
+// the Derby does not perform a checkpoint when the JVM shuts down, which means
+// that the next connection will be slower.
+// Explicitly shutting down Derby with the URL is preferred.
+// This style of shutdown will always throw an "exception".
+// */
+// boolean gotSQLExc = false;
+//
+// if (framework.equals("embedded"))
+// {
+// try
+// {
+// DriverManager.getConnection("jdbc:derby:;shutdown=true");
+// }
+// catch (SQLException se)
+// {
+// gotSQLExc = true;
+// }
+//
+// if (!gotSQLExc)
+// {
+// System.out.println("Database did not shut down normally");
+// }
+// else
+// {
+// System.out.println("Database shut down normally");
+// }
+// }
+// }
+// catch (Throwable e)
+// {
+// System.out.println("exception thrown:");
+//
+// if (e instanceof SQLException)
+// {
+// printSQLError((SQLException) e);
+// }
+// else
+// {
+// e.printStackTrace();
+// }
+// }
+//
+// System.out.println("SimpleApp finished");
+// }
+//
+// static void printSQLError(SQLException e)
+// {
+// while (e != null)
+// {
+// System.out.println(e.toString());
+// e = e.getNextException();
+// }
+// }
+//
+// private void parseArguments(String[] args)
+// {
+// int length = args.length;
+//
+// for (int index = 0; index < length; index++)
+// {
+// if (args[index].equalsIgnoreCase("jccjdbcclient"))
+// {
+// framework = "jccjdbc";
+// driver = "com.ibm.db2.jcc.DB2Driver";
+// protocol = "jdbc:derby:net://localhost:1527/";
+// }
+// if (args[index].equalsIgnoreCase("derbyclient"))
+// {
+// framework = "derbyclient";
+// driver = "org.apache.derby.jdbc.ClientDriver";
+// protocol = "jdbc:derby://localhost:1527/";
+// }
+// }
+// }
+ private final String applicationNamespaceName = "http://tempuri.org/";
+ private final String dummyMethod = "dummyMethod";
+ private OMElement getDummyMethodRequestElement() {
+ OMFactory fac = OMAbstractFactory.getOMFactory();
+ OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1");
+ return fac.createOMElement(dummyMethod, namespace);
+ }
+ }
diff --git a/modules/mar/module.xml b/modules/mar/module.xml
index 65a7bcf..d5b2b4b 100644
--- a/modules/mar/module.xml
+++ b/modules/mar/module.xml
@@ -18,7 +18,7 @@
<operation name="publish" mep="http://www.w3.org/2004/08/wsdl/in-out">
<messageReceiver class="org.apache.savan.messagereceiver.PublishingMessageReceiver"/>
- <actionMapping>http://wso2.com/ws/2007/05/eventing/Publish</actionMapping>
+ <actionMapping>http://ws.apache.org/ws/2007/05/eventing-extended/Publish</actionMapping>
</operation>
</module>