integrate derby data source

git-svn-id: https://svn.apache.org/repos/asf/webservices/savan/trunk/java@545717 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
index 851dcc3..60feee7 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
@@ -40,7 +40,7 @@
 		String SUBSCRIBER_UUID = "SAVAN_EVENTING_SUBSCRIBER_UUID";
 	}
 	
-	interface ElementNames {
+	public interface ElementNames {
 		String Subscribe = "Subscribe";
 		String EndTo = "EndTo";
 		String Delivery = "Delivery";
@@ -58,13 +58,15 @@
 		String GetStatus = "GetStatus";
 		String GetStatusResponse = "GetStatusResponse";
 		String FeedUrl = "FeedUrl";
-		
+		String Entry = "entry";
+		String Content = "content";
 		String deleteFeedResponse = "DeleteFeedResponse";
 	}
 	
 	interface Properties {
 		String SOAPVersion = "SOAPVersion";
 		String feedUrl = "feedUrl";
+		String DataSource = "DataSource";
 	}
 	
 }
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
new file mode 100644
index 0000000..9cced07
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
@@ -0,0 +1,190 @@
+package org.apache.savan.atom;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMDocument;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.impl.builder.StAXBuilder;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.util.StAXUtils;
+import org.apache.axis2.context.MessageContext;
+import org.apache.savan.SavanException;
+
+public class AtomDataSource {
+	public static final String SQL_CREATE_FEEDS = "CREATE TABLE FEEDS(id CHAR(250) NOT NULL, " +
+			"title CHAR(250), updated TIMESTAMP, author CHAR(250), PRIMARY KEY(id))";
+	public static final String SQL_CREATE_ENTRIES = "CREATE TABLE ENTIES(feed CHAR(250), content VARCHAR(2000))";
+
+	public static final String SQL_ADD_FEED = "INSERT INTO FEEDS(id,title, updated,author) VALUES(?,?,?,?)";
+	public static final String SQL_ADD_ENTRY = "INSERT INTO ENTIES(feed, content) VALUES(?,?)";
+	public static final String SQL_GET_ENTRIES_4_FEED = "SELECT content from ENTIES WHERE feed=?";
+	public static final String SQL_GET_FEED_DATA = "SELECT id,title,updated,author from FEEDS WHERE id=?";
+
+	
+	
+	
+	public String framework = "embedded";
+    public String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+    public String protocol = "jdbc:derby:";
+    private Properties props;
+	
+	public AtomDataSource() throws SavanException{
+		try {
+			Class.forName(driver).newInstance();
+			System.out.println("Loaded the appropriate driver.");
+
+			props = new Properties();
+			props.put("user", "user1");
+			props.put("password", "user1");
+			
+			Connection connection = getConnection();
+			Statement statement = connection.createStatement();
+			
+			ResultSet feedtable = connection.getMetaData().getTables(null, null, "FEEDS", null);
+			if(!feedtable.next()){
+				statement.execute(SQL_CREATE_FEEDS);	
+			}
+			ResultSet entirestable = connection.getMetaData().getTables(null, null, "ENTIES", null);
+			if(!entirestable.next()){
+				statement.execute(SQL_CREATE_ENTRIES);
+			}
+			connection.close();
+		} catch (InstantiationException e) {
+			throw new SavanException(e);
+		} catch (IllegalAccessException e) {
+			throw new SavanException(e);
+		} catch (ClassNotFoundException e) {
+			throw new SavanException(e);
+		} catch (SQLException e) {
+			throw new SavanException(e);
+		}
+
+	}
+	
+	public Connection getConnection() throws SavanException{
+        try {
+		/*
+		    The connection specifies create=true to cause
+		    the database to be created. To remove the database,
+		    remove the directory derbyDB and its contents.
+		    The directory derbyDB will be created under
+		    the directory that the system property
+		    derby.system.home points to, or the current
+		    directory if derby.system.home is not set.
+		  */
+		 return DriverManager.getConnection(protocol +
+		         "derbyDB;create=true", props);
+	} catch (SQLException e) {
+		throw new SavanException(e);
+	}
+		
+	}
+	
+	
+	public void addFeed(String id,String title,Date lastEditedtime,String author) throws SavanException{
+		
+		try {
+			Connection connection = getConnection();
+			try{
+				PreparedStatement statement = connection.prepareStatement(SQL_ADD_FEED);
+				statement.setString(1,id );
+				statement.setString(2,title );
+				Timestamp t = new Timestamp(lastEditedtime.getTime());
+				statement.setTimestamp(3, t);
+				statement.setString(4, author);
+				statement.executeUpdate();
+			}finally{
+				connection.close();
+			}
+		} catch (SQLException e) {
+			throw new SavanException(e);
+		}
+		
+		
+	}
+	
+	public void addEntry(String id,OMElement entry) throws SavanException{
+		try {
+			StringWriter w = new StringWriter();
+			entry.serialize(w);
+			Connection connection = getConnection();
+			try{
+				PreparedStatement statement = connection.prepareStatement(SQL_ADD_ENTRY);
+				statement.setString(1,id );
+				statement.setString(2,w.getBuffer().toString() );
+				statement.executeUpdate();
+			}finally{
+				connection.close();
+			}
+		} catch (SQLException e) {
+			throw new SavanException(e);
+		} catch (XMLStreamException e) {
+			throw new SavanException(e);
+		}
+	}
+	
+	
+	public OMElement getFeedAsXml(String feedId) throws SavanException{
+		
+		try {
+			Connection connection = getConnection();
+			try{
+				PreparedStatement statement = connection.prepareStatement(SQL_GET_FEED_DATA);
+				statement.setString(1,feedId );
+				ResultSet results = statement.executeQuery();
+				if(results.next()){
+					String title = results.getString("title");
+					Timestamp updatedTime = results.getTimestamp("updated");
+					String author = results.getString("author");
+					
+					Feed feed = new Feed(title,feedId,author,updatedTime);
+					
+					statement.close();
+					
+					statement = connection.prepareStatement(SQL_GET_ENTRIES_4_FEED);
+					statement.setString(1,feedId );
+					results = statement.executeQuery();
+					while(results.next()){
+						String entryAsStr = results.getString("content");
+						InputStream atomIn = new ByteArrayInputStream(entryAsStr.getBytes());
+						XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader(atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING);
+						StAXBuilder builder = new StAXOMBuilder(feed.getFactory(),xmlreader);
+						feed.addEntry(builder.getDocumentElement());
+					}
+					return feed.getFeedAsXml();
+				}else{
+					throw new SavanException("No such feed "+feedId);
+				}
+			}finally{
+				connection.close();
+			}
+		} catch (SQLException e) {
+			throw new SavanException(e);
+		} catch (XMLStreamException e) {
+			throw new SavanException(e);
+		}
+	}
+	
+	
+	
+	
+	
+}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
index 181abc6..8d817fb 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
@@ -49,44 +49,64 @@
 				OMElement bodyContent = envlope.getBody().getFirstElement();
 				
 				OMElement feedID = bodyContent.getFirstElement();
-				String pathWRTRepository = "atom/"+feedID.getText();
 				
-				File atomFile = messageCtx.getConfigurationContext().getRealPath(pathWRTRepository);
-				if(pathWRTRepository.equals("atom/all.atom") && !atomFile.exists()){
-					AtomSubscriber atomSubscriber = new AtomSubscriber();
-					atomSubscriber.setId(new URI("All"));
-					atomSubscriber.setAtomFile(atomFile);
-					atomSubscriber.setAuthor("DefaultUser");
-					atomSubscriber.setTitle("default Feed");
-					
-					String serviceAddress = messageCtx.getTo().getAddress();
-					int cutIndex = serviceAddress.indexOf("services");
-					if(cutIndex > 0){
-						serviceAddress = serviceAddress.substring(0,cutIndex-1);
-					}
-					atomSubscriber.setFeedUrl(serviceAddress+"/services/"+messageCtx.getServiceContext().getAxisService().getName() +"/atom?feed=all.atom");
-					
-					
-					SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService());
-					if (store == null)
-						throw new AxisFault ("Cant find the Savan subscriber store");
-					store.store(atomSubscriber);
-				}
-
 				
-				if(!atomFile.exists()){
-					throw new AxisFault("no feed exisits for "+feedID.getText() + " no file found "+ atomFile.getAbsolutePath());
-				}
-				FileInputStream atomIn =  new FileInputStream(atomFile);
-
+				
+				
+				String feedIDAsUrn = feedID.getText().replaceAll("_", ":").replaceAll(".atom", "");
+				
+				SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService());
+				if (store == null)
+					throw new AxisFault ("Cant find the Savan subscriber store");
+				
+				
+				AtomSubscriber subscriber = (AtomSubscriber)store.retrieve(feedIDAsUrn);
+				
 	            SOAPFactory fac = getSOAPFactory(messageCtx);
 	            SOAPEnvelope envelope = fac.getDefaultEnvelope();
+
+	            OMElement result = subscriber.getFeedAsXml();
+				
+				
+				
+				
+				
+//				String pathWRTRepository = "atom/"+feedID.getText();
+//				
+//				File atomFile = messageCtx.getConfigurationContext().getRealPath(pathWRTRepository);
+//				if(pathWRTRepository.equals("atom/all.atom") && !atomFile.exists()){
+//					AtomSubscriber atomSubscriber = new AtomSubscriber();
+//					atomSubscriber.setId(new URI("All"));
+//					atomSubscriber.setAtomFile(atomFile);
+//					atomSubscriber.setAuthor("DefaultUser");
+//					atomSubscriber.setTitle("default Feed");
+//					
+//					String serviceAddress = messageCtx.getTo().getAddress();
+//					int cutIndex = serviceAddress.indexOf("services");
+//					if(cutIndex > 0){
+//						serviceAddress = serviceAddress.substring(0,cutIndex-1);
+//					}
+//					atomSubscriber.setFeedUrl(serviceAddress+"/services/"+messageCtx.getServiceContext().getAxisService().getName() +"/atom?feed=all.atom");
+//					
+//					
+//					SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService());
+//					if (store == null)
+//						throw new AxisFault ("Cant find the Savan subscriber store");
+//					store.store(atomSubscriber);
+//				}
+//
+//				
+//				if(!atomFile.exists()){
+//					throw new AxisFault("no feed exisits for "+feedID.getText() + " no file found "+ atomFile.getAbsolutePath());
+//				}
+//				FileInputStream atomIn =  new FileInputStream(atomFile);
+
 	            
 	            //add the content of the file to the response
-	            XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader
-		            (atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING);
-		        StAXBuilder builder = new StAXOMBuilder(fac,xmlreader);
-		        OMElement result = (OMElement) builder.getDocumentElement();
+//	            XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader
+//		            (atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING);
+//		        StAXBuilder builder = new StAXOMBuilder(fac,xmlreader);
+//		        OMElement result = (OMElement) builder.getDocumentElement();
 	            envelope.getBody().addChild(result);
 				
 	            //send beck the response
@@ -125,15 +145,15 @@
 		} catch (OMException e) {
 			e.printStackTrace();
 			throw new AxisFault(e);
-		} catch (FileNotFoundException e) {
-			e.printStackTrace();
-			throw new AxisFault(e);
-		} catch (XMLStreamException e) {
-			e.printStackTrace();
-			throw new AxisFault(e);
-		} catch (URISyntaxException e) {
-			e.printStackTrace();
-			throw new AxisFault(e);
+//		} catch (FileNotFoundException e) {
+//			e.printStackTrace();
+//			throw new AxisFault(e);
+//		} catch (XMLStreamException e) {
+//			e.printStackTrace();
+//			throw new AxisFault(e);
+//		} catch (URISyntaxException e) {
+//			e.printStackTrace();
+//			throw new AxisFault(e);
 		}
 	}
 	
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java
index 77b6352..35adc57 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java
@@ -21,14 +21,20 @@
 public class AtomSubscriber implements Subscriber{
 	private static final Log log = LogFactory.getLog(AtomSubscriber.class);
 	private Date subscriptionEndingTime = null;
-	private Feed feed; 
+	//private Feed feed; 
 	private Filter filter = null;
 	private File atomFile;
 	private String feedUrl;
-	
+	private AtomDataSource atomDataSource;
 	private URI id;
-	private String title;
-	private String author;
+	
+	public void init(AtomDataSource dataSource,URI id,String title,String author) throws SavanException{
+		this.atomDataSource = dataSource;
+		atomDataSource.addFeed(id.toString(), title, new Date(), author);
+	}
+	
+	
+	
 	public URI getId() {
 		return id;
 	}
@@ -36,7 +42,7 @@
 		throw new UnsupportedOperationException();
 	}
 	public void sendEventData(OMElement eventData) throws SavanException {
-		try {
+//		try {
 			Date date = new Date ();
 			
 			boolean expired = false;
@@ -47,25 +53,28 @@
 				String message = "Cant notify the listner since the subscription has been expired";
 				log.debug(message);
 			}
-			if(feed == null){
-				feed = new Feed(title,id.toString(),author);
-			}
-			feed.addEntry(eventData);
 			
-			if(!atomFile.getParentFile().exists()){
-				atomFile.getParentFile().mkdir();
-			}
-			FileOutputStream out = new FileOutputStream(atomFile);
-			feed.write(out);
-			out.close();
-			System.out.println("Atom file at "+ atomFile + " is updated");
-		} catch (FileNotFoundException e) {
-			throw new SavanException(e);
-		} catch (XMLStreamException e) {
-			throw new SavanException(e);
-		} catch (IOException e) {
-			throw new SavanException(e);
-		}
+			atomDataSource.addEntry(id.toString(), eventData);
+//			
+//			if(feed == null){
+//				feed = new Feed(title,id.toString(),author);
+//			}
+//			feed.addEntry(eventData);
+//			
+//			if(!atomFile.getParentFile().exists()){
+//				atomFile.getParentFile().mkdir();
+//			}
+//			FileOutputStream out = new FileOutputStream(atomFile);
+//			feed.write(out);
+//			out.close();
+//			System.out.println("Atom file at "+ atomFile + " is updated");
+//		} catch (FileNotFoundException e) {
+//			throw new SavanException(e);
+//		} catch (XMLStreamException e) {
+//			throw new SavanException(e);
+//		} catch (IOException e) {
+//			throw new SavanException(e);
+//		}
 	}
 	public void setId(URI id) {
 		this.id = id;
@@ -84,22 +93,22 @@
 		throw new UnsupportedOperationException();
 	}
 
-	public String getAuthor() {
-		return author;
-	}
+//	public String getAuthor() {
+//		return author;
+//	}
+//
+//	public void setAuthor(String author) {
+//		this.author = author;
+//	}
+//
+//
+//	public String getTitle() {
+//		return title;
+//	}
 
-	public void setAuthor(String author) {
-		this.author = author;
-	}
-
-
-	public String getTitle() {
-		return title;
-	}
-
-	public void setTitle(String title) {
-		this.title = title;
-	}
+//	public void setTitle(String title) {
+//		this.title = title;
+//	}
 	
 	
 	public String getFeedUrl(){
@@ -121,6 +130,15 @@
 	public void setFeedUrl(String feedUrl) {
 		this.feedUrl = feedUrl;
 	}
+//	public Feed getFeed() {
+//		return feed;
+//	}
+	public OMElement getFeedAsXml() throws SavanException {
+		return atomDataSource.getFeedAsXml(id.toString());
+	}
+//	public void setAtomDataSource(AtomDataSource atomDataSource) {
+//		this.atomDataSource = atomDataSource;
+//	}
 	
 	
 }
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
index 98203d6..a87c6f0 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
@@ -31,6 +31,7 @@
 import org.apache.axiom.soap.SOAPHeader;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ServiceContext;
 import org.apache.savan.SavanConstants;
 import org.apache.savan.SavanException;
 import org.apache.savan.SavanMessageContext;
@@ -98,6 +99,13 @@
 			if (envelope==null)
 				return null;
 			
+			ServiceContext serviceContext = smc.getMessageContext().getServiceContext();
+			AtomDataSource dataSource = (AtomDataSource)serviceContext.getProperty(AtomConstants.Properties.DataSource);
+			if(dataSource == null){
+				dataSource = new AtomDataSource();
+				serviceContext.setProperty(AtomConstants.Properties.DataSource, dataSource);
+			}
+			
 			String subscriberName = protocol.getDefaultSubscriber();
 			Subscriber subscriber = configurationManager.getSubscriberInstance(subscriberName);
 			
@@ -105,6 +113,8 @@
 				String message = "Savan only support implementations of Atom subscriber as Subscribers";
 				throw new SavanException (message);
 			}
+			
+			
 
 			//find the real path for atom feeds
 			File repositoryPath = smc.getConfigurationContext().getRealPath("/"); 
@@ -118,6 +128,7 @@
 			}
 			
 			AtomSubscriber atomSubscriber = (AtomSubscriber) subscriber;
+			
 			String id = UUIDGenerator.getUUID();
 			smc.setProperty(AtomConstants.TransferedProperties.SUBSCRIBER_UUID,id);
 			atomSubscriber.setId(new URI(id));
@@ -151,8 +162,8 @@
 				}
 				atomSubscriber.setFilter(filter);
 			}
-			atomSubscriber.setAuthor(createFeed.getAuthor());
-			atomSubscriber.setTitle(createFeed.getTitle());
+			
+			atomSubscriber.init(dataSource, new URI(id), createFeed.getTitle(), createFeed.getAuthor());
 			smc.setProperty(AtomConstants.Properties.feedUrl, atomSubscriber.getFeedUrl());
 			return atomSubscriber;
 		} catch (AxisFault e) {
diff --git a/modules/core/src/main/java/org/apache/savan/atom/Feed.java b/modules/core/src/main/java/org/apache/savan/atom/Feed.java
index 0bdebfb..aea30a4 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/Feed.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/Feed.java
@@ -56,11 +56,20 @@
 //	</feed>
 	
 	
-	public Feed(String title, String id, String author) {
+	public Feed(String title, String id, String author,Date lastUpdated) {
 		this.title = title;
+		if(title != null){
+			title = title.trim();
+		}
+		if(author != null){
+			author = author.trim();
+		}
+		
 		this.id = id;
 		this.author = author;
-		lastUpdated = new Date();
+		if(lastUpdated == null){
+			lastUpdated = new Date();	
+		}
 		factory = OMAbstractFactory.getOMFactory();
 		document = factory.createOMDocument();
 		atomNs = factory.createOMNamespace(AtomConstants.ATOM_NAMESPACE,AtomConstants.ATOM_PREFIX);
@@ -107,4 +116,11 @@
 //		
 //	}
 	
+	public OMElement getFeedAsXml(){
+		return document.getOMDocumentElement();
+	}
+	public OMFactory getFactory() {
+		return factory;
+	}
+	
 }
diff --git a/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java b/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java
index 793de80..49e3027 100644
--- a/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java
+++ b/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java
@@ -20,6 +20,7 @@
 public interface EventingConstants {
 
 	String EVENTING_NAMESPACE = "http://schemas.xmlsoap.org/ws/2004/08/eventing";
+	String EXTENDED_EVENTING_NAMESPACE = "http://ws.apache.org/ws/2007/05/eventing-extended";
 	String EVENTING_PREFIX = "wse";
 	String DEFAULT_DELIVERY_MODE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push";
 	String DEFAULT_FILTER_IDENTIFIER = FilterDialects.XPath;
@@ -46,6 +47,7 @@
 		String Unsubscribe = "Unsubscribe";
 		String GetStatus = "GetStatus";
 		String GetStatusResponse = "GetStatusResponse";
+		String Topic = "topic";
 	}
 	
 	interface Actions {
@@ -57,7 +59,7 @@
 		String UnsubscribeResponse = "http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse";
 		String GetStatus = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus";
 		String GetStatusResponse = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse";
-		String Publish = "http://wso2.com/ws/2007/05/eventing/Publish";
+		String Publish = "http://ws.apache.org/ws/2007/05/eventing-extended/Publish";
 	}
 	
 	interface Properties {
diff --git a/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java
index fe4ebba..020c0dd 100644
--- a/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java
+++ b/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java
@@ -1,17 +1,76 @@
 package org.apache.savan.messagereceiver;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.StringTokenizer;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.ServiceContext;
 import org.apache.axis2.engine.MessageReceiver;
+import org.apache.savan.atom.AtomConstants;
+import org.apache.savan.eventing.EventingConstants;
 import org.apache.savan.publication.client.PublicationClient;
 
+/**
+ * This Message reciver handles the publish requests. It will received all messages sent to SOAP/WS action 
+ * http://ws.apache.org/ws/2007/05/eventing-extended/Publish, or request URL http://<host>:port//services/<service-name>/publish. 
+ * It will search for topic in URL query parameter "topic" or
+ * Soap Header <eevt::topic xmlns="http://ws.apache.org/ws/2007/05/eventing-extended">...</topic>
+ * @author Srinath Perera (hemapani@apache.org)
+ */
 public class PublishingMessageReceiver implements MessageReceiver{
+	
 	public void receive(MessageContext messageCtx) throws AxisFault {
-		SOAPEnvelope requestEnvelope = messageCtx.getEnvelope();
-		ServiceContext serviceContext = messageCtx.getServiceContext();
-		PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext());
-		client.sendPublication(requestEnvelope.getBody().getFirstElement(),serviceContext.getAxisService(),null);
+		try {
+			String toAddress = messageCtx.getTo().getAddress();
+			//Here we try to locate the topic. It can be either a query parameter of the input address or a header
+			//in the SOAP evvelope
+			URI topic = null;
+			
+			SOAPEnvelope requestEnvelope = messageCtx.getEnvelope();
+			int querySeperatorIndex = toAddress.indexOf('?');
+			if(querySeperatorIndex > 0){
+				String queryString = toAddress.substring(querySeperatorIndex+1);
+				HashMap map = new HashMap();
+				StringTokenizer t = new StringTokenizer(queryString,"=&");
+				while(t.hasMoreTokens()){
+					map.put(t.nextToken(), t.nextToken());
+				}
+				if(map.containsKey(EventingConstants.ElementNames.Topic)){
+					topic = new URI((String)map.get(EventingConstants.ElementNames.Topic));
+				}
+			}else{
+				OMElement topicHeader = requestEnvelope.getHeader().getFirstChildWithName(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE,
+						EventingConstants.ElementNames.Topic));
+				if(topicHeader != null){
+					topic = new URI(topicHeader.getText());
+				}
+			}
+			
+			//Here we locate the content of the Event. If this is APP we unwrap APP wrapping elements. 
+			OMElement eventData = requestEnvelope.getBody().getFirstElement();
+			if(AtomConstants.ATOM_NAMESPACE.equals(eventData.getNamespace().getNamespaceURI()) && 
+					AtomConstants.ElementNames.Entry.equals(eventData.getLocalName())){
+				OMElement content = eventData.getFirstChildWithName(new QName(AtomConstants.ATOM_NAMESPACE,AtomConstants.ElementNames.Content));
+				if(content != null && content.getFirstElement() != null){
+					eventData.getFirstElement();
+				}
+			}
+			//Use in memory API to publish the event
+			ServiceContext serviceContext = messageCtx.getServiceContext();
+			PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext());
+			client.sendPublication(eventData,serviceContext.getAxisService(),topic);
+		} catch (OMException e) {
+			throw new AxisFault(e);
+		} catch (URISyntaxException e) {
+			throw new AxisFault(e);
+		}
 	}
 }
diff --git a/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java b/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java
new file mode 100644
index 0000000..e24a658
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java
@@ -0,0 +1,246 @@
+package org.apache.axis2.savan.atom;
+
+import java.io.StringWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.savan.SavanException;
+import org.apache.savan.atom.AtomDataSource;
+
+import junit.framework.TestCase;
+
+public class DerbyTest extends TestCase{
+	
+	    /* the default framework is embedded*/
+	    public String framework = "embedded";
+	    public String driver = "org.apache.derby.jdbc.EmbeddedDriver";
+	    public String protocol = "jdbc:derby:";
+	    
+	    public void testDataSource() throws Exception{
+	    	AtomDataSource dataSource = new AtomDataSource();
+	    	String id  = "id"+new Random().nextDouble();
+	    	dataSource.addFeed(id, "foo", new Date(),"Srinath");
+	    	dataSource.addEntry(id,getDummyMethodRequestElement());
+	    	StringWriter w = new StringWriter();
+	    	
+	    	OMElement result = dataSource.getFeedAsXml(id); 
+//	    	Iterator it = result.getChildElements();
+//	    	while(it.hasNext()){
+//	    		System.out.println(it.next());
+//	    	}
+	    	
+	    	result.serialize(w);
+	    	System.out.println(w.getBuffer().toString());
+	    }
+
+//	    public static void main(String[] args)
+//	    {
+//	        new DerbyTest().go(args);
+//	    }
+
+//	    void go(String[] args)
+//	    {
+//	        /* parse the arguments to determine which framework is desired*/
+//	        parseArguments(args);
+//
+//	        System.out.println("SimpleApp starting in " + framework + " mode.");
+//
+//	        try
+//	        {
+//	            /*
+//	               The driver is installed by loading its class.
+//	               In an embedded environment, this will start up Derby, since it is not already running.
+//	             */
+//	            Class.forName(driver).newInstance();
+//	            System.out.println("Loaded the appropriate driver.");
+//
+//	            Connection conn = null;
+//	            Properties props = new Properties();
+//	            props.put("user", "user1");
+//	            props.put("password", "user1");
+//
+//	            /*
+//	               The connection specifies create=true to cause
+//	               the database to be created. To remove the database,
+//	               remove the directory derbyDB and its contents.
+//	               The directory derbyDB will be created under
+//	               the directory that the system property
+//	               derby.system.home points to, or the current
+//	               directory if derby.system.home is not set.
+//	             */
+//	            conn = DriverManager.getConnection(protocol +
+//	                    "derbyDB;create=true", props);
+//
+//	            System.out.println("Connected to and created database derbyDB");
+//
+//	            conn.setAutoCommit(false);
+//
+//	            /*
+//	               Creating a statement lets us issue commands against
+//	               the connection.
+//	             */
+//	            Statement s = conn.createStatement();
+//
+//	            /*
+//	               We create a table, add a few rows, and update one.
+//	             */
+//	            s.execute("create table derbyDB(num int, addr varchar(40))");
+//	            System.out.println("Created table derbyDB");
+//	            s.execute("insert into derbyDB values (1956,'Webster St.')");
+//	            System.out.println("Inserted 1956 Webster");
+//	            s.execute("insert into derbyDB values (1910,'Union St.')");
+//	            System.out.println("Inserted 1910 Union");
+//	            s.execute(
+//	                "update derbyDB set num=180, addr='Grand Ave.' where num=1956");
+//	            System.out.println("Updated 1956 Webster to 180 Grand");
+//
+//	            s.execute(
+//	                "update derbyDB set num=300, addr='Lakeshore Ave.' where num=180");
+//	            System.out.println("Updated 180 Grand to 300 Lakeshore");
+//
+//	            /*
+//	               We select the rows and verify the results.
+//	             */
+//	            ResultSet rs = s.executeQuery(
+//	                    "SELECT num, addr FROM derbyDB ORDER BY num");
+//
+//	            if (!rs.next())
+//	            {
+//	                throw new Exception("Wrong number of rows");
+//	            }
+//
+//	            if (rs.getInt(1) != 300)
+//	            {
+//	                throw new Exception("Wrong row returned");
+//	            }
+//
+//	            if (!rs.next())
+//	            {
+//	                throw new Exception("Wrong number of rows");
+//	            }
+//
+//	            if (rs.getInt(1) != 1910)
+//	            {
+//	                throw new Exception("Wrong row returned");
+//	            }
+//
+//	            if (rs.next())
+//	            {
+//	                throw new Exception("Wrong number of rows");
+//	            }
+//
+//	            System.out.println("Verified the rows");
+//
+//	            s.execute("drop table derbyDB");
+//	            System.out.println("Dropped table derbyDB");
+//
+//	            /*
+//	               We release the result and statement resources.
+//	             */
+//	            rs.close();
+//	            s.close();
+//	            System.out.println("Closed result set and statement");
+//
+//	            /*
+//	               We end the transaction and the connection.
+//	             */
+//	            conn.commit();
+//	            conn.close();
+//	            System.out.println("Committed transaction and closed connection");
+//
+//	            /*
+//	               In embedded mode, an application should shut down Derby.
+//	               If the application fails to shut down Derby explicitly,
+//	               the Derby does not perform a checkpoint when the JVM shuts down, which means
+//	               that the next connection will be slower.
+//	               Explicitly shutting down Derby with the URL is preferred.
+//	               This style of shutdown will always throw an "exception".
+//	             */
+//	            boolean gotSQLExc = false;
+//
+//	            if (framework.equals("embedded"))
+//	            {
+//	                try
+//	                {
+//	                    DriverManager.getConnection("jdbc:derby:;shutdown=true");
+//	                }
+//	                catch (SQLException se)
+//	                {
+//	                    gotSQLExc = true;
+//	                }
+//
+//	                if (!gotSQLExc)
+//	                {
+//	                    System.out.println("Database did not shut down normally");
+//	                }
+//	                else
+//	                {
+//	                    System.out.println("Database shut down normally");
+//	                }
+//	            }
+//	        }
+//	        catch (Throwable e)
+//	        {
+//	            System.out.println("exception thrown:");
+//
+//	            if (e instanceof SQLException)
+//	            {
+//	                printSQLError((SQLException) e);
+//	            }
+//	            else
+//	            {
+//	                e.printStackTrace();
+//	            }
+//	        }
+//
+//	        System.out.println("SimpleApp finished");
+//	    }
+//
+//	    static void printSQLError(SQLException e)
+//	    {
+//	        while (e != null)
+//	        {
+//	            System.out.println(e.toString());
+//	            e = e.getNextException();
+//	        }
+//	    }
+//
+//	    private void parseArguments(String[] args)
+//	    {
+//	        int length = args.length;
+//
+//	        for (int index = 0; index < length; index++)
+//	        {
+//	            if (args[index].equalsIgnoreCase("jccjdbcclient"))
+//	            {
+//	                framework = "jccjdbc";
+//	                driver = "com.ibm.db2.jcc.DB2Driver";
+//	                protocol = "jdbc:derby:net://localhost:1527/";
+//	            }
+//	            if (args[index].equalsIgnoreCase("derbyclient"))
+//	            {
+//	                framework = "derbyclient";
+//	                driver = "org.apache.derby.jdbc.ClientDriver";
+//	                protocol = "jdbc:derby://localhost:1527/";
+//	            }
+//	        }
+//	    }
+	    private final String applicationNamespaceName = "http://tempuri.org/"; 
+		private final String dummyMethod = "dummyMethod";
+	    private OMElement getDummyMethodRequestElement() {
+			OMFactory fac = OMAbstractFactory.getOMFactory();
+			OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1");
+			return fac.createOMElement(dummyMethod, namespace);
+		}
+	}
diff --git a/modules/mar/module.xml b/modules/mar/module.xml
index 65a7bcf..d5b2b4b 100644
--- a/modules/mar/module.xml
+++ b/modules/mar/module.xml
@@ -18,7 +18,7 @@
     

     <operation name="publish" mep="http://www.w3.org/2004/08/wsdl/in-out">

         <messageReceiver class="org.apache.savan.messagereceiver.PublishingMessageReceiver"/>

-        <actionMapping>http://wso2.com/ws/2007/05/eventing/Publish</actionMapping>

+        <actionMapping>http://ws.apache.org/ws/2007/05/eventing-extended/Publish</actionMapping>

 	</operation>        

     

 </module>