add client methods to AtomEventing Client
git-svn-id: https://svn.apache.org/repos/asf/webservices/savan/trunk/java@545726 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
index 60feee7..fde0b88 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java
@@ -1,6 +1,10 @@
package org.apache.savan.atom;
+/**
+ *
+ * @author Srinath Perera(hemapani@apache.org)
+ */
public class AtomConstants {
public static String ATOM_NAMESPACE = "http://www.w3.org/2005/Atom";
public static String ATOM_PREFIX = "atom";
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
index 9cced07..f062cce 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java
@@ -10,24 +10,24 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMDocument;
import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
import org.apache.axiom.om.impl.builder.StAXBuilder;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axiom.om.util.StAXUtils;
import org.apache.axis2.context.MessageContext;
import org.apache.savan.SavanException;
+/**
+ * This class interface between Derby and Savan atom implementation
+ * @author Srinath Perera(hemapani@apache.org)
+ *
+ */
public class AtomDataSource {
public static final String SQL_CREATE_FEEDS = "CREATE TABLE FEEDS(id CHAR(250) NOT NULL, " +
"title CHAR(250), updated TIMESTAMP, author CHAR(250), PRIMARY KEY(id))";
@@ -182,9 +182,4 @@
throw new SavanException(e);
}
}
-
-
-
-
-
}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java b/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java
index 701f663..ca81043 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomEventingClient.java
@@ -1,16 +1,30 @@
package org.apache.savan.atom;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Calendar;
import java.util.Iterator;
import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.http.HttpStatus;
+import org.apache.savan.SavanException;
+import org.apache.savan.eventing.EventingConstants;
import org.apache.savan.filters.XPathBasedFilter;
import org.apache.savan.util.CommonUtil;
import org.apache.xmlbeans.XmlException;
@@ -21,74 +35,202 @@
import com.wso2.eventing.atom.CreateFeedDocument.CreateFeed;
import com.wso2.eventing.atom.CreateFeedResponseDocument.CreateFeedResponse;
+/**
+ * This class take provide client interface for Savan atom support
+ *
+ * @author Srinath Perera(hemapani@apache.org)
+ *
+ */
public class AtomEventingClient {
private ServiceClient serviceClient = null;
+
private EndpointReference feedEpr;
-
- public AtomEventingClient(ServiceClient serviceClient){
+
+ public AtomEventingClient(ServiceClient serviceClient) {
this.serviceClient = serviceClient;
}
-
- public CreateFeedResponse createFeed(String title,String author) throws AxisFault{
- return createFeed(title, author,null,null);
+
+ public CreateFeedResponse createFeed(String title, String author)
+ throws AxisFault {
+ return createFeed(title, author, null, null);
}
- public CreateFeedResponse createFeed(String title,String author,Calendar expiredTime,String xpathFilter) throws AxisFault{
+
+ public CreateFeedResponse createFeed(String title, String author,
+ Calendar expiredTime, String xpathFilter) throws AxisFault {
try {
- serviceClient.getOptions().setAction(AtomConstants.Actions.Subscribe);
-
- CreateFeedDocument createFeedDocument = CreateFeedDocument.Factory.newInstance();
+ serviceClient.getOptions().setAction(
+ AtomConstants.Actions.Subscribe);
+
+ CreateFeedDocument createFeedDocument = CreateFeedDocument.Factory
+ .newInstance();
CreateFeed createFeed = createFeedDocument.addNewCreateFeed();
-
+
createFeed.setAuthor(author);
createFeed.setTitle(title);
-
- if(expiredTime != null){
- createFeed.setExpires(expiredTime);
+
+ if (expiredTime != null) {
+ createFeed.setExpires(expiredTime);
}
- if(xpathFilter != null){
+ if (xpathFilter != null) {
FilterType filter = createFeed.addNewFilter();
filter.setDialect(XPathBasedFilter.XPATH_BASED_FILTER);
filter.setStringValue(xpathFilter);
}
-
+
OMElement request = CommonUtil.toOM(createFeedDocument);
request.build();
OMElement element = serviceClient.sendReceive(request);
- CreateFeedResponseDocument createFeedResponseDocument = CreateFeedResponseDocument.Factory.parse(element.getXMLStreamReader());
+ CreateFeedResponseDocument createFeedResponseDocument = CreateFeedResponseDocument.Factory
+ .parse(element.getXMLStreamReader());
System.out.println(createFeedDocument.xmlText());
- //read epr for subscription from response and store it
- OMElement responseAsOM = CommonUtil.toOM(createFeedResponseDocument);
- OMElement eprAsOM = responseAsOM.getFirstChildWithName(new QName(AtomConstants.ATOM_MSG_NAMESPACE,"SubscriptionManager"));
-
+ // read epr for subscription from response and store it
+ OMElement responseAsOM = CommonUtil
+ .toOM(createFeedResponseDocument);
+ OMElement eprAsOM = responseAsOM.getFirstChildWithName(new QName(
+ AtomConstants.ATOM_MSG_NAMESPACE, "SubscriptionManager"));
+
feedEpr = new EndpointReference(eprAsOM.getFirstElement().getText());
- OMElement referanceParameters = eprAsOM.getFirstChildWithName(new QName(eprAsOM.getFirstElement().getNamespace().getNamespaceURI(),
- AddressingConstants.EPR_REFERENCE_PARAMETERS));
+ OMElement referanceParameters = eprAsOM
+ .getFirstChildWithName(new QName(eprAsOM.getFirstElement()
+ .getNamespace().getNamespaceURI(),
+ AddressingConstants.EPR_REFERENCE_PARAMETERS));
Iterator refparams = referanceParameters.getChildElements();
- while(refparams.hasNext()){
- feedEpr.addReferenceParameter((OMElement)refparams.next());
+ while (refparams.hasNext()) {
+ feedEpr.addReferenceParameter((OMElement) refparams.next());
}
-
+
return createFeedResponseDocument.getCreateFeedResponse();
} catch (XmlException e) {
throw new AxisFault(e);
- }
- }
-
- public void deleteFeed(EndpointReference epr)throws AxisFault{
- serviceClient.getOptions().setAction(AtomConstants.Actions.Unsubscribe);
- serviceClient.getOptions().setTo(epr);
-
- OMElement request = OMAbstractFactory.getOMFactory().createOMElement(new QName(AtomConstants.ATOM_MSG_NAMESPACE,"DeleteFeed"));
- serviceClient.sendReceive(request);
- }
-
-
- public void deleteFeed()throws AxisFault{
- if(feedEpr != null){
- deleteFeed(feedEpr);
- }else{
- throw new AxisFault("No feed epr alreday stored, you must have create a feed using same AtomEventingClient Object");
}
}
+
+ public void deleteFeed(EndpointReference epr) throws AxisFault {
+ serviceClient.getOptions().setAction(AtomConstants.Actions.Unsubscribe);
+ serviceClient.getOptions().setTo(epr);
+
+ OMElement request = OMAbstractFactory.getOMFactory().createOMElement(
+ new QName(AtomConstants.ATOM_MSG_NAMESPACE, "DeleteFeed"));
+ serviceClient.sendReceive(request);
+ }
+
+ public void deleteFeed() throws AxisFault {
+ if (feedEpr != null) {
+ deleteFeed(feedEpr);
+ } else {
+ throw new AxisFault(
+ "No feed epr alreday stored, you must have create a feed using same AtomEventingClient Object");
+ }
+ }
+
+ public OMElement fetchFeed(String url) throws SavanException {
+ // Create an instance of HttpClient.
+ HttpClient client = new HttpClient();
+
+ // Create a method instance.
+ GetMethod method = new GetMethod(url);
+
+ try {
+ // Execute the method.
+ int statusCode = client.executeMethod(method);
+
+ if (statusCode != HttpStatus.SC_OK) {
+ throw new SavanException("Method failed: " + method.getStatusLine());
+ }
+
+ // Read the response body.
+ byte[] responseBody = method.getResponseBody();
+
+ StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(
+ responseBody));
+ return builder.getDocumentElement();
+ } catch (IOException e) {
+ throw new SavanException(e);
+ } catch (XMLStreamException e) {
+ throw new SavanException(e);
+ } finally {
+ // Release the connection.
+ method.releaseConnection();
+ }
+ }
+
+ public void publishWithREST(String serviceurl, final OMElement content,String topic)
+ throws SavanException {
+ // Create an instance of HttpClient.
+ HttpClient client = new HttpClient();
+
+ StringBuffer queryUrl = new StringBuffer(serviceurl);
+
+ if(!serviceurl.endsWith("/")){
+ queryUrl.append("/");
+ }
+ queryUrl.append("publish");
+ if(topic != null ){
+ queryUrl.append("?").append(EventingConstants.ElementNames.Topic).append("=").append(topic);
+ }
+ PostMethod method = new PostMethod(queryUrl.toString());
+ // Request content will be retrieved directly
+ // from the input stream
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ content.serialize(out);
+ out.flush();
+ final byte[] data = out.toByteArray();
+
+ RequestEntity entity = new RequestEntity() {
+
+ public void writeRequest(OutputStream outstream)
+ throws IOException {
+ outstream.write(data);
+ }
+
+ public boolean isRepeatable() {
+ return false;
+ }
+
+ public String getContentType() {
+ return "text/xml";
+ }
+
+ public long getContentLength() {
+ return data.length;
+ }
+
+ };
+ method.setRequestEntity(entity);
+
+ // Execute the method.
+ int statusCode = client.executeMethod(method);
+
+ if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_ACCEPTED) {
+ throw new SavanException("Method failed: " + method.getStatusLine());
+ }
+
+ } catch (IOException e) {
+ throw new SavanException(e);
+ } catch (XMLStreamException e) {
+ throw new SavanException(e);
+ } finally {
+ // Release the connection.
+ method.releaseConnection();
+ }
+ }
+
+ public void publishWithSOAP(String serviceurl, final OMElement content,String topic) throws SavanException{
+ try {
+ Options options = serviceClient.getOptions();
+ EndpointReference to = new EndpointReference(serviceurl);
+ if(topic != null){
+ to.addReferenceParameter(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE,
+ EventingConstants.ElementNames.Topic), topic);
+ }
+ options.setAction(EventingConstants.Actions.Publish);
+ serviceClient.fireAndForget(content);
+ } catch (AxisFault e) {
+ throw new SavanException(e);
+ }
+ }
+
+
}
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
index 8d817fb..c9c4001 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java
@@ -31,6 +31,11 @@
import org.apache.savan.storage.SubscriberStore;
import org.apache.savan.util.CommonUtil;
+/**
+ * Handle the HTTP GET requests for feeds
+ * @author Srinath Perera(hemapani@apache.org)
+ */
+
public class AtomMessageReceiver implements MessageReceiver{
public static final String ATOM_NAME = "atom";
diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
index a87c6f0..8105462 100644
--- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
+++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java
@@ -30,7 +30,6 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPHeader;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.savan.SavanConstants;
import org.apache.savan.SavanException;
@@ -39,7 +38,6 @@
import org.apache.savan.configuration.Protocol;
import org.apache.savan.filters.Filter;
import org.apache.savan.filters.XPathBasedFilter;
-import org.apache.savan.storage.SubscriberStore;
import org.apache.savan.subscribers.Subscriber;
import org.apache.savan.subscription.ExpirationBean;
import org.apache.savan.subscription.SubscriptionProcessor;
diff --git a/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java b/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java
index bad3eb8..995b547 100644
--- a/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java
+++ b/modules/core/src/test/java/org/apache/axis2/savan/atom/AtomTest.java
@@ -28,6 +28,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.OMOutputFormat;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
@@ -137,22 +138,31 @@
CreateFeedResponse createFeedResponse = atomEventingClient.createFeed("test Title","Srinath Perera");
options.setAction("http://wso2.com/eventing/dummyMethod");
- serviceClient.fireAndForget(getDummyMethodRequestElement ());
+ serviceClient.fireAndForget(getDummyMethodRequestElement (0));
- options.setAction(EventingConstants.Actions.Publish);
- serviceClient.fireAndForget(getDummyMethodRequestElement ());
+// options.setAction(EventingConstants.Actions.Publish);
+// serviceClient.fireAndForget(getDummyMethodRequestElement ());
+ atomEventingClient.publishWithSOAP(toAddress, getDummyMethodRequestElement (1), null);
+ atomEventingClient.publishWithREST(toAddress, getDummyMethodRequestElement (2), null);
//Thread.sleep(1000*10*1000);
- int i = 0;
- while(i<1){
+// int i = 0;
+// while(i<1){
System.out.println(createFeedResponse.getFeedUrl());
- URL url = new URL(createFeedResponse.getFeedUrl());
- System.out.println(readFromStream(url.openStream()));
- Thread.sleep(1000*10);
- i++;
- }
+ OMElement feedAsXml = atomEventingClient.fetchFeed(createFeedResponse.getFeedUrl());
+ feedAsXml.serialize(System.out,new OMOutputFormat());
+
+// URL url = new URL(createFeedResponse.getFeedUrl());
+// System.out.println(readFromStream(url.openStream()));
+// Thread.sleep(1000*10);
+// i++;
+// }
//
+ feedAsXml = atomEventingClient.fetchFeed(createFeedResponse.getFeedUrl());
+ feedAsXml.serialize(System.out,new OMOutputFormat());
+
+
atomEventingClient.deleteFeed();
@@ -272,10 +282,12 @@
System.out.println("Status of the subscriber '" + ID +"' is" + statusValue);
}
- private OMElement getDummyMethodRequestElement() {
+ private OMElement getDummyMethodRequestElement(int i) {
OMFactory fac = OMAbstractFactory.getOMFactory();
OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1");
- return fac.createOMElement(dummyMethod, namespace);
+ OMElement de = fac.createOMElement(dummyMethod, namespace);
+ de.setText(String.valueOf(i));
+ return de;
}
public static String readFromStream(InputStream in) throws Exception{