blob: 44d4b1675e383a35b5148d28083f6146fac8d7ad [file] [log] [blame]
package org.apache.savan.atom;
import com.wso2.eventing.atom.CreateFeedDocument;
import com.wso2.eventing.atom.CreateFeedDocument.CreateFeed;
import com.wso2.eventing.atom.CreateFeedResponseDocument;
import com.wso2.eventing.atom.CreateFeedResponseDocument.CreateFeedResponse;
import com.wso2.eventing.atom.FilterType;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.http.HttpStatus;
import org.apache.savan.SavanException;
import org.apache.savan.eventing.EventingConstants;
import org.apache.savan.filters.XPathBasedFilter;
import org.apache.savan.util.CommonUtil;
import org.apache.xmlbeans.XmlException;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.Iterator;
/**
* This class take provide client interface for Savan atom support
*
* @author Srinath Perera(hemapani@apache.org)
*/
public class AtomEventingClient {
private ServiceClient serviceClient = null;
private EndpointReference feedEpr;
public AtomEventingClient(String serviceUrl, String clientRepository) throws AxisFault {
ConfigurationContext configContext = ConfigurationContextFactory
.createConfigurationContextFromFileSystem(clientRepository,
clientRepository + "/conf/axis2.xml");
serviceClient = new ServiceClient(configContext, null); //TODO give a repo
Options options = new Options();
serviceClient.setOptions(options);
serviceClient.engageModule(new QName("addressing"));
options.setTo(new EndpointReference(serviceUrl));
}
public AtomEventingClient(ServiceClient serviceClient) {
this.serviceClient = serviceClient;
}
public CreateFeedResponse createFeed(String title, String author)
throws AxisFault {
return createFeed(title, author, null, null);
}
public CreateFeedResponse createFeed(String title, String author,
Calendar expiredTime, String xpathFilter)
throws AxisFault {
try {
serviceClient.getOptions().setAction(
AtomConstants.Actions.Subscribe);
CreateFeedDocument createFeedDocument = CreateFeedDocument.Factory
.newInstance();
CreateFeed createFeed = createFeedDocument.addNewCreateFeed();
createFeed.setAuthor(author);
createFeed.setTitle(title);
if (expiredTime != null) {
createFeed.setExpires(expiredTime);
}
if (xpathFilter != null) {
FilterType filter = createFeed.addNewFilter();
filter.setDialect(XPathBasedFilter.XPATH_BASED_FILTER);
filter.setStringValue(xpathFilter);
}
OMElement request = CommonUtil.toOM(createFeedDocument);
request.build();
OMElement element = serviceClient.sendReceive(request);
CreateFeedResponseDocument createFeedResponseDocument =
CreateFeedResponseDocument.Factory
.parse(element.getXMLStreamReader());
System.out.println(createFeedDocument.xmlText());
// read epr for subscription from response and store it
OMElement responseAsOM = CommonUtil
.toOM(createFeedResponseDocument);
OMElement eprAsOM = responseAsOM.getFirstChildWithName(new QName(
AtomConstants.ATOM_MSG_NAMESPACE, "SubscriptionManager"));
feedEpr = new EndpointReference(eprAsOM.getFirstElement().getText());
OMElement referanceParameters = eprAsOM
.getFirstChildWithName(new QName(eprAsOM.getFirstElement()
.getNamespace().getNamespaceURI(),
AddressingConstants.EPR_REFERENCE_PARAMETERS));
Iterator refparams = referanceParameters.getChildElements();
while (refparams.hasNext()) {
feedEpr.addReferenceParameter((OMElement)refparams.next());
}
return createFeedResponseDocument.getCreateFeedResponse();
} catch (XmlException e) {
throw AxisFault.makeFault(e);
}
}
public void deleteFeed(EndpointReference epr) throws AxisFault {
serviceClient.getOptions().setAction(AtomConstants.Actions.Unsubscribe);
serviceClient.getOptions().setTo(epr);
OMElement request = OMAbstractFactory.getOMFactory().createOMElement(
new QName(AtomConstants.ATOM_MSG_NAMESPACE, "DeleteFeed"));
serviceClient.sendReceive(request);
}
public void deleteFeed() throws AxisFault {
if (feedEpr != null) {
deleteFeed(feedEpr);
} else {
throw new AxisFault(
"No feed epr alreday stored, you must have create a feed using same AtomEventingClient Object");
}
}
public OMElement fetchFeed(String url) throws SavanException {
// Create an instance of HttpClient.
HttpClient client = new HttpClient();
// Create a method instance.
GetMethod method = new GetMethod(url);
try {
// Execute the method.
int statusCode = client.executeMethod(method);
if (statusCode != HttpStatus.SC_OK) {
throw new SavanException("Method failed: " + method.getStatusLine());
}
// Read the response body.
byte[] responseBody = method.getResponseBody();
StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(
responseBody));
return builder.getDocumentElement();
} catch (IOException e) {
throw new SavanException(e);
} catch (XMLStreamException e) {
throw new SavanException(e);
} finally {
// Release the connection.
method.releaseConnection();
}
}
public void publishWithREST(String serviceurl, final OMElement content, String topic)
throws SavanException {
// Create an instance of HttpClient.
HttpClient client = new HttpClient();
StringBuffer queryUrl = new StringBuffer(serviceurl);
if (!serviceurl.endsWith("/")) {
queryUrl.append("/");
}
queryUrl.append("publish");
if (topic != null) {
queryUrl.append("?").append(EventingConstants.ElementNames.Topic).append("=")
.append(topic);
}
PostMethod method = new PostMethod(queryUrl.toString());
// Request content will be retrieved directly
// from the input stream
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
content.serialize(out);
out.flush();
final byte[] data = out.toByteArray();
RequestEntity entity = new RequestEntity() {
public void writeRequest(OutputStream outstream)
throws IOException {
outstream.write(data);
}
public boolean isRepeatable() {
return false;
}
public String getContentType() {
return "text/xml";
}
public long getContentLength() {
return data.length;
}
};
method.setRequestEntity(entity);
// Execute the method.
int statusCode = client.executeMethod(method);
if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_ACCEPTED) {
throw new SavanException("Method failed: " + method.getStatusLine());
}
} catch (IOException e) {
throw new SavanException(e);
} catch (XMLStreamException e) {
throw new SavanException(e);
} finally {
// Release the connection.
method.releaseConnection();
}
}
public void publishWithSOAP(String serviceurl, final OMElement content, String topic)
throws SavanException {
try {
Options options = serviceClient.getOptions();
EndpointReference to = new EndpointReference(serviceurl);
if (topic != null) {
to.addReferenceParameter(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE,
EventingConstants.ElementNames.Topic), topic);
}
options.setAction(EventingConstants.Actions.Publish);
serviceClient.fireAndForget(content);
} catch (AxisFault e) {
throw new SavanException(e);
}
}
}