blob: 3030f9e36b07b5abcd5d0685d2ecfba8add0e84d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.transport.jms;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;
import java.net.URL;
import javax.jms.ConnectionFactory;
import javax.xml.namespace.QName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.testutil.common.TestUtil;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl11.WSDLServiceFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
public abstract class AbstractJMSTester extends Assert {
protected static final String WSDL = "/jms_test.wsdl";
protected static final String SERVICE_NS = "http://cxf.apache.org/hello_world_jms";
protected static final int MAX_RECEIVE_TIME = 10;
protected static final String MESSAGE_CONTENT = "HelloWorld";
protected static Bus bus;
protected static ActiveMQConnectionFactory cf1;
protected static ConnectionFactory cf;
protected static BrokerService broker;
protected enum ExchangePattern { oneway, requestReply };
protected EndpointReferenceType target;
protected Message inMessage;
protected Message destMessage;
@BeforeClass
public static void startSerices() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.setPopulateJMSXUserID(true);
broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
broker.setUseJmx(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
String brokerUri = "tcp://localhost:" + TestUtil.getNewPortNumber(AbstractJMSTester.class);
broker.addConnector(brokerUri);
broker.start();
bus = BusFactory.getDefaultBus();
cf1 = new ActiveMQConnectionFactory(brokerUri);
cf = cf1;
}
@AfterClass
public static void stopServices() throws Exception {
bus.shutdown(false);
broker.stop();
}
protected EndpointInfo setupServiceInfo(String serviceName, String portName) {
return setupServiceInfo(SERVICE_NS, WSDL, serviceName, portName);
}
protected EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
URL wsdlUrl = getClass().getResource(wsdl);
if (wsdlUrl == null) {
throw new IllegalArgumentException("Wsdl file not found on class path " + wsdl);
}
WSDLServiceFactory factory = new WSDLServiceFactory(bus, wsdlUrl.toExternalForm(),
new QName(ns, serviceName));
Service service = factory.create();
return service.getEndpointInfo(new QName(ns, portName));
}
protected MessageObserver createMessageObserver() {
return new MessageObserver() {
public void onMessage(Message m) {
Exchange exchange = new ExchangeImpl();
exchange.setInMessage(m);
m.setExchange(exchange);
destMessage = m;
}
};
}
protected void sendMessageAsync(Conduit conduit, Message message) throws IOException {
sendoutMessage(conduit, message, false, false);
}
protected void sendMessageSync(Conduit conduit, Message message) throws IOException {
sendoutMessage(conduit, message, false, true);
}
protected void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException {
sendoutMessage(conduit, message, false, synchronous);
}
protected void sendOneWayMessage(Conduit conduit, Message message) throws IOException {
sendoutMessage(conduit, message, true, true);
}
private void sendoutMessage(Conduit conduit,
Message message,
boolean isOneWay,
boolean synchronous) throws IOException {
Exchange exchange = new ExchangeImpl();
exchange.setOneWay(isOneWay);
exchange.setSynchronous(synchronous);
message.setExchange(exchange);
exchange.setOutMessage(message);
conduit.prepare(message);
OutputStream os = message.getContent(OutputStream.class);
Writer writer = message.getContent(Writer.class);
assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null);
if (os != null) {
os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
os.close();
} else {
writer.write(MESSAGE_CONTENT);
writer.close();
}
}
protected JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException {
JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
jmsConfig.setConnectionFactory(cf);
return new JMSConduit(target, jmsConfig, bus);
}
protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws IOException {
JMSConduit jmsConduit = setupJMSConduit(ei);
MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
inMessage = m;
}
};
jmsConduit.setMessageObserver(observer);
return jmsConduit;
}
protected JMSDestination setupJMSDestination(EndpointInfo ei) throws IOException {
JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
jmsConfig.setConnectionFactory(cf);
return new JMSDestination(bus, ei, jmsConfig);
}
protected String getContent(Message message) {
ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
String response = "<not found>";
if (bis != null) {
byte bytes[] = new byte[bis.available()];
try {
bis.read(bytes);
} catch (IOException ex) {
assertFalse("Read the Destination recieved Message error ", false);
ex.printStackTrace();
}
response = IOUtils.newStringFromBytes(bytes);
} else {
StringReader reader = (StringReader)message.getContent(Reader.class);
char buffer[] = new char[5000];
try {
int i = reader.read(buffer);
response = new String(buffer, 0, i);
} catch (IOException e) {
assertFalse("Read the Destination recieved Message error ", false);
e.printStackTrace();
}
}
return response;
}
protected void waitForReceiveInMessage() {
int waitTime = 0;
while (inMessage == null && waitTime < MAX_RECEIVE_TIME) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// do nothing here
}
waitTime++;
}
assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds",
inMessage != null);
}
protected void waitForReceiveDestMessage() {
int waitTime = 0;
while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// do nothing here
}
waitTime++;
}
assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME
+ " seconds", destMessage);
}
}