add client methods to AtomEventing Client

git-svn-id: https://svn.apache.org/repos/asf/webservices/savan/trunk/java@545726 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
index 60feee7..fde0b88 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
@@ -1,6 +1,10 @@
 
 package org.apache.savan.atom;
 
+/**
+ * 
+ * @author Srinath Perera(hemapani@apache.org)
+ */
 public class AtomConstants {
 	public static String ATOM_NAMESPACE = "http://www.w3.org/2005/Atom";
 	public static String ATOM_PREFIX = "atom";
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
index 9cced07..f062cce 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
@@ -10,24 +10,24 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Properties;
 
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
 
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMDocument;
 import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
 import org.apache.axiom.om.impl.builder.StAXBuilder;
 import org.apache.axiom.om.impl.builder.StAXOMBuilder;
 import org.apache.axiom.om.util.StAXUtils;
 import org.apache.axis2.context.MessageContext;
 import org.apache.savan.SavanException;
 
+/**
+ * This class interface between Derby and Savan atom implementation 
+ * @author Srinath Perera(hemapani@apache.org)
+ *
+ */
 public class AtomDataSource {
 	public static final String SQL_CREATE_FEEDS = "CREATE TABLE FEEDS(id CHAR(250) NOT NULL, " +
 			"title CHAR(250), updated TIMESTAMP, author CHAR(250), PRIMARY KEY(id))";
@@ -182,9 +182,4 @@
 			throw new SavanException(e);
 		}
 	}
-	
-	
-	
-	
-	
 }
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java b/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java
index 701f663..ca81043 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java
@@ -1,16 +1,30 @@
 package org.apache.savan.atom;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Calendar;
 import java.util.Iterator;
 
 import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
 
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
 import org.apache.axis2.client.ServiceClient;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.http.HttpStatus;
+import org.apache.savan.SavanException;
+import org.apache.savan.eventing.EventingConstants;
 import org.apache.savan.filters.XPathBasedFilter;
 import org.apache.savan.util.CommonUtil;
 import org.apache.xmlbeans.XmlException;
@@ -21,74 +35,202 @@
 import com.wso2.eventing.atom.CreateFeedDocument.CreateFeed;
 import com.wso2.eventing.atom.CreateFeedResponseDocument.CreateFeedResponse;
 
+/**
+ * This class take provide client interface for Savan atom support
+ * 
+ * @author Srinath Perera(hemapani@apache.org)
+ * 
+ */
 public class AtomEventingClient {
 	private ServiceClient serviceClient = null;
+
 	private EndpointReference feedEpr;
-	
-	public AtomEventingClient(ServiceClient serviceClient){
+
+	public AtomEventingClient(ServiceClient serviceClient) {
 		this.serviceClient = serviceClient;
 	}
-	
-	public CreateFeedResponse createFeed(String title,String author) throws AxisFault{
-		return createFeed(title, author,null,null);
+
+	public CreateFeedResponse createFeed(String title, String author)
+			throws AxisFault {
+		return createFeed(title, author, null, null);
 	}
-	public CreateFeedResponse createFeed(String title,String author,Calendar expiredTime,String xpathFilter) throws AxisFault{
+
+	public CreateFeedResponse createFeed(String title, String author,
+			Calendar expiredTime, String xpathFilter) throws AxisFault {
 		try {
-			serviceClient.getOptions().setAction(AtomConstants.Actions.Subscribe);
-			
-			CreateFeedDocument createFeedDocument = CreateFeedDocument.Factory.newInstance();
+			serviceClient.getOptions().setAction(
+					AtomConstants.Actions.Subscribe);
+
+			CreateFeedDocument createFeedDocument = CreateFeedDocument.Factory
+					.newInstance();
 			CreateFeed createFeed = createFeedDocument.addNewCreateFeed();
-			
+
 			createFeed.setAuthor(author);
 			createFeed.setTitle(title);
-			
-			if(expiredTime != null){
-				createFeed.setExpires(expiredTime);	
+
+			if (expiredTime != null) {
+				createFeed.setExpires(expiredTime);
 			}
-			if(xpathFilter != null){
+			if (xpathFilter != null) {
 				FilterType filter = createFeed.addNewFilter();
 				filter.setDialect(XPathBasedFilter.XPATH_BASED_FILTER);
 				filter.setStringValue(xpathFilter);
 			}
-			
+
 			OMElement request = CommonUtil.toOM(createFeedDocument);
 			request.build();
 			OMElement element = serviceClient.sendReceive(request);
-			CreateFeedResponseDocument createFeedResponseDocument = CreateFeedResponseDocument.Factory.parse(element.getXMLStreamReader());
+			CreateFeedResponseDocument createFeedResponseDocument = CreateFeedResponseDocument.Factory
+					.parse(element.getXMLStreamReader());
 			System.out.println(createFeedDocument.xmlText());
 
-			//read epr for subscription from response and store it
-			OMElement responseAsOM = CommonUtil.toOM(createFeedResponseDocument);
-			OMElement eprAsOM = responseAsOM.getFirstChildWithName(new QName(AtomConstants.ATOM_MSG_NAMESPACE,"SubscriptionManager"));
-			
+			// read epr for subscription from response and store it
+			OMElement responseAsOM = CommonUtil
+					.toOM(createFeedResponseDocument);
+			OMElement eprAsOM = responseAsOM.getFirstChildWithName(new QName(
+					AtomConstants.ATOM_MSG_NAMESPACE, "SubscriptionManager"));
+
 			feedEpr = new EndpointReference(eprAsOM.getFirstElement().getText());
-			OMElement referanceParameters = eprAsOM.getFirstChildWithName(new QName(eprAsOM.getFirstElement().getNamespace().getNamespaceURI(),
-					AddressingConstants.EPR_REFERENCE_PARAMETERS));
+			OMElement referanceParameters = eprAsOM
+					.getFirstChildWithName(new QName(eprAsOM.getFirstElement()
+							.getNamespace().getNamespaceURI(),
+							AddressingConstants.EPR_REFERENCE_PARAMETERS));
 			Iterator refparams = referanceParameters.getChildElements();
-			while(refparams.hasNext()){
-				feedEpr.addReferenceParameter((OMElement)refparams.next());	
+			while (refparams.hasNext()) {
+				feedEpr.addReferenceParameter((OMElement) refparams.next());
 			}
-			
+
 			return createFeedResponseDocument.getCreateFeedResponse();
 		} catch (XmlException e) {
 			throw new AxisFault(e);
-		} 
-	}
-	
-	public void deleteFeed(EndpointReference epr)throws AxisFault{
-		serviceClient.getOptions().setAction(AtomConstants.Actions.Unsubscribe);
-		serviceClient.getOptions().setTo(epr);
-		
-		OMElement request = OMAbstractFactory.getOMFactory().createOMElement(new QName(AtomConstants.ATOM_MSG_NAMESPACE,"DeleteFeed"));
-		serviceClient.sendReceive(request);
-	}
-	
-	
-	public void deleteFeed()throws AxisFault{
-		if(feedEpr != null){
-			deleteFeed(feedEpr);
-		}else{
-			throw new AxisFault("No feed epr alreday stored, you must have create a feed using same AtomEventingClient Object");
 		}
 	}
+
+	public void deleteFeed(EndpointReference epr) throws AxisFault {
+		serviceClient.getOptions().setAction(AtomConstants.Actions.Unsubscribe);
+		serviceClient.getOptions().setTo(epr);
+
+		OMElement request = OMAbstractFactory.getOMFactory().createOMElement(
+				new QName(AtomConstants.ATOM_MSG_NAMESPACE, "DeleteFeed"));
+		serviceClient.sendReceive(request);
+	}
+
+	public void deleteFeed() throws AxisFault {
+		if (feedEpr != null) {
+			deleteFeed(feedEpr);
+		} else {
+			throw new AxisFault(
+					"No feed epr alreday stored, you must have create a feed using same AtomEventingClient Object");
+		}
+	}
+
+	public OMElement fetchFeed(String url) throws SavanException {
+		// Create an instance of HttpClient.
+		HttpClient client = new HttpClient();
+
+		// Create a method instance.
+		GetMethod method = new GetMethod(url);
+
+		try {
+			// Execute the method.
+			int statusCode = client.executeMethod(method);
+
+			if (statusCode != HttpStatus.SC_OK) {
+				throw new SavanException("Method failed: " + method.getStatusLine());
+			}
+
+			// Read the response body.
+			byte[] responseBody = method.getResponseBody();
+
+			StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(
+					responseBody));
+			return builder.getDocumentElement();
+		} catch (IOException e) {
+			throw new SavanException(e);
+		} catch (XMLStreamException e) {
+			throw new SavanException(e);
+		} finally {
+			// Release the connection.
+			method.releaseConnection();
+		}
+	}
+
+	public void publishWithREST(String serviceurl, final OMElement content,String topic)
+			throws SavanException {
+		// Create an instance of HttpClient.
+		HttpClient client = new HttpClient();
+
+		StringBuffer queryUrl = new StringBuffer(serviceurl);
+		
+		if(!serviceurl.endsWith("/")){
+			queryUrl.append("/");
+		}
+		queryUrl.append("publish");
+		if(topic != null ){
+			queryUrl.append("?").append(EventingConstants.ElementNames.Topic).append("=").append(topic);	
+		}
+		PostMethod method = new PostMethod(queryUrl.toString());
+		// Request content will be retrieved directly
+		// from the input stream
+		try {
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			content.serialize(out);
+			out.flush();
+			final byte[] data = out.toByteArray();
+
+			RequestEntity entity = new RequestEntity() {
+
+				public void writeRequest(OutputStream outstream)
+						throws IOException {
+					outstream.write(data);
+				}
+
+				public boolean isRepeatable() {
+					return false;
+				}
+
+				public String getContentType() {
+					return "text/xml";
+				}
+
+				public long getContentLength() {
+					return data.length;
+				}
+
+			};
+			method.setRequestEntity(entity);
+
+			// Execute the method.
+			int statusCode = client.executeMethod(method);
+
+			if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_ACCEPTED) {
+				throw new SavanException("Method failed: " + method.getStatusLine());
+			}
+
+		} catch (IOException e) {
+			throw new SavanException(e);
+		} catch (XMLStreamException e) {
+			throw new SavanException(e);
+		} finally {
+			// Release the connection.
+			method.releaseConnection();
+		}
+	}
+	
+	public void publishWithSOAP(String serviceurl, final OMElement content,String topic) throws SavanException{
+		try {
+			Options options = serviceClient.getOptions();
+			EndpointReference to = new EndpointReference(serviceurl);
+			if(topic != null){
+				to.addReferenceParameter(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE,
+						EventingConstants.ElementNames.Topic), topic);
+			}
+			options.setAction(EventingConstants.Actions.Publish);
+			serviceClient.fireAndForget(content);
+		} catch (AxisFault e) {
+			throw new SavanException(e);
+		}
+	}
+	
+
 }
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
index 8d817fb..c9c4001 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
@@ -31,6 +31,11 @@
 import org.apache.savan.storage.SubscriberStore;
 import org.apache.savan.util.CommonUtil;
 
+/**
+ * Handle the HTTP GET requests for feeds
+ * @author Srinath Perera(hemapani@apache.org)
+ */
+
 public class AtomMessageReceiver implements MessageReceiver{
 	
 	public static final String ATOM_NAME = "atom";
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
index a87c6f0..8105462 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
@@ -30,7 +30,6 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPHeader;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.ServiceContext;
 import org.apache.savan.SavanConstants;
 import org.apache.savan.SavanException;
@@ -39,7 +38,6 @@
 import org.apache.savan.configuration.Protocol;
 import org.apache.savan.filters.Filter;
 import org.apache.savan.filters.XPathBasedFilter;
-import org.apache.savan.storage.SubscriberStore;
 import org.apache.savan.subscribers.Subscriber;
 import org.apache.savan.subscription.ExpirationBean;
 import org.apache.savan.subscription.SubscriptionProcessor;
diff --git a/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java b/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java
index bad3eb8..995b547 100644
--- a/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java
+++ b/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java
@@ -28,6 +28,7 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMFactory;
 import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.OMOutputFormat;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.client.Options;
@@ -137,22 +138,31 @@
 		CreateFeedResponse createFeedResponse = atomEventingClient.createFeed("test Title","Srinath Perera");
 		
 		options.setAction("http://wso2.com/eventing/dummyMethod");
-		serviceClient.fireAndForget(getDummyMethodRequestElement ());
+		serviceClient.fireAndForget(getDummyMethodRequestElement (0));
 		
-		options.setAction(EventingConstants.Actions.Publish);
-		serviceClient.fireAndForget(getDummyMethodRequestElement ());
+//		options.setAction(EventingConstants.Actions.Publish);
+//		serviceClient.fireAndForget(getDummyMethodRequestElement ());
 		
+		atomEventingClient.publishWithSOAP(toAddress, getDummyMethodRequestElement (1), null);
+		atomEventingClient.publishWithREST(toAddress, getDummyMethodRequestElement (2), null);
 		//Thread.sleep(1000*10*1000);
 		
-		int i = 0;
-		while(i<1){
+//		int i = 0;
+//		while(i<1){
 			System.out.println(createFeedResponse.getFeedUrl());
-			URL url = new URL(createFeedResponse.getFeedUrl());
-			System.out.println(readFromStream(url.openStream()));
-			Thread.sleep(1000*10);	
-			i++;
-		}
+			OMElement feedAsXml = atomEventingClient.fetchFeed(createFeedResponse.getFeedUrl());
+			feedAsXml.serialize(System.out,new OMOutputFormat());
+			
+//			URL url = new URL(createFeedResponse.getFeedUrl());
+//			System.out.println(readFromStream(url.openStream()));
+//			Thread.sleep(1000*10);	
+//			i++;
+//		}
 //		
+		feedAsXml = atomEventingClient.fetchFeed(createFeedResponse.getFeedUrl());
+		feedAsXml.serialize(System.out,new OMOutputFormat());	
+			
+			
 		atomEventingClient.deleteFeed();
 		
 		
@@ -272,10 +282,12 @@
 		System.out.println("Status of the subscriber '" + ID +"' is" + statusValue);
 	}
 	
-	private OMElement getDummyMethodRequestElement() {
+	private OMElement getDummyMethodRequestElement(int i) {
 		OMFactory fac = OMAbstractFactory.getOMFactory();
 		OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1");
-		return fac.createOMElement(dummyMethod, namespace);
+		OMElement de =  fac.createOMElement(dummyMethod, namespace);
+		de.setText(String.valueOf(i));
+		return de;
 	}
     
 	public static String readFromStream(InputStream in) throws Exception{