blob: fb471e11411ceda6174c5564f43d2492cde62257 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ee.test.utils;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.URI;
import java.util.concurrent.Semaphore;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.uima.UIMAFramework;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.util.Level;
public class ActiveMQSupport extends TestCase {
private static final Class CLASS_NAME = ActiveMQSupport.class;
protected static BrokerService broker;
protected String uri = null;
protected static ThreadGroup brokerThreadGroup = null;
protected static TransportConnector tcpConnector = null;
protected static final String relativePath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources"
+ System.getProperty("file.separator") + "deployment";
protected static final String relativeDataPath = "src" + System.getProperty("file.separator")
+ "test" + System.getProperty("file.separator") + "resources"
+ System.getProperty("file.separator") + "data";
private static Thread brokerThread = null;
protected static TransportConnector httpConnector = null;
public static Semaphore brokerSemaphore = new Semaphore(1);
protected synchronized void setUp() throws Exception {
System.out.println("\nSetting Up New Test - Thread Id:" + Thread.currentThread().getId());
super.setUp();
if (brokerThreadGroup == null) {
brokerThreadGroup = new ThreadGroup("BrokerThreadGroup");
// Acquire a semaphore to force this thread to wait until the broker
// starts and initializes
brokerSemaphore.acquire();
brokerThread = new Thread(brokerThreadGroup, "BrokerThread") {
public void run() {
try {
broker = createBroker();
broker.start();
broker.setMasterConnectorURI(uri);
addHttpConnector(8888);
brokerSemaphore.release(); // broker started
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"setUp", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
}
}
};
brokerThread.start();
try {
// wait for the broker to start and initialize. The semaphore is
// released
// in the run method above
brokerSemaphore.acquire();
} finally {
brokerSemaphore.release();
}
} else {
// Remove messages from all queues
broker.deleteAllMessages();
}
}
protected String addHttpConnector(int aDefaultPort) throws Exception {
return addHttpConnector(broker, aDefaultPort);
}
protected String addHttpConnector(BrokerService aBroker, int aDefaultPort) throws Exception {
try {
String httpURI = generateInternalURI("http", aDefaultPort);
httpConnector = aBroker.addConnector(httpURI);
// Use reflection to determine if the AMQ version is at least 5.2. If it is, we must
// plug in a broker to the httpConnector otherwise we get NPE when starting the connector.
// AMQ version 4.1.1 doesn't exhibit this problem.
try {
Method m = httpConnector.getClass().getDeclaredMethod("setBrokerService", new Class[] {BrokerService.class});
m.invoke(httpConnector, aBroker);
} catch ( NoSuchMethodException e) {
// Ignore, this is not AMQ 5.2
}
System.out.println("Adding HTTP Connector:" + httpConnector.getConnectUri());
httpConnector.start();
return httpURI;
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"addHttpConnector", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
throw e;
}
}
protected String getHttpURI() throws Exception {
while ( httpConnector == null ) {
synchronized(this) {
this.wait(100);
}
}
return httpConnector.getConnectUri().toString();
}
protected void removeQueue(String aQueueName) throws Exception {
httpConnector.stop();
}
protected void removeHttpConnector() throws Exception {
httpConnector.stop();
broker.removeConnector(httpConnector);
}
private String generateInternalURI(String aProtocol, int aDefaultPort) throws Exception {
boolean success = false;
int openPort = aDefaultPort;
ServerSocket ssocket = null;
while (!success) {
try {
ssocket = new ServerSocket(openPort);
// String uri = aProtocol + "://" +
// ssocket.getInetAddress().getLocalHost().getCanonicalHostName()
// + ":" + openPort;
String uri = aProtocol + "://localhost:" + openPort;
success = true;
return uri;
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
"generateInternalURI", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
}
throw e;
} finally {
try {
if (ssocket != null) {
ssocket.close();
}
} catch (IOException ioe) {
}
}
}
return null;
}
protected String getBrokerUri() {
return uri;
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(uri);
}
protected Connection getConnection() throws Exception {
return createConnectionFactory().createConnection();
}
public BrokerService createBroker() throws Exception {
return createBroker(8118, true);
}
protected BrokerService createBroker(int port, boolean useJmx) throws Exception {
ServerSocket ssocket = null;
System.out.println(">>>> Starting Broker On Port:" + port);
try {
ssocket = new ServerSocket();
String hostName = ssocket.getInetAddress().getLocalHost().getCanonicalHostName();
uri = "tcp://" + hostName + ":" + port;
BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/" + hostName
+ "?persistent=false"));
broker.setUseJmx(useJmx);
tcpConnector = broker.addConnector(uri);
PolicyEntry policy = new PolicyEntry();
policy.setDeadLetterStrategy(new SharedDeadLetterStrategy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
return broker;
} finally {
if (ssocket != null)
ssocket.close();
}
}
protected void stopBroker() throws Exception {
if (broker != null) {
System.out.println(">>> Stopping Broker");
if (tcpConnector != null) {
tcpConnector.stop();
broker.removeConnector(tcpConnector);
System.out.println("Broker Connector:" + tcpConnector.getUri().toString() + " is stopped");
}
removeHttpConnector();
broker.deleteAllMessages();
broker.stop();
System.out.println(">>> Broker Stopped");
}
}
protected synchronized void tearDown() throws Exception {
super.tearDown();
}
}